0


FlinkCDC 实现 MySQL 数据变更实时同步

文章目录

1、基本介绍

Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:

  1. FlinkSQL
  2. Flink DataStream 和 Table API(本文使用该方式)在这里插入图片描述 对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:在这里插入图片描述

2、代码实战

2.1、数据源准备

本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:

CREATETABLE`quick_chat_msg`(`id`bigintNOTNULLCOMMENT'主键id',`from_id`varchar(20)CHARACTERSET utf8 COLLATE utf8_unicode_ci DEFAULTNULLCOMMENT'账户id(发送人)',`to_id`varchar(20)CHARACTERSET utf8 COLLATE utf8_unicode_ci DEFAULTNULLCOMMENT'账户id(接收人)',`relation_id`varchar(50)CHARACTERSET utf8 COLLATE utf8_unicode_ci DEFAULTNULLCOMMENT'发送关联',`content`varchar(500)DEFAULTNULLCOMMENT'消息内容',`msg_type`tinyint(1)DEFAULTNULLCOMMENT'消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info`varchar(500)DEFAULTNULLCOMMENT'额外信息',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`deleted`tinyint(1)DEFAULTNULLCOMMENT'删除标识',PRIMARYKEY(`id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb3;

需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
在这里插入图片描述
最后,要把数据库时区配置好,否则会出现问题,命令如下:

SET persist time_zone ='+8:00';SET time_zone ='+8:00';SHOW VARIABLES LIKE'%time_zone%';

在这里插入图片描述

2.2、代码实战

首先,引入Flink CDC相关依赖,内容如下:

<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency></dependencies>

第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:

importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;publicclassMySinkHandlerextendsRichSinkFunction<String>{@Overridepublicvoidinvoke(String value,Context context)throwsException{System.out.println(value);}@Overridepublicvoidopen(Configuration parameters)throwsException{}@Overridepublicvoidclose()throwsException{}}

最后,配置好 Flink CDC 监听进程,随着项目启动运行:

importcom.ververica.cdc.connectors.mysql.source.MySqlSource;importcom.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.configuration.RestOptions;importorg.apache.flink.streaming.api.datastream.DataStreamSink;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;@ComponentpublicclassMySqlSourceExample{@PostConstructpublicvoidinit()throwsException{// 配置监听数据源MySqlSource<String> source =MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(newJsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration =newConfiguration();
        configuration.setInteger(RestOptions.PORT,8081);StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。
        env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source,WatermarkStrategy.noWatermarks(),"MySQL Source").addSink(newMySinkHandler());
        env.execute();}}

项目启动完毕后,可以通过8081端口访问Flink UI页面:
在这里插入图片描述

2.3、数据格式

上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:

# 新增{"before": null,
    "after":{"id":3,
        "from_id":"dog",
        "to_id":"cat",
        "relation_id":"dog:cat",
        "content":"你好啊",
        "msg_type":1,
        "extra_info": null,
        "create_time":1729164075000,
        "deleted":0},
    "source":{"version":"1.6.4.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":1729135279000,
        "snapshot":"false",
        "db":"quick_chat",
        "sequence": null,
        "table":"quick_chat_msg",
        "server_id":1,
        "gtid": null,
        "file":"binlog.000002",
        "pos":2452,
        "row":0,
        "thread": null,
        "query": null
    },
    "op":"c",
    "ts_ms":1729135278633,
    "transaction": null
}
# 修改{"before":{"id":3,
        "from_id":"dog",
        "to_id":"cat",
        "relation_id":"dog:cat",
        "content":"你好啊",
        "msg_type":1,
        "extra_info": null,
        "create_time":1729164075000,
        "deleted":0},
    "after":{"id":3,
        "from_id":"dog",
        "to_id":"cat",
        "relation_id":"dog:cat",
        "content":"你好啊,小猫咪",
        "msg_type":1,
        "extra_info": null,
        "create_time":1729164075000,
        "deleted":0},
    "source":{"version":"1.6.4.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":1729135289000,
        "snapshot":"false",
        "db":"quick_chat",
        "sequence": null,
        "table":"quick_chat_msg",
        "server_id":1,
        "gtid": null,
        "file":"binlog.000002",
        "pos":2825,
        "row":0,
        "thread": null,
        "query": null
    },
    "op":"u",
    "ts_ms":1729135288473,
    "transaction": null
}
# 删除{"before":{"id":3,
        "from_id":"dog",
        "to_id":"cat",
        "relation_id":"dog:cat",
        "content":"你好啊,小猫咪",
        "msg_type":1,
        "extra_info": null,
        "create_time":1729164075000,
        "deleted":0},
    "after": null,
    "source":{"version":"1.6.4.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":1729135301000,
        "snapshot":"false",
        "db":"quick_chat",
        "sequence": null,
        "table":"quick_chat_msg",
        "server_id":1,
        "gtid": null,
        "file":"binlog.000002",
        "pos":3247,
        "row":0,
        "thread": null,
        "query": null
    },
    "op":"d",
    "ts_ms":1729135300692,
    "transaction": null
}
标签: mysql 数据库 flink

本文转载自: https://blog.csdn.net/weixin_46594796/article/details/142986922
版权归原作者 大连-徐志斌 所有, 如有侵权,请联系我们删除。

“FlinkCDC 实现 MySQL 数据变更实时同步”的评论:

还没有评论