前言
前面总结了Spark SQL增量查询Hudi表和Hive增量查询Hudi表。最近项目上也有Flink SQL增量查询Hudi表的需求,正好学习总结一下。
官网文档
地址:https://hudi.apache.org/cn/docs/querying_data#incremental-query
参数
- read.start-commit 增量查询开始时间 对于流读,如果不指定该值,默认取最新的instantTime,也就是流读默认从最新的instantTime开始读(包含最新的)。对于批读,如果不指定该参数,只指定read.end-commit,则实现时间旅行的功能,可查询历史记录
- read.end-commit 增量查询结束时间 不指定该参数则默认读取到最新的记录,该参数一般只适用于批读,因为流读一般的需求是查询所有的增量数据
- read.streaming.enabled 是否流读 默认false
- read.streaming.check-interval 流读的检查时间间隔,单位秒(s),默认值60,也就是一分钟 查询范围 [BEGIN_INSTANTTIME,END_INSTANTTIME],既包含开始时间又包含结束时间,对于默认值可参考上面的参数说明
版本
建表造数:
- Hudi 0.9.0
- Spark 2.4.5
我这里建表造数使用Hudi Spark SQL 0.9.0,目的是为了模拟项目上用Java Client和Spark SQL创建的Hudi表,以验证Hudi Flink SQL增量查询时是否兼容旧版本的Hudi表(大家没有这种需求的,可以使用任何方式正常造数)
查询
- Hudi 0.13.0-SNAPSHOT
- Flink 1.14.3 (增量查询)
- Spark 3.1.2 (主要是为了使用Call Procedures命令查看commit信息)
建表造数
-- Spark SQL Hudi 0.9.0createtable hudi.test_flink_incremental (
id int,
name string,
price double,
ts long,
dt string
)using hudi
partitioned by(dt)
options (
primaryKey ='id',
preCombineField ='ts',type='cow');insertinto hudi.test_flink_incremental values(1,'a1',10,1000,'2022-11-25');insertinto hudi.test_flink_incremental values(2,'a2',20,2000,'2022-11-25');update hudi.test_flink_incremental set name='hudi2_update'where id =2;insertinto hudi.test_flink_incremental values(3,'a3',30,3000,'2022-11-26');insertinto hudi.test_flink_incremental values(4,'a4',40,4000,'2022-12-26');
用show_commits看一下有哪些commits(这里查询用的是Hudi的master,因为show_commits是在0.11.0版本开始支持的,也可以通过使用hadoop命令查看.hoodie文件夹下的.commit文件)
call show_commits(table=>'hudi.test_flink_incremental');
2022120515273620221205152723202212051527122022120515270220221205152650
Flink SQL创建Hudi内存表
CREATETABLE test_flink_incremental (
id intPRIMARYKEYNOT ENFORCED,
name VARCHAR(10),
price double,
ts bigint,
dt VARCHAR(10))
PARTITIONED BY(dt)WITH('connector'='hudi','path'='hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental');
建表时不指定增量查询相关的参数,我们在查询时动态指定,这样比较灵活。
动态指定参数方法,在查询语句后面加上如下形式的语句
/*+
options(
'read.start-commit' = '20221205152723',
'read.end-commit'='20221205152736'
)
*/
批读
Flink SQL读Hudi有两种模式:批读和流读。默认批读,先看一下批读的增量查询
验证是否包含起始时间和默认结束时间
select*from test_flink_incremental
/*+
options(
'read.start-commit' = '20221205152723' --起始时间对应id=3的记录
)
*/
结果包含起始时间,不指定结束时间默认读到最新的数据
id name price ts dt
4 a4 40.04000 dt=2022-12-263 a3 30.03000 dt=2022-11-26
验证是否包含结束时间
select*from test_flink_incremental
/*+
options(
'read.start-commit' = '20221205152712', --起始时间对应id=2的记录
'read.end-commit'='20221205152723' --结束时间对应id=3的记录
)
*/
结果包含结束时间
id name price ts dt
3 a3 30.03000 dt=2022-11-262 hudi2_update 20.02000 dt=2022-11-25
验证默认开始时间
这种情况是指定结束时间,但不指定开始时间,如果都不指定,则读表所有的最新版本的记录。
select*from test_flink_incremental
/*+
options(
'read.end-commit'='20221205152712' --结束时间对应id=2的更新记录
)
*/
结果:只查询end-commit对应的记录
id name price ts dt
2 hudi2_update 20.02000 dt=2022-11-25
时间旅行(查询历史记录)
验证是否可以查询历史记录,我们更新id为2的name,更新前name为a2,更新后为hudi2_update,我们验证一下,是否可以通过Flink SQL查询Hudi历史记录,逾期结果查出id=2,name=a2
select*from test_flink_incremental
/*+
options(
'read.end-commit'='20221205152702' --结束时间对应id=2的历史记录
)
*/
结果:可以正确查询历史记录
id name price ts dt
2 a2 20.02000 dt=2022-11-25
流读
开启流读的参数:
read.streaming.enabled = true
流读不需要设置结束时间,因为一般的需求是读所有的增量数据,我们只需要验证开始时间就好了
验证默认开始时间
select*from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4'
)
*/
结果:从最新的instantTime开始增量读取,也就是默认的read.start-commit为最新的instantTime
id name price ts dt
4 a4 40.04000 dt=2022-12-26
验证指定开始时间
select*from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/
结果:
id name price ts dt
2 hudi2_update 20.02000 dt=2022-11-253 a3 30.03000 dt=2022-11-264 a4 40.04000 dt=2022-11-26
如果想第一次查询全部的历史数据,可以将start-commit设置的早一点,比如设置到去年:‘read.start-commit’ = ‘20211205152712’
select*from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20211205152712'
)
*/
id name price ts dt
1 a1 10.01000 dt=2022-11-252 hudi2_update 20.02000 dt=2022-11-253 a3 30.03000 dt=2022-11-264 a4 40.04000 dt=2022-11-26
验证流读的连续性
验证新的增量数据进来,是否可以持续消费Hudi增量数据,验证数据的准确一致性,为了方便验证,我可以使用Flink SQL增量流读Hudi表然后Sink到MySQL表中,最后通过读取MySQL表中的数据验证数据的准确性
Flink SQL读写MySQL需要配置jar包,将
flink-connector-jdbc_2.12-1.14.3.jar
放到
lib
先在MySQL中创建一张Sink表
-- MySQLCREATETABLE`test_sink`(`id`int(11),`name`textDEFAULTNULL,`price`int(11),`ts`int(11),`dt`textDEFAULTNULL)ENGINE=InnoDBDEFAULTCHARSET=utf8;
Flink中创建对应的sink表
createtable test_sink (
id int,
name string,
price double,
ts bigint,
dt string
)with('connector'='jdbc','url'='jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8','username'='root','password'='root-123','table-name'='test_sink','sink.buffer-flush.max-rows'='1');
然后流式增量读取Hudi表Sink Mysql
insertinto test_sink
select*from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/
这样会起一个长任务,一直处于running状态,我们可以在yarn-session界面上验证这一点
然后先在MySQL中验证一下历史数据的准确性
再利用Spark SQL往source表插入两条数据
-- Spark SQLinsertinto hudi.test_flink_incremental values(5,'a5',50,5000,'2022-12-07');insertinto hudi.test_flink_incremental values(6,'a6',60,6000,'2022-12-07');
我们增量读取的间隔设置的4s,成功插入数据等待4s后,再在MySQL表中验证一下数据
发现新增的数据已经成功Sink到MySQL中了,并且数据没有重复
最后验证一下更新的增量数据,Spark SQL更新Hudi source表
-- Spark SQLupdate hudi.test_flink_incremental set name='hudi5_update'where id =5;
继续验证结果
结果是更新的增量数据也会insert到MySQL中的sink表,但是不会更新原来的数据
那如果想实现更新的效果呢?我们需要在MySQL和Flink的sink表中加上主键字段,两者缺一不可,如下:
-- MySQLCREATETABLE`test_sink`(`id`int(11),`name`textDEFAULTNULL,`price`int(11),`ts`int(11),`dt`textDEFAULTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;
-- Flink SQLcreatetable test_sink (
id intPRIMARYKEYNOT ENFORCED,
name string,
price double,
ts bigint,
dt string
)with('connector'='jdbc','url'='jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8','username'='root','password'='root-123','table-name'='test_sink','sink.buffer-flush.max-rows'='1');
将刚才起的长任务关掉,重新执行刚才的insert语句,先跑一下历史数据,最后再验证一下增量效果
-- Spark SQLupdate hudi.test_flink_incremental set name='hudi6_update'where id =6;insertinto hudi.test_flink_incremental values(7,'a7',70,7000,'2022-12-07');
可以看到,达到了预期效果,对于id=6的执行更新操作,对于id=7的执行插入操作。
相关阅读
- Apache Hudi 入门学习总结
- Flink SQL 客户端查询Hive配置及问题解决
- Flink SQL操作Hudi并同步Hive使用总结
- Flink SQL通过Hudi HMS Catalog读写Hudi并同步Hive表(强烈推荐这种方式)
- Hudi Spark SQL总结
- Spark SQL增量查询Hudi表
- Hudi Spark SQL Call Procedures学习总结(一)(查询统计表文件信息)
- Hive增量查询Hudi表
版权归原作者 董可伦 所有, 如有侵权,请联系我们删除。