《Flink SQL 语法篇》系列,共包含以下 10 篇文章:
- Flink SQL 语法篇(一):CREATE
- Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
- Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
- Flink SQL 语法篇(四):Group 聚合、Over 聚合
- Flink SQL 语法篇(五):Regular Join、Interval Join
- Flink SQL 语法篇(六):Temporal Join
- Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
- Flink SQL 语法篇(八):集合、Order By、Limit、TopN
- Flink SQL 语法篇(九):Window TopN、Deduplication
- Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints
😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!
Flink SQL 语法篇(九):Window TopN、Deduplication
1.Window TopN
Window TopN 定义(支持 Streaming):Window TopN 是一种特殊的 TopN,它的返回结果是每一个窗口内的 N 个最小值或者最大值。
应用场景:小伙伴萌会问了,我有了 TopN 为啥还需要 Window TopN 呢?还记得上一篇博客介绍 TopN 说道的 TopN 时会出现中间结果,从而出现回撤数据的嘛?Window TopN 不会出现回撤数据,因为 Window TopN 实现是在窗口结束时输出最终结果,不会产生中间结果。而且注意,因为是窗口上面的操作,Window TopN 在窗口结束时,会自动把 State 给清除。
SQL 语法标准:
SELECT[column_list]FROM(SELECT[column_list],
ROW_NUMBER()OVER(PARTITIONBY window_start, window_end [, col_key1...]ORDERBY col1 [asc|desc][, col2 [asc|desc]...])AS rownum
FROM table_name)-- windowing TVFWHERE rownum <= N [AND conditions]
实际案例:取当前这一分钟的搜索关键词下的搜索热度前 10 名的词条数据。
-- 输入表字段:-- 字段名 备注-- key 搜索关键词-- name 搜索热度名称-- search_cnt 热搜消费热度(比如 3000)-- timestamp 消费词条时间戳CREATETABLE source_table (
name BIGINTNOTNULL,
search_cnt BIGINTNOTNULL,keyBIGINTNOTNULL,
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),
WATERMARK FOR row_time AS row_time
)WITH(...);-- 输出表字段:-- 字段名 备注-- key 搜索关键词-- name 搜索热度名称-- search_cnt 热搜消费热度(比如 3000)-- window_start 窗口开始时间戳-- window_end 窗口结束时间戳CREATETABLE sink_table (keyBIGINT,
name BIGINT,
search_cnt BIGINT,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3))WITH(...);-- 处理 sql:INSERTINTO sink_table
SELECTkey, name, search_cnt, window_start, window_end
FROM(SELECTkey, name, search_cnt, window_start, window_end,
ROW_NUMBER()OVER(PARTITIONBY window_start, window_end,keyORDERBY search_cnt desc)AS rownum
FROM(SELECT window_start, window_end,key, name,max(search_cnt)as search_cnt
-- window tvf 写法FROMTABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time),INTERVAL'1' MINUTES))GROUPBY window_start, window_end,key, name
))WHERE rownum <=100
输出结果:
+I[关键词1, 词条1,8670,2021-1-28T22:34,2021-1-28T22:35]+I[关键词1, 词条2,6928,2021-1-28T22:34,2021-1-28T22:35]+I[关键词1, 词条3,1735,2021-1-28T22:34,2021-1-28T22:35]+I[关键词1, 词条4,7287,2021-1-28T22:34,2021-1-28T22:35]...
SQL 语义:
- 数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,将数据按照窗口聚合的 Key 通过 Hash 分发策略发送到下游窗口聚合算子。
- 窗口聚合算子:进行窗口聚合计算,随着时间的推进,将窗口聚合结果计算完成发往下游窗口排序算子。
- 窗口排序算子:这个算子其实也是一个窗口算子,只不过这个窗口算子为每个 Key 维护了一个 TopN 的榜单数据,接受到上游发送的窗口结果数据进行排序,随着时间的推进,窗口的结束,将排序的结果输出到下游数据汇算子。
- 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。
2.Deduplication
Deduplication 定义(支持 Batch / Streaming):Deduplication 其实就是去重,也即上文介绍到的 TopN 中
row_number = 1
的场景,但是这里有一点不一样在于其 排序字段 一定是 时间属性列,不能是其他非时间属性的普通列。在
row_number = 1
时,如果排序字段是普通列 Planner 会翻译成 TopN 算子,如果是时间属性列 Planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。
应用场景:比如上游数据发重了,或者计算 DAU 明细数据等场景,都可以使用 Deduplication 语法去做去重。
SQL 语法标准:
SELECT[column_list]FROM(SELECT[column_list],
ROW_NUMBER()OVER([PARTITIONBY col1[, col2...]]ORDERBY time_attr [asc|desc])AS rownum
FROM table_name)WHERE rownum =1
ROW_NUMBER()
:标识当前数据的排序值。PARTITION BY col1[, col2...]
:标识分区字段,代表按照这个col
字段作为分区粒度对数据进行排序。ORDER BY time_attr [asc|desc]
:标识排序规则,必须为时间戳列,当前 Flink SQL 支持处理时间、事件时间,ASC 代表保留第一行,DESC 代表保留最后一行。WHERE rownum = 1
:这个子句是一定需要的,而且必须为rownum = 1
。
2.1 案例 1(事件时间)
某一游戏用户等级的场景,每一个用户都有一个用户等级,需要求出当前用户等级在 星星⭐,月亮🌙,太阳🌞 的用户数分别有多少。
-- 数据源:当每一个用户的等级初始化及后续变化的时候的数据,即用户等级变化明细数据。CREATETABLE source_table (
user_id BIGINTCOMMENT'用户 id',level STRING COMMENT'用户等级',
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3))COMMENT'事件时间戳',
WATERMARK FOR row_time AS row_time
)WITH('connector'='datagen','rows-per-second'='1','fields.level.length'='1','fields.user_id.min'='1','fields.user_id.max'='1000000');-- 数据汇:输出即每一个等级的用户数CREATETABLE sink_table (level STRING COMMENT'等级',
uv BIGINTCOMMENT'当前等级用户数',
row_time timestamp(3)COMMENT'时间戳')WITH('connector'='print');-- 处理逻辑:INSERTINTO sink_table
selectlevel,count(1)as uv,max(row_time)as row_time
from(SELECT
user_id,level,
row_time,
row_number()over(partitionby user_id orderby row_time)as rn
FROM source_table
)where rn =1groupbylevel
输出结果:
+I[等级 1,6928,2021-1-28T22:34]-I[等级 1,6928,2021-1-28T22:34]+I[等级 1,8670,2021-1-28T22:34]-I[等级 1,8670,2021-1-28T22:34]+I[等级 1,77287,2021-1-28T22:34]...
可以看到其有回撤数据。
其对应的 SQL 语义如下:
- 数据源:消费到 Kafka 中数据后,将数据按照
partition by
的 Key 通过 Hash 分发策略发送到下游去重算子。 - Deduplication 去重算子:接受到上游数据之后,根据
order by
中的条件判断当前的这条数据和之前数据时间戳大小,以上面案例来说,如果当前数据时间戳大于之前数据时间戳,则撤回之前向下游发的中间结果,然后将最新的结果发向下游(发送策略也为 Hash,具体的 Hash 策略为按照group by
中 Key 进行发送),如果当前数据时间戳小于之前数据时间戳,则不做操作。此算子产出的结果就是每一个用户的对应的最新等级信息。 - Group by 聚合算子:接受到上游数据之后,根据
Group by
聚合粒度对数据进行聚合计算结果(每一个等级的用户数),发往下游数据汇算子。 - 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。
2.2 案例 2(处理时间)
最原始的日志是明细数据,需要我们根据用户
id
筛选出这个用户当天的第一条数据,发往下游,下游可以据此计算分各种维度的 DAU。
-- 数据源:原始日志明细数据CREATETABLE source_table (
user_id BIGINTCOMMENT'用户 id',
name STRING COMMENT'用户姓名',
server_timestamp BIGINTCOMMENT'用户访问时间戳',
proctime AS PROCTIME())WITH('connector'='datagen','rows-per-second'='1','fields.name.length'='1','fields.user_id.min'='1','fields.user_id.max'='10','fields.server_timestamp.min'='1','fields.server_timestamp.max'='100000');-- 数据汇:根据 user_id 去重的第一条数据CREATETABLE sink_table (
user_id BIGINT,
name STRING,
server_timestamp BIGINT)WITH('connector'='print');-- 处理逻辑:INSERTINTO sink_table
select user_id,
name,
server_timestamp
from(SELECT
user_id,
name,
server_timestamp,
row_number()over(partitionby user_id orderby proctime)as rn
FROM source_table
)where rn =1
输出结果:
+I[1, 用户 1,2021-1-28T22:34]+I[2, 用户 2,2021-1-28T22:34]+I[3, 用户 3,2021-1-28T22:34]...
可以看到这个处理逻辑是没有回撤数据的。其对应的 SQL 语义如下:
- 数据源:消费到 Kafka 中数据后,将数据按照
partition by
的 Key 通过 Hash 分发策略发送到下游去重算子。 - Deduplication 去重算子:处理时间语义下,如果是当前 Key 的第一条数据,则直接发往下游,如果判断(根据 State 中是否存储过该 Key)不是第一条,则直接丢弃。
- 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。
⭐ 在 Deduplication 关于是否会出现回撤流,博主总结如下:
- Order by 事件时间 DESC:会出现回撤流,因为当前 Key 下 可能会有 比当前事件时间还大的数据。
- Order by 事件时间 ASC:会出现回撤流,因为当前 Key 下 可能会有 比当前事件时间还小的数据。
- Order by 处理时间 DESC:会出现回撤流,因为当前 Key 下 可能会有 比当前处理时间还大的数据。
- Order by 处理时间 ASC:不会出现回撤流,因为当前 Key 下 不可能会有 比当前处理时间还小的数据。
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。