
Big Data: Spark Standalone Cluster Scheduling (Part 1): From Remote Debugging to Application Creation

Remote debugging is a convenient way to see how code actually runs, especially on a cluster, and it is the approach most programmers prefer.

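Scala's syntax differs from Java's, but Scala compiles to bytecode that runs on the JVM, so remote debugging a Scala program works the same way as remote debugging any other JVM process.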

On the server side:

-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7001,suspend=y 

The client (the debugger) can then attach over a socket and debug the code remotely.
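
To make the individual sub-options easier to read, here is a minimal sketch that renders the same flags from a port and a suspend switch. The JdwpFlags object and its render method are hypothetical helpers written for this article; they are not part of Spark or the JDK.

```scala
// Hypothetical helper (not part of Spark or the JDK): renders the JDWP agent
// flags used in this article from a port and a suspend switch.
object JdwpFlags {
  // server=y            : the debugged JVM listens and the debugger connects to it
  // transport=dt_socket : debugger and JVM talk over a TCP socket
  // address=<port>      : the port the debugged JVM listens on
  // suspend=y|n         : y makes the JVM wait for a debugger before running main
  def render(port: Int, suspend: Boolean): String = {
    require(port > 0 && port < 65536, s"invalid port: $port")
    val suspendFlag = if (suspend) "y" else "n"
    s"-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=$port,suspend=$suspendFlag"
  }

  def main(args: Array[String]): Unit = {
    println(render(7001, suspend = true))
  }
}
```

With suspend=y the process blocks at startup until a debugger attaches, which is useful for stepping through startup code; with suspend=n it starts normally and a debugger can attach later.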

1. Debugging the submit, master, and worker code

1.1 Debugging Submit

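The client runs Submit. I won't describe this in detail; Spark examples usually use

spark-submit

to submit a Spark job.

Under the hood it is essentially equivalent to a command like the following: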

/usr/java/jdk1.8.0_111/bin/java -cp /work/spark-2.1.0-bin-hadoop2.7/conf/:/work/spark-2.1.0-bin-hadoop2.7/jars/* -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7000,suspend=y -Xmx1g org.apache.spark.deploy.SparkSubmit --master spark://raintungmaster:7077 --class rfcexample --jars /work/spark-2.1.0-bin-hadoop2.7/examples/jars/scopt_2.11-3.3.0.jar,/work/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar /tmp/machinelearning.jar

It invokes the SparkSubmit class to submit the job, so the debug options can simply be added to the java command line.

1.2 Debug settings for master and worker

export SPARK_WORKER_OPTS="-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8000,suspend=n"
export SPARK_MASTER_OPTS="-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8001,suspend=n"
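Just set these environment variables before starting the master and worker (suspend=n lets them start without waiting for a debugger to attach).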

2. Debugging executor code

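Even with the worker environment variable set, I could never debug the code that runs in a Spark executor. The executor runs on the worker, so remote debugging should be possible. Why can't the executor be debugged?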

3. Cluster scheduling in Spark standalone mode

Since the executor cannot be debugged this way, we need to work out how submit, master, and worker schedule work among themselves.

3.1 Submit submits the job

As described above, submit actually starts the SparkSubmit class, whose main method goes on to call the runMain method:

try {
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)

          case t: Throwable =>
            throw t
        }
    }

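The core of it is a call to the main method of the class we submitted, which in the example above is given by the argument
--class rfcexample
so the main method of rfcexample is invoked.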
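
The reflective call can be tried out in isolation. The sketch below is only an illustration under stated assumptions: ReflectiveCall and callStatic are hypothetical names, and java.lang.Integer.valueOf stands in for a user class's main method. It shows the two details that matter in the runMain snippet: passing null as the receiver makes this a static call, and an exception thrown by the target arrives wrapped in an InvocationTargetException, which is presumably why the snippet passes the Throwable through findCause before matching on it.

```scala
import java.lang.reflect.InvocationTargetException

// Hypothetical demo object; it does not use any Spark classes.
object ReflectiveCall {
  // Invokes a public static method that takes a single String. Passing null as
  // the receiver is what makes this a static call, as in mainMethod.invoke(null, ...).
  def callStatic(className: String, methodName: String, arg: String): Either[Throwable, AnyRef] = {
    val method = Class.forName(className).getMethod(methodName, classOf[String])
    try {
      Right(method.invoke(null, arg))
    } catch {
      // Reflection wraps whatever the target threw; unwrap it to see the real cause.
      case e: InvocationTargetException => Left(e.getCause)
    }
  }

  def main(args: Array[String]): Unit = {
    println(callStatic("java.lang.Integer", "valueOf", "42"))
    println(callStatic("java.lang.Integer", "valueOf", "not a number"))
  }
}
```

The second call fails inside Integer.valueOf, and the Left side carries the original NumberFormatException rather than the reflection wrapper.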

When writing the class that Spark runs, we usually initialize the Spark context:
val sc = new SparkContext(conf)
When SparkContext is initialized, it creates and starts the task scheduler:
// Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

In standalone mode this eventually calls the start method in StandaloneSchedulerBackend.scala:

    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)

This builds the application description and starts a StandaloneAppClient that connects to the master.

3.2 Master assigns work

Submit created a client and built an application description, and the client registers the application with the master. The master's message dispatcher then receives the RegisterApplication message:
case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }

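The master creates a new application ID and registers the application. An application is bound to exactly one client address: the same client ip:port can register only one application. schedule() then allocates executors on the usable workers according to the application's memory and core requirements: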
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

This sends a serialized LaunchExecutor message to the worker's endpoint.

3.3 Worker assigns work

In Worker.scala the dispatcher receives the LaunchExecutor message:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.getOrElse(appId,
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq)
          appDirectories(appId) = appLocalDirs
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }
It creates a working directory and starts an ExecutorRunner:
private[worker] def start() {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }
    }
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
      // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
      if (state == ExecutorState.RUNNING) {
        state = ExecutorState.FAILED
      }
      killProcess(Some("Worker shutting down")) }
  }

The start method in ExecutorRunner.scala starts a thread named "ExecutorRunner for xxx" to run the executor. Do the application's methods really run in this thread?
private def fetchAndRunExecutor() {
    try {
      // Launch the process
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      val command = builder.command()
      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
      .....
      process = builder.start()
      ......
      val exitCode = process.waitFor()
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      ......
    }
  }

Looking at fetchAndRunExecutor, we see builder.start(). The builder is a ProcessBuilder, so this thread launches a child process to run the command.

That is why debugging the worker does not let us debug the executor: the executor is a different process.
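
The effect of ProcessBuilder can be reproduced outside Spark. The following is a minimal sketch, not Spark's implementation: ChildJvm, javaBinary, and runChild are hypothetical names, and `java -version` stands in for the executor command. It starts a second JVM whose options are exactly those listed in the command, which illustrates why JDWP flags given on the worker's command line never reach the executor.

```scala
import java.io.File

// Hypothetical demo object; it does not use any Spark classes.
object ChildJvm {
  // Path of the java binary that is running the current JVM.
  def javaBinary: String =
    new File(new File(System.getProperty("java.home"), "bin"), "java").getPath

  // Starts a separate JVM with exactly the given JVM options and waits for it.
  // Flags from the parent's own command line are not inherited: the child only
  // sees the options listed in this command.
  def runChild(jvmOptions: Seq[String]): Int = {
    val command = Seq(javaBinary) ++ jvmOptions ++ Seq("-version")
    val builder = new ProcessBuilder(command: _*)
    builder.inheritIO() // show the child's output on the parent's console
    val process = builder.start()
    process.waitFor() // exit code of the child process
  }

  def main(args: Array[String]): Unit = {
    println("child exited with " + runChild(Seq("-Xmx64m")))
  }
}
```

A debugger attached to the parent JVM sees only the thread that calls waitFor; the child's code runs in another process with its own options.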

4. Debugging the executor process

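Following the code above, we saw that between receiving the RegisterApplication message and sending the LaunchExecutor message to the worker, the master does not modify the message contents. The command that the child process finally runs comes from the command field of ApplicationDescription, and we know that ApplicationDescription is created by Submit in section 3.1. So let's go back to the start method in StandaloneSchedulerBackend.scala: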

val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
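We can see that the executor's Java options are controlled by javaOpts, that is, by
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
So it is the spark.executor.extraJavaOptions property that controls them. Going back to the Spark documentation, a little late: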
spark.executor.extraJavaOptions (default: none): A string of extra JVM options to pass to executors. For instance, GC settings or other logging. Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory.
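
To see how the pieces of the start method fit together, here is a simplified model of how the extra Java options and the templated arguments could end up on the executor's command line. It is a sketch under assumptions, not Spark's code: ExecutorCommandSketch, substitute, and build are hypothetical names, a plain Map stands in for SparkConf, the placeholders are filled with a simple string replace, and the option string is split on whitespace, which ignores quoting.

```scala
// Hypothetical model; Spark's own classes are not used here.
object ExecutorCommandSketch {
  // Simplified stand-in for the worker-side substitution of {{...}} placeholders.
  def substitute(arg: String, values: Map[String, String]): String =
    values.foldLeft(arg) { case (acc, (key, value)) =>
      acc.replace("{{" + key + "}}", value)
    }

  // Returns the extra JVM options followed by the resolved program arguments.
  // Splitting on whitespace is an assumption of this sketch; it ignores quoting.
  def build(conf: Map[String, String],
            args: Seq[String],
            values: Map[String, String]): Seq[String] = {
    val extraJavaOpts = conf.get("spark.executor.extraJavaOptions")
      .map(_.trim.split("\\s+").toSeq.filter(_.nonEmpty))
      .getOrElse(Seq.empty)
    extraJavaOpts ++ args.map(substitute(_, values))
  }

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.executor.extraJavaOptions" ->
      "-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7001,suspend=y")
    val template = Seq("--executor-id", "{{EXECUTOR_ID}}", "--cores", "{{CORES}}")
    println(build(conf, template, Map("EXECUTOR_ID" -> "0", "CORES" -> "2")).mkString(" "))
  }
}
```

The point of the model is that the debug flags travel inside the application description built on the submit side, so they have to be set there rather than on the worker.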

According to this documentation, we can set the executor's JVM options by passing a conf to spark-submit:
--conf "spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7001,suspend=y"
The complete submit command is then:
/usr/java/jdk1.8.0_111/bin/java -cp /work/spark-2.1.0-bin-hadoop2.7/conf/:/work/spark-2.1.0-bin-hadoop2.7/jars/* -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7000,suspend=y -Xmx1g org.apache.spark.deploy.SparkSubmit --master spark://raintungmaster:7077 --class rfcexample --jars /work/spark-2.1.0-bin-hadoop2.7/examples/jars/scopt_2.11-3.3.0.jar,/work/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar --conf "spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7001,suspend=y" /tmp/machinelearning.jar

Note:
With this setting a worker cannot start more than one executor, because a given listening port can be bound only once on a machine.
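
The port restriction is easy to check with plain sockets. This is a minimal sketch with hypothetical names (PortClash, secondBindFails); it uses an OS-assigned port rather than 7001 so that it does not depend on what is already running on the machine.

```scala
import java.net.{BindException, ServerSocket}

// Hypothetical demo object; it does not use any Spark classes.
object PortClash {
  // Returns true when a second listener on an already-bound port is rejected.
  def secondBindFails(): Boolean = {
    val first = new ServerSocket(0) // port 0 lets the OS pick a free port
    try {
      val port = first.getLocalPort
      try {
        new ServerSocket(port).close() // second listener on the same port
        false
      } catch {
        case _: BindException => true
      }
    } finally {
      first.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println("second bind rejected: " + secondBindFails())
  }
}
```

A second executor JVM started on the same machine with the same address=7001 would run into the same kind of failure when its debug agent tries to listen.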



