

Integrating Flink CDC with Spring Boot


I. Official Flink & Flink CDC websites

Flink CDC website
Flink official website

II. Introduction to CDC and Flink CDC

1. What is CDC

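CDC stands for Change Data Capture. It identifies and captures the changes made to data in a database (inserts, updates and deletes of rows or tables, as well as schema changes), records them completely in the order in which they occurred, and delivers these ordered change messages in real time to downstream processes or systems through an intermediary such as a message broker or a TCP connection.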
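The idea can be illustrated without any database. The following is a minimal in-memory sketch, not how Flink CDC is implemented: a Map stands in for a source table, every mutation is appended to an ordered change log, and a downstream consumer replays the log to rebuild the same state. ChangeLogDemo and its methods are hypothetical names, and schema changes are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a "source table" as a Map, an ordered change log, and a replica that replays it.
class ChangeLogDemo {
    enum Op { INSERT, UPDATE, DELETE }

    // One captured change: operation, primary key, and the row value after the change (null for DELETE).
    static final class Change {
        final Op op;
        final String key;
        final String after;

        Change(Op op, String key, String after) {
            this.op = op;
            this.key = key;
            this.after = after;
        }
    }

    final Map<String, String> table = new HashMap<>();
    final List<Change> log = new ArrayList<>(); // changes in commit order

    void insert(String key, String value) {
        table.put(key, value);
        log.add(new Change(Op.INSERT, key, value));
    }

    void update(String key, String value) {
        table.put(key, value);
        log.add(new Change(Op.UPDATE, key, value));
    }

    void delete(String key) {
        table.remove(key);
        log.add(new Change(Op.DELETE, key, null));
    }

    // Downstream consumer: replaying the log in order reproduces the source state.
    static Map<String, String> replay(List<Change> changes) {
        Map<String, String> replica = new HashMap<>();
        for (Change c : changes) {
            if (c.op == Op.DELETE) {
                replica.remove(c.key);
            } else {
                replica.put(c.key, c.after);
            }
        }
        return replica;
    }
}
```

Replaying the same events in a different order (for example, a DELETE before the INSERT it removes) would produce a different replica, which is why CDC preserves the order of changes.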

2. What is Flink CDC

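The CDC Connectors for Apache Flink are a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). They integrate Debezium as the engine that captures data changes, so they can make full use of Debezium's capabilities. See the Debezium project for more about what Debezium is.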
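Debezium wraps every row change in an envelope whose main fields are before, after, source, op and ts_ms, where op is a one-character code. The enum below is a hypothetical helper, not part of Debezium's API (Debezium's own equivalent is io.debezium.data.Envelope.Operation). It models the four codes relevant to this article and which envelope fields carry row data for each, assuming the binlog is in ROW format with full row images, as the MySQL connector expects.

```java
// Debezium single-character "op" codes in a change event envelope (subset used in this article).
enum DebeziumOp {
    READ("r"),    // row read during the initial snapshot
    CREATE("c"),  // INSERT
    UPDATE("u"),  // UPDATE
    DELETE("d");  // DELETE

    private final String code;

    DebeziumOp(String code) {
        this.code = code;
    }

    static DebeziumOp fromCode(String code) {
        for (DebeziumOp op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unknown op code: " + code);
    }

    // "before" holds the old row only for updates and deletes.
    boolean hasBefore() {
        return this == UPDATE || this == DELETE;
    }

    // "after" holds the new row for everything except deletes.
    boolean hasAfter() {
        return this != DELETE;
    }
}
```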

3. Supported connectors

[Figure: table of supported connectors]

III. Integrating Flink CDC into Spring Boot

1. Official example

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(3306) // replace with your MySQL port
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
  }
}

2. Maven dependencies

1) Flink and Flink CDC version mapping

[Figure: Flink CDC / Flink version mapping table]
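Because a mismatch between the Flink and Flink CDC versions only surfaces at runtime, one option is to encode the mapping as a small check that runs at startup. This is a sketch: CdcVersionCheck is a hypothetical class, and only the row this article relies on is filled in, assuming the official table lists Flink 1.13.* to 1.16.* for Flink CDC 2.3.*. Copy any other rows you need from the official table.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CdcVersionCheck {
    // Flink CDC "major.minor" -> supported Flink "major.minor" versions.
    // Assumption: only the 2.3.x row is filled in; add other rows from the official table.
    private static final Map<String, List<String>> SUPPORTED = new HashMap<>();

    static {
        SUPPORTED.put("2.3", Arrays.asList("1.13", "1.14", "1.15", "1.16"));
    }

    // "1.16.0" -> "1.16"
    static String minor(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch]: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    // True if the given Flink version appears in the row for the given Flink CDC version.
    static boolean isCompatible(String flinkCdcVersion, String flinkVersion) {
        List<String> flinkMinors =
                SUPPORTED.getOrDefault(minor(flinkCdcVersion), Collections.<String>emptyList());
        return flinkMinors.contains(minor(flinkVersion));
    }
}
```

With the versions used in this article's POM, isCompatible("2.3.0", "1.16.0") returns true; an unknown Flink CDC version is treated as incompatible rather than silently accepted.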

2) Maven dependencies in detail

<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.3.12.RELEASE</spring-boot.version>
    <log4j2.version>2.17.0</log4j2.version>
    <mybatis-plus.version>3.4.1</mybatis-plus.version>
    <sharding.jdbc.version>4.1.0</sharding.jdbc.version>
    <flink.version>1.16.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- log4j vulnerability fix -->
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j2.version}</version></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j2.version}</version></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j2.version}</version></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-1.2-api</artifactId><version>${log4j2.version}</version></dependency>
    <!-- log4j2 starter -->
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency>
    <!-- pagination plugin -->
    <dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper-spring-boot-starter</artifactId><version>1.4.1</version></dependency>
    <!-- database driver; 8.0.25 lacks com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset -->
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency>
    <!-- database connection pool -->
    <dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.4</version></dependency>
    <!-- easyexcel -->
    <dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>3.1.1</version></dependency>
    <!-- database and table sharding -->
    <dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-spring-boot-starter</artifactId><!--<version>4.0.0-RC1</version>--><version>4.1.1</version></dependency>
    <!-- MyBatis-Plus and its code generator -->
    <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency>
    <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-generator</artifactId><version>${mybatis-plus.version}</version></dependency>
    <dependency><groupId>org.apache.velocity</groupId><artifactId>velocity-engine-core</artifactId><version>2.3</version></dependency>
    <!-- multiple data sources -->
    <dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.6.1</version></dependency>
    <!-- annotation-based validation -->
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency>
    <!-- Apache HttpClient: https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
    <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.14</version></dependency>
    <!-- fastjson -->
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency>
    <!-- aliyun oss -->
    <dependency><groupId>com.aliyun.oss</groupId><artifactId>aliyun-sdk-oss</artifactId><version>3.15.0</version></dependency>
    <!-- xxl-job -->
    <dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.4.0</version></dependency>
    <!-- ShardingSphere read/write splitting and sharding -->
    <!-- <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
            <version>5.1.2</version>
        </dependency>-->
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <!-- do not use the scope below when running inside the application, or classes will not be found -->
        <!--<scope>provided</scope>-->
    </dependency>
    <!-- since Flink 1.11.0, flink-clients must be added explicitly; without it: No ExecutorFactory found to execute the application -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
        <!-- this pulls in MySQL JDBC driver 8.0.25, which must be excluded; 8.0.28 or later is required,
             otherwise startup fails with: java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String; -->
        <exclusions>
            <exclusion>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </exclusion>
        </exclusions>
        <!-- <scope>provided</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
    <!--<dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>-->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <!-- keep every Flink module on the same version as flink-streaming-java -->
        <version>${flink.version}</version>
    </dependency>
</dependencies>

3) Pitfalls encountered

  • The MySQL JDBC driver is too old: the project fails at startup with java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String; because that method does not exist. Version 8.0.28 or later is required.
  • flink-connector-mysql-cdc 2.3.0 pulls in driver 8.0.25 transitively, so that driver must be excluded:
<!-- database driver; 8.0.25 lacks com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
    <!-- this pulls in MySQL JDBC driver 8.0.25, which must be excluded; 8.0.28 or later is required,
         otherwise startup fails with: java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String; -->
    <exclusions>
        <exclusion>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>
  • Startup fails with java.lang.NoClassDefFoundError: org/apache/flink/table/api/ValidationException because a dependency is missing. Add it, keeping it on the same version as the other Flink modules:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
  • The Flink and Flink CDC versions must be compatible; choose them according to the official version table.

[Figure: Flink CDC / Flink version compatibility table]
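If the Flink CDC version is too old, the job can fail with: Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation
To fix this, you can do the following:
Make sure the database user has the RELOAD privilege. Log in to MySQL and grant it to the user; for example, the following command grants RELOAD to your_user: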

GRANT RELOAD ON *.* TO 'your_user'@'localhost';

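The official Flink CDC solution
The MySQL CDC source uses an incremental snapshot algorithm that avoids database locks, so it does not need the RELOAD privilege.
Note that flink-connector-mysql-cdc comes from the Flink CDC project (groupId com.ververica), not from Apache Flink itself. Incremental snapshot reading was introduced in Flink CDC 2.0 (built for Flink 1.13): the initial snapshot is read without the global read lock (FLUSH TABLES WITH READ LOCK) that older versions relied on, which is why the RELOAD privilege is no longer required and why older versions can hit the error above.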
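To enable it in code, set scan.incremental.snapshot.enabled (see the full code example). Strictly speaking, this key is an option of the MySQL CDC SQL connector (set in the table's WITH clause, and enabled by default); the DataStream com.ververica.cdc.connectors.mysql.source.MySqlSource used in this article always reads snapshots incrementally, so setting the key on the environment Configuration, as below, is most likely a no-op.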

Configuration config = new Configuration();
// enable incremental snapshot
config.setBoolean("scan.incremental.snapshot.enabled", true);
env.configure(config);
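To make the incremental snapshot idea more concrete: the source splits a table into chunks by primary key, reads the chunks (possibly in parallel), and for each chunk reconciles the rows it read with the binlog events recorded while that chunk was being read, so no table lock is needed. The sketch below shows only the first step, splitting a numeric primary-key range into contiguous half-open chunks. It assumes a non-negative, evenly distributed numeric key, and ChunkSplitter is a hypothetical name; for the SQL connector, the chunk size is set with scan.incremental.snapshot.chunk.size.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkSplitter {
    // A half-open primary-key range [start, end).
    static final class Chunk {
        final long start;
        final long end;

        Chunk(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Split the key range [minKey, maxKey] into contiguous chunks of at most chunkSize keys.
    // Assumes 0 <= minKey and maxKey < Long.MAX_VALUE so the arithmetic cannot overflow.
    static List<Chunk> split(long minKey, long maxKey, long chunkSize) {
        if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be positive");
        if (minKey < 0 || maxKey == Long.MAX_VALUE) throw new IllegalArgumentException("key range out of bounds");
        List<Chunk> chunks = new ArrayList<>();
        long start = minKey;
        while (start <= maxKey) {
            // The last chunk ends just past maxKey; earlier chunks hold exactly chunkSize keys.
            long end = (maxKey - start < chunkSize) ? maxKey + 1 : start + chunkSize;
            chunks.add(new Chunk(start, end));
            start = end;
        }
        return chunks;
    }
}
```

For example, split(1, 10, 4) yields [1, 5), [5, 9), [9, 11). The real splitter also handles unevenly distributed and non-numeric keys, which this sketch ignores.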

3. Spring Boot code example

1) Create the change listener

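Create a MysqlEventListener class that implements ApplicationRunner, so that the MySQL listener starts when the application starts. The class must be registered as a Spring bean (for example with @Component); otherwise Spring Boot never calls its run() method.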

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @Description: MySQL change listener
 * @Date: 2023/10/11
 **/
@Component // must be a Spring bean, otherwise run() is never called
public class MysqlEventListener implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        MySqlSource<DataChangeInfo> mySqlSource = MySqlSource.<DataChangeInfo>builder()
                .hostname("yourHostname")
                .port(3306)
                // database(s) to capture; to capture every table, set tableList to ".*"
                .databaseList("yourDatabaseName")
                // table(s) to capture, as database.table
                .tableList("yourDatabaseName.yourTableName")
                .username("yourUsername")
                .password("yourPassword")
                // convert each SourceRecord into the custom DataChangeInfo object
                .deserializer(new MysqlDeserialization())
                /*
                 * initial:   take a full snapshot first, then continue with incremental changes
                 * latest:    incremental changes only (history is not read)
                 * timestamp: read changes at or after the given timestamp
                 */
                .startupOptions(StartupOptions.latest())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        // enable incremental snapshot (a SQL connector option; MySqlSource already snapshots incrementally)
        config.setBoolean("scan.incremental.snapshot.enabled", true);
        env.configure(config);
        env.setParallelism(1);

        DataStreamSource<DataChangeInfo> streamSource =
                env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                   .setParallelism(1);
        streamSource.addSink(new DataChangeSink());
        // blocks until the job ends
        env.execute("mysql-stream-cdc");
    }
}
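One thing to be aware of: env.execute(...) blocks until the job ends, so run() never returns. Spring Boot calls ApplicationRunner beans on the thread that invoked SpringApplication.run(), so that call does not return either, and ApplicationReadyEvent is not published while the job is running. A minimal sketch, assuming you want run() to return, is to hand the blocking call to a dedicated thread. BackgroundJobLauncher and BlockingJob are hypothetical names, and only java.util.concurrent is used.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BackgroundJobLauncher {
    // Stand-in for the blocking env.execute(...) call (hypothetical interface for this sketch).
    interface BlockingJob {
        void run() throws Exception;
    }

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "flink-cdc-job");
        t.setDaemon(true); // does not keep the JVM alive on its own
        return t;
    });

    // Returns immediately; the job keeps running on the background thread.
    // Exceptions thrown by the job are captured in the returned Future.
    Future<?> launch(BlockingJob job) {
        return executor.submit(() -> {
            job.run();
            return null;
        });
    }

    // Interrupts the job thread and stops accepting new jobs.
    void shutdown() {
        executor.shutdownNow();
    }
}
```

Inside run(), this could look like launcher.launch(() -> env.execute("mysql-stream-cdc")); with env effectively final. Because the thread is a daemon, something else must keep the JVM running; in a Spring Boot web application the embedded server does.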

2) Custom deserializer

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Optional;

/**
 * @Description: custom MySQL deserializer
 * @Date: 2023/10/11
 **/
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";

    /**
     * Deserializes a change record into the custom DataChangeInfo object.
     * @param sourceRecord
     * @param collector
     * @throws Exception
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        // topic format: <serverName>.<database>.<table>
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        DataChangeInfo dataChangeInfo = new DataChangeInfo();
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
        // operation type: CREATE, READ (snapshot row), UPDATE, DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        int eventType;
        if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
            eventType = 1; // snapshot rows are treated as inserts
        } else if (operation == Envelope.Operation.UPDATE) {
            eventType = 2;
        } else {
            eventType = 3;
        }
        dataChangeInfo.setEventType(eventType);
        dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
        // emit the result
        collector.collect(dataChangeInfo);
    }

    /**
     * Reads the row data before or after the change from the record.
     * @param value
     * @param fieldElement
     * @return
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
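The two decisions in deserialize(), splitting the topic into database and table and turning the Debezium operation into DataChangeInfo's eventType, do not need Flink or Kafka Connect, so they can be pulled into pure functions and unit-tested. This is a sketch: ChangeEventMapper is a hypothetical class, the topic is assumed to have the form <serverName>.<database>.<table>, and the operation names are those of Debezium's Envelope.Operation enum. Snapshot rows (READ, emitted with StartupOptions.initial()) are counted as inserts here; a bare else branch returning 3 would report them as deletes.

```java
import java.util.Locale;

class ChangeEventMapper {
    // Split "<serverName>.<database>.<table>" into {database, table}.
    static String[] databaseAndTable(String topic) {
        String[] parts = topic.split("\\.");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected <server>.<database>.<table>: " + topic);
        }
        return new String[] {parts[1], parts[2]};
    }

    // Map an operation name to DataChangeInfo.eventType (1 insert, 2 update, 3 delete).
    static int eventType(String operationName) {
        switch (operationName.toUpperCase(Locale.ROOT)) {
            case "CREATE":
            case "READ": // snapshot row
                return 1;
            case "UPDATE":
                return 2;
            case "DELETE":
                return 3;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + operationName);
        }
    }
}
```

Throwing on unknown operations is a deliberate choice in this sketch: it surfaces event kinds the mapping does not cover instead of silently filing them under delete.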

3) Create the change object

import lombok.Data;

/**
 * @Description: data change object
 * @Date: 2023/10/11
 **/
@Data
public class DataChangeInfo {
    /**
     * Row data before the change
     */
    private String beforeData;
    /**
     * Row data after the change
     */
    private String afterData;
    /**
     * Change type: 1 insert, 2 update, 3 delete
     */
    private Integer eventType;
    /**
     * Binlog file name
     */
    private String fileName;
    /**
     * Current read position in the binlog
     */
    private Integer filePos;
    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Change time
     */
    private Long changeTime;
}

4) Create the business processing class

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * @Description: data processing
 * @Date: 2023/10/11
 **/
@Slf4j
public class DataChangeSink implements SinkFunction<DataChangeInfo> {
    @Override
    public void invoke(DataChangeInfo value, Context context) throws Exception {
        log.info("Received change data: {}", value);
        // business logic goes here
    }
}
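The business logic in invoke() often needs to behave differently per table. One option is a small dispatcher keyed by table name; the sketch below is hypothetical (ChangeDispatcher and its ChangeEvent stand-in for DataChangeInfo are illustrative names). Keep in mind that DataChangeSink is created with new in the listener rather than by Spring, so @Autowired fields inside it are not injected, and Flink serializes the sink instance. Build the dispatcher inside the sink (for example in a transient field initialized on the first call to invoke()) rather than passing a non-serializable object in.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

class ChangeDispatcher {
    // Simplified stand-in for DataChangeInfo (only the fields routing needs).
    static final class ChangeEvent {
        final String tableName;
        final int eventType; // 1 insert, 2 update, 3 delete
        final String afterData;

        ChangeEvent(String tableName, int eventType, String afterData) {
            this.tableName = tableName;
            this.eventType = eventType;
            this.afterData = afterData;
        }
    }

    private final Map<String, Consumer<ChangeEvent>> handlers = new HashMap<>();

    // Register the handler for one table; returns this for chaining.
    ChangeDispatcher on(String tableName, Consumer<ChangeEvent> handler) {
        handlers.put(tableName, handler);
        return this;
    }

    // Runs the table's handler; returns false when no handler is registered.
    boolean dispatch(ChangeEvent event) {
        Consumer<ChangeEvent> handler = handlers.get(event.tableName);
        if (handler == null) {
            return false;
        }
        handler.accept(event);
        return true;
    }
}
```

Returning false for unregistered tables lets the caller decide whether to log and skip them or treat them as an error.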

5) Run the code to listen for MySQL CDC events

The application starts successfully:
[Screenshot: application startup log]

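  • Modify data in the MySQL database: the row before the change [screenshot]; after the change, click Save [screenshot]; the service's listener output [screenshot]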

Reprinted from: https://blog.csdn.net/xiao_ying_bo_ke/article/details/133789398
Copyright belongs to the original author 火炎焱小码哥. If this infringes your rights, please contact us and it will be removed.
