【背景】
flink有几种聚合,使用上是有一些不同,需要加以区分:
分组聚合:group agg
over聚合:over agg
窗口聚合:window agg
省流版:
触发计算时机
结果流类型
状态大小
分组聚合group agg
每当有新行就输出更新的结果
update流
保持中间结果,所以状态可能无限膨胀
over agg
每当有新行就输出更新的结果,类似一个滑动窗口
append流
保持中间结果,所以状态可能无限膨胀
window agg
窗口结束产生一个总的聚合结果
append流
不生成中间结果,自动清除状态
下面是详细对比和具体的例子(主要讨论的是流处理下的情况)。
over聚合:over agg
OVER 聚合通过排序后的范围数据为每行输入计算出聚合值。和 GROUP BY 聚合不同, OVER 聚合不会把结果通过分组减少到一行,它会为每行输入增加一个聚合值,结果是一个append流。
OVER窗口的语法。
SELECT agg_func(agg_col)OVER([PARTITIONBY col1[, col2,...]]ORDERBY time_col range_definition),...FROM...
over聚合很少用到,所以本地自己做了一个测试:
测试sql如下:
create table test_window_tab
(
region String
,qa_id String
,count_qa_id Bigint
) COMMENT ''
with
(
'properties.bootstrap.servers' ='',
'json.fail-on-missing-field' = 'false',
'connector' = 'kafka',
'format' = 'json',
'topic' = 'test_window_tab'
)
;
create table dwm_qa_score
(
,qa_id String
,agent_id String
,region String
,saas_id String
,version_timestamp bigint
, ts as to_timestamp(from_unixtime(`version_timestamp`, 'yyyy-MM-dd HH:mm:ss'))
,`event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
,WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND
) COMMENT ''
with
(
'properties.bootstrap.servers' ='',
'json.fail-on-missing-field' = 'false',
'connector' = 'kafka',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'dwm_qa_score'
)
;
insert into test_window_tab(region,qa_id,count_qa_id)
select region,qa_id,count(1) over w as count_qa_id
from dwm_qa_score
window w as(
partition by region,qa_id
order by ts
rows between 2 preceding and current row
)
dwm_qa_score这个topic现有数据:
{ "qa_id": "123", "agent_id": "497235295815123",
"region": "TH", "version_timestamp": 1709807228
}
{ "qa_id": "123", "agent_id": "497235295815123",
"region": "TH", "version_timestamp": 1709807228
}
{ "qa_id": "123", "agent_id": "497235295815123",
"region": "TH", "version_timestamp": 1709807228
}
{ "qa_id": "123", "agent_id": "497235295815123",
"region": "TH", "version_timestamp": 1709807228
}
{ "qa_id": "123", "agent_id": "497235295815123",
"region": "TH", "version_timestamp": 1709807228
}
{ "qa_id": "1234", "agent_id": "497235295815123",
"region": "TH", "version_timestamp": 1709807228
}
当读数据选择了offset=ealiest-offset,则运行程序会得到结果如下:
{"region":"TH","qa_id":"123","count_qa_id":1}
{"region":"TH","qa_id":"123","count_qa_id":2}
{"region":"TH","qa_id":"123","count_qa_id":3}
{"region":"TH","qa_id":"123","count_qa_id":3}
{"region":"TH","qa_id":"123","count_qa_id":3}
{"region":"TH","qa_id":"1234","count_qa_id":1}
这里注意:
- 对每条数据都会返回一个聚合值
- 由于我们是“rowsbetween2precedingandcurrentrow“,所以count_qa_id最多是3
如果此时往dwm_qa_score这个topic插入新数据:
{ "qa_id": "1234", "agent_id": "497235295815123",
"region": "TH"
}
或者
{ "qa_id": "1234", "agent_id": "497235295815123",
"region": "TH","version_timestamp": null
}
或者
{ "qa_id": "1234", "agent_id": "497235295815123",
"region": "TH","version_timestamp": 0
}
会发现flink作业中输出的record多了一条:
但是在目标kafka:test_window_tab中没有新增结果
原因是我们插入的新数据中没有version_timestamp这一列为空或为0
如果往dwm_qa_score这个topic插入新数据:
{
"qa_id": "1234",
"region": "TH",
"version_timestamp": 1710145110
}
则可以看到对应目标kafka:test_window_tab中会新增结果数据
{"region":"TH","qa_id":"1234","count_qa_id":2}
如果等一分钟后,再次往dwm_qa_score这个topic插入新数据:
{
"qa_id": "1234",
"region": "TH",
"version_timestamp": 1710145110
}
则在目标kafka:test_window_tab中没有新增结果,原因应该是数据过期被丢弃了(watermark)
你可以在一个SELECT子句中定义多个OVER窗口聚合。然而,对于流式查询,由于目前的限制,所有聚合的OVER窗口必须是相同的。
ORDER BY
OVER 窗口需要数据是有序的。因为表没有固定的排序,所以 ORDER BY 子句是强制的。对于流式查询,Flink 目前只支持OVER窗口定义在升序(asc)的 时间属性 上。其他的排序不支持。
PARTITION BY
OVER窗口可以定义在一个分区表上。PARTITION BY子句代表着每行数据只在其所属的数据分区进行聚合。
范围(RANGE)定义
范围(RANGE)定义指定了聚合中包含了多少行数据。范围通过BETWEEN子句定义上下边界,其内的所有行都会聚合。Flink 只支持 CURRENT ROW 作为上边界。
有两种方法可以定义范围:ROWS间隔 和RANGE间隔
RANGE 间隔
RANGE间隔是定义在排序列值上的,在 Flink 里,排序列总是一个时间属性。下面的RANG间隔定义了聚合会在比当前行的时间属性小 30 分钟的所有行上进行。
RANGE BETWEENINTERVAL'30'MINUTEPRECEDINGANDCURRENTROW
ROW 间隔
ROWS间隔基于计数。它定义了聚合操作包含的精确行数。下面的ROWS间隔定义了当前行 + 之前的 10 行(也就是11行)都会被聚合。
ROWSBETWEEN10PRECEDINGANDCURRENTROW
常见错误
OVER windows' ordering in stream mode must be defined on a time attribute.
这个报错,是建表的时候需要指定时间语义的字段,WATERMARK 是必须的,而且WATERMARK所用字段必须是order by的时间字段,例如下面用的是 order by load_date,那么WATERMARK就要用load_date生成,即WATERMARK FOR load_date AS load_date - INTERVAL '1' MINUTE
object SqlOverRows02 {defmain(args: Array[String]): Unit ={ val settings = EnvironmentSettings.newInstance().inStreamingMode().build() val tEnv = TableEnvironment.create(settings) tEnv.executeSql(""" |create table projects( |id int, |name string, |score double, |load_date timestamp(3), |WATERMARK FOR load_date AS load_date - INTERVAL '1' MINUTE |)with( |'connector' = 'kafka', |'topic' = 'test-topic', |'properties.bootstrap.servers' = 'server120:9092', |'properties.group.id' = 'testGroup', |'scan.startup.mode' = 'latest-offset', |'format' = 'csv' |) |""".stripMargin) tEnv.executeSql(""" |select | name, | max(score) | over(partition by name | order by load_date | RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW )max_score, | min(score) | over(partition by name | order by load_date | RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW )min_score, | current_time | from | projects |""".stripMargin).print()}}
分组聚合:group agg
Apache Flink 支持标准的GROUP BY子句来聚合数据。
SELECTCOUNT(*)FROM OrdersGROUPBY order_id
特点:
1、聚合函数把多行输入数据计算为一行结果。例如,有一些聚合函数可以计算一组行的 “COUNT”、“SUM”、“AVG”、“MAX”和 “MIN”。
2、对于流式查询,重要的是要理解 Flink 运行的是连续查询,永远不会终止,会根据其输入表的更新来更新其结果表。对于上述查询,每当有新行插入Orders表时,Flink 都会实时计算并输出更新后的结果。
3、对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于分组的数量以及聚合函数的数量和类型。例如:MIN/MAX的状态是重量级的,COUNT是轻量级的,因为COUNT只需要保存计数值。
因此,可以设置table-exec-state-ttl,但是可能会影响查询结果的正确性,因为状态超时会被丢弃。
注意:
Flink 对于分组聚合提供了一系列性能优化的方法。更多参见:性能优化,包括MiniBatch 聚合、Local-Global 聚合、拆分 distinct 聚合、在 distinct 聚合上使用 FILTER 修饰符 、MiniBatch Regular Joins
窗口聚合:window agg
窗口聚合是通过GROUP BY子句定义的,其特征是包含 窗口表值函数 产生的 “window_start” 和 “window_end” 列(必须包含,否则就变成分组聚合等了)。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。
SELECT...FROM<windowed_table>-- relation applied windowing TVFGROUPBY window_start, window_end,...
窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态(watermark超过窗口end+allowlateness,就会销毁窗口)。
具体例子:
SELECT window_start, window_end,SUM(price)AS
total_price
FROMTABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES))
GROUPBY window_start, window_end;
+------------------+------------------+-------------+
| window_start | window_end | total_price |
+------------------+------------------+-------------+
|2020-04-1508:00|2020-04-1508:10|11.00|
|2020-04-1508:10|2020-04-1508:20|10.00|
+------------------+------------------+-------------+
版权归原作者 天上地下 所有, 如有侵权,请联系我们删除。