

Big Data (9h) FlinkSQL: Lookup Join


Overview

  • A lookup join is typically used to query data from an external system in order to enrich the FlinkSQL main table, e.g. a fact table joined against a dimension table that lives in an external system (such as MySQL).
  • Requirements: one table must have a processing-time attribute (the join is based on the Processing Time Temporal Join syntax; compared with a regular JOIN, it adds FOR SYSTEM_TIME AS OF), and the other table must be backed by a lookup source connector.
  • Lookup Cache: disabled by default; it can be enabled by setting the lookup.cache.max-rows and lookup.cache.ttl parameters. With the cache enabled, Flink first checks the cache and queries the external database only on a cache miss. Caching speeds up lookups, but cached records are not necessarily up to date.
    SQL parameter reference
    connector: the connector type, e.g. jdbc, kafka, filesystem
    driver: the database driver class
    lookup.cache.max-rows: maximum number of rows in the Lookup Cache; when the cached row count exceeds lookup.cache.max-rows, the longest-lived record is evicted
    lookup.cache.ttl: maximum time-to-live of each row in the Lookup Cache; rows whose age exceeds lookup.cache.ttl are also evicted
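The two eviction rules above can be sketched in plain Java. This is an illustration only, assuming a simple insertion-ordered map; it is not Flink's actual JDBC lookup-cache implementation, and the class name TtlLruCache is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the lookup-cache eviction semantics: at most maxRows entries
// (the oldest is evicted first), and entries older than ttlMillis count as
// expired. Illustration only, not Flink's real cache.
public class TtlLruCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long bornAt;
        Entry(V value, long bornAt) { this.value = value; this.bornAt = bornAt; }
    }

    private final int maxRows;       // mirrors lookup.cache.max-rows
    private final long ttlMillis;    // mirrors lookup.cache.ttl
    // LinkedHashMap iterates in insertion order, so the first key is the
    // longest-lived record
    private final LinkedHashMap<K, Entry<V>> map = new LinkedHashMap<>();

    public TtlLruCache(int maxRows, long ttlMillis) {
        this.maxRows = maxRows;
        this.ttlMillis = ttlMillis;
    }

    public void put(K key, V value, long now) {
        map.put(key, new Entry<>(value, now));
        if (map.size() > maxRows) {
            // row count exceeds max-rows: evict the longest-lived record
            map.remove(map.keySet().iterator().next());
        }
    }

    /** Returns null on a miss or after expiry; the caller then queries the DB. */
    public V get(K key, long now) {
        Entry<V> e = map.get(key);
        if (e == null) return null;
        if (now - e.bornAt > ttlMillis) {  // lived longer than the TTL
            map.remove(key);
            return null;
        }
        return e.value;
    }

    public int size() { return map.size(); }
}
```

With maxRows=2 and a 30-second TTL (the same values used in the WITH clause later in this post), inserting a third key evicts the first, and a read more than 30 s after insertion misses.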

pom.xml

Environment: Win10 + IDEA + JDK 1.8 + Flink 1.13 + MySQL 8

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
    <mysql.version>8.0.31</mysql.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <!-- FlinkSQL -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <!-- 'format'='csv' -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency>
    <!-- 'format'='json' -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency>
    <!-- 'connector' = 'jdbc' -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <!-- 'driver' = 'com.mysql.cj.jdbc.Driver' -->
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency>
    <!-- Logging -->
    <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency>
    <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>${log4j.version}</version></dependency>
    <!-- JSON parsing -->
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency>
    <!-- Lombok: less JavaBean boilerplate -->
    <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency>
</dependencies>

Creating the MySQL table

DROP DATABASE IF EXISTS db0;
CREATE DATABASE db0;
CREATE TABLE db0.tb0 (
  a VARCHAR(255) PRIMARY KEY,
  b INT(3),
  c BIGINT(5),
  d FLOAT(3,2),
  e DOUBLE(4,2),
  f DATE DEFAULT '2022-10-24',
  g TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
INSERT db0.tb0 (a,b,c,d,e) VALUES ('aa',1,11,1.11,11.11),('bb',2,22,2.22,22.22),('cc',3,33,3.33,33.33);
SELECT * FROM db0.tb0;

The corresponding Flink CREATE TABLE statement

SQL

CREATE TEMPORARY TABLE temp_tb0 (
  a STRING,
  b INT,
  c BIGINT,
  d FLOAT,
  e DOUBLE,
  f DATE,
  g TIMESTAMP,
  PRIMARY KEY(a) NOT ENFORCED)
WITH (
  'lookup.cache.max-rows' = '2',
  'lookup.cache.ttl' = '30 second',
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://localhost:3306/db0',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'tb0')

Test code

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hello {
    public static void main(String[] args) {
        // Create the stream and table execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        // Create a table backed by the MySQL table
        tbEnv.executeSql("CREATE TEMPORARY TABLE temp_tb0 (\n" +
                "  a STRING,\n" +
                "  b INT,\n" +
                "  c BIGINT,\n" +
                "  d FLOAT,\n" +
                "  e DOUBLE,\n" +
                "  f DATE,\n" +
                "  g TIMESTAMP,\n" +
                "  PRIMARY KEY(a) NOT ENFORCED)\n" +
                "WITH (\n" +
                "  'lookup.cache.max-rows' = '2',\n" +
                "  'lookup.cache.ttl' = '30 second',\n" +
                "  'connector' = 'jdbc',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/db0',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'table-name' = 'tb0'\n" +
                ")");
        // Run the query and print the result
        tbEnv.sqlQuery("SELECT * FROM temp_tb0").execute().print();
    }
}

Printed test result

+----+----+---+----+------+-------+------------+----------------------------+
| op |  a | b |  c |    d |     e |          f |                          g |
+----+----+---+----+------+-------+------------+----------------------------+
| +I | aa | 1 | 11 | 1.11 | 11.11 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | bb | 2 | 22 | 2.22 | 22.22 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | cc | 3 | 33 | 3.33 | 33.33 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
+----+----+---+----+------+-------+------------+----------------------------+

Lookup Join

FlinkSQL

SELECT * FROM v
JOIN t
  FOR SYSTEM_TIME AS OF v.y
  ON v.x = t.a
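Conceptually, for each row arriving on stream v, the join probes the lookup cache first and queries the external table t (MySQL here) only on a miss. The sketch below illustrates that mechanism in plain Java, with a HashMap standing in for the MySQL table; the class name LookupJoinSketch is hypothetical and this is not Flink's runtime code:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of the cache-then-database lookup flow behind
// FOR SYSTEM_TIME AS OF. A HashMap simulates the external MySQL table.
public class LookupJoinSketch {
    private final Map<String, String> externalTable;            // stand-in for tb0
    private final Map<String, String> cache = new HashMap<>();  // lookup cache
    private int dbQueries = 0;                                  // counts "database" hits

    public LookupJoinSketch(Map<String, String> externalTable) {
        this.externalTable = externalTable;
    }

    /** Enrich a stream key; null means no dimension match (an inner join drops the row). */
    public String lookup(String key) {
        String v = cache.get(key);
        if (v == null) {                      // cache miss: query the database
            dbQueries++;
            v = externalTable.get(key);
            if (v != null) cache.put(key, v); // remember the fetched record
        }
        return v;
    }

    public int dbQueries() { return dbQueries; }
}
```

Repeated lookups of the same key hit the cache rather than the database, which is exactly why the overview warns that cached records may lag behind the external table until the TTL expires.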

Complete Java code

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Scanner;

import static org.apache.flink.table.api.Expressions.$;

public class Hi {
    public static void main(String[] args) {
        // Create the stream and table execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        // Create the left table (main stream) with a processing-time attribute y
        DataStreamSource<String> d = env.addSource(new ManualSource());
        Table tb = tbEnv.fromDataStream(d, $("x"), $("y").proctime());
        tbEnv.createTemporaryView("v", tb);
        // Create the right table (dimension table) backed by MySQL
        tbEnv.executeSql("CREATE TEMPORARY TABLE t ( " +
                "  a STRING, " +
                "  b INT, " +
                "  c BIGINT, " +
                "  d FLOAT, " +
                "  e DOUBLE, " +
                "  f DATE, " +
                "  g TIMESTAMP, " +
                "  PRIMARY KEY(a) NOT ENFORCED) " +
                "WITH ( " +
                "  'lookup.cache.max-rows' = '2', " +
                "  'lookup.cache.ttl' = '30 second', " +
                "  'connector' = 'jdbc', " +
                "  'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "  'url' = 'jdbc:mysql://localhost:3306/db0', " +
                "  'username' = 'root', " +
                "  'password' = '123456', " +
                "  'table-name' = 'tb0' " +
                ")");
        // Run the lookup join and print the result
        tbEnv.sqlQuery("SELECT * FROM v " +
                "JOIN t " +
                "  FOR SYSTEM_TIME AS OF v.y " +
                "  ON v.x=t.a").execute().print();
    }

    /** A source fed by manual console input */
    public static class ManualSource implements SourceFunction<String> {
        public ManualSource() {}

        @Override
        public void run(SourceFunction.SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) {
                    break;
                }
                if (!str.equals("")) {
                    sc.collect(str);
                }
            }
            scanner.close();
        }

        @Override
        public void cancel() {}
    }
}

Test result

Tags: Big Data, java

Reposted from: https://blog.csdn.net/Yellow_python/article/details/128070761
Copyright belongs to the original author, 小基基o_O. In case of infringement, please contact us for removal.
