0


【Flink】窗口实战:TUMBLE、HOP、SESSION

窗口实战:TUMBLE、HOP、SESSION 

在流式计算中,流通常是无穷无尽的,我们无法知道什么时候数据源会继续 / 停止发送数据,所以在流上处理聚合事件(

count

sum

等)的处理方式与批处理中的处理方式会有所差异。在流上一般用窗口(Window)来限定聚合的范围,例如 “过去 2 分钟网站点击量的计数”、“在最近 100 个人中点赞这个视频的总人数”。窗口的概念相当于帮我们收集了一张有限数据的动态表,我们可以对表中的数据进行聚合计算。

窗口函数是一种特殊的函数,它并不在

SELECT

的投影列表中使用,而是在

GROUP BY

子句中使用。

1.TUMBLE WINDOW

TUMBLE WINDOW(滚动窗口)将每个进入的数据分配到一个指定窗口大小的窗口中。滚动窗口可以自定义固定的大小,并且不会出现重叠。我们可以对窗口内的数据进行计算。

1.1 语法

TUMBLE(time_attr,interval)
  • time_attr:表示 时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 Flink 处理的时刻,一般用在 Processing Time 模式下。
  • interval:用来设置 窗口大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR
  • 如果在 Event Time 时间模式下(使用 WATERMARK FOR 语句定义了时间戳字段),那么 TUMBLE、HOP、SESSION 窗口函数的第一个参数必须为该字段。
  • 如果在 Processing Time 时间模式下,则 TUMBLE、HOP、SESSION 窗口函数的第一个参数必须为 proctime() 函数生成的计算列,下文用 PROCTIME 举例,请在实际作业中替换为实际的列名。

1.2 标识函数

  函数名 

  功能描述 
TUMBLE_START(time-attr, size-interval)

返回窗口的起始时间(包含边界)。例如

[00:10, 00:15)

窗口,返回

00:10

TUMBLE_END(time-attr, size-interval)

返回窗口的结束时间(包含边界)。例如

[00:00, 00:15]

窗口,返回

00:15

TUMBLE_ROWTIME(time-attr, size-interval)

返回窗口的结束时间(不包含边界)。例如

[00:00, 00:15]

窗口,返回

00:14:59.999

。返回值是一个

rowtime attribute

,即可以基于该字段做时间属性的操作。

TUMBLE_PROCTIME(time-attr, size-interval)

返回窗口的结束时间(不包含边界)。例如

[00:00, 00:15]

窗口,返回

00:14:59.999

。返回值是一个

proctime attribute

,即可以基于该字段做时间属性的操作。

1.3 模拟用例

下文以 TUMBLE WINDOW 为例,帮助您更容易地理解 TUMBLE WINDOW。使用 Event Time 模拟统计 每小时各用户收入金额

示例数据:
username(VARCHAR)income(BIGINT)times(TIMESTAMP)Tom202021-11-11 10:30:00.0Jack102021-11-11 10:35:00.0Tom102021-11-11 10:35:00.0Tom102021-11-11 10:40:00.0Tom152021-11-11 11:30:00.0Jack102021-11-11 11:30:00.0Jack152021-11-11 11:40:00.0

CREATETABLE user_income (
    username VARCHAR,
    income INT,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times -INTERVAL'3'SECOND)WITH('connector'='filesystem','path'='input/sales01.csv','format'='csv');CREATETABLE output (
    win_start TIMESTAMP,
    win_end TIMESTAMP,
    username VARCHAR,
    hour_income BIGINT)WITH('connector'='print');INSERTINTO output
SELECT
    TUMBLE_START(times,INTERVAL'1'HOUR),
    TUMBLE_END(times,INTERVAL'1'HOUR),
    username,SUM(income)FROM user_income
GROUPBY TUMBLE(times,INTERVAL'1'HOUR),username;

在这里插入图片描述

2.HOP WINDOW

HOP WINDOW(滑动窗口)将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。

HOP WINDOW 保持窗口大小(Size)不变,每次滑动指定的时间周期(Slide),因而允许窗口之间的相互重叠。

Slide 的大小决定了 Flink 创建新窗口的频率。

  • 当 Slide 小于 Size 时,相邻窗口会重叠,一个时间会被分配到多个窗口。
  • 当 Slide 大于 Size 时,可能会导致有些事件被丢弃。
  • 当 Slide 等于 Size 时,等于是 TUMBLE WINDOW。

2.1 语法

HOP(time_attr, sliding_interval, window_size_interval)
  • time_attr:表示 时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 Flink 处理的时刻,一般用在 Processing Time 模式下。
  • sliding_interval:用来设置 滑动时间周期大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR
  • window_size_interval:用来设置 窗口大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR

2.2 标识函数

  函数名 

  功能描述 
HOP_START(time-attr, slide-interval,size-interval)

返回该窗口的起始时间

HOP_END(time-attr, slide-interval,size-interval)

返回该窗口的结束时间

2.3 模拟用例

下文以 HOP WINDOW 为例,帮助您更容易地理解 HOP WINDOW。使用 Event Time 模拟统计每小时各用户收入金额,1 小时的窗口,30 分钟滑动一次

示例数据:
username(VARCHAR)income(BIGINT)times(TIMESTAMP)Tom202021-11-11 10:30:00.0Jack102021-11-11 10:35:00.0Tom102021-11-11 10:35:00.0Tom102021-11-11 10:40:00.0Tom152021-11-11 11:35:00.0Jack102021-11-11 11:30:00.0Jack152021-11-11 11:40:00.0

CREATETABLE user_income (
    username VARCHAR,
    Income INT,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times -INTERVAL'3'MINUTE)WITH('connector'='filesystem','path'='input/sales02.csv','format'='csv');CREATETABLE output (
    win_start TIMESTAMP,
    win_end TIMESTAMP,
    username VARCHAR,
    hour_income BIGINT)WITH('connector'='print');INSERTINTO output
SELECT
    HOP_START(times,INTERVAL'30'MINUTE,INTERVAL'1'HOUR),
    HOP_END(times,INTERVAL'30'MINUTE,INTERVAL'1'HOUR),
    username,SUM(income)FROM user_income
GROUPBY HOP(times,INTERVAL'30'MINUTE,INTERVAL'1'HOUR),username;

在这里插入图片描述

3.SESSION WINDOW

SESSION WINDOW(会话窗口)通过 Session 活动对元素进行分组,Session 窗口与滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 Session 窗口通过一个 sSession 间隔来配置。这个 Session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 Session 将关闭并且后续的元素将被分配到新的 Session 窗口中。

Session Window 并非以长度来划分窗口,而是以 非活跃时间 来划分。例如超过 30 分钟不活跃(没有新数据),则之前的窗口结束,下一个来到的数据将会形成一个新窗口。

3.1 语法

SESSION(time_attr,interval)
  • time_attr:表示 时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 Flink 处理的时刻,一般用在 Processing Time 模式下。
  • interval:用来设置 窗口大小。例如,设置为 1 天:INTERVAL '1' DAY;设置为 2 小时:INTERVAL '2' HOUR

3.2 标识函数

  函数名 

  功能描述 
SESSION_START(time-attr, size-interval)

返回该窗口的起始时间

SESSION_END(time-attr, size-interval)

返回该窗口的结束时间

3.3 模拟用例

下文以 SESSION WINDOW 为例,帮助您更容易地理解 SESSION WINDOW。使用 Event Time 模拟统计每小时各用户收入金额,会话超时时长为 30 分钟

样例数据:
username(VARCHAR)income(BIGINT)times(TIMESTAMP)Tom202021-11-11 10:30:00.0Jack102021-11-11 10:35:00.0Tom102021-11-11 10:35:00.0Tom102021-11-11 10:40:00.0Tom152021-11-11 11:50:00.0Jack102021-11-11 11:40:00.0Jack152021-11-11 11:45:00.0

CREATETABLE user_income (
    username VARCHAR,
    income INT,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times -INTERVAL'3'MINUTE)WITH('connector'='filesystem','path'='input/sales03.csv','format'='csv');CREATETABLE output (
    win_start TIMESTAMP,
    win_end TIMESTAMP,
    username VARCHAR,
    hour_income BIGINT)WITH('connector'='print');INSERTINTO output
SELECT
    SESSION_START(times,INTERVAL'30'MINUTE),
    SESSION_END(times,INTERVAL'30'MINUTE),
    username,SUM(income)FROM user_income
GROUPBYSESSION(times,INTERVAL'30'MINUTE),username;

在这里插入图片描述

4.更多说明

以上三种窗口都有对应的辅助函数。以 TUMBLE 窗口为例(HOP、SESSION 也一样,只是前缀不同),辅助函数如下:

  • TUMBLE_ROWTIME:表示 TUMBLE 窗口的末端界限(包含,可用作 JOIN 或 GROUP 以及 OVER 条件,Event Time 时间模式下使用)。示例如下:
SELECTuser,
       TUMBLE_START(rowtime,INTERVAL'12'HOUR)AS sStart,
       TUMBLE_ROWTIME(rowtime,INTERVAL'12'HOUR)AS snd,SUM(amount)FROM Orders
GROUPBY TUMBLE(rowtime,INTERVAL'12'HOUR),user
  • TUMBLE_PROCTIME:表示 TUMBLE 窗口的末端界限(包含,可用作 JOIN 或 GROUP 以及 OVER 条件,Processing Time 时间模式下使用)。示例如下:
SELECTuser,
       TUMBLE_START(PROCTIME,INTERVAL'12'HOUR)AS sStart,
       TUMBLE_PROCTIME(PROCTIME,INTERVAL'12'HOUR)AS snd,SUM(amount)FROM Orders
GROUPBY TUMBLE(PROCTIME,INTERVAL'12'HOUR),user
标签: flink sql TUMBLE

本文转载自: https://blog.csdn.net/be_racle/article/details/136966114
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。

“【Flink】窗口实战:TUMBLE、HOP、SESSION”的评论:

还没有评论