1、CDC简介 Change Data Capture
FlinkCDC提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。通过CDC获取源数据表的更新内容,将更新内容作为数据流下发到下游系统,可以做到mysql数据表数据的实时同步操作。
基于Flink CDC的MySQL表数据同步流程大致如下:
- 数据源(MySQL):首先,一个MySQL数据库作为数据源,其中包含了想要同步的表。
- Flink CDC Connector:Flink CDC Connector是一个用于捕获MySQL表数据变更的组件。它连接到MySQL数据库,并持续监听数据变更(如插入、更新、删除操作)。
- 数据捕获:当MySQL表中的数据发生变化时,Flink CDC Connector会捕获这些变更事件,并将它们作为数据流进行处理。
- Flink流处理:捕获到的数据流会进入Flink流处理引擎。在Flink中,你可以定义一系列的操作来处理这些数据,比如过滤、聚合、转换等。
- 目标存储:处理后的数据会被写入到目标存储系统。这可以是一个数据库、数据仓库、消息队列或其他任何数据存储系统。
- 监控与告警:同步过程中,你可以设置监控和告警机制,以便在出现问题时能够及时得到通知并进行处理。
- 错误处理与重试:在同步过程中,可能会遇到各种错误,如网络问题、目标存储故障等。你需要设计合适的错误处理机制,比如重试策略,以确保数据的可靠性和一致性。
2、CDC配置
(1)开启MySql的binlog
1,修改 mysql 的配置文件 my.cnf
追加内容:
log-bin=mysql-bin #binlog
binlog_format=ROW #选择row
server_id=1 #mysql实例id
2,重启 mysql:
service mysql restart
3,登录 mysql 客户端,查看 log_bin 开启状态
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON|
+---------------+-------+
————————————————
状态为ON表示该功能已开启
(2)创建Mysql数据库表
两个库表源表和目标表:
源表:
CREATE TABLE zentao.
zt_group
(
id
MEDIUMINT(7) UNSIGNED NOT NULL AUTO_INCREMENT,
project
MEDIUMINT(7) UNSIGNED NOT NULL DEFAULT '0',
vision
VARCHAR(10) NOT NULL DEFAULT 'rnd' COLLATE 'utf8_general_ci',
name
CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
role
CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
desc
CHAR(255) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
acl
TEXT NULL DEFAULT NULL COLLATE 'utf8_general_ci',
developer
ENUM('0','1') NOT NULL DEFAULT '1' COLLATE 'utf8_general_ci',
PRIMARY KEY (id
) USING BTREE
);INSERT INTO
zt_group
(id
,project
,vision
,name
,role
,desc
,acl
,developer
) VALUES (1, 0, 'rnd', '管理员', 'admin', '系统管理员', NULL, '1');
INSERT INTOzt_group
(id
,project
,vision
,name
,role
,desc
,acl
,developer
) VALUES (2, 0, 'rnd', '研发', 'dev', '研发人员', NULL, '1');
INSERT INTOzt_group
(id
,project
,vision
,name
,role
,desc
,acl
,developer
) VALUES (3, 0, 'rnd', '测试', 'qa', '测试人员', NULL, '1');
INSERT INTOzt_group
(id
,project
,vision
,name
,role
,desc
,acl
,developer
) VALUES (4, 0, 'rnd', '项目经理', 'pm', '项目经理', NULL, '1');
INSERT INTOzt_group
(id
,project
,vision
,name
,role
,desc
,acl
,developer
) VALUES (5, 0, 'rnd', '产品经理', 'po', '产品经理', NULL, '1');
INSERT INTOzt_group
(id
,project
,vision
,name
,role
,desc
,acl
,developer
) VALUES (6, 0, 'rnd', '研发主管', 'td', '研发主管', NULL, '1');目标表:
CREATE TABLE wfg.
zentao_zt_group
(
id
MEDIUMINT(7) UNSIGNED NOT NULL AUTO_INCREMENT,
project
MEDIUMINT(7) UNSIGNED NOT NULL DEFAULT '0',
vision
VARCHAR(10) NOT NULL DEFAULT 'rnd' COLLATE 'utf8_general_ci',
name
CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
role
CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
desc
CHAR(255) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
acl
TEXT NULL DEFAULT NULL COLLATE 'utf8_general_ci',
developer
ENUM('0','1') NOT NULL DEFAULT '1' COLLATE 'utf8_general_ci',
PRIMARY KEY (id
) USING BTREE
);
3、SQL CDC同步数据代码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class TenTao2wfgUserSql { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); env.enableCheckpointing(5000); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS source_test"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS target_test"); // 动态表,此为source表 tEnv.executeSql("CREATE TABLE source_test.zentao (\n" + " id INT,\n" + " project INT,\n" + " vision STRING,\n" + " name STRING,\n" + " role STRING,\n" + " desc STRING,\n" + " acl STRING,\n" + " developer INT,\n" + " PRIMARY KEY (id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'mysql-cdc',\n" + " 'hostname' = '127.0.0.1',\n" + " 'port' = '3306',\n" + " 'username' = 'root',\n" + " 'password' = '123456',\n" + " 'database-name' = 'zentao',\n" + " 'table-name' = 'zt_group',\n" + " 'scan.incremental.snapshot.enabled' = 'false'\n" + ")"); // 动态表,此为sink表。sink表和source表的connector不一样 tEnv.executeSql("CREATE TABLE target_test.zentao (\n" + " id INT,\n" + " project INT,\n" + " vision STRING,\n" + " name STRING,\n" + " role STRING,\n" + " desc STRING,\n" + " acl STRING,\n" + " developer INT,\n" + " PRIMARY KEY (id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://127.0.0.1:3306/wfg',\n" + " 'username' = 'root',\n" + " 'password' = '123456',\n" + " 'table-name' = 'zentao_zt_group'\n" + ")"); tEnv.executeSql("INSERT INTO target_test.zentao (id, project, vision, name, role, desc,acl,developer) \n" + "select f.id,\n" + " f.project,\n" + " f.vision,\n" + " f.name,\n" + " f.role,\n" + " f.desc,\n" + " f.acl,\n" + " f.developer \n" + " from source_test.zentao f "); } }
版权归原作者 王小工 所有, 如有侵权,请联系我们删除。