0


Flink 源码剖析|4. 累加器与相关工具方法

4.1 累加器(

Accumulator

累加器是实现了 加法运算 功能和 合并运算(合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。

Flink 的累加器均实现了

Accumulator

接口,包括如下 2 个方法用于支持加法运算和合并最终结果:

  • add(V value):执行加法运算,将值 V 累加到当前 UDF 的累加器中
  • merge(Accumulator<V, R> other):执行合并操作,将累加器 other 与当前累加器合并

累加器的使用方法如下:

Step 1|在需要使用累加器的 UDF 中创建一个累加器对象(此处以计数器为例)

privateIntCounter numLines =newIntCounter();

Step 2|在富函数的

open()

方法中注册累加器对象,在注册时需要定义累加器名称用于查询结果

getRuntimeContext().addAccumulator("num-lines",this.numLines);

Step 3|在 UDF 的任何地方(包括

open()

close()

方法中)使用累加器

this.numLines.add(1);

Step 4|最终整体结果会存储在由执行环境的

execute()

方法返回的

JobExecutionResult

对象中。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器 共享一个命名空间,因此可以在不同算子(operator)的不同 UDF 中使用同一个累加器,Flink 会合并将所有具有相同名称的累加器。

注意:当前累加器的结果只有在整个作业结束后才可以使用。

4.2

Accumulator

接口和

SimpleAccumulator

接口

Flink 内置的所有累加器都实现了

Accumulator

接口。

如果需要自定义累加器,只需要实现

Accumulator

接口或

SimpleAccumulator

接口即可。

4.2.1

Accumulator<V, R>

Accumulator

接口中,共定义了以下 5 个方法:

  • add(V value):将 value 累加到当前累加器中(加法运算)
  • getLocalValue():获取当前 UDF 中的累加器的值
  • resetLocal():重置当前 UDF 中的累加器的值
  • merge(Accumulator<V, R> other):将 other 合并到当前累加器中,用于在 Flink 系统内部合并多个累加器(合并运算)
  • clone():复制累加器对象

其中包含 2 个泛型,

V

类型表示每一次向累加器中的累加的值的类型,这个类型不要求是可序列化的;

R

类型表示累加器结果的类型,这个类型必须是可序列化的。不要求累加的值与累加器结果的值类型相同,可以支持类似直方图、列表等场景。

因为累加器需要在不同字节之间复制、传输,所以累加器自己必须是可序列化的、可复制的。

源码|Github|

org.apache.flink.api.common.accumulators.Accumulator
/**
 * Accumulators collect distributed statistics or aggregates in a from user functions and operators.
 * Each parallel instance creates and updates its own accumulator object, and the different parallel
 * instances of the accumulator are later merged. merged by the system at the end of the job. The
 * result can be obtained from the result of a job execution, or from the web runtime monitor.
 *
 * <p>The accumulators are inspired by the Hadoop/MapReduce counters.
 *
 * <p>The type added to the accumulator might differ from the type returned. This is the case e.g.
 * for a set-accumulator: We add single objects, but the result is a set of objects.
 *
 * @param <V> Type of values that are added to the accumulator
 * @param <R> Type of the accumulator result as it will be reported to the client
 */@PublicpublicinterfaceAccumulator<V,RextendsSerializable>extendsSerializable,Cloneable{/** @param value The value to add to the accumulator object */voidadd(V value);/** @return local The local value from the current UDF context */RgetLocalValue();/** Reset the local value. This only affects the current UDF context. */voidresetLocal();/**
     * Used by system internally to merge the collected parts of an accumulator at the end of the
     * job.
     *
     * @param other Reference to accumulator to merge in.
     */voidmerge(Accumulator<V,R> other);/**
     * Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
     * throw a {@link java.lang.CloneNotSupportedException}
     *
     * @return The duplicated accumulator.
     */Accumulator<V,R>clone();}

4.2.2

SimpleAccumulator<T>
SimpleAccumulator

是简化版的

Accumulator

,它继承了

Accumulator<V, R>

,但是要求累加的值的类型与累加器的结果的类型必须相同,适用于一些相对简单的操作,例如计数器。

源码|Github|

org.apache.flink.api.common.accumulators.SimpleAccumulator
/** Similar to Accumulator, but the type of items to add and the result value must be the same. */@PublicpublicinterfaceSimpleAccumulator<TextendsSerializable>extendsAccumulator<T,T>{}

4.3 Flink 内置累加器

可以看到 Flink 内置的累加器,除

Histogram

ListAccumulator

等累加器的累加类型与结果类型不同,直接实现了

Accumulator

接口以外,其他累加器均实现

SimpleAccumulator

接口。

4.3.1 内置累加器列表

这些 Flink 内置的累加器如下:
累加器类累加器用途输入类型输出类型

AverageAccumulator

计算平均值

Double

double

/

long

/

int

Double
DoubleCounter

计算

Double

类型的和

Double
Double
DoubleMaximum

计算

Double

类型最大值

Double
Double
DoubleMinimum

计算

Double

类型最小值

Double
Double
IntCounter

计算

Integer

类型的和

Integer
Integer
IntMaximum

计算

Integer

类型最大值

Integer
Integer
IntMinimum

计算

Integer

类型最小值

Integer
Integer
LongCounter

计算

Long

类型的和

Long
Long
LongMaximum

计算

Long

类型的最大值

Long
Long
LongMinimum

计算

Long

类型的最小值

Long
Long
Histogram

直方图

Integer
TreeMap<Integer, Integer>
ListAccumulator

列表累加器(将元素存储到列表中)

T
ArrayList<T>
SerializedListAccumulator

序列化的列表累加器(将元素序列化存储到列表中)

T
ArrayList<byte[]>

4.3.2 累加器实现样例

以上列表中累加器的实现逻辑是类似的,我们具体来看

AverageAccumulator

作为样例。

源码|Github|

org.apache.flink.api.common.accumulators.AverageAccumulator

(部分)

@PublicpublicclassAverageAccumulatorimplementsSimpleAccumulator<Double>{privatelong count;privatedouble sum;@Overridepublicvoidadd(Double value){this.count++;this.sum += value;}@OverridepublicDoublegetLocalValue(){if(this.count ==0){return0.0;}returnthis.sum /this.count;}@OverridepublicvoidresetLocal(){this.count =0;this.sum =0;}@Overridepublicvoidmerge(Accumulator<Double,Double> other){if(other instanceofAverageAccumulator){AverageAccumulator avg =(AverageAccumulator) other;this.count += avg.count;this.sum += avg.sum;}else{thrownewIllegalArgumentException("The merged accumulator must be AverageAccumulator.");}}@OverridepublicAverageAccumulatorclone(){AverageAccumulator average =newAverageAccumulator();
        average.count =this.count;
        average.sum =this.sum;return average;}}
AverageAccumulator

在对象属性中存储了当前累加器中所有元素的和

sum

以及元素的数量

count

在调用

add(Double value)

方法执行加法运算时,累加

sum

count

在调用

merge(Accumulator<Double, Double> other)

方法执行合并运算时,将另一个累加器

other

count

sum

累加到当前累加器中。

在调用

clone()

方法复制累加器对象时,创建一个新的

AverageAccumulator

对象,并将当前累加器的

count

sum

复制给该对象。

4.4

AccumulatorHelper

类中累加器相关工具方法

AccumulatorHelper

类中,有一些用于操作累加器的静态工具方法。下面,我们来看一下其中在部分外部被使用的方法:

4.4.1

mergeInto()

:合并 2 个累加器命名空间

在 Flink 中,每个作业的所有累加器共享一个命名空间,Flink 会合并具有相同名称的累加器。这个累加器的命名空间使用

Map<String, Accumulator<?, ?>>

类型存储,在

mergeInto()

方法的

target

toMerge

参数均为累加器的命名空间。

源码|Github|

org.apache.flink.api.common.accumulators.AccumulatorHelper#mergeInto
publicstaticvoidmergeInto(Map<String,OptionalFailure<Accumulator<?,?>>> target,Map<String,Accumulator<?,?>> toMerge){for(Map.Entry<String,Accumulator<?,?>> otherEntry : toMerge.entrySet()){OptionalFailure<Accumulator<?,?>> ownAccumulator = target.get(otherEntry.getKey());if(ownAccumulator ==null){// Create initial counter (copy!)
            target.put(
                    otherEntry.getKey(),wrapUnchecked(otherEntry.getKey(),()-> otherEntry.getValue().clone()));}elseif(ownAccumulator.isFailure()){continue;}else{Accumulator<?,?> accumulator = ownAccumulator.getUnchecked();// Both should have the same typecompareAccumulatorTypes(
                    otherEntry.getKey(),
                    accumulator.getClass(),
                    otherEntry.getValue().getClass());// Merge target counter with other counter

            target.put(
                    otherEntry.getKey(),wrapUnchecked(
                            otherEntry.getKey(),()->mergeSingle(accumulator, otherEntry.getValue().clone())));}}}privatestatic<V,RextendsSerializable>Accumulator<V,R>mergeSingle(Accumulator<?,?> target,Accumulator<?,?> toMerge){@SuppressWarnings("unchecked")Accumulator<V,R> typedTarget =(Accumulator<V,R>) target;@SuppressWarnings("unchecked")Accumulator<V,R> typedToMerge =(Accumulator<V,R>) toMerge;

    typedTarget.merge(typedToMerge);return typedTarget;}

参数

target

为需要合并到的累加器 Map,

toMerge

参数为需要被合并的累加器的 Map,在合并时:

  • 如果 target 中没有对应的累加器,则调用累加器的 clone() 方法将 toMerge 中的累加器复制到 target
  • 如果 target 中有对应的累加器,则先检查两个累加器的类型后,在 mergeSingle 方法中调用累加器的 merge() 方法将 onMerge 中的累加器合并到 target 的累加器中

源码阅读思路|在 Flink 官方文档中,提到单个作业的所有累加器共享一个命名空间,Flink 会合并所有具有相同名称的累加器。从功能上看,这个方法用于合并两个累加器的 Map,应该会在 Flink 执行作业过程中,接近结束作业时被调用。因此,搜索这个方法被调用的位置,可以找到 Flink 执行作业的逻辑。具体地,这个方法被调用的位置如下:

  • org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#aggregateUserAccumulators
  • org.apache.flink.runtime.executiongraph.ExecutionJobVertex#getAggregatedUserAccumulatorsStringified

4.4.2

toResultMap()

:对累加器的命名空间计算结果

源码|Github|

org.apache.flink.api.common.accumulators.AccumulatorHelper#toResultMap
publicstaticMap<String,OptionalFailure<Object>>toResultMap(Map<String,Accumulator<?,?>> accumulators){Map<String,OptionalFailure<Object>> resultMap =newHashMap<>();for(Map.Entry<String,Accumulator<?,?>> entry : accumulators.entrySet()){
        resultMap.put(
                entry.getKey(),wrapUnchecked(entry.getKey(),()-> entry.getValue().getLocalValue()));}return resultMap;}

在命名空间中,逐个调用累加器的

getLocalValue()

方法获取最终结果并写入到新的

Map

中。

4.4.3

deserializeAccumulators()

:将序列化的累加器反序列化

源码|Github|

org.apache.flink.api.common.accumulators.AccumulatorHelper#deserializeAccumulators
publicstaticMap<String,OptionalFailure<Object>>deserializeAccumulators(Map<String,SerializedValue<OptionalFailure<Object>>> serializedAccumulators,ClassLoader loader)throwsIOException,ClassNotFoundException{if(serializedAccumulators ==null|| serializedAccumulators.isEmpty()){returnCollections.emptyMap();}Map<String,OptionalFailure<Object>> accumulators =CollectionUtil.newHashMapWithExpectedSize(serializedAccumulators.size());for(Map.Entry<String,SerializedValue<OptionalFailure<Object>>> entry :
            serializedAccumulators.entrySet()){OptionalFailure<Object> value =null;if(entry.getValue()!=null){
            value = entry.getValue().deserializeValue(loader);}

        accumulators.put(entry.getKey(), value);}return accumulators;}

逐个遍历序列化的累加器中的每个键值对,调用

org.apache.flink.util.SerializedValue#deserializeValue

方法对序列化的值进行反序列化,并将结果添加到新的

HashMap

中。

在实现上,因为反序列化后的

HashMap

中的元素数量与反序列化之前一致,所以可以通过在初始化新的

HashMap

时直接指定

HashMap

的容量,来避免扩容时带来的性能损耗。这里调用了

org.apache.flink.util.CollectionUtil#newHashMapWithExpectedSize

方法,该方法中创建了指定容积及扩容比例的

HashMap

参考文档

  • 《Flink 官方文档:应用开发 - DataStream API - 用户自定义 Functions》
标签: flink 累加器

本文转载自: https://blog.csdn.net/Changxing_J/article/details/136118126
版权归原作者 长行 所有, 如有侵权,请联系我们删除。

“Flink 源码剖析|4. 累加器与相关工具方法”的评论:

还没有评论