0


Flink 源码剖析|累加器

1 累加器(

Accumulator

累加器是实现了 加法运算 功能和 合并运算(合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。

Flink 的累加器均实现了

Accumulator

接口,包括如下 2 个方法用于支持加法运算和合并最终结果:

  • add(V value):执行加法运算,将值 V 累加到当前 UDF 的累加器中
  • merge(Accumulator<V, R> other):执行合并操作,将累加器 other 与当前累加器合并

累加器的使用方法如下:

Step 1|在需要使用累加器的 UDF 中创建一个累加器对象(此处以计数器为例)

privateIntCounter numLines =newIntCounter();

Step 2|在富函数的

open()

方法中注册累加器对象,在注册时需要定义累加器名称用于查询结果

getRuntimeContext().addAccumulator("num-lines",this.numLines);

Step 3|在 UDF 的任何地方(包括

open()

close()

方法中)使用累加器

this.numLines.add(1);

Step 4|最终整体结果会存储在由执行环境的

execute()

方法返回的

JobExecutionResult

对象中。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器 共享一个命名空间,因此可以在不同算子(operator)的不同 UDF 中使用同一个累加器,Flink 会合并将所有具有相同名称的累加器。

注意:当前累加器的结果只有在整个作业结束后才可以使用。

2

Accumulator

接口和

SimpleAccumulator

接口

Flink 内置的所有累加器都实现了

Accumulator

接口。

如果需要自定义累加器,只需要实现

Accumulator

接口或

SimpleAccumulator

接口即可。

2.1

Accumulator<V, R>

Accumulator

接口中,共定义了以下 5 个方法:

  • add(V value):将 value 累加到当前累加器中(加法运算)
  • getLocalValue():获取当前 UDF 中的累加器的值
  • resetLocal():重置当前 UDF 中的累加器的值
  • merge(Accumulator<V, R> other):将 other 合并到当前累加器中,用于在 Flink 系统内部合并多个累加器(合并运算)
  • clone():复制累加器对象

其中包含 2 个泛型,

V

类型表示每一次向累加器中的累加的值的类型,这个类型不要求是可序列化的;

R

类型表示累加器结果的类型,这个类型必须是可序列化的。不要求累加的值与累加器结果的值类型相同,可以支持类似直方图、列表等场景。

因为累加器需要在不同字节之间复制、传输,所以累加器自己必须是可序列化的、可复制的。

源码|Github|

org.apache.flink.api.common.accumulators.Accumulator
/**
 * Accumulators collect distributed statistics or aggregates in a from user functions and operators.
 * Each parallel instance creates and updates its own accumulator object, and the different parallel
 * instances of the accumulator are later merged. merged by the system at the end of the job. The
 * result can be obtained from the result of a job execution, or from the web runtime monitor.
 *
 * <p>The accumulators are inspired by the Hadoop/MapReduce counters.
 *
 * <p>The type added to the accumulator might differ from the type returned. This is the case e.g.
 * for a set-accumulator: We add single objects, but the result is a set of objects.
 *
 * @param <V> Type of values that are added to the accumulator
 * @param <R> Type of the accumulator result as it will be reported to the client
 */@PublicpublicinterfaceAccumulator<V,RextendsSerializable>extendsSerializable,Cloneable{/** @param value The value to add to the accumulator object */voidadd(V value);/** @return local The local value from the current UDF context */RgetLocalValue();/** Reset the local value. This only affects the current UDF context. */voidresetLocal();/**
     * Used by system internally to merge the collected parts of an accumulator at the end of the
     * job.
     *
     * @param other Reference to accumulator to merge in.
     */voidmerge(Accumulator<V,R> other);/**
     * Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
     * throw a {@link java.lang.CloneNotSupportedException}
     *
     * @return The duplicated accumulator.
     */Accumulator<V,R>clone();}

2.2

SimpleAccumulator<T>
SimpleAccumulator

是简化版的

Accumulator

,它继承了

Accumulator<V, R>

,但是要求累加的值的类型与累加器的结果的类型必须相同,适用于一些相对简单的操作,例如计数器。

源码|Github|

org.apache.flink.api.common.accumulators.SimpleAccumulator
/** Similar to Accumulator, but the type of items to add and the result value must be the same. */@PublicpublicinterfaceSimpleAccumulator<TextendsSerializable>extendsAccumulator<T,T>{}

3 Flink 内置累加器

可以看到 Flink 内置的累加器,除

Histogram

ListAccumulator

等累加器的累加类型与结果类型不同,直接实现了

Accumulator

接口以外,其他累加器均实现

SimpleAccumulator

接口。

3.1 内置累加器列表

这些 Flink 内置的累加器如下:
累加器类累加器用途输入类型输出类型

AverageAccumulator

计算平均值

Double

double

/

long

/

int

Double
DoubleCounter

计算

Double

类型的和

Double
Double
DoubleMaximum

计算

Double

类型最大值

Double
Double
DoubleMinimum

计算

Double

类型最小值

Double
Double
IntCounter

计算

Integer

类型的和

Integer
Integer
IntMaximum

计算

Integer

类型最大值

Integer
Integer
IntMinimum

计算

Integer

类型最小值

Integer
Integer
LongCounter

计算

Long

类型的和

Long
Long
LongMaximum

计算

Long

类型的最大值

Long
Long
LongMinimum

计算

Long

类型的最小值

Long
Long
Histogram

直方图

Integer
TreeMap<Integer, Integer>
ListAccumulator

列表累加器(将元素存储到列表中)

T
ArrayList<T>
SerializedListAccumulator

序列化的列表累加器(将元素序列化存储到列表中)

T
ArrayList<byte[]>

3.2 累加器实现样例

以上列表中累加器的实现逻辑是类似的,我们具体来看

AverageAccumulator

作为样例。

源码|Github|

org.apache.flink.api.common.accumulators.AverageAccumulator

(部分)

@PublicpublicclassAverageAccumulatorimplementsSimpleAccumulator<Double>{privatelong count;privatedouble sum;@Overridepublicvoidadd(Double value){this.count++;this.sum += value;}@OverridepublicDoublegetLocalValue(){if(this.count ==0){return0.0;}returnthis.sum /this.count;}@OverridepublicvoidresetLocal(){this.count =0;this.sum =0;}@Overridepublicvoidmerge(Accumulator<Double,Double> other){if(other instanceofAverageAccumulator){AverageAccumulator avg =(AverageAccumulator) other;this.count += avg.count;this.sum += avg.sum;}else{thrownewIllegalArgumentException("The merged accumulator must be AverageAccumulator.");}}@OverridepublicAverageAccumulatorclone(){AverageAccumulator average =newAverageAccumulator();
        average.count =this.count;
        average.sum =this.sum;return average;}}
AverageAccumulator

在对象属性中存储了当前累加器中所有元素的和

sum

以及元素的数量

count

在调用

add(Double value)

方法执行加法运算时,累加

sum

count

在调用

merge(Accumulator<Double, Double> other)

方法执行合并运算时,将另一个累加器

other

count

sum

累加到当前累加器中。

在调用

clone()

方法复制累加器对象时,创建一个新的

AverageAccumulator

对象,并将当前累加器的

count

sum

复制给该对象。

参考文档

  • 《Flink 官方文档:应用开发 - DataStream API - 用户自定义 Functions》

本文转载自: https://blog.csdn.net/Changxing_J/article/details/136114853
版权归原作者 长行 所有, 如有侵权,请联系我们删除。

“Flink 源码剖析|累加器”的评论:

还没有评论