0


springboot集成canal

目录

项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求

原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。

具体步骤

一、打开mysql的binlog

1.1 打开 MySQL 配置文件
my.cnf

(通常位于

/etc/mysql/my.cnf

/etc/my.cnf

)并添加或修改以下设置:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row

**

注意 :确保binlog-format是 row模式

**

1.2 重启mysql服务

具体命令根据你的服务器类型决定

1.3 验证是否生效
SHOW MASTER STATUS;

二、 部署canal 服务端

2.1 下载启动脚本(可能需要梯子)
# 下载脚本wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 
2.2 启动服务
# 构建一个destination name为test的队列sh run.sh -ecanal.auto.scan=false \-ecanal.destinations=test \-ecanal.instance.master.address=127.0.0.1:3306  \-ecanal.instance.dbUsername=canal  \-ecanal.instance.dbPassword=canal  \-ecanal.instance.connectionCharset=UTF-8 \-ecanal.instance.tsdb.enable=true \-ecanal.instance.gtidon=false  \-ecanal.instance.filter.regex=.*\\..* 

参数解释:

-ecanal.auto.scan=false:

关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。
-ecanal.destinations=test:

设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。
-ecanal.instance.master.address=127.0.0.1:3306:

指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。
-ecanal.instance.dbUsername=canal:

设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。
-ecanal.instance.dbPassword=canal:

设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。
-ecanal.instance.connectionCharset=UTF-8:

设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。
-ecanal.instance.tsdb.enable=true:

启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。
-ecanal.instance.gtidon=false:

关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。
-ecanal.instance.filter.regex=.*\\..*:

设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。
2.3 验证服务启动成功
docker logs <containerids>

可以看到这样的打印:
image.png

docker-compose方式

docker-compose.yml文件:

#version: '3.7'services:canal-server:image: canal/canal-server:latest
    container_name: canal-server
    ports:-"11111:11111"environment:- canal.auto.scan=false
      - canal.destinations=nps_canal
      - canal.instance.master.address={ip}:3306- canal.instance.dbUsername=root
      - canal.instance.dbPassword={pwd}- canal.instance.connectionCharset=UTF-8- canal.instance.tsdb.enable=true
      - canal.instance.gtidon=false
      - canal.instance.filter.regex=nps.record
    deploy:resources:limits:memory: 4096M
    restart: always

注意:修改主数据库的ip和登录密码,同时要特别注意canal.instance.filter.regex=nps.record 这样代表监听数据库nps下面的record表,一定不能像docker运行命令那样写成nps.record,不用管转义符的事情。

三、springboot端集成canal客户端

3.1 添加依赖 /配置
<!--  canal begin--><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.0</version></dependency><!--  canal end-->
canal:host: 127.0.0.1 #自己的canal服务器ipport:11111#canal默认端口destination: test #配置文件配置的名称username: root
  password:214365batch:size:100
3.2 客户端代码
importcn.hutool.json.JSONObject;importcn.hutool.json.JSONUtil;importcom.alibaba.otter.canal.client.CanalConnector;importcom.alibaba.otter.canal.client.CanalConnectors;importcom.alibaba.otter.canal.protocol.CanalEntry;importcom.alibaba.otter.canal.protocol.Message;importcom.eco.db.entity.Record;importcom.eco.fishway.service.RecordService;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.DisposableBean;importorg.springframework.beans.factory.InitializingBean;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importjava.net.InetSocketAddress;importjava.util.List;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;@Slf4j@ComponentpublicclassCanalClientimplementsInitializingBean,DisposableBean{@Value("${canal.host}")privateString canalHost;@Value("${canal.port}")privateint canalPort;@Value("${canal.destination}")privateString canalDestination;@Value("${canal.username}")privateString canalUsername;@Value("${canal.password}")privateString canalPassword;@Value("${canal.batch.size}")privateint batchSize;privatefinalRecordService recordService;privateCanalConnector canalConnector;privateExecutorService executorService;publicCanalClient(RecordService recordService){this.recordService = recordService;}@OverridepublicvoidafterPropertiesSet()throwsException{this.canalConnector =CanalConnectors.newSingleConnector(newInetSocketAddress(canalHost, canalPort),
            canalDestination,
            canalUsername,
            canalPassword
        );this.executorService =Executors.newSingleThreadExecutor();this.executorService.execute(newTask());}@Overridepublicvoiddestroy()throwsException{if(executorService !=null){
            executorService.shutdown();}}privateclassTaskimplementsRunnable{@Overridepublicvoidrun(){while(true){try{//连接
                    canalConnector.connect();//订阅
                    canalConnector.subscribe();while(true){Message message = canalConnector.getWithoutAck(batchSize);// batchSize为每次获取的batchSize大小long batchId = message.getId();//获取批量的数量int size = message.getEntries().size();try{//如果没有数据if(batchId ==-1|| size ==0){// log.info("无数据");// 线程休眠2秒Thread.sleep(2000);}else{// 如果有数据,处理数据printEntry(message.getEntries());// 确认处理完成
                                canalConnector.ack(batchId);}}catch(Exception e){
                            log.error(e.getMessage());// 程序错误,也直接确认,跳过这次偏移
                            canalConnector.ack(batchId);}}catch(Exception e){
                    log.error("Error occurred when running Canal Client", e);}finally{
                    canalConnector.disconnect();}}}}privatevoidprintEntry(List<CanalEntry.Entry> entrys){for(CanalEntry.Entry entry : entrys){if(isTransactionEntry(entry)){//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange;try{
                rowChange =CanalEntry.RowChange.parseFrom(entry.getStoreValue());}catch(Exception e){thrownewRuntimeException("ERROR ## parser of eromanga-event has an error , data:"+ entry.toString(), e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();//打印Header信息
            log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType);//判断是否是DDL语句if(rowChange.getIsDdl()){
                log.info("================》;isDdl: true,sql:{}", rowChange.getSql());}
            log.info(rowChange.getSql());//获取RowChange对象里的每一行数据,打印出来for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){//如果是删除语句if(eventType ==CanalEntry.EventType.DELETE){
                    log.info(">>>>>>>>>> 删除 >>>>>>>>>>");printColumnAndExecute(rowData.getBeforeColumnsList(),"DELETE");//如果是新增语句}elseif(eventType ==CanalEntry.EventType.INSERT){
                    log.info(">>>>>>>>>> 新增 >>>>>>>>>>");printColumnAndExecute(rowData.getAfterColumnsList(),"INSERT");//如果是更新的语句}else{
                    log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据
                    log.info("------->; before");printColumnAndExecute(rowData.getBeforeColumnsList(),null);//变更后的数据
                    log.info("------->; after");printColumnAndExecute(rowData.getAfterColumnsList(),"UPDATE");}}}}/**
     * 执行数据同步
     * @param columns
     * @param type
     */privatevoidprintColumnAndExecute(List<CanalEntry.Column> columns,String type){if(type ==null){return;}JSONObject jsonObject =newJSONObject();for(CanalEntry.Column column : columns){
            jsonObject.put(column.getName(), column.getValue());}// 此处使用json转对象的方式进行转换Record bean = jsonObject.toBean(Record.class);if(type.equals("INSERT")){// 执行新增
            recordService.save(bean);
            log.info("新增成功->{}", jsonObject.toJSONString(0));}elseif(type.equals("UPDATE")){// 执行编辑
            recordService.updateById(bean);
            log.info("编辑成功->{}", jsonObject.toJSONString(0));}elseif(type.equals("DELETE")){// 执行删除
            recordService.removeById(bean.getRecordId());
            log.info("删除成功->{}", jsonObject.toJSONString(0));}}/**
     * 判断当前entry是否为事务日志
     */privatebooleanisTransactionEntry(CanalEntry.Entry entry){if(entry.getEntryType()==CanalEntry.EntryType.TRANSACTIONBEGIN){
            log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getEntryType());returntrue;}elseif(entry.getEntryType()==CanalEntry.EntryType.TRANSACTIONEND){
            log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getEntryType());returntrue;}else{returnfalse;}}}
3.3 数据同步效果

image.png
有点感叹需求就是最好的老师,但是完不成需求就不好玩了

标签: spring boot 后端 java

本文转载自: https://blog.csdn.net/qq_38899062/article/details/140885120
版权归原作者 Ayu大象 所有, 如有侵权,请联系我们删除。

“springboot集成canal”的评论:

还没有评论