首页 > 代码库 > spark 笔记 13: 再看DAGScheduler,stage状态更新流程
spark 笔记 13: 再看DAGScheduler,stage状态更新流程
当某个task完成后,某个shuffle Stage X可能已完成,那么就可能会一些仅依赖Stage X的Stage现在可以执行了,所以要有响应task完成的状态更新流程。
=======================DAG task完成后的更新流程===================
->CoarseGrainedSchedulerBackend::receiveWithLogging --调度器的事件接收器
->case StatusUpdate(executorId, taskId, state, data) --状态更新事件(来源于CoarseGrainedExecutorBackend)
->scheduler.statusUpdate(taskId, state, data.value) --状态更新
->taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) --将成功的时间封装到一个executor排队执行
->getTaskResultExecutor.execute(new Runnable {override def run(): Unit = Utils.logUncaughtExceptions {
->val result = serializer.get().deserialize[TaskResult[_]](serializedData) match { --反序列化结果
->scheduler.handleSuccessfulTask(taskSetManager, tid, result) --处理成功的task
->taskSetManager.handleSuccessfulTask(tid, taskResult)
-> sched.dagScheduler.taskEnded(tasks(index) ... result.metrics) --另起一段
->maybeFinishTaskSet() --判断是否taskSet结束了,更新状态。注意:在DAG里,调度的粒度是taskSet。
->sched.taskSetFinished(this) --如果taskSet结束了,更新DAG的这个调度单元
->activeTaskSets -= manager.taskSet.id --从active taskSet中删除tid
->manager.parent.removeSchedulable(manager)
->schedulableQueue.remove(schedulable) --从调度队列中删除tid
->schedulableNameToSchedulable.remove(schedulable.name) --删除调度单元。
->makeOffers(executorId) --将这个executorId分配给其他task使用
->DAGScheduler::taskEnded --任务结束事件处理流程
->eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
->def receive
->case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics)
->dagScheduler.handleTaskCompletion(completion) --Responds to a task finishing.
//This is called inside the event loop so it assumes that it can modify the scheduler‘s internal state
->event.reason match => case Success => --task结果是成功的
->if (event.accumUpdates != null) --如果是状态更新
->event.accumUpdates.foreach { case (id, partialValue) --更新状态
->listenerBus.post(SparkListenerTaskEnd(...)) --通知listener任务结束
->stage.pendingTasks -= task
->task match {
->case rt: ResultTask[_, _] => --如果是ResultTask
->if (job.numFinished == job.numPartitions) --如果所有的分片数据都完成
->markStageAsFinished(stage) --那么这个Stage就是结束了
->runningStages -= stage --从running状态中删除
->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知Stage结束
->cleanupStateForJobAndIndependentStages(job) --清除依赖关系
->val registeredStages = jobIdToStageIds.get(job.jobId) --找到这个job对应的所有Stage(job对应多个stage)
->stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach
//查找所有stage,找出注册了依赖于这个job所在stage的。
->case (stageId, stage) =>
->val jobSet = stage.jobIds
->if (!jobSet.contains(job.jobId)) --这些存在依赖的stage中,应该包含这个job的注册
->logError("Job %d not registered for stage %d even though that stage was registered for the job"
.format(job.jobId, stageId))
->if (jobSet.isEmpty) // no other job needs this stage 没有其他job了,这个依赖的stage也结束了。
-> removeStage(stageId) --删除stage
->listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) --通知job结束
->job.listener.taskSucceeded(rt.outputId, event.result) --通知task成功
->case smt: ShuffleMapTask => --如果是shuffleMapTask
->if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) --如果stage的所有task都完成
->markStageAsFinished(stage) --标志stage完成
->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知stage完成
->logInfo("looking for newly runnable stages") --stage完成了,意味着依赖这个stage的stage可以执行了
->mapOutputTracker.registerMapOutputs --(?用处不明)
->clearCacheLocs()
->if (stage.outputLocs.exists(_ == Nil)) // Some tasks had failed; let‘s resubmit this stage
->submitStage(stage)
->else
->val newlyRunnable = new ArrayBuffer[Stage]
-> for (stage <- waitingStages if getMissingParentStages(stage) == Nil) 如果一个stage没有依赖其他stage
->newlyRunnable += stage --这个没有依赖的stage就可以执行了
->waitingStages --= newlyRunnable
->runningStages ++= newlyRunnable
->for {stage <- newlyRunnable.sortBy(_.id); jobId <- activeJobForStage(stage)}
->submitMissingTasks(stage, jobId) --将这些没有依赖的stage的所有active job提交执行
->submitWaitingStages() --//Check for waiting or failed stages which are now eligible for resubmission.
//Ordinarily run on every iteration of the event loop. 每个事件处理都会触发去检查waiting状态的stage是否能够执行了。
->logTrace("Checking for newly runnable parent stages")
->waitingStages.clear()
->for (stage <- waitingStagesCopy.sortBy(_.jobId))
->submitStage(stage)
========================end================================
来自为知笔记(Wiz)
spark 笔记 13: 再看DAGScheduler,stage状态更新流程
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。