0


27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs

26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数 )介绍及详细示例(3)

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)

41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录


本文简单的介绍了Flink 的窗口函数(即滚动窗口、滑动窗口和累积窗口)及具体的示例验证过程。
本文依赖flink和kafka集群能正常使用。
本文的示例是在Flink 1.17版本中验证的。
注:
本文在写作过程中出现1个官方示例直接使用offset关键字设置偏移量时不能运行通过情况和一个官方示例在表有主键的情况下不能通过窗口进行聚合运行通过,由于想尽快将该专栏写完,故未深究,待完成该专栏后再仔细的研究原因。当然知道原因的大佬,欢迎指出,谢谢。
分别对应的目录是:
1)、示例1-使用offset累积窗口查询、统计
2)、示例2-使用滚动窗口查询、统计(表含主键)

一、Windowing table-valued functions (Windowing TVFs)

Windows 是处理无限流的核心。Windows 将流拆分为有限大小的“桶”,我们可以在其上应用计算。本文档重点介绍如何在 Flink SQL 中执行窗口化,以及程序员如何从其提供的功能中获得最大收益。

Apache Flink 提供了几个窗口表值函数 (TVF) 来将表的元素划分为窗口,包括:

  • Tumble Windows(滚动窗口)
  • Hop Windows(滑动窗口)
  • Cumulate Windows(累积窗口)
  • Session Windows (会话窗口,截至Flink 1.17版本还不支持)

每个元素在逻辑上可以属于多个窗口,具体取决于您使用的窗口表值函数。例如,HOP 窗口创建重叠窗口,其中可以将单个元素分配给多个窗口。

Windowing TVFs 是 Flink 定义的多态表函数(缩写为 PTF)。PTF 是 SQL 2016 标准的一部分,这是一个特殊的表函数,但可以将表作为参数。PTF 是更改表格形状的强大功能。由于 PTF 在语义上与表类似,因此它们的调用发生在 SELECT 语句的 FROM 子句中。

Windowing TVFs 是旧版分组窗口函数的替代品。Windowing TVFs 更符合 SQL 标准,并且更强大,可以支持复杂的基于窗口的计算,例如 Window TopN、Window Join。但是,分组窗口函数只能支持窗口聚合。

如何应用基于窗口 TVF 的进一步计算,将在后面的章节中进行介绍:

  • Window Aggregation
  • Window TopN
  • Window Join
  • Window Deduplication

Apache Flink 提供了 3 个内置windowing TVFs:TUMBLE、HOP 和 CUMULATE。windowing TVF 的返回值是一个新关系,它包括原始关系的所有列以及名为“window_start”、“window_end”、“window_time”的附加 3 列,以指示分配的窗口。在流式处理模式下,“window_time”字段是窗口的时间属性。在批处理模式下,“window_time”字段是基于输入时间字段类型的 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。“window_time”字段可用于后续的基于时间的操作,例如聚合上的另一个windowing TVF或 interval joins。window_time的值始终等于 window_end - 1ms。

1、TUMBLE滚动窗口

TUMBLE 函数将每个元素分配给指定窗口大小的窗口。Tumbling windows具有固定大小,不会重叠。例如,假设您指定了一个大小为 5 分钟的Tumbling windows。在这种情况下,Flink 将评估当前窗口,并且每五分钟启动一个新窗口,如下图所示。
在这里插入图片描述

TUMBLE 函数根据时间属性字段为关系的每一行分配一个窗口。在流式处理模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。TUMBLE 的返回值是一个新关系,包括原始关系的所有列以及名为“window_start”、“window_end”、“window_time”的另外 3 列,以指示分配的窗口。原始时间属性“timecol”将是window TVF 之后的常规timestamp 列。

TUMBLE 函数三个必需参数,一个可选参数:

TUMBLE(TABLEdata, DESCRIPTOR(timecol), size [,offset])# data:是一个表参数,可以是与时间属性列的任何关系。# timecol:是一个列描述符,指示数据的哪些时间属性列应映射到tumbling windows。# size:是指定tumbling windows宽度的持续时间。# offset:是一个可选参数,用于指定窗口开始移动的偏移量。

1)、示例1-使用滚动窗口查询、统计(表不含主键)

具体验证过程如下

---1、建表
Flink SQL>CREATETABLE orders (>`id`    STRING,>     price       DECIMAL(32,2),>     proctime as PROCTIME()>)WITH(>'connector'='kafka',>'topic'='orders_topic',>'properties.bootstrap.servers'='192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',>'properties.group.id'='testGroup',>'scan.startup.mode'='earliest-offset',>'format'='csv'>);---2、插入数据并查询# 插入数据略,就是用kafka写入该表中# 最终表内数据---3、滑动窗口的2种方式查询SELECT*FROMTABLE(TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'5' MINUTES));# 或SELECT*FROMTABLE(
    TUMBLE(DATA=>TABLE orders,-- DATA必须是第一个参数
      TIMECOL => DESCRIPTOR(proctime),
      SIZE =>INTERVAL'5' MINUTES));

Flink SQL>SELECT*FROMTABLE(>    TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'5' MINUTES));+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+|+I |1|10.00|2023-09-1910:38:26.566|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.566||+I |2|15.00|2023-09-1910:38:26.566|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.566||+I |3|20.00|2023-09-1910:38:26.566|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.566||+I |4|30.00|2023-09-1910:38:26.567|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.567||+I |5|60.00|2023-09-1910:38:26.567|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.567||+I |6|800.98|2023-09-1910:38:26.567|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.567||+I |7|100.90|2023-09-1910:38:26.567|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.567||+I |8|11.00|2023-09-1910:38:26.567|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.567||+I |9|18.00|2023-09-1910:38:26.567|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.567||+I |10|123.00|2023-09-1910:38:26.567|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.567||+I |11|35.78|2023-09-1910:38:26.567|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.567||+I |12|45.68|2023-09-1910:38:26.567|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:26.567|

Flink SQL>SELECT*FROMTABLE(>    TUMBLE(>DATA=>TABLE orders,>      TIMECOL => DESCRIPTOR(proctime),>      SIZE =>INTERVAL'5' MINUTES));+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+|+I |1|10.00|2023-09-1910:38:58.165|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.165||+I |2|15.00|2023-09-1910:38:58.165|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.165||+I |3|20.00|2023-09-1910:38:58.165|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.165||+I |4|30.00|2023-09-1910:38:58.165|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.165||+I |5|60.00|2023-09-1910:38:58.165|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.165||+I |6|800.98|2023-09-1910:38:58.166|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.166||+I |7|100.90|2023-09-1910:38:58.166|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.166||+I |8|11.00|2023-09-1910:38:58.166|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.166||+I |9|18.00|2023-09-1910:38:58.166|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.166||+I |10|123.00|2023-09-1910:38:58.166|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.166||+I |11|35.78|2023-09-1910:38:58.166|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.166||+I |12|45.68|2023-09-1910:38:58.167|2023-09-1910:35:00.000|2023-09-1910:40:00.000|2023-09-1910:38:58.167|---4、滑动窗口的计算# orders表一边写入数据,一边进行窗口计算,结果如下:
Flink SQL>SELECT window_start, window_end,sum(price)>FROMTABLE(>     TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'5' MINUTES))>GROUPBY window_start, window_end;+----+-------------------------+-------------------------+------------------------------------------+| op |            window_start |              window_end |                                   EXPR$2|+----+-------------------------+-------------------------+------------------------------------------+|+I |2023-09-1910:35:00.000|2023-09-1910:40:00.000|1270.34|

Flink SQL>SELECT window_start, window_end,sum(price)>FROMTABLE(>     TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'5' MINUTES))>GROUPBY window_start, window_end;+----+-------------------------+-------------------------+------------------------------------------+| op |            window_start |              window_end |                                   EXPR$2|+----+-------------------------+-------------------------+------------------------------------------+|+I |2023-09-1910:40:00.000|2023-09-1910:45:00.000|1428.02|

2)、示例2-使用滚动窗口查询、统计(表含主键)

验证过程如下,表如果设置了主键,好像不能对数据进行计算。

-----1、建表
Flink SQL>CREATETABLE orders2 (>`id`    STRING,>    price       DECIMAL(32,2),>    proctime as PROCTIME(),>PRIMARYKEY(id)NOT ENFORCED
>)WITH(>'connector'='kafka',>'topic'='orders2_topic',>'properties.bootstrap.servers'='192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',>'properties.group.id'='testGroup',>'scan.startup.mode'='earliest-offset',>'value.format'='debezium-json'>);

Flink SQL>desc orders2;+----------+-----------------------------+-------+---------+---------------+-----------+|     name |type|null|key|        extras | watermark |+----------+-----------------------------+-------+---------+---------------+-----------+|       id |                      STRING |FALSE| PRI(id)||||    price |DECIMAL(32,2)|TRUE||||| proctime | TIMESTAMP_LTZ(3)*PROCTIME*|FALSE||AS PROCTIME()||+----------+-----------------------------+-------+---------+---------------+-----------+-----2、插入数据,并查询
Flink SQL>select*from orders2;+----+--------------------------------+------------------------------------+-------------------------+| op |                             id |                              price |                proctime |+----+--------------------------------+------------------------------------+-------------------------+|+I |1|10.00|2023-09-1913:23:00.764||+I |2|12.00|2023-09-1913:23:34.945||+I |3|4.00|2023-09-1913:23:43.993||+I |4|20.00|2023-09-1913:23:51.384|-----3、滑动窗口的2种查询方式
Flink SQL>SELECT*FROMTABLE( TUMBLE(TABLE orders2, DESCRIPTOR(proctime),INTERVAL'5' MINUTES));+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+|+I |1|10.00|2023-09-1913:25:14.066|2023-09-1913:25:00.000|2023-09-1913:30:00.000|2023-09-1913:25:14.066||+I |2|12.00|2023-09-1913:25:14.066|2023-09-1913:25:00.000|2023-09-1913:30:00.000|2023-09-1913:25:14.066||+I |3|4.00|2023-09-1913:25:14.066|2023-09-1913:25:00.000|2023-09-1913:30:00.000|2023-09-1913:25:14.066||+I |4|20.00|2023-09-1913:25:14.067|2023-09-1913:25:00.000|2023-09-1913:30:00.000|2023-09-1913:25:14.067|

Flink SQL>SELECT*FROMTABLE(>    TUMBLE(>DATA=>TABLE orders2,>      TIMECOL => DESCRIPTOR(proctime),>      SIZE =>INTERVAL'5' MINUTES));+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+|+I |1|10.00|2023-09-1913:25:39.548|2023-09-1913:25:00.000|2023-09-1913:30:00.000|2023-09-1913:25:39.548||+I |2|12.00|2023-09-1913:25:39.548|2023-09-1913:25:00.000|2023-09-1913:30:00.000|2023-09-1913:25:39.548||+I |3|4.00|2023-09-1913:25:39.548|2023-09-1913:25:00.000|2023-09-1913:30:00.000|2023-09-1913:25:39.548||+I |4|20.00|2023-09-1913:25:39.548|2023-09-1913:25:00.000|2023-09-1913:30:00.000|2023-09-1913:25:39.548|-----4、滑动窗口的计算
Flink SQL>SELECT window_start, window_end,sum(price)>FROMTABLE(>     TUMBLE(TABLE orders2, DESCRIPTOR(proctime),INTERVAL'10' MINUTES))>GROUPBY window_start, window_end;[ERROR] Could notexecuteSQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming updateanddelete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, orders2]],fields=[id, price])

3)、官方示例-使用滚动窗口查询、统计(未验证)

下面是官方的示例,本文未做验证,具体实现可参考上文的例子。

-- tables must have time attribute, e.g. `bidtime` in this table
Flink SQL>desc Bid;+-------------+------------------------+------+-----+--------+---------------------------------+|        name |type|null|key| extras |                       watermark |+-------------+------------------------+------+-----+--------+---------------------------------+|     bidtime |TIMESTAMP(3)*ROWTIME*|true|||`bidtime`-INTERVAL'1'SECOND||       price |DECIMAL(10,2)|true|||||        item |                 STRING |true||||+-------------+------------------------+------+-----+--------+---------------------------------+

Flink SQL>SELECT*FROM Bid;+------------------+-------+------+|          bidtime | price | item |+------------------+-------+------+|2020-04-1508:05|4.00| C    ||2020-04-1508:07|2.00| A    ||2020-04-1508:09|5.00| D    ||2020-04-1508:11|3.00| B    ||2020-04-1508:13|1.00| E    ||2020-04-1508:17|6.00| F    |+------------------+-------+------+

Flink SQL>SELECT*FROMTABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES));-- or with the named params-- note: the DATA param must be the first
Flink SQL>SELECT*FROMTABLE(
   TUMBLE(DATA=>TABLE Bid,
     TIMECOL => DESCRIPTOR(bidtime),
     SIZE =>INTERVAL'10' MINUTES));+------------------+-------+------+------------------+------------------+-------------------------+|          bidtime | price | item |     window_start |       window_end |            window_time  |+------------------+-------+------+------------------+------------------+-------------------------+|2020-04-1508:05|4.00| C    |2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999||2020-04-1508:07|2.00| A    |2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999||2020-04-1508:09|5.00| D    |2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999||2020-04-1508:11|3.00| B    |2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999||2020-04-1508:13|1.00| E    |2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999||2020-04-1508:17|6.00| F    |2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999|+------------------+-------+------+------------------+------------------+-------------------------+-- apply aggregation on the tumbling windowed table
Flink SQL>SELECT window_start, window_end,SUM(price)FROMTABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES))GROUPBY window_start, window_end;+------------------+------------------+-------+|     window_start |       window_end | price |+------------------+------------------+-------+|2020-04-1508:00|2020-04-1508:10|11.00||2020-04-1508:10|2020-04-1508:20|10.00|+------------------+------------------+-------+

2、HOP滑动窗口

HOP 函数将元素分配给固定长度的窗口。与 TUMBLE 窗口函数一样,窗口的大小由窗口大小参数配置。附加的窗口滑动参数(window slide)控制跳跃窗口的启动频率。因此,如果slide小于窗口大小,则滑动窗口(hopping windows)可能会重叠。在这种情况下,元素被分配给多个窗口。hopping windows也称为“滑动窗口(sliding windows)”。

例如,有大小为 10 分钟的窗口,该窗口滑动 5 分钟。这样,将每 5 分钟获得一个窗口,其中包含过去 10 分钟内到达的事件,如下图所示。
在这里插入图片描述
HOP 函数分配在大小间隔内覆盖行的窗口,并根据时间属性字段移动每slide 。在流式处理模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。HOP 的返回值是一个新关系,它包括原始关系的所有列以及名为“window_start”、“window_end”、“window_time”的另外 3 列,以指示分配的窗口。原始时间属性“timecol”将是 windowing TVF 后的常规时间戳列。

HOP 采用四个必需参数,一个可选参数:

HOP(TABLEdata, DESCRIPTOR(timecol), slide, size [,offset])# data:是一个表参数,可以是与时间属性列的任何关系。# timecol:是一个列描述符,指示数据的哪些时间属性列应映射到hopping windows。# slide:是指定顺序hopping windows开始之间的持续时间# size :是指定hopping windows宽度的持续时间。# offset :是一个可选参数,用于指定窗口开始移动的偏移量。

1)、示例1-使用滑动窗口查询、统计

----表结构CREATETABLE orders (`id`    STRING,
    price       DECIMAL(32,2),
    proctime as PROCTIME())WITH('connector'='kafka','topic'='orders_topic','properties.bootstrap.servers'='192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv');
Flink SQL>desc orders;+----------+-----------------------------+-------+-----+---------------+-----------+|     name |type|null|key|        extras | watermark |+----------+-----------------------------+-------+-----+---------------+-----------+|       id |                      STRING |TRUE|||||    price |DECIMAL(32,2)|TRUE||||| proctime | TIMESTAMP_LTZ(3)*PROCTIME*|FALSE||AS PROCTIME()||+----------+-----------------------------+-------+-----+---------------+-----------+---滑动窗口的两种使用方式1SELECT*FROMTABLE( HOP(TABLE orders, DESCRIPTOR(proctime),INTERVAL'5' MINUTES,INTERVAL'10' MINUTES));---滑动窗口的两种使用方式2,data需要是西一个参数SELECT*FROMTABLE(
    HOP(DATA=>TABLE orders,
      TIMECOL => DESCRIPTOR(proctime),
      SLIDE =>INTERVAL'5' MINUTES,
      SIZE =>INTERVAL'10' MINUTES));---滑动窗口的计算SELECT window_start, window_end,SUM(price)FROMTABLE(
    HOP(TABLE orders, DESCRIPTOR(proctime),INTERVAL'5' MINUTES,INTERVAL'10' MINUTES))GROUPBY window_start, window_end;--------以下是具体的验证数据--------
Flink SQL>select*from orders;+----+--------------------------------+------------------------------------+-------------------------+| op |                             id |                              price |                proctime |+----+--------------------------------+------------------------------------+-------------------------+|+I |1|10.00|2023-09-1914:37:33.300||+I |2|15.00|2023-09-1914:37:33.300||+I |3|20.00|2023-09-1914:37:33.300||+I |4|30.00|2023-09-1914:37:33.300||+I |5|60.00|2023-09-1914:37:33.300||+I |6|800.98|2023-09-1914:37:33.300||+I |7|100.90|2023-09-1914:37:33.300||+I |8|11.00|2023-09-1914:37:33.300||+I |9|18.00|2023-09-1914:37:33.300||+I |10|123.00|2023-09-1914:37:33.300||+I |11|35.78|2023-09-1914:37:33.300||+I |12|45.68|2023-09-1914:37:33.301||+I |13|22.00|2023-09-1914:37:33.301||+I |14|56.78|2023-09-1914:37:33.301||+I |15|78.90|2023-09-1914:37:33.301|

Flink SQL>SELECT*FROMTABLE( HOP(TABLE orders, DESCRIPTOR(proctime),INTERVAL'5' MINUTES,INTERVAL'10' MINUTES));+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+|+I |1|10.00|2023-09-1914:38:39.892|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.892||+I |1|10.00|2023-09-1914:38:39.892|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.892||+I |2|15.00|2023-09-1914:38:39.892|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.892||+I |2|15.00|2023-09-1914:38:39.892|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.892||+I |3|20.00|2023-09-1914:38:39.892|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.892||+I |3|20.00|2023-09-1914:38:39.892|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.892||+I |4|30.00|2023-09-1914:38:39.892|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.892||+I |4|30.00|2023-09-1914:38:39.892|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.892||+I |5|60.00|2023-09-1914:38:39.893|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.893||+I |5|60.00|2023-09-1914:38:39.893|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.893||+I |6|800.98|2023-09-1914:38:39.893|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.893||+I |6|800.98|2023-09-1914:38:39.893|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.893||+I |7|100.90|2023-09-1914:38:39.893|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.893||+I |7|100.90|2023-09-1914:38:39.893|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.893||+I |8|11.00|2023-09-1914:38:39.893|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.893||+I |8|11.00|2023-09-1914:38:39.893|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.893||+I |9|18.00|2023-09-1914:38:39.893|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.893||+I |9|18.00|2023-09-1914:38:39.893|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.893||+I |10|123.00|2023-09-1914:38:39.893|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.893||+I |10|123.00|2023-09-1914:38:39.894|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.894||+I |11|35.78|2023-09-1914:38:39.894|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.894||+I |11|35.78|2023-09-1914:38:39.894|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.894||+I |12|45.68|2023-09-1914:38:39.894|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.894||+I |12|45.68|2023-09-1914:38:39.894|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.894||+I |13|22.00|2023-09-1914:38:39.894|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.894||+I |13|22.00|2023-09-1914:38:39.894|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.894||+I |14|56.78|2023-09-1914:38:39.894|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.894||+I |14|56.78|2023-09-1914:38:39.894|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.894||+I |15|78.90|2023-09-1914:38:39.894|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:38:39.894||+I |15|78.90|2023-09-1914:38:39.894|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:38:39.894|

Flink SQL>SELECT*FROMTABLE(>     HOP(>DATA=>TABLE orders,>       TIMECOL => DESCRIPTOR(proctime),>       SLIDE =>INTERVAL'5' MINUTES,>       SIZE =>INTERVAL'10' MINUTES));+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+|+I |1|10.00|2023-09-1914:39:35.143|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.143||+I |1|10.00|2023-09-1914:39:35.143|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.143||+I |2|15.00|2023-09-1914:39:35.143|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.143||+I |2|15.00|2023-09-1914:39:35.143|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.143||+I |3|20.00|2023-09-1914:39:35.143|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.143||+I |3|20.00|2023-09-1914:39:35.143|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.143||+I |4|30.00|2023-09-1914:39:35.143|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.143||+I |4|30.00|2023-09-1914:39:35.143|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.143||+I |5|60.00|2023-09-1914:39:35.144|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.144||+I |5|60.00|2023-09-1914:39:35.144|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.144||+I |6|800.98|2023-09-1914:39:35.144|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.144||+I |6|800.98|2023-09-1914:39:35.144|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.144||+I |7|100.90|2023-09-1914:39:35.144|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.144||+I |7|100.90|2023-09-1914:39:35.144|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.144||+I |8|11.00|2023-09-1914:39:35.144|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.144||+I |8|11.00|2023-09-1914:39:35.144|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.144||+I |9|18.00|2023-09-1914:39:35.144|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.144||+I |9|18.00|2023-09-1914:39:35.144|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.144||+I |10|123.00|2023-09-1914:39:35.144|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.144||+I |10|123.00|2023-09-1914:39:35.144|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.144||+I |11|35.78|2023-09-1914:39:35.145|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.145||+I |11|35.78|2023-09-1914:39:35.145|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.145||+I |12|45.68|2023-09-1914:39:35.145|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.145||+I |12|45.68|2023-09-1914:39:35.145|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.145||+I |13|22.00|2023-09-1914:39:35.145|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.145||+I |13|22.00|2023-09-1914:39:35.145|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.145||+I |14|56.78|2023-09-1914:39:35.145|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.145||+I |14|56.78|2023-09-1914:39:35.145|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.145||+I |15|78.90|2023-09-1914:39:35.145|2023-09-1914:35:00.000|2023-09-1914:45:00.000|2023-09-1914:39:35.145||+I |15|78.90|2023-09-1914:39:35.145|2023-09-1914:30:00.000|2023-09-1914:40:00.000|2023-09-1914:39:35.145|

Flink SQL>SELECT window_start, window_end,SUM(price)>FROMTABLE(>     HOP(TABLE orders, DESCRIPTOR(proctime),INTERVAL'5' MINUTES,INTERVAL'10' MINUTES))>GROUPBY window_start, window_end;+----+-------------------------+-------------------------+------------------------------------------+| op |            window_start |              window_end |                                   EXPR$2|+----+-------------------------+-------------------------+------------------------------------------+|+I |2023-09-1914:35:00.000|2023-09-1914:45:00.000|1428.02|

2)、官方示例-使用滑动窗口查询、统计(未验证)

>SELECT*FROMTABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'5' MINUTES,INTERVAL'10' MINUTES));-- or with the named params-- note: the DATA param must be the first>SELECT*FROMTABLE(
    HOP(DATA=>TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      SLIDE =>INTERVAL'5' MINUTES,
      SIZE =>INTERVAL'10' MINUTES));+------------------+-------+------+------------------+------------------+-------------------------+|          bidtime | price | item |     window_start |       window_end |           window_time   |+------------------+-------+------+------------------+------------------+-------------------------+|2020-04-1508:05|4.00| C    |2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999||2020-04-1508:05|4.00| C    |2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999||2020-04-1508:07|2.00| A    |2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999||2020-04-1508:07|2.00| A    |2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999||2020-04-1508:09|5.00| D    |2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999||2020-04-1508:09|5.00| D    |2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999||2020-04-1508:11|3.00| B    |2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999||2020-04-1508:11|3.00| B    |2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999||2020-04-1508:13|1.00| E    |2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999||2020-04-1508:13|1.00| E    |2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999||2020-04-1508:17|6.00| F    |2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999||2020-04-1508:17|6.00| F    |2020-04-1508:15|2020-04-1508:25|2020-04-1508:24:59.999|+------------------+-------+------+------------------+------------------+-------------------------+-- apply aggregation on the hopping windowed table>SELECT window_start, window_end,SUM(price)FROMTABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'5' MINUTES,INTERVAL'10' MINUTES))GROUPBY window_start, window_end;+------------------+------------------+-------+|     window_start |       window_end | price |+------------------+------------------+-------+|2020-04-1508:00|2020-04-1508:10|11.00||2020-04-1508:05|2020-04-1508:15|15.00||2020-04-1508:10|2020-04-1508:20|10.00||2020-04-1508:15|2020-04-1508:25|6.00|+------------------+------------------+-------+

3、CUMULATE累积窗口

Cumulating windows在某些情况下非常有用,例如在固定的窗口间隔内提前触发滚动窗口。例如,每日仪表板从 00:00 到每分钟绘制累积 UV,10:00 的 UV 表示从 00:00 到 10:00 的 UV 总数。这可以通过累积窗口轻松有效地实现。

CUMULATE 函数将元素分配给在步长初始间隔内覆盖行的窗口,并在每一步扩展到另一个步长(保持窗口开始固定),直到最大窗口大小。您可以将 CUMULATE 函数视为首先应用具有最大窗口大小的 TUMBLE 窗口,并将每个滚动窗口拆分为多个窗口,窗口开始和窗口结束步长差异相同。因此,累积窗口确实会重叠并且没有固定大小。

例如,您可以有一个 1 小时步长和 1 天最大大小的累积窗口,您将获得每天的窗口:[00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00) 。
在这里插入图片描述

累积(CUMULATE)函数根据时间属性列分配窗口。在流式处理模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。CUMULATE 的返回值是一个新关系,包括原始关系的所有列以及名为“window_start”、“window_end”、“window_time”的另外 3 列,以指示分配的窗口。原始时间属性“timecol”将是窗口 TVF 之后的常规时间戳列。

CUMULATE 采用四个必需参数,一个可选参数:

CUMULATE(TABLEdata, DESCRIPTOR(timecol), step, size)# data:是一个表参数,可以是与时间属性列的任何关系。# timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口。# step :是指定顺序累积窗口结束之间增加的窗口大小的持续时间。# size :是指定累积窗口的最大宽度的持续时间。大小必须是步长的整数倍。# offset :是一个可选参数,用于指定窗口开始移动的偏移量。

1)、示例1-使用累积窗口查询、统计

-----表结构CREATETABLE orders (`id`    STRING,
    price       DECIMAL(32,2),
    proctime as PROCTIME())WITH('connector'='kafka','topic'='orders_topic','properties.bootstrap.servers'='192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv');
Flink SQL>desc orders;+----------+-----------------------------+-------+-----+---------------+-----------+|     name |type|null|key|        extras | watermark |+----------+-----------------------------+-------+-----+---------------+-----------+|       id |                      STRING |TRUE|||||    price |DECIMAL(32,2)|TRUE||||| proctime | TIMESTAMP_LTZ(3)*PROCTIME*|FALSE||AS PROCTIME()||+----------+-----------------------------+-------+-----+---------------+-----------+-----累积窗口的查询方式1SELECT*FROMTABLE( 
      CUMULATE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES));-----累积窗口的查询方式2SELECT*FROMTABLE(
    CUMULATE(DATA=>TABLE orders,
      TIMECOL => DESCRIPTOR(proctime),
      STEP =>INTERVAL'2' MINUTES,
      SIZE =>INTERVAL'10' MINUTES));-----累积窗口的计算SELECT window_start, window_end,SUM(price)FROMTABLE(
    CUMULATE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES))GROUPBY window_start, window_end;-----以下是验证过程----------------
Flink SQL>select*from orders;+----+--------------------------------+------------------------------------+-------------------------+| op |                             id |                              price |                proctime |+----+--------------------------------+------------------------------------+-------------------------+|+I |1|10.00|2023-09-1914:37:33.300||+I |2|15.00|2023-09-1914:37:33.300||+I |3|20.00|2023-09-1914:37:33.300||+I |4|30.00|2023-09-1914:37:33.300||+I |5|60.00|2023-09-1914:37:33.300||+I |6|800.98|2023-09-1914:37:33.300||+I |7|100.90|2023-09-1914:37:33.300||+I |8|11.00|2023-09-1914:37:33.300||+I |9|18.00|2023-09-1914:37:33.300||+I |10|123.00|2023-09-1914:37:33.300||+I |11|35.78|2023-09-1914:37:33.300||+I |12|45.68|2023-09-1914:37:33.301||+I |13|22.00|2023-09-1914:37:33.301||+I |14|56.78|2023-09-1914:37:33.301||+I |15|78.90|2023-09-1914:37:33.301|

Flink SQL>desc orders;+----------+-----------------------------+-------+-----+---------------+-----------+|     name |type|null|key|        extras | watermark |+----------+-----------------------------+-------+-----+---------------+-----------+|       id |                      STRING |TRUE|||||    price |DECIMAL(32,2)|TRUE||||| proctime | TIMESTAMP_LTZ(3)*PROCTIME*|FALSE||AS PROCTIME()||+----------+-----------------------------+-------+-----+---------------+-----------+

Flink SQL>SELECT*FROMTABLE( CUMULATE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES));+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+|+I |1|10.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.460||+I |1|10.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.460||+I |2|15.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.460||+I |2|15.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.460||+I |3|20.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.460||+I |3|20.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.460||+I |4|30.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.460||+I |4|30.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.460||+I |5|60.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.460||+I |5|60.00|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.460||+I |6|800.98|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.460||+I |6|800.98|2023-09-1914:56:48.460|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.460||+I |7|100.90|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.461||+I |7|100.90|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.461||+I |8|11.00|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.461||+I |8|11.00|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.461||+I |9|18.00|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.461||+I |9|18.00|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.461||+I |10|123.00|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.461||+I |10|123.00|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.461||+I |11|35.78|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.461||+I |11|35.78|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.461||+I |12|45.68|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.461||+I |12|45.68|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.461||+I |13|22.00|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.461||+I |13|22.00|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.461||+I |14|56.78|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.461||+I |14|56.78|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.461||+I |15|78.90|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:56:48.461||+I |15|78.90|2023-09-1914:56:48.461|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:56:48.461|

Flink SQL>SELECT*FROMTABLE(>     CUMULATE(>DATA=>TABLE orders,>       TIMECOL => DESCRIPTOR(proctime),>       STEP =>INTERVAL'2' MINUTES,>       SIZE =>INTERVAL'10' MINUTES));+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+|+I |1|10.00|2023-09-1914:57:13.264|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.265||+I |1|10.00|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.265||+I |2|15.00|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.265||+I |2|15.00|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.265||+I |3|20.00|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.265||+I |3|20.00|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.265||+I |4|30.00|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.265||+I |4|30.00|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.265||+I |5|60.00|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.265||+I |5|60.00|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.265||+I |6|800.98|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.265||+I |6|800.98|2023-09-1914:57:13.265|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.265||+I |7|100.90|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.266||+I |7|100.90|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.266||+I |8|11.00|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.266||+I |8|11.00|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.266||+I |9|18.00|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.266||+I |9|18.00|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.266||+I |10|123.00|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.266||+I |10|123.00|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.266||+I |11|35.78|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.266||+I |11|35.78|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.266||+I |12|45.68|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.266||+I |12|45.68|2023-09-1914:57:13.266|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.266||+I |13|22.00|2023-09-1914:57:13.267|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.267||+I |13|22.00|2023-09-1914:57:13.267|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.267||+I |14|56.78|2023-09-1914:57:13.267|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.267||+I |14|56.78|2023-09-1914:57:13.267|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.267||+I |15|78.90|2023-09-1914:57:13.267|2023-09-1914:50:00.000|2023-09-1914:58:00.000|2023-09-1914:57:13.267||+I |15|78.90|2023-09-1914:57:13.267|2023-09-1914:50:00.000|2023-09-1915:00:00.000|2023-09-1914:57:13.267|

Flink SQL>SELECT window_start, window_end,SUM(price)>FROMTABLE(>     CUMULATE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES))>GROUPBY window_start, window_end;+----+-------------------------+-------------------------+------------------------------------------+| op |            window_start |              window_end |                                   EXPR$2|+----+-------------------------+-------------------------+------------------------------------------+|+I |2023-09-1914:50:00.000|2023-09-1915:00:00.000|1428.02|

2)、官方示例-使用累积窗口查询、统计(未验证)

SELECT*FROMTABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES));-- or with the named params-- note: the DATA param must be the firstSELECT*FROMTABLE(
    CUMULATE(DATA=>TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      STEP =>INTERVAL'2' MINUTES,
      SIZE =>INTERVAL'10' MINUTES));+------------------+-------+------+------------------+------------------+-------------------------+|          bidtime | price | item |     window_start |       window_end |            window_time  |+------------------+-------+------+------------------+------------------+-------------------------+|2020-04-1508:05|4.00| C    |2020-04-1508:00|2020-04-1508:06|2020-04-1508:05:59.999||2020-04-1508:05|4.00| C    |2020-04-1508:00|2020-04-1508:08|2020-04-1508:07:59.999||2020-04-1508:05|4.00| C    |2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999||2020-04-1508:07|2.00| A    |2020-04-1508:00|2020-04-1508:08|2020-04-1508:07:59.999||2020-04-1508:07|2.00| A    |2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999||2020-04-1508:09|5.00| D    |2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999||2020-04-1508:11|3.00| B    |2020-04-1508:10|2020-04-1508:12|2020-04-1508:11:59.999||2020-04-1508:11|3.00| B    |2020-04-1508:10|2020-04-1508:14|2020-04-1508:13:59.999||2020-04-1508:11|3.00| B    |2020-04-1508:10|2020-04-1508:16|2020-04-1508:15:59.999||2020-04-1508:11|3.00| B    |2020-04-1508:10|2020-04-1508:18|2020-04-1508:17:59.999||2020-04-1508:11|3.00| B    |2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999||2020-04-1508:13|1.00| E    |2020-04-1508:10|2020-04-1508:14|2020-04-1508:13:59.999||2020-04-1508:13|1.00| E    |2020-04-1508:10|2020-04-1508:16|2020-04-1508:15:59.999||2020-04-1508:13|1.00| E    |2020-04-1508:10|2020-04-1508:18|2020-04-1508:17:59.999||2020-04-1508:13|1.00| E    |2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999||2020-04-1508:17|6.00| F    |2020-04-1508:10|2020-04-1508:18|2020-04-1508:17:59.999||2020-04-1508:17|6.00| F    |2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999|+------------------+-------+------+------------------+------------------+-------------------------+-- apply aggregation on the cumulating windowed tableSELECT window_start, window_end,SUM(price)FROMTABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES))GROUPBY window_start, window_end;+------------------+------------------+-------+|     window_start |       window_end | price |+------------------+------------------+-------+|2020-04-1508:00|2020-04-1508:06|4.00||2020-04-1508:00|2020-04-1508:08|6.00||2020-04-1508:00|2020-04-1508:10|11.00||2020-04-1508:10|2020-04-1508:12|3.00||2020-04-1508:10|2020-04-1508:14|4.00||2020-04-1508:10|2020-04-1508:16|4.00||2020-04-1508:10|2020-04-1508:18|10.00||2020-04-1508:10|2020-04-1508:20|10.00|+------------------+------------------+-------+

4、Window Offset

以上三个窗口函数是windows的函数,下面介绍window offset。

Offset偏移量是一个可选参数,可用于更改窗口分配。它可以是正持续时间和负持续时间。窗口偏移的默认值为 0。如果设置不同的偏移值,则同一记录可能会分配给不同的窗口。
以下示例“对于大小为 10 分钟的翻转窗口,时间戳为 2023-09-30 00:00:04 的记录将分配给哪个窗口?”进行说明,具体如下:

  • 如果偏移值为 -16 MINUTE,则记录将分配给窗口 [2023-09-29 23:54:00, 2023-09-30 00:04:00)。
  • 如果偏移值为 -6 MINUTE,则记录分配给窗口 [2023-09-29 23:54:00,2023-09-30 00:04:00)。
  • 如果偏移量为 -4 MINUTE,则记录分配给窗口 [2023-09-29 23:56:00, 2023-09-30 00:06:00)。
  • 如果偏移量为 0,则记录分配给窗口 [2023-09-29 00:00:00, 2023-09-30 00:10:00)。
  • 如果偏移量为 4 MINUTE,则记录分配给窗口 [2023-09-29 23:54:00, 2023-09-30 00:04:00)。
  • 如果偏移量为 6 MINUTE,则记录分配给窗口 [2023-09-29 23:56:00, 2023-09-30 00:06:00)。
  • 如果偏移量为 16 MINUTE,则记录分配给窗口 [2023-09-29 23:56:00, 2023-09-30 00:06:00)。

我们可以发现,一些窗口偏移参数可能对窗口的分配具有相同的影响。在上述情况下,-16 分钟、-6 分钟和 4 分钟对于大小为 10 分钟的滚动窗口具有相同的效果。

窗口偏移的影响仅用于更新窗口分配,对水印没有影响。
目前 Flink(截至1.17版本) 不支持评估单个窗口表值函数,窗口表值函数应与聚合操作一起使用

1)、示例1-使用offset累积窗口查询、统计

此示例仅用于解释表值函数生成的语法和数据

# orders表结构和数据参考上文中的例子# 带上offset的滚动窗口查询方式1SELECT*FROMTABLE(
   TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'10' MINUTES,INTERVAL'1' MINUTES));# 带上offset的滚动窗口查询方式2,实际验证的结果是下面写法报不能识别OFFSET关键字# 错误信息:org.apache.flink.sql.parser.impl.ParseException: Incorrect syntax near the keyword 'OFFSET' at line 6, column 6.SELECT*FROMTABLE(
   TUMBLE(DATA=>TABLE orders,
     TIMECOL => DESCRIPTOR(proctime),
     SIZE =>INTERVAL'10' MINUTES,OFFSET=>INTERVAL'1' MINUTES));# 滑动窗口带offset的计算SELECT window_start, window_end,SUM(price)FROMTABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'10' MINUTES,INTERVAL'1' MINUTES))GROUPBY window_start, window_end;------------------------以下为验证过程----------------------------
Flink SQL>SELECT*FROMTABLE(>    TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'10' MINUTES,INTERVAL'1' MINUTES));+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+|+I |1|10.00|2023-09-1915:36:32.623|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.623||+I |2|15.00|2023-09-1915:36:32.623|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.623||+I |3|20.00|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |4|30.00|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |5|60.00|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |6|800.98|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |7|100.90|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |8|11.00|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |9|18.00|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |10|123.00|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |11|35.78|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |12|45.68|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |13|22.00|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |14|56.78|2023-09-1915:36:32.624|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.624||+I |15|78.90|2023-09-1915:36:32.625|2023-09-1915:31:00.000|2023-09-1915:41:00.000|2023-09-1915:36:32.625|

Flink SQL>SELECT*FROMTABLE(>    TUMBLE(>DATA=>TABLE orders,>      TIMECOL => DESCRIPTOR(proctime),>      SIZE =>INTERVAL'10' MINUTES,>OFFSET=>INTERVAL'1' MINUTES));[ERROR] Could notexecuteSQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Incorrect syntax near the keyword 'OFFSET' at line 6,column6.

Flink SQL>SELECT window_start, window_end,SUM(price)>FROMTABLE(>     TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL'10' MINUTES,INTERVAL'1' MINUTES))>GROUPBY window_start, window_end;+----+-------------------------+-------------------------+------------------------------------------+| op |            window_start |              window_end |                                   EXPR$2|+----+-------------------------+-------------------------+------------------------------------------+|+I |2023-09-1915:41:00.000|2023-09-1915:51:00.000|1428.02|

2)、官方示例-使用offset累积窗口查询、统计(未验证)

-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,--  window table-valued function should be used with aggregate operation,--  this example is just used for explaining the syntax and the data produced by table-valued function.
Flink SQL>SELECT*FROMTABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES,INTERVAL'1' MINUTES));-- or with the named params-- note: the DATA param must be the first
Flink SQL>SELECT*FROMTABLE(
   TUMBLE(DATA=>TABLE Bid,
     TIMECOL => DESCRIPTOR(bidtime),
     SIZE =>INTERVAL'10' MINUTES,OFFSET=>INTERVAL'1' MINUTES));+------------------+-------+------+------------------+------------------+-------------------------+|          bidtime | price | item |     window_start |       window_end |            window_time  |+------------------+-------+------+------------------+------------------+-------------------------+|2020-04-1508:05|4.00| C    |2020-04-1508:01|2020-04-1508:11|2020-04-1508:10:59.999||2020-04-1508:07|2.00| A    |2020-04-1508:01|2020-04-1508:11|2020-04-1508:10:59.999||2020-04-1508:09|5.00| D    |2020-04-1508:01|2020-04-1508:11|2020-04-1508:10:59.999||2020-04-1508:11|3.00| B    |2020-04-1508:11|2020-04-1508:21|2020-04-1508:20:59.999||2020-04-1508:13|1.00| E    |2020-04-1508:11|2020-04-1508:21|2020-04-1508:20:59.999||2020-04-1508:17|6.00| F    |2020-04-1508:11|2020-04-1508:21|2020-04-1508:20:59.999|+------------------+-------+------+------------------+------------------+-------------------------+-- apply aggregation on the tumbling windowed table
Flink SQL>SELECT window_start, window_end,SUM(price)FROMTABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES,INTERVAL'1' MINUTES))GROUPBY window_start, window_end;+------------------+------------------+-------+|     window_start |       window_end | price |+------------------+------------------+-------+|2020-04-1508:01|2020-04-1508:11|11.00||2020-04-1508:11|2020-04-1508:21|10.00|+------------------+------------------+-------+

本文简单的介绍了Flink 的窗口函数及具体的示例。

标签: flink sql 大数据

本文转载自: https://blog.csdn.net/chenwewi520feng/article/details/132801818
版权归原作者 一瓢一瓢的饮 alanchan 所有, 如有侵权,请联系我们删除。

“27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)”的评论:

还没有评论