TABLE_CATALOG TABLE_SCHEMA TABLE_NAME TABLE_TYPE
test dbo user_info BASE TABLE
test dbo systranschemas BASE TABLE
test cdc change_tables BASE TABLE
test cdc ddl_history BASE TABLE
test cdc lsn_time_mapping BASE TABLE
test cdc captured_columns BASE TABLE
test cdc index_columns BASE TABLE
test dbo orders BASE TABLE
test cdc dbo_orders_CT BASE TABLE
#### 二、具体实现
##### 2.1 Flink-CDC采集SqlServer主程序
添加依赖包:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-sqlserver-cdc</artifactId>
<version>3.0.0</version>
</dependency>
编写主函数:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置全局并行度
env.setParallelism(1);
// 设置时间语义为ProcessingTime
env.getConfig().setAutoWatermarkInterval(0);
// 每隔60s启动一个检查点
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY\_ONCE);
// checkpoint最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// checkpoint超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许一个checkpoint
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Flink处理程序被cancel后,会保留Checkpoint数据
// env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN\_ON\_CANCELLATION);
SourceFunction<String> sqlServerSource = SqlServerSource.<String>builder()
.hostname("localhost")
.port(1433)
.username("SA")
.password("")
.database("test")
.tableList("dbo.t\_info")
.startupOptions(StartupOptions.initial())
.debeziumProperties(getDebeziumProperties())
.deserializer(new CustomerDeserializationSchemaSqlserver())
.build();
DataStreamSource<String> dataStreamSource = env.addSource(sqlServerSource, "\_transaction\_log\_source");
dataStreamSource.print().setParallelism(1);
env.execute("sqlserver-cdc-test");
}
public static Properties getDebeziumProperties() {
Properties properties = new Properties();
properties.put("converters", "sqlserverDebeziumConverter");
properties.put("sqlserverDebeziumConverter.type", "SqlserverDebe
版权归原作者 2401_84160087 所有, 如有侵权,请联系我们删除。