本文基于Flink CDC v2.4.2版本和Flink 1.17.1版本。
欢迎来到Flink CDC
Flink CDC是一个流数据集成工具,旨在为用户提供更强大的API。它允许用户通过YAML优雅地描述他们的ETL管道逻辑,并帮助用户自动生成自定义Flink算子和提交作业。Flink CDC优先优化任务提交过程,并提供增强的功能,如模式演化(schema evolution)、数据转换(data transformation)、全数据库同步(full database synchronization)和仅一次语义(exactly-once semantic)。
与Apache Flink深度集成并由其提供支持,Flink CDC提供:
✅端到端数据集成框架
✅基于数据集成API用户可轻松构建作业
✅源端/目标端中多表支持
✅整个数据库的同步
✅模式演化能力
核心概念
数据管道(Data Pipeline)
由于Flink CDC中的事件(events)以管道( pipeline)方式从上游流向下游,因此整个ETL任务被称为数据管道(**Data Pipeline**)。
数据源(Data Source)
数据源用于访问元数据(metadata)并从外部系统读取变更的数据(the changed data)。一个数据源可以同时从多个表中读取数据。
注意,这里的数据源并不是指的外部系统这个数据源,而是Flink中自身定义的数据源,Flink用这个数据源来从外部系统读取变更的数据。
数据接收器(Data Sink)
数据接收器用于应用模式更改(schema changes)并将更改数据写入外部系统。一个数据接收器可以同时写多个表。
表ID(Table ID)
在与外部系统连接时,需要与外部系统的存储对象建立映射关系。需要唯一确定存储对象,这就是Table ID所指。为了与大多数外部系统兼容,表ID由一个3元组表示:(namespace, schemaName, tableName)。连接器应该在表ID和外部系统中的存储对象之间建立映射。下表列出了不同数据系统表ID中的部分:
数据系统表ID组成例子Oracle/PostgreSQLdatabase, schema, tablemydb.default.ordersMySQL/Doris/StarRocksdatabase, tablemydb.ordersKafkatopicorders
转换(Transform)
Transform模块帮助用户根据表中的数据列来删除和扩展数据列。此外,它还可以帮助用户在同步过程中过滤一些不必要的数据。
路由(Route)
路由指定匹配源表列表和映射到目标表的规则。最典型的场景是合并子数据库和子表,将多个上游源表路由到同一个目标表。
连接器(connectors)
这里connector分了两个章节,需要说明connector、souce、sink的区别。source和sink都可以称为connector。或者connector包括source和sink,由于历史原因,先是source,sink,后面使用connector对source和sink做了统一。
管道连接器(pipeline connectors)
Flink CDC提供了几个源和接收器连接器来与外部系统进行交互。通过将发布的jar添加到Flink CDC环境中,并在YAML管道定义中指定连接器,您可以使用开箱即用的连接器。
支持连接器
连接器支持的连接器类型外部系统Apache DorisSink
Apache Doris: 1.2.x, 2.x.x
KafkaSinkKafka
MySQLSourceMySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
PaimonSinkPaimon: 0.6, 0.7, 0.8
StarRocksSinkStarRocks: 2.x, 3.x
开发自己的连接器
如果提供的连接器不能满足您的要求,您可以开发自己的连接器,以使您的外部系统参与Flink CDC管道。查看Flink CDC api,了解如何开发自己的连接器。
Flink源
Flink CDC 源
Flink CDC源是Apache Flink的一组源连接器(source connectors),使用变更数据捕获(CDC)从不同的数据库摄取更改。一些CDC源集成了Debezium作为捕获数据变化的引擎。所以它可以充分利用Debezium的能力。了解更多关于什么是Debezium。
debezium
支持的连接器
连接器数据库驱动mongodb-cdc
MongoDB: 3.6, 4.x, 5.0, 6.0, 6.1
MongoDB Driver: 4.9.1mysql-cdcMySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
JDBC Driver: 8.0.28oceanbase-cdcOceanBase CE: 3.1.x, 4.x
OceanBase EE: 2.x, 3.x, 4.x
OceanBase Driver: 2.4.xoracle-cdcOracle: 11, 12, 19, 21
Oracle Driver: 19.3.0.0postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14
JDBC Driver: 42.5.1sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019
JDBC Driver: 9.4.1.jre8tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0
JDBC Driver: 8.0.27db2-cdcDb2: 11.5
Db2 Driver: 11.5.0.0vitess-cdcVitess: 8.0.x, 9.0.x
MySql JDBC Driver: 8.0.26支持的Flink版本
Flink CDC 版本Flink版本1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.1.4.01.13.2.0.1.13.2.1.1.13.2.2.1.13., 1.14.2.3.1.13., 1.14., 1.15., 1.16.2.4.1.13., 1.14., 1.15., 1.16., 1.17.3.0.1.14., 1.15., 1.16., 1.17., 1.18.
特征
1、支持读取数据库快照,即使发生故障,也能以仅一次处理方式继续读取binlogs。
2、数据流API的CDC连接器,用户可以在单个作业中消费多个数据库和表上的更改,而无需部署Debezium和Kafka。
3、用于表/SQL API的CDC连接器,用户可以使用SQL DDL创建CDC源来监视单个表上的更改。
下表显示了连接器(connector)的当前特性:
连接器无锁读并行读仅一次读增量快照读mongodb-cdc✅✅✅✅mysql-cdc✅✅✅✅oracle-cdc✅✅✅✅postgres-cdc✅✅✅✅sqlserver-cdc✅✅✅✅oceanbase-cdc❌❌❌❌tidb-cdc✅❌✅❌db2-cdc✅✅✅✅vitess-cdc✅❌✅❌
MySQL同步到MySQL
DataStream方式实现
需要的依赖pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.leboop.www</groupId>
<artifactId>flink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<flink.version>1.17.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- flink客户端 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table API for Java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink cdc for mysql -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.18.0</version>
</dependency>
<!-- json解析 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version> <!-- 请使用最新的版本号 -->
</dependency>
</dependencies>
</project>
如果缺少依赖,可能报错如下:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter
at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:17)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 1 more
添加如下依赖即可:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.18.0</version>
</dependency>
如果报错如下:
Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getPipelineExecutor(StreamExecutionEnvironment.java:2717)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2194)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2084)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)
at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:62)
添加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
准备工作
本文仅仅为了演示,在windows本地安装了8.0.30版本的MySQL。如图:
准备两个数据库,分别作为本次案例的source和sink,如图:
建表语句分别如下:
CREATE TABLE `human` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(100) DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `human_sink` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(100) DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
MySQL CDC需要开启biglog日志,执行如下SQL查看biglog日志是否开启
SHOW VARIABLES LIKE 'log_bin';
如图:
Value为ON,表示开启。
代码
package com.leboop.cdc;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* Description TODO.
* Date 2024/7/28 15:48
*
* @author leb
* @version 2.0
*/
public class MysqlCDCDemo {
public static void main(String[] args) throws Exception {
// flink source,source类型为mysql
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(80)
.databaseList("cdc_demo")
.tableList("cdc_demo.human")
.username("root")
.password("root")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverId("1")
.build();
// 初始化环境.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
DataStreamSource<String> stringDataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 1 parallel source tasks
.setParallelism(1);
// 将数据打印到客户端.
stringDataStreamSource
.print().setParallelism(1); // use parallelism 1 for sink
// 数据同步到mysql
stringDataStreamSource.addSink(new RichSinkFunction<String>() {
private Connection connection = null;
private PreparedStatement preparedStatement = null;
@Override
public void open(Configuration parameters) throws Exception {
if (connection == null) {
Class.forName("com.mysql.cj.jdbc.Driver");//加载数据库驱动
connection = DriverManager.getConnection("jdbc:mysql://localhost:80", "root", "root");//获取连接
connection.setAutoCommit(false);//关闭自动提交
}
}
@Override
public void invoke(String value, Context context) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
String op = jsonObject.getString("op");
if ("r".equals(op)) { // 首次全量
System.out.println("执行清表操作");
connection.prepareStatement("truncate table cdc_sink.human_sink").execute(); // 清空目标表数据
JSONObject after = jsonObject.getJSONObject("after");
Integer id = after.getInteger("id");
String name = after.getString("name");
Integer age = after.getInteger("age");
preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
preparedStatement.setInt(1, id);
preparedStatement.setString(2, name);
preparedStatement.setInt(3, age);
preparedStatement.execute();
connection.commit();//预处理完成后统一提交
}else if("c".equals(op)) { // 新增.
JSONObject after = jsonObject.getJSONObject("after");
Integer id = after.getInteger("id");
String name = after.getString("name");
Integer age = after.getInteger("age");
preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
preparedStatement.setInt(1, id);
preparedStatement.setString(2, name);
preparedStatement.setInt(3, age);
preparedStatement.execute();
connection.commit();//预处理完成后统一提交
}
else if ("d".equals(op)) { // 删除
JSONObject after = jsonObject.getJSONObject("before");
Integer id = after.getInteger("id");
preparedStatement = connection.prepareStatement("delete from cdc_sink.human_sink where id = ?");
preparedStatement.setInt(1, id);
preparedStatement.execute();
connection.commit();//预处理完成后统一提交
} else if ("u".equals(op)) { // 更新
JSONObject after = jsonObject.getJSONObject("after");
Integer id = after.getInteger("id");
String name = after.getString("name");
Integer age = after.getInteger("age");
preparedStatement = connection.prepareStatement("update cdc_sink.human_sink set name = ?, age = ? where id = ?");
preparedStatement.setString(1, name);
preparedStatement.setInt(2, age);
preparedStatement.setInt(3, id);
preparedStatement.execute();
connection.commit();//预处理完成后统一提交
} else {
System.out.println("不支持的操作op=" + op);
}
}
@Override
public void close() throws Exception {
System.out.println("执行close方法");
if (preparedStatement != null) {
preparedStatement.close();
}
if (connection != null) {
connection.close();
}
}
});
env.execute("Print MySQL Snapshot + Binlog");
}
}
(1)Flink源
如下代码连接了本地MySQL数据库cdc_demo。
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(80)
.databaseList("cdc_demo")
.tableList("cdc_demo.human")
.username("root")
.password("root")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverId("1")
.build();
new JsonDebeziumDeserializationSchema()将读取的MySQL binlog数据反序列为JSON字符串数据,后面通过控制台输出可以看到。
(2)server id
每个用于读取binlog的MySQL数据库客户端都应该有一个唯一的id,称为服务器id。MySQL服务器将使用此id来维护网络连接和binlog位置。因此,如果不同的作业共享相同的服务器id,可能会导致从错误的binlog位置读取。因此,建议为每个阅读器设置不同的服务器id,例如,假设源并行度为4,那么我们可以使用'5401-5404',为4个源阅读器中的每一个分配唯一的服务器id。
(3)从MySQL源读取数据
DataStreamSource<String> stringDataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 1 parallel source tasks
.setParallelism(1);
代码从MySQL源读取了数据,并设置读取并行度为1,如果这里并行度为4,则前面需要4个server id,例如"1-4"。
(3)将读取的MySQL数据打印到控制台
// 将数据打印到客户端.
stringDataStreamSource
.print().setParallelism(1); // use parallelism 1 for sink
这里仅仅为了查看Binglog日志读取后,转换成Json字符串是什么样的。下面展示了三条该字符串:
{"before":null,"after":{"id":7,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722218198388,"transaction":null}
{"before":{"id":6,"name":"zhangsan","age":12},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218564000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":75954,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722218564587,"transaction":null}
{"before":{"id":7,"name":"lisi","age":23},"after":{"id":7,"name":"lisi","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218597000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":76582,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722218597551,"transaction":null}
第一条Json数据格式化后如下:
{
"before": null,
"after": {
"id": 7,
"name": "lisi",
"age": 23
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 0,
"snapshot": "false",
"db": "cdc_demo",
"sequence": null,
"table": "human",
"server_id": 0,
"gtid": null,
"file": "",
"pos": 0,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1722218198388,
"transaction": null
}
其中before表示操作前的数据,after表示操作后的数据。op表示操作类型,分为:
"op": "d"
代表删除操作"op": "u"
代表更新操作"op": "c"
** **代表新增操作"op": "r"
代表全量读取,而不是来自 binlog 的增量读取
例如上面第一条为首次全量同步cdc_demo数据库human表Json格式的binglog数据,因此before为null,after为数据,op为r。类似地,第二条为更新数据;第三条数据为删除一条数据,其op值为d。
(4)sink
这里使用匿名内部类RichSinkFunction实现了MySQL sink。
测试
先向human表中插入2条数据,SQL如下:
insert into cdc_demo.human(id,name,age) values(1,"zhangsan",12);
insert into cdc_demo.human(id,name,age) values(2,"lisi",23);
然后启动程序,输出日志如下:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
{"before":null,"after":{"id":1,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401429,"transaction":null}
{"before":null,"after":{"id":2,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401430,"transaction":null}
七月 29, 2024 10:16:42 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to localhost:80 at LEBOOP-bin.000005/80097 (sid:1, cid:803)
执行清表操作
执行清表操作
查看human_sink表,可以看到human表中的两条数据已经被同步:
接着执行如下更新、删除、新增SQL:
update cdc_demo.human set age = 10 where id = 1;
delete from cdc_demo.human where id = 2;
insert into cdc_demo.human(id,name,age) values(3,"zhangsan",12);
输出日志如下:
{"before":{"id":1,"name":"zhangsan","age":12},"after":{"id":1,"name":"zhangsan","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81312,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722219563829,"transaction":null}
{"before":{"id":2,"name":"lisi","age":23},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81647,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722219563849,"transaction":null}
{"before":null,"after":{"id":3,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81954,"row":0,"thread":57,"query":null},"op":"c","ts_ms":1722219563872,"transaction":null}
如图:
最终看到两张表数据保持一致,如图:
SQL方式实现
需要的依赖pom.xml
在DataStream方式上,还需要添加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.0.0-1.16</version>
</dependency>
如果报错如下:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:534)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:277)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:30)
添加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
如果报错如下:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.human_sink'.
Table options are:
'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='******'
'table-name'='human_sink'
'url'='jdbc:mysql://localhost:80/cdc_sink'
'username'='root'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
at scala.collection.Iterator.foreach(Iterator.scala:929)
at scala.collection.Iterator.foreach$(Iterator.scala:929)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1406)
at scala.collection.IterableLike.foreach(IterableLike.scala:71)
at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:234)
at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:68)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)
at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
mysql-cdc
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)
... 21 more
请添加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.0.0-1.16</version>
</dependency>
代码
package com.leboop.cdc;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
/**
* Description TODO.
* Date 2024/7/28 15:48
*
* @author leb
* @version 2.0
*/
public class MysqlCDCSqlDemo {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 3000L);
// source
TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
"id BIGINT ,\n" +
"name STRING ,\n" +
"age INT ,\n" +
"PRIMARY KEY (id) NOT ENFORCED \n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '80',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root',\n" +
" 'database-name' = 'cdc_demo',\n" +
" 'table-name' = 'human') ");
// 输出source表
createSourceTable.print();
System.out.println("创建源表结束");
// sink
TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
"id BIGINT ," +
"name STRING ," +
"age INT ," +
"PRIMARY KEY(id) NOT ENFORCED " +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'username' = 'root'," +
" 'password' = 'root'," +
" 'table-name' = 'human_sink' )");
createSinkTable.print();
System.out.println("创建sink表结束");
// 插入
tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
System.out.println("插入sink表结束");
}
}
(1)创建源表
如下代码创建了Flink中的源表,为什么说是Flink中呢?原因是该代码将mysql中的human表映射为Flink中的flink_human表,后文代码中就可以使用flink_human表了,代码如下:
// source
TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
"id BIGINT ,\n" +
"name STRING ,\n" +
"age INT ,\n" +
"PRIMARY KEY (id) NOT ENFORCED \n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '80',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root',\n" +
" 'database-name' = 'cdc_demo',\n" +
" 'table-name' = 'human') ");
注意这里connector必须是mysql-cdc。
(2)创建目标表
代码如下:
// sink
TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
"id BIGINT ," +
"name STRING ," +
"age INT ," +
"PRIMARY KEY(id) NOT ENFORCED " +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'username' = 'root'," +
" 'password' = 'root'," +
" 'table-name' = 'human_sink' )");
这里connector的值必须是jdbc,即通过jdbc连接器实现。
(3)同步数据
通过如下SQL即可以实现数据同步:
tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
测试
与DataStream测试过程相同。
值得注意的是:对MySQL的insert、update、delete操作可以完成同步,但对有些操作并不能完成同步,例如truncate操作。
版权归原作者 L(刘二宝) 所有, 如有侵权,请联系我们删除。