文章目录
1. 触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算,执行窗口函数。基于 WindowedStream 用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:
------onElement():窗口中每到来一个元素,都会调用这个方法
------onEventTime():当注册的事件时间定时器触发时,将调用这个方法
------onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法
------clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态
这几个方法的参数中都有一个触发器上下文(TriggerContext)对象,可以用来注册定时器回调(callback)。定时器(Timer)代表未来某个时间点会执行的事件;当时间进展到设定的值时,就会执行定义好的操作
除clear外,三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型:
------CONTINUE(继续):什么都不做
------FIRE(触发):触发计算,输出结果
------PURGE(清除):清空窗口中的所有数据,销毁窗口
------FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口
Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。如:所有事件时间窗口,默认的触发器都是
EventTimeTrigger
;
源码:
publicclassEventTimeTriggerextendsTrigger<Object,TimeWindow>{privatestaticfinallong serialVersionUID =1L;privateEventTimeTrigger(){}@OverridepublicTriggerResultonElement(Object element,long timestamp,TimeWindow window,TriggerContext ctx)throwsException{if(window.maxTimestamp()<= ctx.getCurrentWatermark()){// if the watermark is already past the window fire immediatelyreturnTriggerResult.FIRE;}else{
ctx.registerEventTimeTimer(window.maxTimestamp());returnTriggerResult.CONTINUE;}}@OverridepublicTriggerResultonEventTime(long time,TimeWindow window,TriggerContext ctx){return time == window.maxTimestamp()?TriggerResult.FIRE :TriggerResult.CONTINUE;}@OverridepublicTriggerResultonProcessingTime(long time,TimeWindow window,TriggerContext ctx)throwsException{returnTriggerResult.CONTINUE;}@Overridepublicvoidclear(TimeWindow window,TriggerContext ctx)throwsException{
ctx.deleteEventTimeTimer(window.maxTimestamp());}@OverridepublicbooleancanMerge(){returntrue;}@OverridepublicvoidonMerge(TimeWindow window,OnMergeContext ctx){// only register a timer if the watermark is not yet past the end of the merged window// this is in line with the logic in onElement(). If the watermark is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.long windowMaxTimestamp = window.maxTimestamp();if(windowMaxTimestamp > ctx.getCurrentWatermark()){
ctx.registerEventTimeTimer(windowMaxTimestamp);}}@OverridepublicStringtoString(){return"EventTimeTrigger()";}/**
* Creates an event-time trigger that fires once the watermark passes the end of the window.
*
* <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
* trigger window evaluation with just this one element.
*/publicstaticEventTimeTriggercreate(){returnnewEventTimeTrigger();}}
实际场景:
每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 5 秒钟触发一次窗口的计算:
publicclassTriggerExample{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<Event> streamSource = env.addSource(newClickSource());SingleOutputStreamOperator<Event> watermarks = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(newSerializableTimestampAssigner<Event>(){@OverridepubliclongextractTimestamp(Event event,long l){return event.timestamp;}}));SingleOutputStreamOperator<UrlViewCount> process = watermarks.keyBy(r -> r.url).window(TumblingEventTimeWindows.of(Time.seconds(10))).trigger(newMyTrigger()).process(newWindowResult());
process.print();
env.execute();}publicstaticclassWindowResultextendsProcessWindowFunction<Event,UrlViewCount,String,TimeWindow>{@Overridepublicvoidprocess(String s,Context context,Iterable<Event> iterable,Collector<UrlViewCount> collector)throwsException{
collector.collect(newUrlViewCount(
s,// 获取迭代器中的元素个数
iterable.spliterator().getExactSizeIfKnown(),
context.window().getStart(),
context.window().getEnd()));}}publicstaticclassMyTriggerextendsTrigger<Event,TimeWindow>{@OverridepublicTriggerResultonElement(Event event,long l,TimeWindow timeWindow,TriggerContext triggerContext)throwsException{ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(newValueStateDescriptor<Boolean>("first-event",Types.BOOLEAN));if(isFirstEvent.value()==null){for(long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i +5000L){
triggerContext.registerEventTimeTimer(i);}
isFirstEvent.update(true);}returnTriggerResult.CONTINUE;}@OverridepublicTriggerResultonEventTime(long l,TimeWindow timeWindow,TriggerContext triggerContext)throwsException{returnTriggerResult.FIRE;}@OverridepublicTriggerResultonProcessingTime(long l,TimeWindow timeWindow,TriggerContext triggerContext)throwsException{returnTriggerResult.CONTINUE;}@Overridepublicvoidclear(TimeWindow timeWindow,TriggerContext triggerContext)throwsException{ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(newValueStateDescriptor<Boolean>("first-event",Types.BOOLEAN));
isFirstEvent.clear();}}}
结果如图所示,每个用户在10秒的窗口内,触发了3次计算
2. 移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器
stream.keyBy(...).window(...).evictor(newMyEvictor())
Evictor 接口定义了两个方法:evictBefore():定义执行窗口函数之前的移除数据操作; evictAfter():定义执行窗口函数之后的以处数据操作默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的
3. 允许延迟(Allowed Lateness)
当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个
允许的最大延迟
(Allowed Lateness)。在这段时间内,窗口不会销毁,继续到来的数据依然可以
进入窗口中并触发计算
。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).allowedLateness(Time.minutes(1))
4. 将迟到的数据放入侧输出流
基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现将未收入窗口的迟到数据,放入侧输出流(side output)进行另外的处理。方法需要传入一个
输出标签
(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同
DataStream<Event> stream = env.addSource(...);OutputTag<Event> outputTag =newOutputTag<Event>("late"){};
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).sideOutputLateData(outputTag)
将迟到数据放入侧输出流之后,基于窗口处理完成之后的DataStream,用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).sideOutputLateData(outputTag).aggregate(newMyAggregateFunction())DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
版权归原作者 但行益事莫问前程 所有, 如有侵权,请联系我们删除。