Using Flink CDC to Capture Incremental Changes from a SQL Server Database


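Flink CDC added support for SQL Server in version 2.2 (this article uses the flink-connector-sqlserver-cdc 2.4.0 connector). The support includes SqlServerCatalog and SqlServerTable; with SqlServerCatalog you can look up a table's columns and column types by table name.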

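Change data capture (CDC) has been available since SQL Server 2008. CDC records the inserts, updates, and deletes applied to a table, so you can query just the changed rows instead of scanning the whole table.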
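For reference, SQL Server stores captured changes in a change table named cdc.<capture_instance>_CT, whose __$operation column encodes the kind of change: 1 = delete, 2 = insert, 3 = row values before an update, 4 = row values after an update. The sketch below maps those codes to a Java enum; the class and enum names are hypothetical and only illustrate the encoding.

```java
// Hypothetical helper: decodes the __$operation column of a SQL Server CDC change table
public class CdcOperationCode {

    // Values of __$operation in cdc.<capture_instance>_CT
    enum ChangeOperation {
        DELETE(1),
        INSERT(2),
        UPDATE_BEFORE(3), // row values before an update
        UPDATE_AFTER(4);  // row values after an update

        final int code;

        ChangeOperation(int code) {
            this.code = code;
        }

        // Look up the operation for a raw __$operation value
        static ChangeOperation fromCode(int code) {
            for (ChangeOperation op : values()) {
                if (op.code == code) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown __$operation value: " + code);
        }
    }

    public static void main(String[] args) {
        // An UPDATE produces two change-table rows: codes 3 and 4
        System.out.println(ChangeOperation.fromCode(3) + " -> " + ChangeOperation.fromCode(4));
    }
}
```

Because an update is stored as a before/after pair, a consumer that reads the change table directly sees two rows per update; the Flink CDC connector used later folds them into a single update event.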

1. Preparation

Software versions:

Flink 1.17.1

Database: Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64)

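1.1 Prepare the database: enable CDC

```sql
-- Enable CDC at the database level. Run this in the database that needs CDC
-- (HBDC_AQI in this article); it requires membership in the sysadmin server role.
USE HBDC_AQI;
EXEC sys.sp_cdc_enable_db;

-- Check which databases have CDC enabled (is_cdc_enabled = 1)
SELECT name, is_cdc_enabled FROM sys.databases;
```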

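1.2 Start SQL Server Agent

Open SQL Server Configuration Manager => SQL Server Services => right-click SQL Server Agent => Start. The capture and cleanup jobs that CDC creates run under SQL Server Agent, so no changes are captured while the agent is stopped.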


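1.3 Enable CDC for each table whose changes should be tracked

```sql
-- Enable CDC at the table level. SQL Server Agent must be running first (see 1.2).
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',                  -- source schema
    @source_name = N'AIR_STATION_HOUR_DATA',  -- table name
    @capture_instance = NULL,                 -- capture instance; NULL uses the default name dbo_AIR_STATION_HOUR_DATA
    @supports_net_changes = 1,                -- enable net-change queries; requires a primary key or unique index
    @role_name = NULL;                        -- gating role; NULL means no role is required to read changes

-- Verify that CDC is enabled for the table
EXEC sys.sp_cdc_help_change_data_capture;
```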
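Before wiring up Flink, you can confirm that SQL Server is actually capturing changes by querying the table-valued function that sp_cdc_enable_table generates for the capture instance. The sketch below is a hypothetical helper that builds that query for the default capture instance name (<schema>_<table>, used when @capture_instance is NULL). It concatenates identifiers taken from trusted configuration only; do not feed it untrusted input. If the LSN range is not valid yet (for example, right after enabling CDC), SQL Server may reject the call with an error.

```java
// Hypothetical helper: builds a query over the CDC change function of a capture instance
public class CdcQueryBuilder {

    // Default capture instance name when @capture_instance = NULL: <schema>_<table>
    static String defaultCaptureInstance(String schema, String table) {
        return schema + "_" + table;
    }

    // All captured changes between the lowest and highest LSN currently available
    static String allChangesQuery(String captureInstance) {
        return "SELECT * FROM cdc.fn_cdc_get_all_changes_" + captureInstance
                + "(sys.fn_cdc_get_min_lsn('" + captureInstance + "'), sys.fn_cdc_get_max_lsn(), N'all')";
    }

    public static void main(String[] args) {
        String instance = defaultCaptureInstance("dbo", "AIR_STATION_HOUR_DATA");
        System.out.println(allChangesQuery(instance));
    }
}
```

Running the printed statement in SSMS after inserting or updating a row in dbo.AIR_STATION_HOUR_DATA should return the change rows; if it returns nothing, check that SQL Server Agent is running.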

2. Writing the code

2.1 Add the dependencies

```xml
<properties>
    <flink.version>1.17.1</flink.version>
</properties>

<dependencies>
    <!-- SQL Server JDBC driver -->
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.1.jre8</version>
    </dependency>
    <!-- Lombok (@Data, @Slf4j) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC connector for SQL Server -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- fastjson: provides the JSONObject used by the deserializer (version is an example) -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>
```

2.2 Implementation

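2.2.1 Database connection constants

```java
/** Connection settings for the SQL Server instance being captured. */
public class SQLServerConstant {
    public static final String SQLSERVER_HOST = "0.0.0.0";                         // database host (placeholder)
    public static final Integer SQLSERVER_PORT = 1433;                             // port
    public static final String SQLSERVER_DATABASE = "HBDC_AQI";                    // database
    public static final String SQLSERVER_TABLE_LIST = "dbo.AIR_STATION_HOUR_DATA"; // tables to capture, "schema.table", comma-separated
    public static final String SQLSERVER_USER_NAME = "sa";                         // user name
    public static final String SQLSERVER_PASSWORD = "*******";                     // password
}
```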
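These constants can be sanity-checked with plain JDBC before starting the Flink job. The sketch below (class and method names are hypothetical) builds the standard Microsoft JDBC URL from host, port and database, and splits a table list the same way buildDataChangeSource() does. The second table name in the example, dbo.OTHER_TABLE, is made up to show a multi-table list.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for checking the connection constants
public class SqlServerConfigCheck {

    // Same normalization as buildDataChangeSource(): strip spaces, then split on commas
    static List<String> parseTableList(String tableList) {
        return Arrays.asList(tableList.replace(" ", "").split(","));
    }

    // Microsoft JDBC driver URL format: jdbc:sqlserver://host:port;databaseName=db
    static String jdbcUrl(String host, int port, String database) {
        return "jdbc:sqlserver://" + host + ":" + port + ";databaseName=" + database;
    }

    public static void main(String[] args) {
        System.out.println(jdbcUrl("0.0.0.0", 1433, "HBDC_AQI"));
        // dbo.OTHER_TABLE is a hypothetical second table
        System.out.println(parseTableList("dbo.AIR_STATION_HOUR_DATA, dbo.OTHER_TABLE"));
    }
}
```

The URL can be passed to java.sql.DriverManager.getConnection together with the user name and password to verify connectivity and credentials independently of Flink.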

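2.2.2 CDC data entity class

```java
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

@Data
public class DataChangeInfo implements Serializable {
    /** Database name */
    private String database;
    /** Table name */
    private String tableName;
    /** Change time */
    private LocalDateTime changeTime;
    /** Change type: 1 = insert, 2 = update, 3 = delete */
    private Integer eventType;
    /** Row data before the change (JSON) */
    private String beforeData;
    /** Row data after the change (JSON) */
    private String afterData;
}
```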
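Which of beforeData and afterData is filled depends on the event type: an insert has only an after image, an update has both, and a delete has only a before image. The hypothetical check below encodes that rule, assuming a missing image is stored as the empty JSON object "{}" (which is what the deserializer in the next section produces for a null before/after struct).

```java
// Hypothetical sanity check for DataChangeInfo contents
public class ChangeShapeCheck {

    // A missing before/after image is assumed to be serialized as "{}"
    private static final String EMPTY_JSON = "{}";

    // 1 = insert (after only), 2 = update (both), 3 = delete (before only)
    static boolean isConsistent(int eventType, String beforeData, String afterData) {
        boolean hasBefore = !EMPTY_JSON.equals(beforeData);
        boolean hasAfter = !EMPTY_JSON.equals(afterData);
        switch (eventType) {
            case 1:
                return !hasBefore && hasAfter;
            case 2:
                return hasBefore && hasAfter;
            case 3:
                return hasBefore && !hasAfter;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isConsistent(1, "{}", "{\"id\":1}"));
        System.out.println(isConsistent(3, "{}", "{\"id\":1}"));
    }
}
```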

2.2.3 Custom deserializer for SQL Server change records

```java
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Optional;

@Slf4j
public class SQLServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        try {
            Struct struct = (Struct) sourceRecord.value();
            // Read database and table from the envelope's "source" block rather than parsing
            // the topic name, whose layout differs between Debezium versions
            Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setDatabase(source.getString("db"));
            dataChangeInfo.setTableName(source.getString("table"));
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());

            // Operation -> event type: 1 = insert, 2 = update, 3 = delete.
            // READ records come from the initial snapshot and are treated as inserts.
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            int eventType;
            switch (operation) {
                case CREATE:
                case READ:
                    eventType = 1;
                    break;
                case UPDATE:
                    eventType = 2;
                    break;
                default:
                    eventType = 3;
            }
            dataChangeInfo.setEventType(eventType);

            // Change time from ts_ms, falling back to the current time
            long timestamp = Optional.ofNullable(struct.get(TS_MS))
                    .map(x -> Long.parseLong(x.toString()))
                    .orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()));

            // Emit the record
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("Failed to deserialize SQL Server CDC record: {}", e.getMessage(), e);
        }
    }

    /** Converts the "before" or "after" struct of a change event into JSON (empty object if absent). */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            for (Field field : element.schema().fields()) {
                jsonObject.put(field.name(), element.get(field));
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
```
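The event-type and timestamp logic in deserialize() can be isolated into plain Java and checked without a database. This sketch assumes Debezium's single-letter op codes (c = create, r = snapshot read, u = update, d = delete), which Envelope.Operation wraps, and treats snapshot reads as inserts; the class and method names are hypothetical. Unlike the deserializer, it rejects unknown codes such as t (truncate) instead of mapping them to 3.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical, dependency-free version of the mapping used in deserialize()
public class EventMappingSketch {

    // Debezium op code -> event type (1 = insert, 2 = update, 3 = delete)
    static int toEventType(String op) {
        switch (op) {
            case "c":
            case "r": // snapshot read, treated as an insert
                return 1;
            case "u":
                return 2;
            case "d":
                return 3;
            default:
                throw new IllegalArgumentException("Unsupported op: " + op);
        }
    }

    // ts_ms is epoch milliseconds; fall back to the current time when it is missing
    static LocalDateTime toChangeTime(Long tsMs, ZoneId zone) {
        long millis = tsMs != null ? tsMs : System.currentTimeMillis();
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), zone);
    }

    public static void main(String[] args) {
        System.out.println(toEventType("r"));
        System.out.println(toChangeTime(0L, ZoneId.of("UTC")));
    }
}
```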

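2.2.4 Utility class

```java
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkSourceUtil {

    /**
     * Builds the SQL Server CDC source.
     */
    public static DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = SQLServerConstant.SQLSERVER_TABLE_LIST.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(SQLServerConstant.SQLSERVER_HOST)
                .port(SQLServerConstant.SQLSERVER_PORT)
                .database(SQLServerConstant.SQLSERVER_DATABASE)  // database to monitor
                .tableList(tables)                               // tables to monitor ("schema.table")
                .username(SQLServerConstant.SQLSERVER_USER_NAME)
                .password(SQLServerConstant.SQLSERVER_PASSWORD)
                /*
                 * initial: take a full snapshot of the tables first, then read incremental changes
                 * latest:  read only changes made after the job starts (no historical data)
                 */
                .startupOptions(StartupOptions.initial())
                .deserializer(new SQLServerJsonDebeziumDeserializationSchema()) // converts SourceRecord to DataChangeInfo
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> sqlServerSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(sqlServerSource, "SQLServer-source")
                .setParallelism(1); // DebeziumSourceFunction is a non-parallel source
        streamSource.print();

        env.execute("SQLServer-stream-cdc");
    }
}
```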
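With startupOptions(initial()), the stream begins with the rows that already exist and continues with live changes. One way a downstream consumer might use the DataChangeInfo events is to replay them in order into a table keyed by primary key. The sketch below is hypothetical: it assumes the primary key has already been extracted from the JSON into a key field, uses the 1/2/3 event type codes from DataChangeInfo, and the sample rows are made up.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consumer: rebuilds table state from an ordered change stream
public class ChangeReplaySketch {

    // Minimal change: primary key, event type (1 insert, 2 update, 3 delete), after image as JSON
    static final class Change {
        final String key;
        final int eventType;
        final String afterData;

        Change(String key, int eventType, String afterData) {
            this.key = key;
            this.eventType = eventType;
            this.afterData = afterData;
        }
    }

    // Applies changes in arrival order: inserts and updates store the after image, deletes remove the key
    static Map<String, String> replay(List<Change> changes) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change change : changes) {
            if (change.eventType == 3) {
                table.remove(change.key);
            } else {
                table.put(change.key, change.afterData);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Made-up sample rows
        List<Change> changes = Arrays.asList(
                new Change("1", 1, "{\"id\":1,\"aqi\":40}"),
                new Change("2", 1, "{\"id\":2,\"aqi\":55}"),
                new Change("1", 2, "{\"id\":1,\"aqi\":42}"),
                new Change("2", 3, "{}"));
        System.out.println(replay(changes));
    }
}
```

Keying by primary key makes the last event for each key win, so the result depends on applying events in the order the source emits them.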

2.3 Run the main method to test



Reposted from: https://blog.csdn.net/qq_22494169/article/details/137546019
Copyright belongs to the original author 浮华1994. In case of infringement, please contact us for removal.
