0


Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

前言

关于Canal的介绍及原理不在此赘述,可自行查阅。笔者在使用Canal同步Mysql实时操作记录至RabbitMQ的过程中,也翻阅了一些大牛们的文章,可能是我使用的Canal版本与文中版本不一致,出现了一些问题,在此总结记录一下可行的方案。
注:本文使用的Canal为

v1.1.7

一、Mysql数据库开启bin_log

  • 先查看目标数据库是否开启bin_log
SHOW VARIABLES LIKE'log_bin'

如结果中,log_bin的值为OFF则未开启,为ON则已开启。
在这里插入图片描述

  • 如未开启,可编辑Mysql配置文件:/etc/my.cnf
[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式server_id=1# 配置MySQL replaction需要定义,不和Canal的slaveId重复即可

重启MySQL ,再次通过上一步查看配置是否生效。

二、数据库创建新用户

  • 创建专用于数据同步的新用户
-- 创建一个新用户,名称可自行定义createuser canal@'%' IDENTIFIED by'canal';-- 为新用户授权GRANTSELECT,REPLICATION SLAVE,REPLICATION CLIENT,SUPER ON*.*TO'canal'@'%';-- 刷新缓存中的用户数据
FLUSH PRIVILEGES;

三、配置RabhitMQ

以下使用的名称均可自行定义,保证唯一即可

1. 添加交换机

在这里插入图片描述

2. 添加队列

在这里插入图片描述

3. 绑定交换机与队列,设置 Routing key

在这里插入图片描述

四、下载、配置、运行Canal(windows环境)

1. 下载服务端

  • 可到以下地址下载所需版本的包:github-alibaba-canal 本文使用较新的 v1.1.7在这里插入图片描述
  • 选择下载 canal.deployer-1.1.7.tar.gz在这里插入图片描述

2. 配置

  • 解压下载包,获得如下文件。在这里插入图片描述
  • 编辑:conf\canal.properties(仅列出需要修改的配置项)
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
###########################################################             RabbitMQ         ################################################################ host 无需添加端口号
rabbitmq.host =192.168.0.2
# 填写 / 即可
rabbitmq.virtual.host = /
# RabbitMQ的用户名、密码
rabbitmq.username = admin
rabbitmq.password =123456# 上文配置的交换机(exchange)名称:Name
rabbitmq.exchange = canal.exchange
# 交换机类型:Type
rabbitmq.deliveryMode = direct

# 以下两个字段为自行添加,否则会报空指针异常# 队列(queue)名称:Name
rabbitmq.queue = canal.queue
# 绑定队列-交换机时的路由秘钥:Routing key
rabbitmq.routingKey = canal.routing.key
  • 编辑:conf\example\instance.properties(仅列出需要修改的配置项)
# 目标数据库地址canal.instance.master.address=192.168.0.1:3306
# 目标数据库用户名密码canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123

# 表过滤正则表达式(按需修改)# 全库全表 : .*\\..*# 指定库所有表:  库名\..*   例:test\..*# 单表:  库名.表名  例:test.user# 多规则组合使用:  库名1\..*,库名2.表名1,库名3.表名2 (逗号分隔)  例 test\..*,test2.user1,test3.user2 (逗号分隔)canal.instance.filter.regex=.*\\..*
# canal.instance.filter.regex=project.sys_user,project.sys_role

3. 运行

windows环境下直接运行

bin\startup.bat

,linux环境下执行

bin\startup.sh


执行启动脚本后,查看日志信息

logs\canal\canal.log

,出现如下信息,表示启动成功。

[main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

五、测试

对监听的数据库表做修改操作,至RabbitMQ控制台的队列中查看是否插入消息。
如下,即成功插入实时操作数据。
在这里插入图片描述

六、项目中监听处理

  • 创建一个maven项目

在这里插入图片描述

  • pom.xml中引入spring-boot-starter-amqp依赖,此包集成了对RabbitMQ的支持。
<!-- RabbitMQ 集成支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- fastjson 解析数据 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.9.graal</version></dependency>
  • 修改配置文件application.yml(此处已按个人偏好,文件类型改为yaml),配置RabbitMQ。
spring:rabbitmq:host: 192.168.0.2
    port:5672username: admin
    password:123456
  • binLog数据实体类BinLogEntity
importcom.alibaba.fastjson.JSONArray;importcom.alibaba.fastjson.JSONObject;importlombok.Data;importjava.util.ArrayList;importjava.util.HashMap;importjava.util.List;importjava.util.Map;@DatapublicclassBinLogEntity{/**
     * 数据库
     */privateString database;/**
     * 表
     */privateString table;/**
     * 操作类型
     */privateString type;/**
     * 操作数据
     */privateJSONArray data;/**
     * 变更前数据
     */privateJSONArray old;/**
     * 主键名称
     */privateJSONArray pkNames;/**
     * 执行sql语句
     */privateString sql;privateLong es;privateString gtid;privateLong id;privateBoolean isDdl;privateJSONObject mysqlType;privateJSONObject sqlType;privateLong ts;public<T>List<T>getData(Class<T> clazz){if(this.data ==null||this.data.size()==0){returnnull;}returnthis.data.toJavaList(clazz);}public<T>List<T>getOld(Class<T> clazz){if(this.old ==null||this.old.size()==0){returnnull;}returnthis.old.toJavaList(clazz);}publicList<String>getPkNames(){if(this.pkNames ==null||this.pkNames.size()==0){returnnull;}List<String> pkNames =newArrayList<>();for(Object pkName :this.pkNames){
            pkNames.add(pkName.toString());}return pkNames;}publicMap<String,String>getMysqlType(){if(this.mysqlType ==null){returnnull;}Map<String,String> mysqlTypeMap =newHashMap<>();this.mysqlType.forEach((k, v)->{
            mysqlTypeMap.put(k, v.toString());});return mysqlTypeMap;}publicMap<String,Integer>getSqlType(){if(this.sqlType ==null){returnnull;}Map<String,Integer> sqlTypeMap =newHashMap<>();this.sqlType.forEach((k, v)->{
            sqlTypeMap.put(k,Integer.valueOf(v.toString()));});return sqlTypeMap;}}
  • 操作数据实体类
@DatapublicclassUserimplementsSerializable{privatestaticfinallong serialVersionUID =1L;/**
     * ID
     */privateLong id;/**
     * 姓名
     */privateString name;/**
     * 年龄
     */privateInteger age;/**
     * 电话
     */privateString phone;}
  • 监听类CanalListener
importcom.alibaba.fastjson.JSON;importcom.example.canalclient.entity.BinLogEntity;importcom.example.canalclient.entity.User;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.Exchange;importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.QueueBinding;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Component;/**
 * 监听数据库数据变化时RabbitMQ发送的信息
 */@ComponentpublicclassCanalListener{@RabbitListener(bindings ={@QueueBinding(
                    value =@Queue(value ="canal.queue", durable ="true"),
                    exchange =@Exchange(value ="canal.exchange"),
                    key ="canal.routing.key")})publicvoidhandleDataChange(@PayloadMessage message){// 获取消息内容String content =newString(message.getBody(),StandardCharsets.UTF_8);// 反序列化BinLogEntity binLog =JSON.parseObject(content,BinLogEntity.class);// 获取操作数据User user = binLog.getData(User.class).get(0);User oldUser = binLog.getOld(User.class).get(0);System.out.println("数据库:"+ binLog.getDatabase());System.out.println("表:"+ binLog.getTable());System.out.println("操作类型:"+ binLog.getType());System.out.println("主键:"+JSON.toJSONString(binLog.getPkNames()));System.out.println("数据:"+JSON.toJSONString(User));System.out.println("原数据:"+JSON.toJSONString(User));System.out.println("MysqlType:"+JSON.toJSONString(binLog.getMysqlType()));}}
  • 打印结果(修改操作)
数据库:project
表:sys_user
操作类型:UPDATE
主键:["id"]
数据:{"id":1,
    "name":"张三",
    "age":21,
    "phone":13333333333}
原数据:{"age":20,
    "phone":12222222222}
MysqlType:{"id":"bigint unsigned",
    "name":"varchar(50)",
    "age":"int(3) unsigned",
    "phone":"varchar(50)"}

至此,已实现对目标数据库实时操作数据进行监听,可根据不同的操作类型,采取相应的业务处理。

七、参考文章

Canal+Msql+RabbitMq数据库同步配置,看这一篇就够了

使用canal同步mysql数据库信息到RabbitMQ

Canal配置connector.subscribe和canal.instance.filter.regex遇到的坑


本文转载自: https://blog.csdn.net/a6780144/article/details/137274332
版权归原作者 JustHuman_nimang 所有, 如有侵权,请联系我们删除。

“Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理”的评论:

还没有评论