[Flink Extras] 15. Six implementations of Flink dimension tables: joining dimension data via a temporal table

Flink Article Series

I. Flink Column

The Flink column systematically introduces individual knowledge points, each supplemented with concrete examples.

  • 1. Flink deployment series: covers Flink deployment and configuration basics.
  • 2. Flink fundamentals series: covers Flink basics such as terminology, architecture, programming model, programming guide, basic DataStream API usage, and the four cornerstones.
  • 3. Flink Table API and SQL basics series: covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
  • 4. Flink Table API and SQL advanced series: application-oriented Table API and SQL content, more closely tied to real production use and somewhat harder to develop.
  • 5. Flink monitoring series: covers day-to-day operations and monitoring work.

II. Flink Examples Column

The Flink examples column supplements the Flink column: it generally does not explain concepts, but provides concrete, ready-to-use examples. This column has no further sub-sections; each article's title indicates its content.

For the entry point to all articles in both columns, see: Flink Series Index.


This article shows how to join dimension tables using Flink temporal tables, illustrated with three examples.

If you want to learn more, the author's Flink column covers the subject more systematically.

Besides the Maven dependencies, this article also requires a Kafka environment (and netcat for the socket-based examples).

This topic consists of the following articles:
[Flink Extras] 15. Six implementations of Flink dimension tables: preloaded static data
[Flink Extras] 15. Six implementations of Flink dimension tables: dimension data from a third-party source
[Flink Extras] 15. Six implementations of Flink dimension tables: broadcasting dimension data downstream
[Flink Extras] 15. Six implementations of Flink dimension tables: joining dimension data via a temporal table
[Flink Extras] 15. Six implementations of Flink dimension tables: complete edition (1)
[Flink Extras] 15. Six implementations of Flink dimension tables: complete edition (2)

I. Maven Dependencies and Data Structures

1. Maven dependencies

All examples in this article depend on the pom.xml below. It may pull in more than a given example needs, so trim it to fit your situation.

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-uber</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.0-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.0.1-jre</version>
    </dependency>
    <!-- Flink connectors -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.24.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-runtime_2.12</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-core</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-java</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.43</version>
    </dependency>
</dependencies>

2. Data structures

The examples implement a single requirement: join each order's uId to a user's id, then emit a Tuple2<Order, String>.

  • Fact stream: order

// fact table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
    private Integer id;
    private Integer uId;
    private Double total;
}

  • Dimension stream: user

// dimension table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
    private Integer id;
    private String name;
    private Double balance;
    private Integer age;
    private String email;
}
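The requirement above (enrich each Order with the matching User's name, the equivalent of Tuple2<Order, String>) can be sketched in plain Java without Flink, as a simple hash lookup against the dimension data. All class and method names below are illustrative, not part of the article's code:

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

// A plain-Java sketch of the join requirement (no Flink involved).
public class SimpleDimJoinSketch {

    static class User {
        final Integer id;
        final String name;
        User(Integer id, String name) { this.id = id; this.name = name; }
    }

    static class Order {
        final Integer id;
        final Integer uId;
        final Double total;
        Order(Integer id, Integer uId, Double total) { this.id = id; this.uId = uId; this.total = total; }
    }

    // Look up the order's uId in the dimension map and pair the order with the user's name.
    static Map.Entry<Order, String> enrich(Order o, Map<Integer, User> dim) {
        User u = dim.get(o.uId);
        return new AbstractMap.SimpleEntry<>(o, u == null ? null : u.name);
    }

    public static void main(String[] args) {
        Map<Integer, User> dim = new HashMap<>();
        dim.put(1002, new User(1002, "alanchan"));
        Order order = new Order(26, 1002, 311d);
        System.out.println(enrich(order, dim).getValue()); // prints alanchan
    }
}
```

The Flink implementations below do conceptually the same thing; the difference is how the dimension data reaches the operator and which version of it is visible at join time.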

3. Data sources

The fact stream comes from several kinds of sources, shown in the examples: socket, Redis, Kafka, and so on.
The dimension stream likewise has several kinds of sources: static data, MySQL, socket, Kafka, and so on.
To run the examples in this article you therefore need the corresponding environments ready: MySQL, Redis, Kafka, netcat, etc.

4. Verified results

All examples in this article have been verified. The test data appears as comments in each example, split into fact stream, dimension stream, and run results, so verification is not discussed again per example.

V. Joining dimension data via a temporal table

1. Overview

A temporal table is a view of a continuously changing table at a specific point in time. A temporal table function is a table function that takes a time attribute as its argument and returns the view of the temporal table at that time. By mapping the dimension stream to a temporal table, the fact stream can join against it and pick up the version of the dimension data that was valid at the given time.

With this approach the dimension data set can be large, the dimension data is updated in real time, no third-party storage is needed, and different versions of the dimension data remain available (to handle dimension updates). As of Flink 1.17, this approach can only be used through the Flink SQL API.
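The version-at-a-point-in-time semantics can be modeled in plain Java: for each key, keep every version ordered by timestamp, and a lookup at time t returns the newest version whose timestamp is at or before t. This is an illustrative model of the behavior, not Flink API; all names are invented for the sketch:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of temporal-table lookup semantics:
// per key, all versions are kept ordered by timestamp, and a lookup at time t
// returns the newest version valid at or before t.
public class TemporalLookup {
    private final Map<Integer, TreeMap<Long, String>> versions = new HashMap<>();

    public void upsert(Integer key, long ts, String value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(ts, value);
    }

    public String lookup(Integer key, long ts) {
        TreeMap<Long, String> byTime = versions.get(key);
        if (byTime == null) return null;
        Map.Entry<Long, String> e = byTime.floorEntry(ts); // newest version with timestamp <= ts
        return e == null ? null : e.getValue();
    }

    public static void main(String[] args) {
        TemporalLookup city = new TemporalLookup();
        // mirrors the city stream in the Kafka example below: cityId=1 renamed over time
        city.upsert(1, 1L, "beijing");
        city.upsert(1, 5L, "shanghai");
        city.upsert(1, 10L, "wuhan");
        System.out.println(city.lookup(1, 4L)); // prints beijing
        System.out.println(city.lookup(1, 7L)); // prints shanghai
    }
}
```

This is exactly why, in the Kafka example later, a user event at ts=4 joins to "beijing" even though "shanghai" arrives with ts=5: the lookup sees only versions valid at the probe time.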

As for the time argument: Flink has three notions of time, namely event time, processing time, and ingestion time; event time and processing time are the most commonly used, and this article shows how to implement both. For event time, a Kafka source behaves differently from other sources, so the Kafka implementation is covered in its own example.
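The event-time examples below use WatermarkStrategy.forBoundedOutOfOrderness. Its effect can be modeled in a few lines: the watermark trails the largest event timestamp seen so far by the configured bound (minus one millisecond, following Flink's convention that a watermark t asserts no further elements with timestamp <= t will arrive). This is an illustrative model, not Flink internals:

```java
// Illustrative model of bounded-out-of-orderness watermark generation.
public class BoundedOutOfOrdernessModel {
    private final long boundMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessModel(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    // record an observed event timestamp
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // watermark = max seen timestamp - bound - 1 ms
    public long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel wm = new BoundedOutOfOrdernessModel(10000);
        wm.onEvent(25000);
        wm.onEvent(18000); // late, but within the 10s bound, so it is not dropped
        System.out.println(wm.currentWatermark()); // prints 14999
    }
}
```

A larger bound tolerates more disorder but delays results, since the temporal join only emits once the watermark passes the probe time.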

2. Example: joining the fact stream with the dimension table using processing time

package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: temporal table based on processing time
 */
public class TestJoinDimByProcessingTimeDemo {

    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // order stream (fact table)
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user stream (dimension table)
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]),
                            Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);

        // convert to Table
        Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_ps").proctime());
        Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"),
                $("user_ps").proctime());

        // define a TemporalTableFunction
        TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_ps"), $("id"));
        // register the table function
        tenv.registerFunction("alan_userDim", userDim);

        // join query
        Table result = tenv.sqlQuery("select o.* , u.name from " + orderTable
                + " as o  , Lateral table (alan_userDim(o.order_ps)) u " + "where o.uId = u.id");

        // print the output
        DataStream resultDs = tenv.toAppendStream(result, Row.class);
        resultDs.print();

        // user stream data (dimension table):
        // 1001,alan,18,20,[email protected]
        // 1002,alanchan,19,25,[email protected]
        // 1003,alanchanchn,20,30,[email protected]
        // 1004,alan_chan,27,20,[email protected]
        // 1005,alan_chan_chn,36,10,[email protected]
        // order stream data:
        // 26,1002,311
        // 27,1004,334
        // 28,1005,475
        // console output:
        // 15> +I[26, 1002, 311.0, 2023-12-20T05:21:12.977Z, alanchan]
        // 11> +I[27, 1004, 334.0, 2023-12-20T05:21:50.898Z, alan_chan]
        // 5> +I[28, 1005, 475.0, 2023-12-20T05:21:57.559Z, alan_chan_chn]

        env.execute();
    }
}

3. Example: joining the fact stream with the dimension table using event time

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: temporal table based on event time
 */
package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestjoinDimByEventTimeDemo {

    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
        private Long eventTime;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
        private Long eventTime;
    }

    final static List<User> userList = Arrays.asList(
            new User(1001, "alan", 20d, 18, "[email protected]", 1L),
            new User(1002, "alan", 30d, 19, "[email protected]", 10L),
            new User(1003, "alan", 29d, 25, "[email protected]", 1L),
            new User(1004, "alanchan", 22d, 28, "[email protected]", 5L),
            new User(1005, "alanchan", 50d, 29, "[email protected]", 1698742362424L));

    final static List<Order> orderList = Arrays.asList(
            new Order(11, 1002, 1084d, 1L),
            new Order(12, 1001, 84d, 10L),
            new Order(13, 1005, 369d, 2L));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // order stream (fact table); a socket-based alternative:
        // DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
        //         .map(o -> {
        //             String[] lines = o.split(",");
        //             return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]),
        //                     Double.valueOf(lines[2]), Long.valueOf(lines[3]));
        //         })
        //         .assignTimestampsAndWatermarks(WatermarkStrategy
        //                 .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        //                 .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));
        DataStream<Order> orderDs = env.fromCollection(orderList)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));

        // user stream (dimension table); a socket-based alternative:
        // DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
        //         .map(o -> {
        //             String[] lines = o.split(",");
        //             return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]),
        //                     Integer.valueOf(lines[3]), lines[4], Long.valueOf(lines[3]));
        //         })
        //         .assignTimestampsAndWatermarks(WatermarkStrategy
        //                 .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        //                 .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));
        DataStream<User> userDs = env.fromCollection(userList)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));

        // convert to Table
        Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_eventTime").rowtime());
        Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"),
                $("user_eventTime").rowtime());

        tenv.createTemporaryView("alan_orderTable", orderTable);
        tenv.createTemporaryView("alan_userTable", userTable);

        // define a TemporalTableFunction
        TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_eventTime"), $("id"));
        // register the table function
        tenv.registerFunction("alan_userDim", userDim);

        // String sql = "select o.* from alan_orderTable as o ";
        // String sql = "select u.* from alan_userTable as u ";
        // String sql = "select o.*,u.name from alan_orderTable as o , alan_userTable as u where o.uId = u.id";
        String sql = "select o.*,u.name from alan_orderTable as o, Lateral table (alan_userDim(o.order_eventTime)) u where o.uId = u.id";

        // join query
        Table result = tenv.sqlQuery(sql);

        // print the output
        DataStream resultDs = tenv.toAppendStream(result, Row.class);
        resultDs.print();

        // user stream data (dimension table): userList
        // order stream data: orderList
        // console output (only order 12 joins: user 1001's version at ts=1 is
        // valid at order ts=10, whereas user 1002's version (ts=10) is not yet
        // valid at order 11's ts=1, and user 1005's version timestamp lies far
        // in the future):
        // 3> +I[12, 1001, 84.0, 1970-01-01T00:00:00.010, alan]

        env.execute();
    }
}

4. Example: joining the fact stream with the dimension table using event time with a Kafka source

1) Bean definitions

package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
@Data
public class CityInfo implements Serializable {
    private Integer cityId;
    private String cityName;
    private Long ts;
}

package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
@Data
public class UserInfo implements Serializable {
    private String userName;
    private Integer cityId;
    private Long ts;
}

2) Deserialization schema definitions

package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class CityInfoSchema implements DeserializationSchema<CityInfo> {

    @Override
    public CityInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(CityInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<CityInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<CityInfo>() {});
    }
}

package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class UserInfoSchema implements DeserializationSchema<UserInfo> {

    @Override
    public UserInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(UserInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<UserInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<UserInfo>() {});
    }
}

3) Implementation

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: event-time temporal table join with Kafka sources
 */
package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.tablesql.join.bean.CityInfo;
import org.tablesql.join.bean.CityInfoSchema;
import org.tablesql.join.bean.UserInfo;
import org.tablesql.join.bean.UserInfoSchema;

public class TestJoinDimByKafkaEventTimeDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka brokers and the topics to consume
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
        props.setProperty("group.id", "kafkatest");

        // read user info from Kafka
        FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(), props);
        userConsumer.setStartFromEarliest();
        userConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((user, rTimeStamp) -> user.getTs())
                // without this assigner, Kafka's own record timestamps are used as event time
        );

        // read city dimension info from Kafka
        FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
        cityConsumer.setStartFromEarliest();
        cityConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                .<CityInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((city, rTimeStamp) -> city.getTs())
                // without this assigner, Kafka's own record timestamps are used as event time
        );

        Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), $("userName"), $("cityId"), $("ts").rowtime());
        Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), $("cityId"), $("cityName"), $("ts").rowtime());

        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        // define a TemporalTableFunction
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction($("ts"), $("cityId"));
        // register the table function
        // tableEnv.registerFunction("dimCity", dimCity);
        tableEnv.createTemporarySystemFunction("dimCity", dimCity);

        Table u = tableEnv.sqlQuery("select * from userTable");
        // u.printSchema();
        tableEnv.toAppendStream(u, Row.class).print("user stream received:");

        Table c = tableEnv.sqlQuery("select * from cityTable");
        // c.printSchema();
        tableEnv.toAppendStream(c, Row.class).print("city stream received:");

        // join query
        Table result = tableEnv.sqlQuery("select u.userName,u.cityId,d.cityName,u.ts "
                + "from userTable as u "
                + ", Lateral table  (dimCity(u.ts)) d "
                + "where u.cityId=d.cityId");

        // print the output
        DataStream resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print("\tjoin output:");

        // user message format (send with: kafka-console-producer.sh --broker-list server1:9092 --topic user):
        // {"userName":"user1","cityId":1,"ts":0}
        // {"userName":"user1","cityId":1,"ts":1}
        // {"userName":"user1","cityId":1,"ts":4}
        // {"userName":"user1","cityId":1,"ts":5}
        // {"userName":"user1","cityId":1,"ts":7}
        // {"userName":"user1","cityId":1,"ts":9}
        // {"userName":"user1","cityId":1,"ts":11}
        // city dimension format (send with: kafka-console-producer.sh --broker-list server1:9092 --topic city):
        // {"cityId":1,"cityName":"nanjing","ts":15}
        // {"cityId":1,"cityName":"beijing","ts":1}
        // {"cityId":1,"cityName":"shanghai","ts":5}
        // {"cityId":1,"cityName":"shanghai","ts":7}
        // {"cityId":1,"cityName":"wuhan","ts":10}
        // output:
        // city stream received::6> +I[1, beijing, 1970-01-01T00:00:00.001]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.004]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.005]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.005]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.007]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.007]
        // city stream received::6> +I[1, wuhan, 1970-01-01T00:00:00.010]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.009]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.011]
        //         join output::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.001]
        //         join output::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.004]
        //         join output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.005]
        //         join output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.007]
        //         join output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.009]

        env.execute("joinDemo");
    }
}

This article has shown how to join dimension tables using Flink temporal tables, illustrated with three examples.



Reposted from: https://blog.csdn.net/chenwewi520feng/article/details/135449743
Copyright belongs to the original author, 一瓢一瓢的饮 alanchanchn. In case of infringement, please contact us for removal.
