Spark Resource Scheduling Source Code Analysis: the spreadOutApps and non-spreadOutApps Scheduling Algorithms
1. spreadOutApps: spread each application's cores as evenly as possible across the available workers (in this version of the code, one executor per worker);
2. non-spreadOutApps: pack each application onto as few workers as possible, using up one worker's free cores before touching the next.
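To make the contrast concrete, here is a minimal, self-contained Scala sketch (not Spark code) that simulates both strategies; the number of workers and their free-core counts are made-up values for illustration only.

// Minimal simulation of the two standalone-master core-assignment strategies.
// Worker free-core numbers are hypothetical, not taken from Spark.
object SchedulingDemo {
  def spreadOut(freeCores: Array[Int], coresLeft: Int): Array[Int] = {
    val assigned = new Array[Int](freeCores.length)
    var toAssign = math.min(coresLeft, freeCores.sum)
    var pos = 0
    // Round-robin: hand out one core at a time to each worker that still has room.
    while (toAssign > 0) {
      if (freeCores(pos) - assigned(pos) > 0) {
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % freeCores.length
    }
    assigned
  }

  def packed(freeCores: Array[Int], coresLeft: Int): Array[Int] = {
    val assigned = new Array[Int](freeCores.length)
    var remaining = coresLeft
    // Greedy: fill each worker completely before moving to the next one.
    for (pos <- freeCores.indices if remaining > 0) {
      val use = math.min(freeCores(pos), remaining)
      assigned(pos) = use
      remaining -= use
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val freeCores = Array.fill(10)(10)   // 10 workers, 10 free cores each (hypothetical)
    val coresLeft = 20                   // the app still needs 20 cores
    println(spreadOut(freeCores, coresLeft).mkString(" "))  // 2 2 2 2 2 2 2 2 2 2
    println(packed(freeCores, coresLeft).mkString(" "))     // 10 10 0 0 0 0 0 0 0 0
  }
}

In the standalone master, which of the two branches runs is governed by the spreadOutApps flag; to the best of my knowledge this is driven by the spark.deploy.spreadOut setting on the master (spreading out by default), but verify that against the documentation for your Spark version.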
Source Code Analysis
org.apache.spark.deploy.master.Master
1. First check: if the master's state is not ALIVE, return immediately.
2. Schedule the waiting drivers.
3. Schedule the applications (the heart of the resource scheduler).
The source code is as follows:
/*
 * schedule() is where the standalone master resolves Spark's resource scheduling.
 */
private def schedule() {
  // First check: if the master's state is not ALIVE, return immediately.
  // In other words, a standby master never schedules applications or any other resources.
  if (state != RecoveryState.ALIVE) { return }

  // First schedule drivers, they take strict precedence over applications
  // Randomization helps balance drivers

  // Random.shuffle simply reorders the elements of the given collection at random.
  // Take all workers that have registered so far, keep only the ones whose state is ALIVE,
  // and shuffle them.
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0

  // First, schedule drivers.
  // Why does the master schedule drivers at all? A driver is only registered with this master
  // when the application is submitted in cluster deploy mode (spark-submit --deploy-mode cluster
  // against a standalone master). In client mode the driver is launched directly in the local
  // process, so it is never registered here and the master never schedules it.

  // Driver scheduling: walk the waitingDrivers ArrayBuffer.
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0

    // Keep looping as long as there are alive workers we have not visited yet
    // (numWorkersVisited < numWorkersAlive) and this driver has not been launched.
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1

      // If this worker's free memory is at least what the driver needs,
      // and its free cores are at least what the driver needs
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // launch the driver on this worker
        launchDriver(worker, driver)
        // and remove it from the waitingDrivers queue
        waitingDrivers -= driver
        launched = true
      }

      // move the cursor to the next worker
      curPos = (curPos + 1) % numWorkersAlive
    }
  }

  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.

  // Application scheduling (the heart of this method).
  // There are two application scheduling algorithms: spreadOutApps and non-spreadOutApps.
  if (spreadOutApps) {
    // Try to spread out each app among all the nodes, until it has all its cores

    // Iterate over the ApplicationInfo entries in waitingApps, keeping only the applications
    // that still have cores left to schedule.
    for (app <- waitingApps if app.coresLeft > 0) {
      // From all workers, keep those that are ALIVE, then those usable by this application
      // (canUse), and sort them by free cores in descending order.
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      // An array recording how many cores will be assigned to each usable worker.
      val assigned = new Array[Int](numUsable) // Number of cores to give on each node
      // Total cores to assign: the minimum of the app's remaining cores and the
      // total free cores across all usable workers.
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

      // This loop spreads the application's executors evenly across the workers.
      // For example, with 20 cores to assign over 10 workers, the loop passes over the
      // workers twice, giving each worker one core per pass, so each worker ends up with 2 cores.

      // Keep looping while there are still cores left to assign.
      var pos = 0
      while (toAssign > 0) {
        // If this worker's free cores exceed what has already been assigned to it,
        // it can still take one more core.
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          // one core has been placed on this worker, so decrement the total left to assign
          toAssign -= 1
          // and increment this worker's assigned count
          assigned(pos) += 1
        }
        // move the cursor to the next worker
        pos = (pos + 1) % numUsable
      }

      // Now that we've decided how many cores to give on each node, let's actually give them
      // Once every worker's share is decided, walk the workers again.
      for (pos <- 0 until numUsable) {
        // Only act on workers that were actually assigned cores.
        if (assigned(pos) > 0) {
          // Record the executor in the application's internal bookkeeping by creating an
          // ExecutorDesc, which captures how many cores this executor gets.
          // spark-submit lets you request a number of executors and the cores and memory per
          // executor, but with this code the actual number of executors, and the cores per
          // executor, may differ from what was requested, because allocation is driven by the
          // total number of cores. For example, if you ask for 3 executors with 3 cores each
          // while there are 9 workers with 1 free core each, the master only knows it has
          // 9 cores to place; this algorithm gives each worker one core and starts one
          // executor per worker, so you end up with 9 executors of 1 core each.
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
          // launch the executor on that worker
          launchExecutor(usableWorkers(pos), exec)
          // and mark the application as RUNNING
          app.state = ApplicationState.RUNNING
        }
      }
    }
  } else {
    // Pack each app into as few nodes as possible until we've assigned all its cores

    // The non-spreadOutApps algorithm.

    // This is the exact opposite of spreadOutApps: each application is packed onto as few
    // workers as possible. For example, with 10 workers of 10 free cores each and an app that
    // still needs 20 cores, only two workers are used, each filled with 10 cores, and the
    // remaining apps have to go to the next workers. Likewise, if spark-submit asks for
    // 10 executors with 2 cores each (20 cores in total), only 2 executors are started,
    // each with 10 cores.

    // Iterate over workers that are ALIVE and still have free cores.
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      // Iterate over applications that still have cores left to assign.
      for (app <- waitingApps if app.coresLeft > 0) {
        // If this worker can be used by the application
        if (canUse(app, worker)) {
          // take the minimum of the worker's free cores and the app's remaining cores
          val coresToUse = math.min(worker.coresFree, app.coresLeft)
          // if that is zero the worker has nothing left to give, so skip it
          if (coresToUse > 0) {
            // add an executor to the app
            val exec = app.addExecutor(worker, coresToUse)
            // launch the executor on the worker
            launchExecutor(worker, exec)
            // and mark the application as RUNNING
            app.state = ApplicationState.RUNNING
          }
        }
      }
    }
  }
}
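The driver-placement half of schedule() can also be exercised in isolation. Below is a minimal, self-contained Scala sketch that reproduces the shuffled round-robin walk with the wrap-around cursor; the Worker and Driver classes here are simplified stand-ins invented for this illustration, not Spark's real WorkerInfo and DriverInfo, and the resource numbers are made up.

import scala.util.Random

// Simplified stand-ins for Spark's internal classes (assumptions for this sketch only).
case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
case class Driver(id: String, mem: Int, cores: Int)

object DriverSchedulingDemo {
  def main(args: Array[String]): Unit = {
    val workers = Seq(
      Worker("w1", memoryFree = 512,  coresFree = 1),
      Worker("w2", memoryFree = 4096, coresFree = 4),
      Worker("w3", memoryFree = 2048, coresFree = 2))
    var waitingDrivers = List(Driver("d1", mem = 1024, cores = 2), Driver("d2", mem = 1024, cores = 1))

    // Shuffle the alive workers once, then walk them with a wrap-around cursor,
    // mirroring the curPos logic in Master.schedule().
    val shuffled = Random.shuffle(workers)
    val numAlive = shuffled.size
    var curPos = 0

    for (driver <- waitingDrivers) {
      var launched = false
      var visited = 0
      while (visited < numAlive && !launched) {
        val w = shuffled(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          // "Launch" the driver: reserve its resources and drop it from the waiting list.
          w.memoryFree -= driver.mem
          w.coresFree -= driver.cores
          waitingDrivers = waitingDrivers.filterNot(_ == driver)
          println(s"driver ${driver.id} -> worker ${w.id}")
          launched = true
        }
        curPos = (curPos + 1) % numAlive
      }
    }
  }
}

Shuffling the alive workers before the walk is what keeps drivers from piling up on whichever worker happens to appear first in the registration order.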