Flink CDC
About CDC
What is CDC?
CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows and tables), record those changes in the exact order they occur, and write them to an MQ so that other services can subscribe and consume them.
CDC Categories
CDC implementations mainly fall into two camps: query-based and binlog-based.
                             Query-based          Binlog-based
Open-source products         Sqoop, DataX         Canal, Maxwell, Debezium
Execution mode               Batch                Streaming
Captures every change?       No                   Yes
Latency                      High                 Low
Extra load on the database?  Yes                  No
Query-based CDC runs in batch mode (a capture job only fires after a certain amount of data or time has accumulated), so high latency is inherent to the approach. Binlog-based streaming works at per-row granularity: the moment a row changes, the change is observed, as the sketch below illustrates.
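To make the latency and lossiness of the query-based approach concrete, here is a minimal JDBC polling sketch. The update_time column and the connection details are assumptions for illustration (the demo schema later in this post has no such column, and the MySQL JDBC driver is assumed to be on the classpath): nothing is seen until the next poll fires, and a row updated twice between polls surfaces only once.

import java.sql.*;

public class QueryBasedCdcSketch {
    public static void main(String[] args) throws Exception {
        Timestamp lastPoll = new Timestamp(0L);
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "12345678")) {
            while (true) {
                // Only rows whose update_time moved since the last poll are visible;
                // intermediate states between two polls are lost.
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, name, age FROM student WHERE update_time > ?")) {
                    ps.setTimestamp(1, lastPoll);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            System.out.println("changed row id=" + rs.getInt("id"));
                        }
                    }
                }
                lastPoll = new Timestamp(System.currentTimeMillis());
                Thread.sleep(60_000); // latency is at least the polling interval
            }
        }
    }
}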
Flink CDC
The Flink community has developed the flink-cdc-connectors component, a source connector that reads full snapshots and incremental changes directly from databases such as MySQL and PostgreSQL.
It is open source: https://github.com/ververica/flink-cdc-connectors
Integrating Flink CDC in Java
MySQL Setup
Run the initialization SQL
# Create the test database
create database test;
# Create the student, t1, t2, t3 tables in the test database and insert data
use test;
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`age` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `student` VALUES (1, 'joy', 18);
INSERT INTO `student` VALUES (2, 'tom', 123123);
CREATE TABLE t1(
`id` VARCHAR(255) PRIMARY KEY,
`name` VARCHAR(255)
);
CREATE TABLE t2(
`id` VARCHAR(255) PRIMARY KEY,
`name` VARCHAR(255)
);
CREATE TABLE t3(
`id` VARCHAR(255) PRIMARY KEY,
`name` VARCHAR(255)
);
# Create the test_route database
create database test_route;
# Create the t1, t2, t3 tables in the test_route database
use test_route;
CREATE TABLE t1(
`id` VARCHAR(255) PRIMARY KEY,
`name` VARCHAR(255)
);
CREATE TABLE t2(
`id` VARCHAR(255) PRIMARY KEY,
`name` VARCHAR(255)
);
CREATE TABLE t3(
`id` VARCHAR(255) PRIMARY KEY,
`name` VARCHAR(255)
);
# Insert data into the t1, t2, t3 tables in the test_route database
use test_route;
INSERT INTO t1 VALUES('1001','zhangsan');
INSERT INTO t1 VALUES('1002','lisi');
INSERT INTO t1 VALUES('1003','wangwu');
INSERT INTO t2 VALUES('1004','zhangsan');
INSERT INTO t2 VALUES('1005','lisi');
INSERT INTO t2 VALUES('1006','wangwu');
INSERT INTO t3 VALUES('1001','F');
INSERT INTO t3 VALUES('1002','F');
INSERT INTO t3 VALUES('1003','M');
Enable Binlog
On a typical installation, MySQL's cnf file lives under /etc:
sudo vim /etc/my.cnf
# Add the following settings to enable binlog for the `test` and `test_route` databases

# Server id
server-id = 1
# Time zone; if the database time zone is not set, the Flink MySQL CDC source fails to start
default-time-zone = '+8:00'
# Enable binlog; this value is used as the binlog file name prefix
log-bin=mysql-bin
# Binlog format; row is required (Maxwell, for instance, also demands row)
binlog_format=row
# Databases to write binlog for; adjust to your setup
binlog-do-db=test
binlog-do-db=test_route
Set the database time zone
For a permanent change, edit my.cnf as above (the previous step already did; just remember to restart):
default-time-zone = '+8:00'
For a temporary change (lost on restart):
# MySQL 8:
set persist time_zone='+8:00';
# MySQL 5.x:
set time_zone='+8:00';
Restart MySQL
Note: the settings above only take effect after MySQL is restarted!
service mysqld restart
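After the restart you can confirm binlog is on. A minimal JDBC check (a sketch; host and credentials are assumed to match the yaml used later in this post):

import java.sql.*;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.132.101:3306/test", "root", "12345678");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW VARIABLES LIKE 'log_bin'")) {
            if (rs.next()) {
                // Expect Value = ON once log-bin is configured and MySQL has restarted
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}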
Code
Dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Flink CDC dependencies: start -->
    <!-- Flink core dependency, provides the core Flink APIs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink streaming Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink client utilities, including the CLI and helper functions -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink connector base, common connector functionality -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector for Apache Kafka integration; not needed here,
         so it stays commented out. Any other MQ works as well. -->
    <!--
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.2.0-1.19</version>
    </dependency>
    -->
    <!-- Flink Table Planner, generates execution plans for the Table API and SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table API bridge, connects the DataStream API and the Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink JSON format support -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Enables the Flink Web UI on port 8081; off by default -->
    <!--
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>1.19.1</version>
    </dependency>
    -->
    <!-- MySQL CDC connector.
         The org.apache.flink artifact targets MySQL 8.0;
         see https://blog.csdn.net/kakaweb/article/details/129441408 -->
    <dependency>
        <!-- For MySQL 8.0: -->
        <!--
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>3.1.0</version>
        -->
        <!-- For MySQL 5.7: both 2.3.0 and 3.0.1 work -->
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <!--<version>2.3.0</version>-->
        <version>3.0.1</version>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!-- Gson -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.11.0</version>
    </dependency>
    <!-- OGNL expression language -->
    <dependency>
        <groupId>ognl</groupId>
        <artifactId>ognl</artifactId>
        <version>3.1.1</version>
    </dependency>
    <!-- Hutool utilities -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.26</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.31</version>
    </dependency>
</dependencies>

<name>cdc-test</name>
<description>cdc-test</description>

<properties>
    <java.version>11</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <flink.version>1.19.0</flink.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
yaml
# Flink CDC configuration
flink-cdc:
  mysql:
    hostname: 192.168.132.101
    port: 3306
    username: root
    password: 12345678
    databaseList: test
    tableList: test.student, test.t1
    includeSchemaChanges: false
    parallelism: 1
    enableCheckpointing: 5000
FlinkCDCConfig
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Flink CDC configuration
 */
@Data
@Configuration
@ConfigurationProperties("flink-cdc.mysql")
public class FlinkCDCConfig {
    /**
     * Database host
     */
    private String hostname;
    /**
     * Database port
     */
    private Integer port;
    /**
     * Database username
     */
    private String username;
    /**
     * Database password
     */
    private String password;
    /**
     * Database names
     */
    private String[] databaseList;
    /**
     * Table names
     */
    private String[] tableList;
    /**
     * Whether schema changes are included
     */
    private Boolean includeSchemaChanges;
    /**
     * Parallelism
     */
    private Integer parallelism;
    /**
     * Checkpoint interval, in milliseconds
     */
    private Integer enableCheckpointing;
}
Enums
OperatorTypeEnum
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Operation type enum
 */
@Getter
@AllArgsConstructor
public enum OperatorTypeEnum {
    /**
     * Insert
     */
    INSERT(1),
    /**
     * Update
     */
    UPDATE(2),
    /**
     * Delete
     */
    DELETE(3),
    ;

    private final int type;
}
StrategyEnum
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;

import java.beans.Introspector;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Table handling strategies
 * todo To support another table, add a new enum constant here
 */
@Getter
@AllArgsConstructor
public enum StrategyEnum {
    /**
     * Strategy for the student table
     */
    STUDENT("student", Student.class, Introspector.decapitalize(StudentLogHandler.class.getSimpleName())),
    ;

    /**
     * Table name
     */
    private final String tableName;
    /**
     * Class object
     */
    private final Class<?> varClass;
    /**
     * Handler bean name
     */
    private final String handlerName;

    /**
     * Strategy selector: based on the tableName inside the given DataChangeInfo,
     * picks the matching strategy from the predefined StrategyEnum constants
     * and wraps it in a StrategyHandleSelector.
     *
     * @param dataChangeInfo data change object
     * @return StrategyHandleSelector
     */
    public static StrategyHandleSelector getSelector(DataChangeInfo dataChangeInfo) {
        if (ObjUtil.isNull(dataChangeInfo)) {
            return null;
        }
        String tableName = dataChangeInfo.getTableName();
        StrategyHandleSelector selector = new StrategyHandleSelector();
        // Iterate over all strategy enums, looking for the one matching the current table name
        for (StrategyEnum strategyEnum : values()) {
            // On a match, populate and return the StrategyHandleSelector
            if (strategyEnum.getTableName().equals(tableName)) {
                selector.setHandlerName(strategyEnum.handlerName);
                selector.setOperatorTime(dataChangeInfo.getOperatorTime());
                selector.setOperatorType(dataChangeInfo.getOperatorType());
                JSONObject jsonObject = JSONUtil.parseObj(dataChangeInfo.getData());
                selector.setData(BeanUtil.copyProperties(jsonObject, strategyEnum.varClass));
                return selector;
            }
        }
        return null;
    }
}
pojo
Student
import lombok.Data;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Student class, used for the demo
 */
@Data
public class Student {
    /**
     * id
     */
    private Integer id;
    /**
     * Name
     */
    private String name;
    /**
     * Age
     */
    private Integer age;
}
DataChangeInfo
import lombok.Data;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Data change object
 */
@Data
public class DataChangeInfo {
    /**
     * Data before the change
     */
    private String beforeData;
    /**
     * Data after the change
     */
    private String afterData;
    /**
     * The operated-on data
     */
    private String data;
    /**
     * Change type: 1 -> insert, 2 -> update, 3 -> delete
     */
    private Integer operatorType;
    /**
     * Binlog file name
     */
    private String fileName;
    /**
     * Current binlog read position
     */
    private Integer filePos;
    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Change time
     */
    private Long operatorTime;
}
Custom Sinks
DataChangeSink
import cn.hutool.core.util.ObjUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.Map;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom sink that dispatches DataChangeInfo records to the matching table handler
 */
@Slf4j
@Component
@AllArgsConstructor
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> implements Serializable {
    /**
     * Cache of BaseLogHandler beans.
     * Spring injects every BaseLogHandler bean into this map, keyed by bean name.
     */
    private final Map<String, BaseLogHandler> strategyHandlerMap;

    @Override
    public void invoke(DataChangeInfo value, Context context) {
        log.info("Received raw change data: {}", value);
        // Pick the strategy for this table
        StrategyHandleSelector selector = StrategyEnum.getSelector(value);
        if (ObjUtil.isNull(selector)) {
            return;
        }
        BaseLogHandler<Object> handler = strategyHandlerMap.get(selector.getHandlerName());
        // insert
        if (selector.getOperatorType().equals(OperatorTypeEnum.INSERT.getType())) {
            handler.handleInsertLog(selector.getData(), selector.getOperatorTime());
            return;
        }
        // update
        if (selector.getOperatorType().equals(OperatorTypeEnum.UPDATE.getType())) {
            handler.handleUpdateLog(selector.getData(), selector.getOperatorTime());
            return;
        }
        // delete
        if (selector.getOperatorType().equals(OperatorTypeEnum.DELETE.getType())) {
            handler.handleDeleteLog(selector.getData(), selector.getOperatorTime());
        }
    }

    /**
     * Called before the sink starts
     */
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
    }

    /**
     * Called when the sink shuts down
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
CustomSink
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom sink that tells the operation type apart with OGNL expressions over
 *              the raw JSON; pairs with the built-in JsonDebeziumDeserializationSchema
 */
@Slf4j
@Component
public class CustomSink extends RichSinkFunction<String> {
    @Override
    public void invoke(String json, Context context) throws Exception {
        // The op field takes 4 values: c (create), u (update), d (delete), r (read/snapshot).
        // For u operations the payload contains both the before and the after image.
        log.info("Received change data: {}", json);
        String op = JSONUtil.getValue(json, "op", String.class);
        // Id of the affected row
        Integer id = null;
        // update statement
        if ("u".equals(op)) {
            id = JSONUtil.getValue(json, "after.id", Integer.class);
            log.info("Handling update");
            // handle the update here
        }
        // delete statement
        if ("d".equals(op)) {
            id = JSONUtil.getValue(json, "before.id", Integer.class);
            log.info("Handling delete");
            // handle the delete here
        }
        // insert statement
        if ("c".equals(op)) {
            log.info("Handling insert");
        }
    }

    // Called before the sink starts
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
    }

    // Called when the sink shuts down
    @Override
    public void close() throws Exception {
        super.close();
    }
}
Custom Deserializer: MySQLDeserialization
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom deserializer that turns MySQL change messages into DataChangeInfo objects
 */
@Slf4j
@Service
public class MySQLDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserializes a record into a DataChangeInfo change object
     *
     * @param sourceRecord SourceRecord
     * @param collector    Collector<DataChangeInfo>
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            // The topic has the form <prefix>.<database>.<table>,
            // e.g. mysql_binlog_source.test.student -> database "test", table "student"
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            // Operation type: CREATE / UPDATE / DELETE. Note that READ (snapshot) events would fall
            // into the DELETE branch here; they never occur with StartupOptions.latest().
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? OperatorTypeEnum.INSERT.getType()
                    : UPDATE.equals(type) ? OperatorTypeEnum.UPDATE.getType()
                    : OperatorTypeEnum.DELETE.getType();
            // Usually only the latest image matters, but both are extracted here for demonstration
            dataChangeInfo.setBeforeData(this.getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(this.getJsonObject(struct, AFTER).toJSONString());
            if (eventType == OperatorTypeEnum.DELETE.getType()) {
                dataChangeInfo.setData(this.getJsonObject(struct, BEFORE).toJSONString());
            } else {
                dataChangeInfo.setData(this.getJsonObject(struct, AFTER).toJSONString());
            }
            dataChangeInfo.setOperatorType(eventType);
            dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
            dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            dataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
            // Emit the record downstream
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("Failed to deserialize binlog record", e);
        }
    }

    /**
     * Extracts the before or after image from the record
     *
     * @param value        Struct
     * @param fieldElement field name
     * @return JSONObject
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
LogHandler
BaseLogHandler
import java.io.Serializable;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Log handler interface
 * todo To handle a new table, implement this interface; see StudentLogHandler for a reference
 */
public interface BaseLogHandler<T> extends Serializable {
    /**
     * Handles an insert log entry
     *
     * @param data         model converted from the change data
     * @param operatorTime operation time
     */
    void handleInsertLog(T data, Long operatorTime);

    /**
     * Handles an update log entry
     *
     * @param data         model converted from the change data
     * @param operatorTime operation time
     */
    void handleUpdateLog(T data, Long operatorTime);

    /**
     * Handles a delete log entry
     *
     * @param data         model converted from the change data
     * @param operatorTime operation time
     */
    void handleDeleteLog(T data, Long operatorTime);
}
StrategyHandleSelector
import lombok.Data;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Strategy handle selector
 */
@Data
public class StrategyHandleSelector {
    /**
     * Handler bean name
     */
    private String handlerName;
    /**
     * Payload
     */
    private Object data;
    /**
     * Operation time
     */
    private Long operatorTime;
    /**
     * Operation type
     */
    private Integer operatorType;
}
StudentLogHandler
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Handler for the student table
 */
@Slf4j
@Service
public class StudentLogHandler implements BaseLogHandler<Student> {
    @Override
    public void handleInsertLog(Student data, Long operatorTime) {
        log.info("Handling insert log for student: {}", data);
    }

    @Override
    public void handleUpdateLog(Student data, Long operatorTime) {
        log.info("Handling update log for student: {}", data);
    }

    @Override
    public void handleDeleteLog(Student data, Long operatorTime) {
        log.info("Handling delete log for student: {}", data);
    }
}
JSONUtil
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import ognl.Ognl;
import ognl.OgnlContext;

import java.util.Map;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description JSON utilities
 */
public class JSONUtil {
    /**
     * Converts the given JSON into a Map whose keys are the JSON keys.
     * Values are mapped as follows:
     * 1. A string value stays a string, e.g. {"a":"b"}
     * 2. A nested JSON object becomes a Map, e.g. {"a":{"b":"2"}}
     * 3. An array becomes a List<Map>, e.g. {"a":[{"b":"2"},{"c":"3"}]}
     *
     * @param json input JSON
     * @return map of the parsed values
     */
    public static Map<String, Object> transferToMap(String json) {
        Gson gson = new Gson();
        Map<String, Object> map = gson.fromJson(json, new TypeToken<Map<String, Object>>() {}.getType());
        return map;
    }

    /**
     * Reads the value at the given path of the given JSON
     *
     * @param json  raw JSON data
     * @param path  OGNL path expression
     * @param clazz target class of the value
     * @return the value converted to clazz
     */
    public static <T> T getValue(String json, String path, Class<T> clazz) {
        try {
            Map<String, Object> map = JSONUtil.transferToMap(json);
            OgnlContext ognlContext = new OgnlContext();
            ognlContext.setRoot(map);
            T value = (T) Ognl.getValue(path, ognlContext, ognlContext.getRoot(), clazz);
            return value;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
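A quick usage sketch with hand-written input, showing OGNL dot-path navigation and, assuming OGNL's standard index syntax for lists, element access inside arrays:

public class JsonUtilDemo {
    public static void main(String[] args) {
        String json = "{\"a\":{\"b\":\"2\"},\"list\":[{\"c\":3}]}";
        // Nested objects are addressed with dot paths
        String b = JSONUtil.getValue(json, "a.b", String.class);          // "2"
        // List elements are addressed with OGNL index syntax
        Integer c = JSONUtil.getValue(json, "list[0].c", Integer.class);  // 3
        System.out.println(b + ", " + c);
    }
}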
MysqlEventListener
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.AllArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL change listener
 */
@Component
@AllArgsConstructor
public class MysqlEventListener implements ApplicationRunner {
    /**
     * Flink CDC configuration
     */
    private final FlinkCDCConfig flinkCDCConfig;

    /**
     * Custom sinks:
     * customSink: derives the operation type from the raw JSON via OGNL
     * dataChangeSink: derives the operation type from the Debezium Struct
     * Normally you pick one of the two.
     */
    private final CustomSink customSink;
    private final DataChangeSink dataChangeSink;

    /**
     * Custom deserializer
     */
    private final MySQLDeserialization mySQLDeserialization;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism for the whole Flink job
        env.setParallelism(flinkCDCConfig.getParallelism());
        // Checkpoint interval
        env.enableCheckpointing(flinkCDCConfig.getEnableCheckpointing());
        // Keep the last checkpoint when the job is cancelled
        // (deprecated in newer Flink versions; setExternalizedCheckpointCleanup is the replacement)
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // todo Pick one of the two MySqlSources below
        // Custom deserializer:
        // MySqlSource<DataChangeInfo> mySqlSource = this.buildBaseMySqlSource(DataChangeInfo.class)
        //         .deserializer(mySQLDeserialization)
        //         .build();
        // Deserializer shipped with Flink CDC:
        MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source")
                // Parallelism of this source
                .setParallelism(flinkCDCConfig.getParallelism())
                // todo Attach the sink matching your choice above
                // .addSink(dataChangeSink); // pairs with mySQLDeserialization
                .addSink(customSink); // pairs with JsonDebeziumDeserializationSchema
        env.execute("mysql-stream-cdc");
    }

    /**
     * Builds the common part of the MySqlSourceBuilder
     *
     * @param clazz Class of the records the source produces
     * @param <T>   record type
     * @return MySqlSourceBuilder
     */
    private <T> MySqlSourceBuilder<T> buildBaseMySqlSource(Class<T> clazz) {
        return MySqlSource.<T>builder()
                .hostname(flinkCDCConfig.getHostname())
                .port(flinkCDCConfig.getPort())
                .username(flinkCDCConfig.getUsername())
                .password(flinkCDCConfig.getPassword())
                .databaseList(flinkCDCConfig.getDatabaseList())
                .tableList(flinkCDCConfig.getTableList())
                /* initial:   full snapshot first, then incremental changes
                 * latest:    incremental changes only (no history)
                 * timestamp: read changes whose timestamp is >= the given timestamp
                 */
                .startupOptions(StartupOptions.latest())
                // Capture schema changes as well
                .includeSchemaChanges(flinkCDCConfig.getIncludeSchemaChanges())
                // Time zone
                .serverTimeZone("GMT+8");
    }
}
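The listener above starts from the latest binlog position, so existing rows are skipped. If you want the full history first, or a replay from a point in time, swap the startup option. A minimal sketch (hard-coded connection values mirror the yaml; the timestamp is an arbitrary example):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class StartupOptionsSketch {
    public static MySqlSource<String> snapshotThenStreamSource() {
        return MySqlSource.<String>builder()
                .hostname("192.168.132.101")
                .port(3306)
                .username("root")
                .password("12345678")
                .databaseList("test")
                .tableList("test.student")
                // initial(): full snapshot first, then incremental binlog changes
                .startupOptions(StartupOptions.initial())
                // timestamp(...): alternatively, replay from an epoch-millisecond position
                // .startupOptions(StartupOptions.timestamp(1700000000000L))
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("GMT+8")
                .build();
    }
}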