37. Flink Window Functions in Detail

Window Functions
a) Overview

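Once a window assigner is defined, you need to specify the computation to run on each window's data when the window fires. This is the job of the window function.

A window function can be one of three kinds: ReduceFunction, AggregateFunction, or ProcessWindowFunction.

  • The first two execute more efficiently, because Flink can incrementally aggregate the elements of each window as they arrive;
  • A ProcessWindowFunction gets an Iterable over all the elements in the window, plus meta-information about that window.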

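A window transformation that uses a ProcessWindowFunction is less efficient than the other two kinds, because Flink has to buffer all of the window's elements until the window fires. To mitigate this, a ProcessWindowFunction can be combined with a ReduceFunction or an AggregateFunction, which gives both incremental aggregation of the window's elements and access to the window metadata that the ProcessWindowFunction receives.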
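The difference between incremental aggregation and buffering can be sketched in plain Java. This is only a model with no Flink dependency: the class names `IncrementalSum` and `BufferedSum` are hypothetical and are not Flink types. It shows how much state each style has to keep for one window.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of the two ways a window can hold its data.
final class WindowStateSketch {

    /** Incremental style: the window keeps one running value, updated per element. */
    static final class IncrementalSum {
        private long sum; // the only state kept for the window

        void add(long value) { sum += value; }

        long fire() { return sum; }

        int storedElements() { return 1; }
    }

    /** Buffering style: the window keeps every element until it fires. */
    static final class BufferedSum {
        private final List<Long> buffer = new ArrayList<>();

        void add(long value) { buffer.add(value); }

        long fire() {
            long sum = 0;
            for (long v : buffer) {
                sum += v;
            }
            return sum;
        }

        int storedElements() { return buffer.size(); }
    }

    public static void main(String[] args) {
        IncrementalSum inc = new IncrementalSum();
        BufferedSum buf = new BufferedSum();
        for (long v = 1; v <= 1000; v++) {
            inc.add(v);
            buf.add(v);
        }
        // Both styles produce the same result; only the amount of state differs.
        System.out.println(inc.fire() + " with " + inc.storedElements() + " stored value");
        System.out.println(buf.fire() + " with " + buf.storedElements() + " stored values");
    }
}
```

Both variants return the same sum, but the buffered one holds all 1000 elements until it fires, which is the cost the text attributes to a plain ProcessWindowFunction.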

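b) ReduceFunction

A ReduceFunction specifies how two input elements are combined to produce one output element. The input and output must have the same type. Flink uses the ReduceFunction to incrementally aggregate the elements of a window.

Example: sum the second field of the tuples in a window.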

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        // Keep the key (f0) and add up the second field (f1).
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
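c) AggregateFunction

ReduceFunction is a special case of AggregateFunction. An AggregateFunction takes three type parameters: the input type (IN), the accumulator type (ACC), and the output type (OUT).

The input type is the type of the elements in the input stream. The AggregateFunction interface has methods for creating an initial accumulator, adding an element to the accumulator, merging two accumulators, and extracting the output (of type OUT) from an accumulator.

As with ReduceFunction, Flink aggregates the input elements incrementally as they arrive at the window.

Example: compute the average of the second field of all elements in a window.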

private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
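
To make the role of the four methods more concrete, here is a plain-Java model of the same contract. It is a sketch with no Flink dependency: the `Aggregator` interface and the `fold` helper are hypothetical stand-ins that only mirror the method names described above. It illustrates why `merge` has to agree with `add`: merging two partial accumulators should give the same result as feeding all elements into one accumulator. In Flink this matters when windows are merged, as with session windows.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model (not Flink code) of the accumulator contract.
final class AverageContractSketch {

    /** Hypothetical stand-in that mirrors the four methods described in the text. */
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Average over Long values; the accumulator is the pair {sum, count}. */
    static final class Average implements Aggregator<Long, long[], Double> {
        @Override
        public long[] createAccumulator() { return new long[] {0L, 0L}; }

        @Override
        public long[] add(Long value, long[] acc) {
            return new long[] {acc[0] + value, acc[1] + 1};
        }

        @Override
        public Double getResult(long[] acc) { return ((double) acc[0]) / acc[1]; }

        @Override
        public long[] merge(long[] a, long[] b) {
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }
    }

    /** Feeds the values into a fresh accumulator one at a time. */
    static <IN, ACC, OUT> ACC fold(Aggregator<IN, ACC, OUT> agg, List<IN> values) {
        ACC acc = agg.createAccumulator();
        for (IN v : values) {
            acc = agg.add(v, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        Average avg = new Average();
        long[] whole = fold(avg, Arrays.asList(2L, 4L, 6L, 8L));
        long[] left = fold(avg, Arrays.asList(2L, 4L));
        long[] right = fold(avg, Arrays.asList(6L, 8L));
        System.out.println(avg.getResult(whole));                  // 5.0
        System.out.println(avg.getResult(avg.merge(left, right))); // 5.0
    }
}
```

Keeping the sum and the count in the accumulator, rather than a running average, is what makes the merge step exact.
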
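d) ProcessWindowFunction

A ProcessWindowFunction gets an Iterable containing all the elements of the window, and a Context object with access to time and state information, which makes it more flexible than the other window functions. This flexibility comes at the cost of performance and resource consumption, because the elements cannot be aggregated incrementally and instead have to be buffered until the window fires.

The signature of ProcessWindowFunction is as follows: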

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * Deletes any state in the {@code Context} when the Window expires (the watermark passes its
     * {@code maxTimestamp} + {@code allowedLateness}).
     *
     * @param context The context to which the window is being evaluated
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public void clear(Context context) throws Exception {}

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }

}
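The key parameter is the key extracted by the KeySelector that was specified in keyBy(). If the key is given as a tuple index or as a string field reference, the key type is always Tuple, and you have to cast it manually to a tuple of the correct size to extract the key fields.

Example: use a ProcessWindowFunction to count the elements in a window and emit information about the window along with the count.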

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + " count: " + count);
  }
}
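e) ProcessWindowFunction with incremental aggregation

A ProcessWindowFunction can be combined with either a ReduceFunction or an AggregateFunction so that elements are aggregated incrementally as they arrive at the window. When the window closes, the ProcessWindowFunction is given the aggregated result. This makes it possible to aggregate the window's elements incrementally while still having access to the window metadata available in the ProcessWindowFunction.

Incremental aggregation with ReduceFunction

Example: combine a ReduceFunction with a ProcessWindowFunction to return the smallest element in a window together with the window's start time.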

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}

Incremental aggregation with AggregateFunction

Example: combine an AggregateFunction with a ProcessWindowFunction to compute the average and emit it together with the key of the window.

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Double> averages,
                    Collector<Tuple2<String, Double>> out) {
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}
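
The way the two functions cooperate can be modelled in plain Java. The sketch below has no Flink dependency, and every name in it (`Window`, `onElement`, `onFire`, `process`) is hypothetical. It assumes, as described above, that the per-window function is handed only the pre-aggregated value when the window fires, so its Iterable contains exactly one element.

```java
import java.util.Collections;

// Plain-Java model (not Flink code) of incremental aggregation plus window metadata.
final class IncrementalWithMetadataSketch {

    /** Hypothetical window descriptor; only the start timestamp is modelled. */
    static final class Window {
        final long start;

        Window(long start) { this.start = start; }
    }

    private final Window window;
    private Long min; // running minimum; null until the first element arrives

    IncrementalWithMetadataSketch(long windowStart) {
        this.window = new Window(windowStart);
    }

    /** Incremental step: fold each arriving element into the single stored value. */
    void onElement(long value) {
        min = (min == null) ? value : Math.min(min, value);
    }

    /** Firing step: assumes at least one element arrived, so min is not null. */
    String onFire() {
        Iterable<Long> aggregated = Collections.singletonList(min);
        return process(window, aggregated);
    }

    /** Plays the role of the per-window function: one value plus window metadata. */
    static String process(Window window, Iterable<Long> elements) {
        Long only = elements.iterator().next();
        return "start=" + window.start + " min=" + only;
    }

    public static void main(String[] args) {
        IncrementalWithMetadataSketch w = new IncrementalWithMetadataSketch(0L);
        w.onElement(7);
        w.onElement(3);
        w.onElement(9);
        System.out.println(w.onFire()); // start=0 min=3
    }
}
```

This is why the examples in this section call `iterator().next()` once instead of looping over the Iterable.
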
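f) Using per-window state in ProcessWindowFunction

In addition to accessing keyed state, a ProcessWindowFunction can also use keyed state that is scoped to the window the function is currently processing.

The "window" in per-window refers to a window instance for a specific key: for example, the time window from 12:00 to 13:00 for user-id xyz. Depending on the window definition, many different window instances arise from the specific keys and time intervals involved.

Per-window state is tied to these window instances. If events for 1000 different keys are processed and all of them currently fall into the [12:00, 13:00) time window, there are 1000 window instances, and each has its own keyed per-window state.

The Context object that process() receives has two methods that give access to the two kinds of state:

  • globalState(), which accesses keyed state that is not scoped to a window
  • windowState(), which accesses keyed state that is scoped to the current window

This feature is helpful if the same window may fire more than once, for example when late data triggers the window computation again or when a custom trigger fires speculatively early. In such cases you can store information about previous firings, or the number of firings so far, in per-window state.

**When using window state, remember to clean it up when the window is removed, which should be done in the clear() method**.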
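
The scoping of the two kinds of state can be illustrated with a plain-Java model. This sketch does not use Flink's KeyedStateStore: the two maps and the `scope` helper are hypothetical and only imitate how per-window state is keyed by key and window while global state is keyed by key alone. It counts how often each window instance has fired and drops the per-window entry on `clear`.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not Flink code) of per-window versus global keyed state.
final class PerWindowStateSketch {

    private final Map<String, Integer> windowState = new HashMap<>(); // scoped by key and window
    private final Map<String, Integer> globalState = new HashMap<>(); // scoped by key only

    private static String scope(String key, long windowStart) {
        return key + "@" + windowStart;
    }

    /** Called on every firing; returns how often this window instance has fired so far. */
    int process(String key, long windowStart) {
        globalState.merge(key, 1, Integer::sum);
        return windowState.merge(scope(key, windowStart), 1, Integer::sum);
    }

    /** Called when the window is removed; only the per-window entry is dropped. */
    void clear(String key, long windowStart) {
        windowState.remove(scope(key, windowStart));
    }

    int windowEntries() { return windowState.size(); }

    int totalFirings(String key) { return globalState.getOrDefault(key, 0); }

    public static void main(String[] args) {
        PerWindowStateSketch state = new PerWindowStateSketch();
        System.out.println(state.process("xyz", 0L));         // 1: first firing
        System.out.println(state.process("xyz", 0L));         // 2: late data fires it again
        System.out.println(state.process("xyz", 3_600_000L)); // 1: a different window instance
        state.clear("xyz", 0L);
        System.out.println(state.windowEntries());            // 1: only the second window is left
        System.out.println(state.totalFirings("xyz"));        // 3: global state is untouched
    }
}
```

Without the `clear` call the per-window map would keep one entry for every window that ever existed, which is the leak the warning above is about.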

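WindowFunction (Legacy)

In some places where a ProcessWindowFunction can be used, you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and lacks some advanced features, such as per-window state.

The signature of WindowFunction is as follows: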

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key The key for which this window is evaluated.
   * @param window The window that is being evaluated.
   * @param input The elements in the window being evaluated.
   * @param out A collector for emitting elements.
   *
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

It can be used as in the following example:

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

Reposted from: https://blog.csdn.net/m0_50186249/article/details/139083319
Copyright belongs to the original author, 猫猫爱吃小鱼粮. In case of infringement, please contact us for removal.
