采用Flink CDC操作SQL Server数据库获取增量变更数据
Flink CDC 1.12版本引入了对SQL Server的支持,包括 `SqlServerCatalog` 和 `SqlServerTable`。在 `SqlServerCatalog` 中,你可以根据表名获取对应的字段和字段类型。
SQL Server 从 2008 版本开始支持变更数据捕获 (CDC) 功能。CDC 会记录对表中数据所做的插入、更新和删除操作,这样你就可以直接查询发生变更的数据,而不需要扫描整个表。
1、准备工作
软件版本
Flink 1.17.1
数据库版本 Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64)
1.1、数据库准备 启动CDC
-- Enable CDC at the database level.
-- Run this while connected to the database that should have CDC enabled.
EXEC sys.sp_cdc_enable_db;

-- List every database together with its CDC-enabled flag.
SELECT name, is_cdc_enabled FROM sys.databases;
1.2、开启SQL Server代理
打开 SQL Server配置管理器 => 选择SQL Server服务 => 选择SQL Server代理 右击开启
1.3、为需要跟踪更改的表启用 CDC。
-- Enable CDC for a single table.
-- SQL Server Agent must be started first, then run this statement.
EXEC sys.sp_cdc_enable_table
    @source_schema        = 'dbo',                    -- schema of the source table
    @source_name          = 'AIR_STATION_HOUR_DATA',  -- name of the source table
    @capture_instance     = NULL,                     -- capture instance name
    @supports_net_changes = 1,                        -- enable net-change queries
    @role_name            = NULL;                     -- gating role for reading change data

-- Verify that CDC was enabled for the table.
EXEC sys.sp_cdc_help_change_data_capture;
2、代码编写
2.1、引入依赖
<!-- Flink version shared by every org.apache.flink artifact below. -->
<properties>
    <flink.version>1.17.1</flink.version>
</properties>

<dependencies>
    <!-- SQL Server JDBC driver -->
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.1.jre8</version>
    </dependency>

    <!-- Lombok annotations -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>

    <!-- Flink core, streaming API and client -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- SQL Server CDC connector -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>

    <!-- Kafka connector and table planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2.2、代码编写
2.2.1 、数据库配置文件编写
/**
 * Connection settings for the SQL Server instance read by the CDC source.
 *
 * <p>The host and password values are placeholders from the example and must be replaced
 * with real connection details before running the job.
 */
public final class SQLServerConstant {

    /** Database host address. */
    public static final String SQLSERVER_HOST = "0.0.0.0";

    /** Database port. */
    public static final Integer SQLSERVER_PORT = 1433;

    /** Database name. */
    public static final String SQLSERVER_DATABASE = "HBDC_AQI";

    /** Table(s) to capture, written as {@code schema.table}. */
    public static final String SQLSERVER_TABLE_LIST = "dbo.AIR_STATION_HOUR_DATA";

    /** Login user name. */
    public static final String SQLSERVER_USER_NAME = "sa";

    /** Login password (masked in the example). */
    public static final String SQLSERVER_PASSWORD = "*******";

    private SQLServerConstant() {
        // Constants holder; not meant to be instantiated.
    }
}
2.2.2 CDC数据实体类
@DatapublicclassDataChangeInfoimplementsSerializable{/**
* 数据库名
*/privateString database;/**
* 表名
*/privateString tableName;/**
* 变更时间
*/privateLocalDateTime changeTime;/**
* 变更类型 1新增 2修改 3删除
*/privateInteger eventType;/**
* 变更前数据
*/privateString beforeData;/**
* 变更后数据
*/privateString afterData;}
2.2.2 、SQLServer消息读取自定义反序列化
@Slf4jpublicclassSQLServerJsonDebeziumDeserializationSchemaimplementsDebeziumDeserializationSchema<DataChangeInfo>{publicstaticfinalStringTS_MS="ts_ms";publicstaticfinalStringBEFORE="before";publicstaticfinalStringAFTER="after";publicstaticfinalStringSOURCE="source";publicstaticfinalStringCREATE="CREATE";publicstaticfinalStringUPDATE="UPDATE";@Overridepublicvoiddeserialize(SourceRecord sourceRecord,Collector<DataChangeInfo> collector)throwsException{try{String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct struct =(Struct) sourceRecord.value();finalStruct source = struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo =newDataChangeInfo();
dataChangeInfo.setBeforeData(getJsonObject(struct,BEFORE).toJSONString());
dataChangeInfo.setAfterData(getJsonObject(struct,AFTER).toJSONString());// 获取操作类型 CREATE UPDATE DELETE 1新增 2修改 3删除Envelope.Operation operation =Envelope.operationFor(sourceRecord);String type = operation.toString().toUpperCase();int eventType = type.equals(CREATE)?1:UPDATE.equals(type)?2:3;
dataChangeInfo.setEventType(eventType);
dataChangeInfo.setDatabase(database);
dataChangeInfo.setTableName(tableName);ZoneId zone =ZoneId.systemDefault();Long timestamp =Optional.ofNullable(struct.get(TS_MS)).map(x ->Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);
dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));//7.输出数据
collector.collect(dataChangeInfo);}catch(Exception e){
log.error("SQLServer消息读取自定义序列化报错:{}", e.getMessage());}}/**
*
* 从源数据获取出变更之前或之后的数据
*/privateJSONObjectgetJsonObject(Struct value,String fieldElement){Struct element = value.getStruct(fieldElement);JSONObject jsonObject =newJSONObject();if(element !=null){Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for(Field field : fieldList){Object afterValue = element.get(field);
jsonObject.put(field.name(), afterValue);}}return jsonObject;}@OverridepublicTypeInformation<DataChangeInfo>getProducedType(){returnTypeInformation.of(DataChangeInfo.class);}}
2.2.3 、功能工具类
publicclassFlinkSourceUtil{/**
* 构造SQL Server CDC数据源
*/publicstaticDebeziumSourceFunction<DataChangeInfo>buildDataChangeSource(){String[] tables =SQLSERVER_TABLE_LIST.replace(" ","").split(",");returnSqlServerSource.<DataChangeInfo>builder().hostname(SQLSERVER_HOST).port(SQLSERVER_PORT).database(SQLSERVER_DATABASE)// monitor sqlserver database.tableList(tables)// monitor products table.username(SQLSERVER_USER_NAME).password(SQLSERVER_PASSWORD)/*
*initial初始化快照,即全量导入后增量导入(检测更新数据写入)
* latest:只进行增量导入(不读取历史变化)
*/.startupOptions(com.ververica.cdc.connectors.base.options.StartupOptions.initial()).deserializer(newSQLServerJsonDebeziumDeserializationSchema())// converts SourceRecord to JSON String.build();}publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource =buildDataChangeSource();DataStream<DataChangeInfo> streamSource = env
.addSource(dataChangeInfoMySqlSource,"SQLServer-source").setParallelism(1);
streamSource.print();
env.execute("SQLServer-stream-cdc");}}
2.3、运行main方法测试
版权归原作者 浮华1994 所有, 如有侵权,请联系我们删除。