0


Canal+Kafka实现MySQL与Redis数据同步(二)

Canal+Kafka实现MySQL与Redis数据同步(二)

创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:

spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group1
      #序列化反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量抓取batch-size:65536# 缓存容量buffer-memory:524288

根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

publicclassCanalBean{//数据privateList<TbCommodityInfo> data;//数据库名称privateString database;privatelong es;//递增,从1开始privateint id;//是否是DDL语句privateboolean isDdl;//表结构的字段类型privateMysqlType mysqlType;//UPDATE语句,旧数据privateString old;//主键名称privateList<String> pkNames;//sql语句privateString sql;privateSqlType sqlType;//表名privateString table;privatelong ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等privateString type;//getter、setter方法}publicclassMysqlType{privateString id;privateString commodity_name;privateString commodity_price;privateString number;privateString description;//getter、setter方法}publicclassSqlType{privateint id;privateint commodity_name;privateint commodity_price;privateint number;privateint description;}

最后就可以创建一个消费者CanalConsumer进行消费:

@ComponentpublicclassCanalConsumer{//日志记录privatestaticLogger log =LoggerFactory.getLogger(CanalConsumer.class);//redis操作工具类@ResourceprivateRedisClient redisClient;//监听的队列名称为:canaltopic@KafkaListener(topics ="canaltopic")publicvoidreceive(ConsumerRecord<?,?> consumer){String value =(String) consumer.value();
        log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);//转换为javaBeanCanalBean canalBean =JSONObject.parseObject(value,CanalBean.class);//获取是否是DDL语句boolean isDdl = canalBean.getIsDdl();//获取类型String type = canalBean.getType();//不是DDL语句if(!isDdl){List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();//过期时间longTIME_OUT=600L;if("INSERT".equals(type)){//新增语句for(TbCommodityInfo tbCommodityInfo : tbCommodityInfos){String id = tbCommodityInfo.getId();//新增到redis中,过期时间是10分钟
                    redisClient.setString(id,JSONObject.toJSONString(tbCommodityInfo),TIME_OUT);}}elseif("UPDATE".equals(type)){//更新语句for(TbCommodityInfo tbCommodityInfo : tbCommodityInfos){String id = tbCommodityInfo.getId();//更新到redis中,过期时间是10分钟
                    redisClient.setString(id,JSONObject.toJSONString(tbCommodityInfo),TIME_OUT);}}else{//删除语句for(TbCommodityInfo tbCommodityInfo : tbCommodityInfos){String id = tbCommodityInfo.getId();//从redis中删除
                    redisClient.deleteKey(id);}}}}}

测试MySQL与Redis同步

mysql对应的表结构如下:

CREATETABLE`tb_commodity_info`(`id`varchar(32)NOTNULL,`commodity_name`varchar(512)DEFAULTNULLCOMMENT'商品名称',`commodity_price`varchar(36)DEFAULT'0'COMMENT'商品价格',`number`int(10)DEFAULT'0'COMMENT'商品数量',`description`varchar(2048)DEFAULT''COMMENT'商品描述',PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='商品信息表';

首先在MySQL创建表。然后启动项目,接着新增一条数据:

INSERTINTO`canaldb`.`tb_commodity_info`(`id`,`commodity_name`,`commodity_price`,`number`,`description`)VALUES('3e71a81fd80711eaaed600163e046cc3','叉包','3.99','3','大叉包,老喜欢');

tb_commodity_info表查到新增的数据:

img

Redis也查到了对应的数据,证明同步成功!

img

如果更新呢?试一下Update语句:

UPDATE`canaldb`.`tb_commodity_info`SET`commodity_name`='青菜包',`description`='便宜的青菜包'WHERE`id`='3e71a81fd80711eaaed600163e046cc3';

img

img

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
  4. MQ顺序性问题。 网的回答,大家参考一下img

尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。

标签: kafka mysql redis

本文转载自: https://blog.csdn.net/weixin_46294086/article/details/134489332
版权归原作者 JavaGPT 所有, 如有侵权,请联系我们删除。

“Canal+Kafka实现MySQL与Redis数据同步(二)”的评论:

还没有评论