0


Spark源码阅读02-Spark核心原理之作业执行原理

listener.jobFailed(e)

return

}

case e: Exception =>

logWarning("Creating new stage failed due to exception - job: " + jobId, e)

listener.jobFailed(e)

return

}

// Job submitted, clear internal data.

barrierJobIdToNumTasksCheckFailures.remove(jobId)

//根据最后一个调度阶段finalStage生成作业

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

clearCacheLocs()

logInfo(“Got job %s (%s) with %d output partitions”.format(

job.jobId, callSite.shortForm, partitions.length))

logInfo(“Final stage: " + finalStage + " (” + finalStage.name + “)”)

logInfo("Parents of final stage: " + finalStage.parents)

logInfo("Missing parents: " + getMissingParentStages(finalStage))

val jobSubmissionTime = clock.getTimeMillis()

jobIdToActiveJob(jobId) = job

activeJobs += job

finalStage.setActiveJob(job)

val stageIds = jobIdToStageIds(jobId).toArray

val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

listenerBus.post(

SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

//提交执行

submitStage(finalStage)

}

获取或创建给定RDD的父阶段列表。将使用提供的firstJobId创建新阶段

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {

getShuffleDependencies(rdd).map { shuffleDep =>

getOrCreateShuffleMapStage(shuffleDep, firstJobId)

}.toList

}

当finalRDD存在父调度阶段,需要从发生Shuffle操作的RDD往前遍历,找出所有的ShuffleMapStage,这是调度阶段的关键部分。其由getShuffleDependencies方法实现。

private[scheduler] def getShuffleDependencies(

rdd: RDD[]): HashSet[ShuffleDependency[, _, _]] = {

val parents = new HashSet[ShuffleDependency[_, _, _]]

val visited = new HashSet[RDD[_]]

//存放等待访问的堆栈,存放的是非ShuffleDependency的RDD

val waitingForVisit = new ArrayStack[RDD[_]]

waitingForVisit.push(rdd)

while (waitingForVisit.nonEmpty) {

val toVisit = waitingForVisit.pop()

if (!visited(toVisit)) {

visited += toVisit

toVisit.dependencies.foreach {

//所依赖的RDD操作是ShuffleDependency的RDD,作为划分shuffleMap调度阶段界限

case shuffleDep: ShuffleDependency[_, _, _] =>

parents += shuffleDep

case dependency =>

waitingForVisit.push(dependency.rdd)

}

}

}

parents

}

当所有的调度阶段划分结束后,这些调度阶段建立起依赖关系。该依赖关系是通过调度阶段其中属性parents:List[Stage]来定义的,通过这些属性可以获取当前阶段所有祖先阶段,可以根据这些信息按照顺序提交调度阶段进行运行。

提交调度阶段


在DAGScheduler的handleJobSubmitted方法中,生成finalStage的同时建立起所有调度阶段的依赖关系,然后通过finalStage生成一个作业实例,在该作业实例中按照顺序提交调度阶段进行执行,在执行过程中通过监听总线获取作业、阶段执行情况。代码实现如下:

private def submitStage(stage: Stage) {

val jobId = activeJobForStage(stage)

if (jobId.isDefined) {

logDebug(“submitStage(” + stage + “)”)

if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {

//在该方法中,获取该调度阶段的父调度阶段,获取的方法是通过RDD的依赖关系向前遍历看

//是否存在Shuffle操作,这里并没有使用调度阶段的依赖关系获取

val missing = getMissingParentStages(stage).sortBy(_.id)

logDebug("missing: " + missing)

if (missing.isEmpty) {

//如果不存在父调度阶段,直接把该调度阶段提交执行

logInfo(“Submitting " + stage + " (” + stage.rdd + “), which has no missing parents”)

submitMissingTasks(stage, jobId.get)

} else {

//如果存在父调度阶段,把该阶段加入到等待运行调度阶段列表中,

//同时递归调用submitStage方法,直至找到开始的调度阶段,即该调度阶段没有父调度阶段

for (parent <- missing) {

submitStage(parent)

}

waitingStages += stage

}

}

} else {

abortStage(stage, "No active job for stage " + stage.id, None)

}

}

当入口的调度阶段运行完成后相继提交后续调度阶段,在调度前先判断该调度阶段所依赖的父调度阶段的结果是否可用。通过ShuffleMapTask实现上述判断。代码实现如下:

private[scheduler] def handleTaskCompletion(event: CompletionEvent) {

case smt: ShuffleMapTask =>

val shuffleStage = stage.asInstanceOf[ShuffleMapStage]

shuffleStage.pendingPartitions -= task.partitionId

val status = event.result.asInstanceOf[MapStatus]

val execId = status.location.executorId

logDebug("ShuffleMapTask finished on " + execId)

if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {

logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")

} else {

// The epoch of the task is acceptable (i.e., the task was launched after the most

// recent failure we’re aware of for the executor), so mark the task’s output as

// available.

mapOutputTracker.registerMapOutput(

shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)

}

//如果当前调度阶段在运行调度阶段列表中,并没有任务处于挂起状态(均已完成),则标记

//该调度阶段已经完成并注册输出结果的位置

if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {

markStageAsFinished(shuffleStage)

logInfo(“looking for newly runnable stages”)

logInfo("running: " + runningStages)

logInfo("waiting: " + waitingStages)

logInfo("failed: " + failedStages)

// This call to increment the epoch may not be strictly necessary, but it is retained

// for now in order to minimize the changes in behavior from an earlier version of the

// code. This existing behavior of always incrementing the epoch following any

// successful shuffle map stage completion may have benefits by causing unneeded

// cached map outputs to be cleaned up earlier on executors. In the future we can

// consider removing this call, but this will require some extra investigation.

// See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.

mapOutputTracker.incrementEpoch()

clearCacheLocs()

//如果某些任务执行失败了,则重新提交运行

if (!shuffleStage.isAvailable) {

// Some tasks had failed; let’s resubmit this shuffleStage.

// TODO: Lower-level scheduler should also deal with this

logInfo(“Resubmitting " + shuffleStage + " (” + shuffleStage.name +

") because some of its tasks had failed: " +

shuffleStage.findMissingPartitions().mkString(", "))

submitStage(shuffleStage)

} else {

markMapStageJobsAsFinished(shuffleStage)

submitWaitingChildStages(shuffleStage)

}

}

}

}

提交任务


当调度阶段提交运行后,在DAGScheduler的submitMissingTasks方法中,会根据调度阶段Partition个数拆分对应个数任务,这些任务组成一个任务集提交到TaskScheduler进行处理。对于ResultStage生成ResultTask,对于ShuffleMapStage生成ShuffleMapTask。对于每一个任务集包含了对应调度阶段的所有任务,这些任务处理逻辑完全一样,不同的是对应处理的数据。

private def submitMissingTasks(stage: Stage, jobId: Int) {

val tasks: Seq[Task[_]] = try {

val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()

stage match {

//对于ShuffleMapStage生成ShuffleMapTask任务

case stage: ShuffleMapStage =>

stage.pendingPartitions.clear()

partitionsToCompute.map { id =>

val locs = taskIdToLocations(id)

val part = partitions(id)

stage.pendingPartitions += id

new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,

taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),

Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())

}

//对于ResultStage生成ResultTask任务

case stage: ResultStage =>

partitionsToCompute.map { id =>

val p: Int = stage.partitions(id)

val part = partitions§

val locs = taskIdToLocations(id)

new ResultTask(stage.id, stage.latestInfo.attemptNumber,

taskBinary, part, locs, id, properties, serializedTaskMetrics,

Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,

stage.rdd.isBarrier())

}

}

} catch {

}

if (tasks.size > 0) {

//把这些任务以任务集的方式提交到taskScheduler

logInfo(s"Submitting ${tasks.size} missing tasks from

     s 
    
   
     t 
    
   
     a 
    
   
     g 
    
   
     e 
    
   
     ( 
    
   
  
    stage ( 
   
  
stage({stage.rdd}) (first 15 " +

s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")

taskScheduler.submitTasks(new TaskSet(

tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

} else {

// Because we posted SparkListenerStageSubmitted earlier, we should mark

// the stage as completed here in case there are no tasks to run

//如果调度阶段中不存在任务标记,则表明该调度阶段已经完成

markStageAsFinished(stage, None)

stage match {

case stage: ShuffleMapStage =>

logDebug(s"Stage ${stage} is actually done; " +

s"(available: ${stage.isAvailable}," +

s"available outputs: ${stage.numAvailableOutputs}," +

s"partitions: ${stage.numPartitions})")

markMapStageJobsAsFinished(stage)

case stage : ResultStage =>

logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")

}

submitWaitingChildStages(stage)

}

}

当TaskScheduler收到发送过来的任务集时,在sunmitTasks方法中构成一个TaskSetManager的实例,用于管理这个任务集的生命周期,而该TaskSetManager会放入系统的调度池中,根据系统的调度算法进行调度。代码实现如下:

override def submitTasks(taskSet: TaskSet) {

val tasks = taskSet.tasks

logInfo(“Adding task set " + taskSet.id + " with " + tasks.length + " tasks”)

this.synchronized {

//创建任务集的管理,用于管理这个任务集的声明周期

val manager = createTaskSetManager(taskSet, maxTaskFailures)

val stage = taskSet.stageId

val stageTaskSets =

taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])

stageTaskSets.foreach { case (_, ts) =>

ts.isZombie = true

}

stageTaskSets(taskSet.stageAttemptId) = manager

//将该任务集的管理器加入到系统调度池中,由系统统一调配,该调度器属于应用级别

//支持FIFO和FAIR(公平调度)两种

schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

if (!isLocal && !hasReceivedTask) {

starvationTimer.scheduleAtFixedRate(new TimerTask() {

override def run() {

if (!hasLaunchedTask) {

logWarning("Initial job has not accepted any resources; " +

"check your cluster UI to ensure that workers are registered " +

“and have sufficient resources”)

} else {

this.cancel()

}

}

}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)

}

hasReceivedTask = true

}

//调用调度器后台进程SparkDeploySchedulerBackend的reviveOffers方法分配资源并运行

backend.reviveOffers()

}

在上面的代码中最后会调用reviveOffers方法,该方法先会获取集群中可用的Executor,然后发送到TaskSchedulerImpl中进行对任务集的任务分配运行资源,最后提交到launchTasks方法中。

private def makeOffers() {

// Make sure no executor is killed while some task is launching on it

val taskDescs = withLock {

// Filter out executors under killing

//调用集群中可用的Executor列表

val activeExecutors = executorDataMap.filterKeys(executorIsAlive)

val workOffers = activeExecutors.map {

case (id, executorData) =>

new WorkerOffer(id, executorData.executorHost, executorData.freeCores,

Some(executorData.executorAddress.hostPort))

}.toIndexedSeq

//对任务集的任务分配运行资源,并把这些任务提交运行

scheduler.resourceOffers(workOffers)

}

if (!taskDescs.isEmpty) {

launchTasks(taskDescs)

}

}

在上述代码中的resourceOffers方法是非常重要的资源分配步骤。在分配的过程中会根据调度策略对TaskSetManager进行排序,然后依次对TaskSetManager按照就近原则分配资源。代码实现如下:

def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {

// Mark each slave as alive and remember its hostname

// Also track if new executor is added

//对传入的可用Executor列表进行处理,记录其信息,如果有新的Executor加入,则进行标记

var newExecAvail = false

for (o <- offers) {

if (!hostToExecutors.contains(o.host)) {

hostToExecutors(o.host) = new HashSetString

}

if (!executorIdToRunningTaskIds.contains(o.executorId)) {

hostToExecutors(o.host) += o.executorId

executorAdded(o.executorId, o.host)

executorIdToHost(o.executorId) = o.host

executorIdToRunningTaskIds(o.executorId) = HashSetLong

newExecAvail = true

}

for (rack <- getRackForHost(o.host)) {

hostsByRack.getOrElseUpdate(rack, new HashSetString) += o.host

}

}

// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do

// this here to avoid a separate thread and added synchronization overhead, and also because

// updating the blacklist is only relevant when task offers are being made.

blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>

offers.filter { offer =>

!blacklistTracker.isNodeBlacklisted(offer.host) &&

!blacklistTracker.isExecutorBlacklisted(offer.executorId)

}

}.getOrElse(offers)

//为任务随机分配Executor,避免任务集中分配到Worker上

val shuffledOffers = shuffleOffers(filteredOffers)

// Build a list of tasks to assign to each worker.

//用于存储分配好资源任务

val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))

val availableCpus = shuffledOffers.map(o => o.cores).toArray

val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum

//获取按照资源调度策略排序好的TaskSetManager

val sortedTaskSets = rootPool.getSortedTaskSetQueue

//如果有新加入的Executor,需要重新计算数据本地性

for (taskSet <- sortedTaskSets) {

logDebug(“parentName: %s, name: %s, runningTasks: %s”.format(

taskSet.parent.name, taskSet.name, taskSet.runningTasks))

if (newExecAvail) {

taskSet.executorAdded()

}

}

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order

// of locality levels so that it gets a chance to launch local tasks on all of them.

// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY

for (taskSet <- sortedTaskSets) {

// Skip the barrier taskSet if the available slots are less than the number of pending tasks.

if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {

// Skip the launch process.

// TODO SPARK-24819 If the job requires more slots than available (both busy and free

// slots), fail the job on submit.

logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +

s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +

s"number of available slots is $availableSlots.")

} else {

//为分配好的TaskSetManager列表进行分配资源,分配的原则就是就近原则

//按照顺序PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY

var launchedAnyTask = false

// Record all the executor IDs assigned barrier tasks on.

val addressesWithDescs = ArrayBuffer(String, TaskDescription)

for (currentMaxLocality <- taskSet.myLocalityLevels) {

var launchedTaskAtCurrentMaxLocality = false

do {

launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,

currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)

launchedAnyTask |= launchedTaskAtCurrentMaxLocality

} while (launchedTaskAtCurrentMaxLocality)

}

if (!launchedAnyTask) {

taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>

executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {

case Some ((executorId, _)) =>

if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {

blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))

val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000

unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout

logInfo(s"Waiting for $timeout ms for completely "

  • s"blacklisted task to be schedulable again before aborting $taskSet.")

abortTimer.schedule(

createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)

}

case None => // Abort Immediately

logInfo(“Cannot schedule any task because of complete blacklisting. No idle” +

s" executors can be found to kill. Aborting $taskSet." )

taskSet.abortSinceCompletelyBlacklisted(taskIndex)

}

}

} else {

if (unschedulableTaskSetToExpiryTime.nonEmpty) {

logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +

“recently scheduled.”)

unschedulableTaskSetToExpiryTime.clear()

}

}

if (launchedAnyTask && taskSet.isBarrier) {

require(addressesWithDescs.size == taskSet.numTasks,

s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +

s"because only ${addressesWithDescs.size} out of a total number of " +

s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +

“been blacklisted or cannot fulfill task locality requirements.”)

// materialize the barrier coordinator.

maybeInitBarrierCoordinator()

// Update the taskInfos into all the barrier task properties.

val addressesStr = addressesWithDescs

// Addresses ordered by partitionId

.sortBy(_._2.partitionId)

.map(_._1)

.mkString(“,”)

addressesWithDescs.foreach(_._2.properties.setProperty(“addresses”, addressesStr))

logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +

s"stage ${taskSet.stageId}.")

}

}

}

// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don’t get

// launched within a configured time.

if (tasks.size > 0) {

hasLaunchedTask = true

}

return tasks

}

分配好资源的任务提交到CoarseGrainedSchedulerBackend的launchTasks方法中去,在该方法中会把任务一个个发送到worker阶段上的CoarseGrainedExecutorBackend,然后通过内部的Executor来执行任务,代码实现如下:

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {

for (task <- tasks.flatten) {

//序列化每一个task

val serializedTask = TaskDescription.encode(task)

if (serializedTask.limit() >= maxRpcMessageSize) {

Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>

try {

var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +

"spark.rpc.message.maxSize (%d bytes). Consider increasing " +

“spark.rpc.message.maxSize or using broadcast variables for large values.”

msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)

taskSetMgr.abort(msg)

} catch {

case e: Exception => logError(“Exception in error callback”, e)

}

}

}

else {

val executorData = executorDataMap(task.executorId)

executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +

s"${executorData.executorHost}.")

//向worker节点的CoarseGrainedExecutorBackend发送消息执行Task

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

}

}

}

执行任务


当CoarseGrainedExecutorBackend接收到LaunchTask消息时,会调用Executor的launchTask方法进行处理。在Executor的launchTask方法中,初始化一个TaskRunner来封装任务,它用于管理任务运行时的细节,再把TaskRunner对象放入到ThreadPool(线程池)中执行。具体的任务执行在TaskRunner的run方法的前半部分实现,代码如下:

override def run(): Unit = {

threadId = Thread.currentThread.getId

Thread.currentThread.setName(threadName)

val threadMXBean = ManagementFactory.getThreadMXBean

//生成内存管理taskMemoryManager实例,用于运行期间内存管理

val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)

val deserializeStartTime = System.currentTimeMillis()

val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {

threadMXBean.getCurrentThreadCpuTime

} else 0L

Thread.currentThread.setContextClassLoader(replClassLoader)

val ser = env.closureSerializer.newInstance()

logInfo(s"Running $taskName (TID $taskId)")

//向Driver终端点发送任务运行开始消息

execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)

var taskStartTime: Long = 0

var taskStartCpu: Long = 0

startGCTime = computeTotalGcTime()

try {

// Must be set before updateDependencies() is called, in case fetching dependencies

// requires access to properties contained within (e.g. for access control).

Executor.taskDeserializationProps.set(taskDescription.properties)

//对任务运行所需要的文件、Jar包、代码等反序列化

updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)

task = ser.deserialize[Task[Any]](

taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)

task.localProperties = taskDescription.properties

task.setTaskMemoryManager(taskMemoryManager)

// If this task has been killed before we deserialized it, let’s quit now. Otherwise,

// continue executing the task.

val killReason = reasonIfKilled

//任务在反序列化之前被杀死,则抛出异常并退出

if (killReason.isDefined) {

// Throw an exception rather than returning, because returning within a try{} block

// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl

// exception will be caught by the catch block, leading to an incorrect ExceptionFailure

// for the task.

throw new TaskKilledException(killReason.get)

}

// The purpose of updating the epoch here is to invalidate executor map output status cache

// in case FetchFailures have occurred. In local mode

env.mapOutputTracker

will be

// MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so

// we don’t need to make any special calls here.

if (!isLocal) {

logDebug("Task " + taskId + "'s epoch is " + task.epoch)

env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)

}

// Run the actual task and measure its runtime.

//调用Task的runTask方法,由于Task本身是一个抽象类,具体的runTask方法由他的

//两个子类ShuffeleMapTask和ResultTask

taskStartTime = System.currentTimeMillis()

taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {

threadMXBean.getCurrentThreadCpuTime

} else 0L

var threwException = true

val value = Utils.tryWithSafeFinally {

val res = task.run(

taskAttemptId = taskId,

attemptNumber = taskDescription.attemptNumber,

metricsSystem = env.metricsSystem)

threwException = false

res

}

}

对于ShuffleTask来说,它的计算结果会写到BlockManager之中,最终返回给DAGScheduler的是一个MapStatus对象。代码实现如下:

override def runTask(context: TaskContext): MapStatus = {

// Deserialize the RDD using the broadcast variable.

val threadMXBean = ManagementFactory.getThreadMXBean

val deserializeStartTime = System.currentTimeMillis()

val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {

threadMXBean.getCurrentThreadCpuTime

} else 0L

//反序列化获取RDD和RDD的依赖

val ser = SparkEnv.get.closureSerializer.newInstance()

val (rdd, dep) = ser.deserialize[(RDD[], ShuffleDependency[, _, _])](

ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {

threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime

} else 0L

var writer: ShuffleWriter[Any, Any] = null

try {

val manager = SparkEnv.get.shuffleManager

writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)

//首先调用rdd.iterator,如果该RDD已经Cache或者Checkpoint,那么直接读取结果

//否则计算,计算结果会保存在本地系统的BlockManager中

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

//关闭writer,返回计算结果,返回包含了数据的location和size等元数据信息的MapStatus信息

writer.stop(success = true).get

} catch {

}

对于Result的runTask方法而言,它最终返回的是func函数的计算结果。

技术学习总结

学习技术一定要制定一个明确的学习路线,这样才能高效的学习,不必要做无效功,既浪费时间又得不到什么效率,大家不妨按照我这份路线来学习。

最后面试分享

大家不妨直接在牛客和力扣上多刷题,同时,我也拿了一些面试题跟大家分享,也是从一些大佬那里获得的,大家不妨多刷刷题,为金九银十冲一波!

ap(taskBinary.value), Thread.currentThread.getContextClassLoader)

_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {

threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime

} else 0L

var writer: ShuffleWriter[Any, Any] = null

try {

val manager = SparkEnv.get.shuffleManager

writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)

//首先调用rdd.iterator,如果该RDD已经Cache或者Checkpoint,那么直接读取结果

//否则计算,计算结果会保存在本地系统的BlockManager中

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

//关闭writer,返回计算结果,返回包含了数据的location和size等元数据信息的MapStatus信息

writer.stop(success = true).get

} catch {

}

对于Result的runTask方法而言,它最终返回的是func函数的计算结果。

技术学习总结

学习技术一定要制定一个明确的学习路线,这样才能高效的学习,不必要做无效功,既浪费时间又得不到什么效率,大家不妨按照我这份路线来学习。

[外链图片转存中…(img-w4co6GMn-1721153578256)]

[外链图片转存中…(img-QXGBv11o-1721153578257)]

[外链图片转存中…(img-b5a5FU70-1721153578257)]

最后面试分享

大家不妨直接在牛客和力扣上多刷题,同时,我也拿了一些面试题跟大家分享,也是从一些大佬那里获得的,大家不妨多刷刷题,为金九银十冲一波!

[外链图片转存中…(img-APzjKvxF-1721153578258)]

[外链图片转存中…(img-KEZ2h16y-1721153578258)]

标签: spark java javascript

本文转载自: https://blog.csdn.net/2401_85377244/article/details/140481347
版权归原作者 2401_85377244 所有, 如有侵权,请联系我们删除。

“Spark源码阅读02-Spark核心原理之作业执行原理”的评论:

还没有评论