0


Spark任务提交源码

Spark任务提交流程

从任务提交入口类到driver和executor启动

背景知识

spark版本: 3.0.4

部署模式:集群模式

部署集群:yarn

hadoop版本:2.7.3

spark源码地址:https://github.com/apache/spark

本人fork下来添加一些注释的源码地址:https://gitee.com/hangzhou-tech/spark.git

spark源码分支:branch-3.0

自己创建的分支:branch-3.0.4

程序提交入口

org.apache.spark.deploy.SparkSubmit#main方法创建Sparksubmit对象,调用dosubmit方法,传入参数。

 override def main(args: Array[String]): Unit = {
    log.info("这里是程序提交入口")
    val submit = new SparkSubmit() {
      self =>
        。。。
    }

    submit.doSubmit(args)
  }

在doSubmit方法中解析传入的参数,然后进行模式匹配,根据传入的参数action决定走哪个模式,接下来研究走submit的逻辑:

    val appArgs: SparkSubmitArguments = parseArguments(args)
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    }

在submit方法中定义了doRunMain方法,然后调用该方法,再调用runMain方法,在该方法中的文档中解释了要做的两件事:

  1. 准备启动环境,设置了类路径,系统属性,以及运行用户主类所需的程序参数;
  2. 使用这些启动环境去调用用户主类到main方法。

在方法中第一行调用了org.apache.spark.deploy.SparkSubmit#prepareSubmitEnvironment,该方法经过一系列逻辑处理,返回一个tuple,包含四个元素:

val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  1. 子进程(用户主类程序)所需参数
  2. 子进程所需类路径,包括本地和远端路径
  3. 系统属性,包含sparkConf,hadoop四个配置文件
  4. 子进程的主类(根据部署模式是client模式还是cluster模式会有所不同,client模式就是用户主类名,cluster模式就是org.apache.spark.deploy.yarn.YarnClusterApplication)

继续走runMain方法,根据childMainClass创建Class对象,再用反射调用构造器创建SparkApplication类的对象实例:

    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      // org.apache.spark.deploy.yarn.YarnClusterApplication
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }

app调用start方法,因为研究spark在yarn上的运行模式,所以查看SparkApplication接口实现org.apache.spark.deploy.yarn.YarnClusterApplication的start方法。

Client端逻辑

在start方法中创建org.apache.spark.deploy.yarn.Client对象并调用run方法

override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    new Client(new ClientArguments(args), conf, null).run()
  }

在Client类中进行了一些初始化:

  1. 创建yarn客户端
  2. 创建ApplicationMaster
  3. 获取am内存配置,包括amMemory,amMemoryOverhead,amCores
  4. 获取executor内存和堆外内存
  5. 其他的一些配置

然后调用client的run方法,再调用org.apache.spark.deploy.yarn.Client#submitApplication提交应用程序。提交过程分为以下几个环节:

  1. 初始化yarn客户端并启动
  2. 从yarn resource manager获取一个application
  3. 验证集群资源是否充足: - executor内存不超过单container最大可用内存- am内存不超过单container最大可用内存
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
val executorMem =executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory
val amMem = amMemory + amMemoryOverhead
  1. 创建containerContext,获取appContext
val containerContext: ContainerLaunchContext = createContainerLaunchContext(newAppResponse)
val appContext: ApplicationSubmissionContext = createApplicationSubmissionContext(newApp, containerContext)

在创建containerContext中,会设置java参数,启动am的命令。

//创建amContainer
val amContainer: ContainerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
//启动进程的主类
val amClass =
  if (isClusterMode) {
    //从这里接着走
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }

//传入am的参数
val amArgs: Seq[String] =
  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
  Seq("--properties-file",
    buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
  Seq("--dist-cache-conf",
    buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))
    
//启动am的命令
val commands: Iterable[String] = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
    
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)

在创建appContext中,会对刚刚从yarn获取的app设置一些参数,包括:

  1. app名字
  2. 队列
  3. 刚刚获取的containerContext上下文
  4. app的类型,为SPARK
  5. 失败重试次数spark.yarn.maxAppAttempts和重试间隔时间spark.yarn.am.attemptFailuresValidityInterval
  6. am的amMemory + amMemoryOverhead
  7. 优先级

然后yarn客户端会提交程序

 yarnClient.submitApplication(appContext)

ApplicationMaster(am)执行流程

yarn客户端提交程序后会在刚刚获取的container里运行刚刚传进来的java命令,启动am进程。这里启动的主类就是上文comman命令里的org.apache.spark.deploy.yarn.ApplicationMaster,在main方法中,创建ApplicationMaster对象:

master = new ApplicationMaster(amArgs, sparkConf, yarnConf) //创建master,点进去,创建yarn client

调用am的run方法:

  1. 获取attemptId
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
  1. 设置上下文
new CallerContext(
  "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
  Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
  1. 启动driver
if (isClusterMode) {
  log.info("运行driver进程!")
  runDriver()
} else {
  runExecutorLauncher()
}

在runDrvier方法中,会先获取用户线程

userClassThread = startUserApplication()

在org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication方法中,会:

  1. 获取参数
  2. 获取用户main方法
val mainMethod: Method = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
  1. 创建线程,在run方法中通过反射调用main方法
val userThread: Thread = new Thread {
      override def run(): Unit = {
            mainMethod.invoke(null, userArgs.toArray)
      }
}
  1. 设置线程类加载器和线程名字
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
  1. 启动线程
  2. 返回线程

这个线程就是driver线程。
获取完driver线程后,获取sparkContext,获取driver的host和port,然后向rm注册am,在org.apache.spark.deploy.yarn.ApplicationMaster#registerAM方法中,之前创建的yarnClient会调用register方法,

client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)

在register方法中,会创建am客户端,初始化并且启动,然后注册am。

amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)

仍然在runDriver方法中,会进行资源的申请和分配

createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

在org.apache.spark.deploy.yarn.ApplicationMaster#createAllocator方法中:

  • 创建分配器并分配资源
allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      appAttemptId,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)
allocator.allocateResources()

在org.apache.spark.deploy.yarn.YarnAllocator#allocateResources中,

  • 和yarn交互,持续获取container的最新状态,可用信息,完成状态,健康信息等
val allocateResponse: AllocateResponse = amClient.allocate(progressIndicator)

这个allocate方法作用在文档里的说明为

   * Request additional containers and receive new container allocations.
   * Requests made via <code>addContainerRequest</code> are sent to the
   * <code>ResourceManager</code>. New containers assigned to the master are
   * retrieved. Status of completed containers and node health updates are also
   * retrieved. This also doubles up as a heartbeat to the ResourceManager and
   * must be made periodically. The call may not always return any new
   * allocations of containers. App should not make concurrent allocate
   * requests. May cause request loss.

意思就是请求额外的container并且收到新的container分配。(rm)给master的新container会被获取到,container完成状态和节点健康更新也会被获取到。

  • 然后就是获取到所有可用container,
val allocatedContainers: util.List[Container] = allocateResponse.getAllocatedContainers()
  • 接着就是在这些container上启动executor
handleAllocatedContainers(allocatedContainers.asScala)
/**
 * Handle containers granted by the RM by launching executors on them.
 */

def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
      。。。

在org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers中,释放不需要的container,在剩余的container上启动executor。

runAllocatedContainers(containersToUse)

在org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers中,遍历所有可用container,在每个ececutor上启动executor线程

new ExecutorRunnable(
           Some(container),
           conf,
           sparkConf,
           driverUrl,
           executorId,
           executorHostname,
           executorMemory,
           executorCores,
           appAttemptId.getApplicationId.toString,
           securityMgr,
           localResources,
           ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
         ).run()

在org.apache.spark.deploy.yarn.ExecutorRunnable#run中,会初始化nmClient并且启动,然后启动container

  def run(): Unit = {
    log.info("Starting Executor Container/启动executor容器")
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    startContainer()
  }

在org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer中,会设置container启动context,获取container启动命令,然后nmClient启动container。

val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId,
        "--resourceProfileId", resourceProfileId.toString) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

nmClient.startContainer(container.get, ctx)

executor执行逻辑

从上面解读流程中得知executor启动主类为org.apache.spark.executor.YarnCoarseGrainedExecutorBackend,下面研究executor会做些什么。在YarnCoarseGrainedExecutorBackend的main方法中,会调用run方法。在run方法中,会建立和driver的联系

var driver: RpcEndpointRef = null
driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)

创建executor的环境

val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
  arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

然后创建CoarseGrainedExecutorBackend对象,设置env.rpcEnv的终端,名字就叫Executor

val backend: CoarseGrainedExecutorBackend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
env.rpcEnv.setupEndpoint("Executor", backend)

然后会和driver进行通信,一番操作后,在org.apache.spark.executor.CoarseGrainedExecutorBackend#receive中,会进行模式匹配,如果是注册executor,会创建一个executor对象

executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,resources = _resources)
driver.get.send(LaunchedExecutor(executorId))

然后向driver发送注册executor的消息。

下图展示了整个流程各个组件承担的工作以及之间的相互关系:
![(img-DpDAiBQQ-1648646392041)(https://note.youdao.com/yws/res/10752/3ACAB105C7154995B9D9244313C63D0B)]](https://img-blog.csdnimg.cn/f372add1358c4ee2bc845cca7c7b9ad2.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5qGD6Iqx5rqQ5YW76bih,size_20,color_FFFFFF,t_70,g_se,x_16)

到目前位置,driver和container都已经启动完毕。

总结

综上所述,在am中,总共执行了以下几个步骤:

  • 启动driver
  • 申请资源(container)
  • 在container上启动executor

本文转载自: https://blog.csdn.net/ralph_wren/article/details/123857348
版权归原作者 桃花源养鸡 所有, 如有侵权,请联系我们删除。

“Spark任务提交源码”的评论:

还没有评论