4.1 累加器(
Accumulator
)
累加器是实现了 加法运算 功能和 合并运算(合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。
Flink 的累加器均实现了
Accumulator
接口,包括如下 2 个方法用于支持加法运算和合并最终结果:
add(V value)
:执行加法运算,将值V
累加到当前 UDF 的累加器中merge(Accumulator<V, R> other)
:执行合并操作,将累加器other
与当前累加器合并
累加器的使用方法如下:
Step 1|在需要使用累加器的 UDF 中创建一个累加器对象(此处以计数器为例)
privateIntCounter numLines =newIntCounter();
Step 2|在富函数的
open()
方法中注册累加器对象,在注册时需要定义累加器名称用于查询结果
getRuntimeContext().addAccumulator("num-lines",this.numLines);
Step 3|在 UDF 的任何地方(包括
open()
和
close()
方法中)使用累加器
this.numLines.add(1);
Step 4|最终整体结果会存储在由执行环境的
execute()
方法返回的
JobExecutionResult
对象中。
myJobExecutionResult.getAccumulatorResult("num-lines");
单个作业的所有累加器 共享一个命名空间,因此可以在不同算子(operator)的不同 UDF 中使用同一个累加器,Flink 会合并将所有具有相同名称的累加器。
注意:当前累加器的结果只有在整个作业结束后才可以使用。
4.2
Accumulator
接口和
SimpleAccumulator
接口
Flink 内置的所有累加器都实现了
Accumulator
接口。
如果需要自定义累加器,只需要实现
Accumulator
接口或
SimpleAccumulator
接口即可。
4.2.1
Accumulator<V, R>
在
Accumulator
接口中,共定义了以下 5 个方法:
add(V value)
:将value
累加到当前累加器中(加法运算)getLocalValue()
:获取当前 UDF 中的累加器的值resetLocal()
:重置当前 UDF 中的累加器的值merge(Accumulator<V, R> other)
:将other
合并到当前累加器中,用于在 Flink 系统内部合并多个累加器(合并运算)clone()
:复制累加器对象
其中包含 2 个泛型,
V
类型表示每一次向累加器中的累加的值的类型,这个类型不要求是可序列化的;
R
类型表示累加器结果的类型,这个类型必须是可序列化的。不要求累加的值与累加器结果的值类型相同,可以支持类似直方图、列表等场景。
因为累加器需要在不同字节之间复制、传输,所以累加器自己必须是可序列化的、可复制的。
源码|Github|
org.apache.flink.api.common.accumulators.Accumulator
/**
* Accumulators collect distributed statistics or aggregates in a from user functions and operators.
* Each parallel instance creates and updates its own accumulator object, and the different parallel
* instances of the accumulator are later merged. merged by the system at the end of the job. The
* result can be obtained from the result of a job execution, or from the web runtime monitor.
*
* <p>The accumulators are inspired by the Hadoop/MapReduce counters.
*
* <p>The type added to the accumulator might differ from the type returned. This is the case e.g.
* for a set-accumulator: We add single objects, but the result is a set of objects.
*
* @param <V> Type of values that are added to the accumulator
* @param <R> Type of the accumulator result as it will be reported to the client
*/@PublicpublicinterfaceAccumulator<V,RextendsSerializable>extendsSerializable,Cloneable{/** @param value The value to add to the accumulator object */voidadd(V value);/** @return local The local value from the current UDF context */RgetLocalValue();/** Reset the local value. This only affects the current UDF context. */voidresetLocal();/**
* Used by system internally to merge the collected parts of an accumulator at the end of the
* job.
*
* @param other Reference to accumulator to merge in.
*/voidmerge(Accumulator<V,R> other);/**
* Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
* throw a {@link java.lang.CloneNotSupportedException}
*
* @return The duplicated accumulator.
*/Accumulator<V,R>clone();}
4.2.2
SimpleAccumulator<T>
SimpleAccumulator
是简化版的
Accumulator
,它继承了
Accumulator<V, R>
,但是要求累加的值的类型与累加器的结果的类型必须相同,适用于一些相对简单的操作,例如计数器。
源码|Github|
org.apache.flink.api.common.accumulators.SimpleAccumulator
/** Similar to Accumulator, but the type of items to add and the result value must be the same. */@PublicpublicinterfaceSimpleAccumulator<TextendsSerializable>extendsAccumulator<T,T>{}
4.3 Flink 内置累加器
可以看到 Flink 内置的累加器,除
Histogram
、
ListAccumulator
等累加器的累加类型与结果类型不同,直接实现了
Accumulator
接口以外,其他累加器均实现
SimpleAccumulator
接口。
4.3.1 内置累加器列表
这些 Flink 内置的累加器如下:
累加器类累加器用途输入类型输出类型
AverageAccumulator
计算平均值
Double
(
double
/
long
/
int
)
Double
DoubleCounter
计算
Double
类型的和
Double
Double
DoubleMaximum
计算
Double
类型最大值
Double
Double
DoubleMinimum
计算
Double
类型最小值
Double
Double
IntCounter
计算
Integer
类型的和
Integer
Integer
IntMaximum
计算
Integer
类型最大值
Integer
Integer
IntMinimum
计算
Integer
类型最小值
Integer
Integer
LongCounter
计算
Long
类型的和
Long
Long
LongMaximum
计算
Long
类型的最大值
Long
Long
LongMinimum
计算
Long
类型的最小值
Long
Long
Histogram
直方图
Integer
TreeMap<Integer, Integer>
ListAccumulator
列表累加器(将元素存储到列表中)
T
ArrayList<T>
SerializedListAccumulator
序列化的列表累加器(将元素序列化存储到列表中)
T
ArrayList<byte[]>
4.3.2 累加器实现样例
以上列表中累加器的实现逻辑是类似的,我们具体来看
AverageAccumulator
作为样例。
源码|Github|
org.apache.flink.api.common.accumulators.AverageAccumulator
(部分)
@PublicpublicclassAverageAccumulatorimplementsSimpleAccumulator<Double>{privatelong count;privatedouble sum;@Overridepublicvoidadd(Double value){this.count++;this.sum += value;}@OverridepublicDoublegetLocalValue(){if(this.count ==0){return0.0;}returnthis.sum /this.count;}@OverridepublicvoidresetLocal(){this.count =0;this.sum =0;}@Overridepublicvoidmerge(Accumulator<Double,Double> other){if(other instanceofAverageAccumulator){AverageAccumulator avg =(AverageAccumulator) other;this.count += avg.count;this.sum += avg.sum;}else{thrownewIllegalArgumentException("The merged accumulator must be AverageAccumulator.");}}@OverridepublicAverageAccumulatorclone(){AverageAccumulator average =newAverageAccumulator();
average.count =this.count;
average.sum =this.sum;return average;}}
AverageAccumulator
在对象属性中存储了当前累加器中所有元素的和
sum
以及元素的数量
count
。
在调用
add(Double value)
方法执行加法运算时,累加
sum
和
count
。
在调用
merge(Accumulator<Double, Double> other)
方法执行合并运算时,将另一个累加器
other
的
count
和
sum
累加到当前累加器中。
在调用
clone()
方法复制累加器对象时,创建一个新的
AverageAccumulator
对象,并将当前累加器的
count
和
sum
复制给该对象。
4.4
AccumulatorHelper
类中累加器相关工具方法
在
AccumulatorHelper
类中,有一些用于操作累加器的静态工具方法。下面,我们来看一下其中在部分外部被使用的方法:
4.4.1
mergeInto()
:合并 2 个累加器命名空间
在 Flink 中,每个作业的所有累加器共享一个命名空间,Flink 会合并具有相同名称的累加器。这个累加器的命名空间使用
Map<String, Accumulator<?, ?>>
类型存储,在
mergeInto()
方法的
target
和
toMerge
参数均为累加器的命名空间。
源码|Github|
org.apache.flink.api.common.accumulators.AccumulatorHelper#mergeInto
publicstaticvoidmergeInto(Map<String,OptionalFailure<Accumulator<?,?>>> target,Map<String,Accumulator<?,?>> toMerge){for(Map.Entry<String,Accumulator<?,?>> otherEntry : toMerge.entrySet()){OptionalFailure<Accumulator<?,?>> ownAccumulator = target.get(otherEntry.getKey());if(ownAccumulator ==null){// Create initial counter (copy!)
target.put(
otherEntry.getKey(),wrapUnchecked(otherEntry.getKey(),()-> otherEntry.getValue().clone()));}elseif(ownAccumulator.isFailure()){continue;}else{Accumulator<?,?> accumulator = ownAccumulator.getUnchecked();// Both should have the same typecompareAccumulatorTypes(
otherEntry.getKey(),
accumulator.getClass(),
otherEntry.getValue().getClass());// Merge target counter with other counter
target.put(
otherEntry.getKey(),wrapUnchecked(
otherEntry.getKey(),()->mergeSingle(accumulator, otherEntry.getValue().clone())));}}}privatestatic<V,RextendsSerializable>Accumulator<V,R>mergeSingle(Accumulator<?,?> target,Accumulator<?,?> toMerge){@SuppressWarnings("unchecked")Accumulator<V,R> typedTarget =(Accumulator<V,R>) target;@SuppressWarnings("unchecked")Accumulator<V,R> typedToMerge =(Accumulator<V,R>) toMerge;
typedTarget.merge(typedToMerge);return typedTarget;}
参数
target
为需要合并到的累加器 Map,
toMerge
参数为需要被合并的累加器的 Map,在合并时:
- 如果
target
中没有对应的累加器,则调用累加器的clone()
方法将toMerge
中的累加器复制到target
中 - 如果
target
中有对应的累加器,则先检查两个累加器的类型后,在mergeSingle
方法中调用累加器的merge()
方法将onMerge
中的累加器合并到target
的累加器中
源码阅读思路|在 Flink 官方文档中,提到单个作业的所有累加器共享一个命名空间,Flink 会合并所有具有相同名称的累加器。从功能上看,这个方法用于合并两个累加器的 Map,应该会在 Flink 执行作业过程中,接近结束作业时被调用。因此,搜索这个方法被调用的位置,可以找到 Flink 执行作业的逻辑。具体地,这个方法被调用的位置如下:
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#aggregateUserAccumulators
org.apache.flink.runtime.executiongraph.ExecutionJobVertex#getAggregatedUserAccumulatorsStringified
4.4.2
toResultMap()
:对累加器的命名空间计算结果
源码|Github|
org.apache.flink.api.common.accumulators.AccumulatorHelper#toResultMap
publicstaticMap<String,OptionalFailure<Object>>toResultMap(Map<String,Accumulator<?,?>> accumulators){Map<String,OptionalFailure<Object>> resultMap =newHashMap<>();for(Map.Entry<String,Accumulator<?,?>> entry : accumulators.entrySet()){
resultMap.put(
entry.getKey(),wrapUnchecked(entry.getKey(),()-> entry.getValue().getLocalValue()));}return resultMap;}
在命名空间中,逐个调用累加器的
getLocalValue()
方法获取最终结果并写入到新的
Map
中。
4.4.3
deserializeAccumulators()
:将序列化的累加器反序列化
源码|Github|
org.apache.flink.api.common.accumulators.AccumulatorHelper#deserializeAccumulators
publicstaticMap<String,OptionalFailure<Object>>deserializeAccumulators(Map<String,SerializedValue<OptionalFailure<Object>>> serializedAccumulators,ClassLoader loader)throwsIOException,ClassNotFoundException{if(serializedAccumulators ==null|| serializedAccumulators.isEmpty()){returnCollections.emptyMap();}Map<String,OptionalFailure<Object>> accumulators =CollectionUtil.newHashMapWithExpectedSize(serializedAccumulators.size());for(Map.Entry<String,SerializedValue<OptionalFailure<Object>>> entry :
serializedAccumulators.entrySet()){OptionalFailure<Object> value =null;if(entry.getValue()!=null){
value = entry.getValue().deserializeValue(loader);}
accumulators.put(entry.getKey(), value);}return accumulators;}
逐个遍历序列化的累加器中的每个键值对,调用
org.apache.flink.util.SerializedValue#deserializeValue
方法对序列化的值进行反序列化,并将结果添加到新的
HashMap
中。
在实现上,因为反序列化后的
HashMap
中的元素数量与反序列化之前一致,所以可以通过在初始化新的
HashMap
时直接指定
HashMap
的容量,来避免扩容时带来的性能损耗。这里调用了
org.apache.flink.util.CollectionUtil#newHashMapWithExpectedSize
方法,该方法中创建了指定容积及扩容比例的
HashMap
。
参考文档
- 《Flink 官方文档:应用开发 - DataStream API - 用户自定义 Functions》
版权归原作者 长行 所有, 如有侵权,请联系我们删除。