0


【大数据】Flink SQL 语法篇(六):Temporal Join

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(六):Temporal Join 

Temporal Join 定义(支持 Batch / Streaming):Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 Join 这个 拉链快照表 的 Join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做

Versioned Table

,使用一个明细表去 Join 这个

Versioned Table

的 Join 操作就叫做 Temporal Join。Temporal Join 中,

Versioned Table

其实就是对同一条

key

(在 DDL 中以 Primary Key 标记同一个

key

)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择

Versioned Table

对应时间区间内的快照数据进行 Join。

应用场景:比如常见的汇率数据(实时的根据汇率计算总金额),在

     12 
    
   
     : 
    
   
     00 
    
   
  
    12:00 
   
  
12:00 之前(事件时间),人民币和美元汇率是  
 
  
   
   
     7 
    
   
     : 
    
   
     1 
    
   
  
    7:1 
   
  
7:1,在  
 
  
   
   
     12 
    
   
     : 
    
   
     00 
    
   
  
    12:00 
   
  
12:00 之后变为  
 
  
   
   
     6 
    
   
     : 
    
   
     1 
    
   
  
    6:1 
   
  
6:1,那么在  
 
  
   
   
     12 
    
   
     : 
    
   
     00 
    
   
  
    12:00 
   
  
12:00 之前数据就要按照  
 
  
   
   
     7 
    
   
     : 
    
   
     1 
    
   
  
    7:1 
   
  
7:1 进行计算, 
 
  
   
   
     12 
    
   
     : 
    
   
     00 
    
   
  
    12:00 
   
  
12:00 之后就要按照  
 
  
   
   
     6 
    
   
     : 
    
   
     1 
    
   
  
    6:1 
   
  
6:1 计算。在事件时间语义的任务中,事件时间  
 
  
   
   
     12 
    
   
     : 
    
   
     00 
    
   
  
    12:00 
   
  
12:00 之前的数据,要按照  
 
  
   
   
     7 
    
   
     : 
    
   
     1 
    
   
  
    7:1 
   
  
7:1 进行计算, 
 
  
   
   
     12 
    
   
     : 
    
   
     00 
    
   
  
    12:00 
   
  
12:00 之后的数据,要按照  
 
  
   
   
     6 
    
   
     : 
    
   
     1 
    
   
  
    6:1 
   
  
6:1 进行计算。这其实就是离线中快照的概念,维护具体汇率的表在 Flink SQL 体系中就叫做 
Versioned Table

1.Versioned Table 的两种定义方式

Verisoned Table

:Verisoned Table 中存储的数据通常是来源于 CDC 或者会发生更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。举一个汇率场景的案例来看一下一个 Versioned Table 的两种定义方式。

1.1 PRIMARY KEY 定义方式

-- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到CREATETABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32,2),
    update_time TIMESTAMP(3) METADATA FROM`values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,-- PRIMARY KEY 定义方式PRIMARYKEY(currency)NOT ENFORCED
)WITH('connector'='kafka','value.format'='debezium-json',/* ... */);

1.2 Deduplicate 定义方式

-- 定义一个 append-only 的数据源表CREATETABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32,2),
    update_time TIMESTAMP(3) METADATA FROM`values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time
)WITH('connector'='kafka','value.format'='debezium-json',/* ... */);-- 将数据源表按照 Deduplicate 方式定义为 Versioned TableCREATEVIEW versioned_rates ASSELECT currency, conversion_rate, update_time   -- 1. 定义 `update_time` 为时间字段FROM(SELECT*,
      ROW_NUMBER()OVER(PARTITIONBY currency  -- 2. 定义 `currency` 为主键ORDERBY update_time DESC-- 3. ORDER BY 中必须是时间戳列)AS rownum 
      FROM currency_rates)WHERE rownum =1;

2.应用案例

Temporal Join 支持的时间语义:事件时间、处理时间。

2.1 案例一(事件时间)

-- 1. 定义一个输入订单表CREATETABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
)WITH(/* ... */);-- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到CREATETABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32,2),
    update_time TIMESTAMP(3) METADATA FROM`values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,PRIMARYKEY(currency)NOT ENFORCED
)WITH('connector'='kafka','value.format'='debezium-json',/* ... */);SELECT 
     order_id,
     price,
     currency,
     conversion_rate,
     order_time,FROM orders
-- 3. Temporal Join 逻辑-- SQL 语法为:FOR SYSTEM_TIME AS OFLEFTJOIN currency_rates FOR SYSTEM_TIME ASOF orders.order_time
ON orders.currency = currency_rates.currency;

结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率:

order_id  price  货币       汇率             order_time
=============================================
o_001     11.11  EUR       1.1412:00:00
o_002     12.51  EUR       1.1012:06:00
  • 事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark。
  • 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中。

2.2 案例二(处理时间)

10:15>SELECT*FROM LatestRates;

currency   rate
==============
US Dollar   102
Euro        114
Yen           110:30>SELECT*FROM LatestRates;

currency   rate
==============
US Dollar   102
Euro        114
Yen           1-- 10:42 时,Euro 的汇率从 114 变为 11610:52>SELECT*FROM LatestRates;

currency   rate
==============
US Dollar   102
Euro        116<==== 从 114 变为 116
Yen           1-- 从 Orders 表查询数据SELECT*FROM Orders;

amount currency
===============2 Euro             <== 在处理时间 10:15 到达的一条数据
     1 US Dollar        <== 在处理时间 10:30 到达的一条数据
     2 Euro             <== 在处理时间 10:52 到达的一条数据

-- 执行关联查询SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME ASOF o.proctime AS r
  ON r.currency = o.currency

-- 结果如下:
amount currency     rate   amount*rate
==================================2 Euro          114228<== 在处理时间 10:15 到达的一条数据
     1 US Dollar     102102<== 在处理时间 10:30 到达的一条数据
     2 Euro          116232<== 在处理时间 10:52 到达的一条数据

可以发现处理时间就比较好理解了,因为处理时间语义中是根据左流数据到达的时间决定拿到的汇率值。Flink 就只为

LatestRates

维护了最新的状态数据,不需要关心历史版本的数据。

标签: 大数据 flink sql

本文转载自: https://blog.csdn.net/be_racle/article/details/136310348
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。

“【大数据】Flink SQL 语法篇(六):Temporal Join”的评论:

还没有评论