Introduction
Flink CDC focuses on capturing data changes from source databases (such as MySQL and PostgreSQL) and delivering those changes to Flink jobs in real time. Its core strengths are low latency and consistency: by reading the database's incremental change records, Flink CDC streams changes into a Flink job for low-latency processing and analysis, while guaranteeing that the data stays accurate and complete throughout processing.
To support this, the Flink community developed the flink-cdc-connectors project, a set of source connectors that can read both the full snapshot and the incremental changes directly from databases such as MySQL and PostgreSQL. By configuring the appropriate connector and its options, a Flink job can connect to the source database and capture and process data changes in real time.
Enabling the MySQL binlog
Before using CDC, make sure the MySQL binlog is enabled.
Edit the my.cnf file and add:
server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=15
binlog_do_db=testdb
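After restarting MySQL, it is worth confirming that the settings took effect. The following is a minimal sketch of such a check over JDBC; it assumes a MySQL JDBC driver (for example mysql-connector-j) is on the classpath, and the host and credentials are placeholders matching the ones used later in this post.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckBinlogConfig {
    public static void main(String[] args) throws Exception {
        // Host, port and credentials are placeholders; adjust them to your environment.
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://10.168.192.70:3306/testdb", "root", "XXXXX");
             Statement stmt = conn.createStatement();
             // log_bin should report ON and binlog_format should report ROW
             ResultSet rs = stmt.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')")) {
            while (rs.next()) {
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}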
Adding the Maven dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
Reading the CDC stream with the DataStream API (Java)
Create the table:
CREATE TABLE `userdemo` (
    `user_id` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_general_ci',
    `user_name` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8mb4_general_ci',
    `age` INT(11) NULL DEFAULT '0',
    PRIMARY KEY (`user_id`) USING BTREE
)
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Date: 2024/3/12 10:03
 * @Description DataStream API CDC
 **/
public class FlinkMysqlCdc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("10.168.192.70")
                .port(3306)
                .username("root")
                .password("XXXXX")
                .databaseList("testdb")
                // this must be in db.table form
                .tableList("testdb.userdemo")
                .serverTimeZone("GMT+8")
                // .deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        dataStreamSource.print();
        env.execute("FlinkDSCDC");
    }
}
Running the program prints the following; the rows already present in userdemo are emitted first as snapshot records (op "r"):
{"before":null,"after":{"user_id":"001","user_name":"sdaf","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"true","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834716,"transaction":null}{"before":null,"after":{"user_id":"002","user_name":"DSF","age":35},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"last","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834720,"transaction":null}
Insert a row
INSERT INTO userdemo (user_id, user_name, age) VALUES ('004', 'wangwu', 26);
The corresponding create event (op "c") is printed:
{"before":null,"after":{"user_id":"004","user_name":"wangwu","age":26},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235648000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":649,"row":0,"thread":7,"query":null},"op":"c","ts_ms":1710235647380,"transaction":null}
Update a row in userdemo
UPDATE userdemo SET user_name='zhangsan' WHERE user_id='001';
The output is as follows (an update event with op "u", carrying both the before and after row images):
{"before":{"user_id":"001","user_name":"sdaf","age":23},"after":{"user_id":"001","user_name":"zhangsan","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235526000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":352,"row":0,"thread":7,"query":null},"op":"u","ts_ms":1710235525246,"transaction":null}
Reading the CDC stream with Flink SQL
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class MyFlinkCDCJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Configure the MySQL CDC source
        String sourceDDL = "CREATE TABLE my_table ("
                + "  id INT NOT NULL,"
                + "  name STRING,"
                + "  age INT,"
                + "  PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'mysql-cdc',"
                + "  'hostname' = 'your_mysql_hostname',"
                + "  'port' = '3306',"
                + "  'username' = 'your_username',"
                + "  'password' = 'your_password',"
                + "  'database-name' = 'your_database_name',"
                + "  'table-name' = 'your_table_name'"
                + ")";
        tableEnv.executeSql(sourceDDL);

        // Define the job logic: select everything from the CDC table
        Table result = tableEnv.sqlQuery("SELECT * FROM my_table");
        tableEnv.toRetractStream(result, Row.class).print();

        // Execute the job
        env.execute("My Flink CDC Job");
    }
}
In the code above we create a table named my_table that is backed by the MySQL CDC connector, run a SQL query that selects everything from it, and print the result to the console. Note that you need to replace 'your_mysql_hostname', 'your_username', 'your_password', 'your_database_name' and 'your_table_name' in the example with your actual MySQL connection details.
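Instead of converting the result to a retract stream in Java, the whole pipeline can also be expressed in SQL by inserting into a sink table. The following is a sketch that writes the changes to Flink's built-in print connector; depending on your Flink version, running the Table API code may also require the table planner and flink-table-api-java-bridge modules on the classpath in addition to the dependencies listed earlier.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MyFlinkCdcSqlToPrint {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Same CDC source table as above; replace the placeholders with real connection details.
        tableEnv.executeSql("CREATE TABLE my_table ("
                + "  id INT NOT NULL,"
                + "  name STRING,"
                + "  age INT,"
                + "  PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'mysql-cdc',"
                + "  'hostname' = 'your_mysql_hostname',"
                + "  'port' = '3306',"
                + "  'username' = 'your_username',"
                + "  'password' = 'your_password',"
                + "  'database-name' = 'your_database_name',"
                + "  'table-name' = 'your_table_name'"
                + ")");

        // A sink backed by the built-in 'print' connector, which logs every change to stdout.
        tableEnv.executeSql("CREATE TABLE my_table_printer ("
                + "  id INT NOT NULL,"
                + "  name STRING,"
                + "  age INT,"
                + "  PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'print'"
                + ")");

        // executeSql submits the streaming INSERT job; await() keeps it running when executed locally.
        tableEnv.executeSql("INSERT INTO my_table_printer SELECT id, name, age FROM my_table").await();
    }
}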
Copyright belongs to the original author, shandongwill. In case of infringement, please contact us and it will be removed.