0


springboot集成flink-cdc

文章目录

前文

(1)什么是CDC

CDC:全称是 Change Data Capture,即数据变更捕获技术,具体的含义是 通过识别和捕获对数据库中的数据所做的更改(包括数据或数据表的插入、更新、删除;数据库结构的变更调整等),然后将这些更改按发生的顺序完整记录下来,并实时通过中间技术桥梁(消息中间件、TCP等等)将变更顺序消息传送到下游流程或系统的过程。

(2)Flink-CDC是什么

CDC Connectors for Apache Flink ®是一组用于Apache Flink ®的源连接器,使用变更数据捕获 (CDC) 从不同数据库获取变更。用于 Apache Flink ®的 CDC 连接器将 Debezium 集成为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。

白话的意思是,Flink-CDC 一个成型的cdc技术实现(Debezium)的包装,我前面也使用过Debezium,并编写了一个简略的博客,感兴趣的可以戳下方连接去看一下

springboot+debezium捕获数据库变更(mysql、sql-server、mongodb、oracle…)

(3)Flink-CDC 特性

  1. 支持读取数据库快照,即使发生故障也能继续读取binlog,一次处理。
  2. DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。
  3. Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监控单个表的更改。

CDC与Flink毕业版本

下表显示了 Flink® CDC 连接器和 Flink® 之间的版本映射:
Flink ® CDC 版本Flink®版本_1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.*1.4.01.13.*2.0.*1.13.*2.1.*1.13.2.2.1.13. , 1.14.

Springboot项目整合Flink-CDC

(1)说明

按常理来说,一个正常的flink-job 最终我们并不会集成到springboot项目中,我们会直接编写一个maven项目,在发布时使用flink程序来启动任务

比如官网示例:

image-20220825160434743

本文即要使用flink-cdc进行数据变更捕获 (可以视作为一个flink-job),但又要契合我们的springboot项目,使用spring的特性,因此,我们需要转换一下思路,转换成什么样子呢?就是不要将这个flink-cdc作为一个job 使用flink程序进行发布提交,我们就当它在我们开发时一样,作为一个本地项目,main方法启动

(2)引入依赖

flink客户端版本使用 1.13.6 cdc 版本使用 2.0.0

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><!--mysql -cdc--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version></dependency></dependencies>

(3)接入springboot项目

无法简单的使用main方法来启动cdc 作业,因为如果这样的话,我们就无法与spring完美的契合

因此我们可以利用springboot的特性, 实现 ApplicationRunner 将flink-cdc 作为一个项目启动时需要运行的分支子任务即可

创建监听类 实现 ApplicationRunner

packagecom.leilei.mysql;importcom.ververica.cdc.connectors.mysql.MySqlSource;importcom.ververica.cdc.connectors.mysql.table.StartupOptions;importcom.ververica.cdc.debezium.DebeziumSourceFunction;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.springframework.boot.ApplicationArguments;importorg.springframework.boot.ApplicationRunner;importorg.springframework.stereotype.Component;/**
 * @author lei
 * @create 2022-08-25 13:42
 * @desc mysql变更监听
 **/@ComponentpublicclassMysqlEventListenerimplementsApplicationRunner{privatefinalDataChangeSink dataChangeSink;publicMysqlEventListener(DataChangeSink dataChangeSink){this.dataChangeSink = dataChangeSink;}@Overridepublicvoidrun(ApplicationArguments args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource =buildDataChangeSource();DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeInfoMySqlSource,"mysql-source").setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("mysql-stream-cdc");}/**
     * 构造变更数据源
     *
     * @param
     * @return DebeziumSourceFunction<DataChangeInfo>
     * @author lei
     * @date 2022-08-25 15:29:38
     */privateDebeziumSourceFunction<DataChangeInfo>buildDataChangeSource(){returnMySqlSource.<DataChangeInfo>builder().hostname("10.50.40.145").port(3306).databaseList("paas_common_db").tableList("paas_common_db.base_business_driver_score_*").username("root").password("cdwk-3g-145")/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest:只进行增量导入(不读取历史变化)
                 * timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
                 */.startupOptions(StartupOptions.latest()).deserializer(newMysqlDeserialization()).serverTimeZone("GMT+8").build();}}

自定义数据读取解析器

我这里解析为一个数据变更对象

packagecom.leilei.mysql;importcom.alibaba.fastjson.JSON;importcom.ververica.cdc.debezium.DebeziumDeserializationSchema;importio.debezium.data.Envelope;importorg.apache.flink.api.common.typeinfo.BasicTypeInfo;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.util.Collector;importorg.apache.kafka.connect.data.Field;importorg.apache.kafka.connect.data.Schema;importorg.apache.kafka.connect.data.Struct;importorg.apache.kafka.connect.source.SourceRecord;importcom.alibaba.fastjson.JSONObject;importjava.util.List;importjava.util.Optional;/**
 * @author lei
 * @create 2022-08-25 13:43
 * @desc mysql消息读取自定义序列化
 **/publicclassMysqlDeserializationimplementsDebeziumDeserializationSchema<DataChangeInfo>{publicstaticfinalString TS_MS ="ts_ms";publicstaticfinalString BIN_FILE ="file";publicstaticfinalString POS ="pos";publicstaticfinalString CREATE ="CREATE";publicstaticfinalString BEFORE ="before";publicstaticfinalString AFTER ="after";publicstaticfinalString SOURCE ="source";publicstaticfinalString UPDATE ="UPDATE";/**
     *
     * 反序列化数据,转为变更JSON对象
     * @param sourceRecord
     * @param collector
     * @return void
     * @author lei
     * @date 2022-08-25 14:44:31
     */@Overridepublicvoiddeserialize(SourceRecord sourceRecord,Collector<DataChangeInfo> collector){String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct struct =(Struct) sourceRecord.value();finalStruct source = struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo =newDataChangeInfo();
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());//5.获取操作类型  CREATE UPDATE DELETEEnvelope.Operation operation =Envelope.operationFor(sourceRecord);String type = operation.toString().toUpperCase();int eventType = type.equals(CREATE)?1: UPDATE.equals(type)?2:3;
        dataChangeInfo.setEventType(eventType);
        dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x->Integer.parseInt(x.toString())).orElse(0));
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x ->Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));//7.输出数据
        collector.collect(dataChangeInfo);}/**
     *
     * 从袁术数据获取出变更之前或之后的数据
     * @param value
     * @param fieldElement
     * @return JSONObject
     * @author lei
     * @date 2022-08-25 14:48:13
     */privateJSONObjectgetJsonObject(Struct value,String fieldElement){Struct element = value.getStruct(fieldElement);JSONObject jsonObject =newJSONObject();if(element !=null){Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for(Field field : fieldList){Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);}}return jsonObject;}@OverridepublicTypeInformation<DataChangeInfo>getProducedType(){returnTypeInformation.of(DataChangeInfo.class);}}

变更对象

importlombok.Data;/**
 * @author lei
 * @create 2022-08-25 14:33
 * @desc 数据变更对象
 **/@DatapublicclassDataChangeInfo{/**
     * 变更前数据
     */privateString beforeData;/**
     * 变更后数据
     */privateString afterData;/**
     * 变更类型 1新增 2修改 3删除
     */privateInteger eventType;/**
     * binlog文件名
     */privateString fileName;/**
     * binlog当前读取点位
     */privateInteger filePos;/**
     * 数据库名
     */privateString database;/**
     * 表名
     */privateString tableName;/**
     * 变更时间
     */privateLong changeTime;}

自定义sink 交由spring管理

packagecom.leilei.mysql;importlombok.extern.log4j.Log4j2;importorg.apache.flink.streaming.api.functions.sink.SinkFunction;importorg.springframework.stereotype.Component;/**
 * @author lei
 * @create 2022-08-25 14:01
 * @desc
 **/@Component@Log4j2publicclassDataChangeSinkimplementsSinkFunction<DataChangeInfo>{@Overridepublicvoidinvoke(DataChangeInfo value,Context context){
        log.info("收到变更原始数据:{}", value);// todo 数据处理;因为此sink也是交由了spring管理,您想进行任何操作都非常简单}}

当然,以上仅仅只是整合思路,如果你想使用flink-cdc 进行数据同步或日志记录等,结合您自身的需求进行调整接口,以上内容,大的架子是没问题的

如果遇到问题,可以先从官网QA寻找:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

项目源码:springboot-flink-cdc


本文转载自: https://blog.csdn.net/leilei1366615/article/details/126528452
版权归原作者 保护我方胖虎 所有, 如有侵权,请联系我们删除。

“springboot集成flink-cdc”的评论:

还没有评论