0


【flink番外篇】13、Broadcast State 模式示例(完整版)

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录


本文详细的介绍了broadcast state的具体使用,并以两个例子分别介绍了BroadcastProcessFunction和KeyedBroadcastProcessFunction的具体实现。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

一、示例:按照分组规则进行图形匹配-KeyedBroadcastProcessFunction

本示例是简单的应用broadcast state实现简单模式匹配,即实现:
1、按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。
2、相同颜色的规则1:长方形后是三角形
3、相同颜色的规则2:正方形后是长方形

如匹配上述规则1或规则2则输出匹配成功。

1、maven依赖

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><!-- <scope>provided</scope> --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.24.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency></dependencies>

2、实现

packageorg.tablesql.join;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;importjava.util.Map.Entry;importorg.apache.flink.api.common.state.MapState;importorg.apache.flink.api.common.state.MapStateDescriptor;importorg.apache.flink.api.common.state.ReadOnlyBroadcastState;importorg.apache.flink.api.common.typeinfo.BasicTypeInfo;importorg.apache.flink.api.common.typeinfo.TypeHint;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.typeutils.ListTypeInfo;importorg.apache.flink.streaming.api.datastream.BroadcastStream;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;importorg.apache.flink.util.Collector;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;/*
 * @Author: alanchan
 * 
 * @LastEditors: alanchan
 * 
 * @Description: 按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。相同颜色的规则1:长方形后是三角形;规则2:正方形后是长方形
 */publicclassTestJoinDimKeyedBroadcastProcessFunctionDemo{@Data@NoArgsConstructor@AllArgsConstructorstaticclassShape{privateString name;privateString desc;}@Data@NoArgsConstructor@AllArgsConstructorstaticclassColour{privateString name;privateLong blue;privateLong red;privateLong green;}@Data@NoArgsConstructor@AllArgsConstructorstaticclassItem{privateShape shape;privateColour color;}@Data@NoArgsConstructor@AllArgsConstructorstaticclassRule{privateString name;privateShape first;privateShape second;}publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// item 实时流DataStream<Item> itemStream = env.socketTextStream("192.168.10.42",9999).map(o ->{// 解析item流// 数据结构:Item[shape(name,desc);color(name,blue,red,green)]String[] lines = o.split(";");String[] shapeString = lines[0].split(",");String[] colorString = lines[1].split(",");Shape shape =newShape(shapeString[0],shapeString[1]);Colour color =newColour(colorString[0],Long.valueOf(colorString[1]),Long.valueOf(colorString[2]),Long.valueOf(colorString[3]));returnnewItem(shape,color);});// rule 实时流DataStream<Rule> ruleStream = env.socketTextStream("192.168.10.42",8888).map(o ->{// 解析rule流// 数据结构:Rule[name;shape(name,desc);shape(name,desc)]String[] lines = o.split(";");String name = lines[0];String[] firstShapeString = lines[1].split(",");String[] secondShapeString = lines[2].split(",");Shape firstShape =newShape(firstShapeString[0],firstShapeString[1]);Shape secondShape =newShape(secondShapeString[0],secondShapeString[1]);returnnewRule(name,firstShape,secondShape);}).setParallelism(1);// 将图形使用颜色进行划分KeyedStream<Item,Colour> colorPartitionedStream = itemStream
                .keyBy(newKeySelector<Item,Colour>(){@OverridepublicColourgetKey(Item value)throwsException{return value.getColor();// 实现分组}});

        colorPartitionedStream.print("colorPartitionedStream:---->");// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构MapStateDescriptor<String,Rule> ruleStateDescriptor =newMapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(newTypeHint<Rule>(){}));// 将rule定义为广播流,广播规则并且创建 broadcast stateBroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);// 连接,输出流,connect() 方法需要由非广播流来进行调用,BroadcastStream 作为参数传入。DataStream<String> output = colorPartitionedStream
                .connect(ruleBroadcastStream).process(// KeyedBroadcastProcessFunction 中的类型参数表示:// 1. key stream 中的 key 类型// 2. 非广播流中的元素类型// 3. 广播流中的元素类型// 4. 结果的类型,在这里是 stringnewKeyedBroadcastProcessFunction<Colour,Item,Rule,String>(){// 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素// 用一个数组来存储,因为同时可能有很多第一个元素正在等待privatefinalMapStateDescriptor<String,List<Item>> itemMapStateDesc =newMapStateDescriptor<>("items",BasicTypeInfo.STRING_TYPE_INFO,newListTypeInfo<>(Item.class));// 与之前的 ruleStateDescriptor 相同,用于存储规则名称与规则本身的 map 存储结构privatefinalMapStateDescriptor<String,Rule> ruleStateDescriptor =newMapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(newTypeHint<Rule>(){}));// 负责处理广播流的元素        @OverridepublicvoidprocessBroadcastElement(Rule ruleValue,KeyedBroadcastProcessFunction<Colour,Item,Rule,String>.Context ctx,Collector<String> out)throwsException{// 得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)// 查询元素的时间戳:ctx.timestamp()// 查询目前的Watermark:ctx.currentWatermark()// 目前的处理时间(processing time):ctx.currentProcessingTime()// 产生旁路输出:ctx.output(OutputTag<X> outputTag, X value)    // 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同
                                ctx.getBroadcastState(ruleStateDescriptor).put(ruleValue.getName(), ruleValue);}// 负责处理另一个流的元素@OverridepublicvoidprocessElement(Item itemValue,KeyedBroadcastProcessFunction<Colour,Item,Rule,String>.ReadOnlyContext ctx,Collector<String> out)throwsException{finalMapState<String,List<Item>> itemMapState =getRuntimeContext().getMapState(itemMapStateDesc);finalShape shape = itemValue.getShape();System.out.println("shape:"+shape);// 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同ReadOnlyBroadcastState<String,Rule> readOnlyBroadcastState = ctx.getBroadcastState(ruleStateDescriptor);Iterable<Entry<String,Rule>> iterableRule = readOnlyBroadcastState.immutableEntries();for(Entry<String,Rule> entry : iterableRule){finalString ruleName = entry.getKey();finalRule rule = entry.getValue();// 初始化List<Item> itemStoredList = itemMapState.get(ruleName);if(itemStoredList ==null){
                                        itemStoredList =newArrayList<>();}// 比较 shape if(shape.getName().equals(rule.second.getName())&&!itemStoredList.isEmpty()){for(Item item : itemStoredList){// 符合规则,收集匹配结果
                                            out.collect("匹配成功: "+ item +" - "+ itemValue);}
                                        itemStoredList.clear();}// 规则连续性设置if(shape.getName().equals(rule.first.getName())){
                                        itemStoredList.add(itemValue);}// if(itemStoredList.isEmpty()){
                                        itemMapState.remove(ruleName);}else{
                                        itemMapState.put(ruleName, itemStoredList);}}}});

        output.print("output:------->");

        env.execute();}}

3、验证

在netcat中启动两个端口,分别是8888和9999,8888输入规则,9999输入item,然后关键控制台输出。

1)、规则输入

red;rectangle,is a rectangle;tripe,is a tripe
green;square,is a square;rectangle,is a rectangle

2)、item输入

# 匹配成功
rectangle,is a rectangle;red,100,100,100
tripe,is a tripe;red,100,100,100

# 匹配成功
square,is square;green,150,150,150
rectangle,is a rectangle;green,150,150,150

# 匹配不成功
tripe,is tripe;blue,200,200,200

# 匹配成功
rectangle,is a rectangle;blue,100,100,100
tripe,is a tripe;blue,100,100,100

# 匹配不成功
tripe,is a tripe;blue,100,100,100
rectangle,is a rectangle;blue,100,100,100

3)、控制台输出

colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle)
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe)
output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150))
output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=square, desc=is square), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150))
colorPartitionedStream:---->:3> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=200, red=200, green=200))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
output:------->:1> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))

二、示例:BroadcastProcessFunction将维表数据广播给其他流

本示例是将用户信息作为维表通过流进行广播,在事实表订单流中进行连接匹配输出。

1、maven依赖

参考上述示例中的依赖。

2、实现

实现方式可以使用匿名内部类或内部类实现,本示例为了清楚其中的逻辑关系,特意以一个具体class来实现。

1)、BroadcastProcessFunction实现

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */packageorg.tablesql.join;importorg.apache.flink.api.common.state.MapStateDescriptor;importorg.apache.flink.api.common.state.ReadOnlyBroadcastState;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;importorg.apache.flink.util.Collector;importorg.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;importorg.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;// final BroadcastProcessFunction<IN1, IN2, OUT> function)publicclassJoinBroadcastProcessFunctionImplextendsBroadcastProcessFunction<Order,User,Tuple2<Order,String>>{// 用于存储规则名称与规则本身的 map 存储结构 MapStateDescriptor<Integer,User> broadcastDesc;JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer,User> broadcastDesc){this.broadcastDesc = broadcastDesc;}// 负责处理广播流的元素@OverridepublicvoidprocessBroadcastElement(User value,BroadcastProcessFunction<Order,User,Tuple2<Order,String>>.Context ctx,Collector<Tuple2<Order,String>> out)throwsException{System.out.println("收到广播数据:"+ value);// 得到广播流的存储状态
        ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);}// 处理非广播流,关联维度@OverridepublicvoidprocessElement(Order value,BroadcastProcessFunction<Order,User,Tuple2<Order,String>>.ReadOnlyContext ctx,Collector<Tuple2<Order,String>> out)throwsException{// 得到广播流的存储状态ReadOnlyBroadcastState<Integer,User> state = ctx.getBroadcastState(broadcastDesc);

        out.collect(newTuple2<>(value, state.get(value.getUId()).getName()));}}

2)、连接实现

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */packageorg.tablesql.join;importorg.apache.flink.api.common.state.MapStateDescriptor;importorg.apache.flink.streaming.api.datastream.BroadcastStream;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;publicclassTestJoinDimFromBroadcastDataStreamDemo{// 维表@Data@NoArgsConstructor@AllArgsConstructorstaticclassUser{privateInteger id;privateString name;privateDouble balance;privateInteger age;privateString email;}// 事实表@Data@NoArgsConstructor@AllArgsConstructorstaticclassOrder{privateInteger id;privateInteger uId;privateDouble total;}publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// order 实时流DataStream<Order> orderDs = env.socketTextStream("192.168.10.42",9999).map(o ->{String[] lines = o.split(",");returnnewOrder(Integer.valueOf(lines[0]),Integer.valueOf(lines[1]),Double.valueOf(lines[2]));});// user 实时流DataStream<User> userDs = env.socketTextStream("192.168.10.42",8888).map(o ->{String[] lines = o.split(",");returnnewUser(Integer.valueOf(lines[0]), lines[1],Double.valueOf(lines[2]),Integer.valueOf(lines[3]), lines[4]);}).setParallelism(1);// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构// MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(//         "RulesBroadcastState",//         BasicTypeInfo.STRING_TYPE_INFO,//         TypeInformation.of(new TypeHint<Rule>() {//         }));// 广播流,广播规则并且创建 broadcast state// BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);// 将user流(维表)定义为广播流finalMapStateDescriptor<Integer,User> broadcastDesc =newMapStateDescriptor("Alan_RulesBroadcastState",Integer.class,User.class);BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);// 需要由非广播流来进行调用DataStream result = orderDs.connect(broadcastStream).process(newJoinBroadcastProcessFunctionImpl(broadcastDesc));

        result.print();
       
        env.execute();}// final BroadcastProcessFunction<IN1, IN2, OUT> function)//     static class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {//         // 用于存储规则名称与规则本身的 map 存储结构 //         MapStateDescriptor<Integer, User> broadcastDesc;//         JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {//             this.broadcastDesc = broadcastDesc;//         }//         // 负责处理广播流的元素//         @Override//         public void processBroadcastElement(User value,//                 BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,//                 Collector<Tuple2<Order, String>> out) throws Exception {//             System.out.println("收到广播数据:" + value);//             // 得到广播流的存储状态//             ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);//         }//         // 处理非广播流,关联维度//         @Override//         public void processElement(Order value,//                 BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,//                 Collector<Tuple2<Order, String>> out) throws Exception {//             // 得到广播流的存储状态//             ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);//             out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));//         }//     }}

3、验证

本示例使用的是两个socket数据源,通过netcat进行模拟。

1)、输入user数据

“192.168.10.42”, 8888

// user 流数据(维度表),由于未做容错处理,需要先广播维度数据,否则会出现空指针异常// 1001,alan,18,20,[email protected]// 1002,alanchan,19,25,[email protected]// 1003,alanchanchn,20,30,[email protected]// 1004,alan_chan,27,20,[email protected]// 1005,alan_chan_chn,36,10,[email protected]

2)、输入事实流订单数据

“192.168.10.42”, 9999

// order 流数据// 16,1002,211// 17,1004,234// 18,1005,175

3)、观察程序控制台输出

// 控制台输出// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, [email protected])// ......// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, [email protected])// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, [email protected])// ......// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, [email protected])// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, [email protected])// ......// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, [email protected])// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, [email protected])// ......// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, [email protected])// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, [email protected])// ......// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, [email protected])// 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)// 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)// 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)

以上,本文详细的介绍了broadcast state的具体使用,并以两个例子分别介绍了BroadcastProcessFunction和KeyedBroadcastProcessFunction的具体实现。

标签: flink 大数据 kafka

本文转载自: https://blog.csdn.net/chenwewi520feng/article/details/135448989
版权归原作者 一瓢一瓢的饮 alanchanchn 所有, 如有侵权,请联系我们删除。

“【flink番外篇】13、Broadcast State 模式示例(完整版)”的评论:

还没有评论