
Spark Notes 7: DAGScheduler

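As the earlier notes on SparkContext and RDD showed, the real computation is always carried out by calling DAGScheduler's runJob method, which makes this a very important class. Before reading its implementation, it helps to know a little about the actor model: http://en.wikipedia.org/wiki/Actor_model http://www.slideshare.net/YungLinHo/introduction-to-actor-model-and-akka A rough idea of how the actor model is implemented is enough. It is also worth reading up on DAG concepts and papers first: http://en.wikipedia.org/wiki/Directed_acyclic_graph    http://www.netlib.org/utk/people/JackDongarra/PAPERS/DAGuE_technical_report.pdf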

/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
* stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
* minimal schedule to run the job. It then submits stages as TaskSets to an underlying
* TaskScheduler implementation that runs them on the cluster.
*
* In addition to coming up with a DAG of stages, this class also determines the preferred
* locations to run each task on, based on the current cache status, and passes these to the
* low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
* lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
* not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
* a small number of times before cancelling the whole stage.
*
*/
package org.apache.spark.scheduler
private[spark]
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = SystemClock)
extends Logging {
State machine (actor message handling):
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
extends Actor with Logging {

override def preStart() {
// set DAGScheduler for taskScheduler to ensure eventProcessActor is always
// valid when the messages arrive
dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
}

/**
* The main event loop of the DAG scheduler.
*/
def receive = {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)

case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)

case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)

case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)

case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()

case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)

case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId)

case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)

case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)

case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)

case TaskSetFailed(taskSet, reason) =>
dagScheduler.handleTaskSetFailed(taskSet, reason)

case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}

Important fields:
private val nextStageId = new AtomicInteger(0)
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]
// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]
private[scheduler] val activeJobs = new HashSet[ActiveJob]
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
private val dagSchedulerActorSupervisor =
env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
// A closure serializer that we reuse.
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

private[scheduler] var eventProcessActor: ActorRef = _

/**
* Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
* can be used to block until the job finishes executing or can be used to cancel the job.
*/
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null): JobWaiter[U] =
{
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}

val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
return new JobWaiter[U](this, jobId, 0, resultHandler)
}

assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessActor ! JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
waiter
}
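The two guards at the top of submitJob (reject unknown partition ids, finish immediately when no partitions are requested) can be tried out in isolation. The following is a minimal sketch, not Spark's code: `SubmitJobSketch`, `Waiter`, `validate` and `submit` are hypothetical names, and `Waiter` only imitates the task-counting role that JobWaiter appears to play in the excerpt above.

```scala
object SubmitJobSketch {
  /** Hypothetical stand-in for JobWaiter: counts finished tasks and forwards results. */
  final class Waiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
    private var finishedTasks = 0

    def taskSucceeded(index: Int, result: U): Unit = synchronized {
      resultHandler(index, result)
      finishedTasks += 1
    }

    // A job with zero tasks counts as finished as soon as the waiter exists.
    def jobFinished: Boolean = synchronized { finishedTasks == totalTasks }
  }

  /** Same check as in submitJob: reject partition ids outside [0, maxPartitions). */
  def validate(partitions: Seq[Int], maxPartitions: Int): Unit =
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

  /** Validates first, then returns a waiter sized to the requested partitions. */
  def submit[U](partitions: Seq[Int], maxPartitions: Int)(handler: (Int, U) => Unit): Waiter[U] = {
    validate(partitions, maxPartitions)
    new Waiter[U](partitions.size, handler)
  }
}
```

In the real method the waiter is also handed to the event loop inside the JobSubmitted message; the sketch leaves that part out.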
/**
* Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
* We run the operation in a separate thread just in case it takes a bunch of time, so that we
* don't block the DAGScheduler event loop or other concurrent jobs.
*/
protected def runLocally(job: ActiveJob) {
logInfo("Computing the requested partition locally")
new Thread("Local computation of job " + job.jobId) {
override def run() {
runLocallyWithinThread(job)
}
}.start()
}


private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
{
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {

/** Finds the earliest-created active job that needs the stage */
// TODO: Probably should actually find among the active jobs that need this
// stage the one with the highest priority (highest-priority pool, earliest created).
// That should take care of at least part of the priority inversion problem with
// cross-job dependencies.
private def activeJobForStage(stage: Stage): Option[Int] = {
val jobsThatUseStage: Array[Int] = stage.jobIds.toArray.sorted
jobsThatUseStage.find(jobIdToActiveJob.contains)
}
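Since job ids come from an increasing counter (nextJobId), "earliest created" reduces to "smallest id that is still active". A minimal sketch of that lookup on plain sets follows; `ActiveJobLookupSketch` and its parameter names are hypothetical and stand in for stage.jobIds and the keys of jobIdToActiveJob.

```scala
object ActiveJobLookupSketch {
  /** Smallest job id among `stageJobIds` that is still active, if any. */
  def activeJobForStage(stageJobIds: Set[Int], activeJobIds: Set[Int]): Option[Int] =
    // Sorting ascending puts the earliest-created job first; find stops at the first active one.
    stageJobIds.toArray.sorted.find(activeJobIds.contains)
}
```

For example, a stage used by jobs 3, 5 and 7 while only 5 and 7 are active resolves to job 5.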

/**
* Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
* architecture where any thread can post an event (e.g. a task finishing or a new job being
* submitted) but there is a single "logic" thread that reads these events and takes decisions.
* This greatly simplifies synchronization.
*/
private[scheduler] sealed trait DAGSchedulerEvent
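The event queue idea described in this comment can be sketched without Akka. The example below is an illustration under assumptions, not the DAGScheduler's implementation: it swaps the actor for a java.util.concurrent.LinkedBlockingQueue drained by one thread, and `EventLoopSketch`, its two-field-less events and `stopAndDrain` are hypothetical names.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object EventLoopSketch {
  // Hypothetical, simplified events; the real DAGSchedulerEvent cases carry more fields.
  sealed trait Event
  final case class JobSubmitted(jobId: Int) extends Event
  final case class JobCancelled(jobId: Int) extends Event
  case object Stop extends Event

  final class EventLoop {
    private val queue = new LinkedBlockingQueue[Event]()
    // Mutated only by the logic thread, so the handlers need no locking.
    private val handled = ArrayBuffer.empty[String]

    private val thread = new Thread("event-loop-sketch") {
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match { // blocks until some thread posts an event
            case JobSubmitted(id) => handled += s"submitted $id"
            case JobCancelled(id) => handled += s"cancelled $id"
            case Stop             => running = false
          }
        }
      }
    }

    def start(): Unit = thread.start()

    /** May be called from any thread. */
    def post(event: Event): Unit = queue.put(event)

    /** Posts Stop, waits for the logic thread to exit, then returns what it handled. */
    def stopAndDrain(): List[String] = {
      post(Stop)
      thread.join()
      handled.toList
    }
  }
}
```

Because every state change happens on the one logic thread, events are handled strictly in the order they were queued, which is the simplification the comment refers to.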
/**
* Asynchronously passes SparkListenerEvents to registered SparkListeners.
*
* Until start() is called, all posted events are only buffered. Only after this listener bus
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
*/
private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
/**
* A SparkListenerEvent bus that relays events to its listeners
*/
private[spark] trait SparkListenerBus extends Logging {

// SparkListeners attached to this event bus
protected val sparkListeners = new ArrayBuffer[SparkListener]
with mutable.SynchronizedBuffer[SparkListener]

def addListener(listener: SparkListener) {
sparkListeners += listener
}

/**
* Post an event to all attached listeners.
* This does nothing if the event is SparkListenerShutdown.
*/
def postToAll(event: SparkListenerEvent) {

/**
* Apply the given function to all attached listeners, catching and logging any exception.
*/
private def foreachListener(f: SparkListener => Unit): Unit = {
sparkListeners.foreach { listener =>
try {
f(listener)
} catch {
case e: Exception =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
}
}
}

}
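Two behaviours documented above are easy to miss: events posted before start() are only buffered, and a listener that throws does not prevent the remaining listeners from seeing the event. A minimal sketch of both follows. It is deliberately synchronous, whereas LiveListenerBus is described as asynchronous, and `ListenerBusSketch`, `Listener` and `Bus` are hypothetical names using plain strings as events.

```scala
import scala.collection.mutable.ArrayBuffer

object ListenerBusSketch {
  trait Listener { def onEvent(event: String): Unit }

  final class Bus {
    private val listeners = ArrayBuffer.empty[Listener]
    private val buffered = ArrayBuffer.empty[String]
    private var started = false

    def addListener(listener: Listener): Unit = synchronized { listeners += listener }

    /** Before start() the event is only buffered; afterwards it is delivered directly. */
    def post(event: String): Unit = synchronized {
      if (started) postToAll(event) else buffered += event
    }

    /** Flushes everything buffered so far, in posting order. */
    def start(): Unit = synchronized {
      started = true
      buffered.foreach(postToAll)
      buffered.clear()
    }

    // Mirrors foreachListener: catch and log, so one bad listener cannot block the others.
    private def postToAll(event: String): Unit =
      listeners.foreach { listener =>
        try listener.onEvent(event)
        catch {
          case e: Exception =>
            System.err.println(s"Listener ${listener.getClass.getName} threw an exception: $e")
        }
      }
  }
}
```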

===========================Job submission flow======================================
submitJob   -- every action ends up calling submitJob
    -> send: JobSubmitted -- sends a message to the DAGScheduler's eventProcessActor (is it because the submitting machine may not be the master? Per the DAGSchedulerEvent comment, any thread may post events and a single logic thread handles them)
        -> handleJobSubmitted   -- the DAGScheduler handles the received message
            -> newStage   -- creates a stage
            -> new ActiveJob   -- creates the ActiveJob for this job
            -> [runLocally]  -- a simple job is executed locally right away, when:
            localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
            ->runLocally(job)  -- don't block the DAGScheduler event loop or other concurrent jobs
                ->runLocallyWithinThread(job)  -- runs the local job in a new thread so the DAGScheduler event loop is not blocked
                    ->TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true)
                    ->result = job.func(taskContext, rdd.iterator(split, taskContext))  -- runs the job
                    ->job.listener.taskSucceeded(0, result)  -- notifies the listener of the job result
                    ->listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))  -- announces that the job has ended
            ->submitStage(finalStage)   -- Submits stage, but first recursively submits any missing parents
                -> activeJobForStage   -- Finds the earliest-created active job that needs the stage; looked up in jobIdToActiveJob
                -> getMissingParentStages   -- a parent is missing when the stage depends on a shuffle map stage whose output is not yet available
                     ->waitingForVisit.push(stage.rdd)
            ->waitingForVisit.pop()
            ->getShuffleMapStage
                ->registerShuffleDependencies -- registers the shuffles of all ancestors in shuffleToMapStage and mapOutputTracker
                    ->getAncestorShuffleDependencies -- returns a stack holding the ancestor shuffle dependencies
                    ->newOrUsedStage  -- creates a shuffle stage for the RDD; if the shuffle is already registered, the existing output locations are reused for the new stage
                        ->mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) or
                        ->mapOutputTracker.registerShuffle(shuffleDep.shuffleId,rdd.partitions.size)
                    ->shuffleToMapStage(currentShufDep.shuffleId) = stage  -- stored in the DAGScheduler's hash map
                ->newOrUsedStage -- creates the shuffle stage for the current RDD
                ->shuffleToMapStage(shuffleDep.shuffleId) = stage   -- stored in the DAGScheduler's hash map
            ->NarrowDependency  ->waitingForVisit.push(narrowDep.rdd) -- narrow dependencies are not analyzed further; the parent RDD is pushed onto the stack so that its own parents get visited.
                -> submitMissingTasks  -- Called when stage's parents are available and we can now do its task; the stage has no missing dependencies left.
            -> stage.pendingTasks.clear() -- clears the pending tasks.
            -> partitionsToCompute = ? -- First figure out the indexes of partition ids to compute.
                These are the partitions that still need to run; a shuffle stage may have more of them to run.
            ->runningStages += stage  -- updates the running set
            ->listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))  -- notifies listeners that the stage was submitted.
            ->Broadcasts the binary for the task, used to dispatch tasks to executors. A serialized copy of the RDD is broadcast and deserialized for each task,
                which means each task gets a different copy of the RDD. This is necessary in Hadoop,
                where the JobConf/Configuration object is not thread-safe
                     ->// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
            ->// For ResultTask, serialize and broadcast (rdd, func).
            ->new ShuffleMapTask(stage.id, taskBinary, part, locs)  -- creates the tasks
            ->new ResultTask(stage.id, taskBinary, part, locs, id)
            -> Preemptively serialize a task to make sure it can be serialized, so that serialization exceptions are caught early.
            ->stage.pendingTasks ++= tasks
            ->taskScheduler.submitTasks  -- submits the tasks to the taskScheduler
               -> submitStage(parent) -- (recursive) if the stage has missing parents, each missing parent is submitted first
======================end=========================================
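The core of the flow above, walking narrow dependencies with a stack, cutting at shuffle dependencies, and submitting parents before children, can be sketched on a toy lineage. This is an illustration under stated assumptions rather than Spark's code: `Node`, `Narrow`, `Shuffle` and `submissionOrder` are hypothetical, every shuffle parent is treated as missing (the real scheduler tracks which shuffle outputs already exist), and each parent stage is assumed to finish immediately, whereas the real scheduler parks the child in waitingStages until its parents complete.

```scala
import scala.collection.mutable

object StageSketch {
  // Hypothetical miniature of an RDD lineage: a node plus its typed dependencies.
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep
  final case class Shuffle(parent: Node) extends Dep
  final case class Node(name: String, deps: List[Dep])

  /** Walks narrow dependencies with an explicit stack and stops at every shuffle:
    * the RDD behind a shuffle is the last RDD of a parent stage. */
  def missingParentStages(stageRdd: Node): List[Node] = {
    val visited = mutable.HashSet.empty[Node]
    val missing = mutable.LinkedHashSet.empty[Node]
    var waitingForVisit = List(stageRdd) // immutable list used as a stack
    while (waitingForVisit.nonEmpty) {
      val rdd = waitingForVisit.head
      waitingForVisit = waitingForVisit.tail
      if (visited.add(rdd)) {
        rdd.deps.foreach {
          case Shuffle(parent) => missing += parent // stage boundary
          case Narrow(parent)  => waitingForVisit = parent :: waitingForVisit // same stage
        }
      }
    }
    missing.toList
  }

  /** Recursive submission: parents first, then the stage itself. */
  def submissionOrder(finalRdd: Node): List[String] = {
    val submitted = mutable.LinkedHashSet.empty[String]
    def submitStage(stageRdd: Node): Unit =
      if (!submitted.contains(stageRdd.name)) {
        missingParentStages(stageRdd).foreach(submitStage)
        submitted += stageRdd.name
      }
    submitStage(finalRdd)
    submitted.toList
  }
}
```

With a lineage a -> b (narrow), b -> c (shuffle), c -> d (narrow), d -> e (shuffle), the stages end at b, d and e, and they are submitted in that order.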








