

Integrating Flink CDC with Spring Boot

Flink CDC

An introduction to CDC

What is CDC?

CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates and deletes of rows or tables), record those changes completely and in the order they occur, and write them to an MQ so that other services can subscribe to and consume them.

Types of CDC

CDC approaches fall into two main categories: query-based and Binlog-based.

                            Query-based         Binlog-based
Open-source tools           Sqoop, DataX        Canal, Maxwell, Debezium
Execution mode              Batch               Streaming
Captures every change       No                  Yes
Latency                     High                Low
Extra load on the database  Yes                 No

Query-based CDC runs in batch mode (it only executes once a certain amount of data has accumulated or a time window has elapsed), so high latency is inherent to it. Binlog-based CDC streams changes at row granularity: every change to a row is picked up as soon as it happens.

Flink CDC

The Flink community developed the flink-cdc-connectors component, a source connector that can read both the full snapshot and the incremental changes directly from databases such as MySQL and PostgreSQL.

It is open source; the repository is at https://github.com/ververica/flink-cdc-connectors
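As a quick illustration of what the connector does (a minimal sketch, not part of the project below, and not the approach this post uses), the same student table could also be consumed through the connector's Flink SQL interface; option names and supported versions vary between connector releases, and the connection values here are the ones from the configuration later in this post:

-- Hypothetical Flink SQL definition of a CDC source on whitebrocade.student
CREATE TABLE student_source (
    id BIGINT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.132.101',
    'port' = '3306',
    'username' = 'root',
    'password' = '12345678',
    'database-name' = 'whitebrocade',
    'table-name' = 'student'
);

-- Every insert/update/delete on the MySQL table then arrives as a changelog row
SELECT * FROM student_source;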

Integrating Flink CDC in Java

MySQL setup

Run the initialization SQL
-- Create the whitebrocade database
DROP DATABASE IF EXISTS whitebrocade;
CREATE DATABASE whitebrocade;
USE whitebrocade;
-- Create the student table
CREATE TABLE `student` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `description` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;

-- Insert sample data
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `description`) VALUES (1, '小牛马', '我是小牛马');
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `description`) VALUES (2, '中牛马', '我是中牛马');
Enable the binlog

With a default installation, MySQL's cnf file is usually located under /etc:

sudo vim /etc/my.cnf
# Add the following settings to enable the binlog for the whitebrocade database
# Server id
server-id = 1
# Time zone; if the database time zone is not set, the Flink MySQL CDC source fails to start
default-time-zone = '+8:00'
# Enable the binlog; this value is used as the binlog file name prefix
log-bin=mysql-bin
# Binlog format; row format is required (the same requirement Maxwell and other CDC tools have)
binlog_format=row
# Database(s) for which the binlog is enabled; adjust to your environment
binlog-do-db=whitebrocade
Set the database time zone

To change it permanently, set it in my.cnf (the configuration above already does this; just remember to restart):

default-time-zone = '+8:00'

To change it temporarily (lost after a restart):

# For MySQL 8
set persist time_zone='+8:00';

# For MySQL 5.x
set time_zone='+8:00';
Restart MySQL

Note: MySQL must be restarted for these settings to take effect!

service mysqld restart
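After the restart, the settings can be double-checked with standard MySQL statements (not specific to this project):

-- Should report log_bin = ON and binlog_format = ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- Should report the +08:00 offset configured above
SELECT @@global.time_zone;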

Code (handling changes directly in a BaseLogHandler, or indirectly via Kafka)

pom dependencies
<properties><java.version>11</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version><!-- 这里的依赖版本不要删除, 比如说es, easy-es的, 下边的案例会使用到 --><es.vsersion>7.12.0</es.vsersion><easy-es.vsersion>2.0.0</easy-es.vsersion><flink.version>1.19.0</flink.version><kafka-clients.version>3.8.0</kafka-clients.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- hutool --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.32</version></dependency><!-- Flink CDC依赖 start--><!-- Flink核心依赖, 提供了Flink的核心API --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><!--  Flink流处理Java API依赖
         对于引入Scala还是Java, 参考下面这篇博客: https://developer.aliyun.com/ask/526584
         --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!-- Flink客户端工具依赖, 包含命令行界面和实用函数 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Flink连接器基础包, 包含连接器公共功能 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><!-- Flink Kafka连接器, 用于和Apache Kafka集成, 注意kafka软件和这个依赖的版本问题, 可能会抱错, 报错参考以下博客方式进行解决
        版本集成问题: 参考博客 https://blog.csdn.net/qq_34526237/article/details/130968153
        https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/configuration/overview/
        https://blog.csdn.net/weixin_55787608/article/details/141436268
        https://www.cnblogs.com/qq1035807396/p/16227816.html
        https://blog.csdn.net/g5guj/article/details/137229597
        https://blog.csdn.net/x950913/article/details/108249507
        --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version><exclusions><!-- 排除掉kafka client, 用自己指定的kafka client, 可能会因为kafka太新, 导致的版本不兼容 --><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><!-- kafka client --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka-clients.version}</version></dependency><!-- Flink Table Planner, 用于Table API和SQL的执行计划生成 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><!-- Flink Table API桥接器, 连接DataStream API和Table API --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><!-- Flink JSON格式化数据依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><!-- 开启Web UI支持, 端口为8081, 默认为不开启--><!--<dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>1.19.1</version>
            </dependency>--><!-- MySQL CDC依赖
            org.apache.flink的适用MySQL 8.0
            具体参照这篇博客 https://blog.csdn.net/kakaweb/article/details/129441408
            https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/
             --><dependency><!--MySQL 8.0适用--><!--<groupId>org.apache.flink</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>3.1.0</version>--><!-- MySQL 5.7适用 , 2.3.0, 3.0.1均可用 --><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version><!-- <version>3.0.1</version> --></dependency><!-- gson工具类 --><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.11.0</version></dependency><!-- ognl表达式 --><dependency><groupId>ognl</groupId><artifactId>ognl</artifactId><version>3.1.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.31</version></dependency></dependencies>
yaml
# Web port of the application
server:
  port: 9999

# Flink CDC configuration
flink-cdc:
  cdcConfig:
    parallelism: 1
    enableCheckpointing: 5000
  mysqlConfig:
    sourceName: mysql-source
    jobName: mysql-stream-cdc
    hostname: 192.168.132.101
    port: 3306
    username: root
    password: 12345678
    databaseList: whitebrocade
    tableList: whitebrocade.student
    includeSchemaChanges: false
  kafkaConfig:
    sourceName: kafka-source
    jobName: kafka-stream-cdc
    bootstrapServers: localhost:9092
    groupId: test_group
    topics: test_topic
FlinkCDCConfig
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Flink CDC configuration properties
 */
@Data
@Configuration
@ConfigurationProperties("flink-cdc")
public class FlinkCDCConfig {

    private CdcConfig cdcConfig;
    private MysqlConfig mysqlConfig;
    private KafkaConfig kafkaConfig;

    @Data
    public static class CdcConfig {
        /** Parallelism */
        private Integer parallelism;
        /** Checkpoint interval, in milliseconds */
        private Integer enableCheckpointing;
    }

    @Data
    public static class MysqlConfig {
        /** MySQL source name */
        private String sourceName;
        /** Job name */
        private String jobName;
        /** Database host */
        private String hostname;
        /** Database port */
        private Integer port;
        /** Database username */
        private String username;
        /** Database password */
        private String password;
        /** Database names */
        private String[] databaseList;
        /** Table names */
        private String[] tableList;
        /** Whether schema changes are included */
        private Boolean includeSchemaChanges;
    }

    @Data
    public static class KafkaConfig {
        /** Kafka source name */
        private String sourceName;
        /** Job name */
        private String jobName;
        /** Kafka bootstrap servers */
        private String bootstrapServers;
        /** Consumer group id */
        private String groupId;
        /** Kafka topic */
        private String topics;
    }
}
Enums
OperatorTypeEnum
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Operation type enum
 */
@Getter
@AllArgsConstructor
public enum OperatorTypeEnum {

    /** Insert */
    INSERT(1),
    /** Update */
    UPDATE(2),
    /** Delete */
    DELETE(3),
    ;

    /** Type code */
    private final int type;

    /**
     * Resolve the enum from its type code
     *
     * @param type type code
     * @return OperatorTypeEnum
     */
    public static OperatorTypeEnum getEnumByType(int type) {
        for (OperatorTypeEnum operatorTypeEnum : OperatorTypeEnum.values()) {
            if (operatorTypeEnum.getType() == type) {
                return operatorTypeEnum;
            }
        }
        throw new RuntimeException(StrUtil.format("No OperatorTypeEnum found for type={}", type));
    }
}
MySqlStrategyEnum
/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySql处理策略枚举
 * todo 后续在这里新增相关枚举即可
 */@Getter@AllArgsConstructorpublicenumMySqlStrategyEnum{/**
     * Student处理策略
     */STUDENT(Student.class.getSimpleName(),Student.class,Introspector.decapitalize(StudentLogHandler.class.getSimpleName())),;/**
     * 表名
     */privatefinalString tableName;/**
     * class对象
     */privatefinalClass<?> varClass;/**
     * MySql处理器名
     */privatefinalString mySqlHandlerName;/**
     * 策略选择器, 根据传入的 DataChangeInfo 对象中的 tableName 属性, 从一系列预定义的策略 (StrategyEnum) 中选择一个合适的处理策略, 并封装进 StrategyHandleSelector 对象中返回
     *
     * @param mySqlDataChangeInfo 数据变更对象
     * @return StrategyHandlerSelector
     */publicstaticMySqlStrategyHandleSelectorgetSelector(MySqlDataChangeInfo mySqlDataChangeInfo){Assert.notNull(mySqlDataChangeInfo,"MySqlDataChangeInfo不能为null");String tableName = mySqlDataChangeInfo.getTableName();MySqlStrategyHandleSelector selector =newMySqlStrategyHandleSelector();// 遍历所有的策略枚举(StrategyEnum), 寻找与当前表名相匹配的策略for(MySqlStrategyEnum mySqlStrategyEnum :values()){// 如果找到匹配的策略, 创建并配置 StrategyHandleSelectorif(mySqlStrategyEnum.getTableName().equalsIgnoreCase(tableName)){
                selector.setMySqlHandlerName(mySqlStrategyEnum.mySqlHandlerName);
                selector.setOperatorTime(mySqlDataChangeInfo.getOperatorTime());Integer operatorType = mySqlDataChangeInfo.getOperatorType();
                selector.setOperatorType(operatorType);OperatorTypeEnum operatorTypeEnum =OperatorTypeEnum.getEnumByType(operatorType);JSONObject jsonObject;// 删除, 就获取操作前的数if(OperatorTypeEnum.DELETE.equals(operatorTypeEnum)){
                    jsonObject =JSONUtil.parseObj(mySqlDataChangeInfo.getBeforeData());}else{// 其余操作, 比如薪资,修改使用操作后的数据
                    jsonObject =JSONUtil.parseObj(mySqlDataChangeInfo.getAfterData());}
                selector.setData(BeanUtil.copyProperties(jsonObject, mySqlStrategyEnum.varClass));return selector;}}thrownewRuntimeException(StrUtil.format("没有找到的表名={}绑定的StrategyHandleSelector", tableName));}}
model
Student
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Student entity, used for the demo
 */
@Data
public class Student {
    /** id */
    private Long id;
    /** Name */
    private String name;
    /** Description */
    private String description;
}
MySqlDataChangeInfo
/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL data-change record
 */
@Data
@Builder
public class MySqlDataChangeInfo implements Serializable {
    /** Data before the change */
    private String beforeData;
    /** Data after the change */
    private String afterData;
    /** Change type: 1 -> insert, 2 -> update, 3 -> delete */
    private Integer operatorType;
    /** Binlog file name */
    private String fileName;
    /** Current binlog read position */
    private Integer filePos;
    /** Database name */
    private String database;
    /** Table name */
    private String tableName;
    /** Change timestamp */
    private Long operatorTime;
}
MySqlStrategyHandleSelector
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Strategy handler selector
 */
@Data
public class MySqlStrategyHandleSelector {
    /** Name of the MySql handler bean invoked when the table's binlog changes */
    private String mySqlHandlerName;
    /** Payload */
    private Object data;
    /** Operation time */
    private Long operatorTime;
    /** Operation type */
    private Integer operatorType;
}
Custom sinks
LogSink
/**
 * @author whiteBrocade
 * @description Logging sink, useful for observing the stream
 */
@Slf4j
@Service
public class LogSink extends RichSinkFunction<MySqlDataChangeInfo> implements Serializable {
    @Override
    public void invoke(MySqlDataChangeInfo mySqlDataChangeInfo, Context context) throws Exception {
        log.info("MySQL data-change record: {}", JSONUtil.toJsonStr(mySqlDataChangeInfo));
    }
}
CustomMySqlSink
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom sink that tells DML types apart with OGNL expressions,
 * meant to be paired with the String-typed source (the built-in JSON deserializer)
 */
@Slf4j
@Component
public class CustomMySqlSink extends RichSinkFunction<String> {

    public static final String OP = "op";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";

    @Override
    public void invoke(String json, Context context) throws Exception {
        // The op field has four values: c (create), u (update), d (delete), r (read)
        // For an update, the payload contains both before and after
        log.info("Received change record: {}", json);
        String op = JSONUtil.getValue(json, OP, String.class);
        String beforeData = JSONUtil.getValue(json, BEFORE, String.class);
        String afterData = JSONUtil.getValue(json, AFTER, String.class);
        // update statement
        if (Envelope.Operation.UPDATE.toString().equalsIgnoreCase(op)) {
            log.info("update, data before: {}, data after: {}", beforeData, afterData);
        }
        // delete statement
        if (Envelope.Operation.DELETE.toString().equalsIgnoreCase(op)) {
            log.info("delete, data before: {}, data after: {}", beforeData, afterData);
        }
        // insert statement
        if (Envelope.Operation.CREATE.toString().equalsIgnoreCase(op)) {
            log.info("insert, data before: {}, data after: {}", beforeData, afterData);
        }
    }

    // Called before the sink starts processing
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
    }

    // Called when the sink shuts down
    @Override
    public void close() throws Exception {
        super.close();
    }
}
MySqlDataChangeSink
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Mysql变更Sink算子
 */@Slf4j@Component@AllArgsConstructorpublicclassMySqlDataChangeSinkextendsRichSinkFunction<MySqlDataChangeInfo>implementsSerializable{/**
     * BaseLogHandler相关的缓存
     * Spring自动将相关BaseLogHandler的Bean注入注入到本地缓存Map中
     */privatefinalMap<String,BaseLogHandler> strategyHandlerMap;/**
     * 数据处理逻辑
     */@Override@SneakyThrowspublicvoidinvoke(MySqlDataChangeInfo mySqlDataChangeInfo,Context context){
        log.info("收到变更原始数据:{}", mySqlDataChangeInfo);// 选择策略MySqlStrategyHandleSelector selector =MySqlStrategyEnum.getSelector(mySqlDataChangeInfo);Assert.notNull("MySqlStrategyHandleSelector不能为空");BaseLogHandler<Object> handler = strategyHandlerMap.get(selector.getMySqlHandlerName());Integer operatorType = selector.getOperatorType();OperatorTypeEnum operatorTypeEnum =OperatorTypeEnum.getEnumByType(operatorType);switch(operatorTypeEnum){case INSERT:// insert操作
                handler.handleInsertLog(selector.getData(), selector.getOperatorTime());break;case UPDATE:// update操作
                handler.handleUpdateLog(selector.getData(), selector.getOperatorTime());break;case DELETE:// delete操作
                handler.handleDeleteLog(selector.getData(), selector.getOperatorTime());break;default:thrownewRuntimeException("不支持的操作类型");}}/**
     * 写入逻辑
     */@Override@SneakyThrowspublicvoidwriteWatermark(Watermark watermark){
        log.info("触发了写入逻辑writeWatermark");super.writeWatermark(watermark);}/**
     * 开始
     */@Override@SneakyThrowspublicvoidopen(OpenContext openContext){
        log.info("触发了开始逻辑open");super.open(openContext);}/**
     * 结束
     */@Override@SneakyThrowspublicvoidfinish(){
        log.info("触发了结束逻辑finish");super.finish();}}
MySqlChangeInfoKafkaProducerSink
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Kafka队列中MySQL消息变更Sink
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MySqlChangeInfoKafkaProducerSink {

    /**
     * Flink相关配置
     */
    private final FlinkCDCConfig flinkCDCConfig;

    /**
     * 自定义kafKA序列化处理器
     */
    private final KafkaSerializer kafkaSerializer;

    /**
     * 获取kafka生产者算子
     */
    public KafkaSink<MySqlDataChangeInfo> getKafkaProducerSink() {
        FlinkCDCConfig.KafkaConfig kafkaConfig = flinkCDCConfig.getKafkaConfig();

        kafkaSerializer.setTopic(kafkaConfig.getTopics());
        // 创建KafkaSink算子
        KafkaSink<MySqlDataChangeInfo> kafkaProducerSink = KafkaSink.<MySqlDataChangeInfo>builder()
                // 设置集群地址
                .setBootstrapServers(kafkaConfig.getBootstrapServers())
                // 设置事务前缀
                .setTransactionalIdPrefix("Kafka_Transactional_" + kafkaConfig.getTopics() + IdUtil.getSnowflakeNextIdStr())
                .setRecordSerializer(kafkaSerializer)
                // 设置传递保证
                // At Most Once (至多一次): 系统保证消息要么被成功传递一次,要么根本不被传递。这种保证意味着消息可能会丢失,但不会被传递多次
                // At Least Once (至少一次): 系统保证消息至少会被传递一次,但可能会导致消息的重复传递。这种保证确保了消息的不丢失,但应用可能会多次消费, 需要自己实现幂等
                // Exactly Once (精确一次): 系统保证消息会被确切地传递一次,而没有任何重复。这是最高级别的传递保证,确保消息不会丢失且不会多次消费
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 设置kafka各种参数
                // .setKafkaProducerConfig(properties)
                /*
                sinkProducer的超时时间默认为1个小时,
                但是kafka broker的超时时间默认是15分钟, kafka broker不允许producer的超时时间比他大
                所以有两种解决办法:
                        1.生产者的超时时间调小
                        2.将broker的超时时间调大
                这里选择方案一, 将生产者时间调小, 将kafka producer的超时时间调至和broker一致即可
                参考博客 https://blog.csdn.net/LangLang1111111/article/details/121395831
                https://blog.csdn.net/weixin_64261178/article/details/140298696
                */
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000))
                .build();

        return kafkaProducerSink;
    }
MySqlChangeInfoKafkaConsumerSink
/**
 * @author whiteBrocade
 * @description: 自定义 MySqlChangeInfo kafka消费者sink
 */
@Slf4j
@Service
public class MySqlChangeInfoKafkaConsumerSink  extends RichSinkFunction<MySqlDataChangeInfo> implements Serializable {

    /**
     * 数据处理逻辑
     */
    @Override
    @SneakyThrows
    public void invoke(MySqlDataChangeInfo mySqlDataChangeInfo, Context context) {
        log.info("正在消费kafka数据:{}", JSONUtil.toJsonStr(mySqlDataChangeInfo));
    }
}
Serializers and deserializers
KafkaDeserializer
/**
 * @author whiteBrocade
 * @description Custom Kafka deserializer
 */
@Slf4j
@Service
public class KafkaDeserializer implements KafkaRecordDeserializationSchema<MySqlDataChangeInfo> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<MySqlDataChangeInfo> collector) {
        String valueJsonStr = new String(record.value(), StandardCharsets.UTF_8);
        // log.info("Kafka payload before deserialization: {}", valueJsonStr);
        MySqlDataChangeInfo mySqlDataChangeInfo = JSONUtil.toBean(valueJsonStr, MySqlDataChangeInfo.class);
        collector.collect(mySqlDataChangeInfo);
    }

    @Override
    public TypeInformation<MySqlDataChangeInfo> getProducedType() {
        return TypeInformation.of(MySqlDataChangeInfo.class);
    }
}
KafkaSerializer
/**
 * @author whiteBrocade
 * @version 1.0
 * @description: kafka消息 自定义序列化器
 */@Slf4j@Setter@ServicepublicclassKafkaSerializerimplementsKafkaRecordSerializationSchema<MySqlDataChangeInfo>{/**
     * 主体名称
     */privateString topic;/**
     * 序列化
     */@Nullable@OverridepublicProducerRecord<byte[],byte[]>serialize(MySqlDataChangeInfo mySqlDataChangeInfo,KafkaSinkContext context,Long timestamp){Assert.notNull(topic,"必须指定发送的topic");String jsonStr =JSONUtil.toJsonStr(mySqlDataChangeInfo);
        log.info("投递kafka到topic={}的数据key: {}, value:", topic, jsonStr);returnnewProducerRecord<>(topic, jsonStr.getBytes());}@Overridepublicvoidopen(SerializationSchema.InitializationContext context,KafkaSinkContext sinkContext)throwsException{KafkaRecordSerializationSchema.super.open(context, sinkContext);}}
MySqlDeserializer
/**
 * @author whiteBrocade
 * @version 1.0
 * @description 自定义MySQ反序列化器
 */@Slf4j@ServicepublicclassMySqlDeserializerimplementsDebeziumDeserializationSchema<MySqlDataChangeInfo>{publicstaticfinalString TS_MS ="ts_ms";publicstaticfinalString BIN_FILE ="file";publicstaticfinalString POS ="pos";publicstaticfinalString BEFORE ="before";publicstaticfinalString AFTER ="after";publicstaticfinalString SOURCE ="source";/**
     * 反序列化数据, 转为变更JSON对象
     *
     * @param sourceRecord SourceRecord
     * @param collector    Collector<DataChangeInfo>
     */@Overridepublicvoiddeserialize(SourceRecord sourceRecord,Collector<MySqlDataChangeInfo> collector){try{// 根据主题的格式,获取数据库名(database)和表名(tableName)String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct struct =(Struct) sourceRecord.value();finalStruct source = struct.getStruct(SOURCE);MySqlDataChangeInfo.MySqlDataChangeInfoBuilder infoBuilder =MySqlDataChangeInfo.builder();// 变更前的数据String beforeData =this.getJsonObject(struct, BEFORE).toJSONString();
            infoBuilder.beforeData(beforeData);// 变更后的数据String afterData =this.getJsonObject(struct, AFTER).toJSONString();
            infoBuilder.afterData(afterData);// 操作类型OperatorTypeEnum operatorTypeEnum =this.getOperatorTypeEnumBySourceRecord(sourceRecord);
            infoBuilder.operatorType(operatorTypeEnum.getType());// 文件名称
            infoBuilder.fileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
            infoBuilder.filePos(Optional.ofNullable(source.get(POS)).map(x ->Integer.parseInt(x.toString())).orElse(0));
            infoBuilder.database(database);
            infoBuilder.tableName(tableName);
            infoBuilder.operatorTime(Optional.ofNullable(struct.get(TS_MS)).map(x ->Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));// 收集数据MySqlDataChangeInfo mySqlDataChangeInfo = infoBuilder.build();
            collector.collect(mySqlDataChangeInfo);}catch(Exception e){
            log.error("反序列binlog失败", e);thrownewRuntimeException("反序列binlog失败");}}@OverridepublicTypeInformation<MySqlDataChangeInfo>getProducedType(){returnTypeInformation.of(MySqlDataChangeInfo.class);}/**
     * 从源数据获取出变更之前或之后的数据
     *
     * @param value        Struct
     * @param fieldElement 字段
     * @return JSONObject
     */privateJSONObjectgetJsonObject(Struct value,String fieldElement){Struct element = value.getStruct(fieldElement);JSONObject jsonObject =newJSONObject();if(element !=null){Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for(Field field : fieldList){Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);}}return jsonObject;}/**
     * 通过SourceRecord获取OperatorTypeEnum
     *
     * @param sourceRecord SourceRecord
     * @return OperatorTypeEnum
     */privateOperatorTypeEnumgetOperatorTypeEnumBySourceRecord(SourceRecord sourceRecord){// 获取操作类型  CREATE UPDATE DELETEEnvelope.Operation operation =Envelope.operationFor(sourceRecord);OperatorTypeEnum operatorTypeEnum =null;switch(operation){case CREATE:
                operatorTypeEnum =OperatorTypeEnum.INSERT;break;case UPDATE:
                operatorTypeEnum =OperatorTypeEnum.UPDATE;break;case DELETE:
                operatorTypeEnum =OperatorTypeEnum.DELETE;break;default:thrownewRuntimeException(StrUtil.format("不支持的操作类型OperatorTypeEnum={}", operation.toString()));}return operatorTypeEnum;}}
LogHandler
BaseLogHandler
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Log handler
 * todo To handle another table, create a class implementing BaseLogHandler and add its logic; see StudentLogHandler for an example
 */
public interface BaseLogHandler<T> extends Serializable {

    /**
     * Handle an insert
     *
     * @param data         converted data model
     * @param operatorTime operation time
     */
    void handleInsertLog(T data, Long operatorTime);

    /**
     * Handle an update
     *
     * @param data         converted data model
     * @param operatorTime operation time
     */
    void handleUpdateLog(T data, Long operatorTime);

    /**
     * Handle a delete
     *
     * @param data         converted data model
     * @param operatorTime operation time
     */
    void handleDeleteLog(T data, Long operatorTime);
}
StudentLogHandler
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Handler for the Student table
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class StudentLogHandler implements BaseLogHandler<Student> {

    @Override
    public void handleInsertLog(Student student, Long operatorTime) {
        log.info("Handling an insert on the Student table: {}", student);
    }

    @Override
    public void handleUpdateLog(Student student, Long operatorTime) {
        log.info("Handling an update on the Student table: {}", student);
    }

    @Override
    public void handleDeleteLog(Student student, Long operatorTime) {
        log.info("Handling a delete on the Student table: {}", student);
    }
}
JOB
MySqlDataChangeJob
/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL数据变更 JOb
 */@Slf4j@Component@AllArgsConstructorpublicclassMySqlDataChangeJob{/**
     * Flink CDC相关配置
     */privatefinalFlinkCDCConfig flinkCDCConfig;/**
     * 自定义Sink算子
     * customSink: 通过ognl解析ddl语句类型
     * dataChangeSink: 通过struct解析ddl语句类型
     * kafkaSink: 将MySQL变化投递到Kafka
     * 通常两个选择一个就行
     */privatefinalCustomMySqlSink customMySqlSink;privatefinalMySqlDataChangeSink mySqlDataChangeSink;privatefinalMySqlChangeInfoKafkaProducerSink mysqlChangeInfoKafkaProducerSink;privatefinalLogSink logSink;/**
     * 自定义MySQL反序列化处理器
     */privatefinalMySqlDeserializer mySqlDeserializer;/**
     * 启动Job
     */@SneakyThrowspublicvoidstartJob(){
        log.info("---------------- MySqlDataChangeJob 开始启动 ----------------");FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();// DataStream API执行模式包括:// 流执行模式(Streaming):用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式// 批执行模式(Batch):专门用于批处理的执行模式// 自动模式(AutoMatic):由程序根据输入数据源是否有界,来自动选择是流处理还是批处理执行// 执行模式选择,可以通过命令行方式配置:StreamExecutionEnvironment mySqlEnv =this.buildStreamExecutionEnvironment();// 这里选择自动模式
        mySqlEnv.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// todo 下列的两个MySqlSource选择一个// 自定义的反序列化器MySqlSource<MySqlDataChangeInfo> mySqlSource =this.buildBaseMySqlSource(MySqlDataChangeInfo.class).deserializer(mySqlDeserializer).build();// Flink CDC自带的反序列化器// MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)//     .deserializer(new JsonDebeziumDeserializationSchema())//     .build();// 从MySQL源中读取数据DataStreamSource<MySqlDataChangeInfo> mySqlDataStreamSource = mySqlEnv.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),
                        mysqlConfig.getSourceName())// 设置该数据源的并行度.setParallelism(cdcConfig.getParallelism());// 添加一个日志sink, 用于观察
        mySqlDataStreamSource.addSink(logSink);// 添加sink算子
        mySqlDataStreamSource
                // todo 根据上述的选择,选择对应的Sink算子// .addSink(customMySqlSink)// .addSink(mySqlDataChangeSink); // 添加Sink, 这里配合mySQLDeserialization+dataChangeSink.sinkTo(mysqlChangeInfoKafkaProducerSink.getKafkaProducerSink());// 将MySQL的数据变化投递到Kafka中// 启动服务// execute和executeAsync启动方式对比: https://blog.csdn.net/llg___/article/details/133798713
        mySqlEnv.executeAsync(mysqlConfig.getJobName());
        log.info("---------------- MySqlDataChangeJob 启动完毕 ----------------");}/**
     * 构建流式执行环境
     *
     * @return StreamExecutionEnvironment
     */privateStreamExecutionEnvironmentbuildStreamExecutionEnvironment(){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();// 设置整个Flink程序的默认并行度
        env.setParallelism(cdcConfig.getParallelism());// 设置checkpoint 间隔
        env.enableCheckpointing(cdcConfig.getEnableCheckpointing());// 设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);return env;}/**
     * 构建基本的MySqlSourceBuilder
     *
     * @param clazz 返回的数据类型Class对象
     * @param <T>   源数据中存储的类型
     * @return MySqlSourceBuilder
     */private<T>MySqlSourceBuilder<T>buildBaseMySqlSource(Class<T> clazz){FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();returnMySqlSource.<T>builder().hostname(mysqlConfig.getHostname()).port(mysqlConfig.getPort()).username(mysqlConfig.getUsername()).password(mysqlConfig.getPassword()).databaseList(mysqlConfig.getDatabaseList()).tableList(mysqlConfig.getTableList())/* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest: 只进行增量导入(不读取历史变化)
                 * timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
                 */.startupOptions(StartupOptions.latest()).includeSchemaChanges(mysqlConfig.getIncludeSchemaChanges())// 包括schema的改变.serverTimeZone("GMT+8");// 时区}}
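Once this job is running, any DML on whitebrocade.student is picked up, logged by LogSink and, in this variant, forwarded to Kafka. A quick way to verify it, reusing the rows inserted earlier:

UPDATE whitebrocade.student SET description = 'updated' WHERE id = 1;
DELETE FROM whitebrocade.student WHERE id = 2;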
KafkaMySqlDataChangeJob
/**
 * @author whiteBrocade
 * @version 1.0
 * @description kafka接受 MySQL数据变更 JOb
 */@Slf4j@Component@AllArgsConstructorpublicclassKafkaMySqlDataChangeJob{/**
     * Flink CDC相关配置
     */privatefinalFlinkCDCConfig flinkCDCConfig;/**
     * 自定义kafKA序列化处理器
     */privatefinalKafkaSerializer kafkaSerializer;/**
     * 自定义Kafka反序列化处理器
     */privatefinalKafkaDeserializer kafkaDeserializer;/**
     * 自定义 MySqlChangeInfo kafka消费者sink
     */privatefinalMySqlChangeInfoKafkaConsumerSink mySqlChangeInfoKafkaConsumerSink;@SneakyThrowspublicvoidstartJob(){
        log.info("---------------- KafkaMySqlDataChangeJob 开始启动 ----------------");FlinkCDCConfig.KafkaConfig kafkaConfig = flinkCDCConfig.getKafkaConfig();StreamExecutionEnvironment kafkaEnv =this.buildStreamExecutionEnvironment();// 创建kafka数据源KafkaSource<MySqlDataChangeInfo> kafkaSource =this.buildBaseKafkaSource(MySqlDataChangeInfo.class)// 1. 自定义反序列化器.setDeserializer(kafkaDeserializer)// 2. 使用Kafka 提供的解析器处理// .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))// 3. 只设置kafka的value反序列化// .setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<MySqlDataChangeInfo> kafkaDataStreamSource = kafkaEnv.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),
                kafkaConfig.getSourceName());// 添加消费组算子进行数据处理
        kafkaDataStreamSource.addSink(mySqlChangeInfoKafkaConsumerSink);// 启动服务// 启动报错java.lang.NoSuchMethodError: org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames 参考博客 https://www.cnblogs.com/yeyuzhuanjia/p/18254652
        kafkaEnv.executeAsync(kafkaConfig.getJobName());
        log.info("---------------- KafkaMySqlDataChangeJob 启动完毕 ----------------");}/**
     * 构建流式执行环境
     *
     * @return StreamExecutionEnvironment
     */privateStreamExecutionEnvironmentbuildStreamExecutionEnvironment(){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();// 设置整个Flink程序的默认并行度
        env.setParallelism(cdcConfig.getParallelism());// 设置checkpoint 间隔
        env.enableCheckpointing(cdcConfig.getEnableCheckpointing());// 设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);return env;}/**
     * 构建基本的kafka数据源
     * 参考 https://cloud.tencent.com/developer/article/2393696
     * https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/
     */private<T>KafkaSourceBuilder<T>buildBaseKafkaSource(Class<T>Clazz){FlinkCDCConfig.KafkaConfig kafkaConfig = flinkCDCConfig.getKafkaConfig();returnKafkaSource.<T>builder()// 设置kafka地址.setBootstrapServers(kafkaConfig.getBootstrapServers())// 设置消费组id.setGroupId(kafkaConfig.getGroupId())// 设置主题,支持多种主题组合.setTopics(kafkaConfig.getTopics())// 消费模式, 支持多种消费模式/* OffsetsInitializer#committedOffsets: 从消费组提交的位点开始消费,不指定位点重置策略,这种策略会报异常,没有设置快照或设置自动提交
                 * OffsetsInitializer#committedOffsets(OffsetResetStrategy.EARLIEST): 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
                 * OffsetsInitializer#timestamp(1657256176000L): 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
                 * OffsetsInitializer#earliest(): 从最早位点开始消费
                 * OffsetsInitializer#latest(): 从最末尾位点开始消费,即从注册时刻开始消费
                 */.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))// 动态检查新分区, 10 秒检查一次新分区.setProperty("partition.discovery.interval.ms","10000");}}
Runner
/**
 * @author whiteBrocade
 * @description Data-sync runner; starts the jobs on application startup
 */
@Slf4j
@Component
@AllArgsConstructor
public class DataSyncRunner implements ApplicationRunner {

    private final MySqlDataChangeJob mySqlDataChangeJob;
    private final KafkaMySqlDataChangeJob kafkaMySqlDataChangeJob;

    @Override
    @SneakyThrows
    public void run(ApplicationArguments args) {
        mySqlDataChangeJob.startJob();
        kafkaMySqlDataChangeJob.startJob();
    }
}
Utility classes
JSONUtil
importcom.google.gson.Gson;importcom.google.gson.reflect.TypeToken;importognl.Ognl;importognl.OgnlContext;importjava.util.Map;/**
 * @author whiteBrocade
 * @version 1.0
 * @description: JSON工具类
 */publicclassJSONUtil{/**
     * 将指定JSON转为Map对象, Key类型为String,对应JSON的key
     * Value分情况:
     * 1. Value是字符串, 自动转为字符串, 例如:{"a","b"}
     * 2. Value是其他JSON对象, 自动转为Map,例如::{"a":{"b":"2"}}
     * 3. Value是数组, 自动转为list<Map>,例如::{"a":[:{"b":"2"},"c":"3"]}
     *
     * @param json 输入的的JSON对象
     * @return 动态Map集合
     */publicstaticMap<String,Object>transferToMap(String json){Gson gson =newGson();Map<String,Object> map = gson.fromJson(json,newTypeToken<Map<String,Object>>(){}.getType());return map;}/**
     * 获取指定JSON的指定路径的值
     *
     * @param json  原始JSON数据
     * @param path  OGNL原则表达式
     * @param clazz Value对应的目标类
     * @return clazz对应的数据
     */publicstatic<T>TgetValue(String json,String path,Class<T> clazz){try{Map<String,Object> map =JSONUtil.transferToMap(json);OgnlContext ognlContext =newOgnlContext();
            ognlContext.setRoot(map);T value =(T)Ognl.getValue(path, ognlContext, ognlContext.getRoot(), clazz);return value;}catch(Exception e){thrownewRuntimeException(e);}}}
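A quick, illustrative usage of this helper (hypothetical values): getValue parses the JSON into a Map and then evaluates the OGNL path against it, so nested fields can be read with dotted paths:

// Hypothetical usage of JSONUtil with OGNL path expressions
String json = "{\"op\":\"u\",\"source\":{\"table\":\"student\"}}";
String op = JSONUtil.getValue(json, "op", String.class);               // "u"
String table = JSONUtil.getValue(json, "source.table", String.class);  // "student"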

Code (publishing to ActiveMQ)

Add the ActiveMQ dependency
<!-- 新增 ActiveMQ, 接受Flink-CDC的日志 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>
Additions to the yaml file
# ActiveMQ is introduced to decouple the log sync and make it durable, playing the same role as Kafka above;
# Flink also ships a RabbitMQ connector if you prefer that.
spring:
  activemq:
    # ActiveMQ url
    broker-url: tcp://localhost:61616
    # Username & password
    user: admin
    password: admin
    # Whether to use the in-memory ActiveMQ; in production use a standalone installation
    in-memory: true
    pool:
      # If this is set to true, the activemq-pool dependency must be added as well,
      # otherwise auto-configuration fails and JmsMessagingTemplate cannot be injected
      enabled: false
  # One more setting is needed in application.yml:
  # publish/subscribe messages differ from point-to-point ones - a topic can be consumed by several consumers at once.
  # Spring Boot defaults to point-to-point messaging, so topics do not work without it.
  jms:
    # When this is false, messaging is point-to-point, which is the Spring Boot default.
    # Setting it to true makes topics work, but then point-to-point messages are no longer consumed,
    # so you cannot have both this way - not a great solution.
    # A better one is to define a listener container factory: @JmsListener only receives queue messages
    # by default, and needs its containerFactory set in order to receive topic messages.
    pub-sub-domain: true
Configuration class
/**
 * @author whiteBrocade
 * @version 1.0
 * @description ActiveMqConfig配置
 */@ConfigurationpublicclassActiveMqConfig{/**
     * 用于接受student表的消费信息
     */publicstaticfinalString TOPIC_NAME ="activemq:topic:student";publicstaticfinalString QUEUE_NAME ="activemq:queue:student";@BeanpublicTopictopic(){returnnewActiveMQTopic(TOPIC_NAME);}@BeanpublicQueuequeue(){returnnewActiveMQQueue(QUEUE_NAME);}/**
     * 接收topic消息,需要设置containerFactory
     */@BeanpublicJmsListenerContainerFactorytopicListenerContainer(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory =newDefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);// 相当于在application.yml中配置:spring.jms.pub-sub-domain=true
        factory.setPubSubDomain(true);return factory;}}
Producer
/**
 * @author whiteBrocade
 * @version 1.0
 * @description CustomProducer
 */@Service@RequiredArgsConstructorpublicclassCustomProducer{privatefinalJmsMessagingTemplate jmsMessagingTemplate;@SneakyThrowspublicvoidsendQueueMessage(Queue queue,String msg){String queueName = queue.getQueueName();
        jmsMessagingTemplate.convertAndSend(queueName, msg);}@SneakyThrowspublicvoidsendTopicMessage(Topic topic,String msg){String topicName = topic.getTopicName();
        jmsMessagingTemplate.convertAndSend(topicName, msg);}}
Consumers
/**
 * @author whiteBrocade
 * @version 1.0
 * @description CustomQueueConsumer
 */@Slf4j@Service@RequiredArgsConstructorpublicclassCustomQueueConsumer{@JmsListener(destination =ActiveMqConfig.QUEUE_NAME)publicvoidreceiveQueueMsg(String msg){
        log.info("消费者1111收到Queue消息: {}", msg);StudentMqDTO mqDTO =JSONUtil.toBean(msg,StudentMqDTO.class);Student student = mqDTO.getStudent();Integer operatorType = mqDTO.getOperatorType();OperatorTypeEnum operatorTypeEnum =OperatorTypeEnum.getEnumByType(operatorType);switch(operatorTypeEnum){case INSERT:
                log.info("新增Student");break;case UPDATE: 
                log.info("修改Student");break;case DELETE:
                    log.info("删除Student");break;}}@JmsListener(destination =ActiveMqConfig.TOPIC_NAME, containerFactory ="topicListenerContainer")publicvoidreceiveTopicMsg(String msg){
        log.info("消费者1111收到Topic消息: {}", msg);}}
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom2QueueConsumer
 */@Slf4j@ServicepublicclassCustom2QueueConsumer{@JmsListener(destination =ActiveMqConfig.TOPIC_NAME, containerFactory ="topicListenerContainer")publicvoidreceiveTopicMsg(String msg){
        log.info("消费者2222收到Topic消息: {}", msg);}}
model
DTO
/**
 * @author whiteBrocade
 * @description Student MQ DTO
 */
@Data
@Builder
public class StudentMqDTO implements Serializable {

    private static final long serialVersionUID = 4308564438724519731L;

    /** Student data */
    private Student student;

    /** Type of the MySQL operation, see OperatorTypeEnum's type */
    private Integer operatorType;
}
Modify StudentLogHandler to also publish to the MQ
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Student对应处理器
 */@Slf4j@Service@RequiredArgsConstructorpublicclassStudentLogHandlerimplementsBaseLogHandler<Student>{privatefinalQueue queue;@OverridepublicvoidhandleInsertLog(Student student,Long operatorTime){
        log.info("处理Student表的新增日志: {}", student);this.sendMq(student,OperatorTypeEnum.INSERT);}@OverridepublicvoidhandleUpdateLog(Student student,Long operatorTime){
        log.info("处理Student表的修改日志: {}", student);this.sendMq(student,OperatorTypeEnum.UPDATE);}@OverridepublicvoidhandleDeleteLog(Student student,Long operatorTime){
        log.info("处理Student表的删除日志: {}", student);this.sendMq(student,OperatorTypeEnum.DELETE);}/**
     * 发送MQ
     *
     * @param student          Student
     * @param operatorTypeEnum 操作类型枚举
     */privatevoidsendMq(Student student,OperatorTypeEnum operatorTypeEnum){StudentMqDTO mqDTO =StudentMqDTO.builder().student(student).operatorType(operatorTypeEnum.getType()).build();String jsonStr =JSONUtil.toJsonStr(mqDTO);CustomProducer customProducer =SpringUtil.getBean(CustomProducer.class);// 发送到MQ
        customProducer.sendQueueMessage(queue, jsonStr);}}
Controller
/**
 * @author whiteBrocade
 * @version 1.0
 * @description ActiveMqController, 用于测试发送ActiveMQ逻辑
 */@Slf4j@RestController@RequestMapping("/activemq")@RequiredArgsConstructorpublicclassActiveMqController{privatefinalCustomProducer customProducer;privatefinalQueue queue;privatefinalTopic topic;@PostMapping("/send/queue")publicStringsendQueueMessage(){
        log.info("开始发送点对点的消息-------------");Student student =newStudent();
        student.setId(IdUtil.getSnowflakeNextId());
        student.setName("小牛马");
        student.setDescription("我是小牛马");StudentMqDTO mqDTO =StudentMqDTO.builder().student(student).operatorType(1).build();String jsonStr =JSONUtil.toJsonStr(mqDTO);
        customProducer.sendQueueMessage(queue, jsonStr);return"success";}@PostMapping("/send/topic")publicStringsendTopicMessage(){
        log.info("===开始发送订阅消息===");Student student =newStudent();
        student.setId(IdUtil.getSnowflakeNextId());
        student.setName("小牛马");
        student.setDescription("我是小牛马");StudentMqDTO mqDTO =StudentMqDTO.builder().student(student).operatorType(1).build();String jsonStr =JSONUtil.toJsonStr(mqDTO);
        customProducer.sendTopicMessage(topic, jsonStr);return"success";}}
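These endpoints can be exercised with any HTTP client once the application is up on port 9999 (port and paths taken from the yaml and the controller above), for example:

curl -X POST http://localhost:9999/activemq/send/queue
curl -X POST http://localhost:9999/activemq/send/topic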
Modify MySqlDataChangeJob, switching the sink operator to mySqlDataChangeSink
/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL数据变更 JOb
 */@Slf4j@Component@AllArgsConstructorpublicclassMySqlDataChangeJob{/**
     * Flink CDC相关配置
     */privatefinalFlinkCDCConfig flinkCDCConfig;/**
     * 自定义Sink算子
     * customSink: 通过ognl解析ddl语句类型
     * dataChangeSink: 通过struct解析ddl语句类型
     * kafkaSink: 将MySQL变化投递到Kafka
     * 通常两个选择一个就行
     */privatefinalCustomMySqlSink customMySqlSink;privatefinalMySqlDataChangeSink mySqlDataChangeSink;privatefinalMySqlChangeInfoKafkaProducerSink mysqlChangeInfoKafkaProducerSink;privatefinalLogSink logSink;/**
     * 自定义MySQL反序列化处理器
     */privatefinalMySqlDeserializer mySqlDeserializer;/**
     * 启动Job
     */@SneakyThrowspublicvoidstartJob(){
        log.info("---------------- MySqlDataChangeJob 开始启动 ----------------");FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();// DataStream API执行模式包括:// 流执行模式(Streaming):用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式// 批执行模式(Batch):专门用于批处理的执行模式// 自动模式(AutoMatic):由程序根据输入数据源是否有界,来自动选择是流处理还是批处理执行// 执行模式选择,可以通过命令行方式配置:StreamExecutionEnvironment mySqlEnv =this.buildStreamExecutionEnvironment();// 这里选择自动模式
        mySqlEnv.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// todo 下列的两个MySqlSource选择一个// 自定义的反序列化器MySqlSource<MySqlDataChangeInfo> mySqlSource =this.buildBaseMySqlSource(MySqlDataChangeInfo.class).deserializer(mySqlDeserializer).build();// Flink CDC自带的反序列化器// MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)//     .deserializer(new JsonDebeziumDeserializationSchema())//     .build();// 从MySQL源中读取数据DataStreamSource<MySqlDataChangeInfo> mySqlDataStreamSource = mySqlEnv.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),
                        mysqlConfig.getSourceName())// 设置该数据源的并行度.setParallelism(cdcConfig.getParallelism());// 添加一个日志sink, 用于观察
        mySqlDataStreamSource.addSink(logSink);// 添加sink算子
        mySqlDataStreamSource
                // todo 根据上述的选择,选择对应的Sink算子// .addSink(customMySqlSink).addSink(mySqlDataChangeSink);// 添加Sink, 这里配合mySQLDeserialization+dataChangeSink// .sinkTo(mysqlChangeInfoKafkaProducerSink.getKafkaProducerSink()); // 将MySQL的数据变化投递到Kafka中// 启动服务// execute和executeAsync启动方式对比: https://blog.csdn.net/llg___/article/details/133798713
        mySqlEnv.executeAsync(mysqlConfig.getJobName());
        log.info("---------------- MySqlDataChangeJob 启动完毕 ----------------");}/**
     * 构建流式执行环境
     *
     * @return StreamExecutionEnvironment
     */privateStreamExecutionEnvironmentbuildStreamExecutionEnvironment(){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();// 设置整个Flink程序的默认并行度
        env.setParallelism(cdcConfig.getParallelism());// 设置checkpoint 间隔
        env.enableCheckpointing(cdcConfig.getEnableCheckpointing());// 设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);return env;}/**
     * 构建基本的MySqlSourceBuilder
     *
     * @param clazz 返回的数据类型Class对象
     * @param <T>   源数据中存储的类型
     * @return MySqlSourceBuilder
     */private<T>MySqlSourceBuilder<T>buildBaseMySqlSource(Class<T> clazz){FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();returnMySqlSource.<T>builder().hostname(mysqlConfig.getHostname()).port(mysqlConfig.getPort()).username(mysqlConfig.getUsername()).password(mysqlConfig.getPassword()).databaseList(mysqlConfig.getDatabaseList()).tableList(mysqlConfig.getTableList())/* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest: 只进行增量导入(不读取历史变化)
                 * timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
                 */.startupOptions(StartupOptions.latest()).includeSchemaChanges(mysqlConfig.getIncludeSchemaChanges())// 包括schema的改变.serverTimeZone("GMT+8");// 时区}}

Code (syncing MySQL to ES through an MQ)

  • The same approach works if the MQ here is replaced with Kafka
  • Official project: Easy-Es. It mainly simplifies the ES APIs so that they feel as comfortable to use as MyBatis-Plus; it is not covered in detail here. To get the example below running, see the author's separate post on using easy-es.

There are two synchronization approaches:

  • Flink CDC listens to MySQL and writes to ES directly
  • Flink CDC listens to MySQL and writes to ActiveMQ, and the MQ consumer writes to ES (the MQ approach is implemented here)

The MQ is introduced to make the synchronization durable: even if the service goes down, it can pick up where it left off after a restart.

Add the ES and Easy-ES dependencies
<!-- es依赖 --><!-- 排除springboot中内置的es依赖,以防和easy-es中的依赖冲突--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId></exclusion><exclusion><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>${es.vsersion}</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>${es.vsersion}</version></dependency><!-- easy-es --><dependency><groupId>org.dromara.easy-es</groupId><artifactId>easy-es-boot-starter</artifactId><version>${easy-es.vsersion}</version></dependency>
Modify the consumer CustomQueueConsumer
/**
 * @author whiteBrocade
 * @version 1.0
 * @description CustomQueueConsumer
 */@Slf4j@Service@RequiredArgsConstructorpublicclassCustomQueueConsumer{privatefinalStudentEsMapper studentEsMapper;@JmsListener(destination =ActiveMqConfig.QUEUE_NAME)publicvoidreceiveQueueMsg(String msg){
        log.info("消费者1111收到Queue消息: {}", msg);StudentMqDTO mqDTO =JSONUtil.toBean(msg,StudentMqDTO.class);Student student = mqDTO.getStudent();Integer operatorType = mqDTO.getOperatorType();OperatorTypeEnum operatorTypeEnum =OperatorTypeEnum.getEnumByType(operatorType);switch(operatorTypeEnum){case INSERT:// 同步新增到Es中StudentEsEntity studentEsEntity =newStudentEsEntity();BeanUtil.copyProperties(student, studentEsEntity);
                studentEsEntity.setMysqlId(student.getId());
                studentEsMapper.insert(studentEsEntity);break;case UPDATE:case DELETE:// 修改mysql, 再删除ESLambdaEsQueryWrapper<StudentEsEntity> wrapper =newLambdaEsQueryWrapper<>();
                wrapper.eq(StudentEsEntity::getMysqlId, student.getId());
                studentEsMapper.delete(wrapper);break;}}@JmsListener(destination =ActiveMqConfig.TOPIC_NAME, containerFactory ="topicListenerContainer")publicvoidreceiveTopicMsg(String msg){
        log.info("消费者1111收到Topic消息: {}", msg);}}
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom2QueueConsumer
 */@Slf4j@ServicepublicclassCustom2QueueConsumer{@JmsListener(destination =ActiveMqConfig.TOPIC_NAME, containerFactory ="topicListenerContainer")publicvoidreceiveTopicMsg(String msg){
        log.info("消费者2222收到Topic消息: {}", msg);}}

Reposted from: https://blog.csdn.net/weixin_51918722/article/details/143513443
Copyright belongs to the original author, whiteBrocade.
