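Watermark Source Code Analysis
1. Watermark source code analysis
Flink's watermark is a mechanism for event-time processing: it marks when a window computation should be triggered. As records keep flowing into a Flink job, their event times gradually increase and the watermark rises with them. Once the watermark passes the end time of a window, that window's computation is triggered.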
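The firing rule can be illustrated with a few lines of plain Java. This is only a sketch under stated assumptions: the class and method names are hypothetical, windows are treated as half-open intervals [start, end), and a window is considered ready once the watermark reaches its last timestamp (end - 1), which is how the event-time trigger is commonly described.

```java
// Hypothetical helper, not a Flink class: models when an event-time window may fire.
final class WindowFiringSketch {

    /** Largest timestamp that still belongs to the half-open window [start, end). */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Assumption: a window may fire once the watermark has reached its last timestamp. */
    static boolean shouldFire(long windowEnd, long watermark) {
        return watermark >= maxTimestamp(windowEnd);
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // window [0 ms, 10000 ms)
        long[] watermarks = {4_000L, 9_998L, 9_999L, 12_000L};
        for (long wm : watermarks) {
            System.out.println("watermark=" + wm + " fire=" + shouldFire(windowEnd, wm));
        }
    }
}
```

With these inputs only the last two watermarks satisfy the condition, so the window stays open until the watermark has caught up with the window's final millisecond.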
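This article analyzes how watermarks work inside Flink: how they are generated, how they propagate, and how they trigger computation.
Let's start with the structure of Watermark. It is very simple: it wraps a single timestamp.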
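public final class Watermark implements Serializable {

    // The timestamp carried by this watermark, in milliseconds.
    private final long timestamp;

    public Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public String getFormattedTimestamp() {
        // TS_FORMATTER is a static date formatter declared in the full class (not shown in this excerpt).
        return TS_FORMATTER.get().format(new Date(timestamp));
    }
}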
1.1.1 Setting the watermark
// The argument of withTimestampAssigner() is left out in this snippet.
source.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner());
First, look at the assignTimestampsAndWatermarks() method:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        WatermarkStrategy<T> watermarkStrategy) {

    final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);

    final TimestampsAndWatermarksOperator<T> operator =
            new TimestampsAndWatermarksOperator<>(cleanedStrategy);

    // match parallelism to input, to have a 1:1 source -> timestamps/watermarks
    // relationship and chain
    final int inputParallelism = getTransformation().getParallelism();

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
            .setParallelism(inputParallelism);
}
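As you can see, assignTimestampsAndWatermarks() produces a new DataStream and creates a TimestampsAndWatermarksOperator, so this method behaves much like ordinary operators such as map and filter.
In the execution plan for this code, TimestampsAndWatermarksOperator appears as an ordinary operator and can be chained with filter/map operators.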
1.1.2 Opening the watermark operator
When a Task starts, it calls operator.open(). Let's see what the open method of TimestampsAndWatermarksOperator does.
public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    private final WatermarkStrategy<T> watermarkStrategy;

    /** The timestamp assigner. */
    private transient TimestampAssigner<T> timestampAssigner;

    /** The watermark generator, initialized during runtime. */
    private transient WatermarkGenerator<T> watermarkGenerator;

    /** The watermark output gateway, initialized during runtime. */
    private transient WatermarkOutput wmOutput;

    /** The interval (in milliseconds) for periodic watermark probes. Initialized during runtime. */
    private transient long watermarkInterval;

    public TimestampsAndWatermarksOperator(WatermarkStrategy<T> watermarkStrategy) {
        this.watermarkStrategy = checkNotNull(watermarkStrategy);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();

        timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
        watermarkGenerator = watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);

        // Instantiate the WatermarkEmitter
        wmOutput = new WatermarkEmitter(output, getContainingTask().getStreamStatusMaintainer());

        // Read the watermark generation interval from the configuration
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        if (watermarkInterval > 0) {
            // Register a processing-time timer that fires after one watermark interval
            final long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    // Called for every incoming record
    @Override
    public void processElement(final StreamRecord<T> element) throws Exception {
        final T event = element.getValue();
        final long previousTimestamp =
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
        final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);

        element.setTimestamp(newTimestamp);
        output.collect(element);
        // The watermark generator observes the event here
        watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        watermarkGenerator.onPeriodicEmit(wmOutput);

        // Register the next timer
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream, except for the "end of time" watermark.
     */
    @Override
    public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark)
            throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE) {
            wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        watermarkGenerator.onPeriodicEmit(wmOutput);
    }

    // ------------------------------------------------------------------------

    /**
     * Implementation of the {@code WatermarkEmitter}, based on the components
     * that are available inside a stream operator.
     */
    // Watermark emitter
    private static final class WatermarkEmitter implements WatermarkOutput {

        private final Output<?> output;

        private final StreamStatusMaintainer statusMaintainer;

        // The current watermark
        private long currentWatermark;

        private boolean idle;

        WatermarkEmitter(Output<?> output, StreamStatusMaintainer statusMaintainer) {
            this.output = output;
            this.statusMaintainer = statusMaintainer;
            this.currentWatermark = Long.MIN_VALUE;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            final long ts = watermark.getTimestamp();

            if (ts <= currentWatermark) {
                return;
            }

            currentWatermark = ts;

            if (idle) {
                idle = false;
                statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            }

            // Send the watermark downstream
            output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
        }

        @Override
        public void markIdle() {
            idle = true;
            statusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
        }
    }
}
Each time a record arrives, the generator records the largest timestamp seen so far, and the watermark is derived from that value.
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Track the largest timestamp seen so far; the watermark is derived from it
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Periodically send the watermark out
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}
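To make the arithmetic concrete, here is a small stand-alone sketch of the same idea with no Flink dependency. The class and method names are hypothetical; only the two formulas (tracking the maximum timestamp, and emitting max - bound - 1) are taken from the listing above.

```java
// Hypothetical stand-in for the generator above; plain Java, no Flink dependency.
final class BoundedOutOfOrdernessSketch {

    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Same initialisation as the listing: the first watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Called per record: only remembers the largest timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark that a periodic emit would produce right now. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(5_000L);
        long[] eventTimestamps = {10_000L, 12_000L, 9_000L, 20_000L};
        for (long ts : eventTimestamps) {
            generator.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + generator.currentWatermark());
        }
    }
}
```

With a 5000 ms bound, the out-of-order event at 9000 ms does not move the watermark: it stays at 6999 (12000 - 5000 - 1) until the event at 20000 ms raises it to 14999.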
As you can see, open() simply registers a timer. Its interval is the value set in the program via setAutoWatermarkInterval(interval), which defaults to 200 ms. When that time is reached, target.onProcessingTime(timestamp) is triggered.
// Called when the timer fires
public void onProcessingTime(long timestamp) throws Exception {
    // Ask the generator to emit a watermark
    watermarkGenerator.onPeriodicEmit(wmOutput);

    final long now = getProcessingTimeService().getCurrentProcessingTime();
    // Register the next timer
    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
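The pattern here is a one-shot timer whose callback re-registers itself, which yields a periodic tick. The sketch below imitates that pattern with a manually advanced clock so the behaviour is deterministic. Every name in it (ManualTimerService, PeriodicEmitter, Callback) is hypothetical and is not Flink's ProcessingTimeService API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical names throughout; this only imitates the self re-registering timer pattern.
final class PeriodicTimerSketch {

    interface Callback {
        void onProcessingTime(long timestamp);
    }

    /** A manually advanced clock that supports one-shot timers only. */
    static final class ManualTimerService {
        private static final class Timer {
            final long time;
            final Callback callback;

            Timer(long time, Callback callback) {
                this.time = time;
                this.callback = callback;
            }
        }

        private final PriorityQueue<Timer> timers =
                new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.time));
        private long now;

        long getCurrentProcessingTime() {
            return now;
        }

        void registerTimer(long time, Callback callback) {
            timers.add(new Timer(time, callback));
        }

        /** Fires every timer due up to and including target, in time order. */
        void advanceTo(long target) {
            while (!timers.isEmpty() && timers.peek().time <= target) {
                Timer timer = timers.poll();
                now = timer.time;
                timer.callback.onProcessingTime(timer.time);
            }
            now = target;
        }
    }

    /** Registers one timer in open() and a new one on every callback. */
    static final class PeriodicEmitter implements Callback {
        final List<Long> firedAt = new ArrayList<>();
        private final ManualTimerService timerService;
        private final long interval;

        PeriodicEmitter(ManualTimerService timerService, long interval) {
            this.timerService = timerService;
            this.interval = interval;
        }

        void open() {
            if (interval > 0) {
                timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
            }
        }

        @Override
        public void onProcessingTime(long timestamp) {
            firedAt.add(timestamp); // a real operator would emit a watermark here
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }
    }

    public static void main(String[] args) {
        ManualTimerService timerService = new ManualTimerService();
        PeriodicEmitter emitter = new PeriodicEmitter(timerService, 200L);
        emitter.open();
        timerService.advanceTo(1_000L);
        System.out.println(emitter.firedAt); // [200, 400, 600, 800, 1000]
    }
}
```

Re-registering from inside the callback, rather than scheduling a fixed-rate task, means the next tick is always computed from the time the previous one actually ran.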
This eventually reaches the emitWatermark method of WatermarkEmitter, which sends the current watermark downstream through output.emitWatermark.
@Override
public void emitWatermark(Watermark watermark) {
    final long ts = watermark.getTimestamp();

    if (ts <= currentWatermark) {
        return;
    }

    currentWatermark = ts;

    if (idle) {
        idle = false;
        statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
    }

    output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
}
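The key property of this method is that watermarks only move forward: a value that does not exceed the current watermark is dropped silently. A minimal plain-Java sketch of that guard follows. The class name and the forwarded list are hypothetical and stand in for the real downstream output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the emitter above; "forwarded" replaces the real downstream output.
final class MonotonicEmitterSketch {

    final List<Long> forwarded = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;
    private boolean idle;

    void emitWatermark(long ts) {
        if (ts <= currentWatermark) {
            return; // never go backwards, never repeat the same value
        }
        currentWatermark = ts;
        idle = false; // an advancing watermark marks the stream active again
        forwarded.add(ts);
    }

    void markIdle() {
        idle = true;
    }

    boolean isIdle() {
        return idle;
    }

    public static void main(String[] args) {
        MonotonicEmitterSketch emitter = new MonotonicEmitterSketch();
        long[] candidates = {5L, 3L, 5L, 8L};
        for (long ts : candidates) {
            emitter.emitWatermark(ts);
        }
        System.out.println(emitter.forwarded); // [5, 8]
    }
}
```

Because periodic emission calls this method every interval even when no new data arrived, the guard is what keeps identical watermarks from flooding downstream operators.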
1.1.3 Sending the watermark
// CountingOutput
public void emitWatermark(Watermark mark) {
    output.emitWatermark(mark);
}

// ChainingOutput
public void emitWatermark(Watermark mark) {
    try {
        // Record the current watermark
        watermarkGauge.setCurrentWatermark(mark.getTimestamp());
        if (streamStatusProvider.getStreamStatus().isActive()) {
            // Process the watermark
            operator.processWatermark(mark);
        }
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}

// AbstractStreamOperator
public void processWatermark(Watermark mark) throws Exception {
    // In the map operator, timeServiceManager == null
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
    }
    output.emitWatermark(mark);
}

// CountingOutput
public void emitWatermark(Watermark mark) {
    output.emitWatermark(mark);
}

// RecordWriterOutput
public void emitWatermark(Watermark mark) {
    watermarkGauge.setCurrentWatermark(mark.getTimestamp());
    serializationDelegate.setInstance(mark);

    if (streamStatusProvider.getStreamStatus().isActive()) {
        try {
            // Broadcast the watermark
            recordWriter.broadcastEmit(serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}

// RecordWriter
public void broadcastEmit(T record) throws IOException, InterruptedException {
    checkErroneous();
    serializer.serializeRecord(record);

    boolean pruneAfterCopying = false;
    // The watermark is broadcast here: it is sent on every downstream channel,
    // that is, to every downstream task
    for (int channel : broadcastChannels) {
        if (copyFromSerializerToTargetChannel(channel)) {
            pruneAfterCopying = true;
        }
    }

    // Make sure we don't hold onto the large intermediate serialization buffer for too long
    if (pruneAfterCopying) {
        serializer.prune();
    }
}
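The code above shows the watermark call path in this example.
In this example, the watermark operator and map are chained together, so the watermark operator passes the watermark directly to map. The map operator handles it in processWatermark(); since timeServiceManager == null in the map operator, it does nothing with the watermark and simply forwards it.
The Output held by the map operator is a RecordWriterOutput, which uses a RecordWriter to broadcast the watermark to all downstream channels, that is, to every downstream task. In other words, after an upstream task updates its watermark, it broadcasts that watermark to all downstream tasks.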
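The difference between routing a record and broadcasting a watermark can be sketched in plain Java. Everything here is hypothetical: channels are modelled as in-memory lists, and records are routed by a simple hash of their key, which is only one of several possible partitioning schemes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each downstream channel is an in-memory list of received elements.
final class BroadcastSketch {

    final List<List<String>> channels = new ArrayList<>();

    BroadcastSketch(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** A record goes to exactly one channel (assumption: chosen by key hash). */
    void emitRecord(String key) {
        int target = Math.floorMod(key.hashCode(), channels.size());
        channels.get(target).add("record:" + key);
    }

    /** A watermark goes to every channel, so each downstream task sees it. */
    void broadcastWatermark(long timestamp) {
        for (List<String> channel : channels) {
            channel.add("watermark:" + timestamp);
        }
    }

    public static void main(String[] args) {
        BroadcastSketch output = new BroadcastSketch(3);
        output.emitRecord("user-1");
        output.broadcastWatermark(100L);
        for (int i = 0; i < output.channels.size(); i++) {
            System.out.println("channel " + i + ": " + output.channels.get(i));
        }
    }
}
```

Broadcasting is needed because every downstream task has to learn about time progress on this upstream task, including tasks that happen to receive none of its records.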
1.1.4 Downstream task receives and processes the watermark
The downstream task is a OneInputStreamTask, which handles incoming data through the input processor StreamInputProcessor.processInput().
@Override
public InputStatus processInput() throws Exception {
    // Emit the next element
    InputStatus status = input.emitNext(output);

    if (status == InputStatus.END_OF_INPUT) {
        operatorChain.endHeadOperatorInput(1);
    }

    return status;
}

// StreamTaskNetworkInput
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
    while (true) {
        // get the stream element from the deserializer
        if (currentRecordDeserializer != null) {
            DeserializationResult result =
                    currentRecordDeserializer.getNextRecord(deserializationDelegate);
            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                // Process the element
                processElement(deserializationDelegate.getInstance(), output);
                return InputStatus.MORE_AVAILABLE;
            }
        }

        Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
        if (bufferOrEvent.isPresent()) {
            // return to the mailbox after receiving a checkpoint barrier to avoid processing of
            // data after the barrier before checkpoint is performed for unaligned checkpoint mode
            if (bufferOrEvent.get().isEvent()
                    && bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) {
                return InputStatus.MORE_AVAILABLE;
            }
            processBufferOrEvent(bufferOrEvent.get());
        } else {
            if (checkpointedInputGate.isFinished()) {
                checkState(
                        checkpointedInputGate.getAvailableFuture().isDone(),
                        "Finished BarrierHandler should be available");
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
    }
}

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    // A record is sent downstream through output
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    }
    // A watermark is handed to statusWatermarkValve, which decides whether to forward it
    else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}
On this input-processing path, statusWatermarkValve.inputWatermark() is called to handle each received watermark. Here is the code:
public void inputWatermark(Watermark watermark, int channelIndex) {
    // ignore the input watermark if its input channel, or all input channels are idle
    // (i.e. overall the valve is idle).
    if (lastOutputStreamStatus.isActive()
            && channelStatuses[channelIndex].streamStatus.isActive()) {
        long watermarkMillis = watermark.getTimestamp();

        // if the input watermark's value is less than the last received watermark for its
        // input channel, ignore it also.
        // channelStatuses holds this task's status record for each input channel (that is,
        // for each upstream task); it is updated when the new watermark is greater than the
        // watermark recorded for that channel
        if (watermarkMillis > channelStatuses[channelIndex].watermark) {
            channelStatuses[channelIndex].watermark = watermarkMillis;

            // previously unaligned input channels are now aligned if its watermark has caught up
            if (!channelStatuses[channelIndex].isWatermarkAligned
                    && watermarkMillis >= lastOutputWatermark) {
                channelStatuses[channelIndex].isWatermarkAligned = true;
            }

            // now, attempt to find a new min watermark across all aligned channels
            // (pick the smallest watermark among the input channels and process it)
            findAndOutputNewMinWatermarkAcrossAlignedChannels();
        }
    }
}

private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
    long newMinWatermark = Long.MAX_VALUE;
    boolean hasAlignedChannels = false;

    // determine new overall watermark by considering only watermark-aligned channels
    // across all channels (find the smallest watermark among them)
    for (InputChannelStatus channelStatus : channelStatuses) {
        if (channelStatus.isWatermarkAligned) {
            hasAlignedChannels = true;
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
    }

    // we acknowledge and output the new overall watermark if it really is aggregated
    // from some remaining aligned channel, and is also larger than the last output watermark
    // (if the minimum is greater than the last emitted watermark, pass it to outputHandler)
    if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
        lastOutputWatermark = newMinWatermark;
        outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
    }
}
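In outline: after an upstream task broadcasts a watermark to all downstream channels (roughly, to all downstream tasks), each downstream task updates the watermark it has recorded for that upstream input channel; every downstream task keeps such a status record for all of its upstream tasks. The downstream task then picks the minimum watermark across all upstream input channels (that is, all upstream tasks), and if that minimum is greater than the most recently emitted watermark, it calls outputHandler to process the new watermark. In most cases this outputHandler is a ForwardingValveOutputHandler.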
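The minimum-across-channels rule can be tried out with a simplified stand-alone sketch. It deliberately leaves out the idle and alignment handling of the real valve, so all channels are assumed to be active and aligned. The class name and the OptionalLong return value are hypothetical choices made for this illustration.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Simplified, hypothetical model of the valve: no idle channels, no alignment flags.
final class WatermarkValveSketch {

    private final long[] channelWatermarks;
    private long lastOutputWatermark = Long.MIN_VALUE;

    WatermarkValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the new overall watermark if this input advanced it, otherwise empty. */
    OptionalLong inputWatermark(long watermark, int channelIndex) {
        if (watermark <= channelWatermarks[channelIndex]) {
            return OptionalLong.empty(); // stale watermark for this channel, ignore
        }
        channelWatermarks[channelIndex] = watermark;

        long newMin = Long.MAX_VALUE;
        for (long channelWatermark : channelWatermarks) {
            newMin = Math.min(newMin, channelWatermark);
        }

        if (newMin > lastOutputWatermark) {
            lastOutputWatermark = newMin;
            return OptionalLong.of(newMin);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        WatermarkValveSketch valve = new WatermarkValveSketch(2);
        System.out.println(valve.inputWatermark(10L, 0)); // channel 1 has not reported yet
        System.out.println(valve.inputWatermark(7L, 1));  // minimum becomes 7
        System.out.println(valve.inputWatermark(12L, 1)); // minimum becomes 10
        System.out.println(valve.inputWatermark(5L, 0));  // stale, ignored
    }
}
```

Taking the minimum means the slowest upstream task determines the downstream task's event time, which is why a single lagging or silent channel can hold back window firing for the whole operator.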
Copyright belongs to the original author, mn_kw. In case of infringement, please contact us for removal.