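Flink SQL supports complex and flexible join operations on dynamic tables. This topic describes how to use dual-stream JOIN statements.
Background information
A JOIN in real-time computing has the same semantics as a JOIN in traditional batch processing: both associate two tables. The difference is that real-time computing joins two dynamic tables, and the join result is also updated dynamically so that the final result matches the result of batch processing.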
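As a rough illustration of this updating behavior, the sketch below uses two hypothetical tables, clicks and users, which do not appear elsewhere in this topic. With a LEFT JOIN, a click that arrives before its matching user row is first emitted with a NULL user_name. When the matching user row arrives later, that output row is retracted and replaced by the joined row, so the final result is the same as what a batch job would produce over the same data.

```sql
-- Hypothetical tables, for illustration only:
--   clicks(user_id BIGINT, url VARCHAR)
--   users(user_id BIGINT, user_name VARCHAR)
SELECT c.user_id, c.url, u.user_name
FROM clicks AS c
LEFT JOIN users AS u
  ON c.user_id = u.user_id;
```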
Dual-stream JOIN syntax
tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
- tableReference: the table name.
- tableExpression: a table expression.
- joinCondition: the join condition.
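The following sketch shows how the grammar above might map to concrete statements, reusing the Orders and Products tables from the data examples later in this topic. The first two queries are meant to be equivalent inner joins. The USING form assumes that both tables expose a column named productid.

```sql
-- tableReference [, tableReference ]* : comma-separated tables,
-- with the join condition written in the WHERE clause.
SELECT o.orderid, p.name
FROM Orders AS o, Products AS p
WHERE o.productid = p.productid;

-- Explicit INNER JOIN with an ON condition.
SELECT o.orderid, p.name
FROM Orders AS o
INNER JOIN Products AS p
  ON o.productid = p.productid;

-- LEFT OUTER JOIN with a USING condition.
-- Orders without a matching product are kept, with name set to NULL.
SELECT orderid, name
FROM Orders
LEFT OUTER JOIN Products USING (productid);
```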
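Dual-stream JOIN hints
Starting from VVR 8.0.1 of the Realtime Compute engine, you can use hints to set a separate state time-to-live (TTL) for the left and right streams of a dual-stream JOIN, which reduces the amount of state that must be maintained.
- Syntax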
-- Available from VVR 8.0.1
SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
-- From VVR 8.0.7, you can also use the community Join State TTL hint syntax
SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
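- Notes
  - The join state TTL hint is supported only in dual-stream JOIN scenarios. It is not supported for dimension table JOIN, Interval Join, or Window Join.
  - If the join state TTL hint specifies the state TTL of only one stream at the JOIN node, the other stream uses the job-level state TTL of the Flink SQL job. This value is controlled by table.exec.state.ttl (see Basic configuration), and its default is 1.5 days.
  - tableReference can be a table name, a view name, or an alias. If an alias is specified for a table, you must use the alias in the hint.
  - This is an experimental feature, and the hint syntax may change in the future.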
- Examples
-- Use aliases in the hint
SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;

-- VVR 8.0.7 and later can also use the new syntax
SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;

-- Use table names in the hint
SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
  ON Orders.productid = Products.productid;

-- VVR 8.0.7 and later can also use the new syntax
SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
  ON Orders.productid = Products.productid;

-- Use a view name in the hint
CREATE TEMPORARY VIEW v AS
SELECT id, ...
FROM (
  SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rn
  FROM src1
  WHERE ...
) tmp
WHERE rn = 1;

SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b
  ON v.id = b.id;

-- VVR 8.0.7 and later can also use the new syntax
SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b
  ON v.id = b.id;
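The note about specifying only one stream can be sketched as follows. This is a minimal example under two assumptions: the environment accepts SET statements in SQL (as the open-source Flink SQL client does; elsewhere the same option would go into the job configuration), and '36 h' is used to spell out the 1.5-day default explicitly.

```sql
-- Job-level state TTL. It applies to any join input that has no hint of its own.
-- Assumption: the environment accepts SET statements.
SET 'table.exec.state.ttl' = '36 h';

-- Only the left stream (alias o) gets a dedicated state TTL of 3 days.
-- The right stream (alias p) falls back to table.exec.state.ttl.
SELECT /*+ JOIN_STATE_TTL('o' = '3d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;
```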
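Data example for Orders JOIN Products
- Test data

Table 1. Orders

| rowtime | productid | orderid | units |
| --- | --- | --- | --- |
| 10:17:00 | 30 | 5 | 4 |
| 10:17:05 | 10 | 6 | 1 |
| 10:18:05 | 20 | 7 | 2 |
| 10:18:07 | 30 | 8 | 20 |
| 11:02:00 | 10 | 9 | 6 |
| 11:04:00 | 10 | 10 | 1 |
| 11:09:30 | 40 | 11 | 12 |
| 11:24:11 | 10 | 12 | 4 |

Table 2. Products

| productid | name | unitprice |
| --- | --- | --- |
| 30 | Cheese | 17 |
| 10 | Beer | 0.25 |
| 20 | Wine | 6 |
| 30 | Cheese | 17 |
| 10 | Beer | 0.25 |
| 10 | Beer | 0.25 |
| 40 | Bread | 100 |
| 10 | Beer | 0.25 |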
- Test statement
SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;
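- Test results

| o.rowtime | o.productid | o.orderid | o.units | p.name | p.unitprice |
| --- | --- | --- | --- | --- | --- |
| 10:17:00 | 30 | 5 | 4 | Cheese | 17.00 |
| 10:17:00 | 30 | 5 | 4 | Cheese | 17.00 |
| 10:17:05 | 10 | 6 | 1 | Beer | 0.25 |
| 10:17:05 | 10 | 6 | 1 | Beer | 0.25 |
| 10:17:05 | 10 | 6 | 1 | Beer | 0.25 |
| 10:17:05 | 10 | 6 | 1 | Beer | 0.25 |
| 10:18:05 | 20 | 7 | 2 | Wine | 6.00 |
| 10:18:07 | 30 | 8 | 20 | Cheese | 17.00 |
| 10:18:07 | 30 | 8 | 20 | Cheese | 17.00 |
| 11:02:00 | 10 | 9 | 6 | Beer | 0.25 |
| 11:02:00 | 10 | 9 | 6 | Beer | 0.25 |
| 11:02:00 | 10 | 9 | 6 | Beer | 0.25 |
| 11:02:00 | 10 | 9 | 6 | Beer | 0.25 |
| 11:04:00 | 10 | 10 | 1 | Beer | 0.25 |
| 11:04:00 | 10 | 10 | 1 | Beer | 0.25 |
| 11:04:00 | 10 | 10 | 1 | Beer | 0.25 |
| 11:04:00 | 10 | 10 | 1 | Beer | 0.25 |
| 11:09:30 | 40 | 11 | 12 | Bread | 100.00 |
| 11:24:11 | 10 | 12 | 4 | Beer | 0.25 |
| 11:24:11 | 10 | 12 | 4 | Beer | 0.25 |
| 11:24:11 | 10 | 12 | 4 | Beer | 0.25 |
| 11:24:11 | 10 | 12 | 4 | Beer | 0.25 |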
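Each order appears more than once in this result because Products contains duplicate rows, and a dual-stream JOIN emits one output row for every matching pair. If one row per order is wanted, one possible approach is to deduplicate Products before joining. This is only a sketch and not part of the original example; the view name distinct_products is hypothetical.

```sql
-- Hypothetical view that collapses identical Products rows.
CREATE TEMPORARY VIEW distinct_products AS
SELECT DISTINCT productid, name, unitprice
FROM Products;

-- Same join as the test statement, but against the deduplicated view.
SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN distinct_products AS p
  ON o.productid = p.productid;
```

With the test data above, Products reduces to four distinct rows, so each of the eight orders would match exactly once. Note that the DISTINCT step keeps state of its own.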
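Data example for datahub_stream1 JOIN datahub_stream2
- Test data

Table 3. datahub_stream1

| a (BIGINT) | b (BIGINT) | c (VARCHAR) |
| --- | --- | --- |
| 0 | 10 | test11 |
| 1 | 10 | test21 |

Table 4. datahub_stream2

| a (BIGINT) | b (BIGINT) | c (VARCHAR) |
| --- | --- | --- |
| 0 | 10 | test11 |
| 1 | 10 | test21 |
| 0 | 10 | test31 |
| 1 | 10 | test41 |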
- Test statement
SELECT s1.c, s2.c
FROM datahub_stream1 AS s1
JOIN datahub_stream2 AS s2
  ON s1.a = s2.a
WHERE s1.a = 0;
- Test results

| s1.c (VARCHAR) | s2.c (VARCHAR) |
| --- | --- |
| test11 | test11 |
| test11 | test31 |
Copyright belongs to the original author, soso1968. In case of infringement, please contact us for removal.