Analysis of the Task Execution Process

Tasks are executed by the Executor instance that is created on the Worker at startup, once the backend has registered with the driver:

case RegisteredExecutor(sparkProperties) =>
  logInfo("Successfully registered with driver")
  // Make this host instead of hostPort ?
  executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)


A task is started through the executor instance's launchTask method.

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}


launchTask creates a TaskRunner, passing in the task and the ExecutorBackend started by the current Worker (on YARN this is CoarseGrainedExecutorBackend), and then runs the TaskRunner on the threadPool thread pool.
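The launch pattern described above (register the runner in a map, then hand it to a thread pool) can be sketched in isolation. This is only an illustration: LaunchTaskSketch, DemoRunner, finished and shutdownAndWait are hypothetical names, and the runner merely records which thread ran it instead of deserializing and executing a real task.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object LaunchTaskSketch {
  // Tasks currently running, keyed by task ID (mirrors the role of runningTasks).
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  // Hypothetical record of completed tasks: task ID -> name of the thread that ran it.
  val finished = new ConcurrentHashMap[Long, String]()
  private val threadPool = Executors.newFixedThreadPool(2)

  // Hypothetical stand-in for TaskRunner: it does no real work.
  final class DemoRunner(taskId: Long) extends Runnable {
    override def run(): Unit = {
      try {
        finished.put(taskId, Thread.currentThread.getName)
      } finally {
        // As in TaskRunner.run, the task leaves runningTasks when it ends.
        runningTasks.remove(taskId)
      }
    }
  }

  def launchTask(taskId: Long): Unit = {
    val tr = new DemoRunner(taskId)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  // Stops accepting tasks and waits for the submitted ones to finish.
  def shutdownAndWait(): Boolean = {
    threadPool.shutdown()
    threadPool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Registering the runner before submitting it means the cleanup in the finally block always runs after the put, so the map cannot be left with a stale entry.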


The TaskRunner.run function:

This is the body of the thread that executes the task.

override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
  val startTime = System.currentTimeMillis()
  // SparkEnv is analyzed later.
  SparkEnv.set(env)
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = SparkEnv.get.closureSerializer.newInstance()
  logInfo("Running task ID " + taskId)
  // Update the task state through execBackend: set it to RUNNING and send a
  // StatusUpdate event to the master (driver).
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var attemptedTask: Option[Task[Any]] = None
  var taskStart: Long = 0
  def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
  val startGCTime = gcTime


  try {
    SparkEnv.set(env)
    Accumulators.clear()
    // Parse the task's resources: the jars to execute, other files, and the task definition.
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    // Update the resources and add the task's jars to the current thread's class loader.
    updateDependencies(taskFiles, taskJars)
    // Deserialize the task definition with the serializer configured in SparkEnv to get the Task instance.
    // The concrete Task implementation is either ShuffleMapTask or ResultTask.
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

    // If killed is true, the task is not executed and control passes to the catch block.
    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw TaskKilledException
    }

    attemptedTask = Some(task)
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)
    // Create a TaskContext instance, execute the task through Task.runTask, and wait for it to finish.
    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    val value = task.run(taskId.toInt)
    val taskFinish = System.currentTimeMillis()


    // The task has now finished. If it ended because it was killed, go to the catch block.
    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw TaskKilledException
    }
    // Serialize the value returned by the task.
    val resultSer = SparkEnv.get.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()
    // Fill in the monitoring metrics.
    for (m <- task.metrics) {
      m.hostname = Utils.localHostName()
      m.executorDeserializeTime = (taskStart - startTime).toInt
      m.executorRunTime = (taskFinish - taskStart).toInt
      m.jvmGCTime = gcTime - startGCTime
      m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
    }

    val accumUpdates = Accumulators.values
    // Wrap the task's return value in a DirectTaskResult instance and serialize it.
    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
    val serializedDirectResult = ser.serialize(directResult)
    logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)

    // Check whether the task result exceeds the Akka message size limit.
    // If it does, the result is stored in the BlockManager with storage level MEMORY_AND_DISK_SER
    // and only an IndirectTaskResult holding the block ID is sent. Otherwise the result fits in an
    // actor message and the DirectTaskResult is sent as is. Either way, the serialized result
    // travels to the scheduler inside the StatusUpdate event sent below.
    val serializedResult = {
      if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
        logInfo("Storing result for " + taskId + " in local BlockManager")
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        ser.serialize(new IndirectTaskResult[Any](blockId))
      } else {
        logInfo("Sending result for " + taskId + " directly to driver")
        serializedDirectResult
      }
    }
    // Update the task state through execBackend: set it to FINISHED and send a
    // StatusUpdate event to the master (driver).
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    logInfo("Finished task ID " + taskId)

  } catch {
    // An exception occurred: send a FAILED (or KILLED) event.
    case ffe: FetchFailedException => {
      val reason = ffe.toTaskEndReason
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    }

    case TaskKilledException => {
      logInfo("Executor killed task " + taskId)
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    }

    case t: Throwable => {
      val serviceTime = (System.currentTimeMillis() - taskStart).toInt
      val metrics = attemptedTask.flatMap(t => t.metrics)
      for (m <- metrics) {
        m.executorRunTime = serviceTime
        m.jvmGCTime = gcTime - startGCTime
      }
      val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      // TODO: Should we exit the whole executor here? On the one hand, the failed task may
      // have left some weird state around depending on when the exception was thrown, but on
      // the other hand, maybe we could detect that when future tasks fail and exit then.
      logError("Exception in task ID " + taskId, t)
      // System.exit(1)
    }
  } finally {
    // Remove this thread's shuffle memory entry from shuffleMemoryMap.
    // TODO: Unregister shuffle memory only for ResultTask
    val shuffleMemoryMap = env.shuffleMemoryMap
    shuffleMemoryMap.synchronized {
      shuffleMemoryMap.remove(Thread.currentThread().getId)
    }
    // Remove this task from runningTasks.
    runningTasks.remove(taskId)
  }
}
}
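The size check near the end of TaskRunner.run decides whether the result travels inside the status message or is parked in a block store and referenced by ID. A minimal sketch of that decision follows; ResultRoutingSketch, Direct, Indirect, blockStore and the "taskresult_" block name are hypothetical stand-ins for illustration, not Spark's types. Only the `limit >= frameSize - 1024` comparison is taken from the code above.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

object ResultRoutingSketch {
  sealed trait Outcome
  // The serialized result is small enough to be sent in the message itself.
  final case class Direct(bytes: ByteBuffer) extends Outcome
  // Only a reference is sent; the receiver fetches the bytes from the block store.
  final case class Indirect(blockId: String) extends Outcome

  // Hypothetical in-memory stand-in for the BlockManager.
  val blockStore = mutable.Map.empty[String, ByteBuffer]

  def route(taskId: Long, serialized: ByteBuffer, frameSize: Int): Outcome =
    if (serialized.limit() >= frameSize - 1024) {
      // Too large for one message: store it and send the block ID instead.
      val blockId = "taskresult_" + taskId
      blockStore(blockId) = serialized
      Indirect(blockId)
    } else {
      Direct(serialized)
    }
}
```

The 1024-byte margin leaves room for the message envelope, so a result that is only just under the frame size is still routed through the block store.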


State updates during task execution

ExecutorBackend.statusUpdate

On YARN the implementing class is CoarseGrainedExecutorBackend, which sends a StatusUpdate event to the master (driver) actor.

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  driver ! StatusUpdate(executorId, taskId, state, data)
}


On the master (driver) side, the scheduler backend handles the status update.

Implementing class: CoarseGrainedSchedulerBackend.DriverActor

case StatusUpdate(executorId, taskId, state, data) =>
  // Handle the status update through TaskSchedulerImpl.statusUpdate.
  scheduler.statusUpdate(taskId, state, data.value)
  // If the task is in a finished state (FINISHED, FAILED, KILLED or LOST):
  if (TaskState.isFinished(state)) {
    if (executorActor.contains(executorId)) {
      // Each task occupies one CPU core; the task is done, so add one to the free cores.
      freeCores(executorId) += 1
      // Continue running the remaining tasks on this executor; see the relevant part of
      // the scheduler scheduling analysis.
      makeOffers(executorId)
    } else {
      // Ignoring the update since we don't know about the executor.
      val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
      logWarning(msg.format(taskId, state, sender, executorId))
    }
  }
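The core accounting in this handler can be sketched on its own: a terminal state returns one core to a known executor and triggers a new offer, while updates from unknown executors are ignored. CoreAccountingSketch and the offers buffer are hypothetical names; the TaskState enumeration here is a simplified local copy for illustration, not Spark's class.

```scala
import scala.collection.mutable

object CoreAccountingSketch {
  // Simplified local copy of the task states, for illustration only.
  object TaskState extends Enumeration {
    val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
    private val finishedStates = Set(FINISHED, FAILED, KILLED, LOST)
    def isFinished(state: Value): Boolean = finishedStates.contains(state)
  }

  // Free cores per registered executor.
  val freeCores = mutable.HashMap[String, Int]()
  // Hypothetical record of the executors for which a new offer was made.
  val offers = mutable.ArrayBuffer[String]()

  def statusUpdate(executorId: String, state: TaskState.Value): Unit = {
    if (TaskState.isFinished(state)) {
      if (freeCores.contains(executorId)) {
        freeCores(executorId) += 1   // the finished task releases its core
        offers += executorId         // stands in for makeOffers(executorId)
      }
      // Updates from unknown executors are ignored.
    }
  }
}
```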


Processing flow of TaskSchedulerImpl.statusUpdate

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  synchronized {
    try {
      // If the reported state is LOST and the task is present in the task-to-executor map:
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
        // Look up the ID of the executor on the worker that ran this task.
        // We lost this entire executor, so remember that it's gone
        val execId = taskIdToExecutorId(tid)
        // If this executor is an active executor, perform the scheduler's executor-lost handling.
        // This involves the TaskSetManagers: TaskSetManager.executorLost is invoked.
        // Record the executor as failedExecutor for use at the end of this function.
        if (activeExecutorIds.contains(execId)) {
          removeExecutor(execId)
          failedExecutor = Some(execId)
        }
      }

      taskIdToTaskSetId.get(tid) match {
        case Some(taskSetId) =>
          // If the task is in a finished state (not RUNNING), remove it from the bookkeeping maps.
          if (TaskState.isFinished(state)) {
            taskIdToTaskSetId.remove(tid)
            if (taskSetTaskIds.contains(taskSetId)) {
              taskSetTaskIds(taskSetId) -= tid
            }
            taskIdToExecutorId.remove(tid)
          }
          activeTaskSets.get(taskSetId).foreach { taskSet =>
            // If the task completed successfully, remove it from the TaskSet's running tasks
            // and fetch its result through the TaskResultGetter.
            if (state == TaskState.FINISHED) {
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              // Handling of a failed task:
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logInfo("Ignoring update with state %s from TID %s because its task set is gone"
            .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }

  // If an executor failed, let the dagScheduler handle that executor.
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor != None) {
    dagScheduler.executorLost(failedExecutor.get)
    // Trigger a new round of resource offers so that tasks can be assigned and executed.
    backend.reviveOffers()
  }
}


State TaskState.LOST with the executor in activeExecutorIds

This path is taken when the task state is LOST and the executor is an active executor (that is, it has executed tasks before).

private def removeExecutor(executorId: String) {
  // Remove this executor from activeExecutorIds.
  activeExecutorIds -= executorId
  // Look up the worker host of this executor.
  val host = executorIdToHost(executorId)
  // Take all executors on that host and remove the current one.
  val execs = executorsByHost.getOrElse(host, new HashSet)
  execs -= executorId
  if (execs.isEmpty) {
    executorsByHost -= host
  }
  // Remove this executor from the executor-to-host map.
  executorIdToHost -= executorId
  // This mainly ends up calling TaskSetManager.executorLost.
  rootPool.executorLost(executorId, host)
}
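The three collections that removeExecutor keeps consistent can be exercised with a small self-contained sketch. ExecutorRegistrySketch and addExecutor are hypothetical; only the field names mirror the ones above. Unlike the original, this version tolerates an unknown executor ID by using Option rather than a direct map lookup, which is a design choice of the sketch.

```scala
import scala.collection.mutable

object ExecutorRegistrySketch {
  val activeExecutorIds = mutable.HashSet[String]()
  val executorIdToHost = mutable.HashMap[String, String]()
  val executorsByHost = mutable.HashMap[String, mutable.HashSet[String]]()

  // Hypothetical helper that registers an executor on a host.
  def addExecutor(executorId: String, host: String): Unit = {
    activeExecutorIds += executorId
    executorIdToHost(executorId) = host
    executorsByHost.getOrElseUpdate(host, mutable.HashSet[String]()) += executorId
  }

  def removeExecutor(executorId: String): Unit = {
    activeExecutorIds -= executorId
    // Drop the executor-to-host entry and clean up the per-host set.
    executorIdToHost.remove(executorId).foreach { host =>
      executorsByHost.get(host).foreach { execs =>
        execs -= executorId
        // A host with no executors left is removed entirely.
        if (execs.isEmpty) executorsByHost -= host
      }
    }
  }
}
```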


The TaskSetManager.executorLost function:

This function handles tasks that are lost because their executor was lost: the tasks of that executor are added back to the pending-tasks lists.

override def executorLost(execId: String, host: String) {
  logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)

  // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
  // task that used to have locations on only this host might now go to the no-prefs list. Note
  // that it's okay if we add a task to the same queue twice (if it had multiple preferred
  // locations), because findTaskFromList will skip already-running tasks.
  // Rebuild the pending queues of this TaskSet, because the lost executor has been removed.
  for (index <- getPendingTasksForExecutor(execId)) {
    addPendingTask(index, readding = true)
  }
  for (index <- getPendingTasksForHost(host)) {
    addPendingTask(index, readding = true)
  }


  // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
  // (that is, if the tasks of this task set are ShuffleMapTasks).
  if (tasks(0).isInstanceOf[ShuffleMapTask]) {
    for ((tid, info) <- taskInfos if info.executorId == execId) {
      val index = taskInfos(tid).index
      if (successful(index)) {
        successful(index) = false
        copiesRunning(index) -= 1
        tasksSuccessful -= 1
        addPendingTask(index)
        // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
        // stage finishes when a total of tasks.size tasks finish.
        // Send a CompletionEvent through the DAGScheduler with reason Resubmitted.
        sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
      }
    }
  }
  // If a task is still running and it was running on the lost executor:
  // Also re-enqueue any tasks that were running on the node
  for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
    // Mark the task as failed, remove it from the running list, and add it back to
    // the pending-tasks queue.
    handleFailedTask(tid, TaskState.FAILED, None)
  }
}


DAGScheduler handling of the CompletionEvent:

...
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
  handleTaskCompletion(completion)
...
case Resubmitted =>
  logInfo("Resubmitted " + task + ", so marking it as still running")
  pendingTasks(stage) += task


States TaskState.FAILED, TaskState.KILLED and TaskState.LOST

...
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
  // Remove the task from the running set.
  taskSet.removeRunningTask(tid)
  // This function mainly extracts the failure information and then handles the
  // exception through TaskSchedulerImpl.handleFailedTask.
  taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}



The TaskSchedulerImpl.handleFailedTask function:

def handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: Option[TaskEndReason]) = synchronized {
  taskSetManager.handleFailedTask(tid, taskState, reason)
  // Unless the task was KILLED, trigger task assignment and execution again.
  if (taskState != TaskState.KILLED) {
    // Need to revive offers again now that the task set manager state has been updated to
    // reflect failed tasks that need to be re-run.
    backend.reviveOffers()
  }
}


Flow of the TaskSetManager.handleFailedTask function

TaskSetManager.handleFailedTask processes the exception information of a failed task.

def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
  val info = taskInfos(tid)
  if (info.failed) {
    return
  }
  removeRunningTask(tid)
  val index = info.index
  info.markFailed()
  var failureReason = "unknown"
  if (!successful(index)) {
    logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
    copiesRunning(index) -= 1

    // If this call came from TaskSetManager.executorLost (task lost), reason is None and none
    // of the cases below run. Otherwise the task failed with an exception, i.e. the status
    // update carried a state other than LOST.
    // Check if the problem is a map output fetch failure. In that case, this
    // task will never succeed on any node, so tell the scheduler about it.
    reason.foreach {
      case fetchFailed: FetchFailed =>
        // Fetch failure: remove all running tasks of this task set, remove the task set
        // from the scheduler, and skip the rest of this function.
        logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
        sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
        successful(index) = true
        tasksSuccessful += 1
        sched.taskSetFinished(this)
        removeAllRunningTasks()
        return

      case TaskKilled =>
        // The task was killed: report it and skip the rest of this function.
        logWarning("Task %d was killed.".format(tid))
        sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
        return


      case ef: ExceptionFailure =>
        sched.dagScheduler.taskEnded(
          tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
        if (ef.className == classOf[NotSerializableException].getName()) {
          // If the task result wasn't serializable, there's no point in trying to re-execute it.
          logError("Task %s:%s had a not serializable result: %s; not retrying".format(
            taskSet.id, index, ef.description))
          abort("Task %s:%s had a not serializable result: %s".format(
            taskSet.id, index, ef.description))
          return
        }
        val key = ef.description
        failureReason = "Exception failure: %s".format(ef.description)
        val now = clock.getTime()
        val (printFull, dupCount) = {
          if (recentExceptions.contains(key)) {
            val (dupCount, printTime) = recentExceptions(key)
            if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
              recentExceptions(key) = (0, now)
              (true, 0)
            } else {
              recentExceptions(key) = (dupCount + 1, printTime)
              (false, dupCount + 1)
            }
          } else {
            recentExceptions(key) = (0, now)
            (true, 0)
          }
        }
        if (printFull) {
          val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
          logWarning("Loss was due to %s\n%s\n%s".format(
            ef.className, ef.description, locs.mkString("\n")))
        } else {
          logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
        }


      case TaskResultLost =>
        failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
        logWarning(failureReason)
        sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)

      case _ => {}
    }
    // Add the task back to the pending queue. Unless the state is KILLED, also count the
    // failure and check whether the maximum number of retries has been reached.
    // On non-fetch failures, re-enqueue the task as pending for a max number of retries
    addPendingTask(index)
    if (state != TaskState.KILLED) {
      numFailures(index) += 1
      if (numFailures(index) >= maxTaskFailures) {
        logError("Task %s:%d failed %d times; aborting job".format(
          taskSet.id, index, maxTaskFailures))
        abort("Task %s:%d failed %d times (most recent failure: %s)".format(
          taskSet.id, index, maxTaskFailures, failureReason))
      }
    }
  } else {
    logInfo("Ignoring task-lost event for TID " + tid +
      " because task " + index + " is already finished")
  }
}
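The recentExceptions bookkeeping in the ExceptionFailure case is a small rate limiter: the first occurrence of an exception description is logged in full, repeats within the print interval are only counted, and the full trace is printed again once the interval has passed. A minimal sketch under assumptions: ExceptionThrottleSketch and record are hypothetical names, and the 10-second interval is an assumed value, not taken from the text above.

```scala
import scala.collection.mutable

object ExceptionThrottleSketch {
  // Assumed interval in milliseconds; the real value comes from configuration.
  val PrintIntervalMs = 10000L
  // description -> (duplicate count, time of the last full print)
  private val recentExceptions = mutable.HashMap[String, (Int, Long)]()

  /** Returns (printFull, dupCount) for an exception description seen at time `now`. */
  def record(key: String, now: Long): (Boolean, Int) =
    recentExceptions.get(key) match {
      case Some((dupCount, printTime)) if now - printTime <= PrintIntervalMs =>
        // Seen recently: count the duplicate, keep the original print time.
        recentExceptions(key) = (dupCount + 1, printTime)
        (false, dupCount + 1)
      case _ =>
        // First occurrence, or the interval has elapsed: print in full and reset.
        recentExceptions(key) = (0, now)
        (true, 0)
    }
}
```

Passing the clock value in as a parameter, much as the original reads it from clock.getTime(), keeps the logic deterministic and easy to test.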


DAGScheduler handling of taskEnded:

def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics) {
  eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
}

Handling of the CompletionEvent:

case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
  handleTaskCompletion(completion)


DAGScheduler.handleTaskCompletion

The fetch-failure case:

case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
  // Mark the stage that the reducer was in as unrunnable
  val failedStage = stageIdToStage(task.stageId)
  running -= failedStage
  failed += failedStage
  ...
  // Mark the map whose fetch failed as broken in the map stage
  val mapStage = shuffleToMapStage(shuffleId)
  if (mapId != -1) {
    mapStage.removeOutputLoc(mapId, bmAddress)
    mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
  }
  ...
  failed += mapStage
  // Remember that a fetch failed now; this is used to resubmit the broken
  // stages later, after a small wait (to give other tasks the chance to fail)
  lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
  // TODO: mark the executor as failed only if there were lots of fetch failures on it
  if (bmAddress != null) {
    // Remove every output location that belongs to this executor ID from the
    // partitions of the stages.
    handleExecutorLost(bmAddress.executorId, Some(task.epoch))
  }

case ExceptionFailure(className, description, stackTrace, metrics) =>
  // Do nothing here, left up to the TaskScheduler to decide how to handle user failures

case TaskResultLost =>
  // Do nothing here; the TaskScheduler handles these failures and resubmits the task.



State TaskState.FINISHED

This state means the task completed normally.

if (state == TaskState.FINISHED) {
  // Remove the task from the taskSet's running queue.
  taskSet.removeRunningTask(tid)
  // Fetch the task's result data.
  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)


The TaskResultGetter.enqueueSuccessfulTask function:

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
  getTaskResultExecutor.execute(new Runnable {
    override def run() {
      try {
        // Extract the data from the response; it must be deserialized first.
        val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          // If the result was smaller than the Akka actor message limit, the task's
          // result is returned directly.
          case directResult: DirectTaskResult[_] => directResult
          case IndirectTaskResult(blockId) =>
            // Otherwise the result was too large and is managed by the BlockManager;
            // fetch the result data from the blockManager.
            logDebug("Fetching indirect task result for TID %s".format(tid))
            // Send a GettingResultEvent to the DAGScheduler;
            // see TaskSchedulerImpl.handleTaskGettingResult below.
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // Get the task's result.
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)

            // The task finished but fetching its result failed; see the TaskResultLost
            // part of the failure handling above.
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost))
              return
            }
            // Deserialize the task's result.
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get)
            // The result has been retrieved, so remove the corresponding block ID.
            sparkEnv.blockManager.master.removeBlock(blockId)
            deserializedResult
        }
        result.metrics.resultSize = serializedData.limit()
        // See TaskSchedulerImpl.handleSuccessfulTask below.
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        case ex: Throwable =>
          taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
      }
    }
  })
}


The TaskSchedulerImpl.handleTaskGettingResult function:

def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
  taskSetManager.handleTaskGettingResult(tid)
}

In TaskSetManager:

def handleTaskGettingResult(tid: Long) = {
  val info = taskInfos(tid)
  info.markGettingResult()
  sched.dagScheduler.taskGettingResult(tasks(info.index), info)
}

The DAGScheduler then raises a GettingResultEvent:

def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
  eventProcessActor ! GettingResultEvent(task, taskInfo)
}

Handling of the GettingResultEvent: the event is merely forwarded to the listener bus; no real processing takes place.

case GettingResultEvent(task, taskInfo) =>
  listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))



The TaskSchedulerImpl.handleSuccessfulTask function:

def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]) = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}

In TaskSetManager:

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  // Remove this task from the running queue.
  removeRunningTask(tid)
  if (!successful(index)) {
    logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
      tid, info.duration, info.host, tasksSuccessful, numTasks))
    // Send the Success message to the DAGScheduler.
    sched.dagScheduler.taskEnded(
      tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
    // Increment the number of successfully completed tasks and set the task's entry in the
    // successful array to true.
    // Mark successful and stop if all the tasks have succeeded.
    tasksSuccessful += 1
    successful(index) = true
    // If the number of completed tasks reaches the total number of tasks, this task set is
    // finished, which amounts to having computed one RDD.
    if (tasksSuccessful == numTasks) {
      sched.taskSetFinished(this)
    }
  } else {
    logInfo("Ignorning task-finished event for TID " + tid + " because task " +
      index + " has already completed successfully")
  }
}


DAGScheduler handling of a CompletionEvent with reason Success

case Success =>
  logInfo("Completed " + task)
  if (event.accumUpdates != null) {
    Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
  }
  // Remove this task from the stage's pending tasks.
  pendingTasks(stage) -= task
  stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics

  // Handle the two kinds of Task according to the task's type.
  task match {
    // If the task is a ResultTask, no shuffle is needed.
    case rt: ResultTask[_, _] =>
      resultStageToJob.get(stage) match {
        case Some(job) =>
          // If the ActiveJob of this stage still has the finished flag of this task's
          // partition set to false:
          if (!job.finished(rt.outputId)) {
            // Set the task's finished flag to true.
            job.finished(rt.outputId) = true
            // Increment the number of finished tasks in the job and check whether all tasks
            // are done. If they are, remove the job and its stage from the related maps.
            job.numFinished += 1
            // If the whole job has finished, remove it
            if (job.numFinished == job.numPartitions) {
              idToActiveJob -= stage.jobId
              activeJobs -= job
              resultStageToJob -= stage
              markStageAsFinished(stage)
              jobIdToStageIdsRemove(job.jobId)
              listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
            }
            // Call JobWaiter.taskSucceeded inside the ActiveJob to mark this task as done
            // and pass the result on for output handling.
            job.listener.taskSucceeded(rt.outputId, event.result)
          }
        case None =>
          logInfo("Ignoring result from " + rt + " because its job has finished")
      }

    // Processing flow when a shuffle task completes:
    case smt: ShuffleMapTask =>
      val status = event.result.asInstanceOf[MapStatus]
      val execId = status.location.executorId
      logDebug("ShuffleMapTask finished on " + execId)
      if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
        logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
      } else {
        // Write the shuffle result (MapStatus) into the stage's outputLocs. Each addition
        // increments numAvailableOutputs; when numAvailableOutputs == numPartitions,
        // the map side of the shuffle is complete.
        stage.addOutputLoc(smt.partitionId, status)
      }
      // If this stage is still running and all of its pendingTasks have been processed:
      if (running.contains(stage) && pendingTasks(stage).isEmpty) {
        // Update the stage's state.
        markStageAsFinished(stage)
        ...

        // The shuffle stage is now complete: register the shuffle ID and the stage's
        // outputLocs with the mapOutputTracker.
        // This registers, for every shuffle task, information such as the executor and host
        // it ran on and the size of its output.
        // The shuffle writer of every task carries the shuffle ID. Once registration succeeds,
        // the next stage reads its data using this shuffle ID's entries in the mapOutputTracker.
        mapOutputTracker.registerMapOutputs(
          stage.shuffleDep.get.shuffleId,
          stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
          changeEpoch = true)
        }

        clearCacheLocs()
        // The outputLocs entry of every partition in the stage defaults to Nil. If any
        // partition is still Nil, some task failed, so the stage is resubmitted and the
        // tasks that did not succeed are executed again.
        if (stage.outputLocs.exists(_ == Nil)) {
          ...
          submitStage(stage)
        } else {
          val newlyRunnable = new ArrayBuffer[Stage]
          for (stage <- waiting) {
            logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
          }
          // Check all stages that are still waiting and collect every stage whose parent
          // shuffle dependencies have completed, i.e. which has no missing parent stage left.
          for (stage <- waiting if getMissingParentStages(stage) == Nil) {
            newlyRunnable += stage
          }
          // Run every stage that has become runnable: remove those stages from waiting,
          waiting --= newlyRunnable
          // and add them to the running set.
          running ++= newlyRunnable
          for {
            stage <- newlyRunnable.sortBy(_.id)
            jobId <- activeJobForStage(stage)
          } {
            // Submit the next stage so that its tasks are assigned and executed.
            logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
            submitMissingTasks(stage, jobId)
          }
        }
      }
  }
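The promotion of waiting stages to running stages can be illustrated with plain collections. This is only a sketch: RunnableStagesSketch, the two-field Stage case class, missingParents and promote are hypothetical simplifications, and the real getMissingParentStages walks RDD dependencies rather than a set of parent IDs.

```scala
object RunnableStagesSketch {
  // Hypothetical minimal stage: an ID plus the IDs of its parent stages.
  final case class Stage(id: Int, parents: Set[Int])

  // Parents whose output is not available yet.
  def missingParents(stage: Stage, finished: Set[Int]): Set[Int] =
    stage.parents.diff(finished)

  /** Splits `waiting` into (newly runnable stages sorted by ID, stages still waiting). */
  def promote(waiting: Set[Stage], finished: Set[Int]): (List[Stage], Set[Stage]) = {
    val newlyRunnable = waiting.filter(s => missingParents(s, finished).isEmpty)
    (newlyRunnable.toList.sortBy(_.id), waiting -- newlyRunnable)
  }
}
```

Sorting by ID, as the original loop does with sortBy(_.id), submits the newly runnable stages in ascending ID order.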


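The JobWaiter.taskSucceeded function

This function is invoked after a task completes.

override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  if (_jobFinished) {
    throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  }
  // Process the result with the resultHandler function, which was passed in when the
  // JobWaiter was created.
  resultHandler(index, result.asInstanceOf[T])
  // Increment the number of finished tasks.
  finishedTasks += 1
  if (finishedTasks == totalTasks) {
    // When the number of finished tasks equals the total number of tasks, set the job's
    // finished flag to true and its result to JobSucceeded.
    // Once the flag is true the job is complete, and the caller that was waiting for
    // completion stops waiting.
    _jobFinished = true
    jobResult = JobSucceeded
    this.notifyAll()
  }
}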
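The counting and notifyAll() logic pairs with a caller that blocks until the job is done. A minimal monitor-based sketch follows; JobWaiterSketch, Waiter and awaitResult are hypothetical names, and the sketch leaves out the job result value that the original stores in jobResult.

```scala
object JobWaiterSketch {
  final class Waiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
    private var finishedTasks = 0
    // A job with no tasks is finished immediately.
    private var jobFinished = totalTasks == 0

    def taskSucceeded(index: Int, result: T): Unit = synchronized {
      if (jobFinished) {
        throw new UnsupportedOperationException("taskSucceeded() called on a finished waiter")
      }
      resultHandler(index, result)
      finishedTasks += 1
      if (finishedTasks == totalTasks) {
        jobFinished = true
        notifyAll() // wake up every thread blocked in awaitResult()
      }
    }

    // Blocks the caller until all tasks have reported success.
    def awaitResult(): Unit = synchronized {
      while (!jobFinished) wait()
    }
  }
}
```

The while loop around wait() guards against spurious wakeups, and because both methods synchronize on the same object, results written by the handler are visible to the thread that returns from awaitResult().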



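Implementations of Task.runTask

There are two kinds of Task implementation:

ShuffleMapTask, which needs a shuffle, and

ResultTask, which does not.

ResultTask.runTask

override def runTask(context: TaskContext): U = {
  metrics = Some(context.taskMetrics)
  try {
    // The work is done by the function that was passed in when the task instance was created,
    // that is, at DAGScheduler.runJob time, for example the writeToFile function defined in
    // PairRDDFunctions.saveAsHadoopDataset.
    // rdd.iterator runs the compute function of the concrete RDD implementation, and compute
    // in turn executes the functions the application code passed in, such as the one given to map.
    func(context, rdd.iterator(split, context))
  } finally {
    context.executeOnCompleteCallbacks()
  }
}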


ShuffleMapTask.runTask

override def runTask(context: TaskContext): MapStatus = {
  val numOutputSplits = dep.partitioner.numPartitions
  metrics = Some(context.taskMetrics)

  val blockManager = SparkEnv.get.blockManager
  val shuffleBlockManager = blockManager.shuffleBlockManager
  var shuffle: ShuffleWriterGroup = null
  var success = false

  try {
    // Get a shuffle writer group for this shuffleId.
    // Obtain all the block writers for shuffle blocks.
    val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
    shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
    // Run rdd.iterator to produce pairs (Product2) and shuffle them by key into different files.
    // When all shuffle tasks have finished, the stage is registered with the mapOutputTracker
    // and waits for the next stage to read the data and continue. Every finished shuffle task
    // produces a MapStatus instance, which mainly holds the executor and host that ran the
    // shuffle and the size of each output block.
    // Reading shuffle data is covered in the later shuffle analysis.
    // Write the map output to its associated buckets.
    for (elem <- rdd.iterator(split, context)) {
      val pair = elem.asInstanceOf[Product2[Any, Any]]
      val bucketId = dep.partitioner.getPartition(pair._1)
      shuffle.writers(bucketId).write(pair)
    }


    // Commit the writes. Get the size of each bucket block (total block size).
    var totalBytes = 0L
    var totalTime = 0L
    val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
      writer.commit()
      writer.close()
      val size = writer.fileSegment().length
      totalBytes += size
      totalTime += writer.timeWriting()
      MapOutputTracker.compressSize(size)
    }

    // Update shuffle metrics.
    val shuffleMetrics = new ShuffleWriteMetrics
    shuffleMetrics.shuffleBytesWritten = totalBytes
    shuffleMetrics.shuffleWriteTime = totalTime
    metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

    success = true
    new MapStatus(blockManager.blockManagerId, compressedSizes)
  } catch { case e: Exception =>
    // If there is an exception from running the task, revert the partial writes
    // and throw the exception upstream to Spark.
    if (shuffle != null && shuffle.writers != null) {
      for (writer <- shuffle.writers) {
        writer.revertPartialWrites()
        writer.close()
      }
    }
    throw e
  } finally {
    // Release the writers back to the shuffle block manager.
    if (shuffle != null && shuffle.writers != null) {
      shuffle.releaseWriters(success)
    }
    // Execute the callbacks on task completion.
    context.executeOnCompleteCallbacks()
  }
}
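The heart of the map side is the loop that routes each key-value pair to the bucket chosen by the partitioner. The sketch below keeps the buckets in memory instead of writing files. ShuffleBucketSketch, getPartition and writeBuckets are hypothetical names, and the non-negative hash modulo is an assumption about how a hash-based partitioner might assign keys, not a statement about the partitioner used above.

```scala
import scala.collection.mutable.ArrayBuffer

object ShuffleBucketSketch {
  // Assumed hash partitioning: hashCode modulo numPartitions, kept non-negative.
  def getPartition(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Routes every pair to its bucket, preserving arrival order within a bucket.
  def writeBuckets[K, V](records: Iterator[(K, V)], numPartitions: Int): Vector[List[(K, V)]] = {
    val buckets = Vector.fill(numPartitions)(ArrayBuffer.empty[(K, V)])
    for (pair <- records) {
      val bucketId = getPartition(pair._1, numPartitions)
      buckets(bucketId) += pair
    }
    buckets.map(_.toList)
  }
}
```

Each bucket corresponds to one reduce partition, so every pair with the same key lands in the same bucket regardless of which map task produced it.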