首页 > 代码库 > Spark分析之Master、Worker以及Application三者之间如何建立连接
Spark分析之Master、Worker以及Application三者之间如何建立连接
Master.preStart(){ webUi.bind() context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) //定时任务检测是否有DEAD WORKER需要移除 case CheckForWorkerTimeOut => { timeOutDeadWorkers() } /** Check for, and remove, any timed-out workers */ def timeOutDeadWorkers() { ... if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) { workers -= worker } }}
Worker.preStart(){ override def preStart() { webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.bind() registerWithMaster() //注册该Worker到Master } def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) } }}
Master.scalacase RegisterWorker(){ persistenceEngine.addWorker(worker) sender ! RegisteredWorker(masterUrl, masterWebUiUrl) //向Worker发送Worker注册成功事件 schedule() //调度部分后续章节分析 }
Worker.scalacase RegisteredWorker(){ registered = true context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) //Worker注册成功后,定时向Master发送心跳信息}case SendHeartbeat => masterLock.synchronized { if (connected) { master ! Heartbeat(workerId) }}
Master.scalacase Heartbeat(workerId) => { idToWorker.get(workerId) match { case Some(workerInfo) => workerInfo.lastHeartbeat = System.currentTimeMillis() //更新该worker的上次发送心跳信息的时间 case None => logWarning("Got heartbeat from unregistered worker " + workerId) }}
=================如上步骤完成了Worker到Master的连接===============================================
SparkContext启动时:
SparkContext.createTaskScheduler() ==>new SparkDeploySchedulerBackend() ==>创建AppClient并启动 ==>ClientActor.preStart():registerWithMaster(){actor ! RegisterApplication(appDescription)} //向Master发起RegisterApplication事件
Master.scalacase RegisterApplication(description) { val app = createApplication(description, sender) registerApplication(app) persistenceEngine.addApplication(app) sender ! RegisteredApplication(app.id, masterUrl) //向Worker发起RegisteredApplication事件表示该Application已经注册成功 schedule()}
=======================如上步骤完成了Application到Master的连接===============================================
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。