0


Canal —— 一款 MySql 实时同步到 ES 的阿里开源神器

一. 前言

Canal 是阿里开源的一款基于 MySql 数据库 binlog 的增量订阅和消费组件,通过它可以订阅数据库的 binlog 日志,然后进行一些数据消费,如数据镜像、数据异构、数据索引、缓存更新等。相对于消息队列,通过这种机制可以实现数据的有序化和一致性。

Canal 主要用途是对 MySql 数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对 MySql 的增量数据进行实时同步,支持同步到 MySql、ElasticSearch、HBase 等数据存储中去。

二. Canal 简介和使用场景

2.1. Canal 简介

由上面两张图片可知:

  • Canal 模拟 MySql Slave 的交互协议,伪装自己为 MySql Slave ,向 MySql Master 发送dump 协议。
  • MySql Master 收到 dump 请求,开始推送 binary log 给 Slave (即 Canal )。
  • Canal 解析 binary log 对象(原始为 byte 流)。
  • Canal 对外提供增量数据订阅和消费,提供 Kafka、RocketMQ、RabbitMq、Es、Tcp 等组件来消费。

2.2. Canal 使用场景

1. 同步缓存 Redis/全文搜索 ES:Canal 一个常见应用场景是同步缓存/全文搜索,当数据库变更后通过 binlog 进行缓存/ES 的增量更新。当缓存/ES 更新出现问题时,应该回退 binlog 到过去某个位置进行重新同步,并提供全量刷新缓存/ES 的方法。

2. 下发任务:另一种常见应用场景是下发任务,当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入 MQ(比如 Kafka) 进行任务下发,比如商品数据变更后需要通知商品详情页、列表页、搜索页等相关系统。这种方式可以保证数据下发的精确性,通过 MQ 发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发 MQ 的代码,从而实现了下发归集。

3. 数据异构:在大型网站架构中,DB 都会采用分库分表来解决容量和性能问题,但分库分表之后带来的新问题。比如不同维度的查询或者聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决此问题。所谓的数据异构,那就是将需要 join 查询的多表按照某一个维度又聚合在一个DB 中,让你去查询。Canal 就是实现数据异构的手段之一。

三. Canal Server 设计

3.1. 整体设计

Server 代表一个 Canal 运行实例,对应于一个 JVM。

Instance 对应于一个数据队列(1个 Canal Server 对应 1..n 个 Instance),Instance 下的子模块:

  1. EventParser:数据源接入,模拟 slave 协议和 master 进行交互,协议解析;
  2. EventSink:Parser 和 Store 链接器,进行数据过滤,加工,分发的工作;
  3. EventStore:数据存储;
  4. MetaManager:增量订阅 & 消费信息管理器。

整体类图设计:

  • CanalLifeCycle:所有 Canal 模块的生命周期接口;
  • CanalInstance:组合 Parser、Sink、Store 三个子模块,三个子模块的生命周期统一受 CanalInstance 管理;
  • CanalServer:聚合了多个 CanalInstance。

3.2. EventParser 设计

每个 EventParser 都会关联两个内部组件:

  1. CanalLogPositionManager : 记录binlog 最后一次解析成功位置信息,主要是描述下一次canal启动的位点
  2. CanalHAController: 控制 EventParser 的链接主机管理,判断当前该链接哪个mysql数据库

目前开源版本只支持 MySql binlog , 默认通过 MySql binlog dump 远程获取 binlog,但也可以使用 LocalBinlog - 类 relay log 模式,直接消费本地文件中的 binlog。

3.3. CanalLogPositionManager 设计

  • 如果 CanalEventStore 选择的是内存模式,可不保留解析位置,下一次 Canal 启动时直接依赖 CanalMetaManager 记录的最后一次消费成功的位点即可(最后一次 ack 提交的数据位点)。
  • 如果 CanalEventStore 选择的是持久化模式,可通过 Zookeeper 记录位点信息,Canal Instance 发生 failover 切换到另一台机器,可通过读取 Zookeeper 获取位点信息。
  • 可通过实现自己的 CanalLogPositionManager,比如记录位点信息到本地文件 /nas 文件实现简单可用的无 HA 模式。

3.4. CanalHAController 类图设计

  • 失败检测常见方式可定时发送心跳语句到当前链接的数据库,超过一定次数检测失败时,尝试切换到备机。
  • 如果有一套数据库主备信息管理系统,当数据库主备切换或者机器下线,推送配置到各个应用节点,HAController 收到后,控制 EventParser 进行链接切换。

3.5. EventSink 类图设计和扩展

  • 数据过滤:支持通配符的过滤模式,表名,字段内容等。
  • 数据路由/分发:解决 1:n(1个 Parser 对应多个 Store 的模式)。
  • 数据归并:解决 n:1(多个 Parser 对应1个 Store)。
  • 数据加工:在进入 store 之前进行额外的处理,比如 join。

数据 1:n 业务:

为了合理的利用数据库资源, 一般常见的业务都是按照 schema 进行隔离,然后在 MySql 上层或者 dao 这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过 cobar/tddl 来解决数据源路由问题。所以,一般一个数据库实例上,会部署多个 schema,每个 schema 会有1个或者多个业务方关注。

数据 n:1 业务:

同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个 Store 进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局Id 进行排序归并。

3.6. EventStore 类图设计和扩展

  1. 抽象 CanalStoreScavenge , 解决数据的清理,比如定时清理,满了之后清理,每次 ack 清理等。

  2. CanalEventStore 接口,主要包含 put/get/ack/rollback 的相关接口。put/get 操作会组成一个生产者/消费者模式,每个 Store 都会有存储大小设计,存储满了,put 操作会阻塞等待 get 获取数据,所以不会无线占用存储,比如内存大小:

  • EventStore 目前实现了 memory 模式,支持按照内存大小和内存记录数进行存储大小限制。
  • 后续可开发基于本地文件的存储模式。
  • 基于文件存储和内存存储,开发 mixed 模式,做成两级队列,内存 buffer 有空位时,将文件的数据读入到内存 buffer 中(可以通过配置进行配置)。
  • mixed 模式实现可以让 Canal 落地消费/订阅的模型,取 1 份 binlog 数据,提供多个客户端消费,消费有快有慢,各自保留消费位点。

3.7. MetaManager 类图设计和扩展

  • MetaManager 目前支持了多种模式,最顶层 Memory 和 Zookeeper 模式,然后是 mixed 模式-先写内存,再写 Zookeeper。
  • 可通过实现自己的 CanalMetaManager,比如记录位点信息到本地文件 /nas 文件,简单可用的无 HA 模式。

四. Canal Client 设计

4.1. 整体设计

在了解具体 API 之前,需要提前了解下 Canal Client 的类设计,这样才可以正确的使用好 Canal。

大致分为几部分:

  1. ClientIdentity:Canal Client 和 Server 交互之间的身份标识,目前 clientId 写死为1001。(目前 Canal Server 上的一个 Instance 只能有一个 Client 消费,ClientId 的设计是为1个Instance 多 Client 消费模式而预留的,暂时不需要理会)。
  2. CanalConnector:SimpleCanalConnector/ClusterCanalConnector 是两种 Connector 的实现,Simple 针对的是简单的 ip 直连模式,Cluster针对多 ip 的模式,可依赖CanalNodeAccessStrategy 进行 failover 控制。
  3. CanalNodeAccessStrategy:SimpleNodeAccessStrategy/ClusterNodeAccessStrategy 是两种 failover 的实现,Simple 针对给定的初始 ip 列表进行 failover 选择,Cluster 基于Zookeeper上的 Cluster 节点动态选择正在运行的 Canal Server。
  4. ClientRunningMonitor/ClientRunningListener/ClientRunningData:Client Running 相关控制,主要为解决 Client 自身的 failover 机制。Canal Client 允许同时启动多个 Canal Client,通过 Running 机制,可保证只有一个 Client 在工作,其他 Client 做为冷备。当运行中的Client 挂了,Running 会控制让冷备中的 Client 转为工作模式,这样就可以确保 Canal Client 也不会是单点,保证整个系统的高可用性。

4.2. Server/Client交互协议

get/ack/rollback 协议介绍:

  1. **Message getWithoutAck(int batchSize)**,允许指定 batchSize,一次可以获取多条,每次返回的对象为 message,包含的内容为: a. batch id:唯一标识; b. entries:具体的数据对象,可参见下面的数据介绍。
  2. **getWithoutAck(int batchSize, Long timeout, TimeUnit unit)**,相比于 getWithoutAck(int batchSize),允许设定获取数据的 timeout 超时时间: a. 拿够 batchSize 条记录或者超过 timeout 时间; b. timeout=0,阻塞等到足够的 batchSize。
  3. **void rollback(long batchId)**,顾命思议,回滚上次的 get 请求,重新获取数据。基于 get 获取的 batchId 进行提交,避免误操作。
  4. **void ack(long batchId)**,顾命思议,确认已经消费成功,通知 Server 删除数据。基于 get获取的 batchId 进行提交,避免误操作。

Canal 的 get/ack/rollback 协议和常规的 jms 协议有所不同,允许 get/ack 异步处理,比如可以连续调用 get 多次,后续异步按顺序提交 ack/rollback,项目中称之为流式 API。

流式 API 设计的好处:

  1. get/ack 异步化,减少因 ack 带来的网络延迟和操作成本(99%的状态都是处于正常状态,异常的 rollback 属于个别情况,没必要为个别的 case 牺牲整个性能)。
  2. get 获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询 get 数据,不停的往后发送任务,提高并行化。(作者在实际业务中的一个 case:业务数据消费需要跨中美网络,所以一次操作基本在 200ms 以上,为了减少延迟,所以需要实施并行化)。

流式 API 设计:

  • 每次 get 操作都会在 meta 中产生一个 mark,mark 标记会递增,保证运行过程中 mark 的唯一性;
  • 每次的 get 操作,都会在上一次的 mark 操作记录的 cursor 继续往后取,如果 mark 不存在,则在 last ack cursor 继续往后取;
  • 进行 ack 时,需要按照 mark 的顺序进行数序 ack,不能跳跃 ack。ack 会删除当前的 mark 标记,并将对应的 mark 位置更新为 last ack cursor;
  • 一旦出现异常情况,客户端可发起 rollback 情况,重新置位:删除所有的 mark,清理 get 请求位置,下次请求会从 last ack cursor 继续往后取。

流式 API 带来的异步响应模型:

五. Canal 配置信息

5.1. Canal 配置方式

Canal 配置方式有两种:

  1. ManagerCanalInstanceGenerator:基于 Manager 管理的配置方式,目前 Alibaba 内部配置使用这种方式。大家可以实现 CanalConfigClient,连接各自的管理系统,即可完成接入。
  2. SpringCanalInstanceGenerator:基于本地 spring xml 的配置方式,目前开源版本已经自带该功能所有代码,建议使用。

Spring 配置:

Spring 配置的原理是将整个配置抽象为两部分:

  1. xxxx-instance.xml(Canal 组件的配置定义,可以在多个 Instance 配置中共享);
  2. xxxx.properties(每个 Instance 通道都有各自一份定义,因为每个 MySql 的 ip,帐号,密码等信息不会相同)。

通过 Spring 的 PropertyPlaceholderConfigurer 机制将其融合,生成一份 Instance 实例对象,每个Instance 对应的组件都是相互独立的,互不影响。

properties 配置文件分为两部分:

  1. canal.properties(系统根配置文件),下面详细说明;
  2. instance.properties(Instance 级别的配置文件,每个 Instance 一份)。

5.2. canal.properties

Canal 配置主要分为两部分定义:

1. instance 列表定义,(列出当前 Server 上有多少个 Instance,每个 Instance 的加载方式是Spring/Manager 等) 以下选一些重要的参数说明一下:
参数名字参数说明默认值canal.auto.scan开启instance自动扫描
如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:
a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动
b. instance目录删除:卸载对应instance配置,如已启动则进行关闭
c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作truecanal.instance.global.spring.xml全局的spring配置方式的组件文件lasspath:spring/memory-instance.xml
(spring目录相对于canal.conf.dir)
2. common 参数定义,比如可以将 instance.properties 的公用参数,抽取放置到这里,这样每个Instance 启动的时候就可以共享。【instance.properties 配置定义优先级高于 canal.properties】以下选一些重要的参数说明一下:
参数名字参数说明默认值canal.register.ipcanal server注册到外部zookeeper、admin的ip信息 (针对docker的外部可见ip)无canal.zookeeper.flush.periodcanal持久化数据到zookeeper上的更新频率,单位毫秒1000canal.instance.memory.batch.modecanal内存store中数据缓存模式

  1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量
  2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小MEMSIZEcanal.instance.memory.buffer.sizecanal内存store中可缓存buffer记录数,需要为2的指数16384canal.instance.memory.buffer.memunit内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小1024canal.instance.filter.druid.ddl是否使用druid处理所有的ddl解析来获取库和表名truecanal.instance.filter.query.dml是否忽略dml语句
    (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)falsecanal.instance.parser.parallel
    是否开启binlog并行解析模式

(串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+)
truecanal.admin.managercanal链接canal-admin的地址 (v1.1.4新增)无

5.3. instance.properties

在 canal.properties 定义了 canal.destinations 后,需要在 canal.conf.dir 对应的目录下建立同名的文件。

如果 canal.properties 未定义 instance 列表,但开启了 canal.auto.scan 时:

  1. Server 第一次启动时,会自动扫描 conf 目录下,将文件名做为 instance name,启动对应的instance;
  2. Server 运行过程中,会根据 canal.auto.scan.interval 定义的频率,进行扫描: 1. 发现目录有新增,启动新的 Instance; 2. 发现目录有删除,关闭老的 Instance; 3. 发现对应目录的 instance.properties 有变化,重启 Instance。

instance.properties 参数列表(部分):
参数名字参数说明默认值canal.instance.mysql.slaveIdmysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一
(v1.1.x版本之后canal会自动生成,不需要手工指定)无canal.instance.filter.regex
mysql 数据解析关注的表,Perl正则表达式.

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)

常见例子:

  1. 所有表:.* or .\..

  2. canal schema下所有表: canal\..*

  3. canal下的以canal打头的表:canal\.canal.*

  4. canal schema下的一张表:canal\.test1

  5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
    .*\..*canal.instance.filter.black.regexmysql 数据解析表的黑名单,表达式规则见白名单的规则无canal.instance.master.journal.namemysql主库链接时起始的binlog文件无canal.instance.master.positionmysql主库链接时起始的binlog偏移量无canal.instance.master.timestampmysql主库链接时起始的binlog的时间戳无
    几点说明:

1. MySql 链接时的起始位置

  1. canal.instance.master.journal.name + canal.instance.master.position:精确指定一个 binlog位点,进行启动。
  2. canal.instance.master.timestamp:指定一个时间戳,Canal 会自动遍历 mysql binlog,找到对应时间戳的 binlog 位点后,进行启动。
  3. 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)

2. MySql 解析关注表定义

  • 标准的Perl正则,注意转义时需要双斜杠:\。

3. MySql 链接的编码

  • 目前 Canal 版本仅支持一个数据库只有一种编码,如果一个库存在多个编码,需要通过filter.regex 配置,将其拆分为多个 canal instance,为每个 Instance 指定不同的编码。

5.4. instance.xml 配置文件

目前默认支持的 instance.xml 有以下几种:

  1. spring/memory-instance.xml
  2. spring/default-instance.xml
  3. spring/group-instance.xml

在介绍 instance 配置之前,先了解一下 Canal 如何维护一份增量订阅&消费的关系信息:

  • 解析位点(Parse 模块会记录,上一次解析 binlog 到了什么位置,对应组件为:CanalLogPositionManager)。
  • 消费位点(Canal Server 在接收了客户端的 ack 后,就会记录客户端提交的最后位点,对应的组件为:CanalMetaManager)。

对应的两个位点组件,目前都有几种实现:

  1. Memory(memory-instance.xml 中使用)。
  2. Zookeeper。
  3. Mixed。
  4. Period(default-instance.xml 中使用,集合了 Zookeeper+Memory 模式,先写内存,定时刷新数据到 Zookeeper 上)。

memory-instance.xml 介绍:

所有的组件(parser、sink、store)都选择了内存版模式,记录位点的都选择了 Memory 模式,重启后又会回到初始位点进行解析。

特点:速度最快,依赖最少(不需要 Zookeeper)。

场景:一般应用在 quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境。

default-instance.xml 介绍:

Store 选择了内存模式,其余的 parser/sink 依赖的位点管理选择了持久化模式,目前持久化的方式主要是写入 Zookeeper,保证数据集群共享。

特点:支持 HA。

场景:生产环境,集群化部署。

group-instance.xml 介绍:

主要针对需要进行多库合并时,可以将多个物理 Instance 合并为一个逻辑 Instance,提供客户端访问。

场景:分库业务。比如产品数据拆分了4个库,每个库会有一个 Instance,如果不用 Group,业务上要消费数据时,需要启动4个客户端,分别链接4个 Instance 实例。使用 Group后,可以在 Canal Server 上合并为一个逻辑 Instance,只需要启动1个客户端,链接这个逻辑 Instance即可。

六. Canal 使用

接下来我们来学习下 Canal 的使用,以 MySql 实时同步数据到 ElasticSearch为例。

6.1. Canal 下载

首先我们需要下载 Canal 的各个组件 canal-server、canal-adapter、canal-admin。

下载地址:https://github.com/alibaba/canal/releases。

Canal 官方文档:https://github.com/alibaba/canal/wiki。

Canal 的各个组件的用途各不相同,下面分别介绍下:

  1. canal-server(canal-deploy):可以直接监听 MySql 的 binlog,把自己伪装成 MySql 的从库,只负责接收数据,并不做处理。
  2. canal-adapter:相当于 Canal 的客户端,会从 canal-server 中获取数据,然后对数据进行同步,可以同步到 MySql、ElasticSearch 和 HBase 等存储中去。
  3. canal-admin:为 Canal 提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI 操作界面,方便更多用户快速和安全的操作。

由于不同版本的 MySql、ElasticSearch 和 Canal 会有兼容性问题,所以我们先对其使用版本做个约定:
应用端口版本MySql33065.7ElasticSearch92007.6.2Kibanba56017.6.2canal-server111111.1.15canal-adapter80811.1.15canal-admin80891.1.15

6.2. MySql 配置

由于 Canal 是通过订阅 MySql 的 binlog 来实现数据同步的,所以我们需要开启 MySql 的 binlog写入功能,并设置 binlog-format 为 ROW 模式,我的配置文件为 /mydata/mysql/conf/my.cnf,改为如下内容即可:
[mysqld]
## 设置server_id,同一局域网中需要唯一
server_id=101 
## 指定不需要同步的数据库名称
binlog-ignore-db=mysql  
## 开启二进制日志功能
log-bin=mall-mysql-bin  
## 设置二进制日志使用内存大小(事务)
binlog_cache_size=1M  
## 设置使用的二进制日志格式(mixed,statement,row)
binlog_format=row  
## 二进制日志过期清理时间。默认值为0,表示不自动清理。
expire_logs_days=7  
## 跳过主从复制中遇到的所有错误或指定类型的错误,避免slave端复制中断。
## 如:1062错误是指一些主键重复,1032错误是因为主从数据库数据不一致
slave_skip_errors=1062

配置完成后需要重新启动 MySql,重启成功后通过如下命令查看 binlog 是否启用:

show variables like '%log_bin%'
+---------------------------------+-------------------------------------+
| Variable_name                   | Value                               |
+---------------------------------+-------------------------------------+
| log_bin                         | ON                                  |
| log_bin_basename                | /var/lib/mysql/mall-mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mall-mysql-bin.index |
| log_bin_trust_function_creators | OFF                                 |
| log_bin_use_v1_row_events       | OFF                                 |
| sql_log_bin                     | ON                                  |
+---------------------------------+-------------------------------------+

再查看下 MySql 的 binlog 模式:

show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

接下来需要创建一个拥有从库权限的账号,用于订阅 binlog,这里创建的账号为 canal:canal:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

创建好测试用的数据库 canal-test,之后创建一张商品表 product,建表语句如下:

CREATE TABLE `product`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `price` decimal(10, 2) NULL DEFAULT NULL,
  `pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

6.3. canal-server 使用

将我们下载好的压缩包 canal.deployer-1.1.5-SNAPSHOT.tar.gz 上传到 Linux 服务器,然后解压到指定目录 /mydata/canal-server,可使用如下命令解压:
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz

解压完成后目录结构如下:

├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── canal_local.properties
│   ├── canal.properties
│   └── example
│       └── instance.properties
├── lib
├── logs
│   ├── canal
│   │   └── canal.log
│   └── example
│       ├── example.log
│       └── example.log
└── plugin

修改配置文件 conf/example/instance.properties,按如下配置即可,主要是修改数据库相关配置:

# 需要同步数据的MySQL地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=canal
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*

使用 startup.sh 脚本启动 canal-server 服务:

sh bin/startup.sh

启动成功后可使用如下命令查看服务日志信息:

tail -f logs/canal/canal.log
2020-10-26 16:18:13.354 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
2020-10-26 16:18:19.978 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is r

启动成功后可使用如下命令查看 instance 日志信息:

tail -f logs/example/example.log
2020-10-26 16:18:16.056 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-10-26 16:18:16.061 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-10-26 16:18:18.259 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2020-10-26 16:18:18.282 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-10-26 16:18:18.282 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2020-10-26 16:18:19.543 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-10-26 16:18:19.578 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-10-26 16:18:19.912 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mall-mysql-bin.000006","position":2271,"serverId":101,"timestamp":1603682664000}}
2020-10-26 16:18:22.435 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbou

如果想要停止 canal-server 服务可以使用如下命令:

sh bin/stop.sh

6.4. canal-adapter 使用

将我们下载好的压缩包 canal.adapter-1.1.5-SNAPSHOT.tar.gz 上传到 Linux 服务器,然后解压到指定目录 /mydata/canal-adpter,解压完成后目录结构如下:
├── bin
│   ├── adapter.pid
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── es6
│   ├── es7
│   │   ├── biz_order.yml
│   │   ├── customer.yml
│   │   └── product.yml
│   ├── hbase
│   ├── kudu
│   ├── logback.xml
│   ├── META-INF
│   │   └── spring.factories
│   └── rdb
├── lib
├── logs
│   └── adapter
│       └── adapter.log
└── plugin

修改配置文件 conf/application.yml,按如下配置即可,主要是修改 canal-server 配置、数据源配置和客户端适配器配置:

canal.conf:
  mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
  flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  zookeeperHosts:    # 对应集群模式下的zk地址
  syncBatchSize: 1000 # 每次同步的批数量
  retries: 0 # 重试次数, -1为无限重试
  timeout: # 同步超时时间, 单位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources: # 源数据库配置
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters: # 适配器列表
  - instance: example # canal实例名或者MQ topic名
    groups: # 分组列表
    - groupId: g1 # 分组id, 如果是MQ模式将用到该值
      outerAdapters:
      - name: logger # 日志打印适配器
      - name: es7 # ES同步适配器
        hosts: 127.0.0.1:9200 # ES连接地址
        properties:
          mode: rest # 模式可选transport(9300) 或者 rest(9200)
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch # ES集群名称

添加配置文件 canal-adapter/conf/es7/product.yml,用于配置 MySql 中的表与 ElasticSearch 中索引的映射关系:

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: canal_product # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT
         p.id AS _id,
         p.title,
         p.sub_title,
         p.price,
         p.pic
        FROM
         product p"        # sql映射
  etlCondition: "where a.c_time>={}"   #etl的条件参数
  commitBatch: 3000   # 提交批大小

使用 startup.sh 脚本启动 canal-adapter 服务:

sh bin/startup.sh

启动成功后可使用如下命令查看服务日志信息:

tail -f logs/adapter/adapter.log
20-10-26 16:52:55.148 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2020-10-26 16:52:57.005 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2020-10-26 16:52:57.376 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2020-10-26 16:52:58.615 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2020-10-26 16:52:58.651 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /mydata/canal-adapter/plugin
2020-10-26 16:52:59.043 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2020-10-26 16:52:59.044 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2020-10-26 16:52:59.057 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2020-10-26 16:52:59.100 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2020-10-26 16:52:59.153 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-10-26 16:52:59.590 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2020-10-26 16:52:59.626 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 31.278 seconds (JVM running for 33.99)
2020-10-26 16:52:59.930 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============

如果需要停止 canal-adapter 服务可以使用如下命令:

sh bin/stop.sh

6.5. canal-admin 使用

将我们下载好的压缩包 canal.admin-1.1.5-SNAPSHOT.tar.gz 上传到 Linux 服务器,然后解压到指定目录 /mydata/canal-admin,解压完成后目录结构如下:
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── canal_manager.sql
│   ├── canal-template.properties
│   ├── instance-template.properties
│   ├── logback.xml
│   └── public
│       ├── avatar.gif
│       ├── index.html
│       ├── logo.png
│       └── static
├── lib
└── logs

创建 canal-admin 需要使用的数据库 canal_manager,创建 SQL 脚本为 /mydata/canal-admin/conf/canal_manager.sql,会创建如下表:

修改配置文件 conf/application.yml,按如下配置即可,主要是修改数据源配置和 canal-admin 的管理账号配置,注意需要用一个有读写权限的数据库账号,比如管理账号 root:root:

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: root
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

接下来对之前搭建的 canal-server 的 conf/canal_local.properties 文件进行配置,主要是修改canal-admin 的配置,修改完成后使用 sh bin/startup.sh local 重启 canal-server:

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

使用 startup.sh 脚本启动 canal-admin 服务:

sh bin/startup.sh

启动成功后可使用如下命令查看服务日志信息:

tail -f logs/admin.log
020-10-27 10:15:04.210 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2020-10-27 10:15:04.308 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-10-27 10:15:04.534 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2020-10-27 10:15:04.573 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 31.203 seconds (JVM running for 34.865)

访问 canal-admin 的 Web 界面,输入账号密码 admin:123456 即可登录,访问地址:http://192.168.3.101:8089

登录成功后即可使用 Web 界面操作 canal-server:

6.6. 数据同步演示

经过上面的一系列步骤,Canal 的数据同步功能已经基本可以使用了,下面我们来演示下数据同步功能。

首先我们需要在 ElasticSearch 中创建索引,和 MySql 中的 product 表相对应,直接在 Kibana 的Dev Tools 中使用如下命令创建即可:

PUT canal_product
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text"
      },
      "sub_title": {
        "type": "text"
      },
      "pic": {
        "type": "text"
      },
      "price": {
        "type": "double"
      }
    }
  }
}

创建完成后可以查看下索引的结构:

之后使用如下 SQL 语句在数据库中创建一条记录:

INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 5, '小米8', ' 全面屏游戏智能手机 6GB+64GB', 1999.00, NULL

创建成功后,在 ElasticSearch 中搜索下,发现数据已经同步了:

再使用如下 SQL 对数据进行修改:

UPDATE product SET title='小米10' WHERE id=5

修改成功后,在 ElasticSearch 中搜索下,发现数据已经修改了:

再使用如下 SQL 对数据进行删除操作:

DELETE FROM product WHERE id=5

删除成功后,在 ElasticSearch 中搜索下,发现数据已经删除了,至此 MySql 同步到 ElasticSearch的功能完成了!


本文转载自: https://blog.csdn.net/mrluo735/article/details/135472409
版权归原作者 流华追梦 所有, 如有侵权,请联系我们删除。

“Canal —— 一款 MySql 实时同步到 ES 的阿里开源神器”的评论:

还没有评论