0


flink中的时间属性

1:时间的几种类型(官网概念)

1.1处理时间

指的是执行具体操作时的机器时间(大家熟知的绝对时间, 例如 Java的

System.currentTimeMillis()

) )

在ddl语句中声明一个处理时间:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性,这个事件是系统计算的时间,不需要从我们的源头数据进行提供只需要声明
) WITH (
  ...
);

1.2事件事件

指的是执行具体操作时的机器时间(大家熟知的绝对时间, 例如 Java的

System.currentTimeMillis()

) )

在ddl语句中声明一个事件事件并设置水位线为5秒

CREATE TABLE user_actions (
 user_name STRING,
 data STRING,
 ts BIGINT,
 time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
 -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
 WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
 ...
);

Flink 支持在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义事件时间

1:timestamp 字符串类型的时间例如2021-07-17 00:00:00.0

    1.1函数TO_TIMESTAMP():将 ‘UTC+0’ 时区下格式为 string2(默认为:‘yyyy-MM-dd HH:mm:ss’)的字符串 string1 转换为时间戳,例如TO_TIMESTAMP(FROM_UNIXTIME(app_time /1000,'yyyy-MM-dd HH:mm:ss')),将毫秒转化为timestamp类型。

2:TIMESTAMP_LTZ 返回16位的unix时间

    2.1函数TO_TIMESTAMP_LTZ() 将纪元秒或纪元毫秒转换为 TIMESTAMP_LTZ,有效精度为 0 或 3,0 代表 
TO_TIMESTAMP_LTZ(epochSeconds, 0)

, 3 代表

 TO_TIMESTAMP_LTZ(epochMilliseconds, 3)

3:其他:

2:时间窗口

flink提供了三种窗口函数分别是滚动窗口

  • Tumble Windows 滚动窗口
  • Hop Windows 滑动窗口
  • Cumulate Windows 聚合窗口

2.1:滚动窗口

    滚动串口可以理解为批处理,也可以理解为一个特殊的滑动窗口因为他的窗口大小和滑动步长保持一直,隔一段事件就会进行窗口的滚动,计算出一个时间跨度内数据的变化,如果步长设置的过大我们的滑动窗口就会变成批处理,在实时处理场景下应用有一定的局限性具体还要看公司的业务场景:

滚动窗口有三个必填参数一个可选参数

  • data:是一个表参数,可以是与时间属性列的任何关系。
  • timecol:是一个列描述符,指示应将数据的哪个时间属性列映射到翻滚窗口。
  • size:是指定翻滚窗口宽度的持续时间。
  • offset:是一个可选参数,用于指定窗口启动将移动的偏移量
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

如果我们想使用一个滚动窗口首先要通过ddl语句创建一个带有时间属性的表

依据处理时间创建ddl语句

CREATE TABLE bid(
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
  ...
);

旧版本的写法:
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

成功创建这张表之后可以对现有数据进行sql查询依据滚动窗口

Flink SQL> SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
Flink SQL> SELECT * FROM TABLE(
   TUMBLE(
     DATA => TABLE Bid,
     TIMECOL => DESCRIPTOR(bidtime),
     SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |            window_time  |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

依据窗口大小来对表数据进行分组查询

Flink SQL> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+

使用窗口查询的时候flink会为表增加三个新字段分别是:

window_start:窗口开始时间

window_end:窗口结束时间

window_time:窗口计算时间

可以在我们完成相关业务逻辑的时候为我们的分析提供一定的支持

2.2:滑动窗口

滑动窗口区别于滚动窗口,或者说包含滚动窗口,滑动窗口类似于一个滑块他会以一定的窗口大小按照一个长度的时间步长在时间线上滑动,先比较于滚动窗口而言,滚动窗口是不存在重复数据的但是滑动窗口因为设置的步长小于窗口长度所以会导致产生重复数据。

和滚动窗口相同滑动窗口同样会产生三个字段

window_start:窗口开始时间

window_end:窗口结束时间

window_time:窗口计算时间

使用滑动窗口的函数名称为HOP,构成为四个必须参数和一个可选参数

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
--  window table-valued function should be used with aggregate operation,
--  this example is just used for explaining the syntax and the data produced by table-valued function.
> SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    HOP(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |           window_time   |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

如上图每间隔五分钟窗口会进行一次统计,之后开启另一个长度为10的窗口,起始位置滑动完五分钟的位置。

2.3:累计窗口

    累计窗口flink新版本推出的另一种窗口形式,相比于滑动和滚动串口,累计窗口在一些应用场景还是十分有用的,比如设想一种场景,业务需求在每天24小时内每分钟计算一次当日的pv数或者uv数,以累加的方式进行计算一分钟计算出一个具体的数值,窗口长度为24小时,为了应对这种情况累计窗口应运而生:

时间属性依然包含"window_start"、"window_end"、"window_time"的附加 3 列

函数名称

CUMULATE

采用四个必需参数,一个可选参数:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
  • data:是一个表参数,可以是与时间属性列的任何关系。
  • timecol:是一个列描述符,指示应将数据的哪个时间属性列映射到翻滚窗口。
  • step:是指定在顺序累积窗口结束之间增加的窗口大小的持续时间。
  • size:是指定累积窗口的最大宽度的持续时间。 必须是 的整数倍。size``````step
  • offset:是一个可选参数,用于指定窗口启动将移动的偏移量。

未完待续。。。。


本文转载自: https://blog.csdn.net/SQL_mjkl963124412/article/details/122717548
版权归原作者 开飞机的杰西 所有, 如有侵权,请联系我们删除。

“flink中的时间属性”的评论:

还没有评论