

[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Complete Edition (1)

Flink Series Articles

I. Flink column

The Flink column systematically introduces individual topics, each accompanied by concrete examples.

  • 1. Flink deployment series: covers Flink deployment and configuration basics.
  • 2. Flink fundamentals series: covers the basics of Flink, such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the four cornerstones.
  • 3. Flink Table API and SQL basics series: covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
  • 4. Flink Table API and SQL advanced and application series: the application side of the Table API and SQL, more closely tied to real production use and covering topics of some development difficulty.
  • 5. Flink monitoring series: related to day-to-day operations and monitoring work.

II. Flink examples column

The Flink examples column supplements the Flink column. It generally does not introduce conceptual material; instead it provides concrete, ready-to-use examples. This column is no longer organized into sub-directories; the content of each article is clear from its link.

For the entry point to all articles in both columns, see: Flink Series Articles Index.




This article covers the first three ways to implement Flink dimension tables: initializing static data, accessing external data via asynchronous I/O, and broadcasting the dimension-table data.

If you want to learn more, you can find more up-to-date and systematic coverage in my Flink column.

In addition to the Maven dependencies, this article also requires a Redis environment.

This topic consists of the following articles:
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Initialized Static Data
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Dimension Table from a Third-Party Data Source
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Passing Dimension Data Downstream via Broadcast
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Dimension Table Join via Temporal Table
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Complete Edition (1)
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Complete Edition (2)

I. Maven dependencies and data structures

1. Maven dependencies

All examples in this article rely on the pom.xml content below. It may pull in more than some of the examples strictly need; trim it to fit your own situation.

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-uber</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.0-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.0.1-jre</version>
    </dependency>
    <!-- Flink connectors -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.24.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-runtime_2.12</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId></exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.43</version>
    </dependency>
</dependencies>

2. Data structures

These examples implement a single requirement: match an order's uId against the user's id, then output a Tuple2<Order, String>.

  • Fact stream: order

// Fact table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
    private Integer id;
    private Integer uId;
    private Double total;
}

  • Dimension stream: user

// Dimension table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
    private Integer id;
    private String name;
    private Double balance;
    private Integer age;
    private String email;
}

3. Data sources

The fact stream can come from several sources, such as socket, Redis, Kafka, etc.; see the individual examples for details.
The dimension stream can likewise come from several sources, such as static data, MySQL, socket, Kafka, etc.; see the individual examples for details.
To run the examples in this article you therefore need the corresponding environments ready: MySQL, Redis, Kafka, netcat, and so on.

4. Verification of results

All examples in this article have been verified. The test data is included in each example as comments, split into fact stream, dimension stream, and run results; verification is therefore not discussed again in the individual examples.

II. Dimension table from initialized static data

1. Description

Define a class that implements RichMapFunction, load the dimension data into memory in open(), and join the fact stream with it in the map() method.

Because the data lives in memory, this approach only suits small dimension tables that change infrequently. You can schedule a timer in open() to reload the dimension table periodically, but the data may still be stale between reloads, and the reloads add resource overhead. For data that is small and rarely changes (or whose changes have little impact), this is one of the ideal choices.
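The periodic-reload variant mentioned above can be sketched roughly as follows. This is only an illustrative sketch, not code from this article: the ReloadingDimMapFunction name, the reloadUserDim() helper, and the 10-minute interval are all assumptions, and the reload source (file, MySQL, Redis, etc.) would be filled in according to your own environment.

// Minimal sketch (assumption): periodically reloading the in-memory dimension map inside a RichMapFunction.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ReloadingDimMapFunction extends RichMapFunction<String, String> {

    // Concurrent map: the scheduler thread writes while map() reads.
    private final Map<Integer, String> userDim = new ConcurrentHashMap<>();
    private transient ScheduledExecutorService scheduler;

    @Override
    public void open(Configuration parameters) throws Exception {
        reloadUserDim(); // initial load
        scheduler = Executors.newSingleThreadScheduledExecutor();
        // Refresh every 10 minutes (example value); between refreshes the data can be stale.
        scheduler.scheduleAtFixedRate(this::reloadUserDim, 10, 10, TimeUnit.MINUTES);
    }

    private void reloadUserDim() {
        // Replace with a real lookup (file, MySQL, Redis, ...); static values are used here for illustration.
        userDim.put(1001, "alan");
        userDim.put(1002, "alanchan");
    }

    @Override
    public String map(String orderLine) throws Exception {
        // orderLine format: id,uId,total
        String[] fields = orderLine.split(",");
        String userName = userDim.getOrDefault(Integer.valueOf(fields[1]), "unknown");
        return orderLine + "," + userName;
    }

    @Override
    public void close() throws Exception {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}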

2. Example: joining the fact stream with the dimension table

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Load the dimension data into memory in the open() method of a RichMapFunction
 */
public class TestJoinDimFromStaticDataDemo {

    // Dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // Fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            Map<Integer, User> userDim = null;

            // Dimension table with static data, loaded here in an anonymous inner class
            @Override
            public void open(Configuration parameters) throws Exception {
                userDim = new HashMap<>();
                userDim.put(1001, new User(1001, "alan", 20d, 18, "[email protected]"));
                userDim.put(1002, new User(1002, "alanchan", 22d, 20, "[email protected]"));
                userDim.put(1003, new User(1003, "alanchanchn", 23d, 22, "[email protected]"));
                userDim.put(1004, new User(1004, "alan_chan", 21d, 19, "[email protected]"));
                userDim.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "[email protected]"));
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2(value, userDim.get(value.getUId()).getName());
            }
        });

        result.print();
        // nc input
        // 1,1004,345
        // 2,1001,678
        // Console output
        // 2> (TestJoinDimFromStaticData.Order(id=1, uId=1004, total=345.0),alan_chan)
        // 3> (TestJoinDimFromStaticData.Order(id=2, uId=1001, total=678.0),alan)

        env.execute("TestJoinDimFromStaticData");
    }
}

III. Dimension table from a third-party data source

1. Description

In this approach the dimension data is stored in external storage such as Redis, HBase, or MySQL, and the fact stream queries that storage in real time when joining.

Since the dimension data is not limited by memory, very large tables can be handled. On the other hand, because the data lives in an external store, the lookup speed is bound by that store's read performance. This is one of the most common approaches in practice.
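Before adding any cache, the bare per-record lookup can be sketched as below. This is an illustrative assumption rather than code from this article; it reuses the Redis hash layout (hash key AsyncReadUserById_Redis, values of the form id,name,...) that appears in the async I/O example later on, and the host and port are placeholders.

// Minimal sketch (assumption): look up the user name in Redis for every order record.
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import redis.clients.jedis.Jedis;

public class DirectRedisLookupFunction extends RichMapFunction<String, Tuple2<String, String>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection per parallel subtask; host and port are illustrative.
        jedis = new Jedis("192.168.10.41", 6379);
    }

    @Override
    public Tuple2<String, String> map(String orderLine) throws Exception {
        // orderLine format: id,uId,total
        String uId = orderLine.split(",")[1];
        // Every record triggers one round trip to Redis.
        String userLine = jedis.hget("AsyncReadUserById_Redis", uId);
        String userName = userLine == null ? "" : userLine.split(",")[1];
        return new Tuple2<>(orderLine, userName);
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
    }
}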

2. Example: joining the fact stream with the dimension table - reducing overhead with a cache

Frequently hitting the third-party data source for every join is expensive. To reduce that cost, a cache is usually added to relieve the access pressure, at the price of possible inconsistency or delay between the cache and the source. Using a cache also means holding data in memory, which adds its own overhead, so whether to cache depends on the concrete business scenario. This example uses Guava Cache; there are many other cache implementations, so choose according to your own situation.

The data source in this example is static data purely for demonstration; in practice it could come from HBase, MySQL, and so on (a MySQL-flavoured variant of dataSource() is sketched after the example code below).

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestJoinDimFromCacheDataDemo {

    // Dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // Fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order real-time stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user dimension table
        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            // The cache interface here is LoadingCache, which loads an entry automatically when it is missing
            LoadingCache<Integer, User> userDim;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Use Google Guava LoadingCache as the cache
                // CacheBuilder's constructor is private; instances are obtained via the static newBuilder() method
                userDim = CacheBuilder.newBuilder()
                        // concurrency level 8: the number of threads that may write the cache concurrently
                        .concurrencyLevel(8)
                        // maximum number of entries; beyond this, entries are evicted by a least-recently-used policy
                        .maximumSize(1000)
                        // entries expire 10 minutes after being written
                        .expireAfterWrite(10, TimeUnit.MINUTES)
                        // initial capacity of the cache container
                        .initialCapacity(10)
                        // record cache statistics such as the hit rate
                        .recordStats()
                        // removal listener
                        .removalListener(new RemovalListener<Integer, User>() {
                            @Override
                            public void onRemoval(RemovalNotification<Integer, User> removalNotification) {
                                System.out.println(removalNotification.getKey() + " was removed, value: " + removalNotification.getValue());
                            }
                        })
                        .build(
                                // the loading logic used when an entry is missing
                                new CacheLoader<Integer, User>() {
                                    @Override
                                    public User load(Integer uId) throws Exception {
                                        return dataSource(uId);
                                    }
                                });

                System.out.println("userDim:" + userDim.get(1002));
            }

            private User dataSource(Integer uId) {
                // Could be any data source; static data is used here only as an example
                Map<Integer, User> users = new HashMap<>();
                users.put(1001, new User(1001, "alan", 20d, 18, "[email protected]"));
                users.put(1002, new User(1002, "alanchan", 22d, 20, "[email protected]"));
                users.put(1003, new User(1003, "alanchanchn", 23d, 22, "[email protected]"));
                users.put(1004, new User(1004, "alan_chan", 21d, 19, "[email protected]"));
                users.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "[email protected]"));

                User user = null;
                if (users.containsKey(uId)) {
                    user = users.get(uId);
                }
                return user;
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2(value, userDim.get(value.getUId()).getName());
            }
        });

        result.print();
        // Input data
        // 7,1003,111
        // 8,1005,234
        // 9,1002,875
        // Console output
        // 5> (TestJoinDimFromCacheDataDemo.Order(id=7, uId=1003, total=111.0),alanchanchn)
        // 6> (TestJoinDimFromCacheDataDemo.Order(id=8, uId=1005, total=234.0),alan_chan_chn)
        // 7> (TestJoinDimFromCacheDataDemo.Order(id=9, uId=1002, total=875.0),alanchan)

        env.execute("TestJoinDimFromCacheDataDemo");
    }
}
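As mentioned above, the dataSource() helper could just as well read from MySQL. A rough sketch follows; it is an assumption for illustration only - the JDBC URL, the credentials, and the t_user table and column names are not from this article, and in a real job the Connection would normally be opened once in open() and reused instead of being created per lookup.

// Hypothetical MySQL-backed replacement for the static dataSource() helper.
// Requires: import java.sql.Connection; import java.sql.DriverManager;
//           import java.sql.PreparedStatement; import java.sql.ResultSet;
private User dataSource(Integer uId) throws Exception {
    // URL, user, and password are illustrative placeholders
    String url = "jdbc:mysql://192.168.10.44:3306/test?useSSL=false";
    try (Connection conn = DriverManager.getConnection(url, "root", "123456");
         PreparedStatement ps = conn.prepareStatement(
                 "SELECT id, name, balance, age, email FROM t_user WHERE id = ?")) {
        ps.setInt(1, uId);
        try (ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                return new User(rs.getInt("id"), rs.getString("name"),
                        rs.getDouble("balance"), rs.getInt("age"), rs.getString("email"));
            }
        }
    }
    return null; // no such user
}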

3. Example: joining the fact stream with the dimension table - improving efficiency with Flink's asynchronous I/O

When Flink reads from or writes to an external storage system synchronously, it sends one request, waits for the response, and only then sends the next request. Throughput in this mode is low. It can be raised by increasing the parallelism, but more parallelism means more subtasks and a large amount of extra resources.

Flink can instead use asynchronous I/O to read and write external systems. This requires the external system's client to support asynchronous requests, as Redis, MongoDB, and others do.

For more detail, see the article:
55. Flink asynchronous I/O for external data access - introduction and examples
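The Redis example below hands the blocking Jedis call to CompletableFuture.supplyAsync(), which by default runs on the JVM's common ForkJoinPool. When the client library itself is blocking, a common variation is to supply a dedicated thread pool instead, sketched here as an assumption (the pool size of 10 and the abstract lookupUser() helper are illustrative, not part of this article's code):

// Sketch (assumption): run a blocking client call on a dedicated thread pool inside a
// RichAsyncFunction, so the Flink task thread is never blocked.
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public abstract class BlockingClientAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Pool size is an example value; size it to the external system's capacity.
        executor = Executors.newFixedThreadPool(10);
    }

    // Blocking lookup against the external store (Jedis, JDBC, ...), supplied by a subclass.
    protected abstract String lookupUser(String uId);

    @Override
    public void asyncInvoke(String orderLine, ResultFuture<Tuple2<String, String>> resultFuture) {
        String uId = orderLine.split(",")[1];
        CompletableFuture
                .supplyAsync(() -> lookupUser(uId), executor) // blocking call runs on our own pool
                .thenAccept(name -> resultFuture.complete(
                        Collections.singleton(new Tuple2<>(orderLine, name))));
    }

    @Override
    public void close() throws Exception {
        if (executor != null) {
            executor.shutdown();
        }
    }
}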

1) Redis asynchronous I/O implementation

package org.tablesql.join;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.tablesql.join.TestJoinDimFromAsyncDataStreamDemo.Order;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class JoinAyncFunctionByRedis extends RichAsyncFunction<Order, Tuple2<Order, String>> {

    private JedisPoolConfig config = null;

    private static String ADDR = "192.168.10.41";
    private static int PORT = 6379;
    private static int TIMEOUT = 10000;

    private JedisPool jedisPool = null;
    private Jedis jedis = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
        jedis = jedisPool.getResource();
    }

    @Override
    public void asyncInvoke(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        // a single record from the order real-time stream
        System.out.println("input----:" + input);

        // issue an asynchronous request and handle the result
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                // data format: 1002,alanchan,19,25,[email protected]
                String userLine = jedis.hget("AsyncReadUserById_Redis", input.getUId() + "");
                String[] userTemp = userLine.split(",");
                // return the user name
                return userTemp[1];
            }
        }).thenAccept((String dbResult) -> {
            // callback invoked when the request completes; emit the result
            List list = new ArrayList<Tuple2<Order, String>>();
            list.add(new Tuple2<>(input, dbResult));
            resultFuture.complete(list);
        });
    }

    // called when the request times out
    public void timeout(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        List list = new ArrayList<Tuple2<Order, String>>();
        // the data source timed out and no dimension data is available, so emit an empty string
        list.add(new Tuple2<>(input, ""));
        resultFuture.complete(list);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis.isConnected()) {
            jedis.close();
        }
    }
}

2) Joining the fact stream with the dimension stream

package org.tablesql.join;

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestJoinDimFromAsyncDataStreamDemo {

    // Dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // Fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        testJoinAyncFunctionByRedis();
    }

    static void testJoinAyncFunctionByRedis() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order real-time stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // ordered: async results are emitted in input order; timeout 1 second, capacity 2, backpressure beyond that
        DataStream<Tuple2<Order, String>> result = AsyncDataStream.orderedWait(
                orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2);
        result.print("result:");

        // unordered: async results may be emitted out of order; timeout 1 second, capacity 2, backpressure beyond that
        DataStream<Tuple2<Order, String>> unorderedResult = AsyncDataStream.unorderedWait(
                orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);
        unorderedResult.print("unorderedResult");

        // Redis commands and data
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1001 '1001,alan,18,20,[email protected]'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1002 '1002,alanchan,19,25,[email protected]'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1003 '1003,alanchanchn,20,30,[email protected]'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1004 '1004,alan_chan,27,20,[email protected]'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1005 '1005,alan_chan_chn,36,10,[email protected]'
        // (integer) 1
        // 127.0.0.1:6379> hgetall AsyncReadUserById_Redis
        // 1) "1001"
        // 2) "1001,alan,18,20,[email protected]"
        // 3) "1002"
        // 4) "1002,alanchan,19,25,[email protected]"
        // 5) "1003"
        // 6) "1003,alanchanchn,20,30,[email protected]"
        // 7) "1004"
        // 8) "1004,alan_chan,27,20,[email protected]"
        // 9) "1005"
        // 10) "1005,alan_chan_chn,36,10,[email protected]"

        // Input data
        // 13,1002,811
        // 14,1004,834
        // 15,1005,975

        // Console output
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // result::12> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,[email protected])
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // unorderedResult:9> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,[email protected])
        // result::5> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0)
        // unorderedResult:2> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // result::6> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // unorderedResult:3> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)

        env.execute("TestJoinDimFromAsyncDataStreamDemo");
    }
}

IV. Passing dimension-table data downstream via broadcast

1. Description

Use Flink's Broadcast State to broadcast the dimension stream to the downstream operator and perform the join there. This approach is easy to implement and fully meets the requirement; the drawback is that it relies heavily on memory, since the dimension data is held in memory as broadcast state.

For more detail, see the article:
53. Flink's Broadcast State pattern - introduction and examples

2. Example: joining the fact stream with the dimension table - via Flink broadcast

1) Broadcast function implementation

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;

// final BroadcastProcessFunction<IN1, IN2, OUT> function
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {

    // map state descriptor describing the broadcast state that stores the dimension data (user id -> User)
    MapStateDescriptor<Integer, User> broadcastDesc;

    JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
        this.broadcastDesc = broadcastDesc;
    }

    // handles elements of the broadcast (dimension) stream
    @Override
    public void processBroadcastElement(User value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        System.out.println("received broadcast data: " + value);
        // update the broadcast state
        ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
    }

    // handles the non-broadcast (fact) stream and joins in the dimension data
    @Override
    public void processElement(Order value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        // read the broadcast state
        ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
        out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
    }
}

2) Joining the fact stream with the dimension stream

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestJoinDimFromBroadcastDataStreamDemo {

    // Dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // Fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order real-time stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user real-time stream
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
                })
                .setParallelism(1);

        // A map descriptor describing the map structure that stores the rule name and the rule itself
        // MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
        //         "RulesBroadcastState",
        //         BasicTypeInfo.STRING_TYPE_INFO,
        //         TypeInformation.of(new TypeHint<Rule>() {
        //         }));
        // Broadcast stream: broadcast the rules and create the broadcast state
        // BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

        // Define the user stream (dimension table) as a broadcast stream
        final MapStateDescriptor<Integer, User> broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",
                Integer.class, User.class);
        BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);

        // connect() must be called on the non-broadcast stream
        DataStream result = orderDs.connect(broadcastStream)
                .process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));

        result.print();
        // user stream data (dimension table). Since no fault-tolerance handling is done, broadcast the
        // dimension data first, otherwise a NullPointerException occurs
        // 1001,alan,18,20,[email protected]
        // 1002,alanchan,19,25,[email protected]
        // 1003,alanchanchn,20,30,[email protected]
        // 1004,alan_chan,27,20,[email protected]
        // 1005,alan_chan_chn,36,10,[email protected]
        // order stream data
        // 16,1002,211
        // 17,1004,234
        // 18,1005,175
        // Console output
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, [email protected])
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, [email protected])
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, [email protected])
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, [email protected])
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, [email protected])
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, [email protected])
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, [email protected])
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, [email protected])
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, [email protected])
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, [email protected])
        // 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
        // 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
        // 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)

        env.execute();
    }
}

To recap, this article covered the first three ways to implement Flink dimension tables: initializing static data, accessing external data via asynchronous I/O, and broadcasting the dimension-table data.

This topic consists of the following articles:
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Initialized Static Data
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Dimension Table from a Third-Party Data Source
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Passing Dimension Data Downstream via Broadcast
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Dimension Table Join via Temporal Table
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Complete Edition (1)
[Flink Extras] 15. Flink Dimension Tables in Practice: 6 Implementations - Complete Edition (2)

Tags: flink, kafka, hive

Reposted from: https://blog.csdn.net/chenwewi520feng/article/details/135449763
Copyright belongs to the original author, 一瓢一瓢的饮 alanchanchn. If there is any infringement, please contact us and it will be removed.
