0


大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink DataSet
  • Flink DataSet 转换操作
  • Flink DataSet 输出
  • 容错机制、对比、发展方向

在这里插入图片描述

Flink Window 背景

Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,去上面实现了流处理和批处理,而Window就是从Streaming到Batch的桥梁。

通俗讲,Window是用来对一个无限的流的设置一个有限的集合,从而有界数据集上进行操作的一种机制,流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。
Window可以由时间(TimeWindow)比如30秒或者数据,(CountWindow)比如100个元素驱动。
DataStreamAPI提供了Time和Count的Window。

Flink Window 总览

基本概念

  • Window 是Flink处理无限流的核心,Windows将流拆分为有限大小“桶”,我们可以在其上应用计算。
  • Flink 认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。
  • 而Window窗口是从Streaming到Batch的一个桥梁。
  • Flink提供了非常完善的窗口机制
  • 在流处理中,数据是连续不断的,因此我们不可能等到所有等到所有数据都到了再开始处理。
  • 当然我们可以每来一个消息就处理一次,但是有时候我们需要做一些聚合操作,例如:在过去一分钟内有多少用户点击了我们的网页
  • 在这种情况下,我们必须定义一个窗口,用来收集最近的一分钟内的数据,并对这个窗口的内数据进行计算
  • 窗口可以基于时间驱动、也可以基于事件驱动
  • 同样基于不同事件驱动的可以分为:翻滚窗口(TumblingWindow 无重叠)、滑动窗口(Sliding Window 有重叠)、会话窗口(SessionWindow 活动间隙)、全局窗口
  • Flink要操作窗口,先要将StreamSource转换成WindowedStream

转换步骤

  • 获取流数据源
  • 获取窗口
  • 操作窗口数据
  • 输出窗口数据

滚动时间窗口

在这里插入图片描述

类型特点

将数据依据固定的窗口长度对数据进行切分:

  • 时间对齐
  • 窗口长度固定,没有重叠

Flink 的滚动时间窗口(Tumbling Window)是一种常见的基于时间的窗口机制,可以通过事件驱动进行计算。滚动窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。

在 Flink 中,滚动时间窗口可以基于事件时间(Event Time)或者处理时间(Processing Time)来定义。为了基于事件时间驱动,可以使用 EventTimeSessionWindows 或者 TumblingEventTimeWindows 来进行定义。

关键点

  • 事件时间和水印 (Watermark): 通过 assignTimestampsAndWatermarks 来指定事件时间,并使用水印确保窗口计算不会遗漏延迟的事件。
  • 窗口定义: 使用 TumblingEventTimeWindows.of(Time.seconds(x)) 定义滚动窗口。窗口长度为 x 秒。
  • 触发器: 采用 EventTimeTrigger 触发计算,确保窗口是基于事件时间的。

基于时间驱动

场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)。
启动的主类:

packageicu.wzk;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple1;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.datastream.WindowedStream;importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importjava.text.SimpleDateFormat;importjava.util.Random;publicclassTumblingWindow{publicstaticvoidmain(String[] args){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost",9999);SingleOutputStreamOperator<Tuple2<String,Integer>> mapStream = dataStreamSource
                .map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{SimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis =System.currentTimeMillis();int random =newRandom().nextInt(10);System.out.println("value: "+ value +", random: "+ random +", time: "+ format.format(timeMillis));returnTuple2.of(value, random);}});KeyedStream<Tuple2<String,Integer>,Tuple> keyedStream = mapStream
                .keyBy(newKeySelector<Tuple2<String,Integer>,Tuple>(){@OverridepublicTuplegetKey(Tuple2<String,Integer> value)throwsException{returnTuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String,Integer>,Tuple,TimeWindow> timeWindow = keyedStream
                .timeWindow(Time.seconds(10));
        timeWindow.apply(newMyTimeWindowFunction()).print();
        env.execute("TumblingWindow");}}

我们实现一个 MyTimeWindowFunction,滚动时间窗口:

packageicu.wzk;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.functions.windowing.WindowFunction;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importorg.apache.flink.util.Collector;importjava.text.SimpleDateFormat;publicclassMyTimeWindowFunctionimplementsWindowFunction<Tuple2<String,Integer>,String,Tuple,TimeWindow>{/**
     * 场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)
     * @author wzk
     * @date 16:58 2024/7/26

    **/@Overridepublicvoidapply(Tuple tuple,TimeWindow window,Iterable<Tuple2<String,Integer>> input,Collector<String> out)throwsException{SimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum =0;for(Tuple2<String,Integer> tuple2 : input){
            sum += tuple2.f1;}
        out.collect("key: "+ tuple.getField(0)+", value: "+ sum  +", window start: "+ format.format(window.getStart())+", window end: "+ format.format(window.getEnd()));}}

基于事件驱动

场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
编写一个启动类:

packageicu.wzk;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple1;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.datastream.WindowedStream;importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.api.windowing.windows.GlobalWindow;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importjava.text.SimpleDateFormat;importjava.util.Random;publicclassTumblingWindow{publicstaticvoidmain(String[] args){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost",9999);SingleOutputStreamOperator<Tuple2<String,Integer>> mapStream = dataStreamSource
                .map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{SimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis =System.currentTimeMillis();int random =newRandom().nextInt(10);System.out.println("value: "+ value +", random: "+ random +", time: "+ format.format(timeMillis));returnTuple2.of(value, random);}});KeyedStream<Tuple2<String,Integer>,Tuple> keyedStream = mapStream
                .keyBy(newKeySelector<Tuple2<String,Integer>,Tuple>(){@OverridepublicTuplegetKey(Tuple2<String,Integer> value)throwsException{returnTuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String,Integer>,Tuple,GlobalWindow> globalWindow = keyedStream
                .countWindow(3);
        globalWindow.apply(newMyCountWindowFuntion());
        env.execute("TumblingWindow");}}

编写一个事件驱动的类:MyCountWindowFuntion

packageicu.wzk;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.functions.windowing.WindowFunction;importorg.apache.flink.streaming.api.windowing.windows.GlobalWindow;importorg.apache.flink.util.Collector;importjava.text.SimpleDateFormat;publicclassMyCountWindowFuntionimplementsWindowFunction<Tuple2<String,Integer>,String,Tuple,GlobalWindow>{/**
     * 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
     * @author wzk
     * @date 17:11 2024/7/26
    **/@Overridepublicvoidapply(Tuple tuple,GlobalWindow window,Iterable<Tuple2<String,Integer>> input,Collector<String> out)throwsException{SimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum =0;for(Tuple2<String,Integer> tuple2 : input){
            sum += tuple2.f1;}// 无用的时间戳:默认值是:Long.MAX_VALUE,在事件驱动下,基于计数的情况,不关心时间long maxTimestamp = window.maxTimestamp();
        out.collect("key:"+ tuple.getField(0)+", value: "+ sum +", maxTimestamp :"+ maxTimestamp +","+ format.format(maxTimestamp));}}
标签: 大数据 flink 架构

本文转载自: https://blog.csdn.net/w776341482/article/details/141915594
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动”的评论:

还没有评论