Overview
- A lookup join typically queries data from an external system to enrich the main table of a Flink SQL query, e.g. a fact table joined against a dimension table that lives in an external system such as MySQL.
- Requirements:
  - One table must have a processing-time attribute (the join is based on the processing-time temporal join syntax); compared with an ordinary JOIN, the query additionally contains FOR SYSTEM_TIME AS OF.
  - The other table must be backed by a lookup source connector.
- Lookup Cache:
  - The lookup cache is disabled by default; it can be enabled by setting the lookup.cache.max-rows and lookup.cache.ttl options.
  - With the cache enabled, Flink queries the cache first and only hits the external database on a cache miss.
  - Enabling the cache speeds up lookups, but the cached records are not necessarily up to date.
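As a minimal sketch of the syntax (hypothetical table names orders and products; the probe-side table declares a processing-time attribute, which in pure SQL can be done with a PROCTIME() computed column, while the test code later in this article declares it through the Table API):

-- Probe-side table; 'datagen' is used here only so the sketch is a valid DDL
CREATE TABLE orders (
    order_id   STRING,
    product_id STRING,
    proc_time  AS PROCTIME()   -- processing-time attribute required by the lookup join
) WITH ('connector' = 'datagen');

-- products is assumed to already exist and be backed by a lookup source connector such as JDBC
SELECT o.order_id, p.*
FROM orders AS o
JOIN products FOR SYSTEM_TIME AS OF o.proc_time AS p
    ON o.product_id = p.product_id;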
SQL parameter description
- connector: the connector to use, e.g. jdbc, kafka, … filesystem
- driver: the database driver
- lookup.cache.ttl: the maximum time-to-live of each row in the lookup cache
- lookup.cache.max-rows: the maximum number of rows in the lookup cache

When the number of cached rows exceeds lookup.cache.max-rows, the row that has been alive the longest is evicted; a cached row whose age exceeds lookup.cache.ttl is evicted as well.
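Putting these options together, a sketch of a JDBC-backed dimension table with the lookup cache enabled (the table name dim_tb is hypothetical; the option values simply mirror the DDL used later in this article):

CREATE TEMPORARY TABLE dim_tb (
    a STRING,
    b INT,
    PRIMARY KEY (a) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',                       -- lookup source connector
    'driver'     = 'com.mysql.cj.jdbc.Driver',   -- database driver
    'url'        = 'jdbc:mysql://localhost:3306/db0',
    'username'   = 'root',
    'password'   = '123456',
    'table-name' = 'tb0',
    'lookup.cache.max-rows' = '2',               -- evict the longest-lived row once more than 2 rows are cached
    'lookup.cache.ttl'      = '30 second'        -- a cached row expires 30 seconds after it was cached
);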
pom.xml
Environment: Windows 10 + IDEA + JDK 1.8 + Flink 1.13 + MySQL 8
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
    <mysql.version>8.0.31</mysql.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- FlinkSQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'format'='csv' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'format'='json' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'connector' = 'jdbc' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'driver' = 'com.mysql.cj.jdbc.Driver' -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- JSON parsing -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- Lombok: less JavaBean boilerplate -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>
Create the MySQL table
DROP DATABASE IF EXISTS db0;
CREATE DATABASE db0;
CREATE TABLE db0.tb0 (
    a VARCHAR(255) PRIMARY KEY,
    b INT(3),
    c BIGINT(5),
    d FLOAT(3,2),
    e DOUBLE(4,2),
    f DATE DEFAULT '2022-10-24',
    g TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
INSERT INTO db0.tb0 (a,b,c,d,e) VALUES
    ('aa',1,11,1.11,11.11),
    ('bb',2,22,2.22,22.22),
    ('cc',3,33,3.33,33.33);
SELECT * FROM db0.tb0;
The corresponding Flink CREATE TABLE statement
SQL
CREATE TEMPORARY TABLE temp_tb0 (
    a STRING,
    b INT,
    c BIGINT,
    d FLOAT,
    e DOUBLE,
    f DATE,
    g TIMESTAMP,
    PRIMARY KEY (a) NOT ENFORCED
) WITH (
    'lookup.cache.max-rows' = '2',
    'lookup.cache.ttl' = '30 second',
    'connector' = 'jdbc',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://localhost:3306/db0',
    'username' = 'root',
    'password' = '123456',
    'table-name' = 'tb0'
)
Test code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hello {
    public static void main(String[] args) {
        // Create the stream and table execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        // Create a Flink table backed by the MySQL table
        tbEnv.executeSql("CREATE TEMPORARY TABLE temp_tb0 (\n"
                + "    a STRING,\n"
                + "    b INT,\n"
                + "    c BIGINT,\n"
                + "    d FLOAT,\n"
                + "    e DOUBLE,\n"
                + "    f DATE,\n"
                + "    g TIMESTAMP,\n"
                + "    PRIMARY KEY(a) NOT ENFORCED)\n"
                + "WITH (\n"
                + "    'lookup.cache.max-rows' = '2',\n"
                + "    'lookup.cache.ttl' = '30 second',\n"
                + "    'connector' = 'jdbc',\n"
                + "    'driver' = 'com.mysql.cj.jdbc.Driver',\n"
                + "    'url' = 'jdbc:mysql://localhost:3306/db0',\n"
                + "    'username' = 'root',\n"
                + "    'password' = '123456',\n"
                + "    'table-name' = 'tb0'\n"
                + ")");
        // Run the query and print the result
        tbEnv.sqlQuery("SELECT * FROM temp_tb0").execute().print();
    }
}
Printed test result
+----+----+---+----+------+-------+------------+----------------------------+
| op | a  | b | c  | d    | e     | f          | g                          |
+----+----+---+----+------+-------+------------+----------------------------+
| +I | aa | 1 | 11 | 1.11 | 11.11 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | bb | 2 | 22 | 2.22 | 22.22 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | cc | 3 | 33 | 3.33 | 33.33 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
+----+----+---+----+------+-------+------------+----------------------------+
Lookup Join
FlinkSQL
SELECT * FROM v
JOIN t
    FOR SYSTEM_TIME AS OF v.y
    ON v.x = t.a
Complete Java code
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Scanner;

import static org.apache.flink.table.api.Expressions.$;

public class Hi {
    public static void main(String[] args) {
        // Create the stream and table execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        // Create the left table (probe side) with a processing-time attribute
        DataStreamSource<String> d = env.addSource(new ManualSource());
        Table tb = tbEnv.fromDataStream(d, $("x"), $("y").proctime());
        tbEnv.createTemporaryView("v", tb);
        // Create the right table (dimension table backed by MySQL via JDBC)
        tbEnv.executeSql("CREATE TEMPORARY TABLE t ( "
                + "    a STRING, "
                + "    b INT, "
                + "    c BIGINT, "
                + "    d FLOAT, "
                + "    e DOUBLE, "
                + "    f DATE, "
                + "    g TIMESTAMP, "
                + "    PRIMARY KEY(a) NOT ENFORCED) "
                + "WITH ( "
                + "    'lookup.cache.max-rows' = '2', "
                + "    'lookup.cache.ttl' = '30 second', "
                + "    'connector' = 'jdbc', "
                + "    'driver' = 'com.mysql.cj.jdbc.Driver', "
                + "    'url' = 'jdbc:mysql://localhost:3306/db0', "
                + "    'username' = 'root', "
                + "    'password' = '123456', "
                + "    'table-name' = 'tb0' "
                + ")");
        // Run the lookup join and print the result
        tbEnv.sqlQuery("SELECT * FROM v "
                + "JOIN t "
                + "    FOR SYSTEM_TIME AS OF v.y "
                + "    ON v.x=t.a").execute().print();
    }

    /** A source fed by manual console input */
    public static class ManualSource implements SourceFunction<String> {
        public ManualSource() {
        }

        @Override
        public void run(SourceFunction.SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine().trim();
                // Typing STOP ends the source; blank lines are ignored
                if (str.equals("STOP")) {
                    break;
                }
                if (!str.equals("")) {
                    sc.collect(str);
                }
            }
            scanner.close();
        }

        @Override
        public void cancel() {
        }
    }
}
Test result: type a key such as aa, bb, or cc in the console; each input line triggers a lookup against MySQL and the joined row is printed (enter STOP to end the source; a key with no match in tb0 produces no output, since this is an inner join).
Reposted from: https://blog.csdn.net/Yellow_python/article/details/128070761
Copyright belongs to the original author 小基基o_O. If there is any infringement, please contact us and we will remove it.