0


【大数据面试题】005 谈一谈 Flink Watermark 水印

一步一个脚印,一天一道面试题。

感觉我现在很难把水印描述的很好,但,完成比完美更重要。后续我再补充。各位如果有什么建议或补充也欢迎留言。(已更新2)

在实时处理任务时,由于网络延迟,人工异常,各种问题,数据往往会出现乱序,不按照我们的预期到达处理框架。
WaterMark 水印,就是为了一定程度的解决数据,延迟乱序问题的。

使用 WaterMark 一般有以下几个步骤:

  • 定义时间特性 (Flink 1.12 已废弃,默认使用 事件时间)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  • 设置 Watermark 策略,赋值事件时间
// 分配时间戳和水位线DataStream<Tuple2<Long,Integer>> withTimestampsAndWatermarks = parsedStream
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Long,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp)-> element.f0));
水位线特性
  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊数据。
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的。
  • 水位线必须单调递增,以确保任务的时间时间时钟一直向前推进。
  • 水位线可以设置延迟,来尽量保证正确处理乱序数据。
  • 一个水位线 Watermark (t), 表示在当前流中事件时间已经达到了时间戳 t,这代表 t 之前的所有数据都到齐了,之后不会出现在时间戳 (t) 之前的数据。出现了在 t 之前的数据就会被抛弃不处理。

话不多说,直接给个 Watermark 水印样例代码。
代码逻辑是取一段时间内最大值。

Watermark 等待时间为 3 秒
滚动窗口是事件时间的 [0, 10) 秒;

输入事件时间:            对应的 watermark:1-22-19610737(watermark 单增,不会倒退的)1310

事件时间 = 13 时,watermark 到了10, 开始触发 [0, 10)区间的窗口
[0,10)的窗口被触发时, 会将时间为 1,2,9,10 的数据放入窗口做计算

publicclassSimpleWatermarkExample{publicstaticvoidmain(String[] args)throwsException{// 设置流执行环境finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 从 socket 文本流接收数据DataStream<String> input = env.addSource(newSocketTextStreamFunction("localhost",9999,"\n",-1));// 解析输入的数据DataStream<Tuple2<Long,Integer>> parsedStream = input
                .map(newMapFunction<String,Tuple2<Long,Integer>>(){@OverridepublicTuple2<Long,Integer>map(String value)throwsException{String[] parts = value.split(",");returnnewTuple2<>(Long.parseLong(parts[0]),Integer.parseInt(parts[1]));}});// 分配时间戳和水位线DataStream<Tuple2<Long,Integer>> withTimestampsAndWatermarks = parsedStream
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Long,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp)-> element.f0));// 使用窗口函数统计每10秒内的最大值DataStream<String> maxValues = withTimestampsAndWatermarks
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).apply(newWindowFunction<Tuple2<Long,Integer>,String,TimeWindow>(){@Overridepublicvoidapply(TimeWindow window,Iterable<Tuple2<Long,Integer>> values,Collector<String> out)throwsException{int maxValue =Integer.MIN_VALUE;for(Tuple2<Long,Integer> value : values){
                            maxValue =Math.max(maxValue, value.f1);}
                        out.collect("Window: "+ window +" Max Value: "+ maxValue);}});// 打印结果
        maxValues.print();// 执行程序
        env.execute("Simple Flink Watermark Example");}}
标签: 大数据 flink

本文转载自: https://blog.csdn.net/Jiweilai1/article/details/136066918
版权归原作者 Jiweilai1 所有, 如有侵权,请联系我们删除。

“【大数据面试题】005 谈一谈 Flink Watermark 水印”的评论:

还没有评论