0


flink mysql数据表同步SQL CDC

1、CDC简介 Change Data Capture

FlinkCDC提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。通过CDC获取源数据表的更新内容,将更新内容作为数据流下发到下游系统,可以做到mysql数据表数据的实时同步操作。

基于Flink CDC的MySQL表数据同步流程大致如下:

  1. 数据源(MySQL):首先,一个MySQL数据库作为数据源,其中包含了想要同步的表。
  2. Flink CDC Connector:Flink CDC Connector是一个用于捕获MySQL表数据变更的组件。它连接到MySQL数据库,并持续监听数据变更(如插入、更新、删除操作)。
  3. 数据捕获:当MySQL表中的数据发生变化时,Flink CDC Connector会捕获这些变更事件,并将它们作为数据流进行处理。
  4. Flink流处理:捕获到的数据流会进入Flink流处理引擎。在Flink中,你可以定义一系列的操作来处理这些数据,比如过滤、聚合、转换等。
  5. 目标存储:处理后的数据会被写入到目标存储系统。这可以是一个数据库、数据仓库、消息队列或其他任何数据存储系统。
  6. 监控与告警:同步过程中,你可以设置监控和告警机制,以便在出现问题时能够及时得到通知并进行处理。
  7. 错误处理与重试:在同步过程中,可能会遇到各种错误,如网络问题、目标存储故障等。你需要设计合适的错误处理机制,比如重试策略,以确保数据的可靠性和一致性。

2、CDC配置

(1)开启MySql的binlog

1,修改 mysql 的配置文件 my.cnf
追加内容:
log-bin=mysql-bin #binlog
binlog_format=ROW #选择row
server_id=1 #mysql实例id
2,重启 mysql:
service mysql restart
3,登录 mysql 客户端,查看 log_bin 开启状态
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON|
+---------------+-------+
————————————————
状态为ON表示该功能已开启

(2)创建Mysql数据库表

两个库表源表和目标表:

源表:

CREATE TABLE zentao.zt_group (
id MEDIUMINT(7) UNSIGNED NOT NULL AUTO_INCREMENT,
project MEDIUMINT(7) UNSIGNED NOT NULL DEFAULT '0',
vision VARCHAR(10) NOT NULL DEFAULT 'rnd' COLLATE 'utf8_general_ci',
name CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
role CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
desc CHAR(255) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
acl TEXT NULL DEFAULT NULL COLLATE 'utf8_general_ci',
developer ENUM('0','1') NOT NULL DEFAULT '1' COLLATE 'utf8_general_ci',
PRIMARY KEY (id) USING BTREE
);

INSERT INTO zt_group (id, project, vision, name, role, desc, acl, developer) VALUES (1, 0, 'rnd', '管理员', 'admin', '系统管理员', NULL, '1');
INSERT INTO zt_group (id, project, vision, name, role, desc, acl, developer) VALUES (2, 0, 'rnd', '研发', 'dev', '研发人员', NULL, '1');
INSERT INTO zt_group (id, project, vision, name, role, desc, acl, developer) VALUES (3, 0, 'rnd', '测试', 'qa', '测试人员', NULL, '1');
INSERT INTO zt_group (id, project, vision, name, role, desc, acl, developer) VALUES (4, 0, 'rnd', '项目经理', 'pm', '项目经理', NULL, '1');
INSERT INTO zt_group (id, project, vision, name, role, desc, acl, developer) VALUES (5, 0, 'rnd', '产品经理', 'po', '产品经理', NULL, '1');
INSERT INTO zt_group (id, project, vision, name, role, desc, acl, developer) VALUES (6, 0, 'rnd', '研发主管', 'td', '研发主管', NULL, '1');

目标表:

CREATE TABLE wfg.zentao_zt_group (
id MEDIUMINT(7) UNSIGNED NOT NULL AUTO_INCREMENT,
project MEDIUMINT(7) UNSIGNED NOT NULL DEFAULT '0',
vision VARCHAR(10) NOT NULL DEFAULT 'rnd' COLLATE 'utf8_general_ci',
name CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
role CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
desc CHAR(255) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
acl TEXT NULL DEFAULT NULL COLLATE 'utf8_general_ci',
developer ENUM('0','1') NOT NULL DEFAULT '1' COLLATE 'utf8_general_ci',
PRIMARY KEY (id) USING BTREE
);

3、SQL CDC同步数据代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TenTao2wfgUserSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(5000);

        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS source_test");
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS target_test");

        // 动态表,此为source表
        tEnv.executeSql("CREATE TABLE source_test.zentao (\n" +
                "    id                           INT,\n" +
                "    project                      INT,\n" +
                "    vision                       STRING,\n" +
                "    name                         STRING,\n" +
                "    role                         STRING,\n" +
                "    desc                         STRING,\n" +
                "    acl                          STRING,\n" +
                "    developer                    INT,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '127.0.0.1',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'zentao',\n" +
                "  'table-name' = 'zt_group',\n" +
                "  'scan.incremental.snapshot.enabled' = 'false'\n" +
                ")");
    // 动态表,此为sink表。sink表和source表的connector不一样
        tEnv.executeSql("CREATE TABLE target_test.zentao (\n" +
                "    id                           INT,\n" +
                "    project                      INT,\n" +
                "    vision                       STRING,\n" +
                "    name                         STRING,\n" +
                "    role                         STRING,\n" +
                "    desc                         STRING,\n" +
                "    acl                          STRING,\n" +
                "    developer                    INT,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://127.0.0.1:3306/wfg',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'table-name' = 'zentao_zt_group'\n" +
                ")");

        tEnv.executeSql("INSERT INTO target_test.zentao (id, project, vision, name, role, desc,acl,developer) \n" +
                "select f.id,\n" +
                "       f.project,\n" +
                "       f.vision,\n" +
                "       f.name,\n" +
                "       f.role,\n" +
                "       f.desc,\n" +
                "       f.acl,\n" +
                "       f.developer \n" +
                "       from source_test.zentao f ");
    }
}
标签: flink mysql cdc

本文转载自: https://blog.csdn.net/mqiqe/article/details/137806850
版权归原作者 王小工 所有, 如有侵权,请联系我们删除。

“flink mysql数据表同步SQL CDC”的评论:

还没有评论