0


Flink CDC基本概念以及MySQL同步到MySQL

    本文基于Flink CDC v2.4.2版本和Flink 1.17.1版本。

欢迎来到Flink CDC

    Flink CDC是一个流数据集成工具,旨在为用户提供更强大的API。它允许用户通过YAML优雅地描述他们的ETL管道逻辑,并帮助用户自动生成自定义Flink算子和提交作业。Flink CDC优先优化任务提交过程,并提供增强的功能,如模式演化(schema evolution)、数据转换(data transformation)、全数据库同步(full database synchronization)和仅一次语义(exactly-once semantic)。

    与Apache Flink深度集成并由其提供支持,Flink CDC提供:

✅端到端数据集成框架
✅基于数据集成API用户可轻松构建作业
✅源端/目标端中多表支持
✅整个数据库的同步
✅模式演化能力


核心概念

数据管道(Data Pipeline

    由于Flink CDC中的事件(events)以管道( pipeline)方式从上游流向下游,因此整个ETL任务被称为数据管道(**Data Pipeline**)。

数据源(Data Source

    数据源用于访问元数据(metadata)并从外部系统读取变更的数据(the changed data)。一个数据源可以同时从多个表中读取数据。

    注意,这里的数据源并不是指的外部系统这个数据源,而是Flink中自身定义的数据源,Flink用这个数据源来从外部系统读取变更的数据。

数据接收器(Data Sink

    数据接收器用于应用模式更改(schema changes)并将更改数据写入外部系统。一个数据接收器可以同时写多个表。

表ID(Table ID)

    在与外部系统连接时,需要与外部系统的存储对象建立映射关系。需要唯一确定存储对象,这就是Table ID所指。为了与大多数外部系统兼容,表ID由一个3元组表示:(namespace, schemaName, tableName)。连接器应该在表ID和外部系统中的存储对象之间建立映射。下表列出了不同数据系统表ID中的部分:

数据系统表ID组成例子Oracle/PostgreSQLdatabase, schema, tablemydb.default.ordersMySQL/Doris/StarRocksdatabase, tablemydb.ordersKafkatopicorders

转换(Transform

    Transform模块帮助用户根据表中的数据列来删除和扩展数据列。此外,它还可以帮助用户在同步过程中过滤一些不必要的数据。

路由(Route

    路由指定匹配源表列表和映射到目标表的规则。最典型的场景是合并子数据库和子表,将多个上游源表路由到同一个目标表。

连接器(connectors)

    这里connector分了两个章节,需要说明connector、souce、sink的区别。source和sink都可以称为connector。或者connector包括source和sink,由于历史原因,先是source,sink,后面使用connector对source和sink做了统一。

管道连接器(pipeline connectors)

    Flink CDC提供了几个源和接收器连接器来与外部系统进行交互。通过将发布的jar添加到Flink CDC环境中,并在YAML管道定义中指定连接器,您可以使用开箱即用的连接器。

支持连接器

连接器支持的连接器类型外部系统Apache DorisSink

  • Apache Doris: 1.2.x, 2.x.x
    KafkaSink

  • Kafka
    MySQLSource

  • MySQL: 5.6, 5.7, 8.0.x

  • RDS MySQL: 5.6, 5.7, 8.0.x

  • PolarDB MySQL: 5.6, 5.7, 8.0.x

  • Aurora MySQL: 5.6, 5.7, 8.0.x

  • MariaDB: 10.x

  • PolarDB X: 2.0.1
    PaimonSink

  • Paimon: 0.6, 0.7, 0.8
    StarRocksSink

  • StarRocks: 2.x, 3.x

开发自己的连接器

    如果提供的连接器不能满足您的要求,您可以开发自己的连接器,以使您的外部系统参与Flink CDC管道。查看Flink CDC api,了解如何开发自己的连接器。

Flink源

Flink CDC 源

    Flink CDC源是Apache Flink的一组源连接器(source connectors),使用变更数据捕获(CDC)从不同的数据库摄取更改。一些CDC源集成了Debezium作为捕获数据变化的引擎。所以它可以充分利用Debezium的能力。了解更多关于什么是Debezium。

debezium

支持的连接器

连接器数据库驱动mongodb-cdc

  • MongoDB: 3.6, 4.x, 5.0, 6.0, 6.1
    MongoDB Driver: 4.9.1mysql-cdc

  • MySQL: 5.6, 5.7, 8.0.x

  • RDS MySQL: 5.6, 5.7, 8.0.x

  • PolarDB MySQL: 5.6, 5.7, 8.0.x

  • Aurora MySQL: 5.6, 5.7, 8.0.x

  • MariaDB: 10.x

  • PolarDB X: 2.0.1
    JDBC Driver: 8.0.28oceanbase-cdc

  • OceanBase CE: 3.1.x, 4.x

  • OceanBase EE: 2.x, 3.x, 4.x
    OceanBase Driver: 2.4.xoracle-cdc

  • Oracle: 11, 12, 19, 21
    Oracle Driver: 19.3.0.0postgres-cdc

  • PostgreSQL: 9.6, 10, 11, 12, 13, 14
    JDBC Driver: 42.5.1sqlserver-cdc

  • Sqlserver: 2012, 2014, 2016, 2017, 2019
    JDBC Driver: 9.4.1.jre8tidb-cdc

  • TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0
    JDBC Driver: 8.0.27db2-cdc

  • Db2: 11.5
    Db2 Driver: 11.5.0.0vitess-cdc

  • Vitess: 8.0.x, 9.0.x
    MySql JDBC Driver: 8.0.26

    支持的Flink版本

    Flink CDC 版本Flink版本1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.1.4.01.13.2.0.1.13.2.1.1.13.2.2.1.13., 1.14.2.3.1.13., 1.14., 1.15., 1.16.2.4.1.13., 1.14., 1.15., 1.16., 1.17.3.0.1.14., 1.15., 1.16., 1.17., 1.18.

    特征

1、支持读取数据库快照,即使发生故障,也能以仅一次处理方式继续读取binlogs。
2、数据流API的CDC连接器,用户可以在单个作业中消费多个数据库和表上的更改,而无需部署Debezium和Kafka。
3、用于表/SQL API的CDC连接器,用户可以使用SQL DDL创建CDC源来监视单个表上的更改。

    下表显示了连接器(connector)的当前特性:

连接器无锁读并行读仅一次读增量快照读mongodb-cdc✅✅✅✅mysql-cdc✅✅✅✅oracle-cdc✅✅✅✅postgres-cdc✅✅✅✅sqlserver-cdc✅✅✅✅oceanbase-cdc❌❌❌❌tidb-cdc✅❌✅❌db2-cdc✅✅✅✅vitess-cdc✅❌✅❌

MySQL同步到MySQL

DataStream方式实现

需要的依赖pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.leboop.www</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.17.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>

        <!-- flink客户端 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--  Table API for Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink cdc for mysql -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>

        <!-- json解析 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version> <!-- 请使用最新的版本号 -->
        </dependency>

    </dependencies>
</project>

如果缺少依赖,可能报错如下:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter
    at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:17)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 1 more

添加如下依赖即可:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

如果报错如下:

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
    at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getPipelineExecutor(StreamExecutionEnvironment.java:2717)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2194)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2084)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)
    at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:62)

添加如下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>

准备工作

    本文仅仅为了演示,在windows本地安装了8.0.30版本的MySQL。如图:

准备两个数据库,分别作为本次案例的source和sink,如图:

建表语句分别如下:


CREATE TABLE `human` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `human_sink` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

MySQL CDC需要开启biglog日志,执行如下SQL查看biglog日志是否开启

SHOW VARIABLES LIKE 'log_bin';

如图:

Value为ON,表示开启。

代码

package com.leboop.cdc;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Description TODO.
 * Date 2024/7/28 15:48
 *
 * @author leb
 * @version 2.0
 */
public class MysqlCDCDemo {
    public static void main(String[] args) throws Exception {
        // flink source,source类型为mysql
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(80)
                .databaseList("cdc_demo")
                .tableList("cdc_demo.human")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverId("1")
                .build();

        // 初始化环境.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        DataStreamSource<String> stringDataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 1 parallel source tasks
                .setParallelism(1);

        // 将数据打印到客户端.
        stringDataStreamSource
                .print().setParallelism(1); // use parallelism 1 for sink

        // 数据同步到mysql
        stringDataStreamSource.addSink(new RichSinkFunction<String>() {
            private Connection connection = null;
            private PreparedStatement preparedStatement = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                if (connection == null) {
                    Class.forName("com.mysql.cj.jdbc.Driver");//加载数据库驱动
                    connection = DriverManager.getConnection("jdbc:mysql://localhost:80", "root", "root");//获取连接
                    connection.setAutoCommit(false);//关闭自动提交
                }
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                String op = jsonObject.getString("op");
                if ("r".equals(op)) { // 首次全量
                    System.out.println("执行清表操作");
                    connection.prepareStatement("truncate table cdc_sink.human_sink").execute(); // 清空目标表数据
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
                    preparedStatement.setInt(1, id);
                    preparedStatement.setString(2, name);
                    preparedStatement.setInt(3, age);
                    preparedStatement.execute();
                    connection.commit();//预处理完成后统一提交
                }else if("c".equals(op)) { // 新增.
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
                    preparedStatement.setInt(1, id);
                    preparedStatement.setString(2, name);
                    preparedStatement.setInt(3, age);
                    preparedStatement.execute();
                    connection.commit();//预处理完成后统一提交
                }
                else if ("d".equals(op)) { // 删除
                    JSONObject after = jsonObject.getJSONObject("before");
                    Integer id = after.getInteger("id");
                    preparedStatement = connection.prepareStatement("delete from cdc_sink.human_sink where id = ?");
                    preparedStatement.setInt(1, id);
                    preparedStatement.execute();
                    connection.commit();//预处理完成后统一提交
                } else if ("u".equals(op)) { // 更新
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("update cdc_sink.human_sink set name = ?, age = ? where id = ?");
                    preparedStatement.setString(1, name);
                    preparedStatement.setInt(2, age);
                    preparedStatement.setInt(3, id);
                    preparedStatement.execute();
                    connection.commit();//预处理完成后统一提交
                } else {
                    System.out.println("不支持的操作op=" + op);
                }
            }

            @Override
            public void close() throws Exception {
                System.out.println("执行close方法");
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        });

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

(1)Flink源

如下代码连接了本地MySQL数据库cdc_demo。

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(80)
                .databaseList("cdc_demo")
                .tableList("cdc_demo.human")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverId("1")
                .build();

new JsonDebeziumDeserializationSchema()将读取的MySQL binlog数据反序列为JSON字符串数据,后面通过控制台输出可以看到。

(2)server id

    每个用于读取binlog的MySQL数据库客户端都应该有一个唯一的id,称为服务器id。MySQL服务器将使用此id来维护网络连接和binlog位置。因此,如果不同的作业共享相同的服务器id,可能会导致从错误的binlog位置读取。因此,建议为每个阅读器设置不同的服务器id,例如,假设源并行度为4,那么我们可以使用'5401-5404',为4个源阅读器中的每一个分配唯一的服务器id。

(3)从MySQL源读取数据

        DataStreamSource<String> stringDataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 1 parallel source tasks
                .setParallelism(1);

代码从MySQL源读取了数据,并设置读取并行度为1,如果这里并行度为4,则前面需要4个server id,例如"1-4"。

(3)将读取的MySQL数据打印到控制台

        // 将数据打印到客户端.
        stringDataStreamSource
                .print().setParallelism(1); // use parallelism 1 for sink

这里仅仅为了查看Binglog日志读取后,转换成Json字符串是什么样的。下面展示了三条该字符串:

{"before":null,"after":{"id":7,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722218198388,"transaction":null}
{"before":{"id":6,"name":"zhangsan","age":12},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218564000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":75954,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722218564587,"transaction":null}
{"before":{"id":7,"name":"lisi","age":23},"after":{"id":7,"name":"lisi","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218597000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":76582,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722218597551,"transaction":null}

第一条Json数据格式化后如下:

{
  "before": null,
  "after": {
    "id": 7,
    "name": "lisi",
    "age": 23
  },
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 0,
    "snapshot": "false",
    "db": "cdc_demo",
    "sequence": null,
    "table": "human",
    "server_id": 0,
    "gtid": null,
    "file": "",
    "pos": 0,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "r",
  "ts_ms": 1722218198388,
  "transaction": null
}

其中before表示操作前的数据,after表示操作后的数据。op表示操作类型,分为:

  • "op": "d" 代表删除操作
  • "op": "u" 代表更新操作
  • "op": "c"** **代表新增操作
  • "op": "r" 代表全量读取,而不是来自 binlog 的增量读取

例如上面第一条为首次全量同步cdc_demo数据库human表Json格式的binglog数据,因此before为null,after为数据,op为r。类似地,第二条为更新数据;第三条数据为删除一条数据,其op值为d。

(4)sink

这里使用匿名内部类RichSinkFunction实现了MySQL sink。

测试

    先向human表中插入2条数据,SQL如下:
insert into cdc_demo.human(id,name,age) values(1,"zhangsan",12);
insert into cdc_demo.human(id,name,age) values(2,"lisi",23);

然后启动程序,输出日志如下:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
{"before":null,"after":{"id":1,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401429,"transaction":null}
{"before":null,"after":{"id":2,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401430,"transaction":null}
七月 29, 2024 10:16:42 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to localhost:80 at LEBOOP-bin.000005/80097 (sid:1, cid:803)
执行清表操作
执行清表操作

查看human_sink表,可以看到human表中的两条数据已经被同步:

接着执行如下更新、删除、新增SQL:

update cdc_demo.human set age = 10 where id = 1;
delete from cdc_demo.human where id = 2;
insert into cdc_demo.human(id,name,age) values(3,"zhangsan",12);

输出日志如下:

{"before":{"id":1,"name":"zhangsan","age":12},"after":{"id":1,"name":"zhangsan","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81312,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722219563829,"transaction":null}
{"before":{"id":2,"name":"lisi","age":23},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81647,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722219563849,"transaction":null}
{"before":null,"after":{"id":3,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81954,"row":0,"thread":57,"query":null},"op":"c","ts_ms":1722219563872,"transaction":null}

如图:

最终看到两张表数据保持一致,如图:

SQL方式实现

需要的依赖pom.xml

    在DataStream方式上,还需要添加如下依赖:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.0.0-1.16</version>
        </dependency>

如果报错如下:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:534)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:277)
    at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
    at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:30)

添加如下依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

如果报错如下:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.human_sink'.

Table options are:

'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='******'
'table-name'='human_sink'
'url'='jdbc:mysql://localhost:80/cdc_sink'
'username'='root'
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
    at scala.collection.Iterator.foreach(Iterator.scala:929)
    at scala.collection.Iterator.foreach$(Iterator.scala:929)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1406)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:234)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
    at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:68)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)
    at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
    ... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
mysql-cdc
print
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)
    ... 21 more

请添加如下依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.0.0-1.16</version>
        </dependency>

代码

package com.leboop.cdc;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * Description TODO.
 * Date 2024/7/28 15:48
 *
 * @author leb
 * @version 2.0
 */
public class MysqlCDCSqlDemo {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 3000L);

        // source
        TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
                "id BIGINT ,\n" +
                "name STRING ,\n" +
                "age INT ,\n" +
                "PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '80',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'database-name' = 'cdc_demo',\n" +
                " 'table-name' = 'human') ");

        // 输出source表
        createSourceTable.print();
        System.out.println("创建源表结束");

        // sink
        TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
                "id BIGINT ," +
                "name STRING ," +
                "age INT ," +
                "PRIMARY KEY(id) NOT ENFORCED " +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'username' = 'root'," +
                " 'password' = 'root'," +
                " 'table-name' = 'human_sink' )");
        createSinkTable.print();
        System.out.println("创建sink表结束");

        // 插入
        tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
        System.out.println("插入sink表结束");

    }
}

(1)创建源表

    如下代码创建了Flink中的源表,为什么说是Flink中呢?原因是该代码将mysql中的human表映射为Flink中的flink_human表,后文代码中就可以使用flink_human表了,代码如下:
        // source
        TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
                "id BIGINT ,\n" +
                "name STRING ,\n" +
                "age INT ,\n" +
                "PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '80',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'database-name' = 'cdc_demo',\n" +
                " 'table-name' = 'human') ");

注意这里connector必须是mysql-cdc。

(2)创建目标表

代码如下:

        // sink
        TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
                "id BIGINT ," +
                "name STRING ," +
                "age INT ," +
                "PRIMARY KEY(id) NOT ENFORCED " +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'username' = 'root'," +
                " 'password' = 'root'," +
                " 'table-name' = 'human_sink' )");

这里connector的值必须是jdbc,即通过jdbc连接器实现。

(3)同步数据

    通过如下SQL即可以实现数据同步:
        tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");

测试

    与DataStream测试过程相同。

    值得注意的是:对MySQL的insert、update、delete操作可以完成同步,但对有些操作并不能完成同步,例如truncate操作。
标签: flink 大数据 CDC

本文转载自: https://blog.csdn.net/L_15156024189/article/details/140747170
版权归原作者 L(刘二宝) 所有, 如有侵权,请联系我们删除。

“Flink CDC基本概念以及MySQL同步到MySQL”的评论:

还没有评论