0


Flink 窗口 概述

一:窗口简述

    Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 【把窗口理解成一个“桶”,Flink则可以把流切割成大小有限的“储存桶”,把数据分发到不同的桶里,每一个窗口都是一个桶。当窗口结束,就对每一个桶的数据进行收集处理】

二: 窗口的分类

** 1)按照驱动类型分**

** (1) 时间窗口**

                        原理:建立一个窗口,在固定的额时间段内不断收集数据,到达结束时间的时候窗口结束收集数据,生成结果,窗口销毁。【就像地铁一样,间隔一段时间发车,无论车上有多少乘客,地铁都会往前开】

** (2) 计数窗口**

** ** 原理:计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。基本思路是“人齐发车”

** 2)按照窗口分配数据的规则分类(以下均为以时间驱动为例)**

                    主要概念:窗口的大小,窗口的滑动步长【两个窗口重叠的部分】,会话间隔

** (1) 滚动窗口(Tumbling Windows)**

                         滚动窗口有固定的大小,而且窗口之间不会重叠,每个数据都在一个窗口且只属于这个窗口。在一个固定时间内,接受数据的传入。到了截止时间,收集数据,输出结果。应用类型广泛,可以对每个时间做聚合统计。

** **c850e2e36a3f48bca15878796c5b9ec0.png

** (2) 滑动窗口 (Sliding Windows)**

** ** 当窗口大小大于窗口步长的时候就会出现滑动,滑动窗口会重叠,同时数据也会同时被分到多个窗口,滑动步长就代表了计算频率。适合计算结果更新较快的场景。

** (3) 会话窗口 (Session Windows)**

** ** 原理:基于“会话”来进行数据分组、如果相邻两个数据到来的时间间隔 小于指定大小,那么这两个数据在同一个窗口内。如果 大于则数据到了新的窗口,且前面的窗口关闭。 会话窗口长度,起始结束时间不确定。各个分区之间窗口没有任何关联。在规定的时间内没有数据到来触发一次计算。可以用于保持会话的场景下。

(4) 全局窗口 (Global Windows)

** ** 把相同key的数据全部分配到一个窗口之中,窗口没有结束是不会触发计算的。如果希望对数据处理,需要定义一个触发器。

三:窗口API

** 1)按键分区(Keyed)和非按键分区(Non-Keyed)**

(1)按键分区窗口(Keyed Windows)

                            经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。

在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...) .window(...)

(2)非按键分区(Non-Keyed Windows)

如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。setParallelism(1)

在代码中,直接基于DataStream调用.windowAll()定义窗口。

stream.windowAll(...)

注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的

四:窗口分配器

1)Flink中的时间语义

                            到底是以哪种时间作为衡量的标准,就被成为“时间语义”

** 2)时间窗口分配器**

** (1) 处理时间的时间语义窗口**

特征:

  • 基于处理节点的当前系统时间。
  • 实现简单,延迟低。
  • 适用于数据实时性要求高且乱序不多的场景。
  • 在处理乱序数据时,结果可能不准确。

**应用场景: **

  • 监控系统指标(如CPU使用率、内存使用情况)。
  • 实时数据分析,延迟比准确性更重要的场景。

**使用参数: **
TumblingProcessingTimeWindows.of(Time.) 滚动窗口SlidingProcessingTimeWindows.of(Time.) 滑动窗口ProcessingTimeSessionWindows.withGap() ProcessingTimeSessionWindows.withDynamicGap() 会话窗口


(2) 处理事件的时间语义窗口

特征:

  • 基于事件的时间戳。
  • 处理乱序数据,通过Watermark机制来处理延迟事件。
  • 更加准确,适用于要求严格时间语义的场景。

**实时场景: **

  • 实时日志分析(如用户行为分析、点击流分析)。
  • 需要严格时间顺序和准确性的场景,如金融交易分析。

使用参数:
TumblingEventTimeWindows.of(Time.) 滚动窗口SlidingEventTimeWindows.of(Time.) 滑动窗口
EventTimeSessionWindows.withGap()

EventTimeSessionWindows.withDynamicGap()
会话窗口


** 3)计数窗口分配器**

countWindow(5)计数窗口分配器 【滚动】 满足5条输出一次计算结果countWindow(5,2)计数窗口分配器 【滑动】 满足5条输出一次计算结果 , 每经过一个步长都有一个窗口-触发输出【第一次输出在第二条数据来的时候】

** 4)全局窗口分配器**

GlobalWindows.create() 全局窗口

五:窗口函数

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。

1)增量聚合函数(ReduceFunction / AggregateFunction)

** **窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。

** **(1)归约函数(ReduceFunction)

import com.guigu.function.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowReduceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());

//        窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        /*
            1.相同key的第一条数据来的时候,不会调用reduce方法 【来的数据类型必须保持一致】
            2.增量聚合:来一条数据,就会计算一次,但是不会输出
            3.在窗口触发的时候,才会输出窗口的最终结果
         */
//        窗口函数,增量聚合reduce
//        TODO:返回一个DataStream
        SingleOutputStreamOperator<WaterSensor> reduce = senorWS.reduce(new ReduceFunction<WaterSensor>() {
//            TODO:只有第二条数据进来了才会调用reduce方法
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                System.out.println("调用reduce方法: value1" + value1 + ",value2" + value2);
                return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
            }
        });

        reduce.print();

        env.execute();
    }
}

(2)聚合函数(AggregateFunction)

                                    来一个数据就调用add方法,进行数据聚合。结果保存在状态中。窗口需要输出时调用getresult()方法 得到计算结果。和ReduceFunction作用相同,而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowAggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());

//        窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

//        窗口函数,增量聚合 Aggregate
        /*
            第一个类型:输入数据的类型
            第二个类型: 累加器的类型,存储的中间结果的类型
            第三个类型: 输出的类型
         */

        /*

                *1、属于本窗口的第一条数据来,创建窗口,创建累加器
                *2、增量聚合:来一条计算一条,调用-次add方法
                *3、窗口输出时调用一次getresult方法
                *4、输入、中间累加器、输出 类型可以不一样,非常灵活

         */

        senorWS.aggregate(
                new AggregateFunction<WaterSensor, Integer, String>() {
//                    TODO:创建累加器初始化累加器
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("创建累加器");
                        return 0;
                    }
//                    TODO: 聚合逻辑
                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        System.out.println("调用add方法,value="+value);
                        return accumulator+ value.getVc();
                    }
//                    TODO:获得最后的结果,窗口触发时输出
                    @Override
                    public String getResult(Integer accumulator) {
                        System.out.println("调用getresult方法");
                        return accumulator.toString();
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
//                        只有会话窗口才会用到
                        System.out.println("调用merge方法");
                        return null;
                    }
                }
        ).print();

        env.execute();
    }
}

另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。

** 2)全窗口函数(窗口函数,处理窗口函数)**

** 全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。 **WindowFunction【apply方法】能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用【老方法】。

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());

//        窗口分配器  【滚动】
        WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        //        TODO:全窗口  不断存储数据到最后  窗口触发时只会输出一次

        //        TODO:老写法
//        senorWS
//                .apply(
//                new WindowFunction<WaterSensor, String, String, TimeWindow>() {
//                    /**
//                     *
//                     * @param s 分组的key
//                     * @param window 窗口对象
//                     * @param input 存储的数据
//                     * @param out 采集器
//                     * @throws Exception
//                     */
//                    @Override
//                    public void apply(String s, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
//
//                    }
//                }
//        )

//        TODO:新写法
                senorWS
        .process(
                new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    /**
                     * @param s        分组的 key
                     * @param context  上下文
                     * @param elements 存的数据
                     * @param out      采集器
                     * @throws Exception
                     */
                    @Override
                    public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
//                        TODO:毫秒
                        long start = context.window().getStart();
                        long end = context.window().getEnd();

                        String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");

//                        TODO:多少条数据
                        long data = elements.spliterator().estimateSize();

                        out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());

                    }
                }
        ).print();

        env.execute();
    }
}

3)全窗口函数 和 增量聚合函数 的结合使用

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowAggregateAndProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());

//        窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

//        TODO:窗口函数,增量聚合 Aggregate  +  全窗口 Process
                /*
            *增量聚合 Aggregate+全窗日 process
                    * 1、增量聚合函数处理数据:来一条计算一条
                    *2、窗口触发时,增量聚合的结果(只有一条)传递给 全窗口函数
                    *3、经过全窗口函数的处理包装后,输出米指轺调坝党
            *结合两者的优点:
                *1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
                *2、全窗口函数:可以通过 上下文 实现灵活的功能
                * */

        senorWS.aggregate(
                new MyAgg(),
                new MyProcess()
                ).print();
//

        env.execute();
    }

    public static class MyAgg implements  AggregateFunction<WaterSensor, Integer, String>{
//        TODO:创建累加器初始化累加器
        @Override
        public Integer createAccumulator() {
            System.out.println("创建累加器");
            return 0;
        }
        //                    TODO: 聚合逻辑
        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用add方法,value="+value);
            return accumulator+ value.getVc();
        }
        //                    TODO:获得最后的结果,窗口触发时输出
        @Override
        public String getResult(Integer accumulator) {
            System.out.println("调用getresult方法");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
//                        只有会话窗口才会用到
            System.out.println("调用merge方法");
            return null;
        }

    }

    public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{

        /**
         * @param s        分组的 key
         * @param context  上下文
         * @param elements 存的数据
         * @param out      采集器
         * @throws Exception
         */

        @Override
        public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
//                TODO:毫秒
            long start = context.window().getStart();
            long end = context.window().getEnd();

            String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
            String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");

//                        TODO:多少条数据
            long data = elements.spliterator().estimateSize();

            out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());

        }
    }
}

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。

六:其他API

1) 触发器(Trigger)

                                    触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...) .window(...) .trigger(new MyTrigger())

2)移除器(Evictor)

                            移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...) .window(...)

七:窗口的原理

以 时间类型的 滚动窗口 为例,分析原理:

1、窗口什么时候触发 输出?

时间进展>= 窗口的最大时间戳(end-1ms)

2、窗口是怎么划分的?

start= 向下取整,取窗口长度的整数倍

end =start+窗长度

窗口左闭右开 ==》 属于本窗口的 最大时间戳 =end - 1ms

3、窗口的生命周期?

创建:属于本窗口的第一条数据来的时候,现new的,放入一个singeton单例的集合中。

销毁(关窗): 时间进展>= 窗口的最大时间戳(end - 1ms) +允许迟到的时间【默认为0】

八:水位线 (Watermark)

1)什么是水位线?【下图中均为虚线(w)表示】

                    在Flink中水位线被用来标记事件的进展时间。是在数据流里面的一个标记点,具体内容是时间戳。用来指示当前事件的处理时间。

2) 有序数据的处理

                    为了提高速率,一般会每隔一段时间产生一个水位线

3)乱序+数据迟到的水位线处理

                            通常在流式计算中,数据的传输收到网络IO等等因素的影响,数据无法准时到达计算的窗口。为了让窗口处理数据变得规整且正确。我们通常使用让水位线等待固定秒数【意思就是在时间戳的基础上增加一些延迟,以保证不丢失数据】

注意:乱序 和 迟到的区别:

乱序: 数据的顺序乱了 时间小的比时间大的晚来

迟到: 数据的时间戳 < 当前的 watermark

4) 水位线的特征

  1. 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  2. 水位线主要的内容是一个时间戳,用来表示当前事件时间【数据传入的时间】的进展
  3. 水位线是基于数据的时间戳生成的
  4. 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进水位线可以通过设置延迟,来保证正确处理乱序数据
  5. 一个水位线Watermank(t),表示在当前流中事件时间已经达到了时间戳T,这代表t之前的所有数据都到齐了,之后流中不会出现 时间戳T<t的数据。
  6. 水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理

5)水位线的使用【需要使用事件时间语义窗口才能触发】

(1)有序流的使用

                            直接调用WatermarkStrategy.forMonotonousTimestamps()方法,然后使用assignTimestampsAndWatermarks()接受 。就可以实现。
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WaterMarkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.130", 9999)
                .map(new WaterSensorMapFunction());
        /*
            1.定义watermark策略
            2.使用assignTimestampsAndWatermarks 调用
         */

//        TODO:指定watermark策略
        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forMonotonousTimestamps()    // 指定watermark的生成 单调递增  没有等待时间
//                指定时间戳分配器,从数据中提取
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
//                        返回的时间戳是毫秒
                        System.out.println("数据=" + element + ",recordTS = " + recordTimestamp);
                        return element.getTs() * 1000L;
                    }
                });

        map.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy)

                .keyBy(sensor -> sensor.getId())
//                TODO:只能使用 事件时间语义窗口 才能使用水平线
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                 .process(
                new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    /**
                     * @param s        分组的 key
                     * @param context  上下文
                     * @param elements 存的数据
                     * @param out      采集器
                     * @throws Exception
                     */
                    @Override
                    public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
//                        TODO:毫秒
                        long start = context.window().getStart();
                        long end = context.window().getEnd();

                        String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");

//                        TODO:多少条数据
                        long data = elements.spliterator().estimateSize();

                        out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());

                    }
                }
        ).print();

        env.execute();
    }
}

(2) 无序流的使用

                                    将.forMonotonousTimestamps()方法 修改为forBoundedOutOfOrderness() 参数Duration.ofSeconds(等待的时间)                          
//        TODO:指定watermark策略
        WatermarkStrategy
                //  TODO: 指定watermark的生成 乱序  有等待时间 3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
//                TODO: 指定时间戳分配器,从数据中提取
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
//                        返回的时间戳是毫秒
                        System.out.println("数据=" + element + ",recordTS = " + recordTimestamp);
                        return element.getTs() * 1000L;
                    }
                });

(3) 自定义水位线的使用

                                            将.forBoundedOutOfOrderness()方法 修改为 forGenerator(重写类atermarkGeneratorSupplier)    最后 return 自己的方法<>(延迟的时间)                         

                                    
WatermarkStrategy
                //  TODO: 自定义指定watermark的生成 乱序  有等待时间 3s
                .<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {
                    @Override
                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
//                        TODO:延迟时间3s
                        return new MyGenerator<>(3000);
                    }
                })

(4) 在数据源发送水位线

                                            我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)

6)水位线的传递

                            一个Task通常会设置多个并行度,而而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。

九:处理乱序+迟到数据 常用三部曲

1)水位线空闲等待

                                    在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,而没有数据的并行任务一直保持最小。就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待(withIdleness)。
env.socketTextStream("192.168.88.130", 9999)
                .partitionCustom(new MyPartioner(), r -> r)
                .map(r -> Integer.parseInt(r))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Integer>forMonotonousTimestamps()
                        .withTimestampAssigner((r, ts) -> r * 1000L)
//                        TODO: 空闲等待  5s
                        .withIdleness(Duration.ofSeconds(5))
                );

2) 允许推迟时间关窗

                                     Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭,迟到来的数据不会被计算。
  
                .keyBy(sensor -> sensor.getId())
//                TODO:只能使用 事件时间语义窗口 才能使用水平线
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
//                TODO:使用窗口延迟   5s
                .allowedLateness(Time.seconds(5))

3) 侧输出流

.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
// TODO:  【关窗后】   迟到的数据放入侧输出流
.sideOutputLateData(
// TODO:  参数1:侧输出流的名字    参数2:侧输出数据的类型
        new OutputTag<WaterSensor>("late_data", Types.POJO(WaterSensor.class))
)

4) 设置经验

1.watermark等待时间,设置一个不算特别大的,一般是秒级,在乱序和 延迟 取舍

2.设置一定的窗口允许迟到,只考虑大部分的迟到数据,极端小部分迟到很久的数据,不管

3.极端小部分迟到很久的数据,放到侧输出流。获取到之后可以做各种处理

十:基于时间的合流-----双流联结(Join)

可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流join的大多数场景。

不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。

1)窗口联结 (Window Join)

                            Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
//        TODO: window  join 窗口联结
        
    /**
         * stream1.join(stream2)
         *  .where(<KeySelector>)
         * .equalTo(<KeySelector>)
         * .window(<WindowAssigner>)
         *  .apply(<JoinFunction>)
         *
         *   1. 落在同一个时间窗口范围内才能匹配
         *   2. 根据keyby的key,来进行匹配关联
         *   3. 只能拿到匹配上的数据,类似有固定时间范围的inner join
     */

        data1.join(data2)
//                TODO:data1的keyby
                .where(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
//                TODO:data2的keyby
                .equalTo(new KeySelector<Tuple3<String, Integer, Integer>, String>() {
                    @Override
                    public String getKey(Tuple3<String, Integer, Integer> value) throws Exception {
                        return value.f0;
                    }
                })

                .window(TumblingEventTimeWindows.of(Time.seconds(5)))

                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     *关联上的数据调用join方法
                     * @param first The element from first input.  [data1]
                     * @param second The element from second input. [data2]
                     * @return
                     * @throws Exception
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "<------------>" + second;
                    }
                });

2)间隔联结 (Interval Join)

                            这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key,所以要先进行KeyBy分组;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。

(1)正常使用

        data_afterKeyBy1.intervalJoin(data_afterKeyBy2)
//                TODO:设置下上界的偏移量  [先下后上的设置]
                .between(Time.seconds(-3), Time.seconds(3))
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     *两条流的数据匹配上才会调用这个方法
                     * @param left ks1 的数据
                     * @param right ks2 的数据
                     * @param ctx 上下文
                     * @param out 采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
//                                关联上的数据才能进入这个方法是
                        out.collect(left + "<------->" + right);
                    }
                })

(2) 处理迟到数据

 /**
         * TODO Interval join  处理迟到数据
         * 1、只支持事件时间
         * 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后
         * 3、process中,只能处理 join上的数据
         * 4、两条流关联后的watermark,以两条流中最小的为准
         * 5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理
         *  => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流
         */

        OutputTag<Tuple2<String, Integer>> ks1Late = new OutputTag<>("ks1_late", Types.TUPLE(Types.STRING, Types.INT));
        OutputTag<Tuple3<String, Integer,Integer>> ks2Late = new OutputTag<>("ks1_late", Types.TUPLE(Types.STRING, Types.INT,Types.INT));

        SingleOutputStreamOperator<String> process = key1.intervalJoin(key2)
//                TODO:设置下上界的偏移量  [先下后上的设置]
                .between(Time.seconds(-3), Time.seconds(3))

                
//                TODO:处理左侧迟到数据到侧输出流
                .sideOutputLeftLateData(ks1Late)
//                TODO:处理右侧迟到数据到侧输出流
                .sideOutputRightLateData(ks2Late)
                
                
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     *两条流的数据匹配上才会调用这个方法
                     * @param left ks1 的数据
                     * @param right ks2 的数据
                     * @param ctx 上下文
                     * @param out 采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
//                                进入这个方法是关联上的数据
                        out.collect(left + "<------->" + right);
                    }
                });

        process.print("主流打印:");
        process.getSideOutput(ks1Late).printToErr("ks1迟到数据以错误日志方式打印:");
        process.getSideOutput(ks2Late).printToErr("ks2迟到数据以错误日志方式打印:");
标签: flink 大数据

本文转载自: https://blog.csdn.net/2202_75658539/article/details/139050761
版权归原作者 可乐朱仔 所有, 如有侵权,请联系我们删除。

“Flink 窗口 概述”的评论:

还没有评论