Series progress so far:
- Hadoop (complete)
- HDFS (complete)
- MapReduce (complete)
- Hive (complete)
- Flume (complete)
- Sqoop (complete)
- Zookeeper (complete)
- HBase (complete)
- Redis (complete)
- Kafka (complete)
- Spark (complete)
- Flink (in progress!)
Chapter contents
In the previous section we covered:
- Flink Window overview
- Flink Window tumbling time windows
  - Time-driven
  - Event-driven
Sliding Time Window
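A sliding window is a more general form of the fixed (tumbling) window. It is defined by a fixed window length plus a slide interval, and a tumbling window is simply the special case in which the slide equals the length. Flink's Sliding Window is a common windowing mechanism for stream processing jobs that need to recompute a result over a time range at regular intervals. The assigner continuously divides the stream according to the configured window size and slide interval, and the data in each window is then aggregated.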
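To make the relationship between size and slide concrete, here is a minimal plain-Java sketch (no Flink dependency) that works out which windows a single timestamp falls into. It is modeled on the start-alignment arithmetic used by Flink's sliding assigners with a window offset of 0. The class and method names are hypothetical. Setting the slide equal to the size turns it into a tumbling window, where every timestamp lands in exactly one window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: finds the sliding windows [start, start + size) that contain a timestamp.
public class SlidingAssignSketch {

    // Returns the start times of all windows containing `timestamp`, latest window first.
    // Assumes non-negative timestamps, size > 0, slide > 0 and a window offset of 0.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Align to the latest window start that is <= timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10 s, slide 5 s: an event at t = 12 s is in [10 s, 20 s) and [5 s, 15 s).
        System.out.println(windowStarts(12_000, 10_000, 5_000)); // [10000, 5000]
        // Slide == size (tumbling window): exactly one window.
        System.out.println(windowStarts(12_000, 10_000, 10_000)); // [10000]
    }
}
```

When the slide divides the size, every element belongs to exactly size / slide windows. Flink keeps per-window state, so a small slide relative to the size multiplies the state that has to be maintained.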
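Characteristics
- The window length is fixed, and windows can overlap.
- Because windows overlap, one event can be included in several windows. When the slide divides the size, that number is size / slide.
- Sliding windows suit periodically computing an aggregate over a recent time range, such as a moving average or the number of active users in the most recent period.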
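Key parameters
- Window size: the time span each window covers, for example 10 seconds.
- Slide interval: how far the window advances each time, for example 5 seconds. A new window therefore starts every 5 seconds and covers 10 seconds, so each element falls into 10 / 5 = 2 windows.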
Time-driven
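Scenario: every 30 seconds, count the items a user purchased in the last minute. This corresponds to a 1-minute window with a 30-second slide. So that results appear quickly, the demo below uses a 10-second window that slides every 5 seconds. It reads lines from a local socket on port 9999 and keys them by the input string.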
```java
package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

public class SlidingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read text lines from a local socket (e.g. one opened with `nc -lk 9999`)
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);

        // Pair each line with a random number in [0, 10) and log the processing time
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random
                                + ", timestamp: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        // Key the stream by the input string
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // Sliding processing-time window: size 10 s, slide 5 s
        // (keyedStream.timeWindow(size, slide) is deprecated in favor of an explicit assigner)
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));

        // MyTimeWindowFunction: custom WindowFunction defined elsewhere (not shown in this section)
        timeWindow.apply(new MyTimeWindowFunction()).print();

        env.execute("SlidingWindow");
    }
}
```
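The plain-Java simulation below shows what the 1-minute / 30-second purchase scenario would emit. It is a hypothetical sketch, not Flink code. It assumes a single user, timestamps in seconds, and windows aligned to multiples of the slide. It also recomputes each window by brute force, whereas Flink maintains window state incrementally.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of "every 30 s, count the items bought in the last minute" for one user.
public class PurchaseCountSketch {

    // events[i] = {timestampSeconds, quantity}.
    // Returns window end -> total quantity of events in [end - size, end), one entry every `slide` seconds.
    static Map<Long, Long> itemsPerFiring(long[][] events, long size, long slide, long until) {
        Map<Long, Long> result = new TreeMap<>();
        for (long end = slide; end <= until; end += slide) {
            long total = 0;
            for (long[] e : events) {
                if (e[0] >= end - size && e[0] < end) { // the window is [end - size, end)
                    total += e[1];
                }
            }
            result.put(end, total);
        }
        return result;
    }

    public static void main(String[] args) {
        // Purchases at 10 s (1 item), 40 s (2 items) and 70 s (3 items).
        long[][] purchases = { {10, 1}, {40, 2}, {70, 3} };
        System.out.println(itemsPerFiring(purchases, 60, 30, 120));
        // {30=1, 60=3, 90=5, 120=3}
    }
}
```

The purchase at 40 s is counted both in the window ending at 60 s and in the one ending at 90 s. This is the overlap described in the characteristics above.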
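Event-driven
Here the window is measured in elements instead of time. countWindow(3, 2) evaluates the most recent 3 elements of a key and fires each time 2 new elements of that key arrive.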
```java
package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

public class SlidingCountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read text lines from a local socket
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);

        // Pair each line with a random number in [0, 10) and log the processing time
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random
                                + ", timestamp: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        // Key the stream by the input string
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // Count-based sliding window: size 3 elements, slide 2 elements (per key)
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
                .countWindow(3, 2);

        // MyCountWindowFuntion: custom WindowFunction defined elsewhere (not shown in this section)
        globalWindow.apply(new MyCountWindowFuntion()).print();

        env.execute("SlidingCountWindow");
    }
}
```
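In Flink, countWindow(size, slide) is built from three parts: a GlobalWindows assigner, a CountEvictor that keeps the last `size` elements, and a CountTrigger that fires every `slide` elements. The plain-Java sketch below simulates that combination for a single key so you can predict what the job above prints. The class and method names are hypothetical, and the sketch does not use Flink.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-key simulation of countWindow(size, slide):
// fire every `slide` elements, evaluating at most the last `size` elements.
public class CountSlidingSketch {

    static List<List<Integer>> fire(int[] values, int size, int slide) {
        List<List<Integer>> firings = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (int v : values) {
            buffer.addLast(v);
            sinceLastFire++;
            if (sinceLastFire == slide) {      // trigger: `slide` new elements have arrived
                while (buffer.size() > size) { // evictor: keep only the last `size` elements
                    buffer.removeFirst();
                }
                firings.add(new ArrayList<>(buffer));
                sinceLastFire = 0;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // countWindow(3, 2) over inputs 1..6 of the same key.
        System.out.println(fire(new int[] {1, 2, 3, 4, 5, 6}, 3, 2));
        // [[1, 2], [2, 3, 4], [4, 5, 6]]
    }
}
```

The first firing contains only 2 elements, because the window fires after `slide` elements even before it has collected `size` of them.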
Session Window
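A session window groups a series of events together with a timeout gap of a specified length, much like a session in a web application. If no new data arrives for that long, subsequent data opens a new window.
The session window assigner groups elements by session activity. Unlike tumbling and sliding windows, session windows do not overlap and have no fixed start or end time.
When a session window receives no elements for a fixed period of time (an inactivity gap), the window closes.
A session window is configured with a session gap, which defines the length of that inactivity period. When such a period occurs, the current session closes and subsequent elements are assigned to a new session window.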
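Flink implements this by giving each element its own window of length `gap` and merging windows that overlap. The plain-Java sketch below produces the same grouping for one key in a more direct way: it walks a sorted list of timestamps and starts a new session whenever the silence between two elements exceeds the gap. The class and method names are hypothetical. The case where the silence is exactly equal to the gap is simplified here and may not match Flink's merging behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split the sorted timestamps (ms) of one key into sessions.
public class SessionSketch {

    static List<List<Long>> sessions(long[] sortedTs, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTs) {
            // Simplification: a silence longer than `gap` closes the current session.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current); // close the last open session
        }
        return result;
    }

    public static void main(String[] args) {
        // Gap of 10 s: the 25 s silence between 12 s and 37 s starts a new session.
        long[] ts = {0, 4_000, 12_000, 37_000, 40_000};
        System.out.println(sessions(ts, 10_000)); // [[0, 4000, 12000], [37000, 40000]]
    }
}
```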
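Characteristics
- Session windows do not overlap and have no fixed start or end time.
- Unlike tumbling and sliding windows, a session window closes when it has received no elements for a period of time.
- Subsequent elements are assigned to a new session window.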
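Time-driven
The example uses a processing-time session gap of 10 seconds. A key's session closes after 10 seconds without input for that key.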
```java
package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

public class SessionWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read text lines from a local socket
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);

        // Pair each line with a random number in [0, 10); a null result here would fail at runtime
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random
                                + ", timestamp: " + format.format(System.currentTimeMillis()));
                        return Tuple2.of(value, random);
                    }
                });

        // Key the stream by the input string
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // Processing-time session window: closes after 10 s of inactivity per key
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyedStream
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

        // MyTimeWindowFunction: custom WindowFunction defined elsewhere (not shown in this section)
        window.apply(new MyTimeWindowFunction()).print();

        env.execute("SessionWindow");
    }
}
```
Copyright belongs to the original author, 武子康. In case of infringement, please contact us for removal.