0


flink 窗口函数

文章目录

窗口函数

时间语义

事件像水流一样到来,经过pipline进行处理,为了划定窗口进行计算,需要以时间作为标准,也就是流中元素事件的先后以及间隔描述。

flink是一个分布式系统,如何让所有机器保证时间的完全同步呢。比如上游任务8点59分59秒发送了消息,到达下游时是9点零1秒,那么应该放到哪个窗口内计算呢。所以,我们需要决定到底以哪个时间为标准。

处理时间

processing time,指执行处理操作的机器的系统时间,就是说什么时候到达处理机器,就将其划分到哪个滑动窗口。这是最直接的时间语义,各结点按照自己的系统时钟划分窗口。

处理时间由于数据一旦到来就处理,所以效率很高,延迟很低。一般用在对准确性要求不太高的场景。

事件时间

事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这就是时间戳。这种情况,我们无法看到系统时间,假设只以到来时间的时间戳为基准,控制滑动窗口,那么到来数据必须是有序的,时间戳也是不断增长的。这实际上不可能,所以我们需要借助另外的标志,水位线wateramrks。

事件语义允许设置水位线,并可以接收乱序的数据,可以进行一定的延迟等待,让窗口内的所有数据都到齐,再进行计算。

摄入时间

它指数据进入flink数据流的时间,也就是source算子读入数据的时间。他是事件时间和处理时间的中和,不会引入太大的延迟,他的具体行为跟事件时间非常像,可以当做特殊的事件时间处理。

水位线

关于sparkstreaming和flink水位线的思考对比:

https://www.sohu.com/a/270444235_494938

流计算中我们需要保存状态,但是Dstream是无状态的,那么其count算子是如何工作的呢,答案是将前一个时间步的RDD作为当前时间步的前继结点,就能达到状态更替的效果。

watermark用来度量事件时间,watermark是为了服务事件时间产生的。

在处理时间语义中,每个事件以到达处理机器的时间作为时间戳,当机器时间到达9点(窗口尾部),那么触发计算8点到9点这个窗口的数据,左闭右开。

在事件时间语义中,每个事件以事件产生的时间为时间戳,以事件时间作为窗口的起始和截止,何时触发操作呢?每当一个事件到来,我们读取它的时间戳,作为当前的水位线时钟,当9点的事件到来时,我们认为数据到齐了,开始触发计算。当然,我们可以通过调节触发时机,来调整数据延迟的容忍度及处理效率。

通过在数据流中插入一条记录,这条记录包含从数据流事件中读取的时间戳,称为水位线。通过将水位线广播到下游所有子任务,可以更新下游子任务的时钟

水位线的特性:

1必须递增

2可以设置延迟,保证处理完整的乱序流

3一个水位线t表示t之前的数据都到齐了,之后流中不会再出现小于t的事件了。

有序流中的水位线

有序流中可以保证水位线有序增长,在实际应用中,我们需要水位线的插入周期,当大量相同时间戳的数据到来时,不要频繁插入值相同的水位线。以系统时间为基准,每隔一段时间,插入水位线。

乱序流中的水位线

乱序流中有很多迟到数据,我们需要容忍这些数据全部到期,所以我们可以为水位线添加延迟,当读取时间戳时,减去2s作为时间戳,再当做水位线插入数据流。

生成水位线

生成水位线原则

我们知道,完美的水位线是绝对正确的,一旦水位线t出现表示t之前的数据都到齐了,之后流中不会再出现小于t的事件了。但是,这在实际中很难达成,我们需要考虑效率,及一些意外延迟。如何确保大部分数据不迟到,设置合理的水位线呢。另一种做法是,创建一个单独的flink作业监控事件流,统计事件概率,学习事件的迟到规律。然后,选择置信区间确定延迟,作为水位线的生成策略。

水位线生成策略

flink的datastream api中,使用assignTimestampsAndWatermarks()为流中的数据分配时间戳,并生成水位线指示时间。

assignTimestampsAndWatermarks方法需要传入一个WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy中包含一个时间戳分配器TimestampAssigner和一个“水位线生成器”WatermarkGenerator。
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础
WatermarkGenerator主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator接口中,主要又有两个方法:onEvent()和onPeriodicEmit()
onEvent:每个事件到来都会调用的方法,它的参数由当前时间、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作。
onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInternal()方法来设置,默认为200ms。
env.getConfig().setAutoWatermarkInternal(60*1000L)
flink内置水位线生成器

flink提供了内置的生成器,可以使用WatermarkStrategy的静态辅助方法来创建,它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景

有序流

对于有序流,主要特点就是时间戳单调递增,所以永远不会出现迟到数据的问题。直接调用WatermarkStrategy.forMonotonousTimestamps()就可以实现,就是拿当前最大的时间戳作为水位线。

我们使用withTimestampAssigner方法将数据中的timestamp字段取出来,作为时间戳分配给数据元素;然后用内置的有序流水线生成器构造策略。

stream.assignTimestampsAndWatermarks(
 WatermarkStrategy.<Event>forMonotonousTimestamps()
 .withTimestampAssigner(new SerializableTimestampAssigner<Event>() 
{
 @Override
 public long extractTimestamp(Event element, long recordTimestamp) 
{
 return element.timestamp;
 }
 })
);
乱序流

乱序流需要设置延迟时间。调用WatermarkStrategy. forBoundedOutOfOrderness()可以实现,这个方法需要传入一个maxOutOfOrderness参数,表示最大乱序程度。

env
 .addSource(new ClickSource())
 // 插入水位线的逻辑
 .assignTimestampsAndWatermarks(
 // 针对乱序流插入水位线,延迟时间设置为 5s
 
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
 .withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {
 // 抽取时间戳的逻辑
@Override
public long extractTimestamp(Event element, long 
recordTimestamp) {
 return element.timestamp;
 }
 })
 )
 .print();
自定义水位线

水位线分为周期性的,断点式的,分别在onPeriodicEmit和onEvent编写程序。

周期性水位线生成器

周期性水位线一般通过onEvent观察输入事件,而在onPerioddicEmit发出水位线。

public static class CustomPeriodicGenerator implements 
WatermarkGenerator<Event> {
 private Long delayTime = 5000L; // 延迟时间
 private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
 @Override
 public void onEvent(Event event, long eventTimestamp, WatermarkOutput 
output) {
 // 每来一条数据就调用一次
 maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
 }
 @Override
 public void onPeriodicEmit(WatermarkOutput output) {
 // 发射水位线,默认 200ms 调用一次
 output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
 }
 }
}
断点式水位线生成器

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时, 就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。

@Override
 public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
// 只有在遇到特定的 itemId 时,才发出水位线
 if (r.user.equals("Mary")) {
 output.emitWatermark(new Watermark(r.timestamp - 1));
 }
 }
 @Override
 public void onPeriodicEmit(WatermarkOutput output) {
 // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
 }

水位线的传递

当有多个来自上游的水位线时,取值最小的水位线,因为要确保水位线以前的消息都到齐了。

窗口

窗口将无界数据流切分为多个数据块

当使用处理时间时,窗口的概念很直观。

当使用事件时间时,窗口的触发时机发生了变化。我们把窗口理解为一个桶,讲不同的数据收集到正确的窗口桶中。

主要关注几个属性:事件到来,窗口范围及存储数据,水位线。

注意触发计算和窗口关闭是可以分开的,只是一般情况无需这么复杂。

窗口的分类

按照驱动类型分类

窗口可以按照驱动类型分为时间窗口、计数窗口。

计数窗口按照某个固定的个数,来截取一段数据集,这种窗口叫做计数窗口。

时间窗口

flink中有一个时间窗口的类叫做TimeWindow,这个类有两个私有属性:start和end,表示窗口的开始和结束的时间戳,单位为毫秒。

private final long start;
private final long end;

我们可以调用公有的getStrat()和getEnd()方法直接获取这两个时间戳。另外,TimeWindow还提供了一个maxTimestamp()方法,用来获取窗口中能够包含的数据的最大时间戳。

public long maxTimestamp(){
   return end-1;
}
#很明显,窗口中的数据,最大的允许的时间戳是end-1,也就代表了窗口时间范围是左闭右开的。
计数窗口

计数窗口基于元素的个数来截取数据,到达固定个数时就触发计算并关闭窗口。这相当于座位有限,人满就发车。每个窗口截取的个数,就是窗口的大小。flink通过全局窗口global window来实现计数窗口。

全局窗口
按照窗口分配数据的规则分配

时间窗口和计数窗口,只是对窗口的一个大致划分;在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以有不同的功能应用。

根据分配的规则,窗口的具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

滚动窗口Tumbling Windows

滚动窗口对数据进行均匀分片。窗口之间没有重叠,也不会有间隔,是首尾相接的状态。如果把多个窗口的创建看作一个窗口的移动,那么他就像在滚动一样。

滑动窗口Sliding Windows

由窗口大小和滑动距离确定,每个窗口之间有一定重叠部分。滑动窗口是滚动窗口的一种广义方式,当滑动步长等于滑动窗口大小时,就是滚动窗口。

会话窗口Session Windows

简单来说,就是数据来了之后开启一个会话窗口,如果接下来还有数据陆续到来,那么一直保持会话,如果一段时间没有接收到数据,那就认为会话超时失效,窗口自动关闭。

会话窗口只能基于时间来定义,而没有会话计数窗口的概念,因为会话的意思就是“隔一段时间没有数据来”。会话窗口的关键参数是窗口大小,如果两个数据到来的间隔小于指定的大小,那么说明还在保持会话。

在乱序流中,如果gap间隔超过size就关闭,可能导致迟到的消息丢失。为了处理这种情况,每当来一个新的数据,都会创建一个新的会话窗口;然后判断已有窗口之间的距离,如果小于给定的size,就对它进行合并操作。相当于会话窗口永远不关闭,一直在维护,无论迟到的数据何时到来,总能合并到正确的会话中。注意会话窗口的一些特性:

1不同分区是不相关的

2会话窗口的长度不固定

3起始和结束时间不确定

全局窗口Global Windows

还有一类比较通用的窗口,就是全局窗口。这种窗口全局有效,会把相同key的所有数据分配到同一个窗口中。说直白点,就是没分窗口一样,这种窗口没有结束的时候,默认是不会做触发计算的,必须编写触发器。

窗口API

#按键分区Keyed Windows
stream.keyBy().window()
#非按键分区Non-Keyed Windows
stream.windowAll()
#窗口api使用
steram.keyBy(<key selector>)
       .window(<window assigner>)
       .aggregate(<window function>)
窗口分配器
时间窗口

分为滚动、滑动和会话三种

#滚动处理时间窗口
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
#滑动处理时间窗口
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
#处理时间会话窗口
stream.keyBy(...)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
    .aggregate(...)
#动态定义超时时间
.window(ProcessingTimeSessionWindows.withDynamicGap(new 
SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
 @Override
 public long extract(Tuple2<String, Long> element) { 
// 提取 session gap 值返回, 单位毫秒
 return element.f0.length() * 1000;
 }
}))
#滚动事件时间窗口
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
#滑动事件时间窗口
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
#事件时间会话窗口
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
计数窗口
#滚动计数窗口
stream.keyBey()
   .countWindow(10)
#滑动计数窗口
stream.keyBy()
       .countWindow(10,3)
全局窗口
stream.keyBy()
      .window(GlobalWindows.create())
窗口函数
增量聚合函数

简单聚合是对一些特定统计需求的实现,那么reduce算子就是一个一般化的聚合统计操作

1.归约函数

reduce操作会将keyedstream转换为datastream,调用reduce传入一个参数,此类需要实现reduceFunction接口。定义的方法有两个参数,是输入事件,将输入事件合并可以得到输出事件。

在流处理中,将两个输入事件合并后的状态进行保存,当到来一个新事件时,对其进行计算并更新状态。

所有聚合的操作保存在flink的状态内存中,因为他需要跨越多条记录,需要根据key保存状态。数据流入的过程,就是不断计算并更新flink中保存的状态的过程。

2.聚合函数

aggregate需要指定一个aggregatefunction函数,可以看做reduce函数的通用版本,这里有三种类型IN,ACC,OUT,分别代表输入类型,累加器类型,输出类型。

接口中有四个方法:

createAccumulator():创建一个累加器,这就是为聚合创建一个初始状态,每个聚合任务只会调用一次

add():将输入的元素添加到累加器中,这就是聚合状态,对于新来的数据进行进一步聚合的过程。传入两个参数,当前新到来的数据value,和当前的累加器accumulator;返回一个新的累加器值,对聚合状态进行更新。

getResult():从累加器提取聚合的输出结果。也就是说我们可以定义多个状态,然后基于这些聚合的状态计算出一个结果进行输出。比如计算平均,我们可以设置sum和count两个状态,最终调用这个方法时相除得到最终的结果。这个方法只在窗口要输出结果时调用。

merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口的场景就是会话窗口。

与reduce相比,aggregate的输入格式与输出格式可以不同。

全窗口函数

全窗口函数与增量聚合函数不同,全窗口函数需要先收集窗口中的所有数据,然后在计算全部数据。

这种计算方式相比于流处理是低效的,但是有的时候必须获取到所有数据才能计算,或者需要获取窗口的起始时间等,那么就必须使用全窗口函数。这是典型的批处理思想。

1.窗口函数(WindowFunction)

WindowFunction字面上就是“窗口函数”,他其实就是老版本的通用窗口函数接口。我们可以基于WindowedStream调用apply方法,传入一个WindowFunction的实现类。

WindowFunction可用的功能较少,一般使用ProcessWindowFunction

stream
      .keyBy(<key selector>)
      .window(<window assigner>)
      .apply(new MyWindowFunction());

2.处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction是windowAPI中最底层的通用窗口函数接口。相比于WindowFunction,他可以拿到Context上下文对象,不仅能获得窗口信息,还能访问当前时间和内存状态。这里的时间包括当前时间和时间水位线。

stream
      .keyBy(<key selector>)
      .window(<window assigner>)
      .process(UvCountByWindow())
public static class UvCountByWindow extends ProcessWindowFunction<Event, 
    String, Boolean, TimeWindow>{
         @Override
         public void process(Boolean aBoolean, Context context, Iterable<Event> 
                elements, Collector<String> out) throws Exception {
                 HashSet<String> userSet = new HashSet<>();
                 // 遍历所有数据,放到 Set 里去重
                 for (Event event: elements){
                 userSet.add(event.user);
                 }
                 // 结合窗口信息,包装输出内容
                 Long start = context.window().getStart();
                 Long end = context.window().getEnd();
                 out.collect(" 窗 口 : " + new Timestamp(start) + " ~ " + new 
                Timestamp(end)
                 + " 的独立访客数量是:" + userSet.size());
    }
}
}
增量聚合函数和全窗口函数的结合使用

对于reduce和aggregate函数,我们除了可以传入一个ReduceFunction 或 AggregateFunction 进行增量聚合,还可以传入WindowFunction获取更多丰富的信息,传入的可以是WindowFunction或ProcessWindowFunction。

示例中apply的输入就是ReduceFunction 或 AggregateFunction 处理完的结果,当窗口到达结尾触发计算才会调用第二个窗口函数的apply方法,将其传入windowFunction处理后,作为aggregate函数的处理结果。

#可以借助aggregate函数调用窗口函数
stream
      .keyBy(<key selector>)
      .window(<window assigner>)
      .aggregate(new AggreagteFunction(),new WindowResultFunction())
     
public class WindowResultFunction
        implements WindowFunction<Long, TopProductEntity, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Long> aggregateResult, Collector<TopProductEntity> collector) throws Exception {
        int itemId = key.getField(0);
        Long count = aggregateResult.iterator().next();
        collector.collect(TopProductEntity.of(itemId,window.getEnd(),count));
    }
}
其他API

对于一个窗口算子,其由窗口分配器+窗口函数构成。flink还提供了其他可选的api,让我们灵活的控制窗口行为。

触发器

触发器主要用来控制窗口什么时候触发计算。所谓的触发计算,本质上就是执行窗口函数,所以可以认为是计算得到的结果。

基于WindowedStream调用trigger

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认 的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间 窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。 所以一般情况下是不需要自定义触发器的,不过我们依然有必要了解它的原理。

stream.keyBy(...)
     .window(...)
     .trigger(new MyTrigger())
#Trigger是一个抽象类,自定义时需要实现如下四个抽象方法
onElement():对元素的响应
onEventTime():对事件时间的响应
onProcessingTime():对处理时间的响应
clear():窗口销毁时调用此方法
前三个方法通过放回enum值,控制窗口行为。
枚举值为:CONTINUE,FIRE,PURGE,FIRE_AND_PURGE
如上的枚举值说明触发计算和窗口销毁是可以分开的,并不一定一起触发。
移除器

移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器。evictor是一个接口,不同的窗口类型都有各自实现的移除器。

stream.keyBy()
      .window()
      .evictor(new MyEvictor())
evictor定义了两个方法
evictBefore():定义执行窗口函数之前的移除数据操作
evictAfter():定义执行窗口函数之后的移除数据操作
允许延迟

为了让迟到的数据也参与计算,修正结果的误差,使用允许延迟控制窗口的销毁时间。

这里可以看出,窗口的触发计算和清除操作确实可以分开。不过在默认情况下,允许的延迟是0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口。

stream.keyBy()
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .allowedLateness(Time.minutes(1))      
将迟到的数据放入侧输出流

即使可以设置延迟,仍然会有数据 超过延迟。如果不想丢弃任何数据,可以借助side output进行另外的处理。所谓的侧输出流,相当于数据流的一个分支,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。

基于WindowedStream调用.sideOutputLateData()方法,就可以实现这个功能。方法需要传入一个输出标签,用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以OutputTag的类型和流中数据类型相同。

DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
     .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .sideOutputLateData(outputTag)
    .aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
窗口的生命周期
窗口的创建

窗口的类型和基本信息由窗口分配器指定,但是窗口不会预先创建好,而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建窗口。

窗口计算的触发

除了窗口分配器,还有自己的窗口函数的触发器。窗口函数可以分为增量聚合函数和全窗口函数,主要定义了计算的逻辑。而触发器就是窗口函数触发计算的条件。

窗口的销毁

flink只对时间窗口进行销毁,计数窗口由于是基于全局窗口实现的,而全局窗口不会清除状态,所以就不会销毁。

窗口api总结
stream.
     . keyBy()
     .window()
     [.trigger()]
     [.evictor()]
     [.allowedLateness()]
     [.sideOutputLateData]
     .redluce/aggregate/fold/apply()
     [.getSideOutput()]

stream
     .windowAll()
     [.trigger()]
     [.evictor()]
     [.allowedLateness()]
     [.sideOutputLateData()]
     .reduce/aggregate/fold/apply()
     [.getSideOutput]

DataStream,KeyStream,WindowedStream,singleOutputStream

迟到数据的处理

1水位线延迟

2窗口延迟销毁

3侧输出流

标签: flink 大数据 java

本文转载自: https://blog.csdn.net/m0_50689246/article/details/126309826
版权归原作者 CODE20220318 所有, 如有侵权,请联系我们删除。

“flink 窗口函数”的评论:

还没有评论