简介
处理函数是Flink底层的函数,工作中通常用来做一些更复杂的业务处理,处理函数分好几种,主要包括基本处理函数,keyed处理函数,window处理函数。
Flink提供了8种不同处理函数:
ProcessFunction
:dataStreamKeyedProcessFunction
:用于KeyedStream,keyBy之后的流处理CoProcessFunction
:用于connect连接的流ProcessJoinFunction
:用于join流操作BroadcastProcessFunction
:用于广播KeyedBroadcastProcessFunction
:keyBy之后的广播ProcessWindowFunction
:窗口增量聚合ProcessAllWindowFunction
:全窗口聚合
@PublicpublicinterfaceRichFunctionextendsFunction{voidopen(Configuration var1)throwsException;voidclose()throwsException;RuntimeContextgetRuntimeContext();IterationRuntimeContextgetIterationRuntimeContext();voidsetRuntimeContext(RuntimeContext var1);}
基本处理函数(ProcessFunction)
处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。它所对应的函数 类,就叫作
ProcessFunction
。
处理函数的功能和使用
在很多应 用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。就必须使用处理函数进行实现。
处理函数提供了一个“定时服务” (
TimerService
),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线 (watermark),甚至可以注册“定时事件”。处理函数继承了
AbstractRichFunction
抽象类, 所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函 数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方 法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。
处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用
.process()
方法 就可以了。
ProcessFunction
不是接口,而是一个抽象类,继承了
AbstractRichFunction
; 所以所有的处理函数,都是富函数(RichFunction), 富函数可以调用的东西这里同样都可以调用。
PublicEvolvingpublicabstractclassProcessFunction<I,O>extendsAbstractRichFunction{privatestaticfinallong serialVersionUID =1L;publicProcessFunction(){}publicabstractvoidprocessElement(I var1,ProcessFunction<I,O>.Context var2,Collector<O> var3)throwsException;publicvoidonTimer(long timestamp,ProcessFunction<I,O>.OnTimerContext ctx,Collector<O> out)throwsException{}//publicabstractclassOnTimerContextextendsProcessFunction<I,O>.Context {publicOnTimerContext(){super();}publicabstractTimeDomaintimeDomain();}//publicabstractclassContext{publicContext(){}publicabstractLongtimestamp();publicabstractTimerServicetimerService();publicabstract<X>voidoutput(OutputTag<X> var1,X var2);}}
processElement
会针对流中的每条记录调用一次。跟
MapFunction
一样,
Collector
发送出去。
Context
可以访问时间戳,当前记录键值以及
TimeService
,支持将结果发送到副输出。
onTimer()
是一个回调函数,会在之前注册的计数器触发时调用。
timestamp
参数给出了所触发计时器的时间戳,
Collector
可用来发出记录。
OnTimerContext
能够提供和
processElement()
方法中
Context
对象相同的服务,它还会返回触发计时器的时间域(处理时间/事件时间)。
时间服务和计时器
Context
和
OnTimerContext
对象中
TimerService
提供时间相关的数据访问。
PublicEvolvingpublicinterfaceTimerService{/** Error string for {@link UnsupportedOperationException} on registering timers. */StringUNSUPPORTED_REGISTER_TIMER_MSG="Setting timers is only supported on a keyed streams.";/** Error string for {@link UnsupportedOperationException} on deleting timers. */StringUNSUPPORTED_DELETE_TIMER_MSG="Deleting timers is only supported on a keyed streams.";/** 返回当前的处理时间。 */longcurrentProcessingTime();/** 返回当前水位线时间戳。 */longcurrentWatermark();/**
针对当前键值注册一个处理时间计时器,当执行机器处理时间达到给定的时间戳,该计时器就会触发。
*/voidregisterProcessingTimeTimer(long time);/**
* 针对当前键值注册一个事件时间计时器,当更新后水位线时间戳大于或等于计时器时间戳时,它就会触发。
*/voidregisterEventTimeTimer(long time);/**
* 针对当前键值删除一个注册过的处理时间计时器。如果该计时器不存在,则方法不会有任何作用。
*/voiddeleteProcessingTimeTimer(long time);/**
* 针对当前键值删除一个注册过事件时间计时器,如果该计时器不存在,则方法不会有任何作用。
*/voiddeleteEventTimeTimer(long time);}
计时器触发时会调用
onTimer()
回调函数,系统对于
processElement()
和
onTimer()
两个方法调用同步,防止并发。
每个键值和时间戳只能注册一个计时器,每个键值可以有多个计时器,但具体到每个时间戳就只能有一个。
副输出/侧输出(SideOutput)
大多数DataStream API 算子都只有一个输出,即只能生成一条某个数据类型的结果流。只有
split
算子可以将一条流拆分成多条类型相同的流。
处理函数提供的副输出功能允许从同一函数发出多条数据流,它们类型可以不同。
按键分区处理函数(KeyedProcessFunction)
按键分区处理函数是重点,用在keyby后面,对keyedStream进行处理,keyby将会按照Key进行分区,然后将不同key的数据分配到不同并行子任务上进行执行。
PublicEvolvingpublicabstractclassKeyedProcessFunction<K,I,O>extendsAbstractRichFunction{privatestaticfinallong serialVersionUID =1L;publicabstractvoidprocessElement(I value,Context ctx,Collector<O> out)throwsException;publicvoidonTimer(long timestamp,OnTimerContext ctx,Collector<O> out)throwsException{}publicabstractclassContext{publicabstractLongtimestamp();publicabstractTimerServicetimerService();publicabstract<X>voidoutput(OutputTag<X> outputTag,X value);/** Get key of the element being processed. */publicabstractKgetCurrentKey();}publicabstractclassOnTimerContextextendsContext{/** The {@link TimeDomain} of the firing timer. */publicabstractTimeDomaintimeDomain();/** Get key of the firing timer. */@OverridepublicabstractKgetCurrentKey();}}
窗口处理函数(ProcessWindowsFunction)
除了上面的按键分区处理函数之外,对于窗口也有函数,分两种,一种是窗口处理函数(
ProcessWindowsFunction
),另一种是全窗口处理函数(
ProcessAllWindowsFunction
),
ProcessWindowFunction
获得一个包含窗口所有元素的可迭代器以及一个具有时间和状态信息访问权的上下文对象,使得它比其他窗口函数提供更大的灵活性。是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到认为窗口可以处理为止。
ProcessWindowsFunction
:处理分区数据,每个窗口执行一次
process
方法.
publicabstractclassProcessWindowFunction<IN, OUT, KEY,WextendsWindow>extendsAbstractRichFunction{privatestaticfinallong serialVersionUID =1L;publicabstractvoidprocess(KEY key,Context context,Iterable<IN> elements,Collector<OUT> out)throwsException;publicvoidclear(Context context)throwsException{}/** The context holding window metadata. */publicabstractclassContextimplementsjava.io.Serializable{/** Returns the window that is being evaluated. */publicabstractWwindow();publicabstractlongcurrentProcessingTime();publicabstractlongcurrentWatermark();publicabstractKeyedStateStorewindowState();publicabstractKeyedStateStoreglobalState();publicabstract<X>voidoutput(OutputTag<X> outputTag,X value);}}
全窗口处理函数(ProcessAllWindowFunction)
ProcessAllWindowFunction
和
ProcessFunction
类相似,都是用来对上游过来的元素做处理,不过
ProcessFunction
是每个元素执行一次
processElement
方法,
ProcessAllWindowFunction
是每个窗口执行一次process方法(方法内可以遍历该窗口内的所有元素);
publicabstractclassProcessAllWindowFunction<IN, OUT,WextendsWindow>extendsAbstractRichFunction{privatestaticfinallong serialVersionUID =1L;publicabstractvoidprocess(Context context,Iterable<IN> elements,Collector<OUT> out)throwsException;publicvoidclear(Context context)throwsException{}/** The context holding window metadata. */publicabstractclassContext{publicabstractWwindow();publicabstractKeyedStateStorewindowState();publicabstractKeyedStateStoreglobalState();publicabstract<X>voidoutput(OutputTag<X> outputTag,X value);}}
合并流处理函数(CoProcessFunction)
对于连接流
ConnectedStreams
的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(
co-process function
)。与
CoMapFunction
类似,如果是调用
.flatMap()
就需要传入一个
CoFlatMapFunction
,需要实现
flatMap1()
、
flatMap2()
两个方法;而调用
.process()
时,传入的则是一个
CoProcessFunction
。
publicabstractclassCoProcessFunction<IN1, IN2, OUT>extendsAbstractRichFunction{privatestaticfinallong serialVersionUID =1L;publicabstractvoidprocessElement1(IN1 value,Context ctx,Collector<OUT> out)throwsException;publicabstractvoidprocessElement2(IN2 value,Context ctx,Collector<OUT> out)throwsException;publicvoidonTimer(long timestamp,OnTimerContext ctx,Collector<OUT> out)throwsException{}publicabstractclassContext{publicabstractLongtimestamp();/** A {@link TimerService} for querying time and registering timers. */publicabstractTimerServicetimerService();publicabstract<X>voidoutput(OutputTag<X> outputTag,X value);}publicabstractclassOnTimerContextextendsContext{publicabstractTimeDomaintimeDomain();}}
连接流处理函数(ProcessJoinFunction)
ProcessJoinFunction
和
CoProcessFunction
类似,但是有区别。
ProcessJoinFunction
用于join流操作,可以拿到两个流数据处理
CoProcessFunction
用于连接流处理,两个流数据分别处理
publicabstractclassProcessJoinFunction<IN1, IN2, OUT>extendsAbstractRichFunction{privatestaticfinallong serialVersionUID =-2444626938039012398L;publicabstractvoidprocessElement(IN1 left,IN2 right,Context ctx,Collector<OUT> out)throwsException;publicabstractclassContext{publicabstractlonggetLeftTimestamp();publicabstractlonggetRightTimestamp();publicabstractlonggetTimestamp();publicabstract<X>voidoutput(OutputTag<X> outputTag,X value);}}
广播流处理函数(BroadcastProcessFunction)
广播连接流处理函数,基于
BroadcastConnectedStream
调用
.process()
时作为参数传入。这里的“广播连接流”
BroadcastConnectedStream
,是一个未
keyBy
的普通 DataStream 与一个广播流(
BroadcastStream
)做连接(
conncet
)之后的产物。
publicabstractclassBroadcastProcessFunction<IN1, IN2, OUT>extendsBaseBroadcastProcessFunction{privatestaticfinallong serialVersionUID =8352559162119034453L;publicabstractvoidprocessElement(finalIN1 value,finalReadOnlyContext ctx,finalCollector<OUT> out)throwsException;publicabstractvoidprocessBroadcastElement(finalIN2 value,finalContext ctx,finalCollector<OUT> out)throwsException;publicabstractclassContextextendsBaseBroadcastProcessFunction.Context{}publicabstractclassReadOnlyContextextendsBaseBroadcastProcessFunction.ReadOnlyContext{}}
按键分区的广播连接流处理函数(KeyedBroadcastProcessFunction)
按键分区的广播连接流处理函数,同样是基于
BroadcastConnectedStream
调用
.process()
时作为参数传入。与
BroadcastProcessFunction
不同的是,这时的广播连接流, 是一个
KeyedStream
与广播流(
BroadcastStream
)做连接之后的产物。
附录
参考
Flink官方文档
版权归原作者 demon7552003 所有, 如有侵权,请联系我们删除。