Canal介绍
**canal [kə'næl]**,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。
canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。
阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,应用场景十分丰富。
目前canal主要支持mysql数据库。
github地址:https://github.com/alibaba/canal
版本下载地址:https://github.com/alibaba/canal/releases
文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart
Canal应用场景
1)、电商场景下商品、用户实时更新同步到至Elasticsearch、solr等搜索引擎;
2)、价格、库存发生变更实时同步到redis;
3)、数据库异地备份、数据同步;
4)、代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
MySQL主从复制原理
1)、
MySQL master
将数据变更写入二进制日志(
binary log
, 其中记录叫做二进制日志事件
binary log events
,可以通过
show binlog events
进行查看)
2)、
MySQL slave
将 master 的
binary log events
拷贝到它的中继日志(
relay log
)
3)、
MySQL slave
重放
relay log
中事件,将数据变更反映它自己的数据
Canal工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流) ![](https://img-blog.csdnimg.cn/cf9544ad269c45039b4f7ea523891185.png)
Canal安装
参考文档:https://github.com/alibaba/canal/wiki/QuickStart
Canal配置
mq相关参数说明 (>=1.1.5版本)
在1.1.5版本开始,引入了MQ Connector设计,参数配置做了部分调整
参数名
参数说明
默认值
canal.aliyun.accessKey
阿里云ak
无
canal.aliyun.secretKey
阿里云sk
无
canal.aliyun.uid
阿里云uid
无
canal.mq.flatMessage
是否为json格式 如果设置为false,对应MQ收到的消息为protobuf格式 需要通过CanalMessageDeserializer进行解码
false
canal.mq.canalBatchSize
获取canal数据的批次大小
50
canal.mq.canalGetTimeout
获取canal数据的超时时间
100
canal.mq.accessChannel = local
是否为阿里云模式,可选值local/cloud
local
canal.mq.database.hash
是否开启database混淆hash,确保不同库的数据可以均匀分散,如果关闭可以确保只按照业务字段做MQ分区计算
true
canal.mq.send.thread.size
MQ消息发送并行度
30
canal.mq.build.thread.size
MQ消息构建并行度
8
kafka.bootstrap.servers
kafka服务端地址
127.0.0.1:9092
kafka.acks
kafka为
ProducerConfig.ACKS_CONFIG
all
kafka.compression.type
压缩类型
none
kafka.batch.size
kafka为
ProducerConfig.BATCH_SIZE_CONFIG
16384
kafka.linger.ms
kafka为
ProducerConfig.LINGER_MS_CONFIG
, 如果是flatMessage格式建议将该值调大, 如: 200
1
kafka.max.request.size
kafka为
ProducerConfig.MAX_REQUEST_SIZE_CONFIG
1048576
kafka.buffer.memory
kafka为
ProducerConfig.BUFFER_MEMORY_CONFIG
33554432
kafka.max.in.flight.requests.per.connection
kafka为
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
1
kafka.retries
发送失败重试次数
0
kafka.kerberos.enable
kerberos认证
false
kafka.kerberos.krb5.file
kerberos认证
../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file
kerberos认证
../conf/kerberos/jaas.conf
rocketmq.producer.group
rocketMQ为ProducerGroup名
test
rocketmq.enable.message.trace
是否开启message trace
false
rocketmq.customized.trace.topic
message trace的topic
无
rocketmq.namespace
rocketmq的namespace
无
rocketmq.namesrv.addr
rocketmq的namesrv地址
127.0.0.1:9876
rocketmq.retry.times.when.send.failed
重试次数
0
rocketmq.vip.channel.enabled
rocketmq是否开启vip channel
false
rocketmq.tag
rocketmq的tag配置
空值
rabbitmq.host
rabbitMQ配置
无
rabbitmq.virtual.host
rabbitMQ配置
无
rabbitmq.exchange
rabbitMQ配置
无
rabbitmq.username
rabbitMQ配置
无
rabbitmq.password
rabbitMQ配置
无
rabbitmq.deliveryMode
rabbitMQ配置
无
pulsarmq.serverUrl
pulsarmq配置
无
pulsarmq.roleToken
pulsarmq配置
无
pulsarmq.topicTenantPrefix
pulsarmq配置
无
canal.mq.topic
mq里的topic名
无
canal.mq.dynamicTopic
mq里的动态topic规则, 1.1.3版本支持
无
canal.mq.partition
单队列模式的分区下标,
1
canal.mq.enableDynamicQueuePartition
动态获取MQ服务端的分区数,如果设置为true之后会自动根据topic获取分区数替换canal.mq.partitionsNum的定义,目前主要适用于RocketMQ
false
canal.mq.partitionsNum
散列模式的分区数
无
canal.mq.dynamicTopicPartitionNum
mq里的动态队列分区数,比如针对不同topic配置不同partitionsNum
无
canal.mq.partitionHash
散列规则定义 库名.表名 : 唯一主键,比如mytest.person: id 1.1.3版本支持新语法,见下文
无
canal.mq.dynamicTopic 表达式说明
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔
例子1:test\\.test 指定匹配的单表,发送到以test_test为名字的topic上
例子2:.*\\..* 匹配所有表,则每个表都会发送到各自表名的topic上
例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
例子4:test\\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
例子5:test,test1\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值
为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table
例子1: test:test\\.test 指定匹配的单表,发送到以test为名字的topic上
例子2: test:.*\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
例子4:testA:test\\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
例子5:test0:test,test1:test1\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值
大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力
canal.mq.partitionHash 表达式说明
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔
例子1:test\\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
例子2:.*\\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
例子3:.*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
例子4: 匹配规则啥都不写,则默认发到0这个partition上
例子5:.*\\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名 - • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
例子6: test\\.test:id,.\\..* , 针对test的表按照id散列,其余的表按照table散列
注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)
其他详细参数可参考Canal AdminGuide
mq顺序性问题
binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答
1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
1. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意
性能表现
Kafka + 混合DML场景测试
场景
1个topic + 单分区
1个topic+3分区
2个topic+1分区
2个topic+3分区
不开启flatMessage
29.6k rps (9.71k tps)
17.54k rps (6.53k tps)
21.6k rps (7.9k tps)
16.8k rps (5.71k tps)
开启flatMessage
11.79k rps (4.36k tps)
15.97 rps (5.94k tps)
11.91k rps (4.45k tps)
16.96k rps (6.26k tps)
Kafka + 单表的batch insert场景测试
场景
1个topic + 单分区
1个topic+3分区
不开启flatMessage
59.6k rps
45.1k rps
开启flatMessage
51.3k rps
49.6k rps
RocketMQ + 混合DML场景测试
场景
1个topic + 单分区
1个topic+3分区
2个topic+1分区
2个topic+3分区
不开启flatMessage
29.6k rps (10.71k tps)
23.3k rps (8.59k tps)
26.7k rps (9.46k tps)
21.7k rps (7.66k tps)
开启flatMessage
16.75k rps (6.17k tps)
14.96k rps (5.55k tps)
17.83k rps (6.63k tps)
16.93k rps (6.26k tps)
RocketMQ + 单表的batch insert场景测试
场景
1个topic + 单分区
1个topic+3分区
不开启flatMessage
81.2k rps
51.3k rps
开启flatMessage
62.6k rps
57.9k rps
附录:
canal官方文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
Canal+MQ性能表现:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance
版权归原作者 shadow_zed 所有, 如有侵权,请联系我们删除。