flink多流操作
1 分流操作
SingleOutputStreamOperator<Student> mainStream = students.process(newProcessFunction<Student,Student>(){@OverridepublicvoidprocessElement(Student student,ProcessFunction<Student,Student>.Context ctx,Collector<Student> collector)throwsException{if(student.getGender().equals("m")){// 输出到测流
ctx.output(maleOutputTag, student);}elseif(student.getGender().equals("f")){// 输出到测流
ctx.output(femaleOutputTag, student.toString());}else{// 在主流中输出
collector.collect(student);}}});SingleOutputStreamOperator<Student> side1 = mainStream.getSideOutput(maleOutputTag);SingleOutputStreamOperator<String> side2 = mainStream.getSideOutput(femaleOutputTag);
2 connect连接操作
2.1 connect 连接(DataStream,DataStream→ConnectedStreams)
connect 翻译成中文意为连接,可以将两个数据类型一样也可以类型不一样 DataStream 连接成一个新 的 ConnectedStreams。需要注意的是,connect 方法与 union 方法不同,虽然调用 connect 方法将两个 流连接成一个新的 ConnectedStreams,但是里面的两个流依然是相互独立的,这个方法最大的好处是 可以让两个流共享 State 状态。
// 使用 fromElements 创建两个 DataStreamDataStreamSource<String> word = env.fromElements("a","b","c","d");DataStreamSource<Integer> num = env.fromElements(1,3,5,7,9);// 将两个 DataStream 连接到一起ConnectedStreams<String,Integer> connected = word.connect(num);
2.2 coMap(ConnectedStreams → DataStream)
对 ConnectedStreams 调用 map 方法时需要传入 CoMapFunction 函数;
该接口需要指定 3 个泛型:
- 第一个输入 DataStream 的数据类型
- 第二个输入 DataStream 的数据类型
- 返回结果的数据类型。 该接口需要重写两个方法:
- map1 方法,是对第 1 个流进行 map 的处理逻辑。
- 2 map2 方法,是对 2 个流进行 map 的处理逻辑
这两个方法必须是相同的返回值类型。
//将两个 DataStream 连接到一起ConnectedStreams<String,Integer> wordAndNum = word.connect(num);// 对 ConnectedStreams 中两个流分别调用个不同逻辑的 map 方法DataStream<String> result = wordAndNum.map(newCoMapFunction<String,Integer,String>(){@OverridepublicStringmap1(String value)throwsException{// 第一个 map 方法是将第一个流的字符变大写return value.toUpperCase();}@OverridepublicStringmap2(Integer value)throwsException{// 第二个 map 方法将是第二个流的数字乘以 10 并转成 StringreturnString.valueOf(value *10);}});
2.3 coFlatMap(ConnectedStreams → DataStream)
对 ConnectedStreams 调用 flatMap 方法。调用 flatMap 方法,传入的 Function 是 CoFlatMapFunction;
这个接口要重写两个方法:
- flatMap1 方法,是对第 1 个流进行 flatMap 的处理逻辑;
- flatMap2 方法,是对 2 个流进行 flatMap 的处理逻辑;
这两个方法都必须返回是相同的类型。
// 使用 fromElements 创建两个 DataStreamDataStreamSource<String> word = env.fromElements("a b c","d e f");DataStreamSource<String> num = env.fromElements("1,2,3","4,5,6");// 将两个 DataStream 连接到一起ConnectedStreams<String,String> connected = word.connect(num);// 对 ConnectedStreams 中两个流分别调用个不同逻辑的 flatMap 方法DataStream<String> result = connected.flatMap(newCoFlatMapFunction<String,String,String>(){@OverridepublicvoidflatMap1(String value,Collector<String> out)throwsException{String[] words = value.split(" ");for(String w : words){
out.collect(w);}}@OverridepublicvoidflatMap2(String value,Collector<String> out)throwsException{String[] nums = value.split(",");for(String n : nums){
out.collect(n);}}});
3 union操作
3.1 union 合并(DataStream * → DataStream)
该方法可以将两个或者多个数据类型一致的 DataStream 合并成一个 DataStream。DataStream union(DataStream… streams)可以看出 DataStream 的 union 方法的参数为可变参数,即可以合并两 个或多个数据类型一致的 DataStream,connect 不要求两个流的类型一致,但union必须一致。
下面的例子是使用 fromElements 生成两个 DataStream,一个是基数的,一个是偶数的,然后将两个 DataStream 合并成一个 DataStream。
// 使用 fromElements 创建两个 DataStreamDataStreamSource<Integer> odd = env.fromElements(1,3,5,7,9);DataStreamSource<Integer> even = env.fromElements(2,4,6,8,10);// 将两个 DataStream 合并到一起DataStream<Integer> result = odd.union(even);
4 coGroup 协同分组
coGroup 本质上是join 算子的底层算子;功能类似;可以用cogroup来实现join left join full join的功能。 代码结构如下:
DataStreamSource<String> stream1 = env.fromElements("1,aa,m,18","2,bb,m,28","3,cc,f,38");DataStreamSource<String> stream2 = env.fromElements("1:aa:m:18","2:bb:m:28","3:cc:f:38");DataStream<String> res = stream1
.coGroup(stream2).where(newKeySelector<String,String>(){@OverridepublicStringgetKey(String value)throwsException{return value;}}).equalTo(newKeySelector<String,String>(){@OverridepublicStringgetKey(String value)throwsException{return value;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(newCoGroupFunction<String,String,String>(){@OverridepublicvoidcoGroup(Iterable<String> first,Iterable<String> second,Collector<String> out)throwsException{// 这里添加具体的 coGroup 处理逻辑// 这两个迭代器,是这5s的数据中的某一组,id = 1}});
4.1 coGroup 实现 left join操作
packagebatch;importorg.apache.flink.api.common.functions.CoGroupFunction;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.typeinfo.TypeHint;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.operators.DataSource;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.util.Collector;publicclass coGrouptest {publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// id nameDataStreamSource<String> stream1 = env.socketTextStream("localhost",9998);// id ageDataStreamSource<String> stream2 = env.socketTextStream("localhost",9999);// nc -lp 9999// nc -lp 9998SingleOutputStreamOperator<Tuple2<String,String>> s1 = stream1.map(s ->{String[] arr = s.split(",");returnTuple2.of(arr[0], arr[1]);}).returns(newTypeHint<Tuple2<String,String>>(){});SingleOutputStreamOperator<Tuple2<String,String>> s2 = stream2.map(s ->{String[] arr = s.split(",");returnTuple2.of(arr[0], arr[1]);}).returns(newTypeHint<Tuple2<String,String>>(){});DataStream<Tuple3<String,String,String>> out = s1.coGroup(s2).where(tp -> tp.f0)//左的f0 id 字段.equalTo(tp -> tp.f0)//又的f0 id 字段.window(TumblingProcessingTimeWindows.of(Time.seconds(2))).apply(newCoGroupFunction<Tuple2<String,String>,Tuple2<String,String>,Tuple3<String,String,String>>(){@OverridepublicvoidcoGroup(Iterable<Tuple2<String,String>> iterable,Iterable<Tuple2<String,String>> iterable1,Collector<Tuple3<String,String,String>> out)throwsException{for(Tuple2<String,String> t1 : iterable){boolean t2isnull =false;for(Tuple2<String,String> t2 : iterable1){
out.collect(newTuple3<String,String,String>(t1.f0,t1.f1,t2.f1));
t2isnull =true;}if(!t2isnull){
out.collect(newTuple3<String,String,String>(t1.f0,t1.f1,null));}}}});
out.print();
env.execute();}}
5 join
用于关联两个流(类似于 sql 中 join),需要指定 join,需要在窗口中进行关联后的逻辑计算。
只能支持inner join 不支持 左右和全连接
stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>);
实例:
SingleOutputStreamOperator<Student> s1;SingleOutputStreamOperator<StuInfo> s2;// join 两个流,此时并没有具体的计算逻辑JoinedStreams<Student,StuInfo> joined = s1.join(s2);// 对 join 流进行计算处理DataStream<String> stream = joined
// where 流 1 的某字段 equalTo 流 2 的某字段.where(s -> s.getId()).equalTo(s -> s.getId())// join 实质上只能在窗口中进行.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))// 对窗口中满足关联条件的数据进行计算.apply(newJoinFunction<Student,StuInfo,String>(){// 这边传入的两个流的两条数据,是能够满足关联条件的@OverridepublicStringjoin(Student first,StuInfo second)throwsException{// first: 左流数据 ; second: 右流数据// 计算逻辑// 返回结果returnnull;}});// 对 join 流进行计算处理
joined.where(s -> s.getId()).equalTo(s -> s.getId()).window(TumblingProcessingTimeWindows.of(Time.seconds(20))).apply(newFlatJoinFunction<Student,StuInfo,String>(){@Overridepublicvoidjoin(Student first,StuInfo second,Collector<String> out)throwsException{
out.collect();}});
6 broadcast 广播
Broadcast State 是 Flink 1.5 引入的新特性。 在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到 另一个数据流的计算中 。
6.1 API 介绍 , 核心要点
- 将需要广播出去的流,调用 broadcast 方法进行广播转换,得到广播流 BroadCastStream
- 然后在主流上调用 connect 算子,来连接广播流(以实现广播状态的共享处理)
- 在连接流上调用 process 算子,就会在同一个 ProcessFunciton 中提供两个方法分别对两个流进行 处理,并在这个 ProcessFunction 内实现“广播状态”的共享
publicclass _16_BroadCast_Demo {publicstaticvoidmain(String[] args)throwsException{Configuration configuration =newConfiguration();
configuration.setInteger("rest.port",8822);StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
env.setParallelism(1);// id,eventIdDataStreamSource<String> stream1 = env.socketTextStream("localhost",9998);SingleOutputStreamOperator<Tuple2<String,String>> s1 = stream1.map(s ->{String[] arr = s.split(",");returnTuple2.of(arr[0], arr[1]);}).returns(newTypeHint<Tuple2<String,String>>(){});// id,age,cityDataStreamSource<String> stream2 = env.socketTextStream("localhost",9999);SingleOutputStreamOperator<Tuple3<String,String,String>> s2 = stream2.map(s ->{String[] arr = s.split(",");returnTuple3.of(arr[0], arr[1], arr[2]);}).returns(newTypeHint<Tuple3<String,String,String>>(){});/**
* 案例背景:
* 流 1: 用户行为事件流(持续不断,同一个人也会反复出现,出现次数不定
* 流 2: 用户维度信息(年龄,城市),同一个人的数据只会来一次,来的时间也不定 (作为广播流)
* 需要加工流 1,把用户的维度信息填充好,利用广播流来实现
*/// 将字典数据所在流: s2 , 转成 广播流MapStateDescriptor<String,Tuple2<String,String>> userInfoStateDesc =newMapStateDescriptor<>("userInfoStateDesc",TypeInformation.of(String.class),TypeInformation.of(newTypeHint<Tuple2<String,String>>(){}));BroadcastStream<Tuple3<String,String,String>> s2BroadcastStream = s2.broadcast(userInfoStateDesc);// 哪个流处理中需要用到广播状态数据,就要 去 连接 connect 这个广播流SingleOutputStreamOperator<String> connected = s1.connect(s2BroadcastStream).process(newBroadcastProcessFunction<Tuple2<String,String>,Tuple3<String,String,String>,String>(){/**BroadcastState<String, Tuple2<String, String>> broadcastState;*//**
* 本方法,是用来处理 主流中的数据(每来一条,调用一次)
* @param element 左流(主流)中的一条数据
* @param ctx 上下文
* @param out 输出器
* @throws Exception
*/@OverridepublicvoidprocessElement(Tuple2<String,String> element,BroadcastProcessFunction<Tuple2<String,String>,Tuple3<String,String,String>,String>.ReadOnlyContext ctx,Collector<String> out)throwsException{// 通过 ReadOnlyContext ctx 取到的广播状态对象,是一个 “只读 ” 的对象;ReadOnlyBroadcastState<String,Tuple2<String,String>> broadcastState = ctx.getBroadcastState(userInfoStateDesc);if(broadcastState !=null){Tuple2<String,String> userInfo = broadcastState.get(element.f0);
out.collect(element.f0 +","+ element.f1 +","+(userInfo ==null?null: userInfo.f0)+","+(userInfo ==null?null: userInfo.f1));}else{ out.collect(element.f0 +","+ element.f1 +","+null+","+null);}}/****
* @param element 广播流中的一条数据
* @param ctx 上下文
* @param out 输出器
* @throws Exception
*/@OverridepublicvoidprocessBroadcastElement(Tuple3<String,String,String> element,BroadcastProcessFunction<Tuple2<String,String>,Tuple3<String,String,String>,String>.Context ctx,Collector<String> out)throwsException{// 从上下文中,获取广播状态对象(可读可写的状态对象) BroadcastState<String,Tuple2<String,String>> broadcastState = ctx.getBroadcastState(userInfoStateDesc);// 然后将获得的这条广播流数据,拆分后,装入广播状态
broadcastState.put(element.f0,Tuple2.of(element.f1, element.f2));}
resultStream.print();
env.execute();}}
版权归原作者 Direction_Wind 所有, 如有侵权,请联系我们删除。