FlinkCDC接入多表或整库
前言
flink cdc实时同步mysql维表
本文是基于flink 1.16
一、导入flink-doris-connector jar包
将 flink-doris-connector-1.16-1.4.0.jar 上传至 $FLINK_HOME/lib目录下
可至 maven官网 下载
二、cdc脚本
--database 同步到Doris的数据库名。
--table-prefix Doris表前缀名,例如 --table-prefix ods_。
--table-suffix 同上,Doris表的后缀名。
--including-tables 需要同步的MySQL表,可以使用"|" 分隔多个表,并支持正则表达式。 比如--including-tables table1|tbl.*就是同步table1和所有以tbl开头的表。
--excluding-tables 不需要同步的表,用法同上。
--mysql-conf MySQL CDCSource 配置,例如--mysql-conf hostname=127.0.0.1 ,您可以在这里查看所有配置MySQL-CDC,其中hostname/username/password/database-name 是必需的。
--sink-conf Doris Sink 的所有配置,可以在这里查看完整的配置项。
--table-conf Doris表的配置项,即properties中包含的内容。 例如 --table-conf replication_num=1
--ignore-default-value 关闭同步mysql表结构的默认值。适用于同步mysql数据到doris时,字段有默认值,但实际插入数据为null情况。
--use-new-schema-change 新的schema change支持同步mysql多列变更、默认值。
# 若要接入整库,则 including-tables 和 excluding-tables不填
三、脚本配置
vim cdc.sh
# 写入如下内容exportHADOOP_CLASSPATH=`hadoop classpath`# 我部署的是yarn session模式,以application形式启动程序,这可自行更改$FLINK_HOME/bin/flink run-application -t yarn-application \-Djobmanager.memory.process.size=700m \-Dtaskmanager.memory.process.size=1024m \#必须设置checkpoint-Dexecution.checkpointing.interval=10s \
-Drest.bind-port=8082-8087 \-Dparallelism.default=1\-Denv.java.opts="-Dfile.encoding=UTF-8"\#程序入口及jar包位置-c org.apache.doris.flink.tools.cdc.CdcTools \$FLINK_HOME/lib/flink-doris-connector-1.16-1.4.0.jar \#以下是cdc配置
mysql-sync-database \--database doris_test --mysql-conf hostname=127.0.0.1 --mysql-conf username=root --mysql-conf password=123456 --mysql-conf database-name=cc_test --including-tables "tbl1|test.*" --sink-conf fenodes=127.0.0.1:8030 --sink-conf username=root --sink-conf password=000000 --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 --sink-conf sink.label-prefix=test_1
# 同步tbl1和所有以test开头的表
四、运行脚本
sh cdc.sh
本文转载自: https://blog.csdn.net/weixin_44378305/article/details/132869934
版权归原作者 派大星的海洋ku 所有, 如有侵权,请联系我们删除。
版权归原作者 派大星的海洋ku 所有, 如有侵权,请联系我们删除。