

Spark Driver and Executor Startup Process

1. Context

The previous article, "Spark-SparkSubmit详细过程", analyzed in detail how, after a job is submitted from the script, the driver ends up calling into the Spark code we wrote. Before that Spark code can run, however, the distributed resources it needs must already be in place. In this article we look at how those resources are allocated.

2. Spark Code Example

We take a simple WordCount program as an example and analyze how the Spark backend allocates resources for it.

    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount {
      def main(args: Array[String]): Unit = {
        // SparkConf lets you set most Spark configuration parameters, and these settings take precedence over system properties.
        // Note: once a SparkConf has been passed to Spark it can no longer be modified; Spark does not support changing it at runtime.
        val conf = new SparkConf().setAppName("WordCount")
        // SparkContext is the main entry point of Spark. It represents the connection to a Spark cluster and is used to create RDDs, accumulators and broadcast variables on that cluster.
        // Only one SparkContext may be active per JVM.
        val sc = new SparkContext(conf)
        // Read a text file from HDFS, the local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of strings.
        val sourceRdd = sc.textFile("file/word_count_data.txt")
        // A raw input line looks like: js,c,vba,json,xml
        // flatMap splits each line on commas to get individual words: (word1) (word2) (word1) ...
        // map pairs each word with a count of 1: (word1,1) (word2,1) (word1,1) ...
        // reduceByKey adds up the counts of identical words
        val resultRdd = sourceRdd.flatMap(_.split(",")).map(x => (x, 1)).reduceByKey(_ + _)
        // Print the result
        resultRdd.foreach(println)
        // Stop the SparkContext
        sc.stop()
      }
    }

SparkConf holds the configuration of this application; it supports chained setters and overrides the default system properties. Once the program has started running it can no longer be modified.
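
As a minimal sketch of that chaining (for example pasted into spark-shell; the master URL and the two executor settings below are placeholder values, not taken from the example above):

    import org.apache.spark.SparkConf

    // Placeholder values; real jobs usually pass the master and resources
    // via spark-submit instead of hard-coding them in the application.
    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("spark://master-host:7077") // hypothetical standalone master
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")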

SparkContext is the entry point that connects the program to the Spark cluster. It can be used to create RDDs, broadcast variables, accumulators and so on, which means the resources an RDD needs must already be available by the time the SparkContext is created. Let's see how SparkContext works with the spark-submit arguments to coordinate those resources.

3. SparkContext

    class SparkContext(config: SparkConf) extends Logging {
      // Create a SparkContext that loads settings from system properties (for instance, when launched with ./bin/spark-submit)
      def this() = this(new SparkConf())
      // ...... omitted .......
      // Private variables. These keep the internal state of the context and are not accessible from the outside. They are mutable
      // because we want to initialize them to a neutral value ahead of time, so that calling "stop()" while the constructor is still running is safe.
      // Only the important fields are listed here.
      // Per-application configuration
      private var _conf: SparkConf = _
      // Holds all the runtime environment objects of a running Spark instance (either master or worker), including the serializer, RpcEnv, block manager, map output tracker, etc.
      // Currently Spark code finds the SparkEnv through a global variable, so all threads can access the same SparkEnv. It can be accessed via SparkEnv.get (e.g. after creating a SparkContext).
      // We will analyze it separately later.
      private var _env: SparkEnv = _
      // Backend interface for the scheduling system that allows plugging in different systems under TaskSchedulerImpl. We assume a Mesos-like model,
      // where the application gets resource offers as machines become available and can launch tasks on them.
      private var _schedulerBackend: SchedulerBackend = _
      // Low-level task scheduling interface, currently implemented exclusively by [[org.apache.spark.scheduler.TaskSchedulerImpl]].
      // This interface allows different task schedulers to be plugged in. Each TaskScheduler schedules tasks for a single SparkContext.
      // These schedulers get the task sets submitted to them for each stage by the DAGScheduler and are responsible for sending the tasks to the cluster, running them,
      // retrying on failure and mitigating stragglers. They report events back to the DAGScheduler.
      private var _taskScheduler: TaskScheduler = _
      // A HeartbeatReceiver RPC endpoint that lives on the driver and receives heartbeats from all executors
      private var _heartbeatReceiver: RpcEndpointRef = _
      // Implements the high-level, stage-oriented scheduling layer. It computes a DAG of stages for each job,
      // keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job.
      // It then submits stages as TaskSets to the underlying TaskScheduler implementation that runs them on the cluster.
      // A TaskSet contains fully independent tasks that can run right away based on data that is already on the cluster.
      @volatile private var _dagScheduler: DAGScheduler = _
      try {
        // ...... omitted .......
        // Create the SparkEnv
        _env = createSparkEnv(_conf, isLocal, listenerBus)
        // Set up the _heartbeatReceiver endpoint that lives on the driver
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(
          HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
        // Start the task scheduler
        _taskScheduler.start()
        // ...... omitted (including the catch block) .......
      }
      // Create a task scheduler based on the given master URL. Returns a 2-tuple of the scheduler backend and the task scheduler.
      private def createTaskScheduler(...) = {
        // ...... omitted .......
        master match {
          case "local" =>
          case LOCAL_N_REGEX(threads) =>
          case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
          case SPARK_REGEX(sparkUrl) =>
            // Create the task scheduler implementation, TaskSchedulerImpl
            val scheduler = new TaskSchedulerImpl(sc)
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            // Create a StandaloneSchedulerBackend, a subclass of SchedulerBackend
            val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
            // Initialize the task scheduler
            scheduler.initialize(backend)
            (backend, scheduler)
          case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
          case masterUrl =>
        }
      }
    }
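
As a small, self-contained illustration of how the standalone branch of that match is selected, the sketch below mirrors the SPARK_REGEX dispatch; treat the exact regex as an assumption made for illustration:

    object MasterUrlSketch {
      // Assumed to match Spark's SPARK_REGEX pattern for standalone master URLs
      private val SPARK_REGEX = """spark://(.*)""".r

      def main(args: Array[String]): Unit = {
        "spark://host1:7077,host2:7077" match {
          case SPARK_REGEX(sparkUrl) =>
            // one RPC address per master, e.g. for an HA standalone cluster
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            println(masterUrls.mkString(", "))
          case other =>
            println(s"not a standalone master URL: $other")
        }
      }
    }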

4. TaskSchedulerImpl

    private[spark] class TaskSchedulerImpl(
        val sc: SparkContext,
        val maxTaskFailures: Int,
        isLocal: Boolean = false,
        clock: Clock = new SystemClock)
      extends TaskScheduler with Logging {
      // Initialization
      def initialize(backend: SchedulerBackend): Unit = {
        this.backend = backend
        // Build the schedulable builder; FIFO is the default and the mode can be configured via spark.scheduler.mode
        schedulableBuilder = {
          schedulingMode match {
            case SchedulingMode.FIFO =>
              new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
              new FairSchedulableBuilder(rootPool, sc)
            case _ =>
              throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
                s"$schedulingMode")
          }
        }
        schedulableBuilder.buildPools()
      }
      // Start the scheduler
      override def start(): Unit = {
        // SparkContext created a StandaloneSchedulerBackend, so its start() is called here,
        // which in turn calls start() on its parent class CoarseGrainedSchedulerBackend
        backend.start()
        if (!isLocal && conf.get(SPECULATION_ENABLED)) {
          logInfo("Starting speculative execution thread")
          speculationScheduler.scheduleWithFixedDelay(
            () => Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() },
            SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
        }
      }
    }
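
The schedulingMode that initialize() switches on comes from spark.scheduler.mode. A hedged configuration sketch (the allocation file path and app name are placeholders):

    import org.apache.spark.SparkConf

    // FIFO is the default. FAIR additionally reads pool definitions from an
    // allocation file; the path below is purely illustrative.
    val conf = new SparkConf()
      .setAppName("scheduler-mode-demo")
      .set("spark.scheduler.mode", "FAIR")
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")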

5. CoarseGrainedSchedulerBackend

    // A scheduler backend that waits for coarse-grained executors to connect. This backend holds on to each executor for the duration of the Spark job,
    // rather than relinquishing executors whenever a task finishes and asking the scheduler to launch a new executor for each new task.
    // Executors may be launched in a variety of ways, such as Mesos tasks for coarse-grained Mesos mode, or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
    class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
      extends ExecutorAllocationClient with SchedulerBackend with Logging {
      // Creates and registers a DriverEndpoint; the DriverEndpoint's onStart() method then runs
      val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())
      protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()
      override def start(): Unit = {
      }
      class DriverEndpoint extends IsolatedRpcEndpoint with Logging {
        override def onStart(): Unit = {
          // Periodically revive offers to allow delay scheduling to work
          val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
          reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReviveOffers))
          }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
        }
        override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
          case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
              attributes, resources, resourceProfileId) =>
            if (executorDataMap.contains(executorId)) {
              context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
            } else if (scheduler.excludedNodes.contains(hostname) ||
                isExecutorExcluded(executorId, hostname)) {
              // If the cluster manager gives us an executor on an excluded node (because it already started allocating those resources
              // before we notified it of our exclusions, or because it ignored them), we reject that executor immediately.
              logInfo(s"Rejecting $executorId as it has been excluded.")
              context.sendFailure(
                new IllegalStateException(s"Executor is excluded due to failures: $executorId"))
            } else {
              // If the executor's rpc env is not listening for incoming connections, `hostPort` will be null,
              // and the client connection should be used to contact the executor.
              val executorAddress = if (executorRef.address != null) {
                executorRef.address
              } else {
                context.senderAddress
              }
              logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId, " +
                s" ResourceProfileId $resourceProfileId")
              addressToExecutorId(executorAddress) = executorId
              totalCoreCount.addAndGet(cores)
              totalRegisteredExecutors.addAndGet(1)
              val resourcesInfo = resources.map { case (rName, info) =>
                // This must be synchronized because the variables mutated in this block are read when requesting executors
                val numParts = scheduler.sc.resourceProfileManager
                  .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
                (info.name, new ExecutorResourceInfo(info.name, info.addresses, numParts))
              }
              val data = new ExecutorData(executorRef, executorAddress, hostname,
                0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
                resourcesInfo, resourceProfileId, registrationTs = System.currentTimeMillis())
              // This must be synchronized because variables mutated
              // in this block are read when requesting executors
              CoarseGrainedSchedulerBackend.this.synchronized {
                executorDataMap.put(executorId, data)
                if (currentExecutorIdCounter < executorId.toInt) {
                  currentExecutorIdCounter = executorId.toInt
                }
              }
              listenerBus.post(
                SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
              // Note: some tests expect the reply to come after we put the executor in the map
              context.reply(true)
            }
        }
        override def receive: PartialFunction[Any, Unit] = {
          // An executor has been launched
          case LaunchedExecutor(executorId) =>
            executorDataMap.get(executorId).foreach { data =>
              data.freeCores = data.totalCores
            }
            // Make fake resource offers on just this one executor
            makeOffers(executorId)
        }
      }
    }
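
To make the shape of this two-message handshake (RegisterExecutor, then LaunchedExecutor) easier to see in isolation, here is a simplified, self-contained sketch of the driver-side bookkeeping; ExecutorInfo and the method names are illustrative stand-ins, not Spark's real ExecutorData API:

    import java.util.concurrent.atomic.AtomicInteger
    import scala.collection.mutable

    // Illustrative stand-in for ExecutorData: only the fields needed here.
    final case class ExecutorInfo(host: String, totalCores: Int, var freeCores: Int)

    object RegistrationSketch {
      private val executorDataMap = mutable.HashMap.empty[String, ExecutorInfo]
      private val totalCoreCount = new AtomicInteger(0)

      /** Mirrors the duplicate-ID check done while handling RegisterExecutor. */
      def register(executorId: String, host: String, cores: Int): Boolean = synchronized {
        if (executorDataMap.contains(executorId)) {
          false // duplicate executor ID: the real code replies with sendFailure
        } else {
          executorDataMap.put(executorId, ExecutorInfo(host, cores, freeCores = 0))
          totalCoreCount.addAndGet(cores)
          true
        }
      }

      /** Mirrors LaunchedExecutor: only now do the cores become available for offers. */
      def markLaunched(executorId: String): Unit = synchronized {
        executorDataMap.get(executorId).foreach(d => d.freeCores = d.totalCores)
      }
    }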

6. StandaloneSchedulerBackend

    // A [[SchedulerBackend]] implementation for Spark's standalone cluster manager.
    private[spark] class StandaloneSchedulerBackend(
        scheduler: TaskSchedulerImpl,
        sc: SparkContext,
        masters: Array[String])
      extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
      with StandaloneAppClientListener
      with Logging {
      override def start(): Unit = {
        super.start()
        // The scheduler backend should only try to connect to the launcher when in client mode.
        // In cluster mode, the code that submits the application to the Master takes care of connecting to the launcher.
        if (sc.deployMode == "client") {
          launcherBackend.connect()
        }
        // The endpoint inside each executor needs the driver's address in order to talk to the driver
        val driverUrl = RpcEndpointAddress(
          sc.conf.get(config.DRIVER_HOST_ADDRESS),
          sc.conf.get(config.DRIVER_PORT),
          CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
        val args = Seq(
          "--driver-url", driverUrl,
          "--executor-id", "{{EXECUTOR_ID}}",
          "--hostname", "{{HOSTNAME}}",
          "--cores", "{{CORES}}",
          "--app-id", "{{APP_ID}}",
          "--worker-url", "{{WORKER_URL}}")
        val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS)
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val classPathEntries = sc.conf.get(config.EXECUTOR_CLASS_PATH)
          .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
        val libraryPathEntries = sc.conf.get(config.EXECUTOR_LIBRARY_PATH)
          .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
        // Start executors with a few necessary configs so that they can register with the scheduler
        val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
        // The executor-side main class is CoarseGrainedExecutorBackend
        val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
        val webUrl = sc.ui.map(_.webUrl).getOrElse("")
        val coresPerExecutor = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
        // If we are using dynamic allocation, set the initial executor limit to 0 for now.
        // ExecutorAllocationManager will send the real initial limit to the Master later.
        val initialExecutorLimit =
          if (Utils.isDynamicAllocationEnabled(conf)) {
            Some(0)
          } else {
            None
          }
        val executorResourceReqs = ResourceUtils.parseResourceRequirements(conf,
          config.SPARK_EXECUTOR_PREFIX)
        // Here an ApplicationDescription is built; earlier there was a DriverDescription.
        // As you would expect, it is used to launch an Application.
        val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
          webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit,
          resourceReqsPerExecutor = executorResourceReqs)
        // Create a StandaloneAppClient.
        // It lets an application talk to Spark's standalone cluster manager: it takes the master URLs, an application description
        // and a listener for cluster events, and calls back into the listener when various events occur.
        client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
        // Start it
        client.start()
        launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
        waitForRegistration()
        launcherBackend.setState(SparkAppHandle.State.RUNNING)
      }
    }

7. StandaloneAppClient

    private[spark] class StandaloneAppClient(...) {
      private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
      private val REGISTRATION_TIMEOUT_SECONDS = 20
      private val REGISTRATION_RETRIES = 3
      private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
        with Logging {
        override def onStart(): Unit = {
          // Register an app with the Master
          registerWithMaster(1)
        }
        // Register with all masters asynchronously. It will call `registerWithMaster` every 20 seconds until the number of attempts exceeds 3.
        // Once we successfully connect to a master, all scheduling work and futures are cancelled.
        // nthRetry means this is the nth attempt at registering with the master.
        private def registerWithMaster(nthRetry: Int): Unit = {
          registerMasterFutures.set(tryRegisterAllMasters())
          registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
            override def run(): Unit = {
              if (registered.get) {
                registerMasterFutures.get.foreach(_.cancel(true))
                registerMasterThreadPool.shutdownNow()
              } else if (nthRetry >= REGISTRATION_RETRIES) {
                markDead("All masters are unresponsive! Giving up.")
              } else {
                registerMasterFutures.get.foreach(_.cancel(true))
                registerWithMaster(nthRetry + 1)
              }
            }
          }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
        }
        private def tryRegisterAllMasters(): Array[JFuture[_]] = {
          for (masterAddress <- masterRpcAddresses) yield {
            registerMasterThreadPool.submit(new Runnable {
              override def run(): Unit = try {
                if (registered.get) {
                  return
                }
                logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
                val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                // Send a RegisterApplication message to the Master
                masterRef.send(RegisterApplication(appDescription, self))
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
              }
            })
          }
        }
      }
      def start(): Unit = {
        // Just launch an rpcEndpoint; it will call back into the listener.
        // Registering the "AppClient" ClientEndpoint causes its onStart() method to be invoked.
        endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
      }
    }
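
The registration logic above boils down to a schedule-and-retry pattern: try all masters, wait 20 seconds, retry up to 3 times. A self-contained sketch of that pattern, with the actual RPC call replaced by a placeholder:

    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
    import java.util.concurrent.atomic.AtomicBoolean

    object RegisterRetrySketch {
      private val REGISTRATION_TIMEOUT_SECONDS = 20
      private val REGISTRATION_RETRIES = 3

      // In the real client this flips to true when RegisteredApplication arrives.
      private val registered = new AtomicBoolean(false)
      private val retryThread: ScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor()

      /** Placeholder for sending RegisterApplication to every known master. */
      private def tryRegisterAllMasters(): Unit =
        println("sending RegisterApplication to all masters...")

      def registerWithMaster(nthRetry: Int): Unit = {
        tryRegisterAllMasters()
        retryThread.schedule(new Runnable {
          override def run(): Unit = {
            if (registered.get) {
              retryThread.shutdownNow() // success: stop retrying
            } else if (nthRetry >= REGISTRATION_RETRIES) {
              println("All masters are unresponsive! Giving up.")
              retryThread.shutdownNow()
            } else {
              registerWithMaster(nthRetry + 1) // schedule the next attempt
            }
          }
        }, REGISTRATION_TIMEOUT_SECONDS.toLong, TimeUnit.SECONDS)
      }

      def main(args: Array[String]): Unit = registerWithMaster(1)
    }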

8. Master

    private[deploy] class Master(...) {
      // Handle messages from other endpoints
      override def receive: PartialFunction[Any, Unit] = {
        case RegisterApplication(description, driver) =>
          // TODO Prevent repeated registrations from some driver
          if (state == RecoveryState.STANDBY) {
            // ignore, don't send response
          } else {
            logInfo("Registering app " + description.name)
            // Create an app
            val app = createApplication(description, driver)
            // Record the app on the Master, e.g. add it to waitingApps, to prepare for later scheduling
            registerApplication(app)
            logInfo("Registered app " + description.name + " with ID " + app.id)
            // Persist the app
            persistenceEngine.addApplication(app)
            // Send the driver a response saying the app has been registered
            driver.send(RegisteredApplication(app.id, self))
            // Schedule: start allocating executors on the Workers
            schedule()
          }
      }
      // Create an app
      private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
          ApplicationInfo = {
        val now = System.currentTimeMillis()
        val date = new Date(now)
        val appId = newApplicationId(date)
        // It runs its own init()
        new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
      }
      private def schedule(): Unit = {
        if (state != RecoveryState.ALIVE) {
          return
        }
        val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
        val numWorkersAlive = shuffledAliveWorkers.size
        var curPos = 0
        // Launch all waiting drivers; in our case the driver is already running, so this step is skipped
        for (driver <- waitingDrivers.toList) {
          ...
        }
        // Launch executors on the Workers
        startExecutorsOnWorkers()
      }
      // Schedule and launch executors on the workers
      private def startExecutorsOnWorkers(): Unit = {
        // Right now this is a very simple FIFO scheduler. We schedule the waiting apps in order.
        for (app <- waitingApps) {
          val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
          // If the cores left are less than coresPerExecutor, the remaining cores will not be allocated.
          // Put simply, the app's remaining cores must be at least what one executor needs;
          // executors are allocated in units of coresPerExecutor, which is at least 1.
          if (app.coresLeft >= coresPerExecutor) {
            // Filter out workers without enough resources, sort them by free cores in descending order, and launch executors on them in turn
            val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
              .filter(canLaunchExecutor(_, app.desc))
              .sortBy(_.coresFree).reverse
            val appMayHang = waitingApps.length == 1 &&
              waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
            if (appMayHang) {
              logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
            }
            // Actually compute the resource assignment; returns an array with the cores assigned to each worker
            val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
            // Now that we've decided how many cores to allocate on each worker, let's allocate them
            for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
              allocateWorkerResourceToExecutors(
                app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
            }
          }
        }
      }
      // Schedule executors to be launched on the workers. Returns an array containing the number of cores assigned to each worker.
      // There are two modes of launching executors.
      // The first attempts to spread out an application's executors onto as many workers as possible; it is the default and is better for data locality.
      // The second does the opposite (i.e. launches them on as few workers as possible).
      // The number of cores per executor is configurable. When it is explicitly set, multiple executors from the same application
      // may be launched on the same worker if that worker has enough cores and memory.
      // Otherwise, by default, each executor grabs all the cores available on the worker,
      // in which case only one executor per application may be launched on each worker during a single schedule iteration.
      // Note that when `spark.executor.cores` is not set,
      // we may still end up with multiple executors from the same application on the same worker.
      // Suppose appA and appB both have an executor running on worker1, and appA.coresLeft > 0; then appB finishes and releases all its cores on worker1,
      // so in the next schedule iteration appA launches a new executor that grabs all the free cores on worker1, and we get multiple executors from appA on worker1.
      private def scheduleExecutorsOnWorkers(
          app: ApplicationInfo,
          usableWorkers: Array[WorkerInfo],
          spreadOutApps: Boolean): Array[Int] = {
        // Cores required by each executor
        val coresPerExecutor = app.desc.coresPerExecutor
        // Minimum cores required by each executor
        val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
        // One executor per worker
        val oneExecutorPerWorker = coresPerExecutor.isEmpty
        // Memory required by each executor
        val memoryPerExecutor = app.desc.memoryPerExecutorMB
        // Custom resources required by each executor
        val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
        // Number of usable workers
        val numUsable = usableWorkers.length
        val assignedCores = new Array[Int](numUsable) // cores to request on each worker
        val assignedExecutors = new Array[Int](numUsable) // number of executors to request on each worker
        var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        /** Return whether the specified worker can launch an executor for this app */
        def canLaunchExecutorForApp(pos: Int): Boolean = {
          val keepScheduling = coresToAssign >= minCoresPerExecutor
          val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
          val assignedExecutorNum = assignedExecutors(pos)
          // If we allow multiple executors per worker, we can always launch new executors.
          // Otherwise, if there is already an executor on this worker, just give it more cores.
          val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
          if (launchingNewExecutor) {
            val assignedMemory = assignedExecutorNum * memoryPerExecutor
            val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
            val assignedResources = resourceReqsPerExecutor.map {
              req => req.resourceName -> req.amount * assignedExecutorNum
            }.toMap
            val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
              case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
            }
            val enoughResources = ResourceUtils.resourcesMeetRequirements(
              resourcesFree, resourceReqsPerExecutor)
            val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
            keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
          } else {
            // We're adding cores to an existing executor, so no need to check memory and executor limits
            keepScheduling && enoughCores
          }
        }
        // Filter the workers that can launch an executor
        var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
        // Loop over the workers that can launch executors and assign executors to them
        while (freeWorkers.nonEmpty) {
          freeWorkers.foreach { pos =>
            var keepScheduling = true
            while (keepScheduling && canLaunchExecutorForApp(pos)) {
              coresToAssign -= minCoresPerExecutor
              assignedCores(pos) += minCoresPerExecutor
              if (oneExecutorPerWorker) {
                assignedExecutors(pos) = 1
              } else {
                assignedExecutors(pos) += 1
              }
              // Spreading out an app means spreading its executors across as many workers as possible. If we are not spreading out,
              // we should keep scheduling executors on this worker until we have used all of its resources.
              // Otherwise, move on to the next worker. keepScheduling defaults to true.
              if (spreadOutApps) {
                keepScheduling = false
              }
            }
          }
          freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
        }
        assignedCores
      }
      // Allocate a worker's resources to one or more executors.
      private def allocateWorkerResourceToExecutors(
          app: ApplicationInfo,
          assignedCores: Int,
          coresPerExecutor: Option[Int],
          worker: WorkerInfo): Unit = {
        // If the number of cores per executor is specified, the cores assigned to this worker are divided evenly among the executors with no remainder.
        // Otherwise, we launch a single executor that grabs all the cores assigned on this worker.
        val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
        val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
        // Launch the executors on the worker one by one
        for (i <- 1 to numExecutors) {
          val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
          // This creates an ExecutorDesc describing the executor to launch, just like the earlier descriptions for the app and the driver
          val exec = app.addExecutor(worker, coresToAssign, allocated)
          // Launch the executor
          launchExecutor(worker, exec)
          app.state = ApplicationState.RUNNING
        }
      }
      private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
        // Launch the executor on that worker
        logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
        worker.addExecutor(exec)
        // The Master endpoint sends a LaunchExecutor message to the worker endpoint
        worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
          exec.application.desc, exec.cores, exec.memory, exec.resources))
        // The Master endpoint sends an ExecutorAdded message to the driver endpoint
        exec.application.driver.send(
          ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
      }
    }
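
To make the spread-out versus consolidate behaviour of scheduleExecutorsOnWorkers concrete, here is a stripped-down, self-contained sketch of the core allocation loop; it models cores only (memory, custom resources and executor limits are omitted) and WorkerFree is just an illustrative stand-in for WorkerInfo:

    // Simplified allocation loop: spread an app's remaining cores across workers
    // round-robin (spreadOut = true) or pack them onto as few workers as possible.
    object SpreadOutSketch {
      final case class WorkerFree(coresFree: Int)

      def scheduleCores(
          coresLeft: Int,
          coresPerExecutor: Int,
          workers: Array[WorkerFree],
          spreadOut: Boolean): Array[Int] = {
        val assigned = new Array[Int](workers.length)
        var toAssign = math.min(coresLeft, workers.map(_.coresFree).sum)

        def canLaunch(pos: Int): Boolean =
          toAssign >= coresPerExecutor &&
            workers(pos).coresFree - assigned(pos) >= coresPerExecutor

        var free = workers.indices.filter(canLaunch)
        while (free.nonEmpty) {
          free.foreach { pos =>
            var keepScheduling = true
            while (keepScheduling && canLaunch(pos)) {
              toAssign -= coresPerExecutor
              assigned(pos) += coresPerExecutor
              if (spreadOut) keepScheduling = false // move on to the next worker
            }
          }
          free = free.filter(canLaunch)
        }
        assigned
      }

      def main(args: Array[String]): Unit = {
        val workers = Array(WorkerFree(8), WorkerFree(8), WorkerFree(4))
        // 10 cores to place, 2 cores per executor:
        println(scheduleCores(10, 2, workers, spreadOut = true).mkString(","))  // 4,4,2
        println(scheduleCores(10, 2, workers, spreadOut = false).mkString(",")) // 8,2,0
      }
    }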

9. Worker

    private[deploy] class Worker(......)
      extends ThreadSafeRpcEndpoint with Logging {
      override def receive: PartialFunction[Any, Unit] = synchronized {
        case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
          if (masterUrl != activeMasterUrl) {
            logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
          } else if (decommissioned) {
            logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
          } else {
            try {
              logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
              // Create the executor's local working directory
              val executorDir = new File(workDir, appId + "/" + execId)
              if (!executorDir.mkdirs()) {
                throw new IOException("Failed to create directory " + executorDir)
              }
              // Create local dirs for the executor. These are passed to the executor via the SPARK_EXECUTOR_DIRS environment variable
              // and deleted by the Worker when the application finishes.
              val appLocalDirs = appDirectories.getOrElse(appId, {
                val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
                val dirs = localRootDirs.flatMap { dir =>
                  try {
                    val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                    Utils.chmod700(appDir)
                    Some(appDir.getAbsolutePath())
                  } catch {
                    ...
                  }
                }.toSeq
                dirs
              })
              appDirectories(appId) = appLocalDirs
              // Manages the execution of one executor process. This is currently only used in standalone mode.
              val manager = new ExecutorRunner(
                appId,
                execId,
                appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
                cores_,
                memory_,
                self,
                workerId,
                webUi.scheme,
                host,
                webUi.boundPort,
                publicAddress,
                sparkHome,
                executorDir,
                workerUri,
                conf,
                appLocalDirs,
                ExecutorState.LAUNCHING,
                resources_)
              executors(appId + "/" + execId) = manager
              manager.start()
              coresUsed += cores_
              memoryUsed += memory_
              addResourcesUsed(resources_)
            } catch {
              ......
            }
          }
      }
    }

10. ExecutorRunner

    private[deploy] class ExecutorRunner(...) {
      private[worker] def start(): Unit = {
        // Prepare a thread to launch the executor
        workerThread = new Thread("ExecutorRunner for " + fullId) {
          override def run(): Unit = { fetchAndRunExecutor() }
        }
        // The thread runs fetchAndRunExecutor(): download and run the executor described in the application description.
        // The launch class packaged in section 6 is org.apache.spark.executor.CoarseGrainedExecutorBackend.
        // Next, let's look at what happens inside the executor.
        workerThread.start()
        ......
      }
    }

11. CoarseGrainedExecutorBackend (the executor process's main class)

    private[spark] object CoarseGrainedExecutorBackend extends Logging {
      def main(args: Array[String]): Unit = {
        val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
          CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
          new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
            arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
            env, arguments.resourcesFileOpt, resourceProfile)
        }
        // Parse the arguments and run
        run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
        System.exit(0)
      }
      def run(
          arguments: Arguments,
          backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
            CoarseGrainedExecutorBackend): Unit = {
        //......
        // Bootstrap an RpcEnv to fetch the driver's Spark properties.
        val executorConf = new SparkConf
        val fetcher = RpcEnv.create(
          "driverPropsFetcher",
          arguments.bindAddress,
          arguments.hostname,
          -1,
          executorConf,
          new SecurityManager(executorConf),
          numUsableCores = 0,
          clientMode = true)
        // Try up to 3 times to obtain a reference to the driver endpoint
        var driver: RpcEndpointRef = null
        val nTries = 3
        for (i <- 0 until nTries if driver == null) {
          try {
            driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
          } catch {
            case e: Throwable => if (i == nTries - 1) {
              throw e
            }
          }
        }
        // Send a RetrieveSparkAppConfig message to the driver
        val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
        val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
        fetcher.shutdown()
        // Create the SparkEnv using the properties fetched from the driver
        val driverConf = new SparkConf()
        for ((key, value) <- props) {
          // this is required for SSL in standalone mode
          if (SparkConf.isExecutorStartupConf(key)) {
            driverConf.setIfMissing(key, value)
          } else {
            driverConf.set(key, value)
          }
        }
        cfg.hadoopDelegationCreds.foreach { tokens =>
          SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
        }
        driverConf.set(EXECUTOR_ID, arguments.executorId)
        // Create the SparkEnv for the executor
        val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
          arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
        // Set the application attempt id in BlockStoreClient if it is available
        val appAttemptId = env.conf.get(APP_ATTEMPT_ID)
        appAttemptId.foreach(attemptId =>
          env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
        )
        // Create the CoarseGrainedExecutorBackend
        val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
        // Register the CoarseGrainedExecutorBackend as the "Executor" endpoint; its onStart() method then runs
        env.rpcEnv.setupEndpoint("Executor", backend)
        arguments.workerUrl.foreach { url =>
          env.rpcEnv.setupEndpoint("WorkerWatcher",
            new WorkerWatcher(env.rpcEnv, url, isChildProcessStopping = backend.stopping))
        }
        env.rpcEnv.awaitTermination()
      }
    }

12. CoarseGrainedExecutorBackend (the RPC endpoint inside the executor)

    private[spark] class CoarseGrainedExecutorBackend(...)
      extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
      // Message used internally to start the executor once the driver has accepted the registration request.
      case object RegisteredExecutor
      override def onStart(): Unit = {
        //......
        logInfo("Connecting to driver: " + driverUrl)
        try {
          // A utility that converts the SparkConf inside a Spark JVM (e.g. an executor, a driver, or a standalone shuffle service)
          // into a TransportConf with details about our environment, such as the number of cores assigned to this JVM.
          val shuffleClientTransportConf = SparkTransportConf.fromSparkConf(env.conf, "shuffle")
          // If Netty prefers off-heap memory but the maximum off-heap memory the JVM can allocate is smaller than
          // spark.network.maxRemoteBlockSizeFetchToMem (200 MB by default), throw an exception. Related settings:
          // spark.network.sharedByteBufAllocators.enabled (default true): whether to share pooled ByteBuf allocators between different Netty channels.
          //   When enabled, only two pooled ByteBuf allocators are created: one that allows caching (for transport servers) and one that does not (for transport clients).
          //   When disabled, a new allocator is created for every transport server and client.
          // spark.io.preferDirectBufs (default true): the shared ByteBuf allocators prefer off-heap byte buffers.
          // spark.network.io.preferDirectBufs (default true): allocate off-heap byte buffers in Netty.
          if (NettyUtils.preferDirectBufs(shuffleClientTransportConf) &&
              PlatformDependent.maxDirectMemory() < env.conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)) {
            throw new SparkException(s"Netty direct memory should at least be bigger than " +
              s"'${MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key}', but got " +
              s"${PlatformDependent.maxDirectMemory()} bytes < " +
              s"${env.conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)}")
          }
          _resources = parseOrFindResources(resourcesFileOpt)
        } catch {
          case NonFatal(e) =>
            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
        }
        rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
          // This is a very fast action so we can use "ThreadUtils.sameThread"
          driver = Some(ref)
          // Send a RegisterExecutor message to the driver; see the driver-side handling in section 5
          ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
            extractAttributes, _resources, resourceProfile.id))
        }(ThreadUtils.sameThread).onComplete {
          case Success(_) =>
            self.send(RegisteredExecutor)
          case Failure(e) =>
            exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
        }(ThreadUtils.sameThread)
      }
      override def receive: PartialFunction[Any, Unit] = {
        // The driver accepted the executor registration and replied true, so this branch runs
        case RegisteredExecutor =>
          logInfo("Successfully registered with driver")
          try {
            // Create an Executor
            executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
              resources = _resources)
            // Then send a LaunchedExecutor message to the driver; see the driver-side handling in section 5
            driver.get.send(LaunchedExecutor(executorId))
          } catch {
            case NonFatal(e) =>
              exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
          }
      }
    }

13. Executor

    // Spark executor, backed by a thread pool to run tasks.
    // This can be used with Mesos, YARN, Kubernetes and the standalone scheduler. An internal RPC interface is used to communicate with the driver,
    // except in the case of Mesos fine-grained mode.
    private[spark] class Executor(...) {
      // Maintains the list of running tasks
      private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
      // When the executor fails to send heartbeats to the driver more than HEARTBEAT_MAX_FAILURES times, it should kill itself. The default is 60.
      // For example, with 60 max failures and a 10-second heartbeat interval, it will keep trying to send heartbeats for up to 600 seconds (10 minutes).
      private val HEARTBEAT_MAX_FAILURES = conf.get(EXECUTOR_HEARTBEAT_MAX_FAILURES)
      // Interval between heartbeats, in milliseconds: spark.executor.heartbeatInterval, 10s by default
      private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
      // The heartbeat task
      private val heartbeater = new Heartbeater(
        () => Executor.this.reportHeartBeat(),
        "executor-heartbeater",
        HEARTBEAT_INTERVAL_MS)
      // Start the worker thread pool
      private val threadPool = {
        val threadFactory = new ThreadFactoryBuilder()
          .setDaemon(true)
          .setNameFormat("Executor task launch worker-%d")
          .setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
          .build()
        Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
      }
      // Launch a task
      def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
        val tr = new TaskRunner(context, taskDescription, plugins)
        runningTasks.put(taskDescription.taskId, tr)
        threadPool.execute(tr)
        if (decommissioned) {
          log.error(s"Launching a task while in decommissioned state.")
        }
      }
    }
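
As a simplified, self-contained sketch of the same structure, a cached pool of daemon worker threads plus a single scheduled heartbeat thread, using plain java.util.concurrent instead of Spark's Heartbeater and Guava's ThreadFactoryBuilder (the heartbeat body is a placeholder):

    import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger

    object ExecutorPoolsSketch {
      private val counter = new AtomicInteger(0)
      // Daemon threads named like the real pool, without the Guava builder.
      private val taskThreadFactory = new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, s"Executor task launch worker-${counter.getAndIncrement()}")
          t.setDaemon(true)
          t
        }
      }
      private val threadPool = Executors.newCachedThreadPool(taskThreadFactory)
      private val heartbeater = Executors.newSingleThreadScheduledExecutor()

      def start(): Unit = {
        // 10-second interval, matching the spark.executor.heartbeatInterval default.
        heartbeater.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = println("reporting heartbeat to the driver...") // placeholder
        }, 10L, 10L, TimeUnit.SECONDS)
      }

      def launchTask(task: Runnable): Unit = threadPool.execute(task)
    }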

14. Summary

1. The application code builds a SparkContext from a SparkConf.

2. The task scheduler is created and started.

3. start() runs on StandaloneSchedulerBackend and CoarseGrainedSchedulerBackend.

4. The DriverEndpoint is created and waits for messages from other endpoints (such as the Master and Executor endpoints).

5. The executor launch arguments are built, with CoarseGrainedExecutorBackend as the main class.

6. A StandaloneAppClient is created and started.

7. The driver creates a ClientEndpoint and registers with the Master.

8. An application description is created and a RegisterApplication message is sent to the Master.

9. The Master schedules resources based on the application description, deciding how many executors to launch on which Workers.

10. For each executor, the Master sends a LaunchExecutor message to the chosen Worker and an ExecutorAdded message to the driver.

11. The Worker creates a thread that launches the packaged executor process (main class CoarseGrainedExecutorBackend).

12. The executor process creates the Executor endpoint and registers with the driver; if registration succeeds it sends itself a RegisteredExecutor message.

13. The executor handles the RegisteredExecutor message it sent itself, creating an Executor object and sending a LaunchedExecutor message to the driver.

14. The Executor object runs tasks on a thread pool and, by default, sends a heartbeat to the driver every 10 seconds.

To make this easier to understand and remember, there is also a flow chart; download it and zoom in for a clearer view.


This article is reprinted from: https://blog.csdn.net/lu070828/article/details/141355638
Copyright belongs to the original author 隔着天花板看星星. If there is any infringement, please contact us and we will remove it.
