文章目录
导航
观看本文章之前,请先看之前文章
(十)Flink Datastream API 编程指南 算子-1 (转换算子、物理分区、任务链、资源组 、算子和作业)等基本介绍
常用算子基本使用
效果
本次主要是阅读几个常用算子的源码。
阅读算子列表
- map
- flatmap
- keyby
- filter
- reduce
- window
- windowAll
- 其它
Map()
首先说一下这个算子是one to one的,通俗的讲就是进一条数据 经过逻辑处理后 必出一条数据。
用户代码
packagecom.stream.samples;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author DeveoperZJQ
* @since 2022/11/12
*/publicclassMapOperator{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 消费socket数据源DataStreamSource<String> dataStream = env.socketTextStream("192.168.112.147",7777);// map => String::length 是传入的虚函数map的逻辑SingleOutputStreamOperator<Integer> map = dataStream.map(String::length);// print输出
map.print();
env.execute(MapOperator.class.getSimpleName());}}
上面的逻辑非常简单,目的只是为了追踪map的入口,你可以使用debug模式,并且在map上打上断点,可以一层一层的往下看。
两种变形
public<R>SingleOutputStreamOperator<R>map(MapFunction<T,R> mapper){// 根据传入函数获取返回值类型TypeInformation<R> outType =TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper),this.getType(),Utils.getCallLocationName(),true);// 调用下面的map方法returnthis.map(mapper, outType);}// 带返回的输出类型参数,可以看到传入的operator默认名字就叫Mappublic<R>SingleOutputStreamOperator<R>map(MapFunction<T,R> mapper,TypeInformation<R> outputType){returnthis.transform("Map", outputType,(OneInputStreamOperator)(newStreamMap((MapFunction)this.clean(mapper))));}
上面两个重载方法刚好对应上面截图中的两个方法。
@PublicEvolvingpublic<R>SingleOutputStreamOperator<R>transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T,R> operator){// 抽象工厂returnthis.doTransform(operatorName, outTypeInfo,SimpleOperatorFactory.of(operator));}
上面代码有一个抽象工厂的简单 算子工厂类,传入的是OneInputStreamOperator类型的算子,然后把用户代码的operator转换成了StreamOperatorFactory operatorFactory, 工厂类是真的多啊,如果设计模式关于工厂方法和抽象工厂没学牢固的同学,可以来这里细品。
// protected 只能包能调用protected<R>SingleOutputStreamOperator<R>doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory){// 我理解这里调用这个就是为了抛出异常的返回类型,如果可用,typeUsed则标识为truethis.transformation.getOutputType();// 构建OneInputTransformationOneInputTransformation<T,R> resultTransform =newOneInputTransformation(this.transformation, operatorName, operatorFactory, outTypeInfo,this.environment.getParallelism());SingleOutputStreamOperator<R> returnStream =newSingleOutputStreamOperator(this.environment, resultTransform);// 这里讲用户逻辑封装到resultTransform,全部装填到List<Transformation<?>> 为构建流图StreamGraph做准备this.getExecutionEnvironment().addOperator(resultTransform);return returnStream;}// 添加算子到Transformations<>集合中@InternalpublicvoidaddOperator(Transformation<?> transformation){Preconditions.checkNotNull(transformation,"transformation must not be null.");this.transformations.add(transformation);}
env.execute(MapOperator.class.getSimpleName());这里开始执行,物理执行图部署成功,这个就开始了消费数据。
当然通过上面说的,你可以尝试着自定义map算子或者其它算子,只要使用transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator<T, R> operator)得当,就会实现你的功能,但是一般不建议入门选手这样做。
这里面,咱们还看到很多使用clean() 方法的地方,后面章节会拿出一章说明这个。
flatMap()
可以看出,flatMap的源码和map非常类似。有一个地方值得咱们看一下,就是传入的函数和map是不同 ,flatmap可以一进多出,一进不出,但是map做不到,单纯从算子性能上说flatMap的开销是要比map大的。
来看一下下面两者的区别:new StreamMap((MapFunction) vs new StreamFlatMap((FlatMapFunction)
StreamMap
publicStreamMap(MapFunction<IN, OUT> mapper){super(mapper);this.chainingStrategy =ChainingStrategy.ALWAYS;}
单从这个各自的构造器上看不出什么,传入的函数不一样,可以点击进去
@Public@FunctionalInterfacepublicinterfaceMapFunction<T,O>extendsFunction,Serializable{Omap(T value)throwsException;}
map方法是有返回值的。
再来看下具体处理方法:
publicvoidprocessElement(StreamRecord<IN> element)throwsException{this.output.collect(element.replace(((MapFunction)this.userFunction).map(element.getValue())));}
从上面可以看出map直接把数据通过collect回收了,等于没有对用户暴露,那么flatMatp呢?看下面的源码。
StreamFlatMap
publicStreamFlatMap(FlatMapFunction<IN, OUT> flatMapper){super(flatMapper);this.chainingStrategy =ChainingStrategy.ALWAYS;}
点击进去,是下面的接口类,里面有一个flatMap方法,继承的类也是一样的。
@Public@FunctionalInterfacepublicinterfaceFlatMapFunction<T,O>extendsFunction,Serializable{voidflatMap(T value,Collector<O> out)throwsException;}
flatMap没有返回值,它是利用Collector out返回数据的。
再来看下具体处理方法:
publicvoidopen()throwsException{super.open();this.collector =newTimestampedCollector(this.output);}publicvoidprocessElement(StreamRecord<IN> element)throwsException{this.collector.setTimestamp(element);((FlatMapFunction)this.userFunction).flatMap(element.getValue(),this.collector);}
把this.collector作为参数封装到flatMap里了,用户可以通过collector进行操作。
那么现在是不是明白了,为啥map是一对一,flatMap是一对多和一对0了吧。
结语
本文章是系列文章中的一篇,如果有错误的地方,欢迎批评指正!欢迎同行交流!
版权归原作者 京河小蚁 所有, 如有侵权,请联系我们删除。