Flink CDC整合SpringBoot获取变更数据
1.Flink CDC介绍:
(1).概述
CDC是Chanage Data Capture(数据变更捕获)的简称。其核心原理就是监测并捕获数据库的变动(例如增删改),将这些变更按照发生顺序捕获,将捕获到的数据,通过一定的数据转换和清洗或者相关业务的整合,写入目标数据库。
Flink CDC是一个流数据集成工具,旨在为用户提供更强大的API。它允许用户通过YAML优雅地描述他们的ETL管道逻辑,并帮助用户自动生成自定义的Flink操作符并提交作业。Flink CDC优先优化任务提交过程,并提供增强的功能,如模式演化,数据转换,完整的数据库同步和exactly-once语义, 与Apache Flink深度集成并由Apache Flink提供支持,Flink CDC提供:端到端数据集成框架, 为数据集成用户提供可轻松构建作业的API 和 Sink中对多表支持, 以及同步整个数据库, 模式演化的能力。
(2).Flink CDC Source
Flink CDC sources是Apache Flink 的一组源连接器,使用更改数据捕获(CDC)从不同数据库获取更改。一些CDC来源集成Debezium作为捕获数据更改的引擎。其中支持的连接器包括mysql-cdc**,** oceanbase-cdc**,** oracle-cdc**,** sqlserver-cdc等。
(3).版本支持
2.项目概述
(1).项目版本
JDK版本:JDK11
SpringBoot版本:2.6.6
SpringCloudAlibaba版本:2021.0.1.0
SpringCloud版本: 2021.0.1
FlinkCDC-Connector版本:3.0.1(部分兼容2.2.0版本)
(2).项目依赖
<dependencies><!--*******Flink cdc-connector驱动*********--><!--flinkcdc3.0.1与2.2.0版本共存,需要排除低版本的debezium-core和debezium-connector手动指定高版本--><dependency><groupId>io.debezium</groupId><artifactId>debezium-core</artifactId><version>1.9.7.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-oracle</artifactId><version>1.9.7.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-sqlserver</artifactId><version>1.9.7.Final</version></dependency><!--解决无法访问com.google.protobuf.GeneratedMessageV3 找不到com.google.protobuf.GeneratedMessageV--><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.5.1</version></dependency><!-- flink-sqlserver依赖 --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-sqlserver-cdc</artifactId><version>3.0.1</version></dependency><!-- flink-oracle依赖 --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-oracle-cdc</artifactId><version>3.0.1</version></dependency><!-- flink-oceanbase依赖 --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-oceanbase-cdc</artifactId><version>3.0.1</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>3.0.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version><exclusions><!--Exclude the included Guava as it conflicts withtheFlink shaded guava --><exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><!-- 高版本兼容低版本,需要排除依赖 flinkcdc3.0.1与2.2.0版本共存,需要排除低版本的debezium-core和debezium-connector手动指定高版本--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-sqlserver-cdc</artifactId><version>2.2.0</version><exclusions><exclusion><groupId>io.debezium</groupId><artifactId>debezium-core</artifactId></exclusion><exclusion><groupId>io.debezium</groupId><artifactId>debezium-connector-sqlserver</artifactId></exclusion></exclusions></dependency><!-- 高版本兼容低版本,需要排除依赖 需要排除依赖 flinkcdc3.0.1与2.2.0版本共存,需要排除低版本的debezium-core和debezium-connector手动指定高版本--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-oracle-cdc</artifactId><version>2.2.0</version><exclusions><exclusion><groupId>io.debezium</groupId><artifactId>debezium-core</artifactId></exclusion><exclusion><groupId>io.debezium</groupId><artifactId>debezium-connector-oracle</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-guava</artifactId><version>18.0-12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.13.2</version></dependency><!--********************************数据库连接驱动********************************--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.oceanbase</groupId><artifactId>oceanbase-client</artifactId><version>2.4.3</version></dependency><dependency><groupId>com.microsoft.sqlserver</groupId><artifactId>sqljdbc4</artifactId><version>4.0</version></dependency><dependency><groupId>com.oracle.database.jdbc</groupId><artifactId>ojdbc11</artifactId><version>23.3.0.23.09</version></dependency><!--********************************Spring相关********************************--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.3.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.16</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency></dependencies>
3.Flink CDC环境准备
(1).OceanBase数据库
数据库版本:
5.7.25-OceanBase_CE-v4.3.1.0
代理版本:
oblogproxy-2.0.2-100000012024060321.el7.x86_64
步骤:
(1).部署安装数据库
按照以上版本安装OceanBase数据库,具体操作请参考官网网址:https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000000818649
(2).安装代理服务oblogproxy
oblogproxy 是 OceanBase 数据库的增量日志代理服务。oblogproxy 支持实时增量链路接入和管理,方便应用接入 OceanBase 数据库的增量日志。同时支持在网络隔离时订阅增量日志。
安装oblogproxy-2.0.2代理,其中代理版本和OceanBase数据库的版本是有对应的,目前OceanBase4.3.1.0版本是最新的版本,对应oblogproxy代理版本是2.0.2,对应Flink CDC 版本是2.2.0及以上。
下载完成后, 下面的操作均在 oblogproxy 项目目录中进行,oblogproxy 项目目录默认为 /usr/local/oblogproxy。oblogproxy 的配置文件默认放在 conf/conf.json。
需修改 conf/conf.json 的以下配置:(oblogproxy 需要配置用户的用户名和密码,用户必须是 OceanBase 的 sys 租户的用户才能连接。)
其中加密的用户名和密码通过以下命令获取:
./bin/logproxy -x username
./bin/logproxy -x password
然后将结果分别保存到 conf/conf.json 文件的 ob_sys_username 和 ob_sys_password 配置项
其中conf/conf.json中binlog mode选项默认false,不要设置为true,使用CDC模式下不要开启binlog模式,这样的话会启动报错。
(3).运行代理服务oblogproxy
先进入 oblogproxy 项目目录默认为 /usr/local/oblogproxy
cd /usr/local/oblogproxy
再通过以下命令启动服务。
./run.sh start
可以通过日志目录查看启动状态:
cd /usr/local/oblogproxy/log
tail –f logproxy.log
(2).SQLServer数据库
数据库版本:
Microsoft SQL Server 2017 (RTM) - 14.0.1000.169 (X64)
Aug 22 2017 17:04:49
Copyright © 2017 Microsoft Corporation
Enterprise Edition (64-bit) on Windows 10 Enterprise 10.0 <X64> (Build 22621: ) (Hypervisor)
代理版本:
开启上述安装版本数据库下的SQLServerAgent代理服务,此服务是获取数据库变更信息的重要服务,不开启的话无法获取增量数据变动。
步骤:
(1).部署安装数据库
按照以上版本安装SQLServer数据库, FlinkCDC Connector最低要求SQLServer2017版本及以上,同时建议安装企业版,因为其他版本可能会有CPU内核的限制,这样不利于在线上大用户量并发的情况下获取数据变动,如果仅仅用于测试可以安装开发版本。
此处安装一定要选择混合模式,并且设置sa用户的连接密码,企业级开发以及FlinkCDC连接过程中也是需要提供数据库的用户和密码的,并且添加当前用户为SQLServer管理员。
(2).开启代理服务SQLServer Agent
(3).开启CDC模式
对于要获取数据变更的数据库和表都要开启CDC模式
数据库开启CDC:
EXEC sys.sp_cdc_enable_db;
判断当前数据库(datagather)是否启动CDC成功:
SELECT is_cdc_enabled FROM sys.databases WHERE name = ‘datagather’;
开启当前数据库下表的CDC模式
EXEC sys.sp_cdc_enable_table
@source_schema = ‘dbo’,
@source_name = ‘a_a_test’,
@role_name = ‘cdc_role’;
– schema_name 是表所属的架构(schema)的名称。
– table_name 是要启用 CDC 跟踪的表的名称。
– cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。
启用 CDC 后,SQL Server 将自动跟踪启用了 CDC 的表上的数据更改,并将更改信息存储在 CDC 相关的表中,您可以使用这些信息进行数据更改追踪和同步。
– 查询在当前数据库下所有的表:
SELECT * FROM INFORMATION_SCHEMA.TABLES
如果有多个数据库和多个表,按照以上操作逐一进行开启工作。
4.Flink CDC Connector整合SpringBoot
(1).目录结构
(2).版本说明
其中version1和version2分别对应代码的版本,version1使用了Flink CDC Connector2.2.0版本,变更数据是在序列化器中获取的,version2使用了FlinkCDC Connector3.0.1(目前最新版本),变更数据是在自定义Sink中获取的,RichSinkFunction 提供了更多的功能和灵活性。它允许用户访问 Flink 运行时的上下文信息,如状态管理、计时器和广播变量等。此外,RichSinkFunction 还可以处理异步 I/O 操作,提高数据输出的效率。
(3).项目设计思路
version1
该版本是原始版本,使用的Connector的版本均为2.2.0,针对不同类型的数据库,配置不同的启动监听器,在项目启动时候,通过读取配置文件flink.properties来获取不同数据库类型的CDC连接器的配置信息。
启动完成后,在不同类型的序列化器中获取到变更数据,封装成ConcurrentHashMap,根据变更的数据以及信息,按照类型创建异步任务拼接出insert, delete, update,操作类型的SQL语句,对目标库进行增删改的操作,完成数据的变更和高效操作。
version2
该版本是完善的版本,使用的Connector的版本均为3.0.1,针对不同类型的数据库,配置不同的启动监听器,在项目启动时候,通过读取配置文件flink.properties来获取不同数据库类型的CDC连接器的配置信息,自定义Sink并且封装统一的变更实体来接收数据而不是map,来获取CDC过程获取到的变更信息,而不是直接在序列化器中,这样更加高效而且使得处理变更数据和Flink CDC Connector的序列化器。
由于不同数据库支持的SQL语法不同,结合高内聚低耦合的设计思想以及后期维护和可扩展性,具体变更业务通过策略模式和工厂模式结合,实现对不同类型的数据库组装新增, 删除,修改的SQL语句,通过反射调用,大大提高了应用程序的扩展性和可维护性。
不同数据库类型的实现代码如下所示:
(4).OceanBase数据库
现在验证OceanBase数据库的数据变更获取,把变更结果展现在目标数据库SQLServer中,在application.just配置文件中把flink.version1.oceanbase改为true。
由条件注解来控制@ConditionalOnExpression(“#{@environment.getProperty(‘flink.version1.oceanbase’) == ‘true’ && @environment.getProperty(‘flink.version2.oceanbase’) == ‘false’}”)
该条件注解的含义是flink.version1.oceanbase和flink.version2.oceanbase只有一个为true,另一个为false的时候才可以启动,这样的话每个类型数据库的CDC程序运行时只能同时存在一个。
可以看到version1中的oceanbase类型的CDC连接器启动成功,现在往表中插入一条数据,我们看oceanbase cdc 的连接器连接数据库的信息
新增操作
我们在该库下的a_a_test表中添加一条数据:
在oceanbase的序列化器中可以看到添加的变更数据信息
在我们的Runable接口中,可以看到针对目标数据库SQLServer,已经处理好了插入语句,语法是SQLServer的语法,如下图所示:
修改操作
删除操作
version2和version1的效果一致,此处不在演示.
(5).SQLServer数据库
现在验证SQLServer数据库的数据变更获取,把变更结果展现在目标数据库SQLServer中,在application.just配置文件中把flink.version2.sqlserver改为true
由条件注解来控制@ConditionalOnExpression(“#{@environment.getProperty(‘flink.version2.sqlserver’) == ‘true’ && @environment.getProperty(‘flink.version1.sqlserver’) == ‘false’}”)该条件注解的含义是flink.version1.sqlserver和flink.version2.sqlserver只有一个为true,另一个为false的时候才可以启动,这样的话每个类型数据库的CDC程序运行时只能同时存在一个
可以看到version2中的sqlserver类型的CDC连接器启动成功
SQLServer的验证和OceanBase一样,大家可以自行验证哦
以上是使用FlinkCDC整合SpringBoot获取变更数据的分享,希望可以给大家提供思路和帮助
版权归原作者 雨落纠纷 所有, 如有侵权,请联系我们删除。