1、前言
工作中,我们很多时候需要根据某些状态的变化更新另一个业务的逻辑,比如订单的生成,成交等,需要更新或者通知其他的业务。我们通常的操作通过业务埋点、接口的调用或者中间件完成。
但是状态变化的入口比较多的时候,就很容易漏掉某些地方。代码维护起来也比较麻烦。今天介绍阿里出品的 【canal】中间件完成数据库字段的监听。
2、canal的简单介绍
canal详见介绍件官网:https://github.com/alibaba/canal
2.1 家族成员:
【canal.adapter】:客户端落地的适配以及功能
![](https://img-blog.csdnimg.cn/ffc3c4ab26a64797ad07a0c62f36801c.png)
【canal.admin】:提供WebUI的管理界面
【canal.deployer】:canal服务
【canal.example】:客户端提供的demo
2.2 工作原理
3、 实践目标
使用canal监控mysql数据的变化,将变化的数据推送到kafka,并使用canal-admin动态管理需要监控的数据库表。
4、工具准备
其中kafka是依赖zookeeper的,所以也需要zookeeper。
5、配置并启动kafka
Kafka QuickStart
5.1 修改配置
vim config/server.properties
换成自己的IP
替换成自己zookeeper的地址
5.2 启动server
- 启动zookeeper脚本
bin/zkServer.sh start
- 启动kafka脚本
bin/kafka-server-start.sh -daemon config/server.properties &
- 查看是否启动成功脚本
jps -ml
此时kafka启动成功。
5.3 注意事项
值得注意的是官方文档中查看topic的命令,
# bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181
在心的kafka版本中已经改变,可移步kafka官方文档: Apache Kafka
新版本中使用bootstrap-server,如下
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
6、启动canal-admin
6.1 修改配置
改成对应的ip
** 6.2 执行 conf/canal.manage.sql**
** **该脚本是canal-admin的管理脚本。
** 6.3 启动canal-admin**
sh bin/startup.sh
** 6.4 查看启动状态**
6.5 访问页面
此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,
默认密码:admin/123456
7、启动canal-server
7.1 修改配置脚本
vim conf/canal_local.properties
换成canal-admin的IP
7.2 启动服务 指定local
sh bin/startup.sh local
**7.3 查询启动状态 **
8、管理平台配置
8.1 查看canal服务的状态
8.2 配置实例
修改监听的数据库信息:
canal.instance.master.address=192.168.88.111:3306
canal.instance.dbUsername=***
canal.instance.dbPassword=***#默认监听全库
canal.instance.filter.regex=test.test_user
#配置不可访问的库表
canal.instance.filter.black.regex=
#配置mq的主题/路由
canal.mq.topic=example
保存即可。
8.3 启动实例
9、编写客户端监听kafka的客户端
@Test
public void test01(){
// 修改打印日志的级别,不然会不停的打印debug日志,影响阅读
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
Logger root = loggerContext.getLogger("root");
root.setLevel(Level.INFO);
//设置消费者属性
Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.88.111:9092");
//反序列化器,与生产者的序列化器相对应
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
//设置消费者的消费者群组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"example");
// properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
try {
//消费者订阅主题(可以多个,支持正则表达式,进行模糊匹配)
consumer.subscribe(Collections.singletonList("example"));
System.out.println("-------------------------消费端准备就绪,等待消息接受------------------------------------");
//kafka消费者是通过拉取的方式获得服务端消息
while(true){
//循环调用poll方法,获取数据。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String, String> record:records){
String topic = record.topic();
String value = record.value();
if (StringUtils.isNotEmpty(value)) {
System.out.println(String.format("topic:%s;" + "value:%s", topic,value));
}
}
}
} finally {
consumer.close();
}
}
10、验证
修改数据库字段,可以接收到修改的信息
版权归原作者 智_永无止境 所有, 如有侵权,请联系我们删除。