0


39、Flink 的CDC 格式:maxwell部署以及示例

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录


本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,还依赖maxwell、kafka和flink的环境。

一、maxwell Format

1、maxwell介绍

Maxwell是一个CDC(Changelog Data Capture)工具,可以将MySQL中的数据变化实时流式传输到Kafka、Kinesis和其他流式连接器中。Maxwell为变更日志提供了统一的格式模式,并支持使用JSON序列化消息。

Flink支持将Maxwell JSON消息解释为INSERT/UPDATE/DELETE Flink SQL系统中的消息。在许多情况下,这对于利用此功能非常有用,例如

  • 将增量数据从数据库同步到其他系统
  • 审核日志
  • 数据库上的实时物化视图
  • 数据库表的临时连接更改历史等等。

Flink还支持将Flink SQL中的INSERT/UPDATE/DELETE消息编码为Maxwell JSON消息,并发送到Kafka等外部系统。但是,截至Flink 1.17版本,Flink无法将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UDPATE_AFTER编码为DELETE和INSERT Maxwell消息。

2、binlog设置及验证

设置binlog需要监控的数据库,本示例使用的数据库是mysql5.7

1)、配置

本示例设置的参数参考下面的配置

[root@server4 ~]# cat /etc/my.cnf# For advice on how to change settings please see# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html[mysqld]......

log-bin=mysql-bin  # log-bin的名称,可以是任意名称
binlog-format=row  # 推荐该参数,其他的参数视情况而定,比如mixed、statementserver_id=1# mysql集群环境中不要重复binlog_do_db=test # test是mysql的数据库名称,如果监控多个数据库,可以添加多个binlog_do_db即可,例如下面示例# binlog_do_db=test2# binlog_do_db=test3.....
  • STATEMENT模式(SBR) 每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)
  • ROW模式(RBR) 不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。
  • MIXED模式(MBR) 以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。

2)、重启mysql

保存配置后重启mysql

service mysqld restart

3)、验证

重启后,可以通过2个简单的方法验证是否设置成功。

mysql默认的安装目录:cd /var/lib/mysql

[root@server4 ~]# cd /var/lib/mysql[root@server4 mysql]# ll......
-rw-r----- 1 mysql mysql    1541月  102022 mysql-bin.000001
-rw-r----- 1 mysql mysql       11971月  1612:21 mysql-bin.index
.....

查看mysql-bin.000001文件是否生成,且其大小为154字节。mysql-bin.000001是mysql重启的次数,重启2次则为mysql-bin.000002
在test数据库中创建或添加数据,mysql-bin.000001的大小是否增加

以上情况满足,则说明binlog配置正常

3、部署

1)、下载

去其官网:https://maxwells-daemon.io/quickstart/下载需要的版本。
本示例使用的是:maxwell-1.29.2.tar.gz 注意其不同版本对jdk的要求,最新版本要求jdk11.

2)、解压

解压的目录/usr/local/bigdata/maxwell-1.29.2

tar-zvxf maxwell-1.29.2.tar.gz -C /usr/local/bigdata
[alanchan@server3 maxwell-1.29.2]$ ll
总用量 108
drwxr-xr-x 2 alanchan root  40961月  16 05:45 bin
-rw-r--r-- 1 alanchan root 251331月  242021 config.md
-rw-r--r-- 1 alanchan root 119701月  242021 config.properties.example
-rw-r--r-- 1 alanchan root 102594月  222020 kinesis-producer-library.properties.example
drwxr-xr-x 3 alanchan root 122881月  272021 lib
-rw-r--r-- 1 alanchan root   5484月  222020 LICENSE
-rw-r--r-- 1 alanchan root   4701月  242021 log4j2.xml
-rw-r--r-- 1 alanchan root  33281月  272021 quickstart.md
-rw-r--r-- 1 alanchan root  14291月  272021 README.md

3)、创建元数据库

该步骤需要创建一个mysql数据库,用以保存maxwell的元数据,至于访问这个数据库的用户名和密码则视情况而定,下面的内容是其官方上的操作,也就是创建用户、授权。

本文的示例中使用的是root用户,创建的数据库名称为maxwell。


mysql>CREATEUSER'maxwell'@'%' IDENTIFIED BY'XXXXXX';
mysql>CREATEUSER'maxwell'@'localhost' IDENTIFIED BY'XXXXXX';

mysql>GRANTALLON maxwell.*TO'maxwell'@'%';
mysql>GRANTALLON maxwell.*TO'maxwell'@'localhost';

mysql>GRANTSELECT,REPLICATION CLIENT,REPLICATION SLAVE ON*.*TO'maxwell'@'%';
mysql>GRANTSELECT,REPLICATION CLIENT,REPLICATION SLAVE ON*.*TO'maxwell'@'localhost';

4)、启动方式

其提供了2种启动方式,即通过命令行参数的形式和通过配置文件的形式,下面是给出的示例

  • 命令行参数形式,输出到控制台 不需要做任何配置即可直接使用,

maxwell --user='root'--password='123456'--host='192.168.10.44'--producer=stdout
# user 和 password 是连接mysql元数据库的账号和密码# host是被监控的mysql的ip# producer是maxwell的输出类型,比如stdout、kafka等
  • 配置文件方式,输出到控制台

maxwell --config../config.properties
# config.properties文件修改内容如下,其他的保持不变,也可以根据自己的需要修改producer=stdout
# mysql login infohost=192.168.10.44
user=root
password=123456

4、示例1:maxwell CDC 输出至控制台

1)、启动maxwell

部署完成后,不需要做任何的改动即可执行下面的命令

maxwell --user='root'--password='123456'--host='192.168.10.44'--producer=stdout

2)、操作mysql监控的数据库,观察其控制台输出

在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下

[alanchan@server3 bin]$ maxwell --user='root'--password='123456'--host='192.168.10.44'--producer=stdout
Using kafka version: 1.0.0
{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653290,"xid":20392,"commit":true,"data":{"name":"alanchanchn","scores":109.0},"old":{"scores":199.0}}{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705653456,"xid":20935,"commit":true,"data":{"name":"alan1","scores":5.0}}{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653461,"xid":20951,"commit":true,"data":{"name":"alan1","scores":109.0},"old":{"scores":5.0}}{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705653465,"xid":20967,"commit":true,"data":{"name":"alan1","scores":109.0}}

5、示例2:maxwell CDC 输出至kafka

1)、启动maxwell

部署完成后,不需要做任何的改动即可执行下面的命令

maxwell --user='root'--password='rootroot'--host='192.168.10.44'--producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic

[alanchan@server3 bin]$ maxwell --user='root'--password='rootroot'--host='192.168.10.44'--producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic
Using kafka version: 1.0.0

2)、通过命令行打开kafka消费者

kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning

3)、操作mysql监控的数据库,观察其控制台输出

在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下

[alanchan@server1 bin]$ cd../../kafka_2.12-3.0.0/bin/
[alanchan@server1 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning
{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705654206,"xid":22158,"commit":true,"data":{"name":"test","scores":100.0}}{"database":"cdctest","table":"userscoressink","type":"update","ts":1705654220,"xid":22196,"commit":true,"data":{"name":"test","scores":200.0},"old":{"scores":100.0}}{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705654224,"xid":22210,"commit":true,"data":{"name":"test","scores":200.0}}

二、Flink 与 maxwell 实践

为了使用maxwell格式,使用构建自动化工具(如Maven或SBT)的项目和带有SQL JAR包的SQLClient都需要以下依赖项。

1、maven依赖

该依赖在flink自建工程中已经包含。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.1</version></dependency>

有关如何部署 maxwell 以将变更日志同步到消息队列,请参阅上文的具体事例或想了解更多的信息参考maxwell 文档。

2、Flink sql client 建表示例

maxwell 为变更日志提供了统一的格式,下面是一个从 MySQL 库 userscoressink表中捕获更新操作的简单示例:

{"database":"cdctest","table":"userscoressink","type":"update","ts":1705654220,"xid":22196,"commit":true,"data":{"name":"test","scores":200.0},"old":{"scores":100.0}}

MySQL userscoressink表有2列(name,scores)。上面的 JSON 消息是 userscoressink表上的一个更新事件,表示数据上scores字段值从100变更成为200。

消息已经同步到了一个 Kafka 主题:alan_maxwell_to_kafka_topic,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。

具体启动maxwell参考本文的第一部分的kafka示例,其他不再赘述。下面的部分仅仅是演示maxwell环境都正常后,在Flink SQL client中的操作。

-- 元数据与 MySQL "userscoressink" 表完全相同CREATETABLE userscoressink (
  name STRING,
  scores FLOAT)WITH('connector'='kafka','topic'='alan_maxwell_to_kafka_topic','properties.bootstrap.servers'='server1:9092,server2:9092,server3:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='maxwell-json'-- 使用 maxwell-json 格式);

将 Kafka 主题注册成 Flink 表之后,就可以将 maxwell消息用作变更日志源。

-- 验证,在mysql中新增、修改和删除数据,观察flink sql client 的数据变化
Flink SQL>showtables;
Empty set

Flink SQL>CREATETABLE userscoressink (>   name STRING,>   scores FLOAT>)WITH(>'connector'='kafka',>'topic'='alan_maxwell_to_kafka_topic',>'properties.bootstrap.servers'='server1:9092,server2:9092,server3:9092',>'properties.group.id'='testGroup',>'scan.startup.mode'='earliest-offset',>'format'='maxwell-json'-- 使用 maxwell-json 格式>);[INFO]Execute statement succeed.

Flink SQL>select*from userscoressink;+----+--------------------------------+--------------------------------+| op |                           name |                         scores |+----+--------------------------------+--------------------------------+|+I |                           test |100.0||-U |                           test |100.0||+U |                           test |200.0||-D |                           test |200.0|^CQuery terminated, received a total of4rows-- 关于MySQL "userscoressink" 表的实时物化视图-- 按name分组,对scores进行求和

Flink SQL>select name ,sum(scores) sum_scores from userscoressink groupby name;+----+--------------------------------+--------------------------------+| op |                           name |                     sum_scores |+----+--------------------------------+--------------------------------+|+I |                           test |100.0||-D |                           test |100.0||+I |                           test |200.0||-D |                           test |200.0|

3、Available Metadata

以下格式元数据可以在表定义中公开为只读(VIRTUAL)列。

只有当相应的连接器转发格式元数据时,格式元数据字段才可用。

截至Flink 1.17版本,只有Kafka连接器能够公开其值格式的元数据字段。
在这里插入图片描述

以下示例显示了如何访问Kafka中的Maxwell元数据字段:

CREATETABLE userscoressink2(
  origin_database STRING METADATA FROM'value.database' VIRTUAL,
  origin_table STRING METADATA FROM'value.table' VIRTUAL,
  origin_primary_key_columns ARRAY<STRING> METADATA FROM'value.primary-key-columns' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM'value.ingestion-timestamp' VIRTUAL,
  name STRING,
  scores FLOAT)WITH('connector'='kafka','topic'='alan_maxwell_to_kafka_topic','properties.bootstrap.servers'='server1:9092,server2:9092,server3:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='maxwell-json');# 操作步骤如下
Flink SQL>CREATETABLE userscoressink2(>   origin_database STRING METADATA FROM'value.database' VIRTUAL,>   origin_table STRING METADATA FROM'value.table' VIRTUAL,>   origin_primary_key_columns ARRAY<STRING> METADATA FROM'value.primary-key-columns' VIRTUAL,>   origin_ts TIMESTAMP(3) METADATA FROM'value.ingestion-timestamp' VIRTUAL,>   name STRING,>   scores FLOAT>)WITH(>'connector'='kafka',>'topic'='alan_maxwell_to_kafka_topic',>'properties.bootstrap.servers'='server1:9092,server2:9092,server3:9092',>'properties.group.id'='testGroup',>'scan.startup.mode'='earliest-offset',>'format'='maxwell-json'>);[INFO]Execute statement succeed.

Flink SQL>select*from userscoressink2;+----+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+| op |                origin_database |                   origin_table |     origin_primary_key_columns |               origin_ts |                           name |                         scores |+----+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+|+I |                        cdctest |                 userscoressink |<NULL>|2024-01-1916:50:06.000|                           test |100.0||-U |                        cdctest |                 userscoressink |<NULL>|2024-01-1916:50:20.000|                           test |100.0||+U |                        cdctest |                 userscoressink |<NULL>|2024-01-1916:50:20.000|                           test |200.0||-D |                        cdctest |                 userscoressink |<NULL>|2024-01-1916:50:24.000|                           test |200.0|

4、Format 参数

在这里插入图片描述

5、重要事项:重复的变更事件

Maxwell应用程序允许每次更改事件只投递一次。在这种情况下,Flink在消费Maxwell生产的事件时效果非常好。如果Maxwell应用程序至少在一次交付中工作,它可能会向Kafka交付重复的更改事件,Flink将获得重复的事件。这可能会导致Flink查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置表.exec.source.cdc-events-duplicate设置为true,并在源上定义PRIMARY KEY。框架将生成一个额外的有状态运算符,并使用主键来消除更改事件的重复,并生成一个规范化的更改日志流。

6、数据类型映射

目前,maxwell Format 使用 JSON Format 进行序列化和反序列化。 有关数据类型映射的更多详细信息,请参阅 JSON Format 文档。

以上,本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。

标签: flink 大数据 kafka

本文转载自: https://blog.csdn.net/chenwewi520feng/article/details/132043720
版权归原作者 一瓢一瓢的饮 alanchanchn 所有, 如有侵权,请联系我们删除。

“39、Flink 的CDC 格式:maxwell部署以及示例”的评论:

还没有评论