0


大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink Time 详解
  • 示例内容分析
  • Watermark

在这里插入图片描述

Watermark

Watermark 在窗口计算中的作用

在使用基于事件时间的窗口时,Flink 依赖 Watermark 来决定何时触发窗口计算。例如,如果你有一个每 10 秒的滚动窗口,当 Watermark 达到某个窗口的结束时间后,Flink 才会触发该窗口的计算。

假设有一个 10 秒的窗口,并且 Watermark 达到 12:00:10,此时 Flink 会触发 12:00:00 - 12:00:10 的窗口计算。

如何处理迟到事件

尽管 Watermark 能有效解决乱序问题,但总有可能会出现事件在生成 Watermark 之后才到达的情况(即“迟到事件”)。为此,Flink 提供了处理迟到事件的机制:

  • 允许一定的延迟处理:可以配置窗口允许迟到的时间。
  • 迟到事件的侧输出流(Side Output):可以将迟到的事件发送到一个侧输出流中,以便后续处理。
DataStream<Tuple2<String,Integer>> mainStream = 
  stream.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(5)).sideOutputLateData(lateOutputTag);

代码实现

数据格式

01,1586489566000
01,1586489567000
01,1586489568000
01,1586489569000
01,1586489570000
01,1586489571000
01,1586489572000
01,1586489573000
01,1586489574000
01,1586489575000
01,1586489576000
01,1586489577000
01,1586489578000
01,1586489579000

编写代码

这段代码实现了:

  • 通过 socket 获取实时流数据。
  • 将流数据映射成带有时间戳的二元组形式。
  • 应用了一个允许 5 秒乱序的水印策略,确保 Flink 可以处理乱序的事件流。
  • 按照事件的 key 进行分组,并在事件时间的基础上进行 5 秒的滚动窗口计算。
  • 最后输出每个窗口内事件的时间范围、窗口开始和结束时间等信息。

其中,这里对流数据进行了按 key(事件的第一个字段)分组,并且使用了 滚动窗口(Tumbling Window),窗口长度为 5 秒。
在 apply 方法中,你收集窗口中的所有事件,并根据事件时间戳进行排序,然后输出每个窗口的开始和结束时间,以及窗口中最早和最晚事件的时间戳。

SingleOutputStreamOperator<String> res = waterMark
    .keyBy(newKeySelector<Tuple2<String,Long>,String>(){@OverridepublicStringgetKey(Tuple2<String,Long> value)throwsException{return value.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(newWindowFunction<Tuple2<String,Long>,String,String,TimeWindow>(){@Overridepublicvoidapply(String s,TimeWindow window,Iterable<Tuple2<String,Long>> input,Collector<String> out)throwsException{List<Long> list =newArrayList<>();for(Tuple2<String,Long> next : input){
                list.add(next.f1);}Collections.sort(list);SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result ="key: "+ s +", list.size(): "+ list.size()+", list.get(0): "+ sdf.format(list.get(0))+", list.get(last): "+ sdf.format(list.get(list.size()-1))+", start: "+ sdf.format(window.getStart())+", end: "+ sdf.format(window.getEnd());
            out.collect(result);}});

水印的策略,定义了一个Bounded Out-of-Orderness 的水印策略,允许最多 5 秒的事件乱序,在 extractTimestamp 中,提取了事件的时间戳,并打印出每个事件的 key 和对应的事件时间。还维护了一个 currentMaxTimestamp 来记录当前最大的事件时间戳:

WatermarkStrategy<Tuple2<String,Long>> watermarkStrategy =WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(newSerializableTimestampAssigner<Tuple2<String,Long>>(){Long currentMaxTimestamp =0L;finalSimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@OverridepubliclongextractTimestamp(Tuple2<String,Long> element,long recordTimestamp){long timestamp = element.f1;
            currentMaxTimestamp =Math.max(currentMaxTimestamp, timestamp);System.out.println("Key:"+ element.f0 +", EventTime: "+ element.f1 +", "+ format.format(element.f1));return element.f1;}});

完整代码如下所示,代码实现了一个基于事件时间的流处理系统,并通过水印(Watermark)机制来处理乱序事件:

packageicu.wzk;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.TimeCharacteristic;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.windowing.WindowFunction;importorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importorg.apache.flink.util.Collector;importjava.text.SimpleDateFormat;importjava.time.Duration;importjava.util.ArrayList;importjava.util.Collections;importjava.util.List;publicclassWatermarkTest01{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamSource<String> data = env.socketTextStream("localhost",9999);SingleOutputStreamOperator<Tuple2<String,Long>> mapped = data.map(newMapFunction<String,Tuple2<String,Long>>(){@OverridepublicTuple2<String,Long>map(String value)throwsException{String[] split = value.split(",");returnnewTuple2<>(split[0],Long.valueOf(split[1]));}});WatermarkStrategy<Tuple2<String,Long>> watermarkStrategy =WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(newSerializableTimestampAssigner<Tuple2<String,Long>>(){Long currentMaxTimestamp =0L;finalSimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@OverridepubliclongextractTimestamp(Tuple2<String,Long> element,long recordTimestamp){long timestamp = element.f1;
                        currentMaxTimestamp =Math.max(currentMaxTimestamp, timestamp);System.out.println("Key:"+ element.f0 +", EventTime: "+ element.f1 +", "+ format.format(element.f1));return element.f1;}});SingleOutputStreamOperator<Tuple2<String,Long>> waterMark = mapped
                .assignTimestampsAndWatermarks(watermarkStrategy);SingleOutputStreamOperator<String> res = waterMark
                .keyBy(newKeySelector<Tuple2<String,Long>,String>(){@OverridepublicStringgetKey(Tuple2<String,Long> value)throwsException{return value.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(newWindowFunction<Tuple2<String,Long>,String,String,TimeWindow>(){@Overridepublicvoidapply(String s,TimeWindow window,Iterable<Tuple2<String,Long>> input,Collector<String> out)throwsException{List<Long> list =newArrayList<>();for(Tuple2<String,Long> next : input){
                            list.add(next.f1);}Collections.sort(list);SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result ="key: "+ s +", list.size(): "+ list.size()+", list.get(0): "+ sdf.format(list.get(0))+", list.get(last): "+ sdf.format(list.get(list.size()-1))+", start: "+ sdf.format(window.getStart())+", end: "+ sdf.format(window.getEnd());
                        out.collect(result);}});

        res.print();
        env.execute();}}

运行代码

在这里插入图片描述

传入数据

在控制台中,输入如下的数据:
在这里插入图片描述

查看结果

控制台运行结果如下:
在这里插入图片描述

标签: 大数据 flink java

本文转载自: https://blog.csdn.net/w776341482/article/details/141950776
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window”的评论:

还没有评论