0


【大数据】Flink SQL 语法篇(九):Window TopN、Deduplication

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(九):Window TopN、Deduplication 

1.Window TopN

Window TopN 定义(支持 Streaming):Window TopN 是一种特殊的 TopN,它的返回结果是每一个窗口内的 N 个最小值或者最大值。

应用场景:小伙伴萌会问了,我有了 TopN 为啥还需要 Window TopN 呢?还记得上一篇博客介绍 TopN 说道的 TopN 时会出现中间结果,从而出现回撤数据的嘛?Window TopN 不会出现回撤数据,因为 Window TopN 实现是在窗口结束时输出最终结果,不会产生中间结果。而且注意,因为是窗口上面的操作,Window TopN 在窗口结束时,会自动把 State 给清除。

SQL 语法标准:

SELECT[column_list]FROM(SELECT[column_list],
     ROW_NUMBER()OVER(PARTITIONBY window_start, window_end [, col_key1...]ORDERBY col1 [asc|desc][, col2 [asc|desc]...])AS rownum
   FROM table_name)-- windowing TVFWHERE rownum <= N [AND conditions]

实际案例:取当前这一分钟的搜索关键词下的搜索热度前 10 名的词条数据。

-- 输入表字段:-- 字段名         备注-- key              搜索关键词-- name             搜索热度名称-- search_cnt       热搜消费热度(比如 3000)-- timestamp        消费词条时间戳CREATETABLE source_table (
    name BIGINTNOTNULL,
    search_cnt BIGINTNOTNULL,keyBIGINTNOTNULL,
    row_time AS cast(CURRENT_TIMESTAMPastimestamp(3)),
    WATERMARK FOR row_time AS row_time
)WITH(...);-- 输出表字段:-- 字段名         备注-- key              搜索关键词-- name             搜索热度名称-- search_cnt       热搜消费热度(比如 3000)-- window_start     窗口开始时间戳-- window_end       窗口结束时间戳CREATETABLE sink_table (keyBIGINT,
    name BIGINT,
    search_cnt BIGINT,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3))WITH(...);-- 处理 sql:INSERTINTO sink_table
SELECTkey, name, search_cnt, window_start, window_end
FROM(SELECTkey, name, search_cnt, window_start, window_end, 
     ROW_NUMBER()OVER(PARTITIONBY window_start, window_end,keyORDERBY search_cnt desc)AS rownum
   FROM(SELECT window_start, window_end,key, name,max(search_cnt)as search_cnt
      -- window tvf 写法FROMTABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time),INTERVAL'1' MINUTES))GROUPBY window_start, window_end,key, name
   ))WHERE rownum <=100

输出结果:

+I[关键词1, 词条1,8670,2021-1-28T22:34,2021-1-28T22:35]+I[关键词1, 词条2,6928,2021-1-28T22:34,2021-1-28T22:35]+I[关键词1, 词条3,1735,2021-1-28T22:34,2021-1-28T22:35]+I[关键词1, 词条4,7287,2021-1-28T22:34,2021-1-28T22:35]...

SQL 语义:

  • 数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,将数据按照窗口聚合的 Key 通过 Hash 分发策略发送到下游窗口聚合算子。
  • 窗口聚合算子:进行窗口聚合计算,随着时间的推进,将窗口聚合结果计算完成发往下游窗口排序算子。
  • 窗口排序算子:这个算子其实也是一个窗口算子,只不过这个窗口算子为每个 Key 维护了一个 TopN 的榜单数据,接受到上游发送的窗口结果数据进行排序,随着时间的推进,窗口的结束,将排序的结果输出到下游数据汇算子。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

2.Deduplication

Deduplication 定义(支持 Batch / Streaming):Deduplication 其实就是去重,也即上文介绍到的 TopN 中

row_number = 1

的场景,但是这里有一点不一样在于其 排序字段 一定是 时间属性列,不能是其他非时间属性的普通列。在

row_number = 1

时,如果排序字段是普通列 Planner 会翻译成 TopN 算子,如果是时间属性列 Planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。

应用场景:比如上游数据发重了,或者计算 DAU 明细数据等场景,都可以使用 Deduplication 语法去做去重。

SQL 语法标准:

SELECT[column_list]FROM(SELECT[column_list],
     ROW_NUMBER()OVER([PARTITIONBY col1[, col2...]]ORDERBY time_attr [asc|desc])AS rownum
   FROM table_name)WHERE rownum =1
  • ROW_NUMBER():标识当前数据的排序值。
  • PARTITION BY col1[, col2...]:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序。
  • ORDER BY time_attr [asc|desc]:标识排序规则,必须为时间戳列,当前 Flink SQL 支持处理时间、事件时间,ASC 代表保留第一行,DESC 代表保留最后一行。
  • WHERE rownum = 1:这个子句是一定需要的,而且必须为 rownum = 1

2.1 案例 1(事件时间)

某一游戏用户等级的场景,每一个用户都有一个用户等级,需要求出当前用户等级在 星星⭐,月亮🌙,太阳🌞 的用户数分别有多少。

-- 数据源:当每一个用户的等级初始化及后续变化的时候的数据,即用户等级变化明细数据。CREATETABLE source_table (
    user_id BIGINTCOMMENT'用户 id',level STRING COMMENT'用户等级',
    row_time AS cast(CURRENT_TIMESTAMPastimestamp(3))COMMENT'事件时间戳',
    WATERMARK FOR row_time AS row_time
)WITH('connector'='datagen','rows-per-second'='1','fields.level.length'='1','fields.user_id.min'='1','fields.user_id.max'='1000000');-- 数据汇:输出即每一个等级的用户数CREATETABLE sink_table (level STRING COMMENT'等级',
    uv BIGINTCOMMENT'当前等级用户数',
    row_time timestamp(3)COMMENT'时间戳')WITH('connector'='print');-- 处理逻辑:INSERTINTO sink_table
selectlevel,count(1)as uv,max(row_time)as row_time
from(SELECT
          user_id,level,
          row_time,
          row_number()over(partitionby user_id orderby row_time)as rn
      FROM source_table
)where rn =1groupbylevel

输出结果:

+I[等级 1,6928,2021-1-28T22:34]-I[等级 1,6928,2021-1-28T22:34]+I[等级 1,8670,2021-1-28T22:34]-I[等级 1,8670,2021-1-28T22:34]+I[等级 1,77287,2021-1-28T22:34]...

可以看到其有回撤数据。

其对应的 SQL 语义如下:

  • 数据源:消费到 Kafka 中数据后,将数据按照 partition by 的 Key 通过 Hash 分发策略发送到下游去重算子。
  • Deduplication 去重算子:接受到上游数据之后,根据 order by 中的条件判断当前的这条数据和之前数据时间戳大小,以上面案例来说,如果当前数据时间戳大于之前数据时间戳,则撤回之前向下游发的中间结果,然后将最新的结果发向下游(发送策略也为 Hash,具体的 Hash 策略为按照 group by 中 Key 进行发送),如果当前数据时间戳小于之前数据时间戳,则不做操作。此算子产出的结果就是每一个用户的对应的最新等级信息。
  • Group by 聚合算子:接受到上游数据之后,根据 Group by 聚合粒度对数据进行聚合计算结果(每一个等级的用户数),发往下游数据汇算子。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

2.2 案例 2(处理时间)

最原始的日志是明细数据,需要我们根据用户

id

筛选出这个用户当天的第一条数据,发往下游,下游可以据此计算分各种维度的 DAU。

-- 数据源:原始日志明细数据CREATETABLE source_table (
    user_id BIGINTCOMMENT'用户 id',
    name STRING COMMENT'用户姓名',
    server_timestamp BIGINTCOMMENT'用户访问时间戳',
    proctime AS PROCTIME())WITH('connector'='datagen','rows-per-second'='1','fields.name.length'='1','fields.user_id.min'='1','fields.user_id.max'='10','fields.server_timestamp.min'='1','fields.server_timestamp.max'='100000');-- 数据汇:根据 user_id 去重的第一条数据CREATETABLE sink_table (
    user_id BIGINT,
    name STRING,
    server_timestamp BIGINT)WITH('connector'='print');-- 处理逻辑:INSERTINTO sink_table
select user_id,
       name,
       server_timestamp
from(SELECT
          user_id,
          name,
          server_timestamp,
          row_number()over(partitionby user_id orderby proctime)as rn
      FROM source_table
)where rn =1

输出结果:

+I[1, 用户 1,2021-1-28T22:34]+I[2, 用户 2,2021-1-28T22:34]+I[3, 用户 3,2021-1-28T22:34]...

可以看到这个处理逻辑是没有回撤数据的。其对应的 SQL 语义如下:

  • 数据源:消费到 Kafka 中数据后,将数据按照 partition by 的 Key 通过 Hash 分发策略发送到下游去重算子。
  • Deduplication 去重算子:处理时间语义下,如果是当前 Key 的第一条数据,则直接发往下游,如果判断(根据 State 中是否存储过该 Key)不是第一条,则直接丢弃。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

⭐ 在 Deduplication 关于是否会出现回撤流,博主总结如下:

  • Order by 事件时间 DESC:会出现回撤流,因为当前 Key 下 可能会有 比当前事件时间还大的数据。
  • Order by 事件时间 ASC:会出现回撤流,因为当前 Key 下 可能会有 比当前事件时间还小的数据。
  • Order by 处理时间 DESC:会出现回撤流,因为当前 Key 下 可能会有 比当前处理时间还大的数据。
  • Order by 处理时间 ASC:不会出现回撤流,因为当前 Key 下 不可能会有 比当前处理时间还小的数据。
标签: 大数据 flink sql

本文转载自: https://blog.csdn.net/be_racle/article/details/136357721
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。

“【大数据】Flink SQL 语法篇(九):Window TopN、Deduplication”的评论:

还没有评论