阿里云Flink MySQL连接器介绍
阿里云提供了MySQL连接器,其作为源表时,扮演的就是flink cdc的角色。
一、特色功能
MySQL的CDC源表,即MySQL的流式源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证不多读一条也不少读一条数据。即使发生故障,也能保证通过Exactly Once语义处理数据。MySQL CDC源表支持并发地读取全量数据,通过增量快照算法实现了全程无锁和断点续传。
作为源表,支持以下功能特性。
- 流批一体,支持读取全量和增量数据,无需维护两套流程。
- 支持并发读取全量数据,性能水平扩展。
- 全量读取无缝切换增量读取,自动缩容,节省计算资源。
- 全量阶段读取支持断点续传,更稳定。
- 无锁读取全量数据,不影响在线业务。
二、语法结构
CREATE TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
三、WITH参数
- 通用
参数
说明
是否必填
数据类型
默认值
备注
connector
表类型。
是
STRING
无
作为源表时,可以填写为mysql-cdc或者mysql,二者等价。作为维表或结果表时,固定值为mysql。
hostname
MySQL数据库的IP地址或者Hostname。
是
STRING
无
建议填写专有网络VPC地址。
username
MySQL数据库服务的用户名。
是
STRING
无
无。
password
MySQL数据库服务的密码。
是
STRING
无
无。
database-name
MySQL数据库名称。
是
STRING
无
- 作为源表时,数据库名称支持正则表达式以读取多个数据库的数据。
- 使用正则表达式时,尽量不要使用^和$符号匹配开头和结尾。具体原因详见table-name备注的说明。
table-name
MySQL表名。
是
STRING
无
- 作为源表时,表名支持正则表达式以读取多个表的数据。
- 使用正则表达式时,尽量不要使用^和$符号匹配开头和结尾。具体原因详见以下说明。
说明:MySQL CDC源表在正则匹配表名时,会将您填写的 database-name,table-name 通过字符串 \.(VVR 8.0.1前使用字符.)连接成为一个全路径的正则表达式,然后使用该正则表达式和MySQL数据库中表的全限定名进行正则匹配。例如:当配置'database-name'='db_.*'且'table-name'='tb_.+'时,连接器将会使用正则表达式db_.\.tb_.+(8.0.1版本前为db_..tb_.+)去匹配表的全限定名来确定需要读取的表。
port
MySQL数据库服务的端口号。
否
INTEGER
3306
无。
- 源表独有
参数
说明
是否必填
数据类型
默认值
备注
server-id
数据库客户端的一个数字ID。
否
STRING
默认会随机生成一个5400~6400的值。
该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。
该参数也支持ID范围的格式,例如5400-5408。在开启增量读取模式时支持多并发读取,此时推荐设定为ID范围,使得每个并发使用不同的ID。
scan.incremental.snapshot.enabled
是否开启增量快照。
否
BOOLEAN
true
默认开启增量快照。增量快照是一种读取全量数据快照的新机制。与旧的快照读取相比,增量快照有很多优点,包括:
- 读取全量数据时,Source可以是并行读取。
- 读取全量数据时,Source支持chunk粒度的检查点。
- 读取全量数据时,Source不需要获取全局读锁(FLUSH TABLES WITH read lock)。
如果您希望Source支持并发读取,每个并发的Reader需要有一个唯一的服务器ID,因此server-id必须是5400-6400这样的范围,并且范围必须大于等于并发数。
scan.incremental.snapshot.chunk.size
表的chunk的大小(行数)。
否
INTEGER
8096
当开启增量快照读取时,表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中,因此chunk 太大,可能导致内存OOM。chunk越小,故障恢复的粒度也越小,但也会降低吞吐。
scan.snapshot.fetch.size
当读取表的全量数据时,每次最多拉取的记录数。
否
INTEGER
1024
无。
scan.startup.mode
消费数据时的启动模式。
否
STRING
initial
参数取值如下:
- initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。
- latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。
- earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。
- specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过配置scan.startup.specific-offset.file和scan.startup.specific-offset.pos的方式来指定从特定Binlog文件名和偏移量启动,也可以通过配置scan.startup.specific-offset.gtid-set指定从某个GTID集合启动。
- timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。
scan.startup.specific-offset.file
使用指定位点模式启动时,启动位点的Binlog文件名。
否
STRING
无
使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如mysql-bin.000003。
scan.startup.specific-offset.pos
使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。
否
INTEGER
无
使用该配置时,scan.startup.mode必须配置为specific-offset。
scan.startup.specific-offset.gtid-set
使用指定位点模式启动时,启动位点的GTID集合。
否
STRING
无
使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。
scan.startup.timestamp-millis
使用指定时间模式启动时,启动位点的毫秒时间戳。
否
LONG
无
使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。
重要:在使用指定时间时,MySQL CDC会从最早Binlog开始读取,直至Binlog事件的时间戳大于等于指定的时间戳后开始向下游发送数据。因此请保证指定的时间戳对应的Binlog文件在数据库上没有被清理且可以被读取到。
server-time-zone
数据库在使用的会话时区。
VVR-6.0.2以下版本必填,其他版本选填
STRING
如果您没有指定该参数,则系统默认使用Flink作业运行时的环境时区作为数据库服务器时区,即您选择的可用区所在的时区。
例如Asia/Shanghai,该参数控制了MySQL中的TIMESTAMP类型如何转成STRING类型。更多信息请参见。
debezium.min.row.count.to.stream.results
当表的条数大于该值时,会使用分批读取模式。
否
INTEGER
1000
Flink采用以下方式读取MySQL源表数据:
- 全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有OOM风险。
- 分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有OOM风险,缺点是读取速度相对较慢。
connect.timeout
连接MySQL数据库服务器超时时,重试连接之前等待超时的最长时间。
否
DURATION
30s
无。
connect.max-retries
连接MySQL数据库服务时,连接失败后重试的最大次数。
否
INTEGER
3
无。
connection.pool.size
数据库连接池大小。
否
INTEGER
20
数据库连接池用于复用连接,可以降低数据库连接数量。
jdbc.properties.*
JDBC URL中的自定义连接参数。
否
STRING
无
您可以传递自定义的连接参数,例如不使用SSL协议,则可配置为****'jdbc.properties.useSSL' = 'false'****。
支持的连接参数请参见Mysql Configuration Properties。
heartbeat.interval
Source通过心跳事件推动Binlog位点前进的时间间隔。
否
DURATION
30s
心跳事件用于推动Source中的Binlog位点前进,这对MySQL中更新缓慢的表非常有用。对于更新缓慢的表,Binlog位点无法自动前进,通过够心跳事件可以推到Binlog位点前进,可以避免Binlog位点不前进引起Binlog位点过期问题,Binlog位点过期会导致作业失败无法恢复,只能无状态重启。
scan.incremental.snapshot.chunk.key-column
可以指定某一列作为快照阶段切分分片的切分列。
见备注列。
STRING
无
- 无主键表必填,选择的列必须是非空类型(NOT NULL)。
- 有主键的表为选填,仅支持从主键中选择一列。
说明:仅Flink计算引擎VVR 6.0.7及以上版本支持。
rds.region-id
RDS实例所在的地域 ID。
使用读取OSS归档日志功能时必填。
STRING
无
仅Flink计算引擎VVR 6.0.7及以上版本支持。
地域ID请参见地域和可用区。
rds.access-key-id
阿里云账号Access Key ID。
使用读取OSS归档日志功能时必填。
STRING
无
仅Flink计算引擎VVR 6.0.7及以上版本支持。
rds.access-key-secret
阿里云账号Access Key Secret。
使用读取OSS归档日志功能时必填。
STRING
无
仅Flink计算引擎VVR 6.0.7及以上版本支持。
rds.db-instance-id
RDS实例ID。
使用读取OSS归档日志功能时必填。
STRING
无
仅Flink计算引擎VVR 6.0.7及以上版本支持。
scan.incremental.close-idle-reader.enabled
是否在快照结束后关闭空闲的 Reader。
否
BOOLEAN
false
- 仅Flink计算引擎VVR 8.0.1及以上版本支持。
- 该配置生效需要设置execution.checkpointing.checkpoints-after-tasks-finish.enabled为true。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
版权归原作者 Lansonli 所有, 如有侵权,请联系我们删除。