Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
- 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
- 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
- 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
- 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
- 5、Flink 监控系列 本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,还依赖maxwell、kafka和flink的环境。
一、maxwell Format
1、maxwell介绍
Maxwell是一个CDC(Changelog Data Capture)工具,可以将MySQL中的数据变化实时流式传输到Kafka、Kinesis和其他流式连接器中。Maxwell为变更日志提供了统一的格式模式,并支持使用JSON序列化消息。
Flink支持将Maxwell JSON消息解释为INSERT/UPDATE/DELETE Flink SQL系统中的消息。在许多情况下,这对于利用此功能非常有用,例如
- 将增量数据从数据库同步到其他系统
- 审核日志
- 数据库上的实时物化视图
- 数据库表的临时连接更改历史等等。
Flink还支持将Flink SQL中的INSERT/UPDATE/DELETE消息编码为Maxwell JSON消息,并发送到Kafka等外部系统。但是,截至Flink 1.17版本,Flink无法将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UDPATE_AFTER编码为DELETE和INSERT Maxwell消息。
2、binlog设置及验证
设置binlog需要监控的数据库,本示例使用的数据库是mysql5.7
1)、配置
本示例设置的参数参考下面的配置
[root@server4 ~]# cat /etc/my.cnf# For advice on how to change settings please see# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html[mysqld]......
log-bin=mysql-bin # log-bin的名称,可以是任意名称
binlog-format=row # 推荐该参数,其他的参数视情况而定,比如mixed、statementserver_id=1# mysql集群环境中不要重复binlog_do_db=test # test是mysql的数据库名称,如果监控多个数据库,可以添加多个binlog_do_db即可,例如下面示例# binlog_do_db=test2# binlog_do_db=test3.....
- STATEMENT模式(SBR) 每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)
- ROW模式(RBR) 不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。
- MIXED模式(MBR) 以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。
2)、重启mysql
保存配置后重启mysql
service mysqld restart
3)、验证
重启后,可以通过2个简单的方法验证是否设置成功。
mysql默认的安装目录:cd /var/lib/mysql
[root@server4 ~]# cd /var/lib/mysql[root@server4 mysql]# ll......
-rw-r----- 1 mysql mysql 1541月 102022 mysql-bin.000001
-rw-r----- 1 mysql mysql 11971月 1612:21 mysql-bin.index
.....
查看mysql-bin.000001文件是否生成,且其大小为154字节。mysql-bin.000001是mysql重启的次数,重启2次则为mysql-bin.000002
在test数据库中创建或添加数据,mysql-bin.000001的大小是否增加
以上情况满足,则说明binlog配置正常
3、部署
1)、下载
去其官网:https://maxwells-daemon.io/quickstart/下载需要的版本。
本示例使用的是:maxwell-1.29.2.tar.gz 注意其不同版本对jdk的要求,最新版本要求jdk11.
2)、解压
解压的目录/usr/local/bigdata/maxwell-1.29.2
tar-zvxf maxwell-1.29.2.tar.gz -C /usr/local/bigdata
[alanchan@server3 maxwell-1.29.2]$ ll
总用量 108
drwxr-xr-x 2 alanchan root 40961月 16 05:45 bin
-rw-r--r-- 1 alanchan root 251331月 242021 config.md
-rw-r--r-- 1 alanchan root 119701月 242021 config.properties.example
-rw-r--r-- 1 alanchan root 102594月 222020 kinesis-producer-library.properties.example
drwxr-xr-x 3 alanchan root 122881月 272021 lib
-rw-r--r-- 1 alanchan root 5484月 222020 LICENSE
-rw-r--r-- 1 alanchan root 4701月 242021 log4j2.xml
-rw-r--r-- 1 alanchan root 33281月 272021 quickstart.md
-rw-r--r-- 1 alanchan root 14291月 272021 README.md
3)、创建元数据库
该步骤需要创建一个mysql数据库,用以保存maxwell的元数据,至于访问这个数据库的用户名和密码则视情况而定,下面的内容是其官方上的操作,也就是创建用户、授权。
本文的示例中使用的是root用户,创建的数据库名称为maxwell。
mysql>CREATEUSER'maxwell'@'%' IDENTIFIED BY'XXXXXX';
mysql>CREATEUSER'maxwell'@'localhost' IDENTIFIED BY'XXXXXX';
mysql>GRANTALLON maxwell.*TO'maxwell'@'%';
mysql>GRANTALLON maxwell.*TO'maxwell'@'localhost';
mysql>GRANTSELECT,REPLICATION CLIENT,REPLICATION SLAVE ON*.*TO'maxwell'@'%';
mysql>GRANTSELECT,REPLICATION CLIENT,REPLICATION SLAVE ON*.*TO'maxwell'@'localhost';
4)、启动方式
其提供了2种启动方式,即通过命令行参数的形式和通过配置文件的形式,下面是给出的示例
- 命令行参数形式,输出到控制台 不需要做任何配置即可直接使用,
maxwell --user='root'--password='123456'--host='192.168.10.44'--producer=stdout
# user 和 password 是连接mysql元数据库的账号和密码# host是被监控的mysql的ip# producer是maxwell的输出类型,比如stdout、kafka等
- 配置文件方式,输出到控制台
maxwell --config../config.properties
# config.properties文件修改内容如下,其他的保持不变,也可以根据自己的需要修改producer=stdout
# mysql login infohost=192.168.10.44
user=root
password=123456
4、示例1:maxwell CDC 输出至控制台
1)、启动maxwell
部署完成后,不需要做任何的改动即可执行下面的命令
maxwell --user='root'--password='123456'--host='192.168.10.44'--producer=stdout
2)、操作mysql监控的数据库,观察其控制台输出
在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下
[alanchan@server3 bin]$ maxwell --user='root'--password='123456'--host='192.168.10.44'--producer=stdout
Using kafka version: 1.0.0
{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653290,"xid":20392,"commit":true,"data":{"name":"alanchanchn","scores":109.0},"old":{"scores":199.0}}{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705653456,"xid":20935,"commit":true,"data":{"name":"alan1","scores":5.0}}{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653461,"xid":20951,"commit":true,"data":{"name":"alan1","scores":109.0},"old":{"scores":5.0}}{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705653465,"xid":20967,"commit":true,"data":{"name":"alan1","scores":109.0}}
5、示例2:maxwell CDC 输出至kafka
1)、启动maxwell
部署完成后,不需要做任何的改动即可执行下面的命令
maxwell --user='root'--password='rootroot'--host='192.168.10.44'--producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic
[alanchan@server3 bin]$ maxwell --user='root'--password='rootroot'--host='192.168.10.44'--producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic
Using kafka version: 1.0.0
2)、通过命令行打开kafka消费者
kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning
3)、操作mysql监控的数据库,观察其控制台输出
在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下
[alanchan@server1 bin]$ cd../../kafka_2.12-3.0.0/bin/
[alanchan@server1 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning
{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705654206,"xid":22158,"commit":true,"data":{"name":"test","scores":100.0}}{"database":"cdctest","table":"userscoressink","type":"update","ts":1705654220,"xid":22196,"commit":true,"data":{"name":"test","scores":200.0},"old":{"scores":100.0}}{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705654224,"xid":22210,"commit":true,"data":{"name":"test","scores":200.0}}
二、Flink 与 maxwell 实践
为了使用maxwell格式,使用构建自动化工具(如Maven或SBT)的项目和带有SQL JAR包的SQLClient都需要以下依赖项。
1、maven依赖
该依赖在flink自建工程中已经包含。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.1</version></dependency>
有关如何部署 maxwell 以将变更日志同步到消息队列,请参阅上文的具体事例或想了解更多的信息参考maxwell 文档。
2、Flink sql client 建表示例
maxwell 为变更日志提供了统一的格式,下面是一个从 MySQL 库 userscoressink表中捕获更新操作的简单示例:
{"database":"cdctest","table":"userscoressink","type":"update","ts":1705654220,"xid":22196,"commit":true,"data":{"name":"test","scores":200.0},"old":{"scores":100.0}}
MySQL userscoressink表有2列(name,scores)。上面的 JSON 消息是 userscoressink表上的一个更新事件,表示数据上scores字段值从100变更成为200。
消息已经同步到了一个 Kafka 主题:alan_maxwell_to_kafka_topic,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。
具体启动maxwell参考本文的第一部分的kafka示例,其他不再赘述。下面的部分仅仅是演示maxwell环境都正常后,在Flink SQL client中的操作。
-- 元数据与 MySQL "userscoressink" 表完全相同CREATETABLE userscoressink (
name STRING,
scores FLOAT)WITH('connector'='kafka','topic'='alan_maxwell_to_kafka_topic','properties.bootstrap.servers'='server1:9092,server2:9092,server3:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='maxwell-json'-- 使用 maxwell-json 格式);
将 Kafka 主题注册成 Flink 表之后,就可以将 maxwell消息用作变更日志源。
-- 验证,在mysql中新增、修改和删除数据,观察flink sql client 的数据变化
Flink SQL>showtables;
Empty set
Flink SQL>CREATETABLE userscoressink (> name STRING,> scores FLOAT>)WITH(>'connector'='kafka',>'topic'='alan_maxwell_to_kafka_topic',>'properties.bootstrap.servers'='server1:9092,server2:9092,server3:9092',>'properties.group.id'='testGroup',>'scan.startup.mode'='earliest-offset',>'format'='maxwell-json'-- 使用 maxwell-json 格式>);[INFO]Execute statement succeed.
Flink SQL>select*from userscoressink;+----+--------------------------------+--------------------------------+| op | name | scores |+----+--------------------------------+--------------------------------+|+I | test |100.0||-U | test |100.0||+U | test |200.0||-D | test |200.0|^CQuery terminated, received a total of4rows-- 关于MySQL "userscoressink" 表的实时物化视图-- 按name分组,对scores进行求和
Flink SQL>select name ,sum(scores) sum_scores from userscoressink groupby name;+----+--------------------------------+--------------------------------+| op | name | sum_scores |+----+--------------------------------+--------------------------------+|+I | test |100.0||-D | test |100.0||+I | test |200.0||-D | test |200.0|
3、Available Metadata
以下格式元数据可以在表定义中公开为只读(VIRTUAL)列。
只有当相应的连接器转发格式元数据时,格式元数据字段才可用。
截至Flink 1.17版本,只有Kafka连接器能够公开其值格式的元数据字段。
以下示例显示了如何访问Kafka中的Maxwell元数据字段:
CREATETABLE userscoressink2(
origin_database STRING METADATA FROM'value.database' VIRTUAL,
origin_table STRING METADATA FROM'value.table' VIRTUAL,
origin_primary_key_columns ARRAY<STRING> METADATA FROM'value.primary-key-columns' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM'value.ingestion-timestamp' VIRTUAL,
name STRING,
scores FLOAT)WITH('connector'='kafka','topic'='alan_maxwell_to_kafka_topic','properties.bootstrap.servers'='server1:9092,server2:9092,server3:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='maxwell-json');# 操作步骤如下
Flink SQL>CREATETABLE userscoressink2(> origin_database STRING METADATA FROM'value.database' VIRTUAL,> origin_table STRING METADATA FROM'value.table' VIRTUAL,> origin_primary_key_columns ARRAY<STRING> METADATA FROM'value.primary-key-columns' VIRTUAL,> origin_ts TIMESTAMP(3) METADATA FROM'value.ingestion-timestamp' VIRTUAL,> name STRING,> scores FLOAT>)WITH(>'connector'='kafka',>'topic'='alan_maxwell_to_kafka_topic',>'properties.bootstrap.servers'='server1:9092,server2:9092,server3:9092',>'properties.group.id'='testGroup',>'scan.startup.mode'='earliest-offset',>'format'='maxwell-json'>);[INFO]Execute statement succeed.
Flink SQL>select*from userscoressink2;+----+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+| op | origin_database | origin_table | origin_primary_key_columns | origin_ts | name | scores |+----+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+|+I | cdctest | userscoressink |<NULL>|2024-01-1916:50:06.000| test |100.0||-U | cdctest | userscoressink |<NULL>|2024-01-1916:50:20.000| test |100.0||+U | cdctest | userscoressink |<NULL>|2024-01-1916:50:20.000| test |200.0||-D | cdctest | userscoressink |<NULL>|2024-01-1916:50:24.000| test |200.0|
4、Format 参数
5、重要事项:重复的变更事件
Maxwell应用程序允许每次更改事件只投递一次。在这种情况下,Flink在消费Maxwell生产的事件时效果非常好。如果Maxwell应用程序至少在一次交付中工作,它可能会向Kafka交付重复的更改事件,Flink将获得重复的事件。这可能会导致Flink查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置表.exec.source.cdc-events-duplicate设置为true,并在源上定义PRIMARY KEY。框架将生成一个额外的有状态运算符,并使用主键来消除更改事件的重复,并生成一个规范化的更改日志流。
6、数据类型映射
目前,maxwell Format 使用 JSON Format 进行序列化和反序列化。 有关数据类型映射的更多详细信息,请参阅 JSON Format 文档。
以上,本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。
版权归原作者 一瓢一瓢的饮 alanchanchn 所有, 如有侵权,请联系我们删除。