0


Flink 源码学习|Watermark 与 WatermarkGenerator

上游文档:

  • Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 生成 Watermark》学习笔记
  • Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 内置 Watermark 生成器》学习笔记
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

Watermark

Watermark 是在各个算子生成的、用于标记当前数据流事件时间的对象。当 Watermark 到达后,就意味着该数据流原则上将 不会 再到达比 Watermark 的事件时间更小的消息,即在 Watermark 后到达的事件时间更小的消息视作延迟消息。

首先,让我们来看一下

Watermark

类的源码。

源码

org.apache.flink.api.common.eventtime.Watermark

【Github】

packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;importjava.io.Serializable;importjava.text.SimpleDateFormat;importjava.util.Date;@PublicpublicfinalclassWatermarkimplementsSerializable{privatestaticfinallong serialVersionUID =1L;/** Thread local formatter for stringifying the timestamps. */privatestaticfinalThreadLocal<SimpleDateFormat>TS_FORMATTER=ThreadLocal.withInitial(()->newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));// ------------------------------------------------------------------------/** The watermark that signifies end-of-event-time. */publicstaticfinalWatermarkMAX_WATERMARK=newWatermark(Long.MAX_VALUE);// ------------------------------------------------------------------------/** The timestamp of the watermark in milliseconds. */privatefinallong timestamp;/** Creates a new watermark with the given timestamp in milliseconds. */publicWatermark(long timestamp){this.timestamp = timestamp;}/** Returns the timestamp associated with this Watermark. */publiclonggetTimestamp(){return timestamp;}/**
     * Formats the timestamp of this watermark, assuming it is a millisecond timestamp. The returned
     * format is "yyyy-MM-dd HH:mm:ss.SSS".
     */publicStringgetFormattedTimestamp(){returnTS_FORMATTER.get().format(newDate(timestamp));}// ------------------------------------------------------------------------@Overridepublicbooleanequals(Object o){returnthis== o
                || o !=null&& o.getClass()==Watermark.class&&((Watermark) o).timestamp ==this.timestamp;}@OverridepublicinthashCode(){returnLong.hashCode(timestamp);}@OverridepublicStringtoString(){return"Watermark @ "+ timestamp +" ("+getFormattedTimestamp()+')';}}

可以看到,

Watermark

类主要就是用来存储当前 watermark 的毫秒级时间戳,具体地:

  • 使用时间戳构造实例化 Watermark 对象,Watermark 对象在实例化后,不能修改其存储的时间戳
  • 提供 long getTimestamp()String getFormattedTimestamp() 两种查询 Watermark 对象时间戳的方法

WatermarkGenerator

接着,我们来看 watermark 的生成接口

WatermarkGenerator

的源码。

源码

org.apache.flink.api.common.eventtime.WatermarkGenerator

【Github】

packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;importorg.apache.flink.api.common.ExecutionConfig;/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
 * fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
 * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */@PublicpublicinterfaceWatermarkGenerator<T>{/**
     * Called for every event, allows the watermark generator to examine and remember the event
     * timestamps, or to emit a watermark based on the event itself.
     */voidonEvent(T event,long eventTimestamp,WatermarkOutput output);/**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated depends on {@link
     * ExecutionConfig#getAutoWatermarkInterval()}.
     */voidonPeriodicEmit(WatermarkOutput output);}
WatermarkGernator

接口,既可以基于消息,也可以基于周期。

WatermarkGenerator

接口有两个方法:

  • void onEvent(T event, long eventTimestamp, WatermarkOutput output):这个方法会在每个消息到达时被调用一次,其参数 event 为消息本身,参数 eventTimestamp 为消息的事件时间,output 为接收生成的 watermark 的对象。
  • void (WatermarkOutput output):这个方法会被周期性地调用,其参数 output 为接收生成的 watermark 的对象。

在实现

WatermarkGenerator

接口时,既可以在

onEvent

方法中生成 watermark,也可以在

onPeriodicEmit

方法中生成 watermark。因此,基于

WatermarkGenerator

接口,可以实现 标记生成周期性生成 两种 watermark 生成器。

下面,我们来看 Flink 内置的几个 watermark 生成器。

Flink 内置的 watermark 生成器

在这里插入图片描述

在这里,我们仅介绍 Flink 的

flink-core

项目中如下内置的 watermark 生成器:

  • NoWatermarksGenerator:不生成 watermark 的生成器
  • BoundedOutOfOrdernessWatermarks:固定延迟时间的周期性 watermark 生成器
  • AscendingTimestampsWatermarks:零延迟时间的周期性 watermark 生成器

不生成 watermark 的生成器:

NoWatermarksGenerator

最简单的,不生成任何 watermark 的生成器。在实现上,在

onEvent

方法和

onPeriodicEmit

方法中均不生成 watermark。

源码

org.apache.flink.api.common.eventtime.NoWatermarksGenerator

【Github】

packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;@PublicpublicfinalclassNoWatermarksGenerator<E>implementsWatermarkGenerator<E>{@OverridepublicvoidonEvent(E event,long eventTimestamp,WatermarkOutput output){}@OverridepublicvoidonPeriodicEmit(WatermarkOutput output){}}

固定延迟时间的周期性 watermark 生成器:

BoundedOutOfOrdernessWatermarks

当输入数据流中消息的事件时间不完全有序,但是对于绝大部分元素,滞后时间通常不会超过一个固定的时间长度时,我们可以通过在当前最大事件时间的基础上减去一个固定延迟时间,来生成 watermark。Flink 内置的 watermark 生成器

BoundedOutOfOrdernessWatermarks

实现了这种功能。

源码

org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks

【Github】

packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;importjava.time.Duration;importstaticorg.apache.flink.util.Preconditions.checkArgument;importstaticorg.apache.flink.util.Preconditions.checkNotNull;@PublicpublicclassBoundedOutOfOrdernessWatermarks<T>implementsWatermarkGenerator<T>{/** The maximum timestamp encountered so far. */privatelong maxTimestamp;/** The maximum out-of-orderness that this watermark generator assumes. */privatefinallong outOfOrdernessMillis;/**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */publicBoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness){checkNotNull(maxOutOfOrderness,"maxOutOfOrderness");checkArgument(!maxOutOfOrderness.isNegative(),"maxOutOfOrderness cannot be negative");this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();// start so that our lowest watermark would be Long.MIN_VALUE.this.maxTimestamp =Long.MIN_VALUE+ outOfOrdernessMillis +1;}// ------------------------------------------------------------------------@OverridepublicvoidonEvent(T event,long eventTimestamp,WatermarkOutput output){
        maxTimestamp =Math.max(maxTimestamp, eventTimestamp);}@OverridepublicvoidonPeriodicEmit(WatermarkOutput output){
        output.emitWatermark(newWatermark(maxTimestamp - outOfOrdernessMillis -1));}}

可以看到,在

BoundedOutOfOrdernessWatermarks

类中:

  • 使用固定的延迟时间 maxOutOfOrderness 来实例化
  • 使用示例属性 maxTimestamp 存储当前所有消息的最大事件时间,当每个消息到达时,onEvent 方法被调用,并更新 maxTimestamp 属性
  • 周期性地生成 watermark,当 onPeriodicEmit 方法被周期性地调用时,会根据当前的最大事件时间以及固定延迟时间来生成 watermark

零延迟时间的周期性 watermark 生成器:

AscendingTimestampsWatermarks

当数据源中消息的事件时间单调递增时,当前事件时间(同时也是最大事件时间)就可以充当 watermark,因为后续到达的消息的事件时间一定不会比当前事件时间小。例如,当只读取一个 Kafka 分区,并使用 Kafka 的消息时间戳作为事件时间时,则可以保证事件时间的单调递增。

此时的 watermark 生成规则,就相当于是延迟为 0 的 “固定延迟时间的周期性生成器”。Flink 内置的 watermark 生成器

AscendingTimestampsWatermarks

实现了这个功能。

源码

org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks

【Github】

packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;importjava.time.Duration;@PublicpublicclassAscendingTimestampsWatermarks<T>extendsBoundedOutOfOrdernessWatermarks<T>{/** Creates a new watermark generator with for ascending timestamps. */publicAscendingTimestampsWatermarks(){super(Duration.ofMillis(0));}}

在实现上,

AscendingTimestampsWatermarks

继承了

BoundedOutOfOrdernessWatermarks

,并将延迟指定为 0。


本文转载自: https://blog.csdn.net/Changxing_J/article/details/135896567
版权归原作者 长行 所有, 如有侵权,请联系我们删除。

“Flink 源码学习|Watermark 与 WatermarkGenerator”的评论:

还没有评论