Spark 1.1.0 source code reading: taskScheduler
1. createTaskScheduler is set up in SparkContext
1 case "yarn-standalone" | "yarn-cluster" => 2 if (master == "yarn-standalone") { 3 logWarning( 4 "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.") 5 } 6 val scheduler = try { 7 val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler") 8 val cons = clazz.getConstructor(classOf[SparkContext]) 9 cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]10 } catch {11 // TODO: Enumerate the exact reasons why it can fail12 // But irrespective of it, it means we cannot proceed !13 case e: Exception => {14 throw new SparkException("YARN mode not available ?", e)15 }16 }17 val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)18 scheduler.initialize(backend) //调用实现类的initialize函数19 scheduler
In TaskSchedulerImpl.scala:
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
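The schedulingMode matched above is read from the spark.scheduler.mode configuration key (FIFO by default). A minimal sketch of how a user would select the FAIR builder; the allocation-file path is a hypothetical placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("scheduler-mode-demo")
  .set("spark.scheduler.mode", "FAIR")                                   // default is FIFO
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // pools read by FairSchedulableBuilder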
2. submitTasks
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient memory")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
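The starvation warning above follows a standard java.util.Timer pattern: the TimerTask keeps firing until some task has launched, then cancels itself. A self-contained sketch of just that pattern (the 15-second interval is an assumption used here for illustration):

import java.util.{Timer, TimerTask}

object StarvationTimerSketch {
  @volatile var hasLaunchedTask = false
  val STARVATION_TIMEOUT = 15000L  // assumed interval, for illustration only

  def main(args: Array[String]): Unit = {
    val starvationTimer = new Timer("starvation-timer", true)  // daemon timer thread
    starvationTimer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          println("WARN: initial job has not accepted any resources")
        } else {
          this.cancel()  // stop warning once something has launched
        }
      }
    }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
  }
}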
3. CoarseGrainedSchedulerBackend.reviveOffers
override def reviveOffers() {
  driverActor ! ReviveOffers  // send the ReviveOffers message to CoarseGrainedSchedulerBackend's driverActor
}
case ReviveOffers =>
  makeOffers()
// Make fake resource offers on all executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
}
/**
 * Represents free resources available on an executor.
 */
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
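Putting the two snippets together: makeOffers() builds one WorkerOffer per registered executor, carrying that executor's currently free cores. A toy sketch with hypothetical executor IDs and hosts, mirroring the DriverActor's executorHost and freeCores maps:

case class WorkerOffer(executorId: String, host: String, cores: Int)

val executorHost = Map("exec-1" -> "node-a", "exec-2" -> "node-b")  // hypothetical bookkeeping
val freeCores    = Map("exec-1" -> 4, "exec-2" -> 2)

val offers: Seq[WorkerOffer] =
  executorHost.toSeq.map { case (id, host) => WorkerOffer(id, host, freeCores(id)) }
// e.g. Seq(WorkerOffer("exec-1", "node-a", 4), WorkerOffer("exec-2", "node-b", 2))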
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  SparkEnv.set(sc.env)

  // Mark each slave as alive and remember its hostname
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
    do {
      launchedTask = false
      for (i <- 0 until shuffledOffers.size) {
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        if (availableCpus(i) >= CPUS_PER_TASK) {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetId(tid) = taskSet.taskSet.id
            taskIdToExecutorId(tid) = execId
            activeExecutorIds += execId
            executorsByHost(host) += execId
            availableCpus(i) -= CPUS_PER_TASK
            assert (availableCpus(i) >= 0)
            launchedTask = true
          }
        }
      }
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
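The core of resourceOffers is the inner do/while loop: within one locality level it keeps sweeping the (shuffled) offers, handing each worker at most one task per sweep, until no worker accepts anything more. A simplified, self-contained sketch of just that loop; Offer, Assigned and pickTask are stand-ins for WorkerOffer, TaskDescription and TaskSetManager.resourceOffer:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

case class Offer(executorId: String, host: String, cores: Int)
case class Assigned(taskId: Long, executorId: String)

val CPUS_PER_TASK = 1

def assign(offers: Seq[Offer], pickTask: (String, String) => Option[Long]): Seq[Seq[Assigned]] = {
  val shuffled = Random.shuffle(offers)                    // avoid always filling the same workers first
  val tasks = shuffled.map(_ => new ArrayBuffer[Assigned])
  val availableCpus = shuffled.map(_.cores).toArray
  var launchedTask = false
  do {
    launchedTask = false
    for (i <- shuffled.indices if availableCpus(i) >= CPUS_PER_TASK) {
      for (tid <- pickTask(shuffled(i).executorId, shuffled(i).host)) {
        tasks(i) += Assigned(tid, shuffled(i).executorId)  // at most one task per worker per sweep
        availableCpus(i) -= CPUS_PER_TASK
        launchedTask = true
      }
    }
  } while (launchedTask)                                   // stop when a full sweep launches nothing
  tasks
}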
4. launchTasks
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
    executorActor(task.executorId) ! LaunchTask(task)
  }
}
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
  extends SchedulerBackend with Logging
{
  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
  var totalCoreCount = new AtomicInteger(0)
  val conf = scheduler.sc.conf
  private val timeout = AkkaUtils.askTimeout(conf)

  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
    private val executorActor = new HashMap[String, ActorRef]
    private val executorAddress = new HashMap[String, Address]
    private val executorHost = new HashMap[String, String]
    private val freeCores = new HashMap[String, Int]
    private val totalCores = new HashMap[String, Int]
    private val addressToExecutorId = new HashMap[Address, String]
// Driver to executors
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
private[spark] class TaskDescription(
    val taskId: Long,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
  private val buffer = new SerializableBuffer(_serializedTask)

  def serializedTask: ByteBuffer = buffer.value

  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}
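The interesting detail here is the SerializableBuffer wrapper: java.nio.ByteBuffer does not implement java.io.Serializable, so the task bytes have to be copied out during Java serialization and rebuilt on the receiving side. A minimal sketch of that idea (not Spark's actual class):

import java.io.{ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

class SerializableByteBufferSketch(@transient private var buf: ByteBuffer) extends Serializable {
  def value: ByteBuffer = buf

  private def writeObject(out: ObjectOutputStream): Unit = {
    val bytes = new Array[Byte](buf.remaining())
    buf.duplicate().get(bytes)   // duplicate() so the original buffer's position is untouched
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    val length = in.readInt()
    val bytes = new Array[Byte](length)
    in.readFully(bytes)
    buf = ByteBuffer.wrap(bytes)
  }
}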
5. When CoarseGrainedSchedulerBackend receives an executor's registration, it records the executor
def receive = {
  case RegisterExecutor(executorId, hostPort, cores) =>
    Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
    if (executorActor.contains(executorId)) {
      sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      logInfo("Registered executor: " + sender + " with ID " + executorId)
      sender ! RegisteredExecutor(sparkProperties)
      executorActor(executorId) = sender
      executorHost(executorId) = Utils.parseHostPort(hostPort)._1
      totalCores(executorId) = cores
      freeCores(executorId) = cores
      executorAddress(executorId) = sender.path.address
      addressToExecutorId(sender.path.address) = executorId
      totalCoreCount.addAndGet(cores)
      makeOffers()
    }
The executor first registers with CoarseGrainedSchedulerBackend, which then sends (serialized) tasks to that executor.
6. CoarseGrainedExecutorBackend communicates with CoarseGrainedSchedulerBackend.
private[spark] class CoarseGrainedExecutorBackend(
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    sparkProperties: Seq[(String, String)])
  extends Actor with ActorLogReceive with ExecutorBackend with Logging {

  Utils.checkHostPort(hostPort, "Expected hostport")

  var executor: Executor = null
  var driver: ActorSelection = null

  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores)  // register with the driver
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

  override def receiveWithLogging = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      // Make this host instead of hostPort ?
      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
        false)

    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

    case LaunchTask(data) =>  // a serialized task has arrived
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)
      }
7. executor.launchTask
def launchTask(
    context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, taskName, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
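launchTask hands each task to a thread pool and tracks it in runningTasks so it can later be killed or reported on. A self-contained sketch of that pattern (not Spark's Executor; work is a hypothetical stand-in for the real body of TaskRunner.run):

import java.util.concurrent.{ConcurrentHashMap, Executors}

object TaskRunnerSketch {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  def launchTask(taskId: Long, taskName: String, work: () => Unit): Unit = {
    val runner: Runnable = new Runnable {
      override def run(): Unit = {
        try work()
        finally runningTasks.remove(taskId)  // always drop the entry, even if the task fails
      }
    }
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)
  }

  def main(args: Array[String]): Unit = {
    launchTask(1L, "demo-task", () => println("running task 1"))
    threadPool.shutdown()
  }
}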
More in the next installment.