首页 > 代码库 > spark1.1.0源码阅读-taskScheduler

spark1.1.0源码阅读-taskScheduler

1. SparkContext 中通过 createTaskScheduler 创建 TaskScheduler（以 yarn-cluster 模式为例）

 // yarn-cluster mode (SparkContext.createTaskScheduler): the YARN scheduler class is loaded
 // reflectively by name. Presumably this is because the YARN classes may not be on the
 // classpath; any failure is reported as "YARN mode not available ?".
 case "yarn-standalone" | "yarn-cluster" =>
   if (master == "yarn-standalone") {
     logWarning(
       "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
   }
   val scheduler = try {
     val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
     val cons = clazz.getConstructor(classOf[SparkContext])
     cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
   } catch {
     // TODO: Enumerate the exact reasons why it can fail
     // But irrespective of it, it means we cannot proceed !
     case e: Exception => {
       throw new SparkException("YARN mode not available ?", e)
     }
   }
   val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   scheduler.initialize(backend) // invoke initialize on the implementation class (TaskSchedulerImpl)
   scheduler

TaskSchedulerImpl.scala 中的 initialize 方法：

 /**
  * Attaches the scheduler backend and builds the root scheduling pool.
  *
  * The pool builder is chosen from `schedulingMode`: FIFO uses a [[FIFOSchedulableBuilder]],
  * FAIR uses a [[FairSchedulableBuilder]] (which additionally receives `conf`). The builder's
  * `buildPools()` is invoked before returning.
  *
  * @param backend the backend stored on this scheduler (e.g. `submitTasks` later calls
  *                `backend.reviveOffers()`)
  */
 def initialize(backend: SchedulerBackend) {
   this.backend = backend
   // temporarily set rootPool name to empty
   rootPool = new Pool("", schedulingMode, 0, 0)
   schedulableBuilder = {
     schedulingMode match {
       case SchedulingMode.FIFO =>
         new FIFOSchedulableBuilder(rootPool)
       case SchedulingMode.FAIR =>
         new FairSchedulableBuilder(rootPool, conf)
     }
   }
   schedulableBuilder.buildPools()
 }

2. submitTasks

 /**
  * Registers a new [[TaskSet]] and asks the backend for resources to run it.
  *
  * While holding this scheduler's lock it:
  *  - wraps the task set in a [[TaskSetManager]] (constructed with `maxTaskFailures`),
  *  - records the manager in `activeTaskSets` under the task set's id,
  *  - adds the manager to the scheduling pool via `schedulableBuilder`,
  *  - on the first submission in non-local mode, schedules a timer task (first run and period
  *    both `STARVATION_TIMEOUT`) that logs a warning while `hasLaunchedTask` is false and
  *    cancels itself once it is true.
  *
  * `backend.reviveOffers()` is called after the synchronized section.
  *
  * @param taskSet the set of tasks to schedule
  */
 override def submitTasks(taskSet: TaskSet) {
   val tasks = taskSet.tasks
   logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
   this.synchronized {
     val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
     activeTaskSets(taskSet.id) = manager
     schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

     // Only the first submission (non-local) starts the starvation warning timer.
     if (!isLocal && !hasReceivedTask) {
       starvationTimer.scheduleAtFixedRate(new TimerTask() {
         override def run() {
           if (!hasLaunchedTask) {
             logWarning("Initial job has not accepted any resources; " +
               "check your cluster UI to ensure that workers are registered " +
               "and have sufficient memory")
           } else {
             this.cancel()
           }
         }
       }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
     }
     hasReceivedTask = true
   }
   backend.reviveOffers()
 }

3. CoarseGrainedSchedulerBackend的reviveOffers

/**
 * Requests a new round of resource offers by sending `ReviveOffers` to the driver actor.
 * The send uses `!` (fire-and-forget), so the offers are made when the actor handles it.
 */
override def reviveOffers() {
  driverActor ! ReviveOffers // send the message to CoarseGrainedSchedulerBackend's driverActor
}
// In the driver actor's receive: a ReviveOffers message triggers a new round of offers.
case ReviveOffers =>
  makeOffers()
// Make fake resource offers on all executors
/**
 * Builds one [[WorkerOffer]] per known executor (its id, host from `executorHost`, and its
 * current `freeCores`), passes them to `scheduler.resourceOffers` to get task assignments,
 * and launches the assigned tasks via `launchTasks`.
 */
def makeOffers() {
  launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
}
/**
 * Represents free resources available on an executor.
 *
 * @param executorId id of the executor making the offer
 * @param host       host name the executor runs on
 * @param cores      number of cores currently free on that executor
 */
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
 /**
  * Called by cluster manager to offer resources on slaves. We respond by asking our active task
  * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
  * that tasks are balanced across the cluster.
  *
  * @param offers free resources, one entry per executor
  * @return one buffer of assigned [[TaskDescription]]s per offer, in the order of the
  *         internally shuffled offers (not necessarily the order of `offers`)
  */
 def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
   SparkEnv.set(sc.env)

   // Mark each slave as alive and remember its hostname
   for (o <- offers) {
     executorIdToHost(o.executorId) = o.host
     if (!executorsByHost.contains(o.host)) {
       executorsByHost(o.host) = new HashSet[String]()
       executorAdded(o.executorId, o.host)
     }
   }

   // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
   val shuffledOffers = Random.shuffle(offers)
   // Build a list of tasks to assign to each worker.
   val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
   // Per-offer remaining cores; decremented by CPUS_PER_TASK for every task assigned below.
   val availableCpus = shuffledOffers.map(o => o.cores).toArray
   val sortedTaskSets = rootPool.getSortedTaskSetQueue
   for (taskSet <- sortedTaskSets) {
     logDebug("parentName: %s, name: %s, runningTasks: %s".format(
       taskSet.parent.name, taskSet.name, taskSet.runningTasks))
   }

   // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
   // of locality levels so that it gets a chance to launch local tasks on all of them.
   var launchedTask = false
   for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
     // Keep making passes over all offers at this locality level until a full pass assigns
     // no task; each pass gives every offer with enough free cores a chance to take one task.
     do {
       launchedTask = false
       for (i <- 0 until shuffledOffers.size) {
         val execId = shuffledOffers(i).executorId
         val host = shuffledOffers(i).host
         if (availableCpus(i) >= CPUS_PER_TASK) {
           for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
             taskIdToTaskSetId(tid) = taskSet.taskSet.id
             taskIdToExecutorId(tid) = execId
             activeExecutorIds += execId
             executorsByHost(host) += execId
             availableCpus(i) -= CPUS_PER_TASK
             assert (availableCpus(i) >= 0)
             launchedTask = true
           }
         }
       }
     } while (launchedTask)
   }

   // NOTE(review): `tasks` holds one buffer per offer, so this is true whenever `offers` is
   // non-empty, even if no task was assigned. That flips `hasLaunchedTask` (which stops the
   // starvation warning in submitTasks) too early; `tasks.exists(_.nonEmpty)` would reflect
   // actual launches. Kept as in the quoted Spark 1.1.0 source.
   if (tasks.size > 0) {
     hasLaunchedTask = true
   }
   return tasks
 }

4. launchTasks

// Launch tasks returned by a set of resource offers
/**
 * For every assigned task (across all offers), subtracts `scheduler.CPUS_PER_TASK` from the
 * target executor's `freeCores` and sends a `LaunchTask(task)` message to that executor's actor.
 *
 * @param tasks per-offer task assignments, as returned by `scheduler.resourceOffers`
 */
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
    executorActor(task.executorId) ! LaunchTask(task)
  }
}
 /**
  * Scheduler backend that tracks registered executors and their free cores, and sends them
  * tasks through its DriverActor. (Excerpt: only the beginning of the class is shown.)
  */
 class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
   extends SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)

   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     // executor id -> actor that receives LaunchTask messages for that executor
     private val executorActor = new HashMap[String, ActorRef]
     // executor id -> remote address of its actor
     private val executorAddress = new HashMap[String, Address]
     // executor id -> host name (parsed from the registration's hostPort)
     private val executorHost = new HashMap[String, String]
     // executor id -> cores not yet given to tasks (reduced in launchTasks)
     private val freeCores = new HashMap[String, Int]
     // executor id -> cores reported at registration
     private val totalCores = new HashMap[String, Int]
     // reverse lookup of executorAddress
     private val addressToExecutorId = new HashMap[Address, String]
     // ... (excerpt ends here; the rest of DriverActor and of the class is omitted)
// Driver to executors
/** Message asking an executor to run `task`; sent by the driver in `launchTasks`. */
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
 /**
  * Description of a task sent to an executor.
  *
  * @param taskId          id of the task
  * @param executorId      executor the task is assigned to
  * @param name            task name
  * @param index           index of the task within its TaskSet
  * @param _serializedTask the task itself in serialized form
  */
 private[spark] class TaskDescription(
     val taskId: Long,
     val executorId: String,
     val name: String,
     val index: Int,    // Index within this task's TaskSet
     _serializedTask: ByteBuffer)
   extends Serializable {

   // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
   private val buffer = new SerializableBuffer(_serializedTask)

   /** The serialized task bytes, unwrapped from the SerializableBuffer. */
   def serializedTask: ByteBuffer = buffer.value

   override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
 }

5. CoarseGrainedSchedulerBackend 收到 executor 的注册后，记录该 executor 的信息，并立即调用 makeOffers() 发起一轮资源分配

 /**
  * Message handler of the driver actor (excerpt: only the RegisterExecutor case is shown).
  *
  * On `RegisterExecutor`, a duplicate executor id is rejected with `RegisterExecutorFailed`.
  * Otherwise the executor is acknowledged with `RegisteredExecutor(sparkProperties)`. Its
  * actor, host, cores and address are recorded, its cores are added to `totalCoreCount`,
  * and `makeOffers()` runs so the new cores can be used right away.
  */
 def receive = {
   case RegisterExecutor(executorId, hostPort, cores) =>
     Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
     if (executorActor.contains(executorId)) {
       sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
     } else {
       logInfo("Registered executor: " + sender + " with ID " + executorId)
       sender ! RegisteredExecutor(sparkProperties)
       executorActor(executorId) = sender
       executorHost(executorId) = Utils.parseHostPort(hostPort)._1
       totalCores(executorId) = cores
       freeCores(executorId) = cores
       executorAddress(executorId) = sender.path.address
       addressToExecutorId(sender.path.address) = executorId
       totalCoreCount.addAndGet(cores)
       makeOffers()
     }
   // ... (excerpt ends here; the remaining cases are omitted)

executor 先向 CoarseGrainedSchedulerBackend 注册；注册成功后，CoarseGrainedSchedulerBackend 调用 makeOffers() 为它分配 task，并通过 LaunchTask 消息把 task（TaskDescription，其中任务本体已序列化）发送到该 executor 上。

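6. CoarseGrainedExecutorBackend 与 CoarseGrainedSchedulerBackend 通信。

注意：下面这段代码与上文给出的消息并不一致。上文 driver 发送的是 RegisteredExecutor(sparkProperties)，LaunchTask 的定义是 LaunchTask(task: TaskDescription)。而这里匹配的是不带参数的 RegisteredExecutor，并从 LaunchTask(data) 中反序列化出 TaskDescription，还混入了 ActorLogReceive。这段代码看起来摘自比 1.1.0 更新的 Spark 版本（可能是 1.2.x），阅读时请以对应版本的源码核对。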

 /**
  * Executor-side actor that registers with the driver and runs the tasks it is sent.
  * (Excerpt: only the beginning of the class is shown.)
  *
  * NOTE(review): this excerpt matches a parameterless `RegisteredExecutor` and a
  * `LaunchTask(data)` whose `data.value` is deserialized into a TaskDescription. The
  * driver-side code quoted in the same post sends `RegisteredExecutor(sparkProperties)` and
  * defines `LaunchTask(task: TaskDescription)`. This excerpt therefore appears to come from a
  * later Spark version than 1.1.0 — verify against the matching source.
  */
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
     cores: Int,
     sparkProperties: Seq[(String, String)])
   extends Actor with ActorLogReceive with ExecutorBackend with Logging {

   Utils.checkHostPort(hostPort, "Expected hostport")

   var executor: Executor = null
   var driver: ActorSelection = null

   // On start: look up the driver actor, register this executor with it, and subscribe to
   // remoting lifecycle events.
   override def preStart() {
     logInfo("Connecting to driver: " + driverUrl)
     driver = context.actorSelection(driverUrl)
     driver ! RegisterExecutor(executorId, hostPort, cores) // register with the driver
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }

   override def receiveWithLogging = {
     // Registration accepted: create the Executor that will run tasks.
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
         false)

     // Registration rejected: the process exits.
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
       System.exit(1)

     case LaunchTask(data) => // a task arrives from the driver
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
         val ser = SparkEnv.get.closureSerializer.newInstance()
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
         executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)
       }
     // ... (excerpt ends here; remaining cases and the rest of the class are omitted)

 7. executor.launchTask

/**
 * Starts a task on this executor.
 *
 * Wraps the task in a [[TaskRunner]], records it in `runningTasks` under `taskId`, and
 * hands it to `threadPool.execute`.
 *
 * @param context        backend passed on to the TaskRunner
 * @param taskId         id of the task
 * @param taskName       name of the task
 * @param serializedTask the task in serialized form
 */
def launchTask(
    context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, taskName, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

且听下回分解

spark1.1.0源码阅读-taskScheduler