

Spark Task Submission Source Code

The Spark Task Submission Flow

From the task submission entry class to the launch of the driver and executors

Background

Spark version: 3.0.4

Deploy mode: cluster mode

Cluster manager: YARN

Hadoop version: 2.7.3

Spark source code: https://github.com/apache/spark

My fork with added comments: https://gitee.com/hangzhou-tech/spark.git

Spark source branch: branch-3.0

Branch I created: branch-3.0.4

Application submission entry point

org.apache.spark.deploy.SparkSubmit#main creates a SparkSubmit object and calls its doSubmit method, passing in the command-line arguments.

  override def main(args: Array[String]): Unit = {
    log.info("This is the application submission entry point")
    val submit = new SparkSubmit() {
      self =>
      ...
    }
    submit.doSubmit(args)
  }

In doSubmit, the arguments are parsed and then pattern matched: the action parsed from the arguments decides which branch is taken. Next we follow the submit branch:

  val appArgs: SparkSubmitArguments = parseArguments(args)
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
  }

The submit method defines a local doRunMain function and calls it, which in turn calls runMain. The scaladoc of runMain describes the two things it does:

  1. Prepare the launch environment: set up the classpath, system properties, and the application arguments needed to run the user's main class;
  2. Use this launch environment to invoke the main method of the user's main class.

The first thing runMain does is call org.apache.spark.deploy.SparkSubmit#prepareSubmitEnvironment, which, after a series of processing steps, returns a tuple of four elements:

  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  1. the arguments for the child process (the user's main program)
  2. the classpath entries for the child process, including local and remote paths
  3. the system properties, including the SparkConf settings and the four Hadoop configuration files
  4. the main class of the child process (this differs by deploy mode: in client mode it is the user's main class, in cluster mode it is org.apache.spark.deploy.yarn.YarnClusterApplication)

Continuing in runMain, a Class object is created from childMainClass, and reflection is used to invoke its constructor and create a SparkApplication instance:

  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // org.apache.spark.deploy.yarn.YarnClusterApplication
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }

Finally, app.start is called. Since we are studying Spark running on YARN in cluster mode, we look at the start method of org.apache.spark.deploy.yarn.YarnClusterApplication, the SparkApplication implementation used here.
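For context, SparkApplication is a small internal trait with a single start(args, conf) method that concrete applications such as YarnClusterApplication and JavaMainApplication implement. The dispatch shown above is plain reflection; the following toy sketch (toy trait and class names, not the real Spark types) shows the same pattern end to end:

  // Toy sketch of the dispatch pattern used by runMain: load a class by name,
  // check that it implements the application trait, and instantiate it reflectively.
  trait ToyApplication {                                   // stands in for SparkApplication
    def start(args: Array[String]): Unit
  }

  class ToyYarnApp extends ToyApplication {                // stands in for YarnClusterApplication
    override def start(args: Array[String]): Unit =
      println(s"started with ${args.mkString(", ")}")
  }

  object AppDispatchSketch {
    def main(args: Array[String]): Unit = {
      val childMainClass = classOf[ToyYarnApp].getName
      val mainClass = Class.forName(childMainClass)        // Spark uses Utils.classForName here
      val app: ToyApplication =
        if (classOf[ToyApplication].isAssignableFrom(mainClass)) {
          mainClass.getConstructor().newInstance().asInstanceOf[ToyApplication]
        } else {
          // Spark instead wraps plain main-method classes in JavaMainApplication
          sys.error(s"$childMainClass does not implement ToyApplication")
        }
      app.start(Array("--example-arg"))
    }
  }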

Client-side logic

The start method creates an org.apache.spark.deploy.yarn.Client object and calls its run method:

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)
    new Client(new ClientArguments(args), conf, null).run()
  }

The Client constructor performs some initialization:

  1. create the YARN client
  2. create the ApplicationMaster
  3. read the AM memory settings: amMemory, amMemoryOverhead, amCores
  4. read the executor memory and off-heap memory settings
  5. a few other configuration values

Then client.run is called, which in turn calls org.apache.spark.deploy.yarn.Client#submitApplication to submit the application. The submission goes through the following steps:

  1. initialize and start the YARN client
  2. request a new application from the YARN ResourceManager
  3. verify that the cluster has enough resources: the executor memory must not exceed the maximum memory of a single container, and neither may the AM memory (see the sketch below)

  val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
  val executorMem = executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory
  val amMem = amMemory + amMemoryOverhead

  4. create the containerContext and the appContext

  val containerContext: ContainerLaunchContext = createContainerLaunchContext(newAppResponse)
  val appContext: ApplicationSubmissionContext = createApplicationSubmissionContext(newApp, containerContext)
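The resource check itself is simple arithmetic. A minimal, self-contained sketch of that validation, with illustrative values standing in for what Spark actually reads from SparkConf and from the RM's NewApplicationResponse, might look like this:

  // Minimal sketch of the memory validation step (all values are placeholders).
  object VerifyClusterResourcesSketch {
    def main(args: Array[String]): Unit = {
      val maxMem = 16384                 // max container memory (MB) reported by the RM
      val executorMemory = 4096          // spark.executor.memory
      val executorOffHeapMemory = 0      // spark.memory.offHeap.size, when off-heap is enabled
      val executorMemoryOverhead = 410   // spark.executor.memoryOverhead, default max(384 MB, 10% of executor memory)
      val pysparkWorkerMemory = 0        // spark.executor.pyspark.memory
      val amMemory = 1024                // AM memory (in cluster mode this is the driver memory)
      val amMemoryOverhead = 384         // AM overhead, same max(384 MB, 10%) rule

      val executorMem = executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory
      val amMem = amMemory + amMemoryOverhead

      require(executorMem <= maxMem,
        s"Required executor memory ($executorMem MB) is above the max threshold ($maxMem MB) of this cluster!")
      require(amMem <= maxMem,
        s"Required AM memory ($amMem MB) is above the max threshold ($maxMem MB) of this cluster!")
      println(s"executorMem=$executorMem MB, amMem=$amMem MB, maxMem=$maxMem MB -> OK")
    }
  }

If either sum exceeds the per-container maximum reported by the RM, the client aborts the submission with a similar error message.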

When the containerContext is created, the JVM options and the command used to launch the AM are assembled (an end-to-end sketch of the assembled command follows the snippet):

  // create the amContainer
  val amContainer: ContainerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
  // main class of the process to launch
  val amClass =
    if (isClusterMode) {
      // this is the branch we follow
      Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
      Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
  // arguments passed to the AM
  val amArgs: Seq[String] =
    Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
    Seq("--properties-file",
      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
    Seq("--dist-cache-conf",
      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))
  // command used to launch the AM
  val commands: Iterable[String] = prefixEnv ++
    Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
    javaOpts ++ amArgs ++
    Seq(
      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
  // TODO: it would be nicer to just make sure there are no null commands here
  val printableCommands = commands.map(s => if (s == null) "null" else s).toList
  amContainer.setCommands(printableCommands.asJava)
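Putting the pieces together, the following stand-alone sketch assembles and prints a command of the same shape, so you can see roughly what the NodeManager ends up executing for the AM. The user class, jar path, and JVM options are placeholders, and prefixEnv is omitted:

  import org.apache.hadoop.yarn.api.ApplicationConstants
  import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

  // Rough sketch of the AM launch command assembly (cluster mode only, simplified).
  object AmCommandSketch {
    def main(args: Array[String]): Unit = {
      val amClass = "org.apache.spark.deploy.yarn.ApplicationMaster"
      val javaOpts = Seq("-Xmx1024m", "-Djava.io.tmpdir=" + Environment.PWD.$$() + "/tmp")  // placeholder JVM options
      val amArgs = Seq(amClass,
        "--class", "com.example.MySparkApp",                        // hypothetical user class
        "--jar", "hdfs:///apps/my-spark-app.jar",                   // hypothetical user jar
        "--properties-file", Environment.PWD.$$() + "/__spark_conf__/__spark_conf__.properties")
      val commands = Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
        javaOpts ++ amArgs ++
        Seq("1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
            "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
      println(commands.mkString(" "))
    }
  }

Markers such as Environment.JAVA_HOME.$$() and the log-dir variable are expanded later by the NodeManager when it actually runs the command.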

When the appContext is created, a number of parameters are set on the application that was just obtained from YARN, including:

  1. the application name
  2. the queue
  3. the containerContext created above
  4. the application type, which is SPARK
  5. the maximum number of attempts (spark.yarn.maxAppAttempts) and the attempt failure validity interval (spark.yarn.am.attemptFailuresValidityInterval)
  6. the AM resource: amMemory + amMemoryOverhead
  7. the priority

The YARN client then submits the application (a client-side sketch follows the snippet):

  yarnClient.submitApplication(appContext)
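To make the client-side flow concrete, here is a minimal sketch of the same sequence written directly against the YARN client API: create an application, fill in the submission context with the kinds of values listed above, and submit it. The application name, queue, memory figures, and AM command are placeholders rather than what Spark actually computes:

  import java.util.Collections
  import org.apache.hadoop.yarn.api.records._
  import org.apache.hadoop.yarn.client.api.YarnClient
  import org.apache.hadoop.yarn.conf.YarnConfiguration
  import org.apache.hadoop.yarn.util.Records

  object SubmitToYarnSketch {
    def main(args: Array[String]): Unit = {
      val conf = new YarnConfiguration()
      val yarnClient = YarnClient.createYarnClient()
      yarnClient.init(conf)
      yarnClient.start()

      // Ask the ResourceManager for a new application (Client.submitApplication does the same).
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse
      println(s"max container memory: ${newAppResponse.getMaximumResourceCapability.getMemory} MB")

      // Container spec: in Spark this is built by createContainerLaunchContext.
      val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
      amContainer.setCommands(Collections.singletonList(
        "{{JAVA_HOME}}/bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster ..."))  // placeholder command

      // Submission context: in Spark this is built by createApplicationSubmissionContext.
      val appContext = newApp.getApplicationSubmissionContext
      appContext.setApplicationName("my-spark-app")                 // placeholder name
      appContext.setQueue("default")
      appContext.setAMContainerSpec(amContainer)
      appContext.setApplicationType("SPARK")
      appContext.setMaxAppAttempts(2)                               // spark.yarn.maxAppAttempts
      appContext.setResource(Resource.newInstance(1024 + 384, 1))   // amMemory + amMemoryOverhead, amCores
      appContext.setPriority(Priority.newInstance(0))

      val appId = yarnClient.submitApplication(appContext)
      println(s"submitted application $appId")
    }
  }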

ApplicationMaster (AM) execution flow

After the YARN client submits the application, the java command assembled above is run in the allocated container, starting the AM process. Its main class is the org.apache.spark.deploy.yarn.ApplicationMaster from that command. In its main method, an ApplicationMaster object is created:

  master = new ApplicationMaster(amArgs, sparkConf, yarnConf) // create the master; stepping into the constructor, it creates the YARN client

Then the AM's run method is called, which:

  1. gets the attempt id

  System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

  2. sets the caller context

  new CallerContext(
    "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
    Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

  3. starts the driver

  if (isClusterMode) {
    log.info("Starting the driver process!")
    runDriver()
  } else {
    runExecutorLauncher()
  }

The runDriver method first starts the user-class thread:

  userClassThread = startUserApplication()

org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication does the following (a sketch of the pattern follows the list):

  1. reads the arguments
  2. looks up the main method of the user class

  val mainMethod: Method = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])

  3. creates a thread whose run method invokes that main method via reflection

  val userThread: Thread = new Thread {
    override def run(): Unit = {
      mainMethod.invoke(null, userArgs.toArray)
    }
  }

  4. sets the thread's context class loader and name

  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")

  5. starts the thread
  6. returns the thread
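A minimal, self-contained sketch of this pattern, invoking a class's main method via reflection on a dedicated thread named "Driver", is shown below. HelloApp is a hypothetical stand-in for the user class, and the class loader and arguments are simplified:

  import java.lang.reflect.Method

  // Hypothetical "user class" with a standard main method, standing in for args.userClass.
  object HelloApp {
    def main(args: Array[String]): Unit =
      println(s"user main running with args: ${args.mkString(", ")}")
  }

  object DriverThreadSketch {
    def main(args: Array[String]): Unit = {
      val userArgs = Array("--input", "/tmp/data")      // placeholder user arguments
      val userClassLoader = getClass.getClassLoader      // Spark builds a child classloader over the user jar

      // Look up the static main(String[]) method of the user class by name.
      val mainMethod: Method =
        userClassLoader.loadClass("HelloApp").getMethod("main", classOf[Array[String]])

      val userThread = new Thread {
        override def run(): Unit = mainMethod.invoke(null, userArgs)
      }
      userThread.setContextClassLoader(userClassLoader)
      userThread.setName("Driver")
      userThread.start()
      userThread.join()
    }
  }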

This thread is the driver thread.
Once the driver thread is running, runDriver waits for the SparkContext, obtains the driver's host and port, and then registers the AM with the ResourceManager. In org.apache.spark.deploy.yarn.ApplicationMaster#registerAM, the previously created YARN client calls its register method:

  client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)

Inside register, an AM client (AMRMClient) is created, initialized, and started, and then the AM is registered:

  amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
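For reference, a bare-bones version of this registration step written directly against the YARN AMRMClient API might look like the following; it is only meaningful when run inside an AM container, and the host, port, and tracking URL are placeholders:

  import org.apache.hadoop.yarn.client.api.AMRMClient
  import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
  import org.apache.hadoop.yarn.conf.YarnConfiguration

  object RegisterAmSketch {
    def main(args: Array[String]): Unit = {
      val conf = new YarnConfiguration()
      val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
      amClient.init(conf)
      amClient.start()

      // Register this AM with the ResourceManager, advertising the driver's host/port
      // and a tracking URL (placeholders here; Spark passes the real driver endpoint).
      val response = amClient.registerApplicationMaster("driver-host.example.com", 7077, "http://tracking-url.example.com")
      println(s"registered; max capability = ${response.getMaximumResourceCapability}")
    }
  }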

Still in runDriver, resources are then requested and allocated:

  createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

In org.apache.spark.deploy.yarn.ApplicationMaster#createAllocator:

  • create the allocator and allocate resources

  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    appAttemptId,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)
  allocator.allocateResources()

In org.apache.spark.deploy.yarn.YarnAllocator#allocateResources:

  • talk to YARN and keep fetching the latest container information: new allocations, availability, completion status, node health, and so on

  val allocateResponse: AllocateResponse = amClient.allocate(progressIndicator)

The javadoc of this allocate method describes it as follows:

  * Request additional containers and receive new container allocations.
  * Requests made via <code>addContainerRequest</code> are sent to the
  * <code>ResourceManager</code>. New containers assigned to the master are
  * retrieved. Status of completed containers and node health updates are also
  * retrieved. This also doubles up as a heartbeat to the ResourceManager and
  * must be made periodically. The call may not always return any new
  * allocations of containers. App should not make concurrent allocate
  * requests. May cause request loss.

In other words: it requests additional containers and receives any new allocations; containers newly assigned to the master are returned, along with the completion status of finished containers and node health updates. It also doubles as a heartbeat to the ResourceManager and must be called periodically.
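Continuing the AMRMClient sketch above, a stripped-down version of the request/heartbeat loop, asking for a few 1 GB / 1 core containers (illustrative values, not what YarnAllocator computes), could look like this:

  import scala.collection.JavaConverters._
  import org.apache.hadoop.yarn.api.records.{Priority, Resource}
  import org.apache.hadoop.yarn.client.api.AMRMClient
  import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

  object AllocateLoopSketch {
    // amClient is assumed to be an already registered AMRMClient (see the previous sketch).
    def allocateExecutors(amClient: AMRMClient[ContainerRequest], numExecutors: Int): Unit = {
      val capability = Resource.newInstance(1024, 1)        // memory (MB) and vcores per executor container
      (1 to numExecutors).foreach { _ =>
        amClient.addContainerRequest(new ContainerRequest(capability, null, null, Priority.newInstance(1)))
      }

      var launched = 0
      while (launched < numExecutors) {
        // Heartbeat to the RM; also returns any containers granted since the last call.
        val response = amClient.allocate(0.1f)
        response.getAllocatedContainers.asScala.foreach { c =>
          println(s"got container ${c.getId} on ${c.getNodeId}")
          launched += 1
          // The real YarnAllocator hands each container to an ExecutorRunnable here.
        }
        Thread.sleep(1000)                                  // the real code heartbeats on a schedule
      }
    }
  }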

  • then all newly allocated containers are collected

  val allocatedContainers: util.List[Container] = allocateResponse.getAllocatedContainers()

  • and executors are launched on those containers

  handleAllocatedContainers(allocatedContainers.asScala)

  /**
   * Handle containers granted by the RM by launching executors on them.
   */
  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    ...

org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers releases the containers that are not needed and launches executors on the remaining ones:

  runAllocatedContainers(containersToUse)

org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers iterates over the usable containers and starts an ExecutorRunnable for each container:

  new ExecutorRunnable(
    Some(container),
    conf,
    sparkConf,
    driverUrl,
    executorId,
    executorHostname,
    executorMemory,
    executorCores,
    appAttemptId.getApplicationId.toString,
    securityMgr,
    localResources,
    ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
  ).run()

org.apache.spark.deploy.yarn.ExecutorRunnable#run initializes and starts an NMClient, then starts the container:

  def run(): Unit = {
    log.info("Starting Executor Container")
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    startContainer()
  }

org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer builds the container launch context, assembles the container launch command, and then has the NMClient start the container:

  val commands = prefixEnv ++
    Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
    javaOpts ++
    Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
      "--driver-url", masterAddress,
      "--executor-id", executorId,
      "--hostname", hostname,
      "--cores", executorCores.toString,
      "--app-id", appId,
      "--resourceProfileId", resourceProfileId.toString) ++
    userClassPath ++
    Seq(
      s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
      s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

  nmClient.startContainer(container.get, ctx)
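For completeness, the NMClient side of this step can be sketched on its own: given a Container obtained from an AllocateResponse and an already assembled command, initialize an NMClient, build a ContainerLaunchContext, and start the container. Local resources, environment, and tokens are omitted here, although the real ExecutorRunnable sets them:

  import scala.collection.JavaConverters._
  import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
  import org.apache.hadoop.yarn.client.api.NMClient
  import org.apache.hadoop.yarn.conf.YarnConfiguration
  import org.apache.hadoop.yarn.util.Records

  object StartExecutorContainerSketch {
    // Minimal sketch: launch an already allocated container with the given command.
    def startExecutor(container: Container, commands: Seq[String]): Unit = {
      val conf = new YarnConfiguration()
      val nmClient = NMClient.createNMClient()
      nmClient.init(conf)
      nmClient.start()

      val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      ctx.setCommands(commands.asJava)   // e.g. the java ... YarnCoarseGrainedExecutorBackend command above
      nmClient.startContainer(container, ctx)
    }
  }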

Executor execution logic

From the walkthrough above we know that the executor's main class is org.apache.spark.executor.YarnCoarseGrainedExecutorBackend, so next we look at what the executor does. Its main method calls run, which first establishes the connection to the driver:

  var driver: RpcEndpointRef = null
  driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)

It then creates the executor-side environment:

  val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
    arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

Next it creates the CoarseGrainedExecutorBackend object and registers it on env.rpcEnv as an endpoint named "Executor":

  val backend: CoarseGrainedExecutorBackend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
  env.rpcEnv.setupEndpoint("Executor", backend)

The backend then communicates with the driver. After the registration handshake, org.apache.spark.executor.CoarseGrainedExecutorBackend#receive pattern matches on the incoming message; when the message signals that the executor has been registered, an Executor object is created:

  executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false, resources = _resources)
  driver.get.send(LaunchedExecutor(executorId))

A LaunchedExecutor message is then sent back to the driver.
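The message handling in receive is an ordinary pattern match over RPC messages. The following toy sketch (simplified stand-ins, not the real Spark RPC classes or messages) illustrates the shape of that logic: on the registration-success message the backend creates its executor and notifies the driver, and only then can tasks be launched:

  object ReceiveSketch {
    sealed trait Message
    case object RegisteredExecutor extends Message
    final case class LaunchTask(taskId: Long) extends Message

    final class ToyBackend(executorId: String, notifyDriver: String => Unit) {
      private var executorStarted = false

      def receive(msg: Message): Unit = msg match {
        case RegisteredExecutor =>
          executorStarted = true                      // stands in for: executor = new Executor(...)
          notifyDriver(s"LaunchedExecutor($executorId)")
        case LaunchTask(taskId) =>
          require(executorStarted, "Received LaunchTask before executor was created")
          println(s"running task $taskId")
      }
    }

    def main(args: Array[String]): Unit = {
      val backend = new ToyBackend("1", msg => println(s"-> driver: $msg"))
      backend.receive(RegisteredExecutor)
      backend.receive(LaunchTask(0L))
    }
  }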

The diagram below shows what each component does in the overall flow and how the components interact:
![Spark on YARN submission flow](https://img-blog.csdnimg.cn/f372add1358c4ee2bc845cca7c7b9ad2.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5qGD6Iqx5rqQ5YW76bih,size_20,color_FFFFFF,t_70,g_se,x_16)

At this point, both the driver and the executor containers have been started.

Summary

To summarize, the AM performs the following steps:

  • start the driver
  • request resources (containers) from the ResourceManager
  • launch executors on the allocated containers

Reposted from https://blog.csdn.net/ralph_wren/article/details/123857348; copyright belongs to the original author, 桃花源养鸡.
