

Flink MySQL CDC Usage Summary

Preface

This is a summary of learning Flink MySQL CDC. The main goal is to synchronize MySQL data to other data sources such as Hudi and MySQL; this article validates two scenarios with examples: MySQL2Hudi and MySQL2MySQL.

Versions

  • Flink: 1.14.3, 1.15.4, 1.16.1
  • Hudi: 0.13.0
  • MySQL CDC: 2.3.0

Installation

Copy the required connector jar packages into flink/lib (using Flink 1.15.4 as an example).

Flink CDC only provides source connectors; MySQL CDC, for example, reads from a MySQL source table. The official CDC documentation (https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#) lists the data sources Flink CDC currently supports, and each data source requires downloading its corresponding jar package.
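For the scenarios in this article, the copied jars would look roughly like the following. This is only a sketch: the exact artifact names and versions are assumptions derived from the version list above (Flink 1.15.4, MySQL CDC 2.3.0, Hudi 0.13.0) and should be checked against the official download pages.

# Assumed jar names; confirm against the official releases before copying
cp flink-sql-connector-mysql-cdc-2.3.0.jar $FLINK_HOME/lib/   # MySQL CDC source
cp hudi-flink1.15-bundle-0.13.0.jar $FLINK_HOME/lib/          # Hudi sink
cp flink-connector-jdbc-1.15.4.jar $FLINK_HOME/lib/           # JDBC sink for MySQL2MySQL
cp mysql-connector-java-8.0.27.jar $FLINK_HOME/lib/           # MySQL JDBC driver for the JDBC sink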

MySQL CDC Parameters

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required, and the source table must have one; the Flink primary key does not have to match the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '19.168.44.128',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root-123',
  'database-name' = 'cdc',
  'table-name' = 'mysql_cdc_source'
);

To use the MySQL CDC source, MySQL binlog must be enabled first. Other parameters and details can be found in the official documentation: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc%28ZH%29.html#id6
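For reference, enabling binlog usually means adding settings like the ones below to my.cnf and restarting MySQL. This is a minimal sketch assuming a standalone MySQL installation; the result can be checked from the MySQL client afterwards.

# my.cnf (assumed location, e.g. /etc/my.cnf); restart MySQL after changing
server-id        = 1
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL

-- check from the MySQL client
SHOW VARIABLES LIKE 'log_bin';        -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expected: ROW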

Example

Create the MySQL source physical table

mysql -uroot -proot-123 cdc
CREATE TABLE `mysql_cdc_source` (
  `id` int(11) NOT NULL,
  `name` text,
  `price` double DEFAULT NULL,
  `ts` int(11) DEFAULT NULL,
  `dt` text,
  `insert_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Generate test data

insert into mysql_cdc_source (id, name, price, ts, dt) values (1, 'hudi1', 1.1, 1000, '20230331');
insert into mysql_cdc_source (id, name, price, ts, dt) values (2, 'hudi2', 2.2, 2000, '20230331');
......

CDC MySQL2Hudi

set yarn.application.name=cdc_mysql2hudi;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2hudi;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required, and the source table must have one; the Flink primary key does not have to match the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '19.168.44.128',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root-123',
  'database-name' = 'cdc',
  'table-name' = 'mysql_cdc_source'
);

CREATE TABLE hudi_cdc_sink (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/cdc/hudi_cdc_sink',
  'write.operation' = 'upsert',                     -- write operation, optional
  'write.tasks' = '1',                              -- write parallelism, optional, must be passed explicitly
  'table.type' = 'COPY_ON_WRITE',                   -- table type, optional
  'precombine.field' = 'ts',                        -- optional; precombine field compared against the existing record, which is only updated when the new value is larger; defaults to ts (if the table has a ts field); can be omitted; update_time is recommended
  'hoodie.datasource.write.recordkey.field' = 'id', -- optional, same effect as PRIMARY KEY; at least one of the two must be set
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator', -- this parameter has a bug in the current version
  'index.type' = 'BUCKET',                          -- Flink supports only two index types; the default state index can cause GC OOM on large data volumes due to insufficient TaskManager memory
  'hoodie.bucket.index.num.buckets' = '16',         -- number of buckets
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.conf.dir' = '/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'cdc',
  'hive_sync.table' = 'hudi_cdc_sink',
  'hoodie.datasource.hive_sync.create_managed_table' = 'true' -- whether to create a managed (internal) table, supported since 0.13.0
);

insert into hudi_cdc_sink select * from mysql_cdc_source;

Note: the column order of the source table and the sink table must match.

CDC MySQL2MySQL

Create the MySQL sink physical table

CREATE TABLE `test_sink_mysql` (
  `id` int(11) NOT NULL,
  `name` text,
  `price` double DEFAULT NULL,
  `ts` int(11) DEFAULT NULL,
  `dt` text,
  `insert_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
set yarn.application.name=cdc_mysql2mysql;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required, and the source table must have one; the Flink primary key does not have to match the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '19.168.44.128',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root-123',
  'database-name' = 'cdc',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root',
  'password' = 'root-123',
  'table-name' = 'test_sink_mysql',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql (id, name, price, ts, dt) select * from mysql_cdc_source;
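Assuming the statements above are saved to a SQL file, one way to submit them is through Flink's SQL client; this is only a sketch, and the actual client and deployment mode (for example a YARN session started beforehand) depend on your environment.

# assumed file name; run against an already started Flink cluster/session
bin/sql-client.sh -f cdc_mysql2mysql.sql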

Verification

1. Perform insert/update/delete on the source table mysql_cdc_source and check how the target table follows; the data is synchronized in real time and stays consistent (see the sketch after this list).
2. Pick a reasonably large source table, kill the job while the historical (full) data is still being synchronized, then restore the job from a checkpoint to verify that the full load resumes from where it stopped.
3. Run truncate on the source table; the truncate is not propagated to the target table.
4. Other verification…
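A minimal sketch of items 1 and 2 (the statements and the checkpoint path below are illustrative, not taken from the original article):

-- item 1: change the source table, then query the target table (hudi_cdc_sink / test_sink_mysql) and compare
update mysql_cdc_source set price = 9.9 where id = 1;
delete from mysql_cdc_source where id = 2;
select * from test_sink_mysql;

-- item 2: after killing the job, point the new submission at an existing checkpoint before re-running the insert
set execution.savepoint.path=hdfs:///flink/checkpoints/cdc_mysql2mysql/<job-id>/chk-100;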

Related Reading

  • Flink SQL Checkpoint Learning Summary
  • Summary of Using Flink SQL with Hudi and Syncing to Hive
Tags: mysql, flink, database

This article is reposted from: https://blog.csdn.net/dkl12/article/details/129936797
Copyright belongs to the original author 董可伦; if there is any infringement, please contact us for removal.
