0


Flink时间窗口程序骨架结构

前言

Flink 作业的基本骨架结构包含三部分:创建执行环境、定义数据处理逻辑、提交并执行Flink作业。

日常大部分 Flink 作业是基于时间窗口计算模型的,同样的,开发一个Flink时间窗口作业也有一套基本的骨架结构,了解这套结构有助于我们更快地上手时间窗口作业开发。

窗口程序的基本骨架

一个Flink时间窗口作业的代码基本骨架如下所示:

stream
   .keyBy(...)<-  仅 keyed 窗口需要
   .window(...)<-  必填项:"assigner"[.trigger(...)]<-  可选项:"trigger"(省略则使用默认 trigger)[.evictor(...)]<-  可选项:"evictor"(省略则不使用 evictor)[.allowedLateness(...)]<-  可选项:"lateness"(省略则为 0)[.sideOutputLateData(...)]<-  可选项:"output tag"(省略则不对迟到数据使用 side output).reduce/aggregate/apply()<-  必填项:"function"[.getSideOutput(...)]<-  可选项:"output tag"

时间窗口作业对数据逻辑的处理,主要包含以下步骤:

  • 对数据流进行分组,将DataStream装换为KeyedStream
  • 指定窗口分配器 WindowAssigner,将数据划分到对应的窗口
  • 指定窗口触发器 Trigger,决定了窗口何时关闭并计算
  • 指定窗口移除器 Evictor,它可以在窗口计算前后对窗口内的数据进行移除
  • allowedLateness 允许迟到的数据,事件时间语义下,即使事件时钟到达窗口关闭时间,窗口仍会保留一段时间以等待迟到的数据
  • sideOutputLateData 针对窗口关闭后到达的迟到数据,可以将其输出到另外一条数据流,对计算结果做修正
  • ProcessFunction 窗口内数据的处理函数

时间窗口作业实战

了解了时间窗口作业的基本骨架,以及相关组件的作用,接下来就实战一把。

如下示例程序,数据源每秒会生成2个一百以内的随机数,然后数据经过 keyBy 算子分组,这里为了简单,数据全部划分为一组,KeySelector 统一返回 “all”。

分组后,窗口分配器将数据划分到对应的窗口。这里基于处理时间语义,统一分配10秒大小的时间窗口,时间窗口被Flink封装为 TimeWindow 对象,包含两个属性,分别是起始时间戳和结束时间戳。

一旦有数据进入窗口,Trigger#onElement 就会触发,返回值决定了Flink如何处理窗口。显然我们的逻辑是时间到达窗口的结束时间,窗口就会触发计算并关闭,所以我们会注册一个 ProcessingTime 事件,窗口结束时间一到,Trigger#onProcessingTime 就会触发,窗口就会开始计算。

窗口计算前,还需要经过移除器Evictor。它有两个方法,分别在窗口计算前和计算后调用,在这里你可以根据条件移除窗口内无须计算的数据。示例程序中,把小于10的数移除掉了。

最终,窗口内的数据会交给 ProcessWindowFunction 处理,窗口内的数据被Flink封装成迭代器Iterable,通过它可以获得所有窗口内的数据。示例程序 中,我们所有元素打印出来并求和。

publicclassTimeWindowStructure{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment environment =StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(newSourceFunction<Long>(){@Overridepublicvoidrun(SourceContext<Long> sourceContext)throwsException{while(true){Threads.sleep(500);
                            sourceContext.collect(ThreadLocalRandom.current().nextLong(100));}}@Overridepublicvoidcancel(){}}).keyBy(i ->"all")// 窗口分配器.window(newWindowAssigner<Long,TimeWindow>(){staticfinallong WINDOW_SIZE =10_000L;@OverridepublicCollection<TimeWindow>assignWindows(Long event,long timestamp,WindowAssignerContext windowAssignerContext){// 把数据分配到对应的窗口,一条数据甚至可以分配到多个窗口// 这里根据处理时间 分配10秒大小的窗口finallong processingTime = windowAssignerContext.getCurrentProcessingTime();long start = processingTime / WINDOW_SIZE * WINDOW_SIZE;long end = start + WINDOW_SIZE;returnList.of(newTimeWindow(start, end));}@OverridepublicTrigger<Long,TimeWindow>getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment){// 默认触发器,废弃了returnnull;}@OverridepublicTypeSerializer<TimeWindow>getWindowSerializer(ExecutionConfig executionConfig){// 窗口序列化器returnnewTimeWindow.Serializer();}@OverridepublicbooleanisEventTime(){// 是否基于事件时间语义returnfalse;}})// 窗口触发器.trigger(newTrigger<Long,TimeWindow>(){privatelong max_register_processing_time =0L;@OverridepublicTriggerResultonElement(Long element,long timestamp,TimeWindow timeWindow,TriggerContext triggerContext)throwsException{// 每个元素进入窗口,都会触发该方法 返回结果决定了窗口是否计算或关闭// 我们是根据处理时间窗口结束时间来判断是否触发的,所以注册一个处理时间事件即可if(timeWindow.maxTimestamp()> max_register_processing_time){
                            max_register_processing_time = timeWindow.maxTimestamp();
                            triggerContext.registerProcessingTimeTimer(max_register_processing_time);}returnTriggerResult.CONTINUE;}@OverridepublicTriggerResultonProcessingTime(long l,TimeWindow timeWindow,TriggerContext triggerContext)throwsException{// 窗口计算并清除数据returnTriggerResult.FIRE_AND_PURGE;}@OverridepublicTriggerResultonEventTime(long l,TimeWindow timeWindow,TriggerContext triggerContext)throwsException{returnnull;}@Overridepublicvoidclear(TimeWindow timeWindow,TriggerContext triggerContext)throwsException{
                        triggerContext.deleteProcessingTimeTimer(timeWindow.maxTimestamp());}})// 窗口移除器.evictor(newEvictor<Long,TimeWindow>(){@OverridepublicvoidevictBefore(Iterable<TimestampedValue<Long>> iterable,int i,TimeWindow timeWindow,EvictorContext evictorContext){// 窗口计算前触发Iterator<TimestampedValue<Long>> iterator = iterable.iterator();while(iterator.hasNext()){TimestampedValue<Long> next = iterator.next();Long value = next.getValue();if(value <10){
                                iterator.remove();System.err.println("Evicted:"+ value);}}}@OverridepublicvoidevictAfter(Iterable<TimestampedValue<Long>> iterable,int i,TimeWindow timeWindow,EvictorContext evictorContext){// 窗口计算后触发}})// 因为是基于事件时间语义,不存在迟到数据,所以无须设置 allowedLateness、sideOutputLateData// 窗口处理函数.process(newProcessWindowFunction<Long,String,String,TimeWindow>(){@Overridepublicvoidprocess(String group,ProcessWindowFunction<Long,String,String,TimeWindow>.Context context,Iterable<Long> iterable,Collector<String> collector)throwsException{TimeWindow window = context.window();StringBuilder builder =newStringBuilder();
                        builder.append("["+ window.getStart()+"-"+ window.maxTimestamp()+"] elements:");Iterator<Long> iterator = iterable.iterator();Long sum =0L;while(iterator.hasNext()){Long value = iterator.next();
                            sum += value;
                            builder.append(value +" ");}
                        builder.append(", sum:"+ sum);System.err.println(builder.toString());}});
        environment.execute();}}

运行Flink作业,控制台输出:

Evicted:3Evicted:6Evicted:1[1722665800000-1722665809999] elements:8917165794476798, sum:485Evicted:6Evicted:4[1722665810000-1722665819999] elements:865071953610554396362887895053356395, sum:1078Evicted:4Evicted:8Evicted:0[1722665820000-1722665829999] elements:8520428646203245915957643167787128, sum:922

尾巴

了解Flink时间窗口作业的基本骨架结构,理清Flink时间窗口的数据流转过程,有助于我们更快上手Flink时间窗口作业的开发。

Flink时间窗口作业包含的核心组件有:WindowAssigner、Window、Trigger、Evictor、ProcessWindowFunction。

标签: flink java

本文转载自: https://blog.csdn.net/qq_32099833/article/details/142958290
版权归原作者 程序员小潘 所有, 如有侵权,请联系我们删除。

“Flink时间窗口程序骨架结构”的评论:

还没有评论