0


Paimon数据湖详解(第49天)

系列文章目录

一. Paimon数据湖增删改查
二. 查询优化
三. 系统表
四. Lookup Joins


文章目录


前言

本文主要详解了Paimon数据湖的使用。


Paimon数据湖的使用

1、创建Table

1.1 创建catalog管理的表

在Paimon catalog中创建的表由catalog管理。当表从catalog中删除时,它的表文件也将被删除。类似Hive中的内部表。

示例(在Filesystem Metastore中创建):

在这里插入图片描述

CREATETABLE mypaimon.test.MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
);

解释: NOT ENFORCED叫做非强制。因为Paimon主要是进行海量数据分析,如果在海量数据上保证数据的主键,那么是非常消耗性能。我们不能完全依赖Paimon对数据进行主键的管理,我们需要在代码层面去确保数据的主键。
1.2 分区表

示例如下:

-- 正常创建分区表CREATETABLE mypaimon.test.MyPartitionTable1 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
) PARTITIONED BY(dt, hh);-- 错误创建分区表:分区字段不在PRIMARY KEY中CREATETABLE mypaimon.test.MyPartitionTable2 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
) PARTITIONED BY(item_id);-- 字段指定默认值CREATETABLE mypaimon.test.MyPartitionTable3 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
) PARTITIONED BY(dt, hh)with('fields.item_id.deafult-value'='999');
注意: 在阿里云Flink的1.15版本及以下,如果定义了主键,那么分区字段必须是主键的子集

可能遇到的错误:

在这里插入图片描述

原因: 在阿里云Flink的15版本及以下,如果定义了主键,分区字段必须是主键的子集
1.3 Create Table As(了解)

在开源版本中,Paimon支持 create table as 语法,表可以由查询的结果创建和填充。当使用CREATE TABLE作为SELECT时,我们可以指定主键或分区,语法请参考下面的sql。

但是在阿里云flink中,虽然也支持CREATE TABLE AS写法,但是目前不支持paimon表作为源表。所以以下SQL了解即可。

/* 可以修改设置 */CREATETABLE MyTableAll (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED 
) PARTITIONED BY(dt, hh);CREATETABLE MyTableAllAs WITH('primary-key'='dt,hh','partition'='dt')ASSELECT*FROM MyTableAll;
1.4 Create Table Like

要创建与另一个表具有相同模式、分区和表属性的表,可以使用create table LIKE。

CREATETABLEIFNOTEXISTS mypaimon.test.MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED);CREATETABLE mypaimon.test.MyTableLike LIKE mypaimon.test.MyTable;
1- create table as既会复制数据,也会复制表结构
2- create table like只会复制表结构
1.5 表属性

用户可以指定表属性来启用特性或提高Paimon的性能。表属性详细请参考:https://help.aliyun.com/zh/flink/developer-reference/apache-paimon-connector?spm=a2c4g.11174283.0.i3

示例:

CREATETABLE mypaimon.test.MyTableProperties (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,PRIMARYKEY(dt, hh, user_id)NOT ENFORCED
) PARTITIONED BY(dt, hh)WITH('bucket'='2','bucket-key'='user_id');
1.6 创建外部表

外部表由catalog记录,但是不由catalog管理。如果删除了paimon的外部表,那么只会删除元数据信息不会删除对应的数据。类似Hive的外部表。

Flink SQL支持读写外部表。外部Paimon表是通过指定连接器和路径属性创建的。下面的SQL创建了一个名为MyExternalTable的外部表,它有2列,其中表文件的路径是 oss://gz18-paimon-oss/test.db/word_count。

-- 不能使用paimon的catalog来创建CREATETABLE MyExternalTable (
    word STRING PRIMARYKEYNOT ENFORCED,
    cnt BIGINT)WITH('connector'='paimon','path'='oss://gz18-paimon-oss/test.db/MyExternalTable','auto-create'='true'-- 如果表路径不存在,此table属性将为空表创建表文件,目前仅由Flink支持);

注意: path中的gz18-paimon-oss要改成你自己的oss的bucket名称

并且可以对这个表进行操作

-- 部署到线上运行insertinto MyExternalTable values('hello',1000);-- 批模式查询SELECTsum(cnt)AS total_cnt FROM MyExternalTable;

注意:需要将insert 语句部署到线上运行,才能将数据真正插入到表中。

通过paimon外部表的方式,可以对paimon的数据进行查询和修改。

可能遇到的错误:

在这里插入图片描述

原因: 不能在Paimon的Catalog中使用'connector' = 'paimon'
1.7 创建临时表

临时表仅有Flink支持,与外部表一样,临时表只是记录,而不是由当前的FlinkSQL会话管理。如果临时表被删除,它的资源不会被删除。当前FlinkSQL会话关闭的时候,会删除临时表。

如果希望将Paimon catalog与其他表一起使用,但又不希望将它们存储在 catalog 中,则可以创建一个临时表。下面的Flink SQL创建了一个Paimon catalog和一个临时表,并演示了如何同时使用这两个表。

准备工作:

创建temp_table.csv,其中数据为:

1,beijing
2,shanghai

编辑好后上传到OSS的gz18-bucket 目录。

-- 使用创建过的mypaimon catalog-- 【流式作业】创建my_table表并导入数据CREATETABLE mypaimon.test.my_table (
    k INT,
    v STRING
);-- 【批模式】【部署】运行INSERTINTO mypaimon.test.my_table values(1,'zs'),(2,'ls');-- 以下在批作业中一起【调试】运行即可-- 【批模式】创建temp_table表CREATETEMPORARYTABLE mypaimon.test.temp_table (
    k INT,
    v STRING
)WITH('connector'='filesystem','path'='oss://gz18-bucket/temp_table.csv','format'='csv');-- 【批模式】查询SELECT my_table.k, my_table.v, temp_table.v 
FROM mypaimon.test.my_table 
JOIN mypaimon.test.temp_table ON my_table.k = temp_table.k;

注意: path中的gz18-bucket一定要改成你自己的bucket名称

在这里插入图片描述

查询结果如下:
在这里插入图片描述

如果删除 temp_table后, temp_table.csv依然存在。

-- 在阿里云flink中不需要运行,temporary表只会在运行时生效droptemporarytable mypaimon.test.temp_table;

2、修改Table

2.1 修改或添加Table Properties
ALTERTABLE mypaimon.test.my_table SET('write-buffer-size'='256 MB');

在这里插入图片描述

2.2 移除Table Properties
ALTERTABLE mypaimon.test.my_table RESET ('write-buffer-size');
2.3 表重命名
ALTERTABLE mypaimon.test.my_table RENAMETO my_table_new;

注意:如果使用对象存储,如S3或OSS,请谨慎使用此语法,因为对象存储的重命名不是原子性的,在失败的情况下可能只会移动部分文件。

2.4 添加新列

注意:使用的阿里云Flink版本需要是1.17版本,否则不支持添加和修改字段信息。

ALTERTABLE mypaimon.test.my_table ADD(c1 INT, c2 STRING);desc mypaimon.test.my_table;

在这里插入图片描述

要添加具有指定位置的新列,请使用FIRST或AFTER col_name。

ALTERTABLE mypaimon.test.my_table ADD c INTFIRST;ALTERTABLE mypaimon.test.my_table ADD b INTAFTER c;

注意:要将一个存在的列修改到一个新的位置,使用FIRST或AFTER col_name。

ALTERTABLE mypaimon.test.my_table MODIFY c1 DOUBLEFIRST;ALTERTABLE mypaimon.test.my_table MODIFY c2 DOUBLEAFTER c1;
2.5 删除列
ALTERTABLE mypaimon.test.my_table DROP(c1, c2);
2.6 修改列的名称
ALTERTABLE mypaimon.test.my_table RENAME c TO cc;
2.7 更改列的空性
CREATETABLE mypaimon.test.null_table (id INTPRIMARYKEYNOT ENFORCED, coupon_info FLOATNOTNULL);
-- 将列' coupon_info '从NOT NULL更改为可空ALTERTABLE mypaimon.test.null_table MODIFY coupon_info FLOAT;
-- 将列' coupon_info '从可空改为NOT NULL-- 如果已经有NULL值,设置如下表选项,在修改表之前静默删除这些记录。SET'table.exec.sink.not-null-enforcer'='DROP';ALTERTABLE mypaimon.test.null_table MODIFY coupon_info FLOATNOTNULL;
2.8 更改列的备注
ALTERTABLE mypaimon.test.null_table MODIFY coupon_info BIGINTCOMMENT'优惠券信息';
2.9 更改列的类型
ALTERTABLE mypaimon.test.null_table MODIFY coupon_info DOUBLE;

对于是否可以转化,可以参考下面链接:

https://paimon.apache.org/docs/0.7/how-to/altering-tables/

在这里插入图片描述

2.10 修改watermark

注意:watermark中涉及的字段数据类型必须是timestamp或者timestamp_ltz

只在flink中生效,修改的语法如下:

CREATETABLE mypaimon.test.my_table (
    k INT,
    v STRING
);desc mypaimon.test.my_table

image-20240303091437682
添加

ALTERTABLE mypaimon.test.my_table ADD(log_ts varchar(20));ALTERTABLE mypaimon.test.my_table ADD(
    ts AS TO_TIMESTAMP(log_ts)AFTER log_ts,
    WATERMARK FOR ts AS ts -INTERVAL'1'HOUR);

在这里插入图片描述

修改

ALTERTABLE mypaimon.test.my_table MODIFY WATERMARK FOR ts AS ts -INTERVAL'2'HOUR;

在这里插入图片描述

删除

ALTERTABLE mypaimon.test.my_table DROP WATERMARK;

在这里插入图片描述

3、写入Table

可以使用INSERT语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式指定,也可以由查询结果指定。

3.1 语法
INSERT { INTO| OVERWRITE } table_identifier [ part_spec ][ column_list ] 
{ value_expr | query }

**(1)**part_spec

一个可选参数,用于指定分区的键和值对的逗号分隔列表。请注意,可以在分区规范中使用类型化文字(例如,date’ 2019-01-02 ')。

语法: PARTITION ( partition_col_name = partition_col_val [ , … ] )

**(2)**column_list

一个可选参数,用于指定属于table_identifier表的以逗号分隔的列列表。

注意:所有指定的列都应该存在于表中,需包括除静态分区列之外的所有列,并且不能重复。

语法: (col_name1 [, column_name2, …])

**(3)**value_expr

指定要插入的值。可以插入显式指定的值或NULL。必须用逗号分隔子句中的每个值。可以指定多个值集来插入多行。

语法: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]

注意:目前,Flink不支持直接使用NULL,因此应该使用

CAST (NULL AS data_type)

将NULL转换为实际数据类型。

3.2 将可空字段写入非空字段

不能在一个表的非空列中插入另一个表的可空列。假设,我们在表a中有一个列key1,它是主键,主键不能为空。在表B中有一个列key2,它是可空的。如果我们运行这样的sql语句:

INSERTINTO A key1 SELECT key2 FROM B

Flink 和 Spark中都会报错,处理方式是:使用”NVL”或”COALESCE”,将一个可空列转成非空列。

INSERTINTO A key1 SELECTCOALESCE(key2,<non-null expression>)FROM B
3.3 修改数据

(1)使用insert将记录/更改应用到表

Paimon支持在sink阶段按分区和桶对数据进行shuffle。

INSERTINTO MyTable SELECT...

(2)覆盖语义

  • 对于未分区的表,Paimon支持覆盖整个表
INSERT OVERWRITE MyTable SELECT...
  • 对于分区表,Paimon支持覆盖指定分区
INSERT OVERWRITE MyTable PARTITION(key1 = value1, key2 = value2,...)SELECT...
  • 动态覆盖

Flink的默认覆盖模式是动态分区覆盖(这意味着Paimon只删除出现在被覆盖数据中的分区)。可以设置dynamic-partition-overwrite 为 false 将其更改为静态覆盖。

dynamic-partition-overwrite配置参数的默认值是true。

参考链接:https://paimon.apache.org/docs/0.7/maintenance/configurations/

-- 假如MyTable是一个分区表-- 动态覆盖INSERT OVERWRITE MyTable SELECT...-- 静态覆盖 (覆盖整个表)INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */SELECT...
3.4 清除数据

(1)清除表:可以使用INSERT OVERWRITE通过插入空值来清除表。

INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */SELECT*FROM MyTable WHEREfalse

(2)清除分区:

目前Paimon支持两种方式去清除分区:

  • 与清除表一样,可以使用INSERT OVERWRITE 向分区插入空值来清除分区的数据。
-- 语法INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */PARTITION(key1 = value1, key2 = value2,...)SELECT selectSpec FROM MyTable WHEREfalse
-- 案例:CREATETABLE`new-mypaimon`.test.OverwriteTable (
    k0 INT,
    k1 INT,
    v STRING) PARTITIONED BY(k0, k1);-- 在批作业中部署运行insertinto`new-mypaimon`.test.OverwriteTable values(1,1,'a'),(2,2,'b'),(3,3,'c');-- 在批作业中调试运行select*from`new-mypaimon`.test.OverwriteTable;
-- 方法一 -- 在批作业中部署运行INSERT OVERWRITE `new-mypaimon`.test.OverwriteTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */PARTITION(k0 =1)SELECT k1, v FROM`new-mypaimon`.test.OverwriteTable WHEREfalse;-- 在批作业中调试运行select*from`new-mypaimon`.test.OverwriteTable;

在这里插入图片描述

  • 上面的方法不支持删除多个分区。如果需要删除多个分区,可以通过flink run提交drop-partition作业。
3.5 更新数据

目前,Paimon支持在Flink 1.17及以后的版本中使用UPDATE更新记录。可以在Flink的批处理模式下执行UPDATE。目前阿里云Flink中暂不支持。

需要表具备以下两个特点:

  • 必须是主键表;
  • 需要对MergeEngine进行重复数据删除或部分更新。
-- 语法UPDATE table_identifier SET column1 = value1, column2 = value2,...WHERE condition;-- 开源版本案例:CREATETABLE UpdateTable(
 a STRING,
 b INT,
 c INT,PRIMARYKEY(a)NOT ENFORCED
)WITH('write-mode'='change-log','merge-engine'='deduplicate');-- 插入INSERTINTO UpdateTable values('myTable',1,1);select*from UpdateTable;
-- 更新UPDATE UpdateTable SET b =1, c =2WHERE a ='myTable';
UPDATE UpdateTable SET b =1, c =3WHERE a ='myTable';
3.6 删除数据

在Flink 1.16和以前的版本中,Paimon只支持通过Flink run提交“delete”作业来删除记录。

在Flink 1.17及更高的版本,可以直接使用删除语句(只在批模式下支持)。但是需要设置两个条件:

  • 设置write-mode为change-log
  • 如果表有主键, MergeEngine需要重复数据删除

目前阿里云Flink中暂不支持。

-- 语法DELETEFROM table_identifier WHERE conditions;-- 开源版本案例:CREATETABLE DeleteTable (
    id BIGINTNOTNULL,
    currency STRING,
    rate BIGINT,
    dt String,PRIMARYKEY(id, dt)NOT ENFORCED) PARTITIONED BY(dt)WITH('write-mode'='change-log','merge-engine'='deduplicate');-- 插入语句INSERTINTO DeleteTable values(1,'UNKNOWN',1,'2023-08-22');select*from DeleteTable;

在这里插入图片描述

-- 删除语句DELETEFROM DeleteTable WHERE currency ='UNKNOWN';select*from DeleteTable;

在这里插入图片描述

4、查询Table

4.1 批量查询

Paimon的批处理读取返回表快照中的所有数据。默认情况下,批处理读取返回最新的快照。

-- Flink SQLSET'execution.runtime-mode'='batch';
4.1.1 批处理时间旅行

带时间旅行的Paimon批读可以指定一个快照或一个标签,并读取相应的数据。

-- 查看快照信息SELECT*FROM t$snapshots;-- 可以指定snapshot idSELECT*FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 可以指定timestamp (unix milliseconds)SELECT*FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- 可以指定tag,如 'my-tag'SELECT*FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

注意: snapshot的编号从1开始。

案例:

(1)建表并插入数据

CREATETABLE mypaimon.test.travelTable (
    k0 INT,
    k1 INT,
    v STRING) PARTITIONED BY(k0, k1);-- 在批作业中部署运行insertinto mypaimon.test.travelTable values(1,1,'a'),(2,2,'b'),(3,3,'c');-- 在批作业中部署运行insertinto mypaimon.test.travelTable values(4,4,'d');-- 在批作业中部署运行INSERT OVERWRITE mypaimon.test.travelTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */PARTITION(k0 =1)SELECT k1, v FROM mypaimon.test.travelTable WHEREfalse;-- 在批作业中调试运行select*from mypaimon.test.travelTable;

(2)

SELECT*FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '1') */;

在这里插入图片描述

(3)

SELECT*FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '2') */;

在这里插入图片描述

(4)

SELECT*FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '3') */;

在这里插入图片描述

可能遇到的错误:

在这里插入图片描述

原因: 进行时间旅行的时候,如果访问了不存在的snapshot快照,那么会报错
4.1.2 批处理增量

读取开始快照(不包含)和结束快照之间的增量变化。

举例:

  • ‘5,10’ 意味着 snapshot 5 和 snapshot 10 之间的
  • ‘TAG1,TAG3’ 意味着 TAG1 和 TAG3 之间的
SELECT*FROM t /*+ OPTIONS('incremental-between' = '12,20') */;

在批处理SQL中,不允许返回DELETE记录,因此将删除-D的记录。如果你想看到DELETE记录,你可以使用audit_log表:

SELECT*FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;

案例:

SELECT*FROM mypaimon.test.travelTable /*+ OPTIONS('incremental-between' = '1,3') */;

在这里插入图片描述

总结:在批量查询时,可以指定快照id,也可以指定时间戳,还可以指定tag进行查询。这样就可以实现查询历史状态,也就是时间旅行。

4.2 流式查询

默认情况下,流式读取在第一次启动时生成表上的最新快照,并继续读取最新的更改。

默认情况下,Paimon确保任务的启动被正确处理,并包含全部数据。

-- 开源版本SET'execution.runtime-mode'='streaming';-- 阿里云Flink
在流作业草稿中运行

也可以不使用快照数据,使用最新浏览的模式。

-- 连续读取最新更改,而不在开始时生成快照SELECT*FROM t /*+ OPTIONS('scan.mode' = 'latest') */
4.2.1 流处理时间旅行

如果你只想处理今天及以后的数据,你可以使用分区过滤器:

SELECT*FROM t WHERE dt >'2023-06-26';

如果它不是一个分区表,或者不能按分区进行过滤,可以使用Time travel的流读取。

-- 可以指定snapshot idSELECT*FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 可以指定timestamp (unix milliseconds)SELECT*FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- 在第一次启动时读取快照id 1L,并继续读取更改SELECT*FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;

案例:

-- 在流作业草稿中调试运行SELECT*FROM mypaimon.test.travelTable /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */;

在这里插入图片描述

如果持续向mypaimon.test.travelTable表插入数据,则会打印出新增的数据。

4.2.2 Consumer ID(了解)

在流式读取表时指定consumer-id,这是一个实验性功能。

SELECT*FROM t /*+ OPTIONS('consumer-id' = 'myid') */;

当流读取Paimon表时,下一个快照id将被记录到文件系统中。这有几个优点:

  • 当之前的作业停止后,新启动的作业可以继续消耗之前的进度,而不需要从状态恢复。新的读取将从消费者文件中找到的下一个快照 ID 开始读取。
  • 在判断一个快照是否过期时,Paimon会查看文件系统中该表的所有消费者,如果还有消费者依赖这个快照,那么这个快照就不会因为过期而被删除。
  • 当没有水印定义时,Paimon表会将快照中的水印传递到下游Paimon表,这意味着可以跟踪整个管道的水印进度。

注意:消费者将防止快照过期。可以指定“consumer.expiration-time”来管理消费者的生命周期。

示例:

(1)指定consumer-id开始流式查询:

SELECT * FROM t /*+ OPTIONS(‘consumer-id’ = ‘test’) */;

(2)停掉原先的流式查询,插入数据:

insert into t values(1,2,3);

(3)再次指定consumer-id流式查询:

SELECT * FROM t /*+ OPTIONS(‘consumer-id’ = ‘test’) */;

4.2.3 Read Overwrite

默认情况下,流读取将忽略INSERT OVERWRITE生成的提交。如果想要读取OVERWRITE的提交,可以配置 streaming-read-overwrite 为true。

总结:流式查询会不断读取最新表的变化,然后在最开始时是否读取全量数据,可以通过scan-mode来指定,使用-full的类型即可。

可以像kafka一样,使用consumer id去管理消费者的消费点位

4.3 查询优化

(1)强烈建议与查询一起指定分区和主键过滤器,这将加快查询的数据跳过。

可以加速数据跳转的过滤函数有:

  • =
  • <
  • <=
  • =

  • IN (…)
  • LIKE ‘abc%’
  • IS NULL

(2)Paimon将按主键对数据进行排序,这加快了点查询和范围查询的速度。当使用复合主键时,查询过滤器最好在主键的最左边形成一个前缀,以获得良好的加速。

示例:

-- 复合主键的表CREATETABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,.....,PRIMARYKEY(catalog_id, order_id)NOT ENFORCED 
)

通过为主键的最左边的前缀指定一个范围过滤器,查询可以获得很好的加速。

SELECT*FROM orders WHERE catalog_id=1025;SELECT*FROM orders WHERE catalog_id=1025AND order_id=29495;SELECT*FROM orders
  WHERE catalog_id=1025AND order_id>2035AND order_id<6000;

但是,下面的过滤器不能很好地加速查询。

SELECT*FROM orders WHERE order_id=29495;SELECT*FROM orders WHERE catalog_id=1025OR order_id=29495;

5、系统表(了解)

在开源版本中,支持查询系统表,系统表中记录了表的一些元数据信息,详见https://paimon.apache.org/docs/0.5/how-to/system-tables/。

6、Lookup Joins

Lookup Joins 是流查询中的一种join。它用从Paimon查询的数据填充表。

在Flink中,Paimon支持对带有主键的表和仅追加表进行lookup join。下面的示例说明了这个特性。

以下案例是使用客户表做维表,与订单表进行关联,然后查看关联的效果。

(1)创建paimon表

-- 使用创建好的mypaimon catalog-- 在paimon catalog中建一个表CREATETABLE mypaimon.test.customers (
    id INTPRIMARYKEYNOT ENFORCED,
    name STRING,
    country STRING,
    zip STRING
);

(2)在Mysql中建表并插入数据

CREATEDATABASEIFNOTEXISTS test;CREATETABLE test.customers (
    id INTPRIMARYKEY,
    name varchar(20),
    country varchar(20),
    zip varchar(20));INSERTINTO test.customers VALUES(1,'张三','中国','081212'),(2,'李四','中国','872211');

(3)在Flink中创建Mysql映射表

CREATETABLEifnotexists mysql_cdc_customers (
     id int,
     name string,
     country string,
     zip string,PRIMARYKEY(id)NOT ENFORCED
)WITH('connector'='mysql','hostname'='rm-cn-lr53rh4wc00028.rwlb.rds.aliyuncs.com','port'='3306','username'='itcast','password'='Itcast123','server-time-zone'='Asia/Shanghai','database-name'='test','table-name'='customers');

注意: hostname、username、password都需要改成自己的。

(4)将mysql数据插入到paimon中

-- 流作业草稿中部署运行 INSERTINTO mypaimon.test.customers
select*from mysql_cdc_customers;

(5)登录ECS控制台

点击实例,点击远程连接,选择通过Workbench远程连接

输入密码,点击确定

(6)创建topic

-- 启动kafka(前提启动zookeeper)
/export/server/zookeeper/bin/zkServer.sh start
cd /export/server/kafka/
bin/kafka-server-start.sh -daemon config/server.properties

-- 创建topic(ip为ECS的内网ip,可以通过ifconfig查询)
bin/kafka-topics.sh --create--topic orders_info --bootstrap-server node1:9092

(7)在Flink创建kafka映射表

--创建一个临时左表,从kafka读数CREATETABLE Orders (
    order_id INT,
    total INT,
    customer_id INT,
    proc_time AS PROCTIME())WITH('connector'='kafka','topic'='orders_info','properties.bootstrap.servers'='172.28.237.195:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv');

注意: properties.bootstrap.servers中的IP地址改成你自己的ECS服务器内网(私网)的IP地址

(8)向orders_info中插入一些数据:

bin/kafka-console-producer.sh --broker-list node1:9092 --topic orders_info
1,100,1
2,20,1

(10)现在可以在lookup join中使用customers表

-- 流式作业草稿中调试运行SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN mypaimon.test.customers
FOR SYSTEM_TIME ASOF o.proc_time AS c
ON o.customer_id = c.id;

结果如下:

在这里插入图片描述

再向kafka中输入 3,50,2

结果变化如下:

在这里插入图片描述

再向kafka中输入 4,66,3

结果没有变化,因为id为3的维表信息关联不到。

此时向mysql中插入一条数据:

insertinto test.customers values(3,'王五','中国','111111');

结果不会更新。

等待片刻(插入到paimon的任务的checkpoint完成)。

再向kafka中输入 5,77,3,此时可以看到能关联到结果。

在这里插入图片描述

此时再改变维表值,如在mysql中进行更新

update test.customers set zip='test'where id =3;

此时,已经产生的结果也不再发生变化。新来的结果会关联新数据。

再向kafka中输入 6,88,3 ,结果如下

在这里插入图片描述

lookup join 操作将在本地维护一个RocksDB缓存,并实时提取表的最新更新。lookup join 操作将只提取必要的数据,因此筛选条件对性能非常重要。

此特性仅适用于最多包含数千万条记录的表,以避免过度使用本地磁盘。

注意:如果Orders(主表)的记录Join缺失,因为customers(lookup表)对应的数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup。

总结:

(1)如果事实表(Kafka数据源)先于维表(MySQL数据源)到达了,这时事实表就不会关联到维表,即使后续维表数据来了,也不会更新之前的关联结果了

(2)paimon表一旦发生变化,会实时更新维表(Flink的内存)中

(3)如果关联结果已经生成了,然后维表发生了变化,这时之前的关联结果也不会发生变化了


本文转载自: https://blog.csdn.net/syhiiu/article/details/140697905
版权归原作者 大数据飞总 所有, 如有侵权,请联系我们删除。

“Paimon数据湖详解(第49天)”的评论:

还没有评论