上游文档:
- Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 生成 Watermark》学习笔记
- Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 内置 Watermark 生成器》学习笔记
- Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记
Watermark
Watermark 是在各个算子生成的、用于标记当前数据流事件时间的对象。当 Watermark 到达后,就意味着该数据流原则上将 不会 再到达比 Watermark 的事件时间更小的消息,即在 Watermark 后到达的事件时间更小的消息视作延迟消息。
首先,让我们来看一下
Watermark
类的源码。
源码:
org.apache.flink.api.common.eventtime.Watermark
【Github】
packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;importjava.io.Serializable;importjava.text.SimpleDateFormat;importjava.util.Date;@PublicpublicfinalclassWatermarkimplementsSerializable{privatestaticfinallong serialVersionUID =1L;/** Thread local formatter for stringifying the timestamps. */privatestaticfinalThreadLocal<SimpleDateFormat>TS_FORMATTER=ThreadLocal.withInitial(()->newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));// ------------------------------------------------------------------------/** The watermark that signifies end-of-event-time. */publicstaticfinalWatermarkMAX_WATERMARK=newWatermark(Long.MAX_VALUE);// ------------------------------------------------------------------------/** The timestamp of the watermark in milliseconds. */privatefinallong timestamp;/** Creates a new watermark with the given timestamp in milliseconds. */publicWatermark(long timestamp){this.timestamp = timestamp;}/** Returns the timestamp associated with this Watermark. */publiclonggetTimestamp(){return timestamp;}/**
* Formats the timestamp of this watermark, assuming it is a millisecond timestamp. The returned
* format is "yyyy-MM-dd HH:mm:ss.SSS".
*/publicStringgetFormattedTimestamp(){returnTS_FORMATTER.get().format(newDate(timestamp));}// ------------------------------------------------------------------------@Overridepublicbooleanequals(Object o){returnthis== o
|| o !=null&& o.getClass()==Watermark.class&&((Watermark) o).timestamp ==this.timestamp;}@OverridepublicinthashCode(){returnLong.hashCode(timestamp);}@OverridepublicStringtoString(){return"Watermark @ "+ timestamp +" ("+getFormattedTimestamp()+')';}}
可以看到,
Watermark
类主要就是用来存储当前 watermark 的毫秒级时间戳,具体地:
- 使用时间戳构造实例化
Watermark
对象,Watermark
对象在实例化后,不能修改其存储的时间戳 - 提供
long getTimestamp()
和String getFormattedTimestamp()
两种查询Watermark
对象时间戳的方法
WatermarkGenerator
接着,我们来看 watermark 的生成接口
WatermarkGenerator
的源码。
源码:
org.apache.flink.api.common.eventtime.WatermarkGenerator
【Github】
packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;importorg.apache.flink.api.common.ExecutionConfig;/**
* The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
* fixed interval).
*
* <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
* AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
*/@PublicpublicinterfaceWatermarkGenerator<T>{/**
* Called for every event, allows the watermark generator to examine and remember the event
* timestamps, or to emit a watermark based on the event itself.
*/voidonEvent(T event,long eventTimestamp,WatermarkOutput output);/**
* Called periodically, and might emit a new watermark, or not.
*
* <p>The interval in which this method is called and Watermarks are generated depends on {@link
* ExecutionConfig#getAutoWatermarkInterval()}.
*/voidonPeriodicEmit(WatermarkOutput output);}
WatermarkGernator
接口,既可以基于消息,也可以基于周期。
WatermarkGenerator
接口有两个方法:
void onEvent(T event, long eventTimestamp, WatermarkOutput output)
:这个方法会在每个消息到达时被调用一次,其参数event
为消息本身,参数eventTimestamp
为消息的事件时间,output
为接收生成的 watermark 的对象。void (WatermarkOutput output)
:这个方法会被周期性地调用,其参数output
为接收生成的 watermark 的对象。
在实现
WatermarkGenerator
接口时,既可以在
onEvent
方法中生成 watermark,也可以在
onPeriodicEmit
方法中生成 watermark。因此,基于
WatermarkGenerator
接口,可以实现 标记生成 或 周期性生成 两种 watermark 生成器。
下面,我们来看 Flink 内置的几个 watermark 生成器。
Flink 内置的 watermark 生成器
在这里,我们仅介绍 Flink 的
flink-core
项目中如下内置的 watermark 生成器:
NoWatermarksGenerator
:不生成 watermark 的生成器BoundedOutOfOrdernessWatermarks
:固定延迟时间的周期性 watermark 生成器AscendingTimestampsWatermarks
:零延迟时间的周期性 watermark 生成器
不生成 watermark 的生成器:
NoWatermarksGenerator
最简单的,不生成任何 watermark 的生成器。在实现上,在
onEvent
方法和
onPeriodicEmit
方法中均不生成 watermark。
源码:
org.apache.flink.api.common.eventtime.NoWatermarksGenerator
【Github】
packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;@PublicpublicfinalclassNoWatermarksGenerator<E>implementsWatermarkGenerator<E>{@OverridepublicvoidonEvent(E event,long eventTimestamp,WatermarkOutput output){}@OverridepublicvoidonPeriodicEmit(WatermarkOutput output){}}
固定延迟时间的周期性 watermark 生成器:
BoundedOutOfOrdernessWatermarks
当输入数据流中消息的事件时间不完全有序,但是对于绝大部分元素,滞后时间通常不会超过一个固定的时间长度时,我们可以通过在当前最大事件时间的基础上减去一个固定延迟时间,来生成 watermark。Flink 内置的 watermark 生成器
BoundedOutOfOrdernessWatermarks
实现了这种功能。
源码:
org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks
【Github】
packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;importjava.time.Duration;importstaticorg.apache.flink.util.Preconditions.checkArgument;importstaticorg.apache.flink.util.Preconditions.checkNotNull;@PublicpublicclassBoundedOutOfOrdernessWatermarks<T>implementsWatermarkGenerator<T>{/** The maximum timestamp encountered so far. */privatelong maxTimestamp;/** The maximum out-of-orderness that this watermark generator assumes. */privatefinallong outOfOrdernessMillis;/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/publicBoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness){checkNotNull(maxOutOfOrderness,"maxOutOfOrderness");checkArgument(!maxOutOfOrderness.isNegative(),"maxOutOfOrderness cannot be negative");this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();// start so that our lowest watermark would be Long.MIN_VALUE.this.maxTimestamp =Long.MIN_VALUE+ outOfOrdernessMillis +1;}// ------------------------------------------------------------------------@OverridepublicvoidonEvent(T event,long eventTimestamp,WatermarkOutput output){
maxTimestamp =Math.max(maxTimestamp, eventTimestamp);}@OverridepublicvoidonPeriodicEmit(WatermarkOutput output){
output.emitWatermark(newWatermark(maxTimestamp - outOfOrdernessMillis -1));}}
可以看到,在
BoundedOutOfOrdernessWatermarks
类中:
- 使用固定的延迟时间
maxOutOfOrderness
来实例化 - 使用示例属性
maxTimestamp
存储当前所有消息的最大事件时间,当每个消息到达时,onEvent
方法被调用,并更新maxTimestamp
属性 - 周期性地生成 watermark,当
onPeriodicEmit
方法被周期性地调用时,会根据当前的最大事件时间以及固定延迟时间来生成 watermark
零延迟时间的周期性 watermark 生成器:
AscendingTimestampsWatermarks
当数据源中消息的事件时间单调递增时,当前事件时间(同时也是最大事件时间)就可以充当 watermark,因为后续到达的消息的事件时间一定不会比当前事件时间小。例如,当只读取一个 Kafka 分区,并使用 Kafka 的消息时间戳作为事件时间时,则可以保证事件时间的单调递增。
此时的 watermark 生成规则,就相当于是延迟为 0 的 “固定延迟时间的周期性生成器”。Flink 内置的 watermark 生成器
AscendingTimestampsWatermarks
实现了这个功能。
源码:
org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks
【Github】
packageorg.apache.flink.api.common.eventtime;importorg.apache.flink.annotation.Public;importjava.time.Duration;@PublicpublicclassAscendingTimestampsWatermarks<T>extendsBoundedOutOfOrdernessWatermarks<T>{/** Creates a new watermark generator with for ascending timestamps. */publicAscendingTimestampsWatermarks(){super(Duration.ofMillis(0));}}
在实现上,
AscendingTimestampsWatermarks
继承了
BoundedOutOfOrdernessWatermarks
,并将延迟指定为 0。
版权归原作者 长行 所有, 如有侵权,请联系我们删除。