

Spark Source Code Tracking (8): Accumulators


1. What accumulators do and how they work

1.1 Purpose

Accumulators provide distributed counters and sums: every task adds to its own copy on an Executor and the Driver aggregates the results. Named accumulators also show up in the Spark application UI, which makes them handy for debugging.

1.2 How they work

An accumulator (AccumulatorV2) defined on the Driver is shipped to the Executors along with each Spark task. After deserialization, each task's copy performs its own accumulation (add); once the tasks finish, the Driver merges the copies they return back into the original object (merge).
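
Below is a minimal, hand-driven sketch of that lifecycle, runnable in spark-shell or any Scala REPL with Spark on the classpath. In a real job the per-task copies are produced by Spark itself when the task closure is serialized, not by calling copyAndReset yourself; the snippet only shows which AccumulatorV2 methods are involved.

    import org.apache.spark.util.LongAccumulator

    // Original accumulator, as it would live on the Driver.
    val driverAcc = new LongAccumulator

    // One copy per task, as Spark would ship to the Executors.
    val taskCopy1 = driverAcc.copyAndReset()
    val taskCopy2 = driverAcc.copyAndReset()

    // "Executor" side: each copy accumulates its own partition.
    Seq(1L, 2L).foreach(v => taskCopy1.add(v))
    Seq(3L, 4L, 5L).foreach(v => taskCopy2.add(v))

    // Driver side: merge the returned copies back into the original.
    driverAcc.merge(taskCopy1)
    driverAcc.merge(taskCopy2)
    println(driverAcc.value) // 15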

2. Tracing the key accumulator source code

2.1 Test code

    def main(args: Array[String]): Unit = {
      log.info("-------begin---------")
      val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[3]")
      val sparkContext = new SparkContext(sparkConnf)
      val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
      var sum = 0
      val sumAcc = sparkContext.longAccumulator("sumAcc")
      rdd.foreach(num => {
        sum = sum + num
        sumAcc.add(num)
        println("----excutor:----sumACC=" + sumAcc)
      })
      println("---------sum=" + sum)
      println("---------sumAcc=" + sumAcc.value)
      sparkContext.stop()
    }

Key log output:

    ----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 3)
    ----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 7)
    ----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 12)
    ----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 1)
    ----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 3)
    ---------sum=0
    ---------sumAcc=15

2.2 Tracing the source

2.2.1 Where add is called

SparkContext.scala

    /**
     * Create and register a long accumulator, which starts with 0 and accumulates inputs by `add`.
     */
    def longAccumulator(name: String): LongAccumulator = {
      val acc = new LongAccumulator
      register(acc, name)
      acc
    }

    /**
     * Register the given accumulator with given name.
     *
     * @note Accumulators must be registered before use, or it will throw exception.
     */
    def register(acc: AccumulatorV2[_, _], name: String): Unit = {
      acc.register(this, name = Option(name))
    }

AccumulatorV2.scala

    private[spark] def register(
        sc: SparkContext,
        name: Option[String] = None,
        countFailedValues: Boolean = false): Unit = {
      if (this.metadata != null) {
        throw new IllegalStateException("Cannot register an Accumulator twice.")
      }
      this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues)
      AccumulatorContext.register(this)
      sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
    }

AccumulatorContext (also in AccumulatorV2.scala)

    /**
     * This global map holds the original accumulator objects that are created on the driver.
     * It keeps weak references to these objects so that accumulators can be garbage-collected
     * once the RDDs and user-code that reference them are cleaned up.
     * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
     */
    private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[AccumulatorV2[_, _]]]

    /**
     * Registers an [[AccumulatorV2]] created on the driver such that it can be used on the executors.
     *
     * All accumulators registered here can later be used as a container for accumulating partial
     * values across multiple tasks. This is what `org.apache.spark.scheduler.DAGScheduler` does.
     * Note: if an accumulator is registered here, it should also be registered with the active
     * context cleaner for cleanup so as to avoid memory leaks.
     *
     * If an [[AccumulatorV2]] with the same ID was already registered, this does nothing instead
     * of overwriting it. We will never register same accumulator twice, this is just a sanity check.
     */
    def register(a: AccumulatorV2[_, _]): Unit = {
      originals.putIfAbsent(a.id, new jl.ref.WeakReference[AccumulatorV2[_, _]](a))
    }

Back in the test code:

    val sumAcc = sparkContext.longAccumulator("sumAcc")

This single line creates a LongAccumulator, runs the registration chain shown above (SparkContext.register → AccumulatorV2.register → AccumulatorContext.register), and returns the accumulator object.
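
For reference, longAccumulator is just a convenience wrapper; creating and registering the accumulator by hand is equivalent (the custom accumulator in section 5 uses the same SparkContext.register call):

    // Equivalent to sparkContext.longAccumulator("sumAcc"):
    val sumAcc = new org.apache.spark.util.LongAccumulator
    sparkContext.register(sumAcc, "sumAcc")
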
Next, rdd.foreach is an action:

    /**
     * Applies a function f to all elements of this RDD.
     */
    def foreach(f: T => Unit): Unit = withScope {
      val cleanF = sc.clean(f)
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }

The second argument passed to runJob here effectively expands to:

    (iter: Iterator[Int]) => iter.foreach(num => {
      sum = sum + num
      sumAcc.add(num)
      println("----excutor:----sumACC=" + sumAcc)
    })

Where does this function actually run? Inside the runTask method of the task classes; for the final stage of a job that is ResultTask.
ResultTask.scala

    override def runTask(context: TaskContext): U = {
      // Deserialize the RDD and the func using the broadcast variables.
      val threadMXBean = ManagementFactory.getThreadMXBean
      val deserializeStartTimeNs = System.nanoTime()
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      val ser = SparkEnv.get.closureSerializer.newInstance()
      val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
      _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
      } else 0L

      func(context, rdd.iterator(partition, context))
    }

The func deserialized here carries, in its second argument, the user code that the test program passed to rdd.foreach:

    (iter: Iterator[Int]) => iter.foreach(num => {
      sum = sum + num
      sumAcc.add(num)
      println("----excutor:----sumACC=" + sumAcc)
    })

func(context, rdd.iterator(partition, context)) then invokes this function.
rdd.iterator returns an iterator over all the data of the single partition this task is responsible for; func walks that iterator and runs the user-defined logic on each element.
Conclusion 1: sumAcc.add(num) therefore executes once per element the task processes. That matches the log above: the task holding partition [1, 2] steps its copy through 1, 3, the task holding [3, 4, 5] steps through 3, 7, 12, and the merged driver-side value is 15.

2.2.2 Where merge is called

When a task finishes, the Executor reports the result back to the Driver, and the DAGScheduler posts a CompletionEvent to its event loop.
DAGSchedulerEventProcessLoop (in DAGScheduler.scala)

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
      case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
        dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
      case StageCancelled(stageId, reason) =>
        dagScheduler.handleStageCancellation(stageId, reason)
      case JobCancelled(jobId, reason) =>
        dagScheduler.handleJobCancellation(jobId, reason)
      case JobGroupCancelled(groupId) =>
        dagScheduler.handleJobGroupCancelled(groupId)
      case AllJobsCancelled =>
        dagScheduler.doCancelAllJobs()
      case ExecutorAdded(execId, host) =>
        dagScheduler.handleExecutorAdded(execId, host)
      case ExecutorLost(execId, reason) =>
        val workerLost = reason match {
          case SlaveLost(_, true) => true
          case _ => false
        }
        dagScheduler.handleExecutorLost(execId, workerLost)
      case WorkerRemoved(workerId, host, message) =>
        dagScheduler.handleWorkerRemoved(workerId, host, message)
      case BeginEvent(task, taskInfo) =>
        dagScheduler.handleBeginEvent(task, taskInfo)
      case SpeculativeTaskSubmitted(task) =>
        dagScheduler.handleSpeculativeTaskSubmitted(task)
      case GettingResultEvent(taskInfo) =>
        dagScheduler.handleGetTaskResult(taskInfo)
      case completion: CompletionEvent =>
        dagScheduler.handleTaskCompletion(completion)
      case TaskSetFailed(taskSet, reason, exception) =>
        dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
      case ResubmitFailedStages =>
        dagScheduler.resubmitFailedStages()
    }

For a completed task this ends up in dagScheduler.handleTaskCompletion, which calls updateAccumulators(event: CompletionEvent): Unit to merge the accumulator updates returned by the task.
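
In outline, the merge performed there looks like the following (a paraphrase for readability, not the verbatim Spark source): each accumulator copy sent back by the finished task is looked up by its id in AccumulatorContext and merged into the original driver-side object registered in section 2.2.1.

    // Paraphrased outline of DAGScheduler.updateAccumulators (not verbatim Spark source).
    private def updateAccumulators(event: CompletionEvent): Unit = {
      event.accumUpdates.foreach { taskCopy =>
        AccumulatorContext.get(taskCopy.id) match {
          case Some(original) =>
            // Merge the task's partial value into the driver-side accumulator.
            original.asInstanceOf[AccumulatorV2[Any, Any]]
              .merge(taskCopy.asInstanceOf[AccumulatorV2[Any, Any]])
          case None =>
            // The original accumulator was garbage-collected or never registered.
            logWarning(s"Ignoring update for unknown accumulator id ${taskCopy.id}")
        }
      }
    }
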
In the test code, sum = sum + num executes exactly as many times as sumAcc.add(num), and likewise on the Executors, but the plain variable sum is only modified inside each task's deserialized copy of the closure; its value is never sent back to the Driver, so on the Driver sum keeps its initial value 0.

3. Accumulators in actions versus transformations

In the trace above, the accumulator was updated inside an action. What changes when the update happens inside a transformation?

3.1 Test code

    def main(args: Array[String]): Unit = {
      log.info("-------begin---------")
      val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
      val sparkContext = new SparkContext(sparkConnf)
      val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
      val sumAcc = sparkContext.longAccumulator("sumAcc")
      val mapRDD = rdd.map(num => {
        sumAcc.add(num)
        println("----transfer:----sumACC=" + sumAcc)
        num
      })
      mapRDD.foreach(num => {
        sumAcc.add(num)
        println("----action:----sumACC=" + sumAcc)
      })
      println("----result:----sumACC=" + sumAcc)
      sparkContext.stop()
    }

The add placed inside the transformation is invoked from the RDD's compute method rather than from the func in ResultTask.runTask, but it still runs on the Executors, and for each element it fires just before the add inside the action; see the excerpt below.
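
For reference, rdd.map wraps the user function in a MapPartitionsRDD, whose compute applies it lazily to the parent partition's iterator (abbreviated from RDD.scala and MapPartitionsRDD.scala; minor details may differ between Spark versions):

    // RDD.scala (abbreviated)
    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
    }

    // MapPartitionsRDD.scala (abbreviated)
    override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))

Because iterator.map is lazy, the add inside map only fires when the downstream foreach pulls the next element, which is why the transfer and action lines alternate element by element in the log below.
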
Key log output:

    ----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 1)
    ----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 2)
    ----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 4)
    ----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 6)
    ----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 3)
    ----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 6)
    ----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 10)
    ----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 14)
    ----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 19)
    ----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 24)
    ----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 30)

Every element goes through add twice (once in map, once in foreach), so the final value is 30 rather than 15.

4. Accumulator scope

Accumulators are application-level. If an application runs more than one action, or uses a checkpoint (a checkpoint is materialized by a separate job; see https://cangchen.blog.csdn.net/article/details/122020410), the accumulator value may differ from what you expect.

4.1 Checkpoints and accumulators

    def main(args: Array[String]): Unit = {
      log.info("-------begin---------")
      val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
      val sparkContext = new SparkContext(sparkConnf)
      sparkContext.setCheckpointDir(".")
      val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
      val sumAcc = sparkContext.longAccumulator("sumAcc")
      val mapRDD = rdd.map(num => {
        sumAcc.add(num)
        num
      })
      mapRDD.checkpoint()
      mapRDD.foreach(num => {
        println(num)
      })
      println("----result:----sumACC=" + sumAcc)
      sparkContext.stop()
    }

Result:

    ----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 30)

The add sits inside the map transformation and ends up running twice per element: first in the job submitted by the foreach action, then again in the job that materializes the checkpoint. To keep the checkpoint job from re-running add, cache the RDD:

    def main(args: Array[String]): Unit = {
      log.info("-------begin---------")
      val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
      val sparkContext = new SparkContext(sparkConnf)
      sparkContext.setCheckpointDir(".")
      val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
      val sumAcc = sparkContext.longAccumulator("sumAcc")
      val mapRDD = rdd.map(num => {
        sumAcc.add(num)
        num
      })
      mapRDD.checkpoint()
      mapRDD.cache()
      mapRDD.foreach(num => {
        println(num)
      })
      println("----result:----sumACC=" + sumAcc)
      sparkContext.stop()
    }

Result:

    ----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 15)

cache keeps the computed partitions in memory, so the checkpoint job reads them instead of invoking the RDD's compute function again, and each add executes only once.

4.2 Multiple actions and accumulators

    def main(args: Array[String]): Unit = {
      log.info("-------begin---------")
      val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
      val sparkContext = new SparkContext(sparkConnf)
      // sparkContext.setCheckpointDir(".")
      val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
      val sumAcc = sparkContext.longAccumulator("sumAcc")
      val mapRDD = rdd.map(num => {
        sumAcc.add(num)
        num
      })
      // mapRDD.checkpoint()
      // mapRDD.cache()
      mapRDD.foreach(num => {
        println(num)
      })
      mapRDD.foreach(num => {
        println(num)
      })
      println("----result:----sumACC=" + sumAcc)
      sparkContext.stop()
    }

Result:

    ----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 30)

The two actions launch two independent jobs, so every add runs twice. Here, too, cache avoids the recomputation:

    def main(args: Array[String]): Unit = {
      log.info("-------begin---------")
      val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
      val sparkContext = new SparkContext(sparkConnf)
      // sparkContext.setCheckpointDir(".")
      val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
      val sumAcc = sparkContext.longAccumulator("sumAcc")
      val mapRDD = rdd.map(num => {
        sumAcc.add(num)
        num
      })
      // mapRDD.checkpoint()
      mapRDD.cache()
      mapRDD.foreach(num => {
        println(num)
      })
      mapRDD.foreach(num => {
        println(num)
      })
      println("----result:----sumACC=" + sumAcc)
      sparkContext.stop()
    }

Result:

    ----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 15)

5. Custom accumulators

A custom accumulator that implements word count:

    package cchen.spark.sparkcore

    import org.apache.spark.internal.Logging
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.collection.{Iterator, mutable}

    class WordCount

    object WordCount extends Logging {
      def main(args: Array[String]): Unit = {
        log.info("-------begin---------")
        val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[3]")
        val sparkContext = new SparkContext(sparkConnf)
        val rdd = sparkContext.parallelize(
          Array("hello", "thank you", "thank you very much", "are you ok"), 2)
        val flat_rdd = rdd.flatMap(_.split(" ", -1))
        val wordCountACC = new WordCountAccumulator
        sparkContext.register(wordCountACC, "wordCountACC")
        flat_rdd.foreach(f => {
          wordCountACC.add(f)
        })
        flat_rdd.cache()
        println("----reduceByKey result-----")
        flat_rdd.map(f => (f, 1)).reduceByKey(_ + _).collect().map(f => println(f._1 + ":" + f._2))
        println("----acc result-----")
        wordCountACC.value.map(f => println(f._1 + ":" + f._2))
        sparkContext.stop()
      }
    }

    class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

      private var map = mutable.Map[String, Long]()

      // The accumulator is "zero" when no word has been counted yet.
      override def isZero: Boolean = map.isEmpty

      // The copy carries the current counts; Spark resets the copy before shipping it to a task.
      override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
        val newACC = new WordCountAccumulator()
        newACC.map = this.map.clone()
        newACC
      }

      override def reset(): Unit = map.clear()

      // Executor side: count one occurrence of the word.
      override def add(v: String): Unit = {
        val value = map.getOrElse(v, 0L)
        map.put(v, value + 1)
      }

      // Driver side: fold another task's counts into this map.
      override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = other match {
        case o: WordCountAccumulator =>
          o.value.map(f => {
            val value = map.getOrElse(f._1, 0L)
            map.put(f._1, value + f._2)
          })
        case _ =>
          throw new UnsupportedOperationException(
            s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
      }

      override def value: mutable.Map[String, Long] = map
    }

Result:

    ----reduceByKey result-----
    are:1
    thank:2
    hello:1
    very:1
    ok:1
    you:3
    much:1
    ----acc result-----
    you:3
    ok:1
    are:1
    very:1
    thank:2
    much:1
    hello:1

The accumulator version should be faster than reduceByKey here because it avoids a shuffle; the trade-off is that the merged result is held on the Driver, so it only suits aggregates small enough to fit in driver memory.

6. Summary

1. Where an accumulator's add actually executes depends on where user code calls it: inside an RDD transformation it is invoked from the RDD's compute function, and inside an action it is invoked from ResultTask.runTask. Either way it runs on the Executor side.
2. The merge runs on the Driver: as each task completes successfully, the DAGScheduler merges the returned accumulator copies into the originals (handleTaskCompletion → updateAccumulators).
3. Accumulators are application-level, so watch out for over-counting when there are multiple actions, or a single action combined with a checkpoint.


Reposted from: https://blog.csdn.net/weixin_43172032/article/details/122051709
Copyright belongs to the original author, cangchen@csdn. Contact us for removal in case of infringement.
