Spark Job Submission Flow
From the submission entry class to the launch of the driver and executors
Background
Spark version: 3.0.4
Deploy mode: cluster mode
Cluster manager: YARN
Hadoop version: 2.7.3
Spark source code: https://github.com/apache/spark
My fork with added comments: https://gitee.com/hangzhou-tech/spark.git
Spark source branch: branch-3.0
Branch created for this walkthrough: branch-3.0.4
Program submission entry point
org.apache.spark.deploy.SparkSubmit#main creates a SparkSubmit object and calls its doSubmit method with the command-line arguments.
override def main(args: Array[String]): Unit = {
  log.info("Program submission entry point")
  val submit = new SparkSubmit() {
    self =>
    ...
  }
  submit.doSubmit(args)
}
doSubmit parses the arguments and then pattern-matches on the resulting action to decide which path to take; here we follow the SUBMIT path:
val appArgs: SparkSubmitArguments = parseArguments(args)
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
}
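For reference, the full match in branch-3.0 also handles the other actions; roughly (simplified from the source):
appArgs.action match {
  case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
  case SparkSubmitAction.KILL => kill(appArgs)
  case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  case SparkSubmitAction.PRINT_VERSION => printVersion()
}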
The submit method defines a local doRunMain function and then calls it, which in turn calls runMain. The documentation of runMain describes the two things it does:
- prepare the launch environment: set up the classpath, system properties, and application arguments needed to run the user's main class;
- use that launch environment to invoke the main method of the user's main class.
The first thing runMain does is call org.apache.spark.deploy.SparkSubmit#prepareSubmitEnvironment, which, after a series of processing steps, returns a tuple of four elements:
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
- the arguments for the child process (the user's main program)
- the classpath for the child process, including local and remote entries
- the system properties, containing the SparkConf and the four Hadoop configuration files
- the main class of the child process (this depends on the deploy mode: in client mode it is the user's main class, in cluster mode it is org.apache.spark.deploy.yarn.YarnClusterApplication)
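How childMainClass is picked for YARN can be summarized with the following condensed sketch (paraphrased, not verbatim; YARN_CLUSTER_SUBMIT_CLASS is the constant the 3.0 source uses for the YarnClusterApplication class name):
// client mode: the user's class runs inside the spark-submit JVM
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
}
// yarn cluster mode: a YARN-specific application class becomes the child main class
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS // "org.apache.spark.deploy.yarn.YarnClusterApplication"
}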
Back in runMain, a Class object is created from childMainClass, and reflection is used to instantiate a SparkApplication through its constructor:
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
// org.apache.spark.deploy.yarn.YarnClusterApplication
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
start is then called on app. Since we are studying Spark running on YARN in cluster mode, we look at the start method of the SparkApplication implementation org.apache.spark.deploy.yarn.YarnClusterApplication.
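For reference, the SparkApplication interface that YarnClusterApplication implements is tiny; in the 3.0 sources it is essentially:
private[spark] trait SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}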
Client-side logic
In start, an org.apache.spark.deploy.yarn.Client object is created and its run method is called:
override def start(args: Array[String], conf: SparkConf): Unit = {
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
conf.remove(JARS)
conf.remove(FILES)
new Client(new ClientArguments(args), conf, null).run()
}
The Client class performs some initialization:
- create the YARN client
- create the ApplicationMaster
- read the AM memory settings: amMemory, amMemoryOverhead, amCores
- read the executor memory and executor off-heap memory
- various other settings
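A simplified sketch of how the AM memory and cores are resolved (paraphrased from the Client constructor; in cluster mode the AM hosts the driver, so the driver settings are used, otherwise the spark.yarn.am.* settings apply):
val amMemory = if (isClusterMode) sparkConf.get(DRIVER_MEMORY).toInt else sparkConf.get(AM_MEMORY).toInt
val amCores = if (isClusterMode) sparkConf.get(DRIVER_CORES) else sparkConf.get(AM_CORES)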
Client.run is then called, which calls org.apache.spark.deploy.yarn.Client#submitApplication to submit the application. Submission breaks down into the following steps:
- initialize and start the YARN client
- obtain a new application from the YARN ResourceManager
- verify that the cluster has enough resources: the executor memory must not exceed the maximum memory a single container can be given, and neither may the AM memory (a sketch of this check follows the list)
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
val executorMem = executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory
val amMem = amMemory + amMemoryOverhead
- create the containerContext and the appContext
val containerContext: ContainerLaunchContext = createContainerLaunchContext(newAppResponse)
val appContext: ApplicationSubmissionContext = createApplicationSubmissionContext(newApp, containerContext)
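The resource check itself is a simple comparison against the per-container maximum reported by the ResourceManager; roughly (simplified from Client#verifyClusterResources):
// fail fast if a single container can never fit the executor or the AM
if (executorMem > maxMem) {
  throw new IllegalArgumentException(
    s"Required executor memory ($executorMem MB) is above the max threshold ($maxMem MB) of this cluster!")
}
if (amMem > maxMem) {
  throw new IllegalArgumentException(
    s"Required AM memory ($amMem MB) is above the max threshold ($maxMem MB) of this cluster!")
}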
Inside createContainerLaunchContext, the JVM options and the command that launches the ApplicationMaster (AM) are assembled.
// create the amContainer
val amContainer: ContainerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
// the main class of the process to launch
val amClass =
if (isClusterMode) {
// cluster mode: we continue from here
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
// arguments passed to the AM
val amArgs: Seq[String] =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
Seq("--properties-file",
buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
Seq("--dist-cache-conf",
buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))
// the command that launches the AM
val commands: Iterable[String] = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
Inside createApplicationSubmissionContext, several parameters are set on the application just obtained from YARN, including:
- the application name
- the queue
- the containerContext just created
- the application type, which is SPARK
- the maximum number of attempts (spark.yarn.maxAppAttempts) and the attempt-failure validity interval (spark.yarn.am.attemptFailuresValidityInterval)
- the AM resource: amMemory + amMemoryOverhead
- the priority
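In code this amounts to a series of setters on the ApplicationSubmissionContext; a simplified sketch using the standard YARN records API (the literal values shown here stand in for what the real code reads from SparkConf):
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName("my-spark-app")            // spark.app.name (hypothetical value)
appContext.setQueue("default")                           // spark.yarn.queue
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
appContext.setMaxAppAttempts(2)                          // spark.yarn.maxAppAttempts
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(amMemory + amMemoryOverhead)
capability.setVirtualCores(amCores)
appContext.setResource(capability)
appContext.setPriority(Priority.newInstance(0))          // spark.yarn.priority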
The YARN client then submits the application:
yarnClient.submitApplication(appContext)
ApplicationMaster (AM) execution flow
After the YARN client submits the application, YARN runs the java command assembled above inside a container allocated for the AM, starting the AM process. The main class launched here is the org.apache.spark.deploy.yarn.ApplicationMaster from the command above; in its main method an ApplicationMaster object is created:
master = new ApplicationMaster(amArgs, sparkConf, yarnConf) // create the master; stepping into the constructor, it creates the YARN client
The AM's run method is then called. In run, the AM:
- gets the attemptId
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
- sets the caller context
new CallerContext(
"APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
- starts the driver
if (isClusterMode) {
log.info("运行driver进程!")
runDriver()
} else {
runExecutorLauncher()
}
In runDriver, the user-class thread is obtained first:
userClassThread = startUserApplication()
In org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication, the method:
- resolves the user arguments
- looks up the user's main method
val mainMethod: Method = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
- creates a thread whose run method invokes that main method via reflection
val userThread: Thread = new Thread {
override def run(): Unit = {
mainMethod.invoke(null, userArgs.toArray)
}
}
- sets the thread's context class loader and its name
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
- starts the thread
- returns the thread
This thread is the driver thread.
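Putting the steps above together, a condensed sketch of startUserApplication (error handling and the SparkContext promise are omitted):
private def startUserApplication(): Thread = {
  val userArgs = args.userArgs
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])
  val userThread = new Thread {
    override def run(): Unit = mainMethod.invoke(null, userArgs.toArray)
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}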
Once the driver thread is running, the AM obtains the SparkContext and the driver's host and port, and then registers the AM with the ResourceManager. In org.apache.spark.deploy.yarn.ApplicationMaster#registerAM, the previously created YARN client calls its register method:
client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
Inside register, an AM-RM client is created, initialized, and started, and then the AM is registered:
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
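register in org.apache.spark.deploy.yarn.YarnRMClient is essentially a thin wrapper around Hadoop's AMRMClient; roughly:
// simplified: create, init and start the AM-RM client, then register this AM
// (its host/port and tracking URL) with the ResourceManager
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)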
Still in runDriver, resources are then requested and allocated:
createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
In org.apache.spark.deploy.yarn.ApplicationMaster#createAllocator:
- an allocator is created and resources are allocated
allocator = client.createAllocator(
yarnConf,
_sparkConf,
appAttemptId,
driverUrl,
driverRef,
securityMgr,
localResources)
allocator.allocateResources()
In org.apache.spark.deploy.yarn.YarnAllocator#allocateResources:
- the allocator talks to YARN and keeps polling for the latest container information: newly allocated containers, completion status, node health updates, and so on
val allocateResponse: AllocateResponse = amClient.allocate(progressIndicator)
The javadoc of this allocate method describes its purpose:
* Request additional containers and receive new container allocations.
* Requests made via <code>addContainerRequest</code> are sent to the
* <code>ResourceManager</code>. New containers assigned to the master are
* retrieved. Status of completed containers and node health updates are also
* retrieved. This also doubles up as a heartbeat to the ResourceManager and
* must be made periodically. The call may not always return any new
* allocations of containers. App should not make concurrent allocate
* requests. May cause request loss.
In short, the call both requests additional containers and receives new allocations; containers newly assigned to the AM are returned, along with the status of completed containers and node health updates. It also doubles as the heartbeat to the ResourceManager and must be made periodically.
- all newly allocated containers are then retrieved
val allocatedContainers: util.List[Container] = allocateResponse.getAllocatedContainers()
- executors are then launched on those containers
handleAllocatedContainers(allocatedContainers.asScala)
/**
* Handle containers granted by the RM by launching executors on them.
*/
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
...
In org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers, containers that are not needed are released, and executors are launched on the remaining ones:
runAllocatedContainers(containersToUse)
In org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers, every usable container is iterated over and an ExecutorRunnable is run for each of them to launch an executor in that container:
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
).run()
In org.apache.spark.deploy.yarn.ExecutorRunnable#run, an NMClient is created, initialized, and started, and then the container is started:
def run(): Unit = {
log.info("Starting Executor Container/启动executor容器")
nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
startContainer()
}
In org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer, the container launch context is set up, the container start command is built, and the NMClient then starts the container:
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId,
"--resourceProfileId", resourceProfileId.toString) ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
nmClient.startContainer(container.get, ctx)
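For completeness, the ctx passed to startContainer above is built just before with the standard YARN records API; a simplified sketch (token and ACL handling omitted):
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
ctx.setLocalResources(localResources.asJava)   // jars/files distributed via the YARN distributed cache
ctx.setEnvironment(env.asJava)
ctx.setCommands(commands.toList.asJava)        // the java command shown above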
Executor execution logic
From the walkthrough above we know that the executor's main class is org.apache.spark.executor.YarnCoarseGrainedExecutorBackend; let us look at what the executor does. The main method of YarnCoarseGrainedExecutorBackend calls run. In run, a reference to the driver is first obtained:
var driver: RpcEndpointRef = null
driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
The executor environment is created:
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
Then a CoarseGrainedExecutorBackend object is created and registered on env.rpcEnv as an endpoint named "Executor":
val backend: CoarseGrainedExecutorBackend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
env.rpcEnv.setupEndpoint("Executor", backend)
The backend then communicates with the driver: it first asks the driver to register the executor; once the driver has acknowledged, the pattern match in org.apache.spark.executor.CoarseGrainedExecutorBackend#receive handles the RegisteredExecutor message and creates an Executor object:
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,resources = _resources)
driver.get.send(LaunchedExecutor(executorId))
Finally, it notifies the driver that the executor has launched (the LaunchedExecutor message).
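The relevant case in CoarseGrainedExecutorBackend#receive looks roughly like this (simplified):
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    // the driver accepted the registration: create the actual Executor and tell
    // the driver it is up so that tasks can be scheduled on it
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
      resources = _resources)
    driver.get.send(LaunchedExecutor(executorId))
  // ... other cases: LaunchTask, KillTask, StopExecutor, ...
}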
The following diagram shows the responsibilities of each component in the overall flow and how they interact:
![Spark on YARN job submission flow](https://img-blog.csdnimg.cn/f372add1358c4ee2bc845cca7c7b9ad2.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5qGD6Iqx5rqQ5YW76bih,size_20,color_FFFFFF,t_70,g_se,x_16)
At this point, both the driver and the executor containers have been started.
Summary
To sum up, the ApplicationMaster performs the following steps:
- start the driver
- request resources (containers)
- launch executors on the containers