0


Flink sql join 快速入门

目录


Flink stream join

基于窗口join

Flink 中 Window 可以将无限流切分成有限流,是处理有限流的核心组件,现在Flink 中 Window 可以是时间驱动的(Time Window),也可以是数据驱动的(Count Window)

在这里插入图片描述

窗口连接将共享一个公key并位于同一窗口中的两个流的元素连接起来。这些窗口可以通过使用窗口赋值器来定义,并根据两个流中的元素进行计算。

然后,来自两边的元素被传递给用户定义的JoinFunction或FlatJoinFusion,用户可以在其中发出符合连接条件的结果。

stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)

Tumbling window join(滚动窗口join)

滚动窗口有固定的尺寸,窗口间的元素无重复。当执行滚动窗口连接时,具有公key和公共滚动窗口的所有元素将作为成对组合进行连接(inner join)

因为这就像一个内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素时,不会发出。

在这里插入图片描述

如图所示,我们定义了一个大小为2毫秒的翻转窗口,其结果为[0,1],[2,3]形式的窗口

DataStream<Integer> orangeStream =...DataStream<Integer> greenStream =...

orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (newJoinFunction<Integer,Integer,String>(){@OverridepublicStringjoin(Integer first,Integer second){return first +","+ second;}});

Sliding Window Join(滑动窗口join)

滑动窗口有固定尺寸,数据可能会重复(当滑动尺寸小于窗口尺寸,数据会重复)。当执行滑动窗口连接时,具有公共key和公共滑动窗口的所有元素将作为成对组合进行连接,并传递给JoinFunction或FlatJoinFusion。

在本例中,我们使用大小为两毫秒的滑动窗口,并将其滑动一毫秒,从而产生滑动窗口[1,0],[0,1],[1,2],[2,3]…。x轴下方的连接元素是传递给每个滑动窗口的JoinFunction的元素

在这里插入图片描述

DataStream<Integer> orangeStream =...DataStream<Integer> greenStream =...

orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2)/* size */,Time.milliseconds(1)/* slide */)).apply (newJoinFunction<Integer,Integer,String>(){@OverridepublicStringjoin(Integer first,Integer second){return first +","+ second;}});

Session Window Join(会话窗口join)

会话窗口不重叠并且没有固定的开始和结束时间,会话在固定时间内没有接受到数据时,会关闭当前会话,并开启新的会话。当执行会话窗口连接时,具有相同key的所有元素(当“组合”时满足会话条件)将以成对组合的方式连接,并传递给JoinFunction或FlatJoinFusion

本例我们定义了一个会话窗口连接,其中每个会话被至少1ms的间隔分隔。

在这里插入图片描述

DataStream<Integer> orangeStream =...DataStream<Integer> greenStream =...

orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (newJoinFunction<Integer,Integer,String>(){@OverridepublicStringjoin(Integer first,Integer second){return first +","+ second;}});

Interval Join

间隔连接使用一个公共key连接两个流的元素(A和B),其中流B的元素具有与流A中元素的时间戳相对的时间间隔中的时间戳,也就是:

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]

。那么A和B有相同的key,就可以进行内部join

间隔联接当前仅支持事件时间。

在上面的示例中,我们连接了两个流“橙色”和“绿色”,下限为-2毫秒,上限为+1毫秒。

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

DataStream<Integer> orangeStream =...DataStream<Integer> greenStream =...

orangeStream
    .keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2),Time.milliseconds(1)).process (newProcessJoinFunction<Integer,Integer,String(){@OverridepublicvoidprocessElement(Integer left,Integer right,Context ctx,Collector<String> out){
            out.collect(first +","+ second);}});

Flink sql query join

流式join

Regular Joins(双流join)

双流join是最通用的联接类型(支持 Batch\Streaming),其中任何新记录或联接两侧的更改都是可见的,并影响整体的Join结果。

对于流式查询,双流join的语法是最灵活的,允许任何类型的更新(插入、更新、删除)输入表。然而,此操作具有重要的操作含义:它需要将连接输入的两侧永远保持在Flink状态。因此,根据所有输入表和中间联接结果的不同输入行的数量,计算查询结果所需的状态可能会无限增长。可以为查询配置提供适当的状态生存时间(TTL),以防止状态大小过大。同时,这可能会影响查询结果的正确性。

因为资源问题 Regular Join 通常是不可持续的,一般只用做有界数据流的 Join。

数据一直根据输入流一直更新,“逐步逼近”最终的精确值,下游可能看到不断变化的结果,为了执行结果更新,下游需要定义主键。同时,状态可能会无限增长

特性:

  • 支持INNER、LEFT、RIGHT、FULL OUT JOIN
  • 语义语法和传统sql join一致
  • 左右流都会触发更新
  • 状态持续增长、一般结合 state TTl配合使用

语法:

SELECT*FROM Orders
[INNER|RIGHT|LEFT|FULLOUTER]JOIN Product
ON Orders.productId = Product.id

在这里插入图片描述

  • 流表 join 流表> 如果其中一个流表触发更新操作,同样触发join生成最新的结果CREATETABLE users ( user_id STRING, name STRING, age INT, gmt_time TIMESTAMP(3))WITH('connector'='kafka','topic'='users','properties.bootstrap.servers'='localhost:9092','properties.group.id'='orders2ConsumerGroup','scan.startup.mode'='latest-offset','format'='json');CREATETABLE address ( user_id STRING, address STRING, update_time TIMESTAMP(3))WITH('connector'='kafka','topic'='address','properties.bootstrap.servers'='localhost:9092','properties.group.id'='orders2ConsumerGroup','scan.startup.mode'='latest-offset','format'='json');select u.user_id,u.name,u.age,a.address FROM users AS u LEFTJOIN address AS aON u.user_id = a.user_id;--users-- {"user_id":"u1","name":"li","age":20,"gmt_time":"2022-11-01 10:00:00"}-- {"user_id":"u2","name":"li","age":20,"gmt_time":"2022-11-01 10:00:10"}--address-- {"user_id":"u1","address":"shanghai","update_time":"2022-11-01 10:00:05"}-- {"user_id":"u2","address":"beijing","update_time":"2022-11-01 10:00:15"}-- {"user_id":"u2","address":"anhui","update_time":"2022-11-01 10:00:16"}-- user_id name age address-- u1 li 20 shanghai-- u2 li 20 beijing-- u2 li 20 anhui
  • 维表 join 维表CREATETABLE users (`user_id` STRING,`name` STRING,`age`INT,`gmt_time`TIMESTAMP(3))WITH('connector'='jdbc','url'='jdbc:mysql://localhost:3306/flink','table-name'='user','username'='root','password'='123456')CREATETABLE address (`user_id` STRING,`address` STRING,`gmt_time`TIMESTAMP(3))WITH('connector'='jdbc','url'='jdbc:mysql://localhost:3306/flink','table-name'='user_address','username'='root','password'='123456')select users.name, users.user_id, users.age, address.address from users,address where users.user_id = address.user_id

Interval Joins(区间join)

是双流join的优化,基于处理时间或事件时间,在一定时间区间内数据,相同的key进行join(支持 Batch\Streaming)。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据。对于stream查询,时间区间oin只支持有时间属性的

append-only

表。由于时间属性是准单调递增的,Flink可以从其状态中删除旧值,而不会影响结果的正确性。

特征:由于给定了关联的区间,因此只需要保留很少的状态,内存压力较小。但是缺点是如果关联的数据晚到或者早到,导致落不到 JOIN 区间内,就可能导致结果不准确。只支持普通 Append 数据流,不支持含 Retract 的动态表。支持事件时间和处理时间

  • 支持INNER、LEFT、RIGHT、FULL OUT JOIN
  • 语义语法和传统sql join一致
  • 左右流都会触发更新
  • state根据时间区间保留,自动清理
  • 输出流保留时间属性

在这里插入图片描述

如:如果订单在收到订单10小时后发货,则此查询将把所有订单与其相应的发货联系起来
# 两表有时间戳字段,并且作为 watermark。或者使用PROCTIME() 函数来生成一个处理时间戳SELECT*FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time -INTERVAL'10'HOURAND s.ship_time

有效的join连接条件

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

流表和流表

CREATETABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32,2),
    update_time  timestamp(3),
    WATERMARK FOR update_time AS update_time
)WITH('connector'='kafka','topic'='currency_rates','properties.bootstrap.servers'='localhost:9092','properties.group.id'='currencyRatesGroup','scan.startup.mode'='latest-offset','format'='json');CREATETABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  timestamp(3),
    WATERMARK FOR order_time AS order_time)WITH('connector'='kafka','topic'='order','properties.bootstrap.servers'='localhost:9092','properties.group.id'='orders2ConsumerGroup','scan.startup.mode'='latest-offset','format'='json');select o.order_id,o.price,o.order_time,c.currency 
FROM orders  o,currency_rates c  
where o.currency=c.currency and o.order_time  BETWEEN c.update_time -INTERVAL'1'HOURAND c.update_time;

Temporal Joins(时态join)

时态表是一个随时间演变的表,在Flink中也称为动态表。时态表中的行与一个或多个时态周期相关联,并且所有Flink表都是时态的(动态的)。时态表包含一个或多个版本化的表快照,它可以是跟踪更改的更改历史表(例如数据库更改日志,包含所有快照),也可以是具体化更改的维表(例如包含最新快照的数据库表)。

**时态表可以分为

版本表

普通表

。**

  • 版本表(流表): 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog (如mysql binlog)可以定义成版本表,版本表内的数据始终不会自动清理,只能通过upsert触发
  • 普通表(维表): 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 、redis的表可以定义成普通表。

特征:

  • 只支持INNER JOIN、LEFT JOIN
  • 只有左流触发更新
  • 输出流保留时间属性

时态join类型

  • JOIN Lookup
  • JOIN 版本表
  • JOIN hive分区表

语法

***使用

FOR SYSTEM_TIME AS OF table1.proctime

表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据(即关联维表当前最新的状态)***

SELECT[column_list]FROM table1 [AS<alias1>][LEFT]JOIN table2 FOR SYSTEM_TIME ASOF table1.{ proctime | rowtime } [AS<alias2>]ON table1.column-name1 = table2.column-name1

扩展:Temporary table(临时表)和Temporal table(时态表)是两个不同概念。Temporary table是临时的表对象,属于当前Session,随着Session的结束而消失,该表不属于Catalog和DB

JOIN Lookup

特性:

Lookup join是针对于由作业流表触发,关联右侧维表来补全数据的场景 。默认情况下,在流表有数据变更,都会触发维表查询(可以通过设置维表是否缓存,来减轻查询压力),由于不保存状态,因此对内存占用较小

  • 左侧为流表、右侧为维表
  • 流表需要指定处理时间
  • 具备lookup能力的外部系统
  • 自己实现LookupTableSource接口connector

举例

kafka作为流表+jdbc、hbase、redis

--维表  CREATETEMPORARYTABLE users (`user_id` STRING,`name` STRING,`age`INT,`gmt_time`TIMESTAMP(3))WITH('connector'='jdbc','url'='jdbc:mysql://localhost:3306/flink','table-name'='user','username'='root','password'='123456');--流表CREATETABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    user_id    STRING,
    order_time  TIMESTAMP(3),
    proctime AS PROCTIME())WITH('connector'='kafka','topic'='order','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','scan.startup.mode'='latest-offset','format'='json');----使用FOR SYSTEM_TIME AS OF table1.proc_time表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据(即关联维表当前最新的状态)select orders.order_id,orders.price,orders.order_time,c.name  
FROM orders 
 LEFTJOIN users FOR SYSTEM_TIME ASOF orders.proctime  AS c 
ON orders.user_id = c.user_id;

在这里插入图片描述

JOIN 版本表

可以追溯数据历史版本的表,如:数据库changelog,数据源有:mysql-binlog、kafka-upsert、oracle-cdc等。需要具备

事件时间

主键

两个属性。

  • 版本表:本身具备upsert特性的表,直接作为版本表的数据源使用CREATETABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32,2), update_time TIMESTAMP(3), WATERMARK FOR update_time AS update_time,--事件时间PRIMARYKEY(currency)NOT ENFORCED -- 主键)WITH('connector'='kafka','value.format'='debezium-json',--changelog数据源:{"before":{},"after":{},"op":"u"}/* ... */);SELECT order_id, price, currency, conversion_rate, order_timeFROM ordersLEFTJOIN currency_rates FOR SYSTEM_TIME ASOF orders.order_timeON orders.currency = currency_rates.currency;
  • 版本视图:本身不是版本表,通过视图、函数转换为版本视图-- kafka json格式的数据为append-only数据源CREATETABLE ratesHistory ( currency STRING, conversion_rate DECIMAL(32,2), update_time TIMESTAMP(3), WATERMARK FOR update_time AS update_time --事件时间)WITH('connector'='kafka','format'='json',/* ... */);-- 转化为版本视图CREATEVIEW versionedRates ASSELECT currency,conversion_rate,update_time -- 事件时间:update_timeFROM(SELECT*,ROW_NUMBER()OVER(PARTITIONBY currency -- 主键:currencyORDERBY update_time DESC)AS rowNum FROM ratesHistory)where rowNum=1;SELECT order_id, price, currency, conversion_rate, order_timeFROM ordersLEFTJOIN versionedRates FOR SYSTEM_TIME ASOF orders.order_timeON orders.currency = currency_rates.currency;

Event Time Temporal Join(版本表)

事件时间临时join 允许针对版本化表进行联接。这意味着可以通过更改元数据来丰富表,并在某个时间点检索其值。临时join取一个任意表(左输入),并将每一行与版本化表(右输入)中相应行的相关版本相关联。没有时间窗口

特性:

与双流join不同,尽管构建端发生了更改,但之前的临时表结果不会受到影响。与间隔join相比,时态表join没有定义oin记录的时间窗口。左侧表的记录总是在时间属性指定的时间与右侧表的版本连接。因此,构建端的行可能任意陈旧

  • 左侧为流表、右侧为版本表
  • 两侧表都需要指定事件时间
  • 版本表的数据会持续增加

满足场景:

  1. 左输入表为流表,右输入表为版本表( Changelog 动态表,即 Upsert、Retract 数据流,而非 Append 数据流)
  2. 两侧表都需要设置watermark,版本表需要设置主键,主键必须包含在 JOIN 等值条件中
  3. 版本表发生变更,不会触发查询结果输出,会根据主键更新临时表

**举例 **

用户在下订单时,需要根据订单时间的汇率,计算订单金额,其中下单是以不同的货币技术,我们需要将他输出到特定货币(CNY)

在这里插入图片描述

# 订单表(普通表)CREATETABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time   timestamp(3),
    WATERMARK FOR order_time AS order_time
)WITH('connector'='kafka','topic'='order','properties.bootstrap.servers'='localhost:9092','properties.group.id'='orders2ConsumerGroup','scan.startup.mode'='latest-offset','format'='json');-- 汇率表 (版本表)CREATETABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32,2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time,PRIMARYKEY(currency)NOT ENFORCED
)WITH('connector'='kafka','topic'='currency_rates','properties.bootstrap.servers'='localhost:9092','properties.group.id'='currencyRatesGroup','scan.startup.mode'='latest-offset','format'='debezium-json','debezium-json.schema-include'='true');select o.order_id,o.price,o.order_time,c.currency  
FROM orders AS o 
LEFTJOIN currency_rates FOR SYSTEM_TIME ASOF o.order_time  AS c 
ON o.currency = c.currency;-- 汇率表(版本视图)CREATETABLE ratesHistory (
    currency STRING,
    conversion_rate DECIMAL(32,2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time
)WITH('connector'='kafka','topic'='currency_rates','properties.bootstrap.servers'='localhost:9092','properties.group.id'='currencyRatesGroup','scan.startup.mode'='latest-offset','format'='json');CREATEVIEW versionedRates ASSELECT currency,conversion_rate,update_time 
FROM(SELECT*,ROW_NUMBER()OVER(PARTITIONBY currency
                              ORDERBY update_time DESC)AS rowNum 
  FROM ratesHistory)where rowNum=1;select o.order_id,o.price,o.order_time,c.currency  
FROM orders AS o 
LEFTJOIN versionedRates FOR SYSTEM_TIME ASOF o.order_time  AS c 
ON o.currency = c.currency;

在这里插入图片描述

Processing Time Temporal Join(不建议使用)

由于基于处理时间的时态表 JOIN 存在 Bug(参见 FLINK-19830),因此在最新的 Flink 版本中已被禁用

处理时间临时表join

使用处理时间属性将行与外部版本化表中key的最新版本相关联。和

事件时间临时join 

的区别是:右侧版本话表没有版本时间作为事件时间用来设置watermark,因此需要使用处理时间作为版本时间。这种join的强大之处在于,当在Flink中将表具体化为动态表不可行时,它允许Flink直接与外部系统协作。

SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE(Rates(o_proctime))WHERE
  r_currency = o_currency

JOIN LATERAL

JOIN LATERAL 是单流驱动的join,根据左表逐条数据动态和右表进行JOIN。相对于其他flink Jon,JOIN LATERAL 的右边不是一个物理表,而是一个视图(view)或者Table-valued Funciton。LATERAL和 CROSS APPLY的语义相同

单流驱动的join

# LATERALSELECT 
    e.NAME, e.DEPTNO, d.NAME 
FROM EMPS e, LATERAL (SELECT* 
  FORM DEPTS d 
  WHERE e.DEPTNO=d.DEPTNO 
)as d;# inner joinSELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag;SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)SELECT order_id, res
FROM Orders
LEFTOUTERJOIN LATERAL TABLE(table_func(order_id)) t(res)ONTRUE# CROSS APPLAYSELECT 
    c.customerid, c.city, o.orderid 
FROM Customers c,CROSS APPLAY(SELECT 
      o.orderid, o.customerid 
  FROM Orders o 
  WHERE o.customerid = c.customerid 
)as o 
# LATERALSELECT 
    e.NAME, e.DEPTNO, d.NAME 
FROM EMPS e, LATERAL (SELECT* 
  FORM DEPTS d 
  WHERE e.DEPTNO=d.DEPTNO 
)as d;

窗口Join

窗口函数

Windowing table-valued functions (Windowing TVFs) Apache Flink提供了几个窗口表值函数(TVF),用于将表的元素划分为多个窗口,包括:

  • Tumble Windows
  • Hop Windows
  • Cumulate Windows
  • Session Windows (will be supported soon)

。基于SQL的窗口函数

TUMBLE(滚动窗口)

滚动窗口有固定的尺寸,窗口间的元素无重复

在这里插入图片描述

TUMBLE(TABLEdata, DESCRIPTOR(timecol), size [,offset])
  • data: 包含时间属性列的表
  • timecol: 是一个列描述符,指示数据的哪个时间属性列应映射到滚动窗口
  • size: 是指定滚动窗口宽度的持续时间。
  • offset: 是一个可选参数,用于指定窗口将要开始的偏移量。

例:每10分钟将10分钟内的金额汇总计算

# 其中 watermark(`bidtime` - INTERVAL '1' SECOND     )SELECT window_start, window_end,SUM(price)FROMTABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES))GROUPBY window_start, window_end;

HOP(滑动窗口)

HOP函数将元素分配给固定长度的窗口。与TUMBLE窗口函数类似,窗口的大小由窗口大小参数配置。另一个窗口滑动参数控制跳转窗口的启动频率

在这里插入图片描述

HOP(TABLEdata, DESCRIPTOR(timecol), slide, size [,offset])
  • data: 包含时间属性列的表
  • timecol: 是一个列描述符,指示数据的哪个时间属性列应该映射到滑动窗口
  • slide: 每个滑动窗口创建的间隔时间
  • size: 是指定滑动窗口宽度的持续时间
  • offset: 是一个可选参数,用于指定窗口将要开始的偏移量。

例:将10分钟内的金额汇总计算,并且每5分钟触发一次计算

# 其中 watermark(`bidtime` - INTERVAL '1' SECOND     )SELECT window_start, window_end,SUM(price)FROMTABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'5' MINUTES,INTERVAL'10' MINUTES))GROUPBY window_start, window_end;

CUMULATE(累积窗口)

CUMULATE函数有固定的窗口大小和步长,同一个窗口会按照步长逐步累计时间的形式,触发窗口计算操作,其他在同一窗口触发计算的多个滚动窗口有相同的window_start,window_end会累加步长的时间长度。

在这里插入图片描述

CUMULATE(TABLEdata, DESCRIPTOR(timecol), step, size)
  • data: 包含时间属性列的表
  • timecol: 是一个列描述符,指示数据的哪个时间属性列应映射到滚动窗口。
  • step: 是指定连续累积窗口结束之间增加的窗口大小的持续时间(步长)
  • size: 是指定累积窗口的最大宽度的持续时间。大小必须是步长的整数倍。
  • offset: 是一个可选参数,用于指定窗口将要开始的偏移量。

例:每2分钟计算总金额,并在累积10分钟后,计算总金额

SELECT window_start, window_end,SUM(price)FROMTABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES))GROUPBY window_start, window_end;

窗口Join语法

窗口join将时间维度添加到join条件本身中。这样做时,窗口join将两个流的元素连接在一起,这两个流共享一个公共键并位于同一窗口中。窗口join的语义与DataStream窗口联接相同

特性:对于流式查询,与连续表上的其他join不同,窗口join不发出中间结果,而只在窗口结束时发出最终结果,后续延迟数据可能会丢失,实时性和准确性方面都相对较差。此外,窗口join在不再需要时清除所有中间状态

窗口触发join的条件:

  • 两个流的水位线均已经推进到window_end

支持的join类型:

INNER/LEFT/RIGHT/FULL OUTER/ANTI/SEMI JOIN

语法:

SELECT...FROM L [LEFT|RIGHT|FULLOUTER]JOIN R -- L and R are relations applied windowing TVFON L.window_start = R.window_start AND L.window_end = R.window_end AND...
  • INNER/LEFT/RIGHT/FULL OUTER JOINSELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, L.window_start, L.window_end FROM(SELECT*FROMTABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) L FULLJOIN(SELECT*FROMTABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) R ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;例:CREATETABLE users ( user_id STRING, name STRING, age INT, gmt_time TIMESTAMP(3), WATERMARK FOR gmt_time AS gmt_time)WITH('connector'='kafka','topic'='users','properties.bootstrap.servers'='localhost:9092','properties.group.id'='orders2ConsumerGroup','scan.startup.mode'='latest-offset','format'='json');CREATETABLE address ( user_id STRING, address STRING, update_time TIMESTAMP(3), WATERMARK FOR update_time AS update_time)WITH('connector'='kafka','topic'='address','properties.bootstrap.servers'='localhost:9092','properties.group.id'='orders2ConsumerGroup','scan.startup.mode'='latest-offset','format'='json');SELECT u.user_id,a.address ,u.window_start,u.window_endFROM(SELECT*FROMTABLE(TUMBLE(TABLE users,DESCRIPTOR(gmt_time),INTERVAL'10' SECONDS)))AS u LEFTJOIN(SELECT*FROMTABLE(TUMBLE(TABLE address,DESCRIPTOR(update_time),INTERVAL'10' SECONDS)))AS aON u.user_id = a.user_id AND u.window_start =a.window_start AND u.window_end = a.window_end;--users-- {"user_id":"u1","name":"li","age":20,"gmt_time":"2022-11-01 10:00:00"}-- {"user_id":"u2","name":"li","age":20,"gmt_time":"2022-11-01 10:00:10"}-- {"user_id":"u2","name":"li","age":20,"gmt_time":"2022-11-01 10:00:20"}--address-- {"user_id":"u1","address":"shanghai","update_time":"2022-11-01 10:00:05"}-- {"user_id":"u2","address":"beijing","update_time":"2022-11-01 10:00:15"}-- {"user_id":"u2","address":"anhui","update_time":"2022-11-01 10:00:20"} user_id address window_start window_end u1 shanghai 2022-11-0110:00:00.0002022-11-0110:00:10.000 u2 beijing 2022-11-0110:00:10.0002022-11-0110:00:20.000 u2 anhui 2022-11-0110:00:10.0002022-11-0110:00:20.000
  • SEMI JOIN如果在公共窗口的右侧至少有一个匹配行,左窗口返回一行。SELECT*FROM(SELECT*FROMTABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) L WHERE L.num IN(SELECT num FROM(SELECT*FROMTABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
  • ANTI JOIN返回左侧窗口没有右侧窗口没有匹配的数据SELECT*FROM(SELECT*FROMTABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) L WHERE L.num NOTIN(SELECT num FROM(SELECT*FROMTABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);

总结

JOIN 类型触发join场景实时性准确度内存占用waterrmark时间属性双流join双流每一个数据流有变更都会触发join,并且状态会保存高先低后高(逐步更新)高(需要设置状态生存时间)否事件时间、处理时间时间区间 JOIN双流拥有相同key且 事件时间处于 lowerBoundTime 和 upperBoundTime之间的元素进行join中中(取决于区间大小)中(取决于区间大小)是(都需要)事件时间、处理时间时态表 JOIN(版本表)单流单流和版本表的join,具有历史版本状态管理功能。流表:事件时间,版本表:事件时间和主键中高(取决于具体实现)高(取决于版本表大小 )是(都需要)事件时间时态表 JOIN(Join Lookup )单流单流和维表的join,join要求一个表具有处理时间属性(流表),另一个表由查找源连接器支持(维表,实现了LookupableTableSource)高高(取决于是否缓存、异步等)低(取决于是否缓存、异步等)是(流表)处理时间JOIN LATERAL单流单流和UDTF的join。JOIN LATERAL 的右边不是一个物理表,而是一个视图(view)或者Table-valued Funciton。不具备状态管理功能高高(取决于是否缓存、异步等)低(取决于是否缓存、异步等)否窗口 JOIN双流相同key且位于相同时间窗口的元素进行 join低低(取决于窗口大小和类型)中(取决于窗口大小)是(都需要)watermark取双流中较慢的为准事件时间、处理时间

标签: sql flink 数据库

本文转载自: https://blog.csdn.net/u011618288/article/details/127863960
版权归原作者 vhicool 所有, 如有侵权,请联系我们删除。

“Flink sql join 快速入门”的评论:

还没有评论