表定义
- 动态表(dynamic table):动态表是流的另一种表达方式,动态表作为一个逻辑的抽象概念,使我们更容易理解flink中将streaming发展到table这个层次的设计,本质都是对无边界、持续变更数据的表示形式,所以动态表与流之间可以相互转换。
- 版本表(dynamic table):动态表之上的定义,版本是一个拥有主键和时间属性的动态表(建表语句必需包含PRIMARY KEY和WATERMARK),所以版本表可以追踪每个key在某时间点/时间内的变化情况。版本表可以直接从changelog格式的source创建,或者基于append-only的源创建版本视图。
- 时态表(temporal table):时态表是随着时间变化而变化的,也就是动态表,时态表包含一个或多个版本表的快照;当它能追踪所有记录的历史变更(来自数据库的changelog)时,就是个版本表;如果它只能表示所有记录经过物化后的最新快照(直接一个数据库表),那就是个普通表。
Regular Joins
Regular Joins是flink这么多join类型中最普通的,任意一侧的数据流有变更或者新增,都会影响到join结果。Regular joins是通过把双流的输入数据都保存在flink的状态中,存在state过度膨胀的隐患,所以我们在使用时要合理设置table状态的TTL(table.exec.state.ttl),这要结合具体的业务场景,否则会影响join结果的正确性。
有两种join类型,内连接(INNER Equi-JOIN)和外连接(OUTER Equi-JOIN),两者都只支持等值连接,且至少一个连接条件。
Interval Joins
Interval join要求至少有一个等值谓词连接和一个时间约束条件,这个时间属性定义了流的时间范围,且作为WATERMARK
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND
- 与Regular join一样是双流,但是它加上了时间区间的概念,可以清理状态中较旧的数据,而不会影响join结果的正确性
- 通过InnerJoin算子实现,水位线来控制join的数据区间以及清理数据,所以两个输入流都要定义WATERMARK,否则会变回Regular join
- WATERMARK可以定义为event-time或process-time
- 只支持append-only的输入流,当尝试使用cdc作为输入源(Retract)时出报错
Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[ tb_order]], fields=[order_id, price, currency, order_time])
Temporal Joins
与时态表的join,通过上述时态表的描述可得知,可以关联得到记录的历史版本或只能得到最新版本,flink sql遵循SQL:2011的标准语法
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
从使用形式划分,可以分为3种:Event Time Temporal Join、Processing Time Temporal Join、Temporal Table Function Join
Event Time Temporal Join
Event Time temporal join的是一个版本表,意味着可以根据主表的事件时间关联到当时维表的具体版本。
Temporal table join currently only supports ‘FOR SYSTEM_TIME AS OF’ left table’s time attribute field
Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found.
Temporal table’s primary key [currency] must be included in the equivalence condition of temporal join
- 左表必需定义事件时间,右表除了定义事件时间外,还需要定义主键(即版本表)
- 水位线替触发join,所以两侧流都需要设置正常的水位线
- 右表的主键必需作为等值谓词连接
- 与regular join相比,结果不会受到右侧流影响,即输出结果以左流为主,右流只是补充了左流的信息,效果与left join相似
- 与interval join相比,不需要定义时间窗口,即可以关联到更久之前的维度,因为版本表会保存全量维度最新版本以及上一个水位线之后的变更
- 一个水位线之前的版本数据将会被清理,因为随着水位线的推移,这些数据将不会再被用到
Processing Time Temporal Join
Processing-time temporal join is not supported yet
从flink 1.14开始已经不再支持这种方式,可以使用temporal table function语法替换
右表定义process-time属性,总是能关联到最新的右表记录,其实这和lookup join差不多,只不过右表的记录是以HashMap全部保存着最新的版本在状态中。这种方式的强大之处在于,可以直接对接不能变成flink动态表的外部表(例如hbase)
- 与regular joins相比,右表的数据变化不会影响到之前已经join出的结果
- 与interval joins相比,不需要定义时间窗口,且所有旧版本数据都不会保存在状态中
Temporal Table Function Join
Join key must be the same as temporal table’s primary key
create table if not exists tb_order (
`order_id` int,
`price` int,
`currency` string,
`order_time` timestamp(3),
`proc_time` AS PROCTIME(),
primary key(order_id) not enforced)
WITH (
'connector'='mysql-cdc',
'table-name'='tb_order',
)
create table if not exists tb_currency (
`currency` string,
`rate` int,
`update_time` timestamp(3),
`proc_time` AS PROCTIME(),
)
WITH (
'connector'='mysql-cdc',
'table-name'='tb_currency',
)
TemporalTableFunction rate = tEnv.from("tb_currency").createTemporalTableFunction("proc_time", "currency");
tEnv.createTemporarySystemFunction("rate",rate);
select * from tb_order o , LATERAL TABLE(rate(o.order_time)) c where o.currency=c.currency
上面例子实现了process time temporal join,两建表语句都不指定事件时间,且tb_currency无需指定primary key(即非版本表),但是在定义TemporalTableFunction可以指定任意字段为主键,所以如果建表语句指定了事件时间,且TemporalTableFunction也使用事件时间,那么相当于间接创建了版本表。
- 先要定义table funtion,指定一个时间属性(event-time或process-time)和主键
- TemporalTableFunction定义的主键必须作为等值谓词连接
- 除了可以和版本表join,还能和普通的表示最新版本的表/视图join,即它包含了event-time/processing-time temporal join两种
Lookup Join
通过查询外部存储系统的数据以丰富流的属性,这种join方式要求流表必须有一个processing time属性,外部数据表的connector要实现
LookupTableSource
接口
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'table-name' = 'customers'
);
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
- 外部表的数据变化不会影响到已经join出的结果
- 上面所有join都是双流,而lookup join是单流
版权归原作者 矛始 所有, 如有侵权,请联系我们删除。