一、窗口概念
窗口:将无限数据切割成有限的“数据块”进行处理,窗口是处理无界流的核心。
窗口更像一个“桶”,将流切割成有限大小的多个存储桶,每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
- 动态创建:当有落在这个窗口区间范围的数据到达时,才创建对应的窗口
- 窗口关闭:到达窗口结束时间时,窗口就触发计算并关闭
二、窗口的分类
按照驱动类型分类
常见的窗口类型有时间窗口和计数窗口
(1)时间窗口 Time Window
时间窗口以时间点来定义窗口的开始(start)和结束(end)、所以截取出来的就是某一时间段的数据、到达结束时间时、窗口不再收集数据、触发计算输出结果、并将窗口关闭。
用结束时间减去开始时间、得到这段时间的长度、就是窗口的大小(window size)、时间可以有不同的语义、所以我们定义处理时间窗口和事件时间窗口。
(2)计数窗口 Count Window
计数窗口相比于时间窗口更简单、只需要指定窗口的大小、就可以把数据分配到对应的窗口。
按照窗口分配数据的规则分类
时间窗口和计数窗口、只是对窗口的一个大致划分、根据分配数据的规则、具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window)、以及全局窗口(Global Window)
三、窗口API概览
按键分区窗口(Keyed)和非按键分区(Non-Keyed)
在定义窗口操作之前,首先需要确定,到达是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是在调用窗口算子之前是否有keyBy操作
按键分区窗口(Keyed)
经过按按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),也就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据被发送到同一个并行子任务,而窗口操作会基于每个key单独的处理。可以认为每个key上都定义了一组窗口,各自独立地进行统计计算。
stream.keyBy(...)
.window(...)
非按键分区(Non-Keyed Windows)
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,相当于并行度变成了1
stream.windowAll(...)
代码中窗口API的调用
窗口的操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)
窗口分配器(Window Assigners)
定义窗口分配器是构建窗口算子的第一步,作用是定义数据应该被“分配”到哪个窗口。
窗口按照驱动类型可以分成时间窗口和计数窗口,按照具体的分配规定为滚动窗口、滑动窗口、会话窗口、全局窗口。除去自定义外的全局窗口外,其它常用的类型Flink都给出了内置的分配器实现
1、时间窗口
- 滚动
- 滑动
- 会话
处理时间下窗口
(1)滚动处理时间窗口
stream.keyBy(...)
.window(TumblingProcessTimeWindow.of(Time.seconds(5)))
.aggregate(...)
(2)滑动处理时间窗口
stream.keyBy(...)
.window(SlidingProcessingTimewindows.of(Time.seconds(10),Time.seconds(5)))
.aggregate(...)
(3)处理时间会话窗口
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
事件时间下窗口
(1)滚动事件时间窗口
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
(2)滑动事件时间窗口
stream.keyBy(...)
.window(SlidingEventTimewindows.of(Time.seconds(10),Time.seconds(5)))
.aggregate(...)
(3)事件时间会话窗口
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
2、计数窗口
底层是全局窗口,Flink为我们提供了非常方便地接口:直接调用countWindow()方法,根据分配规则的不同,又可以分为滚动计数、滑动计数窗口。
(1)滚动计数窗口
stream.keyBy(...)
.countWindow(10)
(2)滚动计数窗口
stream.keyBy(...)
.countWindow(10,3)
3、全局窗口
stream.keyBy(...)
.window(GlobalWindows.create());
窗口函数(Window Functions)
定义窗口分配,我们知道了数据属于哪个窗口;定义窗口函数,如何进行计算的操作,这就是所谓的“窗口函数”。
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数、全窗口函数
增量函数
为了提高实时性,我们可以每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间拿出之前聚合的状态直接输出。
典型的增量聚合函数有两个:ReduceFunction、AggregateFunction
(1)归约函数(ReduceFunction)
将窗口收集到的数据两两进行归约,实现增量式的聚合。
public class WindowReduceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从自定义数据源读取数据,并提取时间戳、生成水位线
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
// 将数据转换成二元组,方便计算
return Tuple2.of(value.user, 1L);
}
}).keyBy(r -> r.f0)
// 设置滚动事件时间窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 定义累加规则,窗口闭合时,向下游发送累加结果
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}).print();
env.execute();
}
}
(2)聚合函数(AggregateFunction)
ReduceFunction接口有一个限制:输入数据类型、聚合状同类型、输出结果的类型一样。这就迫使我们在聚会前先将数据转换成预期结果类型。而在有些情况下,需要对状态进一步处理才能得到输出结果时,这时它们的类型可能不同。
Flink的Window API中的aggregate就提供了这样的操作。直接基于WindowedStream调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable{
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
OUT getResult(ACC accumulator);
ACC merge(ACC a, ACC b);
}
AggregateFunction可看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)、输出类型(OUT)
- createAccumulator():创建一个累加器,为聚合创建一个初始状态
- add():将输入的元素添加到累加器中,这就是基于聚合状态,对新来的数据进一步聚合。方法传入两个参数,当前新到的数据value,和当前的累加器accumulator,返回一个新的累加器值。
- getResult():从累加器中提取聚合输出的结果。
- merge():合并两个累加器,并将合并的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口的场景就是会话窗口
全窗口函数
全窗口需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
(1)窗口函数(WindowFunction)
WindowFunction是老版本的通用窗口函数接口,我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction实现类
stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function,Serializable {
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
当窗口到达结束时间需要触发计算时,就会调用这里的apply方法。我们可以从input集合中取出窗口收集的数据,结合key和window信息,通过收集器输出结果。WindowFunction的作用可以被ProcessWindowFunction全覆盖,一般在实际应用中,直接使用ProcessWindowFunction就可以
(2)窗口函数处理窗口函(ProcessWindowFunction)
ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口,他可以获取到一个“上下文对象”(Context)。这个上下文对象不仅能够获取窗口信息,还可以访问当前的时间和状态信息,这里的时间就包括了处理时间和事件时间水位线。
public class WindowProcessTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(100);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
//乱序流的watermark生成
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
//使用ProcessWindowFunction计算UV
stream.keyBy(data -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new UvCountByWindow())
.print();
env.execute();
}
//实现自定义的ProcessWindowFunction,输出一条统计信息
public static class UvCountByWindow extends ProcessWindowFunction<Event,String,Boolean, TimeWindow>{
@Override
public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
//用一个HashSet保存user
HashSet<String> userSet = new HashSet<>();
//从elements中遍历数据,放到set中去重
for (Event element : elements) {
userSet.add(element.user);
}
Integer uv = userSet.size();
//结合窗口信息
Long start = context.window().getStart();
Long end = context.window().getEnd();
out.collect("窗口" + new Timestamp(start) + "~" + new Timestamp(end) + "UV值为:" + uv);
}
}
}
增量聚合和全窗口结合
增量聚合的优点:高效,输出更加实时
全窗口的优点:提供更多的信息,更加“通用”的窗口操作。
在实际应用中,我们往往希望兼具这两者的优点,,结合使用,我们在传入窗口函数哪里,这里调用的机制:第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当做Iterable类型的输出。
public class UrlCountViewExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(100);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
//乱序流的watermark生成
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
stream.print("input");
//统计每个url的访问量
stream.keyBy(data -> data.url)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new UrlViewCountAgg(),new UrlViewCountResult())
.print();
env.execute();
}
//增量聚合,来一条数据 + 1
public static class UrlViewCountAgg implements AggregateFunction<Event,Long,Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return null;
}
}
//包装窗口信息,输出UrlViewCount
public static class UrlViewCountResult extends ProcessWindowFunction<Long,UrlViewCount,String, TimeWindow>{
@Override
public void process(String s, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
Long start = context.window().getStart();
Long end = context.window().getEnd();
Long count = elements.iterator().next();
out.collect(new UrlViewCount(s,count,start,end));
}
}
}
获取水位线和窗口的使用
public class WatermarkTest2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 将数据源改为 socket 文本流,并转换成 Event 类型
env.socketTextStream("hadoop102", 7777)
.map(new MapFunction<String, Event>() {
@Override
public Event map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(),
Long.valueOf(fields[2].trim()));
}
})
// 插入水位线的逻辑
.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 5s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// 抽取时间戳的逻辑
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
)
// 根据 user 分组,开窗统计
.keyBy(data -> data.user)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new WatermarkTestResult())
.print();
env.execute();
}
// 自定义处理窗口函数,输出当前的水位线和窗口信息
public static class WatermarkTestResult extends ProcessWindowFunction<Event, String, String, TimeWindow> {
@Override
public void process(String s, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
Long start = context.window().getStart();
Long end = context.window().getEnd();
Long currentWatermark = context.currentWatermark();
Long count = elements.spliterator().getExactSizeIfKnown();
out.collect("窗口" + start + " ~ " + end + "中共有" + count + "个元素, 窗口闭合计算时,水位线处于:" + currentWatermark);
}
}
}
其他API
对于一些窗口算子而言,窗口分配器和窗口函数是必不可少的,除此之外,Flink还提供了其他一些可选的API,可让我们更加灵活地控制窗口行为
1、触发器(Trigger)
调用trigger()方法,就可以传入一个自定义的窗口触发器
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认 的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间 窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。 所以一般情况下是不需要自定义触发器的,不过我们依然有必要了解它的原理。 Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:
- onElement():窗口中每到来一个元素,都会调用这个方法
- onEventTime():当注册的事件时间定时触发时,将调用这个方法
- onProcessingTime():当注册的处理时间定时器触发时,将调用这个方法
- clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。
版权归原作者 请叫我阿炜 所有, 如有侵权,请联系我们删除。