Flink Window Triggers (Trigger), Part 1


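A window trigger (Trigger) is a key concept in Flink stream processing: it determines when a window is evaluated and what happens afterwards, for example whether the window function runs or the window contents are purged.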

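I. Basic Concepts

  • Definition: a trigger decides when a window fires and what happens after it fires. Time-based triggers do this by registering timers; element-driven triggers such as CountTrigger and DeltaTrigger decide as each element arrives.
  • Purpose: a trigger controls when window data is aggregated, so that results are computed and emitted at the appropriate point in time.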

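II. Types

Flink ships several built-in triggers. The most commonly used are:

  1. EventTimeTrigger
  • How it works: fires based on event time and the watermark mechanism. It registers an event-time timer for the window's maximum timestamp and fires when the watermark reaches it. If an element arrives when the window's maximum timestamp is already less than or equal to the current watermark, it fires immediately.
  • When to use: workloads that must be processed by event time, such as financial transactions and log analysis.
  2. ProcessingTimeTrigger
  • How it works: fires based on processing time, that is, the system clock of the machine. It fires when processing time reaches the end of the window.
  • When to use: workloads that do not need precise timing, or cases where event time cannot be obtained reliably.
  3. CountTrigger
  • How it works: fires based on the number of elements in the window. When the count reaches the configured threshold, the window is evaluated and the counter is reset.
  • When to use: workloads driven by data volume, such as batch-style processing and traffic analysis.
  4. ContinuousEventTimeTrigger and ContinuousProcessingTimeTrigger
  • How it works: fires periodically at a fixed interval, and also once the current time (watermark or processing time) reaches the end of the window.
  • When to use: workloads that need periodic results, such as real-time monitoring and periodic reports.
  5. DeltaTrigger
  • How it works: computes a delta between each incoming element and the element stored at the last firing (initially the first element), and fires when the delta exceeds the configured threshold.
  • When to use: workloads driven by how much the data changes, such as anomaly detection and trend analysis.
  6. PurgingTrigger
  • How it works: wraps another trigger, passed as an argument, and turns it into a purging trigger: whenever the nested trigger fires, the window contents are cleared after the computation.
  • When to use: cases where window data should be dropped as soon as the computation completes, to save storage.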

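III. Key Methods

A trigger implements the following key methods:

  1. onElement(T element, long timestamp, W window, TriggerContext ctx): called for each element added to a window; typically registers timers or updates trigger state.
  2. onEventTime(long time, W window, TriggerContext ctx): called when a registered event-time timer fires; handles event-time triggering logic.
  3. onProcessingTime(long time, W window, TriggerContext ctx): called when a registered processing-time timer fires; handles processing-time triggering logic.
  4. onMerge(W window, OnMergeContext ctx) (optional): called when several windows are merged into one, as with session windows; merges the trigger state and timers. It is only used when canMerge() returns true.
  5. clear(W window, TriggerContext ctx): called when the window is removed; cleans up the trigger's state and timers for that window.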

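IV. Behavior

Each trigger callback returns a TriggerResult enum value that determines what happens to the window next:

  • CONTINUE: do nothing and wait for the next callback.
  • FIRE: evaluate the window and emit the result; the window contents are kept.
  • PURGE: clear the elements in the window without evaluating it. Trigger state is not touched; it is cleaned up in clear().
  • FIRE_AND_PURGE: evaluate the window and emit the result, then clear the elements in the window.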
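
The four results can be read as two independent flags, "fire" and "purge". The following is a minimal, Flink-free sketch of that idea; the class TriggerResultSketch and the helper describe are hypothetical names introduced here for illustration, and only the four constant names and the isFire/isPurge accessors mirror the TriggerResult usage visible in the source code in section V.

```java
// Flink-free model of the four trigger results (illustrative names only).
class TriggerResultSketch {

    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        private final boolean fire;
        private final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() {
            return fire;
        }

        boolean isPurge() {
            return purge;
        }
    }

    // Hypothetical helper: spells out what happens to the window for a result.
    static String describe(Result r) {
        if (r.isFire() && r.isPurge()) {
            return "emit result, then drop window contents";
        }
        if (r.isFire()) {
            return "emit result, keep window contents";
        }
        if (r.isPurge()) {
            return "drop window contents without emitting";
        }
        return "do nothing";
    }

    public static void main(String[] args) {
        for (Result r : Result.values()) {
            System.out.println(r + " -> " + describe(r));
        }
    }
}
```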

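Window triggers are a flexible and powerful tool in Flink stream processing: they let developers define both the condition under which a window fires and what happens to the window afterwards. Choosing a suitable trigger and configuring its parameters determines how promptly results are emitted and how much window data is retained.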

V. Trigger Source Code

EventTimeTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
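
To make the control flow of EventTimeTrigger easier to follow, here is a small Flink-free sketch of the same decision logic. It is a simplified model, not the Flink API: the class EventTimeTriggerSketch, its Result enum and the TreeSet standing in for Flink's timer service are all assumptions made for illustration. The set mirrors the fact that registering the same timer timestamp repeatedly results in a single timer.

```java
import java.util.TreeSet;

// Simplified model of EventTimeTrigger's decisions for a single window.
class EventTimeTriggerSketch {

    enum Result { CONTINUE, FIRE }

    // Stand-in for the event-time timers Flink keeps per key and window.
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();

    Result onElement(long windowMaxTimestamp, long currentWatermark) {
        if (windowMaxTimestamp <= currentWatermark) {
            // The watermark already passed the window: fire right away.
            return Result.FIRE;
        }
        // Otherwise wait for the watermark; duplicates collapse in the set.
        eventTimeTimers.add(windowMaxTimestamp);
        return Result.CONTINUE;
    }

    Result onEventTime(long timerTimestamp, long windowMaxTimestamp) {
        return timerTimestamp == windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
    }

    int registeredTimers() {
        return eventTimeTimers.size();
    }

    public static void main(String[] args) {
        EventTimeTriggerSketch trigger = new EventTimeTriggerSketch();
        long maxTimestamp = 9_999L; // window [0, 10000) ends at 10000 - 1

        System.out.println(trigger.onElement(maxTimestamp, 5_000L));  // watermark behind the window
        System.out.println(trigger.onElement(maxTimestamp, 6_000L));  // still behind, same timer
        System.out.println(trigger.registeredTimers());
        System.out.println(trigger.onEventTime(maxTimestamp, maxTimestamp)); // timer fires
        System.out.println(trigger.onElement(maxTimestamp, 9_999L));  // late element
    }
}
```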

ProcessingTimeTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

ProcessingTimeoutTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import java.time.Duration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final Trigger<T, W> nestedTrigger;
    private final long interval;
    private final boolean resetTimerOnNewRecord;
    private final boolean shouldClearOnTimeout;
    private final ValueStateDescriptor<Long> timeoutStateDesc;

    private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long interval, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        this.nestedTrigger = nestedTrigger;
        this.interval = interval;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
        this.shouldClearOnTimeout = shouldClearOnTimeout;
        this.timeoutStateDesc = new ValueStateDescriptor("timeout", LongSerializer.INSTANCE);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        if (triggerResult.isFire()) {
            this.clear(window, ctx);
            return triggerResult;
        } else {
            ValueState<Long> timeoutState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
            long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
            Long timeoutTimestamp = (Long)timeoutState.value();
            if (timeoutTimestamp != null && this.resetTimerOnNewRecord) {
                ctx.deleteProcessingTimeTimer(timeoutTimestamp);
                timeoutState.clear();
                timeoutTimestamp = null;
            }

            if (timeoutTimestamp == null) {
                timeoutState.update(nextFireTimestamp);
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
            }

            return triggerResult;
        }
    }

    public TriggerResult onProcessingTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }

        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public TriggerResult onEventTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onEventTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }

        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<Long> timeoutTimestampState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
        Long timeoutTimestamp = (Long)timeoutTimestampState.value();
        if (timeoutTimestamp != null) {
            ctx.deleteProcessingTimeTimer(timeoutTimestamp);
            timeoutTimestampState.clear();
        }

        this.nestedTrigger.clear(window, ctx);
    }

    public String toString() {
        return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout) {
        return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), false, true);
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
    }
}
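
The effect of resetTimerOnNewRecord in onElement is easiest to see with concrete numbers. The sketch below is a Flink-free model under two assumptions: the nested trigger always returns CONTINUE, and a single Long field stands in for both the "timeout" state and the registered processing-time timer. The class name TimeoutTimerSketch is hypothetical.

```java
// Simplified model of the timeout bookkeeping in ProcessingTimeoutTrigger.onElement.
class TimeoutTimerSketch {

    private final long interval;
    private final boolean resetTimerOnNewRecord;

    // Stand-in for the "timeout" ValueState and the pending timer; null means none.
    private Long timeoutTimestamp;

    TimeoutTimerSketch(long interval, boolean resetTimerOnNewRecord) {
        this.interval = interval;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
    }

    // Returns the timestamp of the pending timeout after an element arrives at `now`.
    long onElement(long now) {
        if (timeoutTimestamp != null && resetTimerOnNewRecord) {
            // A new record restarts the timeout: drop the pending timer.
            timeoutTimestamp = null;
        }
        if (timeoutTimestamp == null) {
            timeoutTimestamp = now + interval;
        }
        return timeoutTimestamp;
    }

    public static void main(String[] args) {
        TimeoutTimerSketch keep = new TimeoutTimerSketch(1_000L, false);
        System.out.println(keep.onElement(0L));   // timer measured from the first element
        System.out.println(keep.onElement(400L)); // unchanged by later elements

        TimeoutTimerSketch reset = new TimeoutTimerSketch(1_000L, true);
        System.out.println(reset.onElement(0L));
        System.out.println(reset.onElement(400L)); // pushed back by the new element
    }
}
```

With resetTimerOnNewRecord set to false the timeout is measured from the first element after the last firing; with true it behaves like an inactivity timeout.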

CountTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc;

    private CountTrigger(long maxCount) {
        this.stateDesc = new ReducingStateDescriptor("count", new Sum(), LongSerializer.INSTANCE);
        this.maxCount = maxCount;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> count = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        count.add(1L);
        if ((Long)count.get() >= this.maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ((ReducingState)ctx.getPartitionedState(this.stateDesc)).clear();
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
    }

    public String toString() {
        return "CountTrigger(" + this.maxCount + ")";
    }

    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
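
Because CountTrigger clears its counter each time it fires, it fires once per maxCount elements rather than on every element after the threshold. A minimal Flink-free sketch of that counting logic follows; the class CountTriggerSketch and the helper countFires are hypothetical names, and a plain long field stands in for the "count" ReducingState.

```java
// Simplified model of CountTrigger's counting for a single window.
class CountTriggerSketch {

    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    private long count; // stand-in for the "count" ReducingState

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    Result onElement() {
        count += 1;
        if (count >= maxCount) {
            count = 0; // reset so the next batch starts from zero
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // Hypothetical helper: how many times the trigger fires for a run of elements.
    static int countFires(long maxCount, int elements) {
        CountTriggerSketch trigger = new CountTriggerSketch(maxCount);
        int fires = 0;
        for (int i = 0; i < elements; i++) {
            if (trigger.onElement() == Result.FIRE) {
                fires++;
            }
        }
        return fires;
    }

    public static void main(String[] args) {
        // 10 elements with maxCount = 3: fires on the 3rd, 6th and 9th element.
        System.out.println(countFires(3, 10)); // prints 3
    }
}
```

Note that the result is FIRE, not FIRE_AND_PURGE, so the elements counted so far stay in the window unless the trigger is wrapped in a PurgingTrigger.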

ContinuousEventTimeTrigger and ContinuousProcessingTimeTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc;

    private ContinuousEventTimeTrigger(long interval) {
        this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
        this.interval = interval;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
            if (fireTimestampState.get() == null) {
                this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
            }

            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        } else {
            ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
            Long fireTimestamp = (Long)fireTimestampState.get();
            if (fireTimestamp != null && fireTimestamp == time) {
                fireTimestampState.clear();
                this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
                return TriggerResult.FIRE;
            } else {
                return TriggerResult.CONTINUE;
            }
        }
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        Long timestamp = (Long)fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
        Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
        if (nextFireTimestamp != null) {
            ctx.registerEventTimeTimer(nextFireTimestamp);
        }
    }

    public String toString() {
        return "ContinuousEventTimeTrigger(" + this.interval + ")";
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
        return new ContinuousEventTimeTrigger(interval.toMilliseconds());
    }

    private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
        long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
        fireTimestampState.add(nextFireTimestamp);
        ctx.registerEventTimeTimer(nextFireTimestamp);
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc;

    private ContinuousProcessingTimeTrigger(long interval) {
        this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
        this.interval = interval;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        timestamp = ctx.getCurrentProcessingTime();
        if (fireTimestampState.get() == null) {
            this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
        }

        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        if (((Long)fireTimestampState.get()).equals(time)) {
            fireTimestampState.clear();
            this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        Long timestamp = (Long)fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteProcessingTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
        Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
        if (nextFireTimestamp != null) {
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public String toString() {
        return "ContinuousProcessingTimeTrigger(" + this.interval + ")";
    }

    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
        return new ContinuousProcessingTimeTrigger(interval.toMilliseconds());
    }

    private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
        long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
        fireTimestampState.add(nextFireTimestamp);
        ctx.registerProcessingTimeTimer(nextFireTimestamp);
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}
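
Both continuous triggers align their firings to multiples of the interval and cap them at the window's maximum timestamp. The arithmetic from onElement and registerNextFireTimestamp can be worked through in isolation; the sketch below assumes non-negative timestamps, and the class ContinuousFireTimeSketch and its method names are hypothetical.

```java
// Worked arithmetic for the fire timestamps of the continuous triggers.
class ContinuousFireTimeSketch {

    // First fire time after an element (or clock reading) at `timestamp`:
    // round down to a multiple of the interval, add one interval, cap at the window end.
    static long firstFireTimestamp(long timestamp, long interval, long windowMaxTimestamp) {
        long start = timestamp - (timestamp % interval);
        return Math.min(start + interval, windowMaxTimestamp);
    }

    // Fire time registered after the trigger fired at `firedAt`.
    static long nextFireTimestamp(long firedAt, long interval, long windowMaxTimestamp) {
        return Math.min(firedAt + interval, windowMaxTimestamp);
    }

    public static void main(String[] args) {
        long interval = 5_000L;
        long maxTimestamp = 59_999L; // window [0, 60000)

        // 12345 rounds down to 10000, so the first firing is at 15000.
        System.out.println(firstFireTimestamp(12_345L, interval, maxTimestamp)); // prints 15000
        System.out.println(nextFireTimestamp(15_000L, interval, maxTimestamp));  // prints 20000
        // Near the window end the firing is capped at maxTimestamp.
        System.out.println(nextFireTimestamp(55_000L, interval, maxTimestamp));  // prints 59999
    }
}
```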

DeltaTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final DeltaFunction<T> deltaFunction;
    private final double threshold;
    private final ValueStateDescriptor<T> stateDesc;

    private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);
        if (lastElementState.value() == null) {
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        } else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            lastElementState.update(element);
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ((ValueState)ctx.getPartitionedState(this.stateDesc)).clear();
    }

    public String toString() {
        return "DeltaTrigger(" + this.deltaFunction + ", " + this.threshold + ")";
    }

    public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        return new DeltaTrigger(threshold, deltaFunction, stateSerializer);
    }
}
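
A detail that is easy to miss in onElement: the stored element is only replaced when the trigger fires, so the delta is always measured against the element of the last firing (or the first element), not against the previous element. The Flink-free sketch below illustrates this; the class DeltaTriggerSketch is hypothetical, and java.util.function.ToDoubleBiFunction is used here as a stand-in for Flink's DeltaFunction.

```java
import java.util.function.ToDoubleBiFunction;

// Simplified model of DeltaTrigger's decision for a single window.
class DeltaTriggerSketch<T> {

    enum Result { CONTINUE, FIRE }

    private final double threshold;
    private final ToDoubleBiFunction<T, T> deltaFunction; // (oldElement, newElement) -> delta
    private T lastElement; // stand-in for the "last-element" ValueState

    DeltaTriggerSketch(double threshold, ToDoubleBiFunction<T, T> deltaFunction) {
        this.threshold = threshold;
        this.deltaFunction = deltaFunction;
    }

    Result onElement(T element) {
        if (lastElement == null) {
            lastElement = element; // first element becomes the baseline
            return Result.CONTINUE;
        }
        if (deltaFunction.applyAsDouble(lastElement, element) > threshold) {
            lastElement = element; // baseline moves only when firing
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        DeltaTriggerSketch<Double> trigger =
                new DeltaTriggerSketch<>(10.0, (oldValue, newValue) -> Math.abs(newValue - oldValue));

        double[] readings = {100.0, 105.0, 111.0, 120.0, 121.5};
        for (double reading : readings) {
            System.out.println(reading + " -> " + trigger.onElement(reading));
        }
    }
}
```

In this run 111.0 fires because it differs from the baseline 100.0 by more than 10, while 120.0 does not fire because the baseline has moved to 111.0.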

PurgingTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private Trigger<T, W> nestedTrigger;

    private PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        this.nestedTrigger.clear(window, ctx);
    }

    public boolean canMerge() {
        return this.nestedTrigger.canMerge();
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        this.nestedTrigger.onMerge(window, ctx);
    }

    public String toString() {
        return "PurgingTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
        return new PurgingTrigger(nestedTrigger);
    }

    @VisibleForTesting
    public Trigger<T, W> getNestedTrigger() {
        return this.nestedTrigger;
    }
}
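
All three callbacks of PurgingTrigger apply the same mapping to the nested trigger's result. As a minimal Flink-free sketch, the mapping can be written as one function; the class PurgingWrapperSketch and the method purging are hypothetical names, and the Result enum is a local stand-in for TriggerResult.

```java
// Simplified model of how PurgingTrigger rewrites the nested trigger's result.
class PurgingWrapperSketch {

    enum Result {
        CONTINUE, FIRE, PURGE, FIRE_AND_PURGE;

        boolean isFire() {
            return this == FIRE || this == FIRE_AND_PURGE;
        }
    }

    // Any firing result becomes FIRE_AND_PURGE; everything else passes through.
    static Result purging(Result nested) {
        return nested.isFire() ? Result.FIRE_AND_PURGE : nested;
    }

    public static void main(String[] args) {
        for (Result nested : Result.values()) {
            System.out.println(nested + " -> " + purging(nested));
        }
    }
}
```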

This article is reposted from: https://blog.csdn.net/mqiqe/article/details/140116193
Copyright belongs to the original author, 王小工. In case of infringement, please contact us for removal.
