Canal+Kafka实现MySQL与Redis数据同步(二)
创建MQ消费者进行同步
在application.yml配置文件加上kafka的配置信息:
spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group1
#序列化反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
# 批量抓取batch-size:65536# 缓存容量buffer-memory:524288
根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:
publicclassCanalBean{//数据privateList<TbCommodityInfo> data;//数据库名称privateString database;privatelong es;//递增,从1开始privateint id;//是否是DDL语句privateboolean isDdl;//表结构的字段类型privateMysqlType mysqlType;//UPDATE语句,旧数据privateString old;//主键名称privateList<String> pkNames;//sql语句privateString sql;privateSqlType sqlType;//表名privateString table;privatelong ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等privateString type;//getter、setter方法}publicclassMysqlType{privateString id;privateString commodity_name;privateString commodity_price;privateString number;privateString description;//getter、setter方法}publicclassSqlType{privateint id;privateint commodity_name;privateint commodity_price;privateint number;privateint description;}
最后就可以创建一个消费者CanalConsumer进行消费:
@ComponentpublicclassCanalConsumer{//日志记录privatestaticLogger log =LoggerFactory.getLogger(CanalConsumer.class);//redis操作工具类@ResourceprivateRedisClient redisClient;//监听的队列名称为:canaltopic@KafkaListener(topics ="canaltopic")publicvoidreceive(ConsumerRecord<?,?> consumer){String value =(String) consumer.value();
log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);//转换为javaBeanCanalBean canalBean =JSONObject.parseObject(value,CanalBean.class);//获取是否是DDL语句boolean isDdl = canalBean.getIsDdl();//获取类型String type = canalBean.getType();//不是DDL语句if(!isDdl){List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();//过期时间longTIME_OUT=600L;if("INSERT".equals(type)){//新增语句for(TbCommodityInfo tbCommodityInfo : tbCommodityInfos){String id = tbCommodityInfo.getId();//新增到redis中,过期时间是10分钟
redisClient.setString(id,JSONObject.toJSONString(tbCommodityInfo),TIME_OUT);}}elseif("UPDATE".equals(type)){//更新语句for(TbCommodityInfo tbCommodityInfo : tbCommodityInfos){String id = tbCommodityInfo.getId();//更新到redis中,过期时间是10分钟
redisClient.setString(id,JSONObject.toJSONString(tbCommodityInfo),TIME_OUT);}}else{//删除语句for(TbCommodityInfo tbCommodityInfo : tbCommodityInfos){String id = tbCommodityInfo.getId();//从redis中删除
redisClient.deleteKey(id);}}}}}
测试MySQL与Redis同步
mysql对应的表结构如下:
CREATETABLE`tb_commodity_info`(`id`varchar(32)NOTNULL,`commodity_name`varchar(512)DEFAULTNULLCOMMENT'商品名称',`commodity_price`varchar(36)DEFAULT'0'COMMENT'商品价格',`number`int(10)DEFAULT'0'COMMENT'商品数量',`description`varchar(2048)DEFAULT''COMMENT'商品描述',PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='商品信息表';
首先在MySQL创建表。然后启动项目,接着新增一条数据:
INSERTINTO`canaldb`.`tb_commodity_info`(`id`,`commodity_name`,`commodity_price`,`number`,`description`)VALUES('3e71a81fd80711eaaed600163e046cc3','叉包','3.99','3','大叉包,老喜欢');
tb_commodity_info表查到新增的数据:
Redis也查到了对应的数据,证明同步成功!
如果更新呢?试一下Update语句:
UPDATE`canaldb`.`tb_commodity_info`SET`commodity_name`='青菜包',`description`='便宜的青菜包'WHERE`id`='3e71a81fd80711eaaed600163e046cc3';
没有问题!
总结
canal的缺点:
- canal只能同步增量数据。
- 不是实时同步,是准实时同步。
- 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
- MQ顺序性问题。 网的回答,大家参考一下
尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。
版权归原作者 JavaGPT 所有, 如有侵权,请联系我们删除。