0


Flink窗口函数

ReduceFunction

    实例代码如下(本博客内容均基于按键分区)

    reduce函数有两个参数,分别是IN(输入)和OUT(输出类型)
SingleOutputStreamOperator<WaterSensor> reduce = sensorWS.reduce(
                new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        System.out.println("调用reduce方法,value1=" + value1 + "value2=" + value2);
                        return new WaterSensor(value1.getId(), value2.ts, value1.vc + value2.vc);
                    }
                }
        );

reduce.print();
    1、相同key的第一条数据来的时候,不会调用reduce方法

    ![](https://img-blog.csdnimg.cn/direct/0edffddd189c4f4a9248ea6e6192975e.png)

    ![](https://img-blog.csdnimg.cn/direct/1e7d35f160ec4aa5bccc9734176aaf4c.png)

    2、相同key在一个窗口内传入的数据大于一条,则会调用reduce方法

    ![](https://img-blog.csdnimg.cn/direct/876c5b3a20d842f3b2924b6358584b38.png)

    ![](https://img-blog.csdnimg.cn/direct/03225f185903426d99468ddda98e9fd0.png)

    3、不同key对应不同的窗口,互不影响

    ![](https://img-blog.csdnimg.cn/direct/9bd867a381224cb3a5058a6a8490dad5.png)

    ![](https://img-blog.csdnimg.cn/direct/b22d8242d31047d5967b6bab91d61446.png)

    相比于ReduceFunction限制了输入、输出、聚合状态类型保持一致,AggregateFunction更加灵活,3个变量类型可以任意选择

AggregateFunction

// 2、增量聚合 Aggregate
        /*
         * 1、属于本窗口的第一条数据来,创建窗口,创建累加器
         * 2、增量聚合: 来一条计算一条, 调用一次add方法
         * 3、窗口输出时调用一次getresult方法
         * 4、输入、中间累加器、输出 类型可以不一样,非常灵活
         */
        SingleOutputStreamOperator<String> agg = sensorWS.aggregate(
                /*
                 * 第一个类型: 输入数据的类型
                 * 第二个类型: 累加器的类型,存储的中间计算结果的类型
                 * 第三个类型: 输出的类型
                 */
                new AggregateFunction<WaterSensor, Integer, String>() {
                    // 初始化累加器
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    // 聚合逻辑
                    @Override
                    public Integer add(WaterSensor waterSensor, Integer integer) {
                        System.out.println("调用add方法,value" + integer);
                        return waterSensor.vc + integer;
                    }

                    // 获取最终结果
                    @Override
                    public String getResult(Integer integer) {
                        System.out.println("调用getResult方法");
                        return integer.toString();
                    }

                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return null;
                    }
                }
        );

        agg.print();
    1、初始化累加器之后,第一个值设置为0,add方法一定会执行

    ![](https://img-blog.csdnimg.cn/direct/013cea81ae9740a890d9a0662950667a.png)

    ![](https://img-blog.csdnimg.cn/direct/e08d0f5b675c436b8d7f0bd347476995.png)

    2、有n条数据,就会执行n次add方法

    ![](https://img-blog.csdnimg.cn/direct/3263b0b97f8643629feeb36a289cf8df.png)

    ![](https://img-blog.csdnimg.cn/direct/6700c3906c8a486aa1deb976c5ba1b04.png)

    以上两种方法不能看到窗口的信息,比如时间,因此引入全窗口函数,它提供了一个上下文对象,不仅能获取窗口信息,还能得到访问的当前时间和状态信息。

ProcessWindowFunction

    该function的四个参数按顺序分别表示为输入,输出,key,窗口类型
SingleOutputStreamOperator<String> process = sensorWS.process(
                // IN,OUT,KEY,Window
                new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    /**
                     * 全窗口函数计算逻辑:  窗口触发时才会调用一次,统一计算窗口的所有数据
                     * @param s   分组的key
                     * @param context  上下文
                     * @param elements 存的数据
                     * @param out      采集器
                     * @throws Exception
                     */
                    public void process(String s, Context context, Iterable<WaterSensor> iterable, Collector<String> collector) throws Exception {
                        // 拿到窗口的开始时间、结束时间
                        long startTS = context.window().getStart();
                        long endTS = context.window().getEnd();
                        String start_time = DateFormatUtils.format(startTS, "yyyy-MM-dd HH:mm:ss.SSS");
                        String end_time = DateFormatUtils.format(endTS, "yyyy-MM-dd HH:mm:ss.SSS");
                        // 去除窗口的size
                        long count = iterable.spliterator().estimateSize();
                        collector.collect("key=" + s + "的窗口[" + start_time + "->" +
                                end_time + "),长度为" + count);

                    }

                }
        );
        process.print();
    1、不同key会产生不同的窗口,而一个窗口的数据在结束时才会执行计算

    ![](https://img-blog.csdnimg.cn/direct/dab6a828ee6b417e978a940c3f25741e.png)

    ![](https://img-blog.csdnimg.cn/direct/b30b1eb7e1d148ff96c48f6e75acb05b.png)

结合增量聚合函数和全窗口函数

    看过上面3个案例,可以发现增量聚合函数ReduceFunction和AggregateFunction都是数据来一条处理一条,而全窗口函数ProcessWindowFunction则是在窗口结束的时候才执行计算,因此process的效率相对更低。但是process的优点在于它可以得到上下文信息,因此,将两者结合起来使用,既可以提升计算效率,又能得到窗口信息。

public class WindowAggregateAndProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> source = env
                .socketTextStream("node1", 7777)
                .map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> keyBy = source.keyBy(
                new KeySelector<WaterSensor, String>() {
                    @Override
                    public String getKey(WaterSensor waterSensor) throws Exception {
                        return waterSensor.getId();
                    }
                }
        );

        // 1、窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new MyAgg(), new MyProcess());
        aggregate.print();

        env.execute();

    }

    public static class MyAgg implements AggregateFunction<WaterSensor,Integer,String>{

        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(WaterSensor waterSensor, Integer integer) {
            System.out.println("调用add函数,value=" + integer);
            return waterSensor.vc + integer;
        }

        @Override
        public String getResult(Integer integer) {
            System.out.println("调用getResult函数");
            return integer.toString();
        }

        @Override
        public Integer merge(Integer integer, Integer acc1) {
            return null;
        }
    }

    // 注意,这里ProcessWindowFunction的输入为String类型,也就是AggregateFunction的输出类型
    public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{

        // iterable 只有一条数据,即AggregateFunction的计算结果
        @Override
        public void process(String s, Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
            // 拿到窗口的开始时间、结束时间
            long startTS = context.window().getStart();
            long endTS = context.window().getEnd();
            String start_time = DateFormatUtils.format(startTS, "yyyy-MM-dd HH:mm:ss.SSS");
            String end_time = DateFormatUtils.format(endTS, "yyyy-MM-dd HH:mm:ss.SSS");
            // 去除窗口的size
            long count = iterable.spliterator().estimateSize();
            collector.collect("key=" + s + "的窗口[" + start_time + "->" +
                    end_time + "),长度为" + count);
        }
    }
}
    1、结合两者既可以实现来一条数据执行一条数据,还能拿到上下文信息。

    2、process中的长度均为1,因为其输入是aggregate的输出。

    ![](https://img-blog.csdnimg.cn/direct/adc1c192af814515a328f91bbf5710bb.png)

    ![](https://img-blog.csdnimg.cn/direct/2fcc827ab6434ceeb99ad290bcc9d9aa.png)

    同理,reduce也支持结合使用增量聚合函数和全窗口函数,写法与aggregate基本保持一致,同样注意process的输入应该改成reduceFunction的输出类型。

标签: flink java

本文转载自: https://blog.csdn.net/m0_63069778/article/details/136592573
版权归原作者 数据开发杰出cv工程师 所有, 如有侵权,请联系我们删除。

“Flink窗口函数”的评论:

还没有评论