0


Spark-Job启动、Stage划分

一、上下文

《Spark-driver和executor启动过程》详细分析了driver和executor的启动,此时资源已经给我们分配好了,且Application也已经注册完成。下面我们就来看看Spark是如何启动job并根据DAG来划分Stage的

二、Job启动

Spark RDD中的算子分为Transformations 算子和Actions 算子,Transformations 算子只是将RDD在逻辑上进行了转换,只有调用Actions 算子时才会真正执行以上对RDD的所有操作。为什么呢?点进去这些Actions 算子看下就发现它们都调用了SparkContext的runJob() ,因此程序调用几次Actions算子就会启动几个Job,因此一个Application是对应多个Job的。下面继续看下runJob()中做了什么事情。

1、SparkContext

class SparkContext(config: SparkConf) extends Logging {

 //在RDD中的给定分区集上运行一个函数,并将结果传递给给定的处理程序函数。
 //这是Spark中所有操作的主要入口点。
 def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    ......
    //打印启动job的日志
    logInfo("Starting job: " + callSite.shortForm)
    //调用dagScheduler的runJob()
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    //在使用此RDD的作业完成后调用它
    rdd.doCheckpoint()
  }

}

2、DAGScheduler

DAGScheduler 是面向 stage 调度的高级调度层。它为每个作业计算一个stage的DAG,跟踪这些RDD和stage输出被物化,并找到运行作业的最小时间表。然后,它将stage作为TaskSet(包含完全独立的任务)提交给在集群上运行它们的底层TaskScheduler实现。

Spark stages 是通过在Shuffle边界处打破RDD图来创建的。是一系列具有“窄”依赖关系的RDD操作,如map() 和filter(),这些窄依赖的RDD在每个 stage 都被 pipelined 到一组任务中,但具有shuffle依赖关系的操作需要多个stage(一个stage shuffle write 一组文件,另一个stage shuffle read 这些文件)。最后,每个stage都只会对其他stage进行shuffle依赖,并可能在其中计算多个操作。这些操作的实际 pipelining 发生在各种RDD的RDD.compute()函数中

除了生成阶段的DAG外,DAGScheduler还根据当前缓存状态确定运行每个 Task 的首选位置,并将其传递给低级TaskScheduler。此外,它还可以处理由于shuffle输出文件丢失而导致的故障,在这种情况下,可能需要重新提交之前跑过的stage。非随机文件丢失导致的阶段内故障由TaskScheduler处理,TaskScheduler将在取消整个stage之前重试每个任务几次。

在看源码时,有几个关键概念:

Jobs

    是提交给调度器的顶级工作项,例如,当用户调用类似count() 的操作时,作业将通过submitJob提交。每个Job可能需要执行多个Stage来构建中间数据。

Stages

    是计算Job中中间结果的一组任务,其中每个Task在同一RDD的分区上计算相同的函数。

Stage在shuffle边界处分开,也说明我们必须等待前一阶段完成才能获取输出。有两种类型的阶段:ResultStage,用于执行动作的最后阶段,以及ShuffleMapStage,用于为shuffle写入映射输出文件。如果这些作业重用相同的RDD,则Stage通常在多个Job之间共享。

Tasks

    单独的工作单元,每个任务都发送到一台机器上。

Cache tracking

    DAGScheduler计算出哪些RDD被缓存以避免重新计算它们,并且同样记住哪些shuffle write阶段已经生成了输出文件,以避免重做shuffle write

Preferred locations

    DAGScheduler还根据其底层RDD的首选位置,或缓存或shuffle数据的位置,计算在stage中中的每个Task应该去哪台节点执行

Cleanup

    当Job完成后,所有依赖的数据结构都会被清除,防止OOM
private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def runJob[T, U](...) {
    //提交要给Job给调度器
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    }
  }

  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // 获取rdd的所有分区数量
    val maxPartitions = rdd.partitions.length

    ......
    
    //等待DAGScheduler作业完成的对象。当任务完成时,它将其结果传递给给定的处理函数。
    val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
    //向线程组提交一个 JobSubmitted 事件
    //在目标RDD上提交了一个产生结果的作业
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      Utils.cloneProperties(properties)))
    waiter
  }

}

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      //处理 JobSubmitted 事件 转交给 dagScheduler 处理
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  }

}

三、Stage划分

Stage是一组并行Task,它们都计算需要作为Spark作业的一部分运行的相同函数,
其中所有Task都具有相同的shuffle依赖关系。调度器运行的每个Task
DAG在Shuffle发生的边界处被划分为Stage,然后DAGScheduler按拓扑顺序运行这些阶段。

每个 Stage 可以是 ShuffleMapStage ,在这种情况下,其Task的结果被输入到其他Stage,
也可以是ResultStage,在该情况下,它的任务通过在RDD上运行函数直接计算Spark动作
(例如count()、save()等)。对于 ShuffleMapStage ,还跟踪每个输出分区所在的节点。

1、以Stage为单位一层一层递归寻找

private[spark] class DAGScheduler(......)
  extends Logging {

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties): Unit = {
    var finalStage: ResultStage = null
    try {
      //创建第一个 Stage 
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      .......
    }
    //作业已提交,内部数据清理。
    barrierJobIdToNumTasksCheckFailures.remove(jobId)

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()

    //提交finalStage 但首先要递归提交该阶段依赖的所有父stage
    submitStage(finalStage)
  }

  //递归方法
  private def submitStage(stage: Stage): Unit = {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        //将 finalStage 传入 获得其 依赖的 Stage
        val missing = getMissingParentStages(stage).sortBy(_.id)
        //如果该Stage上一层没有Stage了说明到头了,开始提交这个最初的第一个stage
        if (missing.isEmpty) {
          如果上一层没有Stage了那么就提交这层起始的stage
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            //如果Stage前面还有Stage,就继续提交,看是否还有上一层Stage
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

 private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]  //缺失的Stage
    val visited = new HashSet[RDD[_]] //访问过的RDD
    // 我们在这里手动维护一个堆栈,以防止递归访问引起的StackOverflowError
    //因为递归需要一直将方法压栈 ,最后回归时再弹栈,如果递归太深很容易导致栈溢出
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += stage.rdd //等待访问的RDD,最开始就是一个Job的最后那个RDD
    def visit(rdd: RDD[_]): Unit = {
      if (!visited(rdd)) {
        visited += rdd //这个rdd 已经访问过了
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          //从这个循环我们可以看出,rdd的依赖是多个的,也就是一个rdd可能来自于1-多个rdd的数据
          //我们详细看下rdd的依赖关系
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                //如果RDD和父RDD是Shuffle依赖,就创建一个ShuffleMapStage
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                // 只有在基于推送的混洗合并完成后,才将mapStage标记为可用于混洗输出。
                // 如果没有,后续ShuffleMapStage将不会从合并的输出中读取,因为MergeStatuses不可用。
                if (!mapStage.isAvailable || !mapStage.shuffleDep.shuffleMergeFinalized) {
                  //添加到在丢失的Stage Set中
                  missing += mapStage
                } else {
                  //如果跳过并首次被访问,则转发nextAttemptId。否则一旦重试,
                  //  1) 阶段信息中的内容变得扭曲,例如任务编号、输入字节、e.t.c
                  //  2) 第一次尝试从0-idx开始,不会标记为重试
                  mapStage.increaseAttemptIdOnFirstSkip()
                }
              case narrowDep: NarrowDependency[_] =>
                //当这个RDD和它的父RDD时窄依赖时,放到栈中,继续往下寻找
                waitingForVisit.prepend(narrowDep.rdd)
            }
          }
        }
      }
    }
    //循环这个栈,看是否还有RDD,如果还有就说明还没有窄依赖的父RDD
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.remove(0))
    }
    //返回该stage上一层的stage列表
    missing.toList
  }

}

2、获取与父RDDs的依赖关系

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  //我们的依赖关系和分区将通过调用下面的子类方法获得,并在我们被检查点时被覆盖
  @volatile private var dependencies_ : Seq[Dependency[_]] = _

  //获取此RDD的依赖关系列表,同时考虑RDD是否为检查点。
  final def dependencies: Seq[Dependency[_]] = {
    //OneToOneDependency 是表示父RDD和子RDD的分区之间的一对一依赖关系。
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        stateLock.synchronized {
          if (dependencies_ == null) {
            //在此处才真正去获取此RDD的依赖关系
            dependencies_ = getDependencies
          }
        }
      }
      dependencies_
    }
  }

  //由子类实现,以返回此RDD如何依赖父RDD。此方法只会被调用一次,因此在其中实现耗时的计算是安全的。
  //因此不同形态的rdd 获取依赖关系的方法也不同,它的子类有
  //CoGroupedRDD 有OneToOneDependency (当父子RDD分区数一样时,也就是可以认为调整分区数来调整某些场景的宽窄依赖) 也有 ShuffleDependency
  //CoalescedRDD 只有 NarrowDependency
  //ShuffledRDD  只有ShuffleDependency,需要序列化管理器
  //SubtractedRDD 有OneToOneDependency (当父子RDD分区数一样时) 也有 ShuffleDependency
  //CartesianRDD  只有固定的两个NarrowDependency
  //UnionRDD RangeDependency 它 继承了 NarrowDependency 时窄依赖 (表示父RDD和子RDD中分区范围之间的一对一依赖关系)
  //CoGroupedRDD 、SubtractedRDD  宽窄依赖都有可能,可以通过父子分区数调节
  //CoalescedRDD 、CartesianRDD  、UnionRDD 都是窄依赖关系 
  //ShuffledRDD  必然是宽依赖,因为都开始准备序列化管理器拉数据了
  protected def getDependencies: Seq[Dependency[_]] = deps

}

//继承了一个NarrowDependency 有一个未实现的方法 即: 获取父rdd的分区数量
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

我们根据一个例子来捋一下

/**
 * 该程序只作为学习用,没有任何业务知识哈
 */
object StageDivision {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StageDivision").setMaster("local")
    val sc = new SparkContext(conf)
    val sourceRdd1 = sc.textFile("file/word_count_data.txt")
    val sourceRdd2 = sc.textFile("file/word_count_data2.txt")
    val sourceRdd3 = sc.textFile("file/word_count_data3.txt")
    val stepOneRdd = sourceRdd1.filter(_.length>10).flatMap(_.split(","))
      .map(x=>{(x,1)}).groupByKey().map(data=>{
      (data._1,data._2.filter(_.>(5)).toList.length)
    })
    val stepTwoRdd = sourceRdd2.flatMap(_.split(",")).map(x=>{(x,10)})
    val stepThreeRdd = sourceRdd3.flatMap(_.split(",")).map(x=>{(x,20)}).reduceByKey(_+_)
    val unionStepOneTwoRdd = stepOneRdd.union(stepTwoRdd)
    val countRdd = unionStepOneTwoRdd.reduceByKey(_+_).union(stepThreeRdd)
    val resultMap = countRdd.collect().toMap
    sc.stop()
  }

该程序的DAG如下图:

1、submitStage(Stage3)

2、根据RDD依赖关系找上一层Stage:Stage2和Stage1

3、执行submitStage(Stage2)和submitStage(Stage1)

4、根据RDD依赖关系找Stage2和Stage1的上一层Stage,

    Stage2没有上一层了,执行submitMissingTasks(stage2, jobId.get)

    Stage1还有上一层Stage:执行submitStage(Stage0)

5、根据RDD依赖关系找Stage0的上一层Stage

    Stage0没有上一层了,执行submitMissingTasks(stage0, jobId.get)

注意:其中只有Stage3为ResultStage,其他Stage均为ShuffleMapStage

四、提交Task

当根据RDD的依赖关系划分完Stage,就开始从每个分支的最外层Stage提交Task了,即执行submitMissingTasks(最外层Stage)

private[spark] class DAGScheduler(...){

  //决定任务是否可以将输出提交到HDFS。使用“第一提交者获胜”策略
  //OutputCommitCoordinator在 driver 和 executor 中都被实例化。在executor 上,它配置了对driver OutputCommitCoordinatorEndpoint的引用,因此提交输出的请求将被转发到driver的OutputCommitCoordinator。
  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

  private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
    //计算出分区ID的索引Seq。
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    //当一个Stage 启动会有DAGScheduler调度,并对Stage进行初始化
    stage match {
        case s: ShuffleMapStage =>
            outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)

        case s: ResultStage =>
            outputCommitCoordinator.stageStart(
              stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)

    //计算每一个分区的所有位置
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    }
    
    //为此 Stage 创建新的尝试 ,也就是Stage有失败重试的机制
    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

    //为 Task 广播二进制文件,用于将 Task 分派给 executor。
    //请注意,我们广播了RDD的序列化副本,对于每个Task,我们将对其进行反序列化,
    //这意味着每个任务都会得到一个不同的RDD副本。这在可能修改闭包中引用的对象状态的任务之间提供了更强的隔离。
    //这在Hadoop中是必要的,因为JobConf/Configuration对象不是线程安全的。
    var taskBinary: Broadcast[Array[Byte]] = null
    var partitions: Array[Partition] = null
    try {
      //对于ShuffleMapTask 序列化并广播  (rdd, shuffleDep)
      //对于ResultTask 序列化并广播 (rdd, func)
      var taskBinaryBytes: Array[Byte] = null
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }
      //如果Task序列化后的大小 > 1 M 就会发出警告
      if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
        logWarning(s"Broadcasting large task binary with size " +
          s"${Utils.bytesToString(taskBinaryBytes.length)}")
      }
      //将序列化后的Task广播到每个Executor
      //有多分区就有多少Task,虽然每个Task会按照分区位置移动到最佳的Executor,但是它们的计算逻辑是一样的,因此可以直接广播
      taskBinary = sc.broadcast(taskBinaryBytes)
    }

    //ShuffleMapStage  ---对应---> ShuffleMapTask
    //ResultStage   ----对应--->  ResultTask
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            new ShuffleMapTask(...)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            new ResultTask(...)
          }
      }
    }

    //提交任务
    //会调用 TaskSchedulerImpl.submitTasks() 是在构建SparkContext时就指定好的
    taskScheduler.submitTasks(new TaskSet(...))

  }

  //获取与特定RDD的分区关联的局部性信息
  //该方法时一个递归方法,知道找到这个Stage的最除的那个rdd,然后获取每个分区的最佳位置
  //根据不同rdd的类型有不同的算法,我们单拿出来分析下
  private[spark]
  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }

  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    //如果分区已被访问,则无需重新访问
    if (!visited.add((rdd, partition))) {
      return Nil
    }

    //如果分区已经缓存了,那么返回缓存的地址
    //如果你的rdd会被多个rdd使用,那么可以缓存起来,其他rdd使用时可以直接从缓存拿数据
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // 获取分区的首选位置,同时考虑RDD是否为检查点。
    // 如果rdd设置了首选位置,那么直接使用它
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.filter(_ != null).map(TaskLocation(_))
    }

    //如果RDD具有窄依赖关系,请选择具有任何放置偏好的第一个窄依赖关系的第一个分区。
    //理想情况下,我们会根据传输大小进行选择,但现在就可以了。
    //递归方法:同一个Stage的分区也都是窄依赖的,因此需要获取到这个Stage的第一个RDD的那个分区中的最佳位置
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }

}

Task判断分发到哪个Executor

从上面我们已经知道Task的个数 = 这个Stage的最后的那个RDD的分区数量,且需要对每个分区进行递归找到这个Stage最初的那个RDD,找到其数据的存放位置,如果这个位置所在节点也正好有启动的存活的Executor,那么这就是Task要分发的目的地了,下面我们详细看下不同RDD类型对应的位置计算逻辑

1、HadoopRDD

  private[spark] def convertSplitLocationInfo(
       infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
    Option(infos).map(_.flatMap { loc =>
      val locationStr = loc.getLocation
      if (locationStr != null && locationStr != "localhost") {
        //优先取在内存中存放的数据节点上的Excutor
        if (loc.isInMemory) {
          logDebug(s"Partition $locationStr is cached by Hadoop.")
          Some(HDFSCacheTaskLocation(locationStr).toString)
        } else {
          Some(HostTaskLocation(locationStr).toString)
        }
      } else {
        None
      }
    })
  }

2、ShuffledRDD

  //返回在给定Shuffle中运行给定map输出分区的首选主机,即该分区输出最多的节点。
  //如果映射输出是预合并的,那么如果合并率高于阈值,则返回合并块所在的节点。
  def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
      : Seq[String] = {
    val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
    if (shuffleStatus != null) {
      //检查map输出是否已预合并,合并比率是否高于阈值。如果是这样,合并块的位置是首选位置
      //判断是否启用基于推送的Shuffle
      //满足以下条件:
      //    提交应用程序以在YARN模式下运行
      //    已启用外部洗牌服务
      //    IO加密已禁用
      //    序列化器(如KryoSerialer)支持重新定位序列化对象
      val preferredLoc = if (pushBasedShuffleEnabled) {
        shuffleStatus.withMergeStatuses { statuses =>
          val status = statuses(partitionId)
          val numMaps = dep.rdd.partitions.length
          if (status != null && status.getNumMissingMapOutputs(numMaps).toDouble / numMaps
            <= (1 - REDUCER_PREF_LOCS_FRACTION)) {
            Seq(status.location.host)
          } else {
            Nil
          }
        }
      } else {
        Nil
      }
      if (preferredLoc.nonEmpty) {
        preferredLoc
      } else {
        //是否计算reduce任务的局部偏好 spark.shuffle.reduceLocality.enabled 默认 true
        //rdd的分区数 < 1000 (map和reduce任务的数量,超过此数量,我们不会根据map输出大小分配首选位置。我们限制了分配首选位置的作业的大小,因为按大小计算顶部位置变得昂贵。)
        //也就是如果shuffle过程中分区数大于1000就没有什么最佳位置的概念了,也就造成了速度的不确定性
        if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
          dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
          val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
            dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
          if (blockManagerIds.nonEmpty) {
            blockManagerIds.get.map(_.host)
          } else {
            Nil
          }
        } else {
          Nil
        }
      }
    } else {
      Nil
    }
  }

五、总结

1、线性解析程序中的代码,遇到Action算子调用SparkContext的runJob(),有几个Action算子就会产生几个Job

2、转交给DAGScheduler提交Job

3、DAGScheduler先为调用Action算子的RDD创建一个ResultStage

4、以ResultStage为其实递归调用submitStage(Stage)获取上一层的Stage,直到没有依赖关系(详细请看第三部分:Stage划分)(只有最后一个Stage叫ResultStage,其他Stage叫ShuffleMapStage )

5、从最前一层的Stage依次处理,计算分区数量以及每份分区对应数据的最佳位置的节点上的Executor,因为最终Task是要发到Executor执行的(每个Stage的第一个RDD类型不同计算最佳位置的方式也不同,详细看第四部分中的:Task判断分发到哪个Executor)

6、将Task逻辑序列化并交由TaskScheduler进行调度


本文转载自: https://blog.csdn.net/lu070828/article/details/141460462
版权归原作者 隔着天花板看星星 所有, 如有侵权,请联系我们删除。

“Spark-Job启动、Stage划分”的评论:

还没有评论