前言
FlinkCDC 是一款基于 Change Data Capture(CDC)技术的数据同步工具,可以用于将关系型数据库中的数据实时同步到 Flink 流处理中进行实时计算和分析,下图来自官网的介绍。
下图是 FlinkCDC 与其它常见 开源 CDC 方案的对比:
可以看见的是相比于其它开源产品,FlinkCDC 不仅支持增量同步,还支持全量/全量+增量的同步,同时 FlinkCDC 还支持故障恢复(基于检查点机制实现),能够快速恢复数据同步的进度,并且支持的数据源也很丰富(在 2.3 版本已支持 MongoDB、MySQL、OceanBase、Oracle、PostgressSQL、SQLServer、TiDB、Db2 等数据源)。
本文将介绍 FlinkCDC 在数据同步和故障恢复等方面的内容(以 MySQL 和 Oracle 为例),同时完整代码也已上传到GitHub。
效果展示
MySQL
Oracle(相比 MySQL 延迟会稍高)
数据库配置
MySQL(5.7)
修改
my.cnf
配置文件(Windows 下是 my.ini 文件),增加以下配置内容:
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 对于 MySQL 集群, 不同节点的 server_id 必须不同
server_id=1
# 过期时间
expire_logs_days=30
Tips: 修改完成后需要重启 MySQL 服务
建库建表:
# 建库
create database flink;
# 建表
create table flink.`user` (
`id` bigint(20) not null,
`username` varchar(20) default null,
`password` varchar(63) default null,
`status` int(2) default null,
`create_time` datetime default null,
primary key (`id`)
) ENGINE = InnoDB default CHARSET = utf8mb4;
创建用户并授权:
# 创建用户 flink
CREATE USER flink IDENTIFIED BY 'flink';
# 授权
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
# 将 flink 库的所有权限授权给 flink 用户
GRANT ALL PRIVILEGES ON flink.* TO 'flink'@'%';
# 刷新权限
FLUSH PRIVILEGES;
Oracle(11g)
以 DBA 身份连接:
# SID 需要根据实际情况进行设置, 比如: XE.exportORACLE_SID=SID
sqlplus /nolog
CONNECT sys/manager AS SYSDBA
配置日志:
alter system set db_recovery_file_dest_size = 20G;# 日志文件的地址可以根据自己的情况进行设置
alter system set db_recovery_file_dest ='/opt/oracle/oradata/recovery_area'scope=spfile;shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
确认是否配置成功:
archive log list;
创建用户并授权:
CREATEUSER flink IDENTIFIED BY flink;GRANTCREATESESSIONTO flink;GRANT FLASHBACK ANYTABLETO flink;GRANTSELECTANYTABLETO flink;GRANT SELECT_CATALOG_ROLE TO flink;GRANT EXECUTE_CATALOG_ROLE TO flink;GRANTSELECTANYTRANSACTIONTO flink;GRANTCREATETABLETO flink;
建表并增加日志记录:
# 建表CREATETABLE flink."user"(
id NUMBER NOTNULL,
username VARCHAR2(20),
password VARCHAR2(63),statusINTEGER,
create_time TIMESTAMP,PRIMARYKEY(id));# 日志配置ALTERTABLE flink."user"ADD SUPPLEMENTAL LOG DATA(ALL)COLUMNS;
代码配置
运行环境
依赖版本Java17flink-connector2.1.0flink1.13.0maven3.6.2
连接配置
flinkcdc:data-source:# 默认类型为 MySQLaddr: localhost:3306database: flink
username: flink
password: flink
table-list:- user
Tips: 关于数据源的连接完整配置属性可参考 DataSourceProperties.java 文件,关于检查点的配置可参考 CheckPointProperties.java 文件
恢复点配置
为了实现故障恢复(应用停止运行过程中数据库有增删改操作的情况)的情况,需要在代码中进行恢复点的相关配置:
// 获取配置的恢复点路径, 首次运行不存在会默认进行创建var saveDir = checkPointProperties.getSaveDir();var folder =newFile(saveDir);if(!folder.exists()&&!folder.isDirectory()){if(!folder.mkdirs()){thrownewIllegalStateException("文件夹创建失败");}}var dataSourceType = dataSourceProperties.getType().name().toLowerCase();var dataSourceSaveDir = saveDir +File.separator + dataSourceType;var savepointDir =SavepointUtils.getSavepointRestore(dataSourceSaveDir);var configuration =newConfiguration();if(savepointDir !=null){// 设置恢复点路径var savepointRestoreSettings =SavepointRestoreSettings.forPath(savepointDir);SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration);}// 启用检查点并设置检查点的保存路径var env =StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.enableCheckpointing(checkPointProperties.getInterval(),CheckpointingMode.EXACTLY_ONCE);var checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage(checkPointProperties.getStorageType().getPrefix()+ dataSourceSaveDir);
通用注意点
为了避免数值类型显示是一堆字符串,需要增加以下配置:
// 详见 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#%E9%80%9A%E7%94%A8-faq Q5
prop.setProperty("bigint.unsigned.handling.mode","long");
prop.setProperty("decimal.handling.mode","double");
ORACLE 配置注意点
为了避免日志增长过快以及读取日志满的问题,需要增加以下配置:
// 详见 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#oracle-cdc-faq Q1
prop.setProperty("log.mining.strategy","online_catalog");
prop.setProperty("log.mining.continuous.mine","true");
对于 Oracle 11g,连接配置中需要增加:
// 详见 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#oracle-cdc-faq Q2
prop.setProperty("database.tablename.case.insensitive","false");
项目运行及使用介绍
下载代码
由于本人将博客相关的示例代码都集中到了一个仓库,因此如果不想拉取整个仓库,推荐使用
GitZip for github
这个插件,就可以只下载部分的文件(选中指定文件后点击右下角的下载按钮):
使用介绍
对于需要监控的表,只需要创建相应的实体类,并新建一个类继承
AbstractMessageListener
(可重写其中的 create、delete、update、read等方法处理相应的事件)即可,其中 FlickCdcMessageListener 注解内的参数填相应的表名即可监听相应的表变更事件(同时需要在 yaml 文件中 tableList 中增加要监听的表,如果是 Oracle 数据库还需要增加日志配置):
importcn.butterfly.flinkcdc.annotation.FlickCdcMessageListener;importcn.butterfly.flinkcdc.pojo.User;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;/**
* 用户表消息监听器
*
* @author zjw
* @date 2023-03-14
*/@Slf4j@Component@FlickCdcMessageListener("user")publicclassUserMessageListenerextendsAbstractMessageListener<User>{@Overridepublicvoidcreate(User user){
log.info("新增用户: {}", user);}}
其它注意点
- FlinkCDC 默认的同步策略是第一次运行先进行全量同步,后续即可进行增量读取,因此表数据量比较大的时候,重写 AbstractMessageListener#read 方法时需要特别注意处理大量数据的情况。
- 由于 Flink CDC 是根据数据库的事务日志来获取数据更改的,如果恢复点之后发生了数据更改,那么在恢复点之后的数据将被重复读取,因此需要考虑重复读取的情况。
总结
本文简单介绍了 FlinkCDC 的数据同步和故障恢复方面的内容,对相关基础知识进行了省略(例如检查点),如果是第一次接触和使用 FlinkCDC,建议先结合官网的示例进行学习,同时建议先通读一篇官方的FAQ。
参考文献
- 基于 Flink CDC 实现海量数据的实时同步和转换
- https://github.com/ververica/flink-cdc-connectors#supported-tested-databases
版权归原作者 庄周de蝴蝶 所有, 如有侵权,请联系我们删除。