Flink CDC 的概览和使用
1.什么是 CDC
CDC
(
Change Data Capture
,数据变更抓取)是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。
CDC 可以捕获数据库中的以下类型的数据变化:
- ✅ 插入(
Insert
):当新数据被插入到数据库表中时。 - ✅ 更新(
Update
):当数据库表中的现有数据被修改时。 - ✅ 删除(
Delete
):当数据从数据库表中被删除时。
2.什么是 Flink CDC
Flink CDC
是一个开源的数据库变更日志捕获和处理框架,它可以实时地从各种数据库(如 MySQL、PostgreSQL、Oracle、MongoDB 等)中捕获数据变更并将其转换为流式数据。Flink CDC 可以帮助实时应用程序实时地处理和分析这些流数据,从而实现 数据同步、数据管道、实时分析 和 实时应用 等功能。
本质上是一系列的 Flink Source Connector 集合,用于来获取数据库的实时变更,底层基于 Debezium 实现。
🚀 https://github.com/ververica/flink-cdc-connectors
3.Flink CDC 前生今世
3.1 Flink CDC 1.x
Flink CDC
1.x
开启了 Flink 在 CDC 上的实践之路,Flink CDC
1.x
第一次引入了 Debezium 框架,利用 Debezium 已有的能力将数据库实时变更接入到 Flink 流计算框架中,利用 Flink 丰富的生态对数据进行加工处理,满足不同的业务需求,在功能层面上而言,Flink CDC
1.x
只能说是可以用,但不能生产上用,为什么:
1.x
版本全增量切换时会对表加锁,在同步过程中有段时间业务会处于暂停状态。- 各方面功能还不够完善,比如自动加表、DDL 事件传递等。
总体而言 Flink CDC
1.x
只能说是一个比较有趣的小玩具,还不具备大规模商业盈利的价值。
3.2 Flink CDC 2.x
在
2.x
版本中,Flink CDC 引入了 Netfix DBLog 中的无锁算法,彻底解决了全增量切换上业务停滞的问题,同时得益于 FLIP-27 对 Flink Source API 的重构,Flink CDC 也基于 FLIP-27 升级到了新的框架设计,至此,Flink CDC 被大规模公司使用并投入到生产中。
3.3 Flink CDC 3.x
近期,Flink CDC 发布了全新的
3.0
版本,并宣布捐赠回 Flink 主项目,在新的
3.0
版本中,Flink CDC 对于接口和架构上做了很大的升级和调整,对于整体项目的定位也从之前的 Flink Source Connector 转变为了 Data Integration Engine,未来将与
SeaTunnel
、
DataX
、
Chunjun
等一系列老牌数据集成项目同台竞技,让我们拭目以待。
4.Flink CDC 使用
在本地启动一个 MySQL 的 Docker 环境。
docker run -it --rm--name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4
创建表:
createdatabase cdc_test;use cdc_test;createtable cdc_table (
id intprimarykeyauto_increment,
name varchar(1000),
age int);
在 IDEA 中新建一个Java 项目。
导入依赖:
<flink-cdc.version>2.4.2</flink-cdc.version><flink.version>1.16.3</flink.version><logback.version>1.2.7</logback.version><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>${logback.version}</version></dependency>
编写代码:
publicclassFlinkCDCApplication{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60000L);MySqlSource<String> mySqlSource =MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("cdc_test")// set captured database, If you need to synchronize the whole database, Please set tableList to ".*"..tableList("cdc_test.cdc_table")// set captured table.username("root").password("debezium").includeSchemaChanges(true).startupOptions(StartupOptions.latest()).deserializer(newJsonDebeziumDeserializationSchema())// converts SourceRecord to JSON String.build();
env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),"MySQL-CDC").print();
env.execute();}}
添加日志配置:
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
--><configuration><appendername="STDOUT"class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n</pattern></encoder></appender><rootlevel="INFO"><appender-refref="STDOUT"/></root></configuration>
5.Debezium 标准 CDC Event 格式详解
{"before":null,"after":{"id":1,"name":"xing.yu","age":26,"new_column":"dewu"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1702723640000,"snapshot":"false","db":"cdc_test","sequence":null,"table":"cdc_table","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2394,"row":0,"thread":39,"query":null},"op":"c","ts_ms":1702723640483,"transaction":null}{// 表数据更新前的值,update/delete"before":{},// 表数据更新后的值,create/update"after":{},// 元数据信息"source":{},// 操作类型 c/d/u"op":"",// 记录解析时间"ts_ms":"","transaction":""}
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。