首页 > 代码库 > Spark分析之Master、Worker以及Application三者之间如何建立连接

Spark分析之Master、Worker以及Application三者之间如何建立连接

Master.preStart(){
  webUi.bind()
  context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) //定时任务检测是否有DEAD WORKER需要移除
  case CheckForWorkerTimeOut => {
    timeOutDeadWorkers()
  }
  /** Check for, and remove, any timed-out workers */
  def timeOutDeadWorkers() {
    ...
    if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
      workers -= worker
    }
  }
}
Worker.preStart(){
  override def preStart() {
    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
    webUi.bind()
    registerWithMaster()  //注册该Worker到Master
  }
  def tryRegisterAllMasters() {
    for (masterUrl <- masterUrls) {
      logInfo("Connecting to master " + masterUrl + "...")
      val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }
  }
}

 

Master.scala
case RegisterWorker(){
  persistenceEngine.addWorker(worker)
  sender ! RegisteredWorker(masterUrl, masterWebUiUrl)  //向Worker发送Worker注册成功事件
  schedule()  //调度部分后续章节分析
}
Worker.scala
case RegisteredWorker(){
  registered = true
  context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)  //Worker注册成功后,定时向Master发送心跳信息
}
case SendHeartbeat => masterLock.synchronized {
  if (connected) { master ! Heartbeat(workerId) }
}
Master.scala
case Heartbeat(workerId) => {
  idToWorker.get(workerId) match {
    case Some(workerInfo) =>
      workerInfo.lastHeartbeat = System.currentTimeMillis()  //更新该worker的上次发送心跳信息的时间
    case None =>
      logWarning("Got heartbeat from unregistered worker " + workerId)
  }
}

=================如上步骤完成了Worker到Master的连接===============================================

 

SparkContext启动时:

SparkContext.createTaskScheduler()
  ==>new SparkDeploySchedulerBackend()
    ==>创建AppClient并启动
      ==>ClientActor.preStart():registerWithMaster(){actor ! RegisterApplication(appDescription)}  //向Master发起RegisterApplication事件
Master.scala
case RegisterApplication(description) {
  val app = createApplication(description, sender)
  registerApplication(app)
  persistenceEngine.addApplication(app)
  sender ! RegisteredApplication(app.id, masterUrl)  //向AppClient(ClientActor)发送RegisteredApplication事件,表示该Application已经注册成功
  schedule()
}

=======================如上步骤完成了Application到Master的连接===============================================