文章目录
MySQL CDC 连接器允许从 MySQL 数据库读取快照数据(
比如:flink任务消费时刻的整表数据
)和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。
本篇只关注mysql整表与增量读取的实现,对于并发读取等能力后续再探索。
一. 运行前准备
1. 依赖
1.1. Maven dependency
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><!-- 请使用已发布的版本依赖,snapshot版本的依赖需要本地自行编译。 --><version>2.4.0</version></dependency>
1.2. SQL Client JAR(推荐)
下载 flink-sql-connector-mysql-cdc-2.4.0.jar 到
<FLINK_HOME>/lib/
目录下。
2. 配置 MySQL 服务器(必须)
你必须定义一个 MySQL 用户,该用户对 MySQL CDC 连接器监视的所有数据库都应该具有所需的权限。
# 创建用户
mysql>CREATEUSER'user'@'localhost' IDENTIFIED BY'password';# 赋权
mysql>GRANTSELECT,SHOWDATABASES,REPLICATION SLAVE,REPLICATION CLIENT ON*.*TO'user' IDENTIFIED BY'password';# 刷新权限
mysql> FLUSH PRIVILEGES;
注意:
在
scan.incremental.snapshot.enabled
参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。
二. 功能说明
1. 启动模式
配置选项
scan.startup.mode
指定 MySQL CDC 使用者的启动模式。有效枚举包括:
- initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取
最新的 binlog
。- earliest-offset:跳过快照阶段,从
可读取
的最早 binlog 位点开始读取- latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在
连接器启动之后的数据更改
。- specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
- timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
MySQLSource.builder().startupOptions(StartupOptions.earliest())// 从最早位点启动.startupOptions(StartupOptions.latest())// 从最晚位点启动.startupOptions(StartupOptions.specificOffset("mysql-bin.000003",4L)// 从指定 binlog 文件名和位置启动.startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19"))// 从 GTID 集合启动.startupOptions(StartupOptions.timestamp(1667232000000L)// 从时间戳启动....build()CREATETABLE mysql_source (...)WITH('connector'='mysql-cdc','scan.startup.mode'='earliest-offset',-- 从最早位点启动'scan.startup.mode'='latest-offset',-- 从最晚位点启动'scan.startup.mode'='specific-offset',-- 从特定位点启动'scan.startup.specific-offset.file'='mysql-bin.000003',-- 在特定位点启动模式下指定 binlog 文件名'scan.startup.specific-offset.pos'='4',-- 在特定位点启动模式下指定 binlog 位置'scan.startup.specific-offset.gtid-set'='24DA167-0C0C-11E8-8442-00059A3C7B00:1-19',-- 在特定位点启动模式下指定 GTID 集合'scan.startup.mode'='timestamp',-- 从特定位点启动'scan.startup.timestamp-millis'='1667232000000'-- 在时间戳启动模式下指定启动时间戳...)
2. 全量阶段支持 checkpoint
增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。
3. 关于无主键表
从2.4.0 版本开始支持无主键表,使用无主键表必须设置
scan.incremental.snapshot.chunk.key-column
,且只能选择非空类型的一个字段。
在使用无主键表时,需要注意以下两种情况。
- 配置
scan.incremental.snapshot.chunk.key-column
时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。- 无主键表的处理语义由
scan.incremental.snapshot.chunk.key-column
指定的列的行为决定:
- 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
- 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。
Exactly-Once 处理
MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。
三. 实战
1. 实现mysql整表与增量表同步
-- 'scan.startup.mode'= 'initial' -- CREATETABLE tjy_sql1
(`id`int,`name` string,`face` string
,PRIMARYKEY(id)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='xxx','port'='3306','username'='middle_test','password'='123456','database-name'='middle_test','table-name'='tjy_fortest1'-- ,'scan.incremental.snapshot.enabled' = 'false' -- initial: 默认值,全表同步,然后进行增量同步;-- 'scan.startup.mode'= 'initial' -- 'debezium.snapshot.mode' = 'initial' ); CREATETABLE tjy_sql1_sink
(`id`int,`name` string,`face` string
,PRIMARYKEY(id)NOT ENFORCED
)WITH('connector'='mysql-x','url'='jdbc:mysql://xxx:3306/middle_test?useunicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true','username'='middle_test','password'='123456','table-name'='flink_type','table-name'='tjy_fortest2');insertinto tjy_sql1_sink select*from tjy_sql1;
FAQ
相关问题:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)
可能涉及到的问题
版权归原作者 roman_日积跬步-终至千里 所有, 如有侵权,请联系我们删除。