点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
- Flink DataSet
- Flink DataSet 转换操作
- Flink DataSet 输出
- 容错机制、对比、发展方向
Flink Window 背景
Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,去上面实现了流处理和批处理,而Window就是从Streaming到Batch的桥梁。
通俗讲,Window是用来对一个无限的流的设置一个有限的集合,从而有界数据集上进行操作的一种机制,流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。
Window可以由时间(TimeWindow)比如30秒或者数据,(CountWindow)比如100个元素驱动。
DataStreamAPI提供了Time和Count的Window。
Flink Window 总览
基本概念
- Window 是Flink处理无限流的核心,Windows将流拆分为有限大小“桶”,我们可以在其上应用计算。
- Flink 认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。
- 而Window窗口是从Streaming到Batch的一个桥梁。
- Flink提供了非常完善的窗口机制
- 在流处理中,数据是连续不断的,因此我们不可能等到所有等到所有数据都到了再开始处理。
- 当然我们可以每来一个消息就处理一次,但是有时候我们需要做一些聚合操作,例如:在过去一分钟内有多少用户点击了我们的网页
- 在这种情况下,我们必须定义一个窗口,用来收集最近的一分钟内的数据,并对这个窗口的内数据进行计算
- 窗口可以基于时间驱动、也可以基于事件驱动
- 同样基于不同事件驱动的可以分为:翻滚窗口(TumblingWindow 无重叠)、滑动窗口(Sliding Window 有重叠)、会话窗口(SessionWindow 活动间隙)、全局窗口
- Flink要操作窗口,先要将StreamSource转换成WindowedStream
转换步骤
- 获取流数据源
- 获取窗口
- 操作窗口数据
- 输出窗口数据
滚动时间窗口
类型特点
将数据依据固定的窗口长度对数据进行切分:
- 时间对齐
- 窗口长度固定,没有重叠
Flink 的滚动时间窗口(Tumbling Window)是一种常见的基于时间的窗口机制,可以通过事件驱动进行计算。滚动窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。
在 Flink 中,滚动时间窗口可以基于事件时间(Event Time)或者处理时间(Processing Time)来定义。为了基于事件时间驱动,可以使用 EventTimeSessionWindows 或者 TumblingEventTimeWindows 来进行定义。
关键点
- 事件时间和水印 (Watermark): 通过 assignTimestampsAndWatermarks 来指定事件时间,并使用水印确保窗口计算不会遗漏延迟的事件。
- 窗口定义: 使用 TumblingEventTimeWindows.of(Time.seconds(x)) 定义滚动窗口。窗口长度为 x 秒。
- 触发器: 采用 EventTimeTrigger 触发计算,确保窗口是基于事件时间的。
基于时间驱动
场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)。
启动的主类:
packageicu.wzk;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple1;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.datastream.WindowedStream;importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importjava.text.SimpleDateFormat;importjava.util.Random;publicclassTumblingWindow{publicstaticvoidmain(String[] args){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost",9999);SingleOutputStreamOperator<Tuple2<String,Integer>> mapStream = dataStreamSource
.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{SimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis =System.currentTimeMillis();int random =newRandom().nextInt(10);System.out.println("value: "+ value +", random: "+ random +", time: "+ format.format(timeMillis));returnTuple2.of(value, random);}});KeyedStream<Tuple2<String,Integer>,Tuple> keyedStream = mapStream
.keyBy(newKeySelector<Tuple2<String,Integer>,Tuple>(){@OverridepublicTuplegetKey(Tuple2<String,Integer> value)throwsException{returnTuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String,Integer>,Tuple,TimeWindow> timeWindow = keyedStream
.timeWindow(Time.seconds(10));
timeWindow.apply(newMyTimeWindowFunction()).print();
env.execute("TumblingWindow");}}
我们实现一个 MyTimeWindowFunction,滚动时间窗口:
packageicu.wzk;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.functions.windowing.WindowFunction;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importorg.apache.flink.util.Collector;importjava.text.SimpleDateFormat;publicclassMyTimeWindowFunctionimplementsWindowFunction<Tuple2<String,Integer>,String,Tuple,TimeWindow>{/**
* 场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)
* @author wzk
* @date 16:58 2024/7/26
**/@Overridepublicvoidapply(Tuple tuple,TimeWindow window,Iterable<Tuple2<String,Integer>> input,Collector<String> out)throwsException{SimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum =0;for(Tuple2<String,Integer> tuple2 : input){
sum += tuple2.f1;}
out.collect("key: "+ tuple.getField(0)+", value: "+ sum +", window start: "+ format.format(window.getStart())+", window end: "+ format.format(window.getEnd()));}}
基于事件驱动
场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
编写一个启动类:
packageicu.wzk;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple1;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.datastream.WindowedStream;importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.api.windowing.windows.GlobalWindow;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importjava.text.SimpleDateFormat;importjava.util.Random;publicclassTumblingWindow{publicstaticvoidmain(String[] args){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost",9999);SingleOutputStreamOperator<Tuple2<String,Integer>> mapStream = dataStreamSource
.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{SimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis =System.currentTimeMillis();int random =newRandom().nextInt(10);System.out.println("value: "+ value +", random: "+ random +", time: "+ format.format(timeMillis));returnTuple2.of(value, random);}});KeyedStream<Tuple2<String,Integer>,Tuple> keyedStream = mapStream
.keyBy(newKeySelector<Tuple2<String,Integer>,Tuple>(){@OverridepublicTuplegetKey(Tuple2<String,Integer> value)throwsException{returnTuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String,Integer>,Tuple,GlobalWindow> globalWindow = keyedStream
.countWindow(3);
globalWindow.apply(newMyCountWindowFuntion());
env.execute("TumblingWindow");}}
编写一个事件驱动的类:MyCountWindowFuntion
packageicu.wzk;importorg.apache.flink.api.java.tuple.Tuple;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.functions.windowing.WindowFunction;importorg.apache.flink.streaming.api.windowing.windows.GlobalWindow;importorg.apache.flink.util.Collector;importjava.text.SimpleDateFormat;publicclassMyCountWindowFuntionimplementsWindowFunction<Tuple2<String,Integer>,String,Tuple,GlobalWindow>{/**
* 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
* @author wzk
* @date 17:11 2024/7/26
**/@Overridepublicvoidapply(Tuple tuple,GlobalWindow window,Iterable<Tuple2<String,Integer>> input,Collector<String> out)throwsException{SimpleDateFormat format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum =0;for(Tuple2<String,Integer> tuple2 : input){
sum += tuple2.f1;}// 无用的时间戳:默认值是:Long.MAX_VALUE,在事件驱动下,基于计数的情况,不关心时间long maxTimestamp = window.maxTimestamp();
out.collect("key:"+ tuple.getField(0)+", value: "+ sum +", maxTimestamp :"+ maxTimestamp +","+ format.format(maxTimestamp));}}
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。