一、watermark如何生成
punctuated:每条数据后都会插入当前事件时间解析出来的watermark
periodic:周期性生成,默认是200m生成一个watermark
在新版本中punctuated已经被标记为过时(当前版本1.18.1)
DataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(newSerializableTimestampAssigner<String>(){@OverridepubliclongextractTimestamp(Order element,long recordTimestamp){return element.getTs();}}));
watermark的构造:
1.forMontonousTimestamps:时间戳单调递增策略
2.forBoundedOutOfOrderness:为乱序数据创建水位线策略
3.forGenerator:自定义策略
在assignTimestampsAndWatermarks<>传入的参数是WaterMarkStrategy匿名内部类进入查看水位线生成策略WaterMarkStrategy接口 包含时间分配器TimestampAssigner和WatermarkGenerator水位线生成器
publicinterfaceWatermarkStrategy<T>extendsTimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T>createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);// 负责从数据元素中提取时间戳,并分配给元素 时间戳分配器是生成水位线的基础@OverridedefaultTimestampAssigner<T>createTimestampAssigner(TimestampAssignerSupplier.Context context){returnnewRecordTimestampAssigner<>();}// 内置谁水位线 单调递增 对于有序流 其实是乱序流的一种特殊情况 里边设置的延迟等待时间是0Lstatic<T>WatermarkStrategy<T>forMonotonousTimestamps(){return(ctx)->newAscendingTimestampsWatermarks<>();}// 乱序流 设置等待时间 水位线策略 maxOutOfOrderness最大乱序时间static<T>WatermarkStrategy<T>forBoundedOutOfOrderness(Duration maxOutOfOrderness){return(ctx)->newBoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);}// 自定义水位线策略 其实就是继承WatermarkGeneratorstatic<T>WatermarkStrategy<T>forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier){return generatorSupplier::createWatermarkGenerator;}static<T>WatermarkStrategy<T>noWatermarks(){return(ctx)->newNoWatermarksGenerator<>();}}
watermark的生成策略 WatermarkGenerator接口:
publicinterfaceWatermarkGenerator<T>{// 数据来一条调用一次voidonEvent(T event,long eventTimestamp,WatermarkOutput output);// 定时调用,默认是200msvoidonPeriodicEmit(WatermarkOutput output);}
以forBoundedOutOfOrderness的实现为例
publicclassBoundedOutOfOrdernessWatermarks<T>implementsWatermarkGenerator<T>{// 接收到的最大时间戳privatelong maxTimestamp;// 乱序程度 代码中手动设置的privatefinallong outOfOrdernessMillis;publicBoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness){checkNotNull(maxOutOfOrderness,"maxOutOfOrderness");checkArgument(!maxOutOfOrderness.isNegative(),"maxOutOfOrderness cannot be negative");this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();// 默认值Long的最小值 + 乱序程度 +1 this.maxTimestamp =Long.MIN_VALUE+ outOfOrdernessMillis +1;}@OverridepublicvoidonEvent(T event,long eventTimestamp,WatermarkOutput output){// 来一条数据处理一条 每条数据都会更新 maxTimestamp
maxTimestamp =Math.max(maxTimestamp, eventTimestamp);}// 发送watermark的逻辑@OverridepublicvoidonPeriodicEmit(WatermarkOutput output){
output.emitWatermark(// maxTimestamp - outOfOrdernessMillis(为了修正乱序数据) 减1ms是为了后续开窗,保证窗口是左闭右开的状态。保证窗口关闭时数据只会落在一个窗口区间newWatermark(maxTimestamp - outOfOrdernessMillis -1));}}
二、水位线的在DataStream API中的处理逻辑
publicSingleOutputStreamOperator<T>assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy){finalWatermarkStrategy<T> cleanedStrategy =clean(watermarkStrategy);finalint inputParallelism =getTransformation().getParallelism();finalTimestampsAndWatermarksTransformation<T> transformation =newTimestampsAndWatermarksTransformation<>("Timestamps/Watermarks",
inputParallelism,getTransformation(),
cleanedStrategy,false);getExecutionEnvironment().addOperator(transformation);returnnewSingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);}
assignTimestampsAndWatermarks本质上就是DataStream中的一个算子
将用户自定义的操作封装到Operator中 再将Operator封装进Transformation ,最后将Transformation添加到env的集合中去。并且 new Transformation 都有与之对应的TransformationOperator 上述代码 new TimestampsAndWatermarksTransformation() 因此去查看TimestampsAndWatermarksOperator
// TimestampsAndWatermarksOperator类中的open()@Overridepublicvoidopen()throwsException{super.open();// 初始化用户定义的水印生成逻辑,如果需要定时发送水印会注册一个定时器
timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
watermarkGenerator =
emitProgressiveWatermarks
? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup):newNoWatermarksGenerator<>();
wmOutput =newWatermarkEmitter(output);
watermarkInterval =getExecutionConfig().getAutoWatermarkInterval();if(watermarkInterval >0&& emitProgressiveWatermarks){finallong now =getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now + watermarkInterval,this);}}
首先从配置获取定时生成watermark间隔参数并创建当前时间(处理时间) + 间隔的定时器定义了第一个watermark是如何生成的。定时器会自动执行onProcessingTime()
// 先调用用户定义的onPeriodicEmit()发送水印,然后获取当前时间,最后注册当前时间加水位线电视发送间隔的定时器触发,等待下次触发该方法@OverridepublicvoidonProcessingTime(long timestamp)throwsException{// 这里调用发送一次watermark,随后再次创建下一次的定时器,作为一个算子肯定会接收处理数据 那么肯定会存在processElement()方法
watermarkGenerator.onPeriodicEmit(wmOutput);finallong now =getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now + watermarkInterval,this);}
// 当元素到达算子后会调用processElements()@OverridepublicvoidprocessElement(finalStreamRecord<T> element)throwsException{/**
对数据的处理逻辑什么都不做直接像下游发送,然后调用onEvent记录最大时间戳 其实就是flink先发送数据在生成watermark watermark在生成他的数据之后
*/finalT event = element.getValue();// 如果元素已经被注册了时间,就直接获取或者设置为Long.MIN_VALUEfinallong previousTimestamp =
element.hasTimestamp()? element.getTimestamp():Long.MIN_VALUE;// 从数据中提取时间戳再将时间写入元素中finallong newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);
element.setTimestamp(newTimestamp);
output.collect(element);// 调用用户定义的onEvent()根据用户的逻辑选择刷新水印以及是否发送水印
watermarkGenerator.onEvent(event, newTimestamp, wmOutput);}
watermark生成器本质上就是一个算子,在生命周期方法Open()中会注册定时器,并在定时器中发送记录最大时间戳的watermark并继续注册定时器,算子对业务数据不做任务处理直接发送给下游后记录当前数据的时间与记录的最大时间作比较。
三、watermark的传递规则
Watermark继承自StreamElement
@PublicEvolvingpublicfinalclassWatermarkextendsStreamElement{publicstaticfinalWatermarkMAX_WATERMARK=newWatermark(Long.MAX_VALUE);publicstaticfinalWatermarkUNINITIALIZED=newWatermark(Long.MIN_VALUE);publicWatermark(long timestamp){this.timestamp = timestamp;}publiclonggetTimestamp(){return timestamp;}@Overridepublicbooleanequals(Object o){returnthis== o
|| o !=null&& o.getClass()==Watermark.class&&((Watermark) o).timestamp == timestamp;}@OverridepublicinthashCode(){return(int)(timestamp ^(timestamp >>>32));}@OverridepublicStringtoString(){return"Watermark @ "+ timestamp;}}
StreamElement用于算子间的数据流动 不包含checkpoint的barrier 分别有四个子类
1.StreamRecord:业务数据
2.Watermark:用于表示事件时间的特殊数据
3.LatencyMarker:特殊记录数据,记录创建时间,算子id,subtask编号
4.WatermarkStatus:用于标记是否为空闲流,即IDLE和ACTIVE
watermark的处理逻辑:
当算子接收到watermark时首先会对其进行操作并发送接收到最小的watermark到下游,在多并行下传递watermark发送接收到最小的那个
版权归原作者 前兄如后背 所有, 如有侵权,请联系我们删除。