

Hands-On: Capturing Incremental Change Data from SQL Server with Flink CDC in a Java Spring Boot Application

Preface:

    My scenario was to capture incremental data for specific tables in a SQL Server database. After researching many approaches, I settled on Flink's flink-connector-sqlserver-cdc, which relies on SQL Server CDC (Change Data Capture) to obtain incremental changes. The database has to be configured before any data can be processed; if you are unsure how to set that up, see my article 《SQL Server数据库开启CDC变更数据捕获操作指引》 (a guide to enabling CDC on SQL Server).
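
For orientation, enabling CDC comes down to two stored procedure calls, sys.sp_cdc_enable_db and sys.sp_cdc_enable_table. The sketch below (mine, not from the linked article) runs them over JDBC; the table dbo.t1 and the NULL gating role are illustrative, and in practice you would more likely execute the equivalent T-SQL in SQL Server Management Studio:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class EnableCdcSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001";
        try (Connection conn = DriverManager.getConnection(url, "sa", "root");
             Statement stmt = conn.createStatement()) {
            // Enable CDC at the database level (requires sysadmin)
            stmt.execute("EXEC sys.sp_cdc_enable_db");
            // Enable CDC for one table; dbo.t1 is illustrative
            stmt.execute("EXEC sys.sp_cdc_enable_table "
                    + "@source_schema = N'dbo', "
                    + "@source_name = N't1', "
                    + "@role_name = NULL");
        }
    }
}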

Without further ado, let's get straight to the point; corrections are welcome if anything falls short.

1. Add the dependencies to the Spring Boot project:

    <properties>
        <flink.version>1.16.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>9.4.0.jre8</version>
        </dependency>        
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink 1.16 ships the (formerly "blink") planner by default; use the
             planner artifact matching flink.version rather than mixing in the
             old flink-table-planner-blink_2.11 from 1.13 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

2. The YAML configuration file

spring:
    datasource:
        url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
        username: sa
        password: root
        driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# Settings for real-time synchronization from the SQL Server database
CDC:
  DataSource:
    host: 127.0.0.1
    port: 1433
    database: HM_5001
    tableList: dbo.t1,dbo.Tt2,dbo.t3,dbo.t4
    username: sa
    password: sa
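
As an aside, instead of injecting each key with @Value as the listener below does, the CDC.DataSource block could be bound to a single properties class. A sketch (not in the original), assuming Spring Boot's relaxed binding maps CDC.DataSource onto the cdc.data-source prefix:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Binds the CDC.DataSource section of application.yml in one place.
 */
@Data
@Component
@ConfigurationProperties(prefix = "cdc.data-source")
public class CdcDataSourceProperties {
    private String host;
    private int port;
    private String database;
    private String tableList;
    private String username;
    private String password;
}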

3. Create the SQL Server CDC change data listener

import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
 * SQL Server CDC change listener
 **/
@Component
@Slf4j
public class SQLServerCDCListener implements ApplicationRunner, Serializable {

    /**
     * CDC data source configuration
     */
    @Value("${CDC.DataSource.host}")
    private String host;
    @Value("${CDC.DataSource.port}")
    private String port;
    @Value("${CDC.DataSource.database}")
    private String database;
    @Value("${CDC.DataSource.tableList}")
    private String tableList;
    @Value("${CDC.DataSource.username}")
    private String username;
    @Value("${CDC.DataSource.password}")
    private String password;

    private final DataChangeSink dataChangeSink;

    public SQLServerCDCListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("开始启动Flink CDC获取ERP变更数据......");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeInfoSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeInfoSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("SQLServer-stream-cdc");
    }

    /**
     * Build the CDC data source
     */
    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = tableList.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(host)
                .port(Integer.parseInt(port))
                .database(database) // the SQL Server database to monitor
                .tableList(tables) // the tables to monitor
                .username(username)
                .password(password)
                /*
                 * initial: take a full snapshot first, then read incremental changes
                 * latest: read incremental changes only (skip historical data)
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
    }
}
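
One caveat: env.execute() blocks, so the ApplicationRunner above holds a startup thread for the lifetime of the job. If that matters in your deployment, Flink also supports non-blocking submission; a minimal variant (my suggestion, not from the original):

// Instead of env.execute("SQLServer-stream-cdc"), submit the job without blocking.
// The returned JobClient (org.apache.flink.core.execution.JobClient) can later be
// used to inspect the job's status or cancel it.
JobClient jobClient = env.executeAsync("SQLServer-stream-cdc");
log.info("Flink CDC job submitted, job id: {}", jobClient.getJobID());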

4. Deserialize the records into change JSON objects

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

/**
 * Custom deserialization for SQL Server change messages
 **/
@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize the record into a change JSON object
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // Get the operation type (CREATE / UPDATE / DELETE) and map it to 1=insert, 2=update, 3=delete
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            // Emit the record downstream
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("Failed to deserialize SQL Server change message", e);
        }
    }

    /**
     * Extract the before or after image from the record
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema schema = element.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                Object fieldValue = element.get(field);
                jsonObject.put(field.name(), fieldValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
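
One caveat with the eventType mapping above: if you switch the source to StartupOptions.initial(), snapshot rows arrive with the Debezium operation READ, which the ternary would lump into 3 (delete). A more defensive mapping (my suggestion, not in the original) could replace the operation/eventType lines inside deserialize():

// Map Debezium operations to event types, treating snapshot READs as inserts
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
int eventType;
switch (operation) {
    case CREATE:
    case READ:      // snapshot record emitted when starting from initial()
        eventType = 1;
        break;
    case UPDATE:
        eventType = 2;
        break;
    default:        // DELETE and anything else
        eventType = 3;
}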

5. The CDC data entity class

import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * CDC data entity
 */
@Data
public class DataChangeInfo implements Serializable {

    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Time of the change
     */
    private LocalDateTime changeTime;
    /**
     * Change type: 1=insert, 2=update, 3=delete
     */
    private Integer eventType;
    /**
     * Data before the change
     */
    private String beforeData;
    /**
     * Data after the change
     */
    private String afterData;
}

6. A custom ApplicationContextUtil

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
    /**
     * The Spring application context
     */
    private static ApplicationContext context;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }
    public static ApplicationContext getApplicationContext() {
        return context;
    }
    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}

7. A custom sink, managed by Spring, to process the change data

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

/**
 * Custom sink managed by Spring
 * Processes the change data
 **/
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {

    private static final long serialVersionUID = -74375380912179188L;

    /** Looked up in open(); transient so it is not serialized with the sink. */
    private transient UserMapper userMapper;

    /**
     * Look up Spring-managed beans in open().
     * When the Spring Boot application starts, the Spring context is loaded and
     * @Autowired works on Spring-managed threads. Flink, however, runs its operators
     * on its own threads, where the Spring context is not directly reachable, so the
     * bean is fetched from the application context in the sink's open() method instead.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        userMapper = ApplicationContextUtil.getBean(UserMapper.class);
    }

    @Override
    public void invoke(DataChangeInfo dataChangeInfo, Context context) {
        log.info("Received raw change data: {}", dataChangeInfo);
        // TODO: process your data here
    }
}
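
To make the TODO concrete, here is one shape invoke() might take, assuming a User entity, fastjson for parsing the before/after images, and MyBatis-style methods on UserMapper; all of these names are illustrative, not from the original:

@Override
public void invoke(DataChangeInfo dataChangeInfo, Context context) {
    // Route by event type: 1=insert, 2=update, 3=delete
    switch (dataChangeInfo.getEventType()) {
        case 1:
            User created = JSONObject.parseObject(dataChangeInfo.getAfterData(), User.class);
            userMapper.insert(created);
            break;
        case 2:
            User updated = JSONObject.parseObject(dataChangeInfo.getAfterData(), User.class);
            userMapper.updateById(updated);
            break;
        case 3:
            // For deletes, only the before image carries the row's data
            User deleted = JSONObject.parseObject(dataChangeInfo.getBeforeData(), User.class);
            userMapper.deleteById(deleted.getId());
            break;
        default:
            log.warn("Unknown event type: {}", dataChangeInfo.getEventType());
    }
}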

The above has been verified and tested by me personally and is deployed in production; corrections are welcome if anything falls short.


Reposted from: https://blog.csdn.net/weixin_42717648/article/details/130148168
Copyright belongs to the original author, arden.WANG. In case of infringement, please contact us for removal.
