I. Flink & Flink CDC official sites
Flink CDC address
Flink official site address
II. Introduction to CDC & Flink CDC
1. What is CDC
CDC stands for Change Data Capture. It identifies and captures changes made to data in a database (inserts, updates, and deletes of rows or tables, as well as schema changes), records those changes completely in the order they occur, and delivers them in real time to downstream processes or systems through an intermediate transport such as a message queue or TCP.
2. What is Flink CDC
CDC Connectors for Apache Flink is a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). The connectors integrate Debezium as the engine that captures data changes, so they can take full advantage of Debezium's capabilities. See the Debezium documentation for more on what Debezium is.
3. Supported connectors
III. Integrating Flink CDC with Spring Boot
1. Official example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("yourHostname")
                .port(yourPort)
                .databaseList("yourDatabaseName") // set captured database
                .tableList("yourDatabaseName.yourTableName") // set captured table
                .username("yourUsername")
                .password("yourPassword")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
2. Maven dependencies
1) Flink and Flink CDC version mapping
2) Full Maven dependencies
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.3.12.RELEASE</spring-boot.version>
    <log4j2.version>2.17.0</log4j2.version>
    <mybatis-plus.version>3.4.1</mybatis-plus.version>
    <sharding.jdbc.version>4.1.0</sharding.jdbc.version>
    <flink.version>1.16.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- log4j vulnerability fix -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-1.2-api</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <!-- log4j2 logging -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- pagination plugin -->
    <dependency>
        <groupId>com.github.pagehelper</groupId>
        <artifactId>pagehelper-spring-boot-starter</artifactId>
        <version>1.4.1</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <!-- 8.0.25 does not contain com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset -->
        <version>8.0.29</version>
    </dependency>
    <!-- database connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.2.4</version>
    </dependency>
    <!-- easyexcel -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easyexcel</artifactId>
        <version>3.1.1</version>
    </dependency>
    <!-- database/table sharding -->
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
        <!--<version>4.0.0-RC1</version>-->
        <version>4.1.1</version>
    </dependency>
    <!-- MyBatis-Plus and code generator -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>${mybatis-plus.version}</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-generator</artifactId>
        <version>${mybatis-plus.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.velocity</groupId>
        <artifactId>velocity-engine-core</artifactId>
        <version>2.3</version>
    </dependency>
    <!-- pagination plugin -->
    <dependency>
        <groupId>com.github.pagehelper</groupId>
        <artifactId>pagehelper-spring-boot-starter</artifactId>
        <version>1.4.1</version>
    </dependency>
    <!-- multiple data sources -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
        <version>3.6.1</version>
    </dependency>
    <!-- annotation-based validation -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <!-- Apache HttpClient -->
    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.14</version>
    </dependency>
    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.76</version>
    </dependency>
    <!-- Aliyun OSS -->
    <dependency>
        <groupId>com.aliyun.oss</groupId>
        <artifactId>aliyun-sdk-oss</artifactId>
        <version>3.15.0</version>
    </dependency>
    <!-- xxl-job -->
    <dependency>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job-core</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- ShardingSphere read/write splitting and sharding -->
    <!-- <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
        <version>5.1.2</version>
    </dependency> -->
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <!-- do not use provided scope when running locally, or classes will not be found -->
        <!--<scope>provided</scope>-->
    </dependency>
    <!-- Since Flink 1.11.0 flink-clients must also be included; without it the application fails with
         "No ExecutorFactory found to execute the application" -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
        <!-- This connector pulls in MySQL JDBC driver 8.0.25; exclude it and use 8.0.28 or later instead,
             otherwise the project fails at startup with:
             java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String; -->
        <exclusions>
            <exclusion>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </exclusion>
        </exclusions>
        <!-- <scope>provided</scope> -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
    <!--<dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>-->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>1.12.1</version>
    </dependency>
</dependencies>
3) Project pitfalls
- If the MySQL JDBC driver version is too low, the project fails at startup with java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;. Use driver version 8.0.28 or later.
- flink-connector-mysql-cdc 2.3.0 transitively pulls in driver version 8.0.25, which must be excluded:
<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <!-- 8.0.25 does not contain com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset -->
    <version>8.0.29</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
    <!-- This pulls in MySQL JDBC driver 8.0.25; exclude it and use 8.0.28 or later,
         otherwise startup fails with:
         java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String; -->
    <exclusions>
        <exclusion>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>
- If the project fails at startup with java.lang.NoClassDefFoundError: org/apache/flink/table/api/ValidationException, the class cannot be found because a dependency is missing; add the following dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.12.1</version>
</dependency>
- The Flink version and the Flink CDC version must be compatible; follow the version mapping published by the project.
An incompatible (too old) version can cause: Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation
To resolve this, make sure the database user has the RELOAD privilege. Log in to MySQL and grant it, for example to user your_user:
GRANT RELOAD ON *.* TO 'your_user'@'localhost';
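For reference, the Debezium MySQL connector documentation lists a broader privilege set that a dedicated CDC user typically needs; a sketch (adjust the user and host to your environment):

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'your_user'@'%';
FLUSH PRIVILEGES;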
How Flink addresses the problem above
The MySQL CDC source uses an incremental snapshot algorithm, which avoids database locks and therefore does not need the RELOAD privilege.
Starting with Flink 1.12, Flink integrates and supports MySQL CDC through the flink-connector-mysql-cdc module, which implements Change Data Capture for MySQL.
With this module, the MySQL CDC source captures changes using the incremental snapshot algorithm, so the RELOAD privilege is not required; it avoids database locks and provides low-latency change capture.
Enable it in code by setting scan.incremental.snapshot.enabled, as in the snippet below (the full context is in the code example later in this article):
Configuration config = new Configuration();
// enable incremental snapshot
config.setBoolean("scan.incremental.snapshot.enabled", true);
env.configure(config);
3. Spring Boot code example
1) Create the change listener
Create a MysqlEventListener class that implements ApplicationRunner so the MySQL listener is started when the application starts.
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @Description: MySQL change listener
 * @Date: 2023/10/11
 **/
@Component // register as a Spring bean so that run() is invoked on application startup
public class MysqlEventListener implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        MySqlSource<DataChangeInfo> mySqlSource = MySqlSource.<DataChangeInfo>builder()
                .hostname("yourHostname")
                .port(3306)
                .databaseList("yourDatabaseName")            // database(s) to capture; to sync a whole database, set tableList to ".*"
                .tableList("yourDatabaseName.yourTableName") // table(s) to capture, in the form database.table
                .username("yourUsername")
                .password("yourPassword")
                .deserializer(new MysqlDeserialization())    // convert SourceRecord into the custom DataChangeInfo object
                /*
                 * initial:   take a full snapshot first, then switch to incremental reading (captures subsequent changes)
                 * latest:    incremental only (do not read historical data)
                 * timestamp: read data from the given timestamp onwards (>= the specified timestamp)
                 */
                .startupOptions(StartupOptions.latest())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        // enable incremental snapshot
        config.setBoolean("scan.incremental.snapshot.enabled", true);
        env.configure(config);
        env.setParallelism(1);

        // DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();
        DataStreamSource<DataChangeInfo> streamSource = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1);
        streamSource.addSink(new DataChangeSink());
        env.execute("mysql-stream-cdc");
    }
}
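One caveat with this listener (an observation, not something covered in the original article): StreamExecutionEnvironment.execute() blocks until the job terminates, and a CDC stream normally never terminates, so calling it inside ApplicationRunner#run will hold up Spring Boot startup. A minimal sketch of one way around this, if you want startup to continue, is to submit the job in detached mode:

// Instead of env.execute("mysql-stream-cdc"), submit the job without blocking the startup thread.
// executeAsync() returns a JobClient handle that can later be used to inspect or cancel the job.
org.apache.flink.core.execution.JobClient jobClient = env.executeAsync("mysql-stream-cdc");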
2) Custom deserializer
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Optional;

/**
 * @Description: custom MySQL deserialization
 * @Date: 2023/10/11
 **/
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize the raw record into the custom DataChangeInfo object.
     *
     * @param sourceRecord raw Debezium source record
     * @param collector    collector for emitting records downstream
     * @throws Exception
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        // the record topic has the form <serverName>.<databaseName>.<tableName>
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        DataChangeInfo dataChangeInfo = new DataChangeInfo();
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
        // get the operation type: CREATE, UPDATE or DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();
        int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
        dataChangeInfo.setEventType(eventType);
        dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
        // emit the record downstream
        collector.collect(dataChangeInfo);
    }

    /**
     * Extract the row image before or after the change as a JSON object.
     *
     * @param value        the Debezium envelope struct
     * @param fieldElement "before" or "after"
     * @return JSON object holding the row's fields
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
3) Create the change object
import lombok.Data;

/**
 * @Description: data change object
 * @Date: 2023/10/11
 **/
@Data
public class DataChangeInfo {
    /**
     * data before the change
     */
    private String beforeData;
    /**
     * data after the change
     */
    private String afterData;
    /**
     * change type: 1 insert, 2 update, 3 delete
     */
    private Integer eventType;
    /**
     * binlog file name
     */
    private String fileName;
    /**
     * current binlog read position
     */
    private Integer filePos;
    /**
     * database name
     */
    private String database;
    /**
     * table name
     */
    private String tableName;
    /**
     * change time
     */
    private Long changeTime;
}
4) Create the business processing class
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * @Description: data processing
 * @Date: 2023/10/11
 **/
@Slf4j
public class DataChangeSink implements SinkFunction<DataChangeInfo> {

    @Override
    public void invoke(DataChangeInfo value, Context context) throws Exception {
        log.info("Received raw change data: {}", value);
        // business logic goes here
    }
}
5) Run the code and listen for MySQL CDC events
The project starts successfully.
- Modify a row in the MySQL database (before the change / after the change, click save) and check the change event captured by the service.