0


Flink SQL --维表join

文章目录

本文基于flink-1.13.6

一、维表 join 介绍

维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,在建设离线数仓的时候,通常是将维表与事实表进行关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在kafka中,维表通常存储在外部设备中(比如MySQL,HBase)。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意是,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表rowtime所对应的的维表快照(事件时间语义)

二、Temporal Table Join

使用语法

SELECTcolumn-names
FROM table1  [AS<alias1>][LEFT]JOIN table2 FOR SYSTEM_TIME ASOF table1.proctime [AS<alias2>]ON table1.column-name1 = table2.key-name1

注意:目前,仅支持INNER JOIN与LEFT JOIN。在join的时候需要使用 FOR SYSTEM_TIME AS OF ,其中table1.proctime表示table1的proctime处理时间属性(计算列)。使用FOR SYSTEM_TIME AS OF table1.proctime表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据。

样例

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME ASOF o.proctime AS r
  ON r.currency = o.currency

使用说明
仅支持Blink planner
仅支持SQL,目前不支持Table API
目前不支持基于事件时间(event time)的temporal table join
维表可能会不断变化,JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化
维表和维表不能进行JOIN
维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件

三、维表Join案例

3.1、背景

Kafka中有一份用户行为数据,包括pv,buy,cart,fav行为;MySQL中有一份省份区域的维表数据。现将两种表进行JOIN,统计每个区域的购买行为数量。

3.2、实践

3.2.1、维表存储在MySQL中

-- mysqlCREATETABLE`dim_province`(`province_id`bigint(20)DEFAULTNULL,`province_name`varchar(50)DEFAULTNULL,`region_name`varchar(50)DEFAULTNULL)ENGINE=InnoDBDEFAULTCHARSET=utf8;insertinto dim_province (province_id, province_name, region_name)values(1,"山东","华东"),(2,"广东","华南"),(3,"河南","华中"),(4,"北京","华北"),(5,"新疆","西北");-- flinksql  维度表CREATETABLE dim_province (
    province_id BIGINT,-- 省份id
    province_name  VARCHAR,-- 省份名称
 region_name VARCHAR-- 区域名称)WITH('connector.type'='jdbc','connector.url'='jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8','connector.table'='dim_province','connector.username'='root','connector.password'='123456','connector.lookup.cache.max-rows'='5000','connector.lookup.cache.ttl'='10min');

注意: 加上

useUnicode=true&characterEncoding=UTF-8

,否则 flinksql 写到 mysql 产生乱码

3.2.2、事实数据存在 kafka

事实表存储在kafka中,数据为用户点击行为,格式为csv,具体数据样例如下:

1,1002,10002,fav,2022-10-27 16:25:00,2
1,1004,10002,cart,2022-10-27 16:25:01,3
6,1004,10004,pv,2022-10-27 16:25:01,3
3,1002,10001,cart,2022-10-27 16:25:01,1
4,1001,10004,fav,2022-10-27 16:25:01,4

创建kafka数据源表,如下:

CREATETABLE user_behavior (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,`ts`timestamp(3),
     province_id INT,-- 用户所在的省份id`proctime`as PROCTIME(),-- 处理时间列
    WATERMARK FOR ts as ts -INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列)WITH('connector'='kafka',-- 使用 kafka connector'topic'='user_behavior',-- kafka topic'scan.startup.mode'='latest-offset',-- 从起始 offset 开始读取'properties.bootstrap.servers'='chb1:9092','properties.group.id'='testGroup','format'='csv');

3.2.3、创建MySQL的结果表,表示区域销量

-- mysqlCREATETABLE top_region (
    region_name varchar(50),-- 区域名称
    buy_cnt BIGINT-- 销量)ENGINE=InnoDBDEFAULTCHARSET=utf8;-- flinksqlCREATETABLE region_sales_sink (
    region_name STRING,-- 区域名称
    buy_cnt BIGINT,-- 销量
    proctime as PROCTIME())WITH('connector.type'='jdbc','connector.url'='jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8','connector.table'='top_region',-- MySQL中的待插入数据的表'connector.username'='root','connector.password'='123456','connector.write.flush.interval'='1s');

3.2.4、用户行为数据与省份维表数据 join

CREATEVIEW user_behavior_detail ASSELECT
  u.user_id, 
  u.item_id,
  u.category_id,
  u.behavior,  
  p.province_name,
  p.region_name
FROM user_behavior AS u LEFTJOIN dim_province FOR SYSTEM_TIME ASOF u.proctime AS p
ON u.province_id = p.province_id;

3.2.5、计算区域的销量,并将计算结果写入MySQL

-- 结果INSERTINTO region_sales_sink
SELECT 
  region_name,COUNT(*) buy_cnt
FROM user_behavior_detail
WHERE behavior ='buy'GROUPBY region_name;

参考:
Flink SQL— CREATE语句
Flink Temporal Join Versioned Table Demo

标签: flink

本文转载自: https://blog.csdn.net/wuxintdrh/article/details/127555641
版权归原作者 宝哥大数据 所有, 如有侵权,请联系我们删除。

“Flink SQL --维表join”的评论:

还没有评论