《Flink SQL 语法篇》系列,共包含以下 10 篇文章:
- Flink SQL 语法篇(一):CREATE
- Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
- Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
- Flink SQL 语法篇(四):Group 聚合、Over 聚合
- Flink SQL 语法篇(五):Regular Join、Interval Join
- Flink SQL 语法篇(六):Temporal Join
- Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
- Flink SQL 语法篇(八):集合、Order By、Limit、TopN
- Flink SQL 语法篇(九):Window TopN、Deduplication
- Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints
😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!
Flink SQL 语法篇(五):Regular Join、Interval Join
Flink 也支持了非常多的数据 Join 方式,主要包括以下三种:
- 动态表(流)与动态表(流)的 Join
- 动态表(流)与外部维表(比如 Redis)的 Join
- 动态表字段的列转行(一种特殊的 Join)
细分 Flink SQL 支持的 Join:
Regular Join
:流与流的 Join,包括 Inner Equal Join、Outer Equal JoinInterval Join
:流与流的 Join,两条流一段时间区间内的 JoinTemporal Join
:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 JoinLookup Join
:流与外部维表的 JoinArray Expansion
:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行Table Function
:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join
1.Regular Join
Regular Join 定义(支持 Batch / Streaming):Regular Join 其实就是和离线 Hive SQL 一样的 Regular Join,通过条件关联两条流数据输出。
应用场景:Join 其实在我们的数仓建设过程中应用是非常广泛的。离线数仓可以说基本上是离不开 Join 的。那么实时数仓的建设也必然离不开 Join,比如日志关联扩充维度数据,构建宽表;日志通过 ID 关联计算 CTR。
Regular Join 包含以下几种(以
L
作为左流中的数据标识,
R
作为右流中的数据标识):
Inner Join
(Inner Equal Join
):流任务中,只有两条流 Join 到才输出,输出+[L, R]
。Left Join
(Outer Equal Join
):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出+[L, R]
,没 Join 到输出+[L, null]
),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出-[L, null]
,然后输出+[L, R]
。Right Join
(Outer Equal Join
):有 Left Join 一样,左表和右表的执行逻辑完全相反。Full Join
(Outer Equal Join
):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出+[L, R]
,没 Join 到输出+[null, R]
;对左流来说:Join 到输出+[L, R]
,没 Join 到输出+[L, null]
)。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤-[null, R]
,输出+[L, R]
,右流数据到达为例:回撤-[L, null]
,输出+[L, R]
)。
1.1 Inner Join 案例
实际案例:案例为 曝光日志 关联 点击日志 筛选既有曝光又有点击的数据。
-- 曝光日志数据CREATETABLE show_log_table (
log_id BIGINT,
show_params STRING
)WITH('connector'='datagen','rows-per-second'='2','fields.show_params.length'='1','fields.log_id.min'='1','fields.log_id.max'='100');-- 点击日志数据CREATETABLE click_log_table (
log_id BIGINT,
click_params STRING
)WITH('connector'='datagen','rows-per-second'='2','fields.click_params.length'='1','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
)WITH('connector'='print');-- 流的 INNER JOIN,条件为 log_idINSERTINTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
INNERJOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
输出结果如下:
+I[5, d,5, f]+I[5, d,5,8]+I[5, d,5,2]+I[3,4,3,0]+I[3,4,3,3]...
1.2 Left Join 案例
CREATETABLE show_log_table (
log_id BIGINT,
show_params STRING
)WITH('connector'='datagen','rows-per-second'='1','fields.show_params.length'='3','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE click_log_table (
log_id BIGINT,
click_params STRING
)WITH('connector'='datagen','rows-per-second'='1','fields.click_params.length'='3','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
)WITH('connector'='print');INSERTINTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
LEFTJOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
输出结果如下:
+I[5, f3c,5, c05]+I[5,6e2,5,1f6]+I[5,86b,5,1f6]+I[5, f3c,5,1f6]-D[3,4ab,null,null]-D[3,6f2,null,null]+I[3,4ab,3,765]+I[3,6f2,3,765]+I[2,3c4,null,null]+I[3,4ab,3, a8b]+I[3,6f2,3, a8b]+I[2, c03,null,null]...
1.3 Full Join 案例
CREATETABLE show_log_table (
log_id BIGINT,
show_params STRING
)WITH('connector'='datagen','rows-per-second'='2','fields.show_params.length'='1','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE click_log_table (
log_id BIGINT,
click_params STRING
)WITH('connector'='datagen','rows-per-second'='2','fields.click_params.length'='1','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
)WITH('connector'='print');INSERTINTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
FULLJOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
输出结果如下:
+I[null,null,7,6]+I[6,5,null,null]-D[1, c,null,null]+I[1, c,1,2]+I[3,1,null,null]+I[null,null,7, d]+I[10,0,null,null]+I[null,null,2,6]-D[null,null,7,6]-D[null,null,7, d]...
关于 Regular Join 的注意事项:
- 实时 Regular Join 可以不是 等值 Join。等值 Join 和 非等值 Join 区别在于,等值 Join 数据 Shuffle 策略是
Hash
,会按照 Join on 中的等值条件作为id
发往对应的下游;非等值 Join 数据 Shuffle 策略是Global
,所有数据发往一个并发,按照非等值条件进行关联。 - Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出。
- 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。
2.Interval Join(时间区间 Join)
Interval Join 定义(支持 Batch / Streaming):Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据。
应用场景:为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生 回撤流,但是在实时数仓中一般写入的 Sink 都是类似于 Kafka 这样的消息队列,然后后面接 Clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以博主理解 Interval Join 就是用于消灭回撤流的。
Interval Join 包含以下几种(以
L
作为左流中的数据标识,
R
作为右流中的数据标识):
Inner Interval Join
:流任务中,只有两条流 Join 到(满足 Join on 中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出+[L, R]
Left Interval Join
:流任务中,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,则会输出+[L, R]
。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出+[L, null]
,如果右流 State 中的数据过期了,就直接从 State 中删除。Right Interval Join
:和 Left Interval Join 执行逻辑一样,只不过左表和右表的执行逻辑完全相反Full Interval Join
:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出+[L, R]
。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据过期了,就将这些数据从 State 中删除并且输出(左流过期输出+[L, null]
,右流过期输出+[null, R]
)
可以发现
Inner Interval Join
和其他三种
Outer Interval Join
的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。
2.1 Inner Interval Join
实际案例:还是刚刚的案例,曝光日志 关联 点击日志 筛选既有曝光又有点击的数据,条件是曝光关联之后发生
4
4
4 小时之内的点击。
CREATETABLE show_log_table (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),
WATERMARK FOR row_time AS row_time
)WITH('connector'='datagen','rows-per-second'='1','fields.show_params.length'='1','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE click_log_table (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),
WATERMARK FOR row_time AS row_time
)WITH('connector'='datagen','rows-per-second'='1','fields.click_params.length'='1','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
)WITH('connector'='print');INSERTINTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table INNERJOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time -INTERVAL'4'HOURAND click_log_table.row_time;
输出结果如下:
6>+I[2, a,2,6]6>+I[2,6,2,6]2>+I[4,1,4,5]2>+I[10,8,10, d]2>+I[10,7,10, d]2>+I[10, d,10, d]2>+I[5, b,5, d]6>+I[1, a,1,7]
2.2 Left Interval Join
CREATETABLE show_log (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),
WATERMARK FOR row_time AS row_time
)WITH('connector'='datagen','rows-per-second'='1','fields.show_params.length'='1','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE click_log (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),
WATERMARK FOR row_time AS row_time
)WITH('connector'='datagen','rows-per-second'='1','fields.click_params.length'='1','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
)WITH('connector'='print');INSERTINTO sink_table
SELECT
show_log.log_id as s_id,
show_log.show_params as s_params,
click_log.log_id as c_id,
click_log.click_params as c_params
FROM show_log LEFTJOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time -INTERVAL'5'SECONDAND click_log.row_time +INTERVAL'5'SECOND;
输出结果如下:
+I[6, e,6,7]+I[11, d,null,null]+I[7, b,null,null]+I[8,0,8,3]+I[13,6,null,null]
2.3 Full Interval Join
CREATETABLE show_log (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),
WATERMARK FOR row_time AS row_time
)WITH('connector'='datagen','rows-per-second'='1','fields.show_params.length'='1','fields.log_id.min'='5','fields.log_id.max'='15');CREATETABLE click_log (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),
WATERMARK FOR row_time AS row_time
)WITH('connector'='datagen','rows-per-second'='1','fields.click_params.length'='1','fields.log_id.min'='1','fields.log_id.max'='10');CREATETABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
)WITH('connector'='print');INSERTINTO sink_table
SELECT
show_log.log_id as s_id,
show_log.show_params as s_params,
click_log.log_id as c_id,
click_log.click_params as c_params
FROM show_log LEFTJOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time -INTERVAL'5'SECONDAND click_log.row_time +INTERVAL'5'SECOND;
输出结果如下:
+I[6,1,null,null]+I[7,3,7,8]+I[null,null,6,6]+I[null,null,4, d]+I[8, d,null,null]+I[null,null,3, b]
关于 Interval Join 的注意事项:
- 实时 Interval Join 可以不是 等值 Join。等值 Join 和 非等值 Join 区别在于,等值 Join 数据 Shuffle 策略是
Hash
,会按照 Join on 中的等值条件作为id
发往对应的下游;非等值 Join 数据 Shuffle 策略是Global
,所有数据发往一个并发,然后将满足条件的数据进行关联输出。
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。