listener.jobFailed(e)
return
}
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
// Create the job from the final stage (finalStage)
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// Submit the final stage for execution
submitStage(finalStage)
}
Get or create the list of parent stages for the given RDD. New stages are created with the provided firstJobId.
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
When the final RDD has parent stages, the scheduler must walk backwards from the RDDs where shuffle operations occur and find all of the ShuffleMapStages; this is the key part of stage division, and it is implemented by the getShuffleDependencies method.
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// Stack of RDDs waiting to be visited; it holds RDDs reached through non-ShuffleDependency dependencies
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
// A dependency that is a ShuffleDependency marks the boundary of a ShuffleMapStage
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
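To make the stage split performed by getShuffleDependencies concrete, here is a minimal driver program (a hypothetical illustration, not taken from the Spark source above). The single reduceByKey introduces one ShuffleDependency, so the job is cut into a ShuffleMapStage followed by the final ResultStage.

import org.apache.spark.{SparkConf, SparkContext}

object StageSplitExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-split").setMaster("local[2]"))
    val counts = sc.textFile("input.txt")   // narrow dependency, no shuffle
      .flatMap(_.split("\\s+"))             // narrow dependency
      .map(word => (word, 1))               // narrow dependency
      .reduceByKey(_ + _)                   // ShuffleDependency => stage boundary
    counts.collect()                        // action => one job submitted to the DAGScheduler
    // getShuffleDependencies(finalRdd) returns the single ShuffleDependency created by
    // reduceByKey, so the job becomes ShuffleMapStage 0 -> ResultStage 1.
    sc.stop()
  }
}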
Once all stages have been divided, dependencies are established between them. Each stage records these dependencies in its parents: List[Stage] attribute, from which all ancestor stages of the current stage can be obtained, so the stages can be submitted for execution in the right order.
Submitting stages
In DAGScheduler's handleJobSubmitted method, the dependencies between all stages are established while finalStage is created; a job instance is then built from finalStage, and the stages of that job are submitted for execution in order. During execution the listener bus reports the status of the job and its stages. The code is implemented as follows:
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
// Get the missing parent stages of this stage by walking backwards along the RDD dependencies
// and looking for shuffle operations; the stage-level dependency links are not used here
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
// If there are no missing parent stages, submit this stage directly
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
// If parent stages are missing, add this stage to the waiting list and recursively call
// submitStage until the starting stages are reached, i.e. stages that have no parents
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
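To see the recursion concretely, consider a hypothetical chain of three stages, stage0 <- stage1 <- stage2, where stage2 is the final stage and nothing has run yet. The submission order then looks roughly like this (an illustrative trace, not output from the code above):

// Hypothetical trace for the chain stage0 <- stage1 <- stage2:
// submitStage(stage2)        // missing = [stage1] -> recurse, then waitingStages += stage2
//   submitStage(stage1)      // missing = [stage0] -> recurse, then waitingStages += stage1
//     submitStage(stage0)    // missing = []       -> submitMissingTasks(stage0)
// When stage0 completes, submitWaitingChildStages submits stage1, and later stage2.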
Once the entry stages have finished running, the subsequent stages are submitted one after another. Before a stage is scheduled, the scheduler first checks whether the results of the parent stages it depends on are available. This check is driven by the completion of ShuffleMapTasks. The code is implemented as follows:
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
…
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
shuffleStage.pendingPartitions -= task.partitionId
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we’re aware of for the executor), so mark the task’s output as
// available.
mapOutputTracker.registerMapOutput(
shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
}
// If the current stage is in the running list and has no pending partitions (all tasks have
// completed), mark the stage as finished; its output locations were registered above
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)
// This call to increment the epoch may not be strictly necessary, but it is retained
// for now in order to minimize the changes in behavior from an earlier version of the
// code. This existing behavior of always incrementing the epoch following any
// successful shuffle map stage completion may have benefits by causing unneeded
// cached map outputs to be cleaned up earlier on executors. In the future we can
// consider removing this call, but this will require some extra investigation.
// See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
mapOutputTracker.incrementEpoch()
clearCacheLocs()
// If some tasks failed, resubmit the stage
if (!shuffleStage.isAvailable) {
// Some tasks had failed; let’s resubmit this shuffleStage.
// TODO: Lower-level scheduler should also deal with this
logInfo(“Resubmitting " + shuffleStage + " (” + shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.findMissingPartitions().mkString(", "))
submitStage(shuffleStage)
} else {
markMapStageJobsAsFinished(shuffleStage)
submitWaitingChildStages(shuffleStage)
}
}
}
…
}
Submitting tasks
Once a stage has been submitted for execution, the DAGScheduler's submitMissingTasks method splits it into as many tasks as the stage has partitions, and these tasks form a TaskSet that is submitted to the TaskScheduler. A ResultStage produces ResultTasks and a ShuffleMapStage produces ShuffleMapTasks. Each TaskSet contains all the tasks of the corresponding stage; the tasks share exactly the same processing logic and differ only in the data partition they process.
private def submitMissingTasks(stage: Stage, jobId: Int) {
…
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
// A ShuffleMapStage produces ShuffleMapTasks
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
// A ResultStage produces ResultTasks
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {
…
}
if (tasks.size > 0) {
// Submit these tasks to the taskScheduler as a TaskSet
logInfo(s"Submitting ${tasks.size} missing tasks from
s
t
a
g
e
(
stage (
stage({stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
// If the stage contains no tasks to run, it is already complete and is marked as finished
markStageAsFinished(stage, None)
stage match {
case stage: ShuffleMapStage =>
logDebug(s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})")
markMapStageJobsAsFinished(stage)
case stage : ResultStage =>
logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
}
submitWaitingChildStages(stage)
}
}
When the TaskScheduler receives the submitted task set, its submitTasks method builds a TaskSetManager instance to manage the life cycle of that task set. The TaskSetManager is then placed into the system scheduling pool and scheduled according to the configured scheduling algorithm. The code is implemented as follows:
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// Create a TaskSetManager to manage the life cycle of this task set
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets.foreach { case (_, ts) =>
ts.isZombie = true
}
stageTaskSets(taskSet.stageAttemptId) = manager
// Add the TaskSetManager to the system scheduling pool, which schedules it centrally;
// this scheduler is application-level and supports both FIFO and FAIR (fair scheduling) modes
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
// Call reviveOffers on the scheduler backend (e.g. SparkDeploySchedulerBackend) to allocate resources and run the tasks
backend.reviveOffers()
}
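As the comment above notes, the pool-level policy is application-wide and supports FIFO (the default) and FAIR. A minimal sketch of enabling FAIR scheduling through SparkConf (the allocation file path is a placeholder; the file itself is optional):

import org.apache.spark.SparkConf

// Sketch: choose how the root pool orders TaskSetManagers.
val conf = new SparkConf()
  .setAppName("fair-scheduling-example")
  .set("spark.scheduler.mode", "FAIR")                                   // FIFO (default) or FAIR
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // optional pool definitions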
The submitTasks method above ends by calling reviveOffers. This eventually invokes makeOffers, which first collects the available executors in the cluster, then passes them to TaskSchedulerImpl to assign resources to the tasks of the task sets, and finally hands the assigned tasks to the launchTasks method.
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = withLock {
// Filter out executors under killing
// Get the list of alive executors in the cluster
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort))
}.toIndexedSeq
// Assign resources to the tasks of the task sets; the resulting task descriptions are then launched
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
}
}
The resourceOffers method in the code above performs the crucial resource-allocation step. During allocation the TaskSetManagers are sorted according to the scheduling policy, and resources are then offered to each TaskSetManager in turn following data-locality preferences. The code is implemented as follows:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
// Process the incoming list of available executors and record their information; if a new executor has joined, flag it
var newExecAvail = false
for (o <- offers) {
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
offers.filter { offer =>
!blacklistTracker.isNodeBlacklisted(offer.host) &&
!blacklistTracker.isExecutorBlacklisted(offer.executorId)
}
}.getOrElse(offers)
// Shuffle the offers so that tasks are spread across executors rather than concentrated on a few workers
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
// Holds the tasks that have been assigned resources
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
// Get the TaskSetManagers sorted according to the scheduling policy
val sortedTaskSets = rootPool.getSortedTaskSetQueue
// If a new executor has joined, the data-locality levels need to be recomputed
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
// Skip the barrier taskSet if the available slots are less than the number of pending tasks.
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
// Skip the launch process.
// TODO SPARK-24819 If the job requires more slots than available (both busy and free
// slots), fail the job on submit.
logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
s"number of available slots is $availableSlots.")
} else {
// Offer resources to the sorted TaskSetManagers, preferring the closest data locality,
// in the order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedAnyTask = false
// Record all the executor IDs assigned barrier tasks on.
val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
for (currentMaxLocality <- taskSet.myLocalityLevels) {
var launchedTaskAtCurrentMaxLocality = false
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
case Some ((executorId, _)) =>
if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
logInfo(s"Waiting for $timeout ms for completely "
- s"blacklisted task to be schedulable again before aborting $taskSet.")
abortTimer.schedule(
createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
}
case None => // Abort Immediately
logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
s" executors can be found to kill. Aborting $taskSet.")
taskSet.abortSinceCompletelyBlacklisted(taskIndex)
}
}
} else {
if (unschedulableTaskSetToExpiryTime.nonEmpty) {
logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
"recently scheduled.")
unschedulableTaskSetToExpiryTime.clear()
}
}
if (launchedAnyTask && taskSet.isBarrier) {
require(addressesWithDescs.size == taskSet.numTasks,
s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
s"because only ${addressesWithDescs.size} out of a total number of " +
s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
"been blacklisted or cannot fulfill task locality requirements.")
// materialize the barrier coordinator.
maybeInitBarrierCoordinator()
// Update the taskInfos into all the barrier task properties.
val addressesStr = addressesWithDescs
// Addresses ordered by partitionId
.sortBy(_._2.partitionId)
.map(_._1)
.mkString(",")
addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))
logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
s"stage ${taskSet.stageId}.")
}
}
}
// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don’t get
// launched within a configured time.
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
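As the NOTE in the code says, offers are tried from PROCESS_LOCAL down to ANY. How long a TaskSetManager waits at each locality level before falling back to the next one is governed by the spark.locality.wait settings; a small configuration sketch with illustrative values:

import org.apache.spark.SparkConf

// Sketch: locality fallback waits used when a task cannot be placed at its preferred level.
// The values below are illustrative, not recommendations.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // default wait applied at each level
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL before falling back to NODE_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL before falling back to RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL before falling back to ANY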
The tasks that have been assigned resources are handed to the launchTasks method of CoarseGrainedSchedulerBackend, which sends each task to the CoarseGrainedExecutorBackend on its worker node, where the embedded Executor executes it. The code is implemented as follows:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
// Serialize each task
val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit() >= maxRpcMessageSize) {
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
// Send a LaunchTask message to the CoarseGrainedExecutorBackend on the worker node to execute the task
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
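As the error message in launchTasks suggests, a task set is aborted when a serialized TaskDescription exceeds spark.rpc.message.maxSize. If task closures are legitimately large, the limit (in MiB) can be raised, though moving large read-only data into broadcast variables is usually the better fix; a hedged configuration sketch:

import org.apache.spark.SparkConf

// Sketch: raise the RPC message limit (value is in MiB) for unusually large task descriptions.
val conf = new SparkConf().set("spark.rpc.message.maxSize", "256")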
Executing tasks
When CoarseGrainedExecutorBackend receives a LaunchTask message, it calls the Executor's launchTask method. launchTask wraps the task in a TaskRunner, which manages the details of the running task, and then submits the TaskRunner object to a thread pool for execution (a sketch of launchTask is shown below).
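For context, the launchTask method referenced above is only a few lines long; the following abridged sketch is based on the Spark 2.x source of Executor.launchTask, so field and method names may differ slightly across versions.

// Abridged sketch of Executor.launchTask (based on the Spark 2.x source):
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  // Wrap the task in a TaskRunner that manages its run-time state
  val tr = new TaskRunner(context, taskDescription)
  // Track it so it can be killed later if necessary
  runningTasks.put(taskDescription.taskId, tr)
  // Hand the runner to the executor's thread pool; TaskRunner.run does the real work
  threadPool.execute(tr)
}

The actual task execution is implemented in the first half of TaskRunner's run method, shown next: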
override def run(): Unit = {
threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
// Create a TaskMemoryManager instance to manage memory while the task runs
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
// Notify the driver endpoint that the task has started running
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStartTime: Long = 0
var taskStartCpu: Long = 0
startGCTime = computeTotalGcTime()
try {
// Must be set before updateDependencies() is called, in case fetching dependencies
// requires access to properties contained within (e.g. for access control).
Executor.taskDeserializationProps.set(taskDescription.properties)
// Fetch the files, JARs and code the task depends on, then deserialize the task itself
updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
task = ser.deserialize[Task[Any]](
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)
// If this task has been killed before we deserialized it, let’s quit now. Otherwise,
// continue executing the task.
val killReason = reasonIfKilled
// If the task was killed before it was deserialized, throw an exception and exit
if (killReason.isDefined) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw new TaskKilledException(killReason.get)
}
// The purpose of updating the epoch here is to invalidate executor map output status cache
// in case FetchFailures have occurred. In local mode `env.mapOutputTracker` will be
// MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
// we don’t need to make any special calls here.
if (!isLocal) {
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
}
// Run the actual task and measure its runtime.
// Call the task's runTask method. Task itself is an abstract class; the concrete runTask
// is implemented by its two subclasses, ShuffleMapTask and ResultTask
taskStartTime = System.currentTimeMillis()
taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
var threwException = true
val value = Utils.tryWithSafeFinally {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
}
…
}
For a ShuffleMapTask, the computation result is written to the BlockManager, and what is ultimately returned to the DAGScheduler is a MapStatus object. The code is implemented as follows:
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
// Deserialize the RDD and its shuffle dependency
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// First call rdd.iterator: if the RDD has already been cached or checkpointed, read the result
// directly; otherwise compute it. The output is written to the local BlockManager
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
// Stop the writer and return a MapStatus containing metadata such as the location and size of the output
writer.stop(success = true).get
} catch {
…
}
For ResultTask's runTask method, what is ultimately returned is the result of applying the user function func.
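For reference, ResultTask.runTask is much simpler than the ShuffleMapTask version above; the following abridged sketch is based on the Spark source, with the timing bookkeeping omitted.

// Abridged sketch of ResultTask.runTask (timing bookkeeping omitted):
override def runTask(context: TaskContext): U = {
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the RDD and the user function func from the broadcast task binary
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  // Apply func to this partition's iterator and return its result directly to the driver
  func(context, rdd.iterator(partition, context))
}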