大纲
在《Java版Flink使用指南——分流导出》中,我们通过addSink进行了输出分流。本文我们将介绍几种通过多个无界流输入合并成一个流来进行处理的方案。
新建工程
我们新建一个名字叫MultiSource的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
无界流
我们使用《Java版Flink使用指南——自定义无界流生成器》中的方法,我们定义3个无界流。其中两个是Long类型,一个是String类型。
奇数Long型无界流
src/main/java/org/example/generator/UnBoundedOddStreamGenerator.java
这个类每隔1秒钟产生一个Long型奇数。
packageorg.example.generator;importorg.apache.flink.streaming.api.functions.source.RichSourceFunction;publicclassUnBoundedOddStreamGeneratorextendsRichSourceFunction<Long>{privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<Long> ctx)throwsException{long count =1L;while(isRunning){Thread.sleep(1000);// Simulate delay
ctx.collect(count);// Emit data
count = count +2;}}@Overridepublicvoidcancel(){
isRunning =false;System.out.println("UnBoundedStreamGenerator canceled");}}
偶数Long型无界流
src/main/java/org/example/generator/UnBoundedEvenStreamGenerator.java
这个类每隔1秒钟产生一个Long型偶数。
packageorg.example.generator;importorg.apache.flink.streaming.api.functions.source.RichSourceFunction;publicclassUnBoundedEvenStreamGeneratorextendsRichSourceFunction<Long>{privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<Long> ctx)throwsException{long count =0L;while(isRunning){Thread.sleep(1000);// Simulate delay
ctx.collect(count);// Emit data
count = count +2;}}@Overridepublicvoidcancel(){
isRunning =false;System.out.println("UnBoundedStreamGenerator canceled");}}
奇数String型无界流
src/main/java/org/example/generator/UnBoundedOddStringStreamGenerator.java
这个类每隔1秒钟产生一个String型奇数。
packageorg.example.generator;importorg.apache.flink.streaming.api.functions.source.RichSourceFunction;publicclassUnBoundedOddStringStreamGeneratorextendsRichSourceFunction<String>{privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<String> ctx)throwsException{long count =1L;while(isRunning){Thread.sleep(1000);// Simulate delay
ctx.collect(String.valueOf(count));// Emit data
count = count +2;}}@Overridepublicvoidcancel(){
isRunning =false;System.out.println("UnBoundedStreamGenerator canceled");}}
合流
Union
Union是最简单的算子。它可以把两个数据类型相同的流合并。
上面奇数和偶数Long型流就可以使用Union去做合并。
DataStreamSource<Long> evenLongDataStreamSource = env.addSource(newUnBoundedEvenStreamGenerator());DataStreamSource<Long> oddLongDataStreamSource = env.addSource(newUnBoundedOddStreamGenerator());
evenLongDataStreamSource.union(oddLongDataStreamSource).addSink(newSinkFunction<Long>(){@Overridepublicvoidinvoke(Long value,Context context)throwsException{System.out.println("sink union value: "+ value);}}).name("union stream");
Connect
Connect可以用于连接两个不同类型的流。这就意味着它需要提供针对不同类型的处理方法。
上面这个例子,如果使用Connect实现,则如下
evenLongDataStreamSource.connect(oddLongDataStreamSource).map(newCoMapFunction<Long,Long,Long>(){@OverridepublicLongmap1(Long value)throwsException{return value;}@OverridepublicLongmap2(Long value)throwsException{return value;}}).addSink(newSinkFunction<Long>(){@Overridepublicvoidinvoke(Long value,Context context)throwsException{System.out.println("sink connect value: "+ value);}}).name("connect stream");
map方法中的CoMapFunction接口类中的map1和map2就是将两个不同类型的流归一化处理的中间方法。
IN1是Connect方法调用者的流数据类型;IN2是Connect参数的流数据类型;R是它们归一化后的类型。
@PublicpublicinterfaceCoMapFunction<IN1, IN2, OUT>extendsFunction,Serializable{/**
* This method is called for each element in the first of the connected streams.
*
* @param value The stream element
* @return The resulting element
* @throws Exception The function may throw exceptions which cause the streaming program to fail
* and go into recovery.
*/OUTmap1(IN1 value)throwsException;/**
* This method is called for each element in the second of the connected streams.
*
* @param value The stream element
* @return The resulting element
* @throws Exception The function may throw exceptions which cause the streaming program to fail
* and go into recovery.
*/OUTmap2(IN2 value)throwsException;}
假如我们将Long型偶数流和String型奇数流合并,并生成一个Double类型的流,则可以如下
evenLongDataStreamSource.connect(oddStringDataStreamSource).map(newCoMapFunction<Long,String,Double>(){@OverridepublicDoublemap1(Long value)throwsException{returnDouble.valueOf(value);}@OverridepublicDoublemap2(String value)throwsException{returnDouble.valueOf(value);}}).addSink(newSinkFunction<Double>(){@Overridepublicvoidinvoke(Double value,Context context)throwsException{System.out.println("sink union connect value: "+ value);}}).name("union connect stream");
map1方法将evenLongDataStreamSource中的Long型数据转成了Double;map2将oddStringDataStreamSource中的String型数据转换成了Double。
测试
工程代码
版权归原作者 breaksoftware 所有, 如有侵权,请联系我们删除。