首页 > 代码库 > spark 启动job的流程分析

spark 启动job的流程分析

WordCount开始分析

编写一个例子程序

编写一个从HDFS中读取并计算wordcount的例子程序:

packageorg.apache.spark.examples


importorg.apache.spark.SparkContext

importorg.apache.spark.SparkContext._


objectWordCount{

defmain(args : Array[String]) {

valsc = newSparkContext(args(0),"wordcount by hdfs",

System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass()))

//hadoophdfs的根路径下得到一个文件

valfile =sc.textFile("/hadoop-test.txt")

valcounts =file.flatMap(line=> line.split(" "))

.map(word => (word,1)).reduceByKey(_+ _)

counts.saveAsTextFile("/newtest.txt")

}


}


生成SparkContext实例

在上面例子中,要执行map/reduce操作,首先需要一个SparkContext,因此看看SparkContext的实例生成

defthis(

master: String,

appName: String,

sparkHome: String =null,

jars: Seq[String] = Nil,

environment: Map[String, String]= Map(),

preferredNodeLocationData:Map[String, Set[SplitInfo]] = Map()) =

{

this(SparkContext.updatedConf(newSparkConf(), master, appName, sparkHome, jars, environment),

preferredNodeLocationData)

}


编写WordCount例子时使用了上面列出的构造函数,后面两个environmentpreferredNodeLocationData传入为默认值。

调用updatedConf的单例函数,生成或更新当前的SparkConf实例。

调用SparkContext的默认构造函数。

1.生成并启动监控的Jettyui,SparkUI.

2.生成TaskScheduler实例,并启动。

此函数会根据不同的mastername生成不同的TaskScheduler实例。,yarn-clusterYarnClusterScheduler

主要用来启动/停止task,监控task的运行状态。

private[spark]vartaskScheduler= SparkContext.createTaskScheduler(this,master,appName)

taskScheduler.start()

3.生成DAGScheduler实例,并启动。


@volatileprivate[spark]vardagScheduler=newDAGScheduler(taskScheduler)

dagScheduler.start()


scheduler进行start操作后,通过调用postStartHook来把SparkContext添加到appmaster中。

生成WorkerRunnable线程,通过nmclient启动worker对应的container。此container线程CoarseGrainedExecutorBackend的实例,此实例通过Executor实例来加载相关的task

SparkContext.textFile生成RDD

此方法用来生成RDD的实例,通常读取文本文件的方式通过textFile来进行,并其调用hadoopFile来执行。

通过hadoopFile得到一个HadoopRDD<K,V>的实例后,通过.map得到V的值。并生成RDD返回。

deftextFile(path: String, minSplits: Int = defaultMinSplits):RDD[String] = {

hadoopFile(path,classOf[TextInputFormat], classOf[LongWritable], classOf[Text],

minSplits).map(pair=> pair._2.toString)

}

最终通过hadoopFile函数生成一个HadoopRDD实例。

defhadoopFile[K, V](

path: String,

inputFormatClass: Class[_ <:InputFormat[K, V]],

keyClass: Class[K],

valueClass: Class[V],

minSplits: Int = defaultMinSplits

): RDD[(K, V)] = {

//AHadoopconfiguration can be about 10 KB, which is pretty big, so broadcastit.

valconfBroadcast= broadcast(newSerializableWritable(hadoopConfiguration))

valsetInputPathsFunc= (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf,path)

newHadoopRDD(

this,

confBroadcast,

Some(setInputPathsFunc),

inputFormatClass,

keyClass,

valueClass,

minSplits)

}


RDD函数的抽象执行

reduceByKey需要执行shufflereduce,也就是需要多个map中的数据集合到相同的reduce中运行,生成相关的DAG任务

valfile =sc.textFile("/hadoop-test.txt")

valcounts =file.flatMap(line=> line.split(" "))

.map(word => (word,1)).reduceByKey(_+ _)

counts.saveAsTextFile("/newtest.txt")


在以上代码中,textFile,flatMap,map,reduceByKeysparkRDDtransformation,

saveAsTextFile才是RDD中进行执行操作的action.

以下引用http://my-oschina-net/hanzhankang/blog/200275的相关说明:

具体可参见:http://spark.apache.org/docs/0.9.0/scala-programming-guide.html

1transformation是得到一个新的RDD,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD

2action是得到一个值,或者一个结果(直接将RDDcache到内存中)

所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。


transformation操作:

map(func):对调用mapRDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

filter(func):对调用filterRDD数据集中的每个元素都使用func,然后返回一个包含使functrue的元素构成的RDD

flatMap(func):map差不多,但是flatMap生成的是多个结果

mapPartitions(func):map很像,但是map是每个element,而mapPartitions是每个partition

mapPartitionsWithSplit(func):mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index

sample(withReplacement,faction,seed):抽样

union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合

distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinctelement

groupByKey(numTasks):返回(K,Seq[V]),也就是hadoopreduce函数接受的key-valuelist

reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数

sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascendingboolean类型

join(otherDataset,[numTasks]):当有两个KVdataset(K,V)(K,W),返回的是(K,(V,W))dataset,numTasks为并发的任务数

cogroup(otherDataset,[numTasks]):当有两个KVdataset(K,V)(K,W),返回的是(K,Seq[V],Seq[W])dataset,numTasks为并发的任务数

cartesian(otherDataset):笛卡尔积就是m*n,大家懂的


action操作:

reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的

collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组

count():返回的是dataset中的element的个数

first():返回的是dataset中的第一个元素

take(n):返回前nelements,这个士driverprogram返回的

takeSample(withReplacementnumseed):抽样返回一个dataset中的num个元素,随机种子seed

saveAsTextFilepath):把dataset写到一个textfile中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file

saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统

countByKey():返回的是key对应的个数的一个map,作用于一个RDD

foreach(func):dataset中的每个元素都使用func


RDDaction中提交Job

在执行RDDsaveAsTextFile时调用SparkContext.runJob方法

saveAsTextFile方法,-->saveAsHadoopFile,最终调用SparkContext.runJob方法

defsaveAsTextFile(path: String) {

this.map(x=> (NullWritable.get(),newText(x.toString)))

.saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)

}

......以下一行代码就是在saveASTextFile函数嵌套调用中最终调用的函数,调用SparkContext.runJob

self.context.runJob(self,writeToFile _)

SparkContext.runJob的定义:

defrunJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T])=> U): Array[U] = {

runJob(rdd, func,0untilrdd.partitions.size,false)

}

SparkContext的最终执行runJob函数定义

defrunJob[T, U: ClassTag](

rdd: RDD[T],//此处是具体的RDD实例值

func: (TaskContext, Iterator[T])=> U,//具体的执行的action的逻辑,reduceByKey

partitions:Seq[Int],//分区数组,一个数值从0partitions.size-1

allowLocal: Boolean,//是否可以在本地执行

//result的处理逻辑,每一个Task的处理

resultHandler: (Int, U)=>Unit) {

valcallSite =getCallSite

valcleanedFunc= clean(func)

logInfo("Startingjob: " +callSite)

valstart =System.nanoTime

通过DAGScheduler.runJob去执行job的运行操作,请看下面的DAGScheduler处理job提交。

dagScheduler.runJob(rdd,cleanedFunc,partitions,callSite,allowLocal,

resultHandler,localProperties.get)

logInfo("Jobfinished: " +callSite+ ", took "+ (System.nanoTime -start)/ 1e9 + "s")

rdd.doCheckpoint()

}

DAGScheduler处理job提交

上面的函数最终通过DagScheduler.runJob进行执行。

defrunJob[T, U: ClassTag](

rdd: RDD[T],

func: (TaskContext, Iterator[T])=> U,

partitions: Seq[Int],

callSite: String,

allowLocal: Boolean,

resultHandler: (Int, U) =>Unit,

properties: Properties =null)

{

valwaiter =submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler,properties)

等待job运行完成。

waiter.awaitResult()match{

caseJobSucceeded => {}

caseJobFailed(exception:Exception, _) =>

logInfo("Failedto run " + callSite)

throwexception

}

}

调用DAGShceduler.submitJob来提交任务。

defsubmitJob[T, U](

rdd: RDD[T],

func: (TaskContext, Iterator[T])=> U,

partitions: Seq[Int],

callSite: String,

allowLocal: Boolean,

resultHandler: (Int, U) =>Unit,

properties: Properties =null):JobWaiter[U] =

{

//Check to make sure we are not launching a task on a partition thatdoes not exist.

valmaxPartitions= rdd.partitions.length

partitions.find(p => p >=maxPartitions).foreach{ p =>

thrownewIllegalArgumentException(

"Attemptingto access a non-existent partition: "+ p + ". "+

"Totalnumber of partitions: " +maxPartitions)

}


valjobId =nextJobId.getAndIncrement()

if(partitions.size ==0){

returnnewJobWaiter[U](this,jobId,0,resultHandler)

}


assert(partitions.size >0)

valfunc2 =func.asInstanceOf[(TaskContext, Iterator[_]) => _]

valwaiter =newJobWaiter(this,jobId,partitions.size, resultHandler)

akkaactor发送一个event,eventJobSubmitted,!表示发送消息

eventProcessActor! JobSubmitted(

jobId,rdd,func2,partitions.toArray, allowLocal, callSite,waiter,properties)

waiter

}


DAGShceduler中的start方法时,会生成如下代码,此代码receive接收eventProcessActor发送的消息并进行处理

defstart() {

eventProcessActor= env.actorSystem.actorOf(Props(newActor {

/**

* A handle to the periodicaltask, used to cancel the task when the actor is stopped.

*/

varresubmissionTask:Cancellable = _


overridedefpreStart() {

importcontext.dispatcher

/**

* A message is sent to theactor itself periodically to remind the actor to resubmit failed

* stages. In this way, stageresubmission can be done within the same thread context of

* other event processing logicto avoid unnecessary synchronization overhead.

*/

resubmissionTask=context.system.scheduler.schedule(

RESUBMIT_TIMEOUT,RESUBMIT_TIMEOUT,self,ResubmitFailedStages)

}


/**

* The main event loop of the DAGscheduler.

*/

接收发送的scheduler事件,并通过processEvent进行处理。

defreceive = {

caseevent:DAGSchedulerEvent =>

logTrace("Gotevent of type " +event.getClass.getName)


/**

* All events are forwarded to`processEvent()`, so that the event processing logic can

* easily tested withoutstarting a dedicated actor. Please refer to `DAGSchedulerSuite`

* for details.

*/

if(!processEvent(event)){

submitWaitingStages()

}else{

resubmissionTask.cancel()

context.stop(self)

}

}

}))

}


processEvent中处理JobSubmitted的处理流程:

以下代码中生成一个finalStage,每一个JOB都有一个finalStage,根据job划分出不同的stage,并且提交stage

private[scheduler]defprocessEvent(event: DAGSchedulerEvent): Boolean = {

eventmatch{

caseJobSubmitted(jobId,rdd,func,partitions,allowLocal,callSite,listener,properties)=>

varfinalStage:Stage = null

try{

//New stage creation may throw an exception if, for example, jobs arerun on a HadoopRDD

//whose underlying HDFS files have been deleted.

finalStage= newStage(rdd,partitions.size,None,jobId,Some(callSite))

}catch{

casee:Exception =>

logWarning("Creatingnew stage failed due to exception - job: "+jobId, e)

listener.jobFailed(e)

returnfalse

}

valjob = newActiveJob(jobId,finalStage,func,partitions,callSite,listener,properties)

clearCacheLocs()

logInfo("Gotjob " +job.jobId+" ("+ callSite+ ") with "+partitions.length+

"output partitions (allowLocal=" +allowLocal+")")

logInfo("Finalstage: " +finalStage+" ("+ finalStage.name+")")

logInfo("Parentsof final stage: " +finalStage.parents)

logInfo("Missingparents: " +getMissingParentStages(finalStage))

如果可以本地运行,同时此finalStage没有stage的依赖关系,同时partitions只有一个。也就是只有一个处理的split

那么这时直接通过localThread的方式来运行此job实例。不通过TaskScheduler进行处理。

if(allowLocal&&finalStage.parents.size==0 &&partitions.length==1) {

//Compute very short actions like first() or take() with no parentstages locally.

listenerBus.post(SparkListenerJobStart(job,Array(),properties))

runLocally(job)

}else{

否则表示partitions有多个,或者stage本身的依赖关系,也就是像reduce这种场景。

根据job对应的stage(finalStage),调用submitStage,通过stage之间的依赖关系得出stageDAG,并以依赖关系进行处理:

idToActiveJob(jobId)=job

activeJobs+=job

resultStageToJob(finalStage)=job

listenerBus.post(SparkListenerJobStart(job,jobIdToStageIds(jobId).toArray,properties))

submitStage(finalStage)

}


submitStage方法处理流程:

privatedef submitStage(stage: Stage) {

valjobId =activeJobForStage(stage)

if(jobId.isDefined){

logDebug("submitStage("+ stage +")")

if(!waiting(stage)&& !running(stage)&& !failed(stage)){

valmissing =getMissingParentStages(stage).sortBy(_.id)

logDebug("missing:" +missing)

if(missing ==Nil) {

logInfo("Submitting" + stage +"(" + stage.rdd+"), which has no missingparents")

submitMissingTasks(stage,jobId.get)

running+= stage

}else{

for(parent <-missing) {

submitStage(parent)

}

waiting+= stage

}

}

}else{

abortStage(stage,"Noactive job for stage " + stage.id)

}

}


对于一个刚生成的job,此时的stage为刚生成,

此时submitStage调用getMissingParentStages得到stageparent,也就是RDD的依赖关系

生成parentStage是通过RDDdependencies来生成相关的RDD的依赖关系,

如果依赖关系是ShuffleDependency,生成一个mapStage来作为finalStageparent,

否则是NarrowDependency,不生成新的stage.count,各task没有相关的数据依赖

也就是说,对应需要执行shuffle操作的job,会生成mapStagefinalStage进行,

而不需要shufflejob只需要一个finalStage

privatedef getMissingParentStages(stage:Stage): List[Stage] = {

valmissing =newHashSet[Stage]

valvisited =newHashSet[RDD[_]]

defvisit(rdd: RDD[_]) {

if(!visited(rdd)){

visited+= rdd

if(getCacheLocs(rdd).contains(Nil)){

for(dep <-rdd.dependencies) {

depmatch{

caseshufDep:ShuffleDependency[_,_] =>

valmapStage =getShuffleMapStage(shufDep,stage.jobId)

if(!mapStage.isAvailable){

missing+=mapStage

}

casenarrowDep:NarrowDependency[_] =>

visit(narrowDep.rdd)

}

}

}

}

}

visit(stage.rdd)

missing.toList

}


接下来回到submitStage方法中,如果stage没有missingstage(没有parentstage),执行task的提交操作。

if (missing== Nil) {

logInfo("Submitting" + stage +"(" + stage.rdd+"), which has no missingparents")

submitMissingTasks(stage,jobId.get)

设置当前的stagerunning,因为当前的stage没有parentstage,直接running当前的stage

running+= stage

}else{

for(parent <-missing) {

stage中包含有parentstage,因此stage需要进行顺序执行。先执行parentstage.递归调用

submitStage(parent)

}

设置当前的stagewaiting,表示此stage需要等待parent的执行完成。

waiting+= stage

}


执行submitMissingTasks流程处理,把stage根据partition生成TaskSet,通过TaskScheduler提交Task.

privatedef submitMissingTasks(stage:Stage, jobId: Int) {

logDebug("submitMissingTasks("+ stage +")")

//Get our pending tasks and remember them in our pendingTasks entry

valmyPending =pendingTasks.getOrElseUpdate(stage,newHashSet)

myPending.clear()

vartasks =ArrayBuffer[Task[_]]()

检查stage是否是mapStage,如果是shuffleMapStage,生成ShuffleMapTask,并添加到tasks列表中。

mapStage表示还有其它stage依赖此stage

if(stage.isShuffleMap){

for(p <- 0until stage.numPartitionsifstage.outputLocs(p)== Nil) {

得到rddstage中当前传入的partitionTaskLocation(也就是Taskhost)

vallocs =getPreferredLocs(stage.rdd,p)

tasks+=newShuffleMapTask(stage.id,stage.rdd,stage.shuffleDep.get,p,locs)

}

}else{

否则表示是一个finalStage,此类stage直接输出结果,生成ResultTask,并添加到tasks列表中。

//This is a final stage; figure out its job‘s missing partitions

valjob =resultStageToJob(stage)

for(id <- 0untiljob.numPartitionsif!job.finished(id)){

valpartition =job.partitions(id)

得到rddstage中当前传入的partitionTaskLocation(也就是Taskhost)

vallocs =getPreferredLocs(stage.rdd,partition)

tasks+=newResultTask(stage.id,stage.rdd,job.func,partition,locs,id)

}

}


valproperties=if(idToActiveJob.contains(jobId)){

idToActiveJob(stage.jobId).properties

}else{

//thisstage will be assigned to "default" pool

null

}


//must be run listener before possible NotSerializableException

//should be "StageSubmitted" first and then "JobEnded"

listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage),properties))

如果有生成的tasks,也就是此job中有需要执行的task,

if(tasks.size>0) {

//Preemptively serialize a task to make sure it can be serialized. Weare catching this

//exception here because it would be fairly hard to catch thenon-serializableexception

//down the road, where we have several different implementations forlocal scheduler and

//cluster schedulers.

try{

SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)

}catch{

casee:NotSerializableException =>

abortStage(stage,"Tasknot serializable: " +e.toString)

running-= stage

return

}


logInfo("Submitting" +tasks.size+" missing tasks from "+ stage +" ("+ stage.rdd+")")

myPending++=tasks

logDebug("Newpending tasks: " +myPending)

执行TaskScheduler.submitTasks处理函数,TaskScheduler的实现在onyarn中为YarnClusterScheduler.

请参见下面的TaskScheduler提交task流程分析

taskSched.submitTasks(

newTaskSet(tasks.toArray,stage.id,stage.newAttemptId(), stage.jobId,properties))

stageToInfos(stage).submissionTime= Some(System.currentTimeMillis())

}else{

logDebug("Stage" + stage +"is actually done; %b %d %d".format(

stage.isAvailable,stage.numAvailableOutputs,stage.numPartitions))

running-= stage

}

}


到目前为此,jobDAGScheduler的处理流程完成。等待TaskScheduler处理完数据后,回调DAGScheduler.


TaskScheduler提交task流程分析

TaskScheduleronyarn模式时,实现为YarnClusterScheduler。提交task时,通过调用submitTasks函数。

YarnClusterScheduler继承与TaskSchedulerImpl.

通过TaskSchedulerImpl.submitTaskstask的提交进行处理。


overridedef submitTasks(taskSet: TaskSet){

valtasks =taskSet.tasks

logInfo("Addingtask set " + taskSet.id+" with "+ tasks.length+ " tasks")

this.synchronized{

生成一个TaskSetManager实例,并把此实例设置到activeTaskSets的容器中。

在生成实例的过程中,会把taskSet传入,并得到要执行的task个数,

并根据tasklocation信息,

生成副本执行次数的容器copiesRunning,列表的个数为jobtask的个数,所有的列表值为0,表示没有副本执行

task分别放到pendingTasksForExecutor(process_local)此时没有值,

/pendingTasksForHost(node_local),此时此节点的task全在此里面,hostworker注册时已经存在

/pendingTasksForRack(rack)/,通常情况不会有值

pendingTasksWithNoPrefs(待分配),通常情况不会有值。

allPendingTasks(any)

所有的task都在最后一个中

valmanager =newTaskSetManager(this,taskSet,maxTaskFailures)

activeTaskSets(taskSet.id)=manager

TaskSetManager添加到rootPool中。

schedulableBuilder.addTaskSetManager(manager,manager.taskSet.properties)

针对此TaskSet(job)生成一个跟踪每一个task的容器

taskSetTaskIds(taskSet.id)=newHashSet[Long]()

定时检查taskSet是否被启动,如果没有被启动,提示无资源,如果被启动成功,关闭此检查线程。

if(!isLocal && !hasReceivedTask){

starvationTimer.scheduleAtFixedRate(newTimerTask() {

overridedefrun() {

if(!hasLaunchedTask){

logWarning("Initialjob has not accepted any resources; "+

"checkyour cluster UI to ensure that workers are registered "+

"andhave sufficient memory")

}else{

this.cancel()

}

}

},STARVATION_TIMEOUT,STARVATION_TIMEOUT)

}

hasReceivedTask=true

}

通过backend发起执行消息,backendSchedulerBackend的具体实现,

yarn-cluster模式为CoarseGrainedSchedulerBackend

backend.reviveOffers()

}


CoarseGrainedSchedulerBackend.reviveOffers

通过driverActoractor实例发起一个ReviveOffers的事件处理消息。

overridedef reviveOffers() {

driverActor! ReviveOffers

}


driverActor的实现为CoarseGrainedSchedulerBackend.DriverActor实例。


DriverActor中处理revive的函数为receive.其中,处理ReviveOffers部分定义如下:

caseReviveOffers =>

makeOffers()

最终调用的makeOffers函数。

defmakeOffers() {

executorHostfreeCores的值由来请查看appmaster启动时的补充

launchTasks(scheduler.resourceOffers(

executorHost.toArray.map{case(id,host)=> newWorkerOffer(id,host,freeCores(id))}))

}


通过CoarseGrainedSchedulerBackend对应的scheduler(TaskSchdulerImpl).resourceOffers得到tasks

defresourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] =synchronized {

SparkEnv.set(sc.env)


//Mark each slave as alive and remember itshostname

for(o <-offers) {

executor(worker)对应的host存储到对应的容器中,通过executoridhost

executorIdToHost(o.executorId)=o.host

得到当前所有注册的workerhost,写入到对应的容器中,此容器表示node_local

if(!executorsByHost.contains(o.host)){

executorsByHost(o.host)=newHashSet[String]()


通过DAGScheduler.executorGainedexecutorIdhost进行处理,

请参见下面DAGScheduler中处理ExecutorGained处理。

executorGained(o.executorId,o.host)

}

}

根据所有的worker,根据每一个worker的的cpucore,生成[arraybuffer[]]

//Build a list of tasks to assign to each worker

valtasks =offers.map(o => newArrayBuffer[TaskDescription](o.cores))

得到每一个worker可用的cpu

valavailableCpus= offers.map(o => o.cores).toArray

得到rootPool中排序后的队列中的所有的TaskSet存储的TaskSetMansger数组

valsortedTaskSets=rootPool.getSortedTaskSetQueue()

for(taskSet <-sortedTaskSets){

logDebug("parentName:%s, name: %s, runningTasks: %s".format(

taskSet.parent.name,taskSet.name,taskSet.runningTasks))

}


//Take each TaskSet in our scheduling order, and then offer it eachnode in increasing order

//of locality levels so that it gets a chance to launch local tasks onall of them.

varlaunchedTask=false

迭代出每一个TaskSetMansger,同时根据每一个TaskSetMansger,

迭代去按网络的优先级执行PROCESS_LOCAL,NODE_LOCAL,RACK_LOCAL,ANY

scala中的for如果包含多个执行器,也就是<-的表达式,多个用;号分开,后面一个优先前面一个执行

也就是后一个执行完成后,相当于一个嵌套的for

此处开始执行对taskSet的执行节点选择,针对每一个taskset,首先使用PROCESS_LOCAL开始。

for(taskSet <-sortedTaskSets;maxLocality<- TaskLocality.values) {

do{

迭代所有的worker,并在每迭代出一个worker时,在此机器上生成执行taskSet中对应的相关task

针对TaskSetmanager.resourceOffer的处理流程,见后面的细节分析,现在不分析此实现。

launchedTask=false

for(i <- 0until offers.size) {

valexecId =offers(i).executorId

valhost =offers(i).host

生成task执行的节点信息等,每次执行resourceOffer生成一个TaskDescription

task对应的executoridhost添加到对应的activeExecutorIdsexecutorsByHost

for(task <-taskSet.resourceOffer(execId,host,availableCpus(i),maxLocality)){

tasks(i)+=task

valtid =task.taskId

taskIdToTaskSetId(tid)=taskSet.taskSet.id

taskSetTaskIds(taskSet.taskSet.id)+=tid

taskIdToExecutorId(tid)=execId

此时把activeExecutorIds的值添加一个正在执行的executor,这个值的作用是当有多个stage的依赖时,

下一个stage在执行submitTasks时,

生成的TaskSetManager中会把新stage对应的taskexecutor直接使用此executor,也就是PROCESS_LOCAL.

activeExecutorIds+=execId

executorsByHost(host)+=execId

availableCpus(i) -=1

launchedTask=true

}

}

TaskLocality,如果在一个较小的locality时找到一个task,从这个locality中接着找,

否则跳出去从下一个locality重新找,放大locality的查找条件。

如果launchedTask的值为true,表示在传入的locality级别上查找到task要执行对应的级别,

那么在当前级别下接着去找到下一个可执行的TASK,否则launchedTask的值为false,放大一个locality的级别。

launchedTask的值为false,当前迭代的locality的级别为PROCESS_LOCAL,那么把级别放大到NODE_LOCAL重新查找.

}while(launchedTask)

}

如果tasks生成成功,设置hasLaunchedTask的值为true,前面我们提到过的submitTasks中的检查线程开始结束。

if(tasks.size>0) {

hasLaunchedTask=true

}

返回生成成功的task列表。交给CoarseGrainedSchedulerBackend.launchTasks处理

returntasks

}



CoarseGrainedSchedulerBackend.launchTasks处理流程:

通过worker注册的actor,CoarseGrainedExecutorBackend发送消息,处理LaunchTask事件

deflaunchTasks(tasks: Seq[Seq[TaskDescription]]) {

for(task <-tasks.flatten) {

freeCores(task.executorId) -=1

executorActor(task.executorId)! LaunchTask(task)

}

}


CoarseGrainedExecutorBackend中处理LaunchTask事件事件。

overridedef receive = {

caseLaunchTask(taskDesc) =>

logInfo("Gotassigned task " +taskDesc.taskId)

if(executor==null){

logError("ReceivedLaunchTask command but executor was null")

System.exit(1)

}else{

通过executor执行task,见后面的分析。

executor.launchTask(this,taskDesc.taskId,taskDesc.serializedTask)

}


TaskSetManager.resourceOffer函数,每次执行得到一个task的执行节点。


defresourceOffer(

execId: String,

host: String,

availableCpus: Int,

maxLocality:TaskLocality.TaskLocality)

:Option[TaskDescription] =

{
如果成功的task个数小于当前的job要执行的task的个数,

同时worker中可用的cpu资源需要大于或等于spark.task.cpus配置的值,默认需要大于或等于1.

if(tasksSuccessful<numTasks&& availableCpus >=CPUS_PER_TASK){

valcurTime =clock.getTime()

得到一个默认的locality的值,默认情况下最有可能是NODE_LOCAL.

此处根据上一次查找可执行节点的时间,得到一个合适此执行时间的一个locality级别。

通过spark.locality.wait配置全局的等待时间。默认为3000ms。作用于PROCESS_LOCAL,NODE_LOCAL,RACK_LOCAL

通过spark.locality.wait.process配置PROCESS_LOCAL的等待时间。

通过spark.locality.wait.node配置NODE_LOCAL的等待时间。

通过spark.locality.wait.rack配置RACK_LOCAL的等待时间。

这里的查找方式是通过当前的currentLocalityIndex的值,默认从0开始,找到对应可执行的级别,

检查当前时间减去上次的查找级别的执行时间是否大于上面配置的在此级别的执行时间,

如果大于配置的时间,把currentLocalityIndex的值+1重新检查,返回一个合适的locality级别。

如果执行查找的时间超过了以上配置的几个locality的级别的查找时间,此时返回的值为ANY.

varallowedLocality= getAllowedLocalityLevel(curTime)

首先把当前可执行的locality设置为PROCESS_LOCAL.maxLocality是最大的级别,

得到的可执行级别不能超过此级别,PROCESS_LOCAL开始一级一级向上加大。

maxLocality的级别从PROCESS_LOCAL一级一级向上加,

如果getAllowedLocalityLevel查找到的级别大于现在传入的级别。把级别设置为传入的级别。

maxLocality传入按PROCESS_LOCAL/NODE_LOCAL/RACK_LOCAL/ANY进行传入。

if(allowedLocality> maxLocality) {

allowedLocality= maxLocality// We‘re not allowed tosearch for farther-away tasks

}

通过findTask来得到task对应的执行网络选择。

见下面的TaskSetManager.findTask选择task的执行节点的流程部分

findTask(execId, host,allowedLocality)match{

caseSome((index,taskLocality))=> {

//Found a task; do some bookkeeping and return a task description

valtask =tasks(index)

valtaskId =sched.newTaskId()

//Figure out whether this should count as a preferred launch

logInfo("Startingtask %s:%d as TID %s on executor %s: %s (%s)".format(

taskSet.id,index,taskId,execId, host, taskLocality))

设置task的执行副本加一,

//Do various bookkeeping

copiesRunning(index) +=1

valinfo = newTaskInfo(taskId,index,curTime,execId, host, taskLocality)

taskInfos(taskId)=info

taskAttempts(index)=info ::taskAttempts(index)

得到当前加载的节点执行级别的index,并更新当前查找此执行节点的查找时间为当前时间。

//Update our locality level for delay scheduling

currentLocalityIndex= getLocalityIndex(taskLocality)

lastLaunchTime=curTime

//Serialize and return the task

valstartTime =clock.getTime()

//We rely on the DAGScheduler to catch non-serializableclosures and RDDs, so in here

//we assume the task can be serialized without exceptions.

valserializedTask= Task.serializeWithDependencies(

task,sched.sc.addedFiles,sched.sc.addedJars,ser)

valtimeTaken =clock.getTime() - startTime

task添加到runningTasksSet的容器中。

addRunningTask(taskId)

logInfo("Serializedtask %s:%d as %d bytes in %d ms".format(

taskSet.id,index,serializedTask.limit,timeTaken))

valtaskName ="task %s:%d".format(taskSet.id,index)

如果task的执行尝试的值为1,表示是第一次尝试执行,通过DAGScheduler触发BeginEvent事件。

if(taskAttempts(index).size==1)

taskStarted(task,info)

returnSome(newTaskDescription(taskId,execId,taskName,index,serializedTask))

}

case_ =>

}

}

None

}


TaskSetManager.findTask选择task的执行节点的流程部分:

从不同的locality级别中取出需要执行的task.

privatedef findTask(execId: String,host: String, locality: TaskLocality.Value)

:Option[(Int, TaskLocality.Value)] =

{

此处优先找PROCESS_LOCAL的值,但是我现在还没有搞明白这个pendingTasksForExecutor的值从何来。

TaskSetManager生成时可以看出pendingTasksForExecutor的值在实例生成时,

TaskSchedulerImpl.activeExecutorIds中检查并生成。但实例生成此,此容器还没有值。这点还没搞明白。

新的批注:

当有stage的依赖关系时,第一个stage执行完成后,activeExecutorIds的容器会有执行过的executor列表。

对上一个stage执行完成后,新的一个stage开始执行,

生成的TaskSetManagerpendingTasksForExecutor中包含可以直接使用上一个stage中部分task执行的executortask.

因此,如果有stage的依赖关系时,下一个stage中的task在此时如果executorid相同,直接使用PROCESS_LOCAL来执行。

第一个stage执行时,PROCESS_LOCAL不会被选择,正常情况locality的选择会放大的NODE_LOCAL开始。

for(index <-findTaskFromList(getPendingTasksForExecutor(execId))) {

returnSome((index,TaskLocality.PROCESS_LOCAL))

}

if(TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)){

for(index <-findTaskFromList(getPendingTasksForHost(host))) {

returnSome((index,TaskLocality.NODE_LOCAL))

}

}

if(TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)){

for{

rack<- sched.getRackForHost(host)

index<- findTaskFromList(getPendingTasksForRack(rack))

} {

returnSome((index,TaskLocality.RACK_LOCAL))

}

}

//Look for no-preftasks after rack-local tasks since they can run anywhere.

for(index <-findTaskFromList(pendingTasksWithNoPrefs)){

returnSome((index,TaskLocality.PROCESS_LOCAL))

}

if(TaskLocality.isAllowed(locality, TaskLocality.ANY)){

for(index <-findTaskFromList(allPendingTasks)){

returnSome((index,TaskLocality.ANY))

}

}

//Finally, if all else has failed, find a speculative task

findSpeculativeTask(execId, host,locality)

}

appmaster启动时的补充


一些需要的说明:在makeOffers中调用了TaskScheduler.resourceOffers函数,

此函数中传入的executorHost,freeCores的值什么时候得到呢:

我们知道在appmaster启动的时候。会根据设置的num-worker个数,向rm申请worker运行的资源,

并通过WorkerRunnable启动worker对应的container。启动CoarseGrainedExecutorBackend实例在container.

在实例中连接appmaster对应的sparkContext中的scheduler中的CoarseGrainedSchedulerBackend.DriverActor

DriverActornameCoarseGrainedSchedulerBackend.ACTOR_NAME.


如下是CoarseGrainedExecutorBackend生成的一些代码片段:

YarnAllocationHandler.allocateResources中得到actor的名称。

valworkerId =workerIdCounter.incrementAndGet().toString

valdriverUrl ="akka.tcp://spark@%s:%s/user/%s".format(

sparkConf.get("spark.driver.host"),

sparkConf.get("spark.driver.port"),

CoarseGrainedSchedulerBackend.ACTOR_NAME)


YarnAllocationHandler.allocateResources通过WorkerRunnable的线程启动workercontainer

valworkerRunnable=newWorkerRunnable(

container,

conf,

sparkConf,

driverUrl,

workerId,

workerHostname,

workerMemory,

workerCores)

newThread(workerRunnable).start()


CoarseGrainedExecutorBackend实例启动时,向actor注册。

overridedefpreStart() {

logInfo("Connectingto driver: " + driverUrl)

driver=context.actorSelection(driverUrl)

发起worker启动时注册Executor的消息。

driver!RegisterExecutor(executorId,hostPort, cores)

context.system.eventStream.subscribe(self,classOf[RemotingLifecycleEvent])

}


CoarseGrainedSchedulerBackend中发起RegisterExecutor的事件处理。

defreceive = {

caseRegisterExecutor(executorId,hostPort,cores) =>

Utils.checkHostPort(hostPort,"Host port expected "+ hostPort)

如果此executorActor中已经包含有发送此消息过来的actor,表示此worker已经注册,

通过发送消息过来的actor(sender表示发送此消息的actor)发送一个RegisterExecutorFailed事件。

if(executorActor.contains(executorId)){

sender! RegisterExecutorFailed("Duplicateexecutor ID: " + executorId)

}else{

否则表示actor(worker)还没有被注册,把actor添加到executorActor中,

同时向发送消息过来的actor(sender表示发送此消息的actor)发送一个RegisteredExecutor消息.

logInfo("Registeredexecutor: " + sender +"with ID " +executorId)

sender !RegisteredExecutor(sparkProperties)

executorActor(executorId)= sender

添加在TaskScheduler中提交task时使用的executorHost,freeCores

executorHost(executorId)= Utils.parseHostPort(hostPort)._1

freeCores(executorId)=cores

executorAddress(executorId)= sender.path.address

addressToExecutorId(sender.path.address)=executorId

totalCoreCount.addAndGet(cores)

把现在注册的所有的节点添加到TaskScheduler.executorsByHost中。在生成TaskSetManager是会使用

makeOffers()

}


CoarseGrainedExecutorBackend中接收appmasterschedulerreceive.

针对RegisteredExecutorRegisterExecutorFailed的处理流程:

receive函数中处理RegisterExecutorFailed:如果已经存在,直接exit掉此jvm.

caseRegisterExecutorFailed(message)=>

logError("Slaveregistration failed: " +message)

System.exit(1)


receive函数中处理RegisteredExecutor:如果不存在,生成Executor实例。此时worker启动完成,

并向master注册成功。

caseRegisteredExecutor(sparkProperties) =>

logInfo("Successfullyregistered with driver")

//Make this host instead of hostPort ?

executor=newExecutor(executorId, Utils.parseHostPort(hostPort)._1,sparkProperties)


RDD的依赖关系

..........

Spark中的Scheduler

........

Task的执行过程