文章目录
1、基本介绍
Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:
- FlinkSQL
- Flink DataStream 和 Table API(本文使用该方式) 对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:
2、代码实战
2.1、数据源准备
本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:
CREATETABLE`quick_chat_msg`(`id`bigintNOTNULLCOMMENT'主键id',`from_id`varchar(20)CHARACTERSET utf8 COLLATE utf8_unicode_ci DEFAULTNULLCOMMENT'账户id(发送人)',`to_id`varchar(20)CHARACTERSET utf8 COLLATE utf8_unicode_ci DEFAULTNULLCOMMENT'账户id(接收人)',`relation_id`varchar(50)CHARACTERSET utf8 COLLATE utf8_unicode_ci DEFAULTNULLCOMMENT'发送关联',`content`varchar(500)DEFAULTNULLCOMMENT'消息内容',`msg_type`tinyint(1)DEFAULTNULLCOMMENT'消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info`varchar(500)DEFAULTNULLCOMMENT'额外信息',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`deleted`tinyint(1)DEFAULTNULLCOMMENT'删除标识',PRIMARYKEY(`id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb3;
需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
最后,要把数据库时区配置好,否则会出现问题,命令如下:
SET persist time_zone ='+8:00';SET time_zone ='+8:00';SHOW VARIABLES LIKE'%time_zone%';
2.2、代码实战
首先,引入Flink CDC相关依赖,内容如下:
<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency></dependencies>
第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:
importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;publicclassMySinkHandlerextendsRichSinkFunction<String>{@Overridepublicvoidinvoke(String value,Context context)throwsException{System.out.println(value);}@Overridepublicvoidopen(Configuration parameters)throwsException{}@Overridepublicvoidclose()throwsException{}}
最后,配置好 Flink CDC 监听进程,随着项目启动运行:
importcom.ververica.cdc.connectors.mysql.source.MySqlSource;importcom.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.configuration.RestOptions;importorg.apache.flink.streaming.api.datastream.DataStreamSink;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;@ComponentpublicclassMySqlSourceExample{@PostConstructpublicvoidinit()throwsException{// 配置监听数据源MySqlSource<String> source =MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(newJsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration =newConfiguration();
configuration.setInteger(RestOptions.PORT,8081);StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。
env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source,WatermarkStrategy.noWatermarks(),"MySQL Source").addSink(newMySinkHandler());
env.execute();}}
项目启动完毕后,可以通过8081端口访问Flink UI页面:
2.3、数据格式
上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:
# 新增{"before": null,
"after":{"id":3,
"from_id":"dog",
"to_id":"cat",
"relation_id":"dog:cat",
"content":"你好啊",
"msg_type":1,
"extra_info": null,
"create_time":1729164075000,
"deleted":0},
"source":{"version":"1.6.4.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":1729135279000,
"snapshot":"false",
"db":"quick_chat",
"sequence": null,
"table":"quick_chat_msg",
"server_id":1,
"gtid": null,
"file":"binlog.000002",
"pos":2452,
"row":0,
"thread": null,
"query": null
},
"op":"c",
"ts_ms":1729135278633,
"transaction": null
}
# 修改{"before":{"id":3,
"from_id":"dog",
"to_id":"cat",
"relation_id":"dog:cat",
"content":"你好啊",
"msg_type":1,
"extra_info": null,
"create_time":1729164075000,
"deleted":0},
"after":{"id":3,
"from_id":"dog",
"to_id":"cat",
"relation_id":"dog:cat",
"content":"你好啊,小猫咪",
"msg_type":1,
"extra_info": null,
"create_time":1729164075000,
"deleted":0},
"source":{"version":"1.6.4.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":1729135289000,
"snapshot":"false",
"db":"quick_chat",
"sequence": null,
"table":"quick_chat_msg",
"server_id":1,
"gtid": null,
"file":"binlog.000002",
"pos":2825,
"row":0,
"thread": null,
"query": null
},
"op":"u",
"ts_ms":1729135288473,
"transaction": null
}
# 删除{"before":{"id":3,
"from_id":"dog",
"to_id":"cat",
"relation_id":"dog:cat",
"content":"你好啊,小猫咪",
"msg_type":1,
"extra_info": null,
"create_time":1729164075000,
"deleted":0},
"after": null,
"source":{"version":"1.6.4.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":1729135301000,
"snapshot":"false",
"db":"quick_chat",
"sequence": null,
"table":"quick_chat_msg",
"server_id":1,
"gtid": null,
"file":"binlog.000002",
"pos":3247,
"row":0,
"thread": null,
"query": null
},
"op":"d",
"ts_ms":1729135300692,
"transaction": null
}
版权归原作者 大连-徐志斌 所有, 如有侵权,请联系我们删除。