1.Flink窗口API
1.1分为 按键分区(Keyed)和非按键分区(Non-Keyed)
1.1.1按键分区窗口(Keyed Windows)
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。
每个key上都定义了一组窗口,各自独立地进行统计计算。
需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
stream.keyBy(...)
.window(...)
1.1.2非按键分区(Non-Keyed Windows)
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。
**没有keyby的窗口: 窗口内的所有数据进入同一个子任务,并行度值只能为1**
stream.windowAll(...)
1.2窗口API的调用
窗口操作主要有两个部分 : 窗口分配器(Window Assigners) 和 窗口函数(Window Functions)
stream.keyBy(<key selector>)
.window(<window assigner>) .aggregate(<window function>)
- keyby()指定一个分区,每个key都会单独进行处理
- window()传入一个窗口分配器,指定窗口类型
- aggregate()定义窗口具体的处理逻辑
2.窗口分配器
定义窗口分配器,调用window()方法。
传入一个WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。
窗口按照驱动类型可以分成时间窗口和计数窗口
1.2.1时间窗口
时间窗口又可以分为滚动、滑动和会话三种
(1)滚动处理时间窗口
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate(...)
- TumblingProcessingTimeWindows 创建滚动窗口
- of方法中传入一个time类型的参数size,表示滚动窗口的大小为5秒
- of方法中传入两个time类型的参数 of(size,offset)表示(滚动窗口的大小,步长)
(2)滑动处理时间窗口
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))) .aggregate(...)
- SlidingProcessingTimeWindows 创建滑动窗口
- of()方法需要传入两个Time类型的参数:size和slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。
- 这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。
(3)处理时间会话窗口
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...)
- ProcessingTimeSessionWindows 创建会话窗口。
- withGap()方法传入一个Time类型的参数,表示会话的超时时间,也就是最小间隔。
- 这里创建了静态会话超时时间为10秒的会话窗口。
(4)滚动事件时间窗口
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(...)
- TumblingEventTimeWindows 创建滚动窗口
- of方法中传入一个time类型的参数size,表示滚动窗口的大小为5秒。
(5)滑动事件时间窗口
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))) .aggregate(...)
- SlidingEventTimeWindows 创建滑动窗口
- 这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。
(6)事件时间会话窗口
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...)
- EventTimeSessionWindows会话窗口
- 这里创建了一个长度为10秒的会话窗口
1.2.2 计数窗口
基于全局窗口(Global Window)实现,直接调用countWindow()方法
(1)滚动计数窗口
传入一个长整型的参数表示窗口的大小
stream.keyBy(...)
.countWindow(10)
此处定义一个长度为10的滚动计数窗口,当窗口元素数量达到10时,触发计算执行并关闭窗口。
(2)滑动计数窗口
stream.keyBy(...)
.countWindow(10,3)
countWindow(窗口大小,滑动步长)
此处创建的窗口长度为10,步长为3,每个窗口统计10个数据,每隔3个输出一次结果
1.2.3 全局窗口
全局窗口一般在自定义窗口时使用
stream.keyBy(...)
.window(GlobalWindows.create());
GlobalWindows.create() 使用全局窗口,必须自定义触发器才能实现窗口计算。
3.窗口函数
窗口后续的计算操作就是窗口函数
根据处理方式分为增量聚合函数和全窗口函数
3.1增量聚合函数( ReduceFunction / AggregateFunction )
将收集起来的数据进行聚合
ReduceFunction 和 AggregateFunction
归约函数( ReduceFunction)
ReduceFunction必须输入输出的类型一样
public interface ReduceFunction<T> extends Function, Serializable { T reduce(T value1, T value2) throws Exception; } // 示例: 获取一段时间内每个用户浏览的商品的最大价值的那条记录(ReduceFunction) kafkaStream // 将从Kafka获取的JSON数据解析成Java Bean .process(new KafkaProcessFunction()) // 提取时间戳生成水印 .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds))) // 按用户分组 .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID) // 构造TimeWindow .timeWindow(Time.seconds(windowLengthSeconds)) // 窗口函数: 获取这段窗口时间内每个用户浏览的商品的最大价值对应的那条记录 .reduce(new ReduceFunction<UserActionLog>() { @Override public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception { return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2; } }) .print();
结果
UserActionLog{userID='user_4', eventTime='2019-11-09 12:51:25', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_2', eventTime='2019-11-09 12:51:29', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-09 12:51:22', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_5', eventTime='2019-11-09 12:51:21', eventType='browse', productID='product_3', productPrice=30}
聚合函数( AggregateFunction )
AggregateFunction 不需要输入输出类型一样 , 需要传入一个AggregateFunction的实现类
参数类型:AggregateFunction接口。该接口的继承关系和方法如下:
接口中有四个方法:
- createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
- add():将给定的输入添加到给定的累加器,并返回新的累加器值。
- getResult():从累加器中提取聚合的输出结果。
- merge():合并两个累加器,并将合并后的状态作为一个累加器返回。
首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。
3.2全窗口函数( full window functions )
全窗口函数有两种:WindowFunction 和 ProcessWindowFunction
(1)窗口函数
通过WindowFunction调用 apply ( ) 方法,传入一个WindowFunction的实现类
stream
.keyBy(<key selector>) .window(<window assigner>) .apply(new MyWindowFunction());
(2)处理窗口函数
ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。
触发器 (Trigger)
控制窗口什么时候触发计算。
通过 WindowedStream 调用 trigger ( ) 方法 传入一个自定义的窗口触发器
stream.keyBy(...)
.window(...) .trigger(new MyTrigger())
移除器(Evictor)
定义移除某些数据的逻辑
通过 WindowedStream 调用 evictor ( ) 方法 传入自定义的移除器
stream.keyBy(...)
.window(...) .evictor(new MyEvictor())
版权归原作者 内向仓鼠 所有, 如有侵权,请联系我们删除。