0


【Flink系列】窗口系列简介

一、窗口概念

窗口:将无限数据切割成有限的“数据块”进行处理,窗口是处理无界流的核心。
在这里插入图片描述
窗口更像一个“桶”,将流切割成有限大小的多个存储桶,每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

  • 动态创建:当有落在这个窗口区间范围的数据到达时,才创建对应的窗口
  • 窗口关闭:到达窗口结束时间时,窗口就触发计算并关闭

二、窗口的分类

按照驱动类型分类

常见的窗口类型有时间窗口和计数窗口
在这里插入图片描述

(1)时间窗口 Time Window

时间窗口以时间点来定义窗口的开始(start)和结束(end)、所以截取出来的就是某一时间段的数据、到达结束时间时、窗口不再收集数据、触发计算输出结果、并将窗口关闭。
用结束时间减去开始时间、得到这段时间的长度、就是窗口的大小(window size)、时间可以有不同的语义、所以我们定义处理时间窗口和事件时间窗口。

(2)计数窗口 Count Window

计数窗口相比于时间窗口更简单、只需要指定窗口的大小、就可以把数据分配到对应的窗口。

按照窗口分配数据的规则分类

时间窗口和计数窗口、只是对窗口的一个大致划分、根据分配数据的规则、具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window)、以及全局窗口(Global Window)

三、窗口API概览

按键分区窗口(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到达是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是在调用窗口算子之前是否有keyBy操作

按键分区窗口(Keyed)

经过按按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),也就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据被发送到同一个并行子任务,而窗口操作会基于每个key单独的处理。可以认为每个key上都定义了一组窗口,各自独立地进行统计计算。

stream.keyBy(...)
    .window(...)

非按键分区(Non-Keyed Windows)

如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,相当于并行度变成了1

stream.windowAll(...)

代码中窗口API的调用

窗口的操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)

stream.keyBy(<key selector>)
           .window(<window assigner>)
           .aggregate(<window function>)

窗口分配器(Window Assigners)

定义窗口分配器是构建窗口算子的第一步,作用是定义数据应该被“分配”到哪个窗口。
窗口按照驱动类型可以分成时间窗口和计数窗口,按照具体的分配规定为滚动窗口、滑动窗口、会话窗口、全局窗口。除去自定义外的全局窗口外,其它常用的类型Flink都给出了内置的分配器实现

1、时间窗口

  • 滚动
  • 滑动
  • 会话

处理时间下窗口

(1)滚动处理时间窗口
stream.keyBy(...)
           .window(TumblingProcessTimeWindow.of(Time.seconds(5)))
           .aggregate(...)
(2)滑动处理时间窗口
stream.keyBy(...)
           .window(SlidingProcessingTimewindows.of(Time.seconds(10),Time.seconds(5)))
           .aggregate(...)
(3)处理时间会话窗口
stream.keyBy(...)
           .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
           .aggregate(...)

事件时间下窗口

(1)滚动事件时间窗口
stream.keyBy(...)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .aggregate(...)
(2)滑动事件时间窗口
stream.keyBy(...)
              .window(SlidingEventTimewindows.of(Time.seconds(10),Time.seconds(5)))
              .aggregate(...)
(3)事件时间会话窗口
stream.keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
    .aggregate(...)

2、计数窗口

底层是全局窗口,Flink为我们提供了非常方便地接口:直接调用countWindow()方法,根据分配规则的不同,又可以分为滚动计数、滑动计数窗口。

(1)滚动计数窗口
stream.keyBy(...)
           .countWindow(10)
(2)滚动计数窗口
stream.keyBy(...)
           .countWindow(10,3)

3、全局窗口

stream.keyBy(...)
           .window(GlobalWindows.create());

窗口函数(Window Functions)

定义窗口分配,我们知道了数据属于哪个窗口;定义窗口函数,如何进行计算的操作,这就是所谓的“窗口函数”。在这里插入图片描述
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数、全窗口函数

增量函数

为了提高实时性,我们可以每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间拿出之前聚合的状态直接输出。

典型的增量聚合函数有两个:ReduceFunction、AggregateFunction

(1)归约函数(ReduceFunction)

将窗口收集到的数据两两进行归约,实现增量式的聚合。

public class WindowReduceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从自定义数据源读取数据,并提取时间戳、生成水位线
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                // 将数据转换成二元组,方便计算
                return Tuple2.of(value.user, 1L);
            }
        }).keyBy(r -> r.f0)
                // 设置滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 定义累加规则,窗口闭合时,向下游发送累加结果
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();

        env.execute();
    }
}
(2)聚合函数(AggregateFunction)

ReduceFunction接口有一个限制:输入数据类型、聚合状同类型、输出结果的类型一样。这就迫使我们在聚会前先将数据转换成预期结果类型。而在有些情况下,需要对状态进一步处理才能得到输出结果时,这时它们的类型可能不同。

Flink的Window API中的aggregate就提供了这样的操作。直接基于WindowedStream调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable{
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

AggregateFunction可看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)、输出类型(OUT)

  • createAccumulator():创建一个累加器,为聚合创建一个初始状态
  • add():将输入的元素添加到累加器中,这就是基于聚合状态,对新来的数据进一步聚合。方法传入两个参数,当前新到的数据value,和当前的累加器accumulator,返回一个新的累加器值。
  • getResult():从累加器中提取聚合输出的结果。
  • merge():合并两个累加器,并将合并的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口的场景就是会话窗口

全窗口函数

全窗口需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

(1)窗口函数(WindowFunction)

WindowFunction是老版本的通用窗口函数接口,我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction实现类

stream
 .keyBy(<key selector>)
 .window(<window assigner>)
 .apply(new MyWindowFunction());
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function,Serializable {
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

当窗口到达结束时间需要触发计算时,就会调用这里的apply方法。我们可以从input集合中取出窗口收集的数据,结合key和window信息,通过收集器输出结果。WindowFunction的作用可以被ProcessWindowFunction全覆盖,一般在实际应用中,直接使用ProcessWindowFunction就可以

(2)窗口函数处理窗口函(ProcessWindowFunction)

ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口,他可以获取到一个“上下文对象”(Context)。这个上下文对象不仅能够获取窗口信息,还可以访问当前的时间和状态信息,这里的时间就包括了处理时间和事件时间水位线。

public class WindowProcessTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                //乱序流的watermark生成
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        //使用ProcessWindowFunction计算UV
        stream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UvCountByWindow())
                .print();

        env.execute();
    }

    //实现自定义的ProcessWindowFunction,输出一条统计信息
    public static class UvCountByWindow extends ProcessWindowFunction<Event,String,Boolean, TimeWindow>{

        @Override
        public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            //用一个HashSet保存user
            HashSet<String> userSet = new HashSet<>();
            //从elements中遍历数据,放到set中去重
            for (Event element : elements) {
                userSet.add(element.user);
            }

            Integer uv = userSet.size();
            //结合窗口信息
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect("窗口" + new Timestamp(start) + "~" + new Timestamp(end) + "UV值为:" + uv);
        }
    }
}

增量聚合和全窗口结合

增量聚合的优点:高效,输出更加实时
全窗口的优点:提供更多的信息,更加“通用”的窗口操作。
在实际应用中,我们往往希望兼具这两者的优点,,结合使用,我们在传入窗口函数哪里,这里调用的机制:第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当做Iterable类型的输出。

public class UrlCountViewExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                //乱序流的watermark生成
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        stream.print("input");

        //统计每个url的访问量
        stream.keyBy(data -> data.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new UrlViewCountAgg(),new UrlViewCountResult())
                .print();

        env.execute();
    }

    //增量聚合,来一条数据 + 1
    public static class UrlViewCountAgg implements AggregateFunction<Event,Long,Long>{

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    //包装窗口信息,输出UrlViewCount
    public static class UrlViewCountResult extends ProcessWindowFunction<Long,UrlViewCount,String, TimeWindow>{

        @Override
        public void process(String s, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long count = elements.iterator().next();
            out.collect(new UrlViewCount(s,count,start,end));
        }
    }
}

获取水位线和窗口的使用

public class WatermarkTest2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 将数据源改为 socket 文本流,并转换成 Event 类型
        env.socketTextStream("hadoop102", 7777)
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new Event(fields[0].trim(), fields[1].trim(),
                                Long.valueOf(fields[2].trim()));
                    }
                })
                // 插入水位线的逻辑
                .assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 5s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    // 抽取时间戳的逻辑
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                )
                // 根据 user 分组,开窗统计
                .keyBy(data -> data.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new WatermarkTestResult())
                .print();
        env.execute();
    }

    // 自定义处理窗口函数,输出当前的水位线和窗口信息
    public static class WatermarkTestResult extends ProcessWindowFunction<Event, String, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long currentWatermark = context.currentWatermark();
            Long count = elements.spliterator().getExactSizeIfKnown();
            out.collect("窗口" + start + " ~ " + end + "中共有" + count + "个元素, 窗口闭合计算时,水位线处于:" + currentWatermark);
        }
    }

}

其他API

对于一些窗口算子而言,窗口分配器和窗口函数是必不可少的,除此之外,Flink还提供了其他一些可选的API,可让我们更加灵活地控制窗口行为

1、触发器(Trigger)

调用trigger()方法,就可以传入一个自定义的窗口触发器

stream.keyBy(...)
     .window(...)
     .trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认 的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间 窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。 所以一般情况下是不需要自定义触发器的,不过我们依然有必要了解它的原理。 Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法
  • onEventTime():当注册的事件时间定时触发时,将调用这个方法
  • onProcessingTime():当注册的处理时间定时器触发时,将调用这个方法
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。
标签: flink 大数据 java

本文转载自: https://blog.csdn.net/qq_42875020/article/details/127354961
版权归原作者 请叫我阿炜 所有, 如有侵权,请联系我们删除。

“【Flink系列】窗口系列简介”的评论:

还没有评论