窗口计算离不开水印,即 watermark,关于 watermark 可以看我上一篇文章 Flink 时间属性及 WATERMARK 水印 。
1. 滚动窗口
滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为
[0:00, 0:05)
、
[0:05, 0:10)
、
[0:10, 0:15)
等窗口。
CREATE TEMPORARY TABLE src_kafka (
`create_time` TIMESTAMP(3) NOT NULL METADATA from 'timestamp',
`offset` bigINT NOT NULL METADATA from 'offset' ,
`partition` INT NOT NULL METADATA from 'partition' ,
cs1 STRING,
ip STRING,
b STRING,
t STRING,
n STRING,
tm BIGINT,
event_time as TO_TIMESTAMP(tm),
watermark for event_time as event_time - INTERVAL '2' SECOND, -- 定义水位线
PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'ai_log',
'properties.bootstrap.servers' = 'alikafka',
'properties.group.id' = 'pyspark-consumer',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end,
b as platform,
count(distinct cs1) as uv
from src_kafka
where n = 'ADCompanyShow'
and cs1 is not null
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE),b
;
上述案例是要按照 每分钟的滚动窗口来统计埋点事件为 ADCompanyShow 的登录用户数量,以平台维度区分,从上图中可知 web 端每分钟登录用户数具体数值。
2 滑动窗口
滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。
滑动窗口有两个参数:slide和size。slide为滑动步长,size为窗口大小。
- slide < size,则窗口会重叠,每个元素会被分配到多个窗口。
- slide = size,则等同于滚动窗口(TUMBLE)。
- slide > size,则为跳跃窗口,窗口之间不重叠且有间隙。
通常,大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数时很实用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide为10秒,size为5分钟。
2.1 slide < size
CREATE TEMPORARY TABLE src_kafka (
`create_time` TIMESTAMP(3) NOT NULL METADATA from 'timestamp',
`offset` bigINT NOT NULL METADATA from 'offset' ,
`partition` INT NOT NULL METADATA from 'partition' ,
cs1 STRING,
ip STRING,
b STRING,
t STRING,
n STRING,
tm BIGINT,
event_time as TO_TIMESTAMP(tm),
watermark for event_time as event_time - INTERVAL '2' SECOND,
PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'ai_log',
'properties.bootstrap.servers' = 'alikafka',
'properties.group.id' = 'pyspark-consumer',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
SELECT
HOP_START (event_time, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_start,
HOP_END (event_time, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end,
COUNT (cs1) as uv
FROM src_kafka
where n = 'ADCompanyShow'
and cs1 is not null
and b ='web'
GROUP BY HOP (event_time, INTERVAL '30' SECOND, INTERVAL '1' MINUTE);
上述案例中,每隔30秒统计1次近1分钟内 埋点事件为 ADCompanyShow 的登录用户数,**请注意**:每个窗口的开始时间、结束时间 会根据 slide 进行调整,因此窗口会重叠。
滑动窗口通过窗口大小和滑动步长定义的重叠窗口,用于连续的实时分析。适用于需要频繁更新的实时分析场景,如实时指标监控、滚动平均值计算等。通过合理使用滑动窗口,可以实现对数据流的连续分析,满足实时性和数据完整性的需求。
2.2 slide = size
SELECT
HOP_START (event_time, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) as window_start,
HOP_END (event_time, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) as window_end,
COUNT (cs1) as uv
FROM src_kafka
where n = 'ADCompanyShow'
and cs1 is not null
and b ='web'
GROUP BY HOP (event_time, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE);
由上图可知,当 滑动步长 == 窗口大小时,滑动窗口就等于滚动窗口。
2.3 slide > size
SELECT
HOP_START (event_time, INTERVAL '2' MINUTE, INTERVAL '1' MINUTE) as window_start,
HOP_END (event_time, INTERVAL '2' MINUTE, INTERVAL '1' MINUTE) as window_end,
COUNT (cs1) as uv
FROM src_kafka
where n = 'ADCompanyShow'
and cs1 is not null
and b ='web'
GROUP BY HOP (event_time, INTERVAL '2' MINUTE, INTERVAL '1' MINUTE);
由上图可知,当 滑动步长 > 窗口大小时,则为跳跃窗口,窗口之间不重叠且有间隙。
3 会话窗口
会话窗口(SESSION)通过 SESSION 活动来对元素进行分组。会话窗口与滚动窗口和滑动窗口相比,没有窗口重叠,没有固定窗口大小。相反,当它在一个固定的时间周期内不再收到元素,即会话断开时,该窗口就会关闭。
3.1 会话窗口的特点
- 动态长度:会话窗口的长度是变化的,不是固定的时间段。它会根据你的活动来决定窗口的开始和结束。
- 会话间隙:这是一个设定的时间间隔,如果在时间间隔内没有任何活动,窗口就会关闭,新的活动会开启一个新的会话窗口。
3.2 实际应用
会话窗口经常用于分析用户在网站或应用上的行为。例如:
- 网站分析:你可以分析用户在网站上的会话长度、每个会话的页面浏览量等。
- 购物行为:分析用户在电商网站上的购物行为,一个会话可能代表一次购物体验。
- 用户留存:分析用户的活跃度和留存率,看看用户在一段时间内的活动情况。
SELECT
SESSION_START(event_time, INTERVAL '2' minute) as window_start,
SESSION_END(event_time, INTERVAL '2' minute) as window_end,
cs1 as uid,
COUNT(1) as viewcnt
FROM src_kafka
where n = 'ADCompanyShow'
and cs1 is not null
and b = 'web'
GROUP BY SESSION(event_time, INTERVAL '2' minute), cs1
;
3.3 窗口更新逻辑
会话窗口的开始时间和截止时间是**根据用户活动的时间戳和预定义的会话间隔**来计算的。会话窗口的计算方式如下:
会话窗口开始时间:这是会话窗口中第一个事件的时间戳。
会话窗口截止时间:这是会话窗口中最后一个事件的时间戳加上会话间隔。
假设我们有一系列用户活动的时间戳,并定义会话间隔为 20 分钟。下面是计算会话窗口开始时间和截止时间的步骤:
初始化:- 遍历用户的活动时间戳。- 如果当前会话为空,则将当前事件的时间戳作为会话开始时间。
更新会话窗口:- 对于每个新的事件时间戳,检查是否在当前会话的时间范围内(即是否在会话截止时间之前)。- 如果在会话范围内,则更新会话截止时间为当前事件的时间戳加上会话间隔。- 如果不在会话范围内,则当前会话结束,开始一个新的会话,新的会话开始时间为当前事件的时间戳。
输出会话窗口:- 每当一个会话结束时,输出会话的开始时间和截止时间。- 最后一个会话在遍历结束后输出。
假设我们有以下用户活动时间戳,定义会话间隔为20分钟:
2024-08-05 10:00:00
2024-08-05 10:05:00
2024-08-05 10:25:00
2024-08-05 10:35:00
2024-08-05 11:00:00
计算步骤如下:
第一个事件:
时间戳:2024-08-05 10:00:00
会话开始时间:2024-08-05 10:00:00
会话截止时间:2024-08-05 10:20:00
第二个事件:
时间戳:2024-08-05 10:05:00
在当前会话范围内(10:20之前)
更新会话截止时间:2024-08-05 10:25:00
第三个事件:
时间戳:2024-08-05 10:25:00
在当前会话范围内(10:25之前)
更新会话截止时间:2024-08-05 10:45:00
第四个事件:
时间戳:2024-08-05 10:35:00
在当前会话范围内(10:45之前)
更新会话截止时间:2024-08-05 10:55:00
第五个事件:
时间戳:2024-08-05 11:00:00
不在当前会话范围内(10:55之后)
当前会话结束,输出会话:
会话开始时间:2024-08-05 10:00:00
会话截止时间:2024-08-05 10:55:00
新会话开始时间:2024-08-05 11:00:00
新会话截止时间:2024-08-05 11:20:00
4 累计窗口
在Flink SQL中,累计窗口(Cumulative Window)是一种特殊的窗口类型,用于按固定的时间间隔逐步累积数据。与滚动窗口(Tumbling Window)和滑动窗口(Sliding Window)不同,累计窗口会在每个窗口结束时输出累积的结果。
4.1 累计窗口的特点
- 固定的时间间隔:累计窗口按照固定的时间间隔进行累积。
- 逐步累积:每个窗口的结果是从窗口的开始时间到当前时间的累积结果。
- 多次输出:累计窗口会在多个时间点输出结果,而不是仅在窗口结束时输出。
with src_kafka_1 as (
select *
from src_kafka
where n = 'ADCompanyShow'
and cs1 is not null
and b = 'web'
)
SELECT
window_start,
window_end,
count(distinct cs1) as cumulate_dau
FROM TABLE(
CUMULATE(
TABLE src_kafka_1,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTES,
INTERVAL '1' DAY))
GROUP BY window_start, window_end;
我想计算当天累计活跃用户数,按照1分钟的 step 更新数据,窗口的开始时间是基于时间属性列的时间戳**对窗口步长进行取整计算**的结果,通常从
0
开始,如上图开始时间:2024-08-05 00:00:00;窗口的结束时间是基于时间属性列的时间戳对最大长度进行取整计算的结果,第一个窗口的结束时间:2024-08-05 16:40:00 ,是因为我在这个时间点提交的任务,之后的结束时间戳都是在之前基础上加上一个 step 。
5.OVER 窗口
OVER窗口(OVER Window)是传统数据库的标准开窗,不同于Group By Window,OVER窗口中**每1个元素都对应1个窗口**。OVER窗口可以按照实际元素的行或实际的元素值(时间戳值)确定窗口,因此流数据元素可能分布在多个窗口中。
在应用OVER窗口的流式数据中,每个元素都对应1个OVER窗口。每个元素都触发1次数据计算,**每个触发计算的元素所确定的行,都是该元素所在窗口的最后1行**
Flink SQL中对OVER窗口的定义遵循标准SQL的定义语法,传统OVER窗口没有对其进行更细粒度的窗口类型命名划分。按照计算行的定义方式,OVER Window可以分为以下两类:
- ROWS OVER Window:每1行元素都被视为新的计算行,即每1行都是一个新的窗口。
- RANGE OVER Window:具有相同时间值的所有元素行视为同一计算行,具有相同时间值的所有行都是同一个窗口。
5.1 行窗口
select event_time,cs1 as uid,count(1) over(partition by cs1 order by event_time rows between 2 preceding and current row) as cnt
from src_kafka
where n = 'ADCompanyShow'
and cs1 is not null
and b = 'web'
;
5.2 范围窗口
select
event_time
,cs1 as uid
,count(1) over(partition by cs1 order by event_time RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW) as cnt
from src_kafka
where n = 'ADCompanyShow'
and cs1 is not null
and b = 'web'
;
5.3 总结
尽管离线计算和实时计算中关于 over窗口函数 的代码相同,但两者在执行过程中存在以下主要区别:
- 数据处理方式:- 流式计算:数据是连续不断地到达的,
OVER
窗口函数会在数据流中实时地计算窗口结果。每当新数据到达时,窗口计算会实时更新。- 批计算:数据是一次性读取并处理的,OVER
窗口函数会在整个数据集上一次性计算窗口结果。所有数据都读取完毕后,窗口计算才会开始。 - 计算延迟:- 流式计算:适用于需要低延迟、实时更新的场景。计算结果会随着数据的到达实时更新。- 批计算:适用于数据量较大、对计算延迟要求不高的场景。计算结果在所有数据读取完毕后一次性计算。
- 状态管理:- 流式计算:需要持续管理状态,因为数据是不断到达的。Flink会维护窗口状态,以便在新的数据到达时更新计算结果。- 批计算:状态管理相对简单,因为所有数据一次性读取并处理。窗口状态只在整个数据集上计算时维护。
以上就是本次关于流式计算中不同窗口计算类型,有任何问题欢迎各位在讨论区交流。
版权归原作者 Luckyforever%- 所有, 如有侵权,请联系我们删除。