系列文章目录
一. Paimon数据湖增删改查
二. 查询优化
三. 系统表
四. Lookup Joins
文章目录
前言
本文主要详解了Paimon数据湖的使用。
Paimon数据湖的使用
1、创建Table
1.1 创建catalog管理的表
在Paimon catalog中创建的表由catalog管理。当表从catalog中删除时,它的表文件也将被删除。类似Hive中的内部表。
示例(在Filesystem Metastore中创建):
CREATETABLE mypaimon.test.MyTable (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
);
解释: NOT ENFORCED叫做非强制。因为Paimon主要是进行海量数据分析,如果在海量数据上保证数据的主键,那么是非常消耗性能。我们不能完全依赖Paimon对数据进行主键的管理,我们需要在代码层面去确保数据的主键。
1.2 分区表
示例如下:
-- 正常创建分区表CREATETABLE mypaimon.test.MyPartitionTable1 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
) PARTITIONED BY(dt, hh);-- 错误创建分区表:分区字段不在PRIMARY KEY中CREATETABLE mypaimon.test.MyPartitionTable2 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
) PARTITIONED BY(item_id);-- 字段指定默认值CREATETABLE mypaimon.test.MyPartitionTable3 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
) PARTITIONED BY(dt, hh)with('fields.item_id.deafult-value'='999');
注意: 在阿里云Flink的1.15版本及以下,如果定义了主键,那么分区字段必须是主键的子集
可能遇到的错误:
原因: 在阿里云Flink的15版本及以下,如果定义了主键,分区字段必须是主键的子集
1.3 Create Table As(了解)
在开源版本中,Paimon支持 create table as 语法,表可以由查询的结果创建和填充。当使用CREATE TABLE作为SELECT时,我们可以指定主键或分区,语法请参考下面的sql。
但是在阿里云flink中,虽然也支持CREATE TABLE AS写法,但是目前不支持paimon表作为源表。所以以下SQL了解即可。
/* 可以修改设置 */CREATETABLE MyTableAll (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
) PARTITIONED BY(dt, hh);CREATETABLE MyTableAllAs WITH('primary-key'='dt,hh','partition'='dt')ASSELECT*FROM MyTableAll;
1.4 Create Table Like
要创建与另一个表具有相同模式、分区和表属性的表,可以使用create table LIKE。
CREATETABLEIFNOTEXISTS mypaimon.test.MyTable (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED);CREATETABLE mypaimon.test.MyTableLike LIKE mypaimon.test.MyTable;
1- create table as既会复制数据,也会复制表结构
2- create table like只会复制表结构
1.5 表属性
用户可以指定表属性来启用特性或提高Paimon的性能。表属性详细请参考:https://help.aliyun.com/zh/flink/developer-reference/apache-paimon-connector?spm=a2c4g.11174283.0.i3
示例:
CREATETABLE mypaimon.test.MyTableProperties (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
) PARTITIONED BY(dt, hh)WITH('bucket'='2','bucket-key'='user_id');
1.6 创建外部表
外部表由catalog记录,但是不由catalog管理。如果删除了paimon的外部表,那么只会删除元数据信息不会删除对应的数据。类似Hive的外部表。
Flink SQL支持读写外部表。外部Paimon表是通过指定连接器和路径属性创建的。下面的SQL创建了一个名为MyExternalTable的外部表,它有2列,其中表文件的路径是 oss://gz18-paimon-oss/test.db/word_count。
-- 不能使用paimon的catalog来创建CREATETABLE MyExternalTable (
word STRING PRIMARYKEYNOT ENFORCED,
cnt BIGINT)WITH('connector'='paimon','path'='oss://gz18-paimon-oss/test.db/MyExternalTable','auto-create'='true'-- 如果表路径不存在,此table属性将为空表创建表文件,目前仅由Flink支持);
注意: path中的gz18-paimon-oss要改成你自己的oss的bucket名称
并且可以对这个表进行操作
-- 部署到线上运行insertinto MyExternalTable values('hello',1000);-- 批模式查询SELECTsum(cnt)AS total_cnt FROM MyExternalTable;
注意:需要将insert 语句部署到线上运行,才能将数据真正插入到表中。
通过paimon外部表的方式,可以对paimon的数据进行查询和修改。
可能遇到的错误:
原因: 不能在Paimon的Catalog中使用'connector' = 'paimon'
1.7 创建临时表
临时表仅有Flink支持,与外部表一样,临时表只是记录,而不是由当前的FlinkSQL会话管理。如果临时表被删除,它的资源不会被删除。当前FlinkSQL会话关闭的时候,会删除临时表。
如果希望将Paimon catalog与其他表一起使用,但又不希望将它们存储在 catalog 中,则可以创建一个临时表。下面的Flink SQL创建了一个Paimon catalog和一个临时表,并演示了如何同时使用这两个表。
准备工作:
创建temp_table.csv,其中数据为:
1,beijing
2,shanghai
编辑好后上传到OSS的gz18-bucket 目录。
-- 使用创建过的mypaimon catalog-- 【流式作业】创建my_table表并导入数据CREATETABLE mypaimon.test.my_table (
k INT,
v STRING
);-- 【批模式】【部署】运行INSERTINTO mypaimon.test.my_table values(1,'zs'),(2,'ls');-- 以下在批作业中一起【调试】运行即可-- 【批模式】创建temp_table表CREATETEMPORARYTABLE mypaimon.test.temp_table (
k INT,
v STRING
)WITH('connector'='filesystem','path'='oss://gz18-bucket/temp_table.csv','format'='csv');-- 【批模式】查询SELECT my_table.k, my_table.v, temp_table.v
FROM mypaimon.test.my_table
JOIN mypaimon.test.temp_table ON my_table.k = temp_table.k;
注意: path中的gz18-bucket一定要改成你自己的bucket名称
查询结果如下:
如果删除 temp_table后, temp_table.csv依然存在。
-- 在阿里云flink中不需要运行,temporary表只会在运行时生效droptemporarytable mypaimon.test.temp_table;
2、修改Table
2.1 修改或添加Table Properties
ALTERTABLE mypaimon.test.my_table SET('write-buffer-size'='256 MB');
2.2 移除Table Properties
ALTERTABLE mypaimon.test.my_table RESET ('write-buffer-size');
2.3 表重命名
ALTERTABLE mypaimon.test.my_table RENAMETO my_table_new;
注意:如果使用对象存储,如S3或OSS,请谨慎使用此语法,因为对象存储的重命名不是原子性的,在失败的情况下可能只会移动部分文件。
2.4 添加新列
注意:使用的阿里云Flink版本需要是1.17版本,否则不支持添加和修改字段信息。
ALTERTABLE mypaimon.test.my_table ADD(c1 INT, c2 STRING);desc mypaimon.test.my_table;
要添加具有指定位置的新列,请使用FIRST或AFTER col_name。
ALTERTABLE mypaimon.test.my_table ADD c INTFIRST;ALTERTABLE mypaimon.test.my_table ADD b INTAFTER c;
注意:要将一个存在的列修改到一个新的位置,使用FIRST或AFTER col_name。
ALTERTABLE mypaimon.test.my_table MODIFY c1 DOUBLEFIRST;ALTERTABLE mypaimon.test.my_table MODIFY c2 DOUBLEAFTER c1;
2.5 删除列
ALTERTABLE mypaimon.test.my_table DROP(c1, c2);
2.6 修改列的名称
ALTERTABLE mypaimon.test.my_table RENAME c TO cc;
2.7 更改列的空性
CREATETABLE mypaimon.test.null_table (id INTPRIMARYKEYNOT ENFORCED, coupon_info FLOATNOTNULL);
-- 将列' coupon_info '从NOT NULL更改为可空ALTERTABLE mypaimon.test.null_table MODIFY coupon_info FLOAT;
-- 将列' coupon_info '从可空改为NOT NULL-- 如果已经有NULL值,设置如下表选项,在修改表之前静默删除这些记录。SET'table.exec.sink.not-null-enforcer'='DROP';ALTERTABLE mypaimon.test.null_table MODIFY coupon_info FLOATNOTNULL;
2.8 更改列的备注
ALTERTABLE mypaimon.test.null_table MODIFY coupon_info BIGINTCOMMENT'优惠券信息';
2.9 更改列的类型
ALTERTABLE mypaimon.test.null_table MODIFY coupon_info DOUBLE;
对于是否可以转化,可以参考下面链接:
https://paimon.apache.org/docs/0.7/how-to/altering-tables/
2.10 修改watermark
注意:watermark中涉及的字段数据类型必须是timestamp或者timestamp_ltz
只在flink中生效,修改的语法如下:
CREATETABLE mypaimon.test.my_table (
k INT,
v STRING
);desc mypaimon.test.my_table
添加
ALTERTABLE mypaimon.test.my_table ADD(log_ts varchar(20));ALTERTABLE mypaimon.test.my_table ADD(
ts AS TO_TIMESTAMP(log_ts)AFTER log_ts,
WATERMARK FOR ts AS ts -INTERVAL'1'HOUR);
修改
ALTERTABLE mypaimon.test.my_table MODIFY WATERMARK FOR ts AS ts -INTERVAL'2'HOUR;
删除
ALTERTABLE mypaimon.test.my_table DROP WATERMARK;
3、写入Table
可以使用INSERT语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式指定,也可以由查询结果指定。
3.1 语法
INSERT { INTO| OVERWRITE } table_identifier [ part_spec ][ column_list ]
{ value_expr | query }
**(1)**part_spec
一个可选参数,用于指定分区的键和值对的逗号分隔列表。请注意,可以在分区规范中使用类型化文字(例如,date’ 2019-01-02 ')。
语法: PARTITION ( partition_col_name = partition_col_val [ , … ] )
**(2)**column_list
一个可选参数,用于指定属于table_identifier表的以逗号分隔的列列表。
注意:所有指定的列都应该存在于表中,需包括除静态分区列之外的所有列,并且不能重复。
语法: (col_name1 [, column_name2, …])
**(3)**value_expr
指定要插入的值。可以插入显式指定的值或NULL。必须用逗号分隔子句中的每个值。可以指定多个值集来插入多行。
语法: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]
注意:目前,Flink不支持直接使用NULL,因此应该使用
CAST (NULL AS data_type)
将NULL转换为实际数据类型。
3.2 将可空字段写入非空字段
不能在一个表的非空列中插入另一个表的可空列。假设,我们在表a中有一个列key1,它是主键,主键不能为空。在表B中有一个列key2,它是可空的。如果我们运行这样的sql语句:
INSERTINTO A key1 SELECT key2 FROM B
Flink 和 Spark中都会报错,处理方式是:使用”NVL”或”COALESCE”,将一个可空列转成非空列。
INSERTINTO A key1 SELECTCOALESCE(key2,<non-null expression>)FROM B
3.3 修改数据
(1)使用insert将记录/更改应用到表
Paimon支持在sink阶段按分区和桶对数据进行shuffle。
INSERTINTO MyTable SELECT...
(2)覆盖语义
- 对于未分区的表,Paimon支持覆盖整个表
INSERT OVERWRITE MyTable SELECT...
- 对于分区表,Paimon支持覆盖指定分区
INSERT OVERWRITE MyTable PARTITION(key1 = value1, key2 = value2,...)SELECT...
- 动态覆盖
Flink的默认覆盖模式是动态分区覆盖(这意味着Paimon只删除出现在被覆盖数据中的分区)。可以设置dynamic-partition-overwrite 为 false 将其更改为静态覆盖。
dynamic-partition-overwrite配置参数的默认值是true。
参考链接:https://paimon.apache.org/docs/0.7/maintenance/configurations/
-- 假如MyTable是一个分区表-- 动态覆盖INSERT OVERWRITE MyTable SELECT...-- 静态覆盖 (覆盖整个表)INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */SELECT...
3.4 清除数据
(1)清除表:可以使用INSERT OVERWRITE通过插入空值来清除表。
INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */SELECT*FROM MyTable WHEREfalse
(2)清除分区:
目前Paimon支持两种方式去清除分区:
- 与清除表一样,可以使用INSERT OVERWRITE 向分区插入空值来清除分区的数据。
-- 语法INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */PARTITION(key1 = value1, key2 = value2,...)SELECT selectSpec FROM MyTable WHEREfalse
-- 案例:CREATETABLE`new-mypaimon`.test.OverwriteTable (
k0 INT,
k1 INT,
v STRING) PARTITIONED BY(k0, k1);-- 在批作业中部署运行insertinto`new-mypaimon`.test.OverwriteTable values(1,1,'a'),(2,2,'b'),(3,3,'c');-- 在批作业中调试运行select*from`new-mypaimon`.test.OverwriteTable;
-- 方法一 -- 在批作业中部署运行INSERT OVERWRITE `new-mypaimon`.test.OverwriteTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */PARTITION(k0 =1)SELECT k1, v FROM`new-mypaimon`.test.OverwriteTable WHEREfalse;-- 在批作业中调试运行select*from`new-mypaimon`.test.OverwriteTable;
- 上面的方法不支持删除多个分区。如果需要删除多个分区,可以通过flink run提交drop-partition作业。
3.5 更新数据
目前,Paimon支持在Flink 1.17及以后的版本中使用UPDATE更新记录。可以在Flink的批处理模式下执行UPDATE。目前阿里云Flink中暂不支持。
需要表具备以下两个特点:
- 必须是主键表;
- 需要对MergeEngine进行重复数据删除或部分更新。
-- 语法UPDATE table_identifier SET column1 = value1, column2 = value2,...WHERE condition;-- 开源版本案例:CREATETABLE UpdateTable(
a STRING,
b INT,
c INT,PRIMARYKEY(a)NOT ENFORCED
)WITH('write-mode'='change-log','merge-engine'='deduplicate');-- 插入INSERTINTO UpdateTable values('myTable',1,1);select*from UpdateTable;
-- 更新UPDATE UpdateTable SET b =1, c =2WHERE a ='myTable';
UPDATE UpdateTable SET b =1, c =3WHERE a ='myTable';
3.6 删除数据
在Flink 1.16和以前的版本中,Paimon只支持通过Flink run提交“delete”作业来删除记录。
在Flink 1.17及更高的版本,可以直接使用删除语句(只在批模式下支持)。但是需要设置两个条件:
- 设置write-mode为change-log
- 如果表有主键, MergeEngine需要重复数据删除
目前阿里云Flink中暂不支持。
-- 语法DELETEFROM table_identifier WHERE conditions;-- 开源版本案例:CREATETABLE DeleteTable (
id BIGINTNOTNULL,
currency STRING,
rate BIGINT,
dt String,PRIMARYKEY(id, dt)NOT ENFORCED) PARTITIONED BY(dt)WITH('write-mode'='change-log','merge-engine'='deduplicate');-- 插入语句INSERTINTO DeleteTable values(1,'UNKNOWN',1,'2023-08-22');select*from DeleteTable;
-- 删除语句DELETEFROM DeleteTable WHERE currency ='UNKNOWN';select*from DeleteTable;
4、查询Table
4.1 批量查询
Paimon的批处理读取返回表快照中的所有数据。默认情况下,批处理读取返回最新的快照。
-- Flink SQLSET'execution.runtime-mode'='batch';
4.1.1 批处理时间旅行
带时间旅行的Paimon批读可以指定一个快照或一个标签,并读取相应的数据。
-- 查看快照信息SELECT*FROM t$snapshots;-- 可以指定snapshot idSELECT*FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 可以指定timestamp (unix milliseconds)SELECT*FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- 可以指定tag,如 'my-tag'SELECT*FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
注意: snapshot的编号从1开始。
案例:
(1)建表并插入数据
CREATETABLE mypaimon.test.travelTable (
k0 INT,
k1 INT,
v STRING) PARTITIONED BY(k0, k1);-- 在批作业中部署运行insertinto mypaimon.test.travelTable values(1,1,'a'),(2,2,'b'),(3,3,'c');-- 在批作业中部署运行insertinto mypaimon.test.travelTable values(4,4,'d');-- 在批作业中部署运行INSERT OVERWRITE mypaimon.test.travelTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */PARTITION(k0 =1)SELECT k1, v FROM mypaimon.test.travelTable WHEREfalse;-- 在批作业中调试运行select*from mypaimon.test.travelTable;
(2)
SELECT*FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '1') */;
(3)
SELECT*FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '2') */;
(4)
SELECT*FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '3') */;
可能遇到的错误:
原因: 进行时间旅行的时候,如果访问了不存在的snapshot快照,那么会报错
4.1.2 批处理增量
读取开始快照(不包含)和结束快照之间的增量变化。
举例:
- ‘5,10’ 意味着 snapshot 5 和 snapshot 10 之间的
- ‘TAG1,TAG3’ 意味着 TAG1 和 TAG3 之间的
SELECT*FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
在批处理SQL中,不允许返回DELETE记录,因此将删除-D的记录。如果你想看到DELETE记录,你可以使用audit_log表:
SELECT*FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
案例:
SELECT*FROM mypaimon.test.travelTable /*+ OPTIONS('incremental-between' = '1,3') */;
总结:在批量查询时,可以指定快照id,也可以指定时间戳,还可以指定tag进行查询。这样就可以实现查询历史状态,也就是时间旅行。
4.2 流式查询
默认情况下,流式读取在第一次启动时生成表上的最新快照,并继续读取最新的更改。
默认情况下,Paimon确保任务的启动被正确处理,并包含全部数据。
-- 开源版本SET'execution.runtime-mode'='streaming';-- 阿里云Flink
在流作业草稿中运行
也可以不使用快照数据,使用最新浏览的模式。
-- 连续读取最新更改,而不在开始时生成快照SELECT*FROM t /*+ OPTIONS('scan.mode' = 'latest') */
4.2.1 流处理时间旅行
如果你只想处理今天及以后的数据,你可以使用分区过滤器:
SELECT*FROM t WHERE dt >'2023-06-26';
如果它不是一个分区表,或者不能按分区进行过滤,可以使用Time travel的流读取。
-- 可以指定snapshot idSELECT*FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 可以指定timestamp (unix milliseconds)SELECT*FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- 在第一次启动时读取快照id 1L,并继续读取更改SELECT*FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
案例:
-- 在流作业草稿中调试运行SELECT*FROM mypaimon.test.travelTable /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */;
如果持续向mypaimon.test.travelTable表插入数据,则会打印出新增的数据。
4.2.2 Consumer ID(了解)
在流式读取表时指定consumer-id,这是一个实验性功能。
SELECT*FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
当流读取Paimon表时,下一个快照id将被记录到文件系统中。这有几个优点:
- 当之前的作业停止后,新启动的作业可以继续消耗之前的进度,而不需要从状态恢复。新的读取将从消费者文件中找到的下一个快照 ID 开始读取。
- 在判断一个快照是否过期时,Paimon会查看文件系统中该表的所有消费者,如果还有消费者依赖这个快照,那么这个快照就不会因为过期而被删除。
- 当没有水印定义时,Paimon表会将快照中的水印传递到下游Paimon表,这意味着可以跟踪整个管道的水印进度。
注意:消费者将防止快照过期。可以指定“consumer.expiration-time”来管理消费者的生命周期。
示例:
(1)指定consumer-id开始流式查询:
SELECT * FROM t /*+ OPTIONS(‘consumer-id’ = ‘test’) */;
(2)停掉原先的流式查询,插入数据:
insert into t values(1,2,3);
(3)再次指定consumer-id流式查询:
SELECT * FROM t /*+ OPTIONS(‘consumer-id’ = ‘test’) */;
4.2.3 Read Overwrite
默认情况下,流读取将忽略INSERT OVERWRITE生成的提交。如果想要读取OVERWRITE的提交,可以配置 streaming-read-overwrite 为true。
总结:流式查询会不断读取最新表的变化,然后在最开始时是否读取全量数据,可以通过scan-mode来指定,使用-full的类型即可。
可以像kafka一样,使用consumer id去管理消费者的消费点位
4.3 查询优化
(1)强烈建议与查询一起指定分区和主键过滤器,这将加快查询的数据跳过。
可以加速数据跳转的过滤函数有:
- =
- <
- <=
=
- IN (…)
- LIKE ‘abc%’
- IS NULL
(2)Paimon将按主键对数据进行排序,这加快了点查询和范围查询的速度。当使用复合主键时,查询过滤器最好在主键的最左边形成一个前缀,以获得良好的加速。
示例:
-- 复合主键的表CREATETABLE orders (
catalog_id BIGINT,
order_id BIGINT,.....,PRIMARYKEY(catalog_id, order_id)NOT ENFORCED
)
通过为主键的最左边的前缀指定一个范围过滤器,查询可以获得很好的加速。
SELECT*FROM orders WHERE catalog_id=1025;SELECT*FROM orders WHERE catalog_id=1025AND order_id=29495;SELECT*FROM orders
WHERE catalog_id=1025AND order_id>2035AND order_id<6000;
但是,下面的过滤器不能很好地加速查询。
SELECT*FROM orders WHERE order_id=29495;SELECT*FROM orders WHERE catalog_id=1025OR order_id=29495;
5、系统表(了解)
在开源版本中,支持查询系统表,系统表中记录了表的一些元数据信息,详见https://paimon.apache.org/docs/0.5/how-to/system-tables/。
6、Lookup Joins
Lookup Joins 是流查询中的一种join。它用从Paimon查询的数据填充表。
在Flink中,Paimon支持对带有主键的表和仅追加表进行lookup join。下面的示例说明了这个特性。
以下案例是使用客户表做维表,与订单表进行关联,然后查看关联的效果。
(1)创建paimon表
-- 使用创建好的mypaimon catalog-- 在paimon catalog中建一个表CREATETABLE mypaimon.test.customers (
id INTPRIMARYKEYNOT ENFORCED,
name STRING,
country STRING,
zip STRING
);
(2)在Mysql中建表并插入数据
CREATEDATABASEIFNOTEXISTS test;CREATETABLE test.customers (
id INTPRIMARYKEY,
name varchar(20),
country varchar(20),
zip varchar(20));INSERTINTO test.customers VALUES(1,'张三','中国','081212'),(2,'李四','中国','872211');
(3)在Flink中创建Mysql映射表
CREATETABLEifnotexists mysql_cdc_customers (
id int,
name string,
country string,
zip string,PRIMARYKEY(id)NOT ENFORCED
)WITH('connector'='mysql','hostname'='rm-cn-lr53rh4wc00028.rwlb.rds.aliyuncs.com','port'='3306','username'='itcast','password'='Itcast123','server-time-zone'='Asia/Shanghai','database-name'='test','table-name'='customers');
注意: hostname、username、password都需要改成自己的。
(4)将mysql数据插入到paimon中
-- 流作业草稿中部署运行 INSERTINTO mypaimon.test.customers
select*from mysql_cdc_customers;
(5)登录ECS控制台
点击实例,点击远程连接,选择通过Workbench远程连接
输入密码,点击确定
(6)创建topic
-- 启动kafka(前提启动zookeeper)
/export/server/zookeeper/bin/zkServer.sh start
cd /export/server/kafka/
bin/kafka-server-start.sh -daemon config/server.properties
-- 创建topic(ip为ECS的内网ip,可以通过ifconfig查询)
bin/kafka-topics.sh --create--topic orders_info --bootstrap-server node1:9092
(7)在Flink创建kafka映射表
--创建一个临时左表,从kafka读数CREATETABLE Orders (
order_id INT,
total INT,
customer_id INT,
proc_time AS PROCTIME())WITH('connector'='kafka','topic'='orders_info','properties.bootstrap.servers'='172.28.237.195:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv');
注意: properties.bootstrap.servers中的IP地址改成你自己的ECS服务器内网(私网)的IP地址
(8)向orders_info中插入一些数据:
bin/kafka-console-producer.sh --broker-list node1:9092 --topic orders_info
1,100,1
2,20,1
(10)现在可以在lookup join中使用customers表
-- 流式作业草稿中调试运行SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN mypaimon.test.customers
FOR SYSTEM_TIME ASOF o.proc_time AS c
ON o.customer_id = c.id;
结果如下:
再向kafka中输入 3,50,2
结果变化如下:
再向kafka中输入 4,66,3
结果没有变化,因为id为3的维表信息关联不到。
此时向mysql中插入一条数据:
insertinto test.customers values(3,'王五','中国','111111');
结果不会更新。
等待片刻(插入到paimon的任务的checkpoint完成)。
再向kafka中输入 5,77,3,此时可以看到能关联到结果。
此时再改变维表值,如在mysql中进行更新
update test.customers set zip='test'where id =3;
此时,已经产生的结果也不再发生变化。新来的结果会关联新数据。
再向kafka中输入 6,88,3 ,结果如下
lookup join 操作将在本地维护一个RocksDB缓存,并实时提取表的最新更新。lookup join 操作将只提取必要的数据,因此筛选条件对性能非常重要。
此特性仅适用于最多包含数千万条记录的表,以避免过度使用本地磁盘。
注意:如果Orders(主表)的记录Join缺失,因为customers(lookup表)对应的数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup。
总结:
(1)如果事实表(Kafka数据源)先于维表(MySQL数据源)到达了,这时事实表就不会关联到维表,即使后续维表数据来了,也不会更新之前的关联结果了
(2)paimon表一旦发生变化,会实时更新维表(Flink的内存)中
(3)如果关联结果已经生成了,然后维表发生了变化,这时之前的关联结果也不会发生变化了
版权归原作者 大数据飞总 所有, 如有侵权,请联系我们删除。