《Flink SQL 语法篇》系列,共包含以下 10 篇文章:
- Flink SQL 语法篇(一):CREATE
- Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
- Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
- Flink SQL 语法篇(四):Group 聚合、Over 聚合
- Flink SQL 语法篇(五):Regular Join、Interval Join
- Flink SQL 语法篇(六):Temporal Join
- Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
- Flink SQL 语法篇(八):集合、Order By、Limit、TopN
- Flink SQL 语法篇(九):Window TopN、Deduplication
- Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints
😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!
Flink SQL 语法篇(四):Group 聚合、Over 聚合
1.Group 聚合
1.1 基础概念
Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合,按窗口划分(纵向)就是 窗口聚合。

1.2 窗口聚合和 Group 聚合
应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行
count
、
sum
等聚合操作。
那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的
Group By key
换成时间就行,那这两个聚合的区别到底在哪?
首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照
1
1
1 分钟的粒度进行聚合,如下 **滚动窗口** SQL:
-- 数据源表CREATETABLE source_table (-- 维度数据
dim STRING,-- 用户 id
user_id BIGINT,-- 用户
price BIGINT,-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),-- watermark 设置
WATERMARK FOR row_time AS row_time -INTERVAL'5'SECOND)WITH('connector'='datagen','rows-per-second'='10','fields.dim.length'='1','fields.user_id.min'='1','fields.user_id.max'='100000','fields.price.min'='1','fields.price.max'='100000')-- 数据汇表CREATETABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint)WITH('connector'='print')-- 数据处理逻辑insertinto sink_table
select dim,count(*)as pv,sum(price)as sum_price,max(price)as max_price,min(price)as min_price,-- 计算 uv 数count(distinct user_id)as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time,interval'1'minute)AS STRING))*1000as window_start
from source_table
groupby
dim,-- 按照 Flink SQL tumble 窗口写法划分窗口
tumble(row_time,interval'1'minute)
转换为 Group 聚合 的写法如下:
-- 数据源表CREATETABLE source_table (-- 维度数据
dim STRING,-- 用户 id
user_id BIGINT,-- 用户
price BIGINT,-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),-- watermark 设置
WATERMARK FOR row_time AS row_time -INTERVAL'5'SECOND)WITH('connector'='datagen','rows-per-second'='10','fields.dim.length'='1','fields.user_id.min'='1','fields.user_id.max'='100000','fields.price.min'='1','fields.price.max'='100000');-- 数据汇表CREATETABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint)WITH('connector'='print');-- 数据处理逻辑insertinto sink_table
select dim,count(*)as pv,sum(price)as sum_price,max(price)as max_price,min(price)as min_price,-- 计算 uv 数count(distinct user_id)as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING)))/60asbigint)as window_start
from source_table
groupby
dim,-- 将秒级别时间戳 / 60 转化为 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING)))/60asbigint)
确实没错,上面这个转换是一点问题都没有的。
但是窗口聚合和 Group by 聚合的差异在于:
- 本质区别:窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑
allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。 - 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。
1.3 SQL 语义
SQL 语义这里也拿离线和实时做对比,
Order
为 Kafka,
target_table
为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。
- 数据源算子(
From Order):数据源算子一直运行,实时的从OrderKafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的shuffle策略是根据group by中的key进行发送,相同的key发到同一个 SubTask(并发) 中。 - Group 聚合算子(
group by key+sum / count / max / min):接收到上游算子发的一条一条的数据,去状态state中找这个key之前的sum / count / max / min结果。如果有结果oldResult,拿出来和当前的数据进行sum / count / max / min计算出这个key的新结果newResult,并将新结果[key, newResult]更新到state中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息-[key, oldResult],然后再将新结果发往下游+[key, newResult];如果state中没有当前key的结果,则直接使用当前这条数据计算sum / max / min结果newResult,并将新结果[key, newResult]更新到state中,当前是第一次往下游发,则不需要先发回撤消息,直接发送+[key, newResult]。 - 数据汇算子(
INSERT INTO target_table):接收到上游发的一条一条的数据,写入到target_tableKafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于running状态的。
1.4 Group 聚合支持 Grouping sets、Rollup、Cube
Group 聚合也支持
Grouping sets
、
Rollup
、
Cube
。举一个
Grouping sets
的案例:
SELECT
supplier_id
, rating
, product_id
,COUNT(*)FROM(VALUES('supplier1','product1',4),('supplier1','product2',3),('supplier2','product3',3),('supplier2','product4',4))AS Products(supplier_id, product_id, rating)GROUPBY GROUPING SET(( supplier_id, product_id, rating ),( supplier_id, product_id ),( supplier_id, rating ),( supplier_id ),( product_id, rating ),( product_id ),( rating ),())
2.Over 聚合
Over 聚合定义(支持 Batch / Streaming):可以理解为是一种特殊的滑动窗口聚合函数。
那这里我们拿 Over 聚合 与 窗口聚合 做一个对比,其之间的最大不同之处在于:
- 窗口聚合:不在
group by中的字段,不能直接在select中拿到。 - Over 聚合:能够保留原始字段。
注意:其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?
- 应用场景:计算最近一段滑动窗口的聚合结果数据。
- 实际案例:查询每个产品最近一小时订单的金额总和。
SELECT order_id, order_time, amount,SUM(amount)OVER(PARTITIONBY product
ORDERBY order_time
RANGE BETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW)AS one_hour_prod_amount_sum
FROM Orders
- Over 聚合的语法总结如下:
SELECT
agg_func(agg_col)OVER([PARTITIONBY col1[, col2,...]]ORDERBY time_col
range_definition),...FROM...
ORDER BY:必须是时间戳列(事件时间、处理时间)。PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照product进行聚合。range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合。如下案例所示。
2.1 时间区间聚合
按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例
1
1
1 小时的区间,最新输出的一条数据的
sum
聚合结果就是最近一小时数据的
amount
之和。
CREATETABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMPasTIMESTAMP(3)),
WATERMARK FOR order_time AS order_time -INTERVAL'0.001'SECOND)WITH('connector'='datagen','rows-per-second'='1','fields.order_id.min'='1','fields.order_id.max'='2','fields.amount.min'='1','fields.amount.max'='10','fields.product.min'='1','fields.product.max'='2');CREATETABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT)WITH('connector'='print');INSERTINTO sink_table
SELECT product, order_time, amount,SUM(amount)OVER(PARTITIONBY product
ORDERBY order_time
-- 标识统计范围是一个 product 的最近 1 小时的数据
RANGE BETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW)AS one_hour_prod_amount_sum
FROM source_table
2.2 行数聚合
按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的
sum
聚合结果就是最近
5
5
5 行数据的
amount
之和。
CREATETABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMPasTIMESTAMP(3)),
WATERMARK FOR order_time AS order_time -INTERVAL'0.001'SECOND)WITH('connector'='datagen','rows-per-second'='1','fields.order_id.min'='1','fields.order_id.max'='2','fields.amount.min'='1','fields.amount.max'='2','fields.product.min'='1','fields.product.max'='2');CREATETABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT)WITH('connector'='print');INSERTINTO sink_table
SELECT product, order_time, amount,SUM(amount)OVER(PARTITIONBY product
ORDERBY order_time
-- 标识统计范围是一个 product 的最近 5 行数据ROWSBETWEEN5PRECEDINGANDCURRENTROW)AS one_hour_prod_amount_sum
FROM source_table
预跑结果如下:
+I[2,2021-12-24T22:18:19.147,1,9]+I[1,2021-12-24T22:18:20.147,2,11]+I[1,2021-12-24T22:18:21.147,2,12]+I[1,2021-12-24T22:18:22.147,2,12]+I[1,2021-12-24T22:18:23.148,2,12]+I[1,2021-12-24T22:18:24.147,1,11]+I[1,2021-12-24T22:18:25.146,1,10]+I[1,2021-12-24T22:18:26.147,1,9]+I[2,2021-12-24T22:18:27.145,2,11]+I[2,2021-12-24T22:18:28.148,1,10]+I[2,2021-12-24T22:18:29.145,2,10]
当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:
SELECT order_id, order_time, amount,SUM(amount)OVER w AS sum_amount,AVG(amount)OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定义 Over Window
WINDOW w AS(PARTITIONBY product
ORDERBY order_time
RANGE BETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW)
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。