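Flink series articles
I. Flink column
The Flink column introduces one topic at a time in a systematic way and illustrates it with concrete examples.
- 1. Flink deployment series: covers the basics of deploying and configuring Flink.
- 2. Flink fundamentals series: covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
- 3. Flink Table API and SQL fundamentals series: covers basic usage of the Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.
- 4. Flink Table API and SQL advanced and applied series: covers applied Table API and SQL topics that are closer to real production use and somewhat harder to develop.
- 5. Flink monitoring series: covers topics related to day-to-day operations and monitoring.
II. Flink examples column
The Flink examples column supplements the Flink column. It generally does not explain concepts and instead provides examples that can be used directly. This column is not divided into sections; the link titles indicate what each article covers.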
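This article walks through an example that uses broadcast state to make a dimension table available to another stream. The example is built on BroadcastProcessFunction.
For a more systematic treatment, see the author's Flink column.
Apart from the Maven dependencies, this article has no other dependencies.
I. Example: broadcasting dimension-table data to another stream with BroadcastProcessFunction
In this example, user information serves as the dimension table and is broadcast as a stream. Each record in the order stream (the fact table) is matched against the broadcast users, and the joined result is emitted.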
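The join semantics described above can be modelled without Flink. The following is a minimal plain-Java sketch, not Flink code: a `HashMap` stands in for the broadcast state, and the names `DimJoinModel`, `onUser`, `onOrder`, and the value `alan_renamed` are hypothetical. It illustrates that an order is enriched with whichever user record was most recently stored under its `uId`, so the result depends on arrival order.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the dimension join; a HashMap stands in for Flink's broadcast state.
// All names in this class are hypothetical and are not part of the article's code.
class DimJoinModel {
    private final Map<Integer, String> userNames = new HashMap<>();

    // Models processBroadcastElement: store or overwrite the dimension record.
    void onUser(int id, String name) {
        userNames.put(id, name);
    }

    // Models processElement: enrich the order with the user name currently stored.
    // Yields "null" as the name when no user with this uId has been seen yet.
    String onOrder(int orderId, int uId, double total) {
        return "(" + orderId + "," + uId + "," + total + ")->" + userNames.get(uId);
    }

    public static void main(String[] args) {
        DimJoinModel join = new DimJoinModel();
        join.onUser(1002, "alanchan");
        System.out.println(join.onOrder(16, 1002, 211));
        join.onUser(1002, "alan_renamed"); // a later broadcast overwrites the entry
        System.out.println(join.onOrder(19, 1002, 50));
    }
}
```

Because the lookup always reads the current map contents, an order that arrives before its user record finds nothing, which is the situation the verification steps avoid by sending the user data first.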
1. Maven dependencies
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.24.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <!-- <scope>provided</scope> -->
    </dependency>
</dependencies>
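2. Implementation
The function could be written as an anonymous inner class or as an inner class. To make the logic easier to follow, this example implements it as a separate class.
1) BroadcastProcessFunction implementation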
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;

// BroadcastProcessFunction<IN1, IN2, OUT>:
// IN1 = Order (non-broadcast stream), IN2 = User (broadcast stream), OUT = Tuple2<Order, String>
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
    // Descriptor of the broadcast state map: user id -> User
    MapStateDescriptor<Integer, User> broadcastDesc;

    JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
        this.broadcastDesc = broadcastDesc;
    }

    // Handles elements of the broadcast stream
    @Override
    public void processBroadcastElement(User value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        // "收到广播数据" means "received broadcast data"
        System.out.println("收到广播数据:" + value);
        // Get the broadcast state and store the user under its id
        ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
    }

    // Handles elements of the non-broadcast stream and joins them with the dimension data
    @Override
    public void processElement(Order value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        // Get the (read-only) broadcast state
        ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
        // No null check: throws NullPointerException if no User with this uId has been broadcast yet
        out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
    }
}
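The `processElement` method above dereferences the looked-up user directly, so an order whose `uId` is not yet in the broadcast state fails with a NullPointerException. One possible way to guard the lookup is sketched below in plain Java. This is an assumption about how one might handle the case, not the author's implementation: a `Map` stands in for `ReadOnlyBroadcastState`, and `SafeLookup`, `nameOrDefault`, and the `UNKNOWN` placeholder are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing a null-safe dimension lookup.
// A plain Map stands in for Flink's ReadOnlyBroadcastState.
class SafeLookup {
    static final String UNKNOWN = "UNKNOWN"; // placeholder emitted when no user matches

    // Minimal stand-in for the article's User class (only the name is needed here).
    static class User {
        private final String name;

        User(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Returns the user's name, or the placeholder when the id is not in the state.
    static String nameOrDefault(Map<Integer, User> state, Integer uId) {
        User user = state.get(uId);
        return user == null ? UNKNOWN : user.getName();
    }

    public static void main(String[] args) {
        Map<Integer, User> state = new HashMap<>();
        state.put(1002, new User("alanchan"));
        System.out.println(nameOrDefault(state, 1002));
        System.out.println(nameOrDefault(state, 4711)); // id that was never broadcast
    }
}
```

Emitting a placeholder is only one option. Dropping the order or buffering it until the matching user arrives are alternatives, and which one fits depends on the requirements of the job.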
2) Connecting the streams
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestJoinDimFromBroadcastDataStreamDemo {
    // Dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // Fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Real-time order stream; each line is "id,uId,total"
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // Real-time user stream; each line is "id,name,balance,age,email"
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);

        // For reference, a map descriptor that describes a map from rule name to rule:
        // MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
        //         "RulesBroadcastState",
        //         BasicTypeInfo.STRING_TYPE_INFO,
        //         TypeInformation.of(new TypeHint<Rule>() {
        //         }));
        // Broadcast stream that broadcasts the rules and creates the broadcast state:
        // BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

        // Define the user stream (dimension table) as the broadcast stream
        final MapStateDescriptor<Integer, User> broadcastDesc =
                new MapStateDescriptor<>("Alan_RulesBroadcastState", Integer.class, User.class);
        BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);

        // connect() must be called on the non-broadcast stream
        DataStream<Tuple2<Order, String>> result =
                orderDs.connect(broadcastStream).process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));

        result.print();
        env.execute();
    }

    // JoinBroadcastProcessFunctionImpl could alternatively be declared here as a static nested class;
    // its body would be the same as the class shown in 1).
}
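The `map` lambdas in the listing above split each line and convert the fields directly, so a malformed line (too few fields, or a non-numeric value) throws an exception inside the job. A defensive parser is one way to avoid that. The sketch below is plain Java and is an assumption, not part of the original example: `OrderLineParser` and `ParsedOrder` are hypothetical names, and `ParsedOrder` only mirrors the three fields of the article's `Order` class.

```java
import java.util.Optional;

// Hypothetical defensive parser for order lines of the form "id,uId,total".
class OrderLineParser {
    // Minimal stand-in for the article's Order class.
    static class ParsedOrder {
        final int id;
        final int uId;
        final double total;

        ParsedOrder(int id, int uId, double total) {
            this.id = id;
            this.uId = uId;
            this.total = total;
        }
    }

    // Returns the parsed order, or Optional.empty() when the line is malformed.
    static Optional<ParsedOrder> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] fields = line.split(",");
        if (fields.length != 3) {
            return Optional.empty();
        }
        try {
            return Optional.of(new ParsedOrder(
                    Integer.parseInt(fields[0].trim()),
                    Integer.parseInt(fields[1].trim()),
                    Double.parseDouble(fields[2].trim())));
        } catch (NumberFormatException e) {
            // A non-numeric field: treat the whole line as invalid.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("16,1002,211").isPresent());
        System.out.println(parse("16,abc,211").isPresent());
    }
}
```

In the job, a helper like this could be called from the parsing step so that invalid lines are skipped or logged instead of failing the pipeline. How it is wired in is left open here.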
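3. Verification
This example uses two socket data sources, simulated with netcat.
1) Enter the user data
Host and port: 192.168.10.42, 8888
// user stream data (dimension table). The example has no fault-tolerance handling, so the dimension data must be broadcast first; otherwise a NullPointerException is thrown.
// Field order: id,name,balance,age,email
// 1001,alan,18,20,alan.chan.chn@163.com
// 1002,alanchan,19,25,alan.chan.chn@163.com
// 1003,alanchanchn,20,30,alan.chan.chn@163.com
// 1004,alan_chan,27,20,alan.chan.chn@163.com
// 1005,alan_chan_chn,36,10,alan.chan.chn@163.com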
2) Enter the order data (fact stream)
Host and port: 192.168.10.42, 9999
// order stream data
// Field order: id,uId,total
// 16,1002,211
// 17,1004,234
// 18,1005,175
3) Check the program's console output
// Console output ("收到广播数据" means "received broadcast data")
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
// 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
// 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)
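The repeated "收到广播数据" lines in the output are consistent with how broadcasting works: every parallel instance of the function receives each broadcast element and keeps its own copy of the state, while each order is handled by a single instance. The following plain-Java sketch models that behaviour. It is a simplified illustration, not Flink code; `BroadcastModel`, its methods, the parallelism of 4, and the imitated `n>` prefix are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each parallel subtask holds its own copy of the broadcast state.
class BroadcastModel {
    private final List<Map<Integer, String>> subtaskStates = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtaskStates.add(new HashMap<>());
        }
    }

    // A broadcast element is delivered to every subtask; returns the number of deliveries.
    int broadcastUser(int id, String name) {
        int deliveries = 0;
        for (Map<Integer, String> state : subtaskStates) {
            state.put(id, name);
            deliveries++;
        }
        return deliveries;
    }

    // An order is processed by exactly one subtask, using that subtask's copy of the state.
    // The "n>" prefix only imitates the style of the printed output.
    String processOrder(int subtask, int orderId, int uId) {
        String name = subtaskStates.get(subtask).get(uId);
        return (subtask + 1) + "> (" + orderId + "," + name + ")";
    }

    public static void main(String[] args) {
        BroadcastModel model = new BroadcastModel(4);
        System.out.println("deliveries: " + model.broadcastUser(1002, "alanchan"));
        System.out.println(model.processOrder(2, 16, 1002));
    }
}
```

Because every subtask holds the full dimension data, an order can be joined locally no matter which subtask it lands on.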
In summary, this article walked through an example that uses broadcast state, via BroadcastProcessFunction, to broadcast a dimension table and join it with a fact stream.
Copyright belongs to the original author, 一瓢一瓢的饮 alanchanchn. If there is any infringement, please contact us for removal.