exec "$SPARK_HOME"/bin/spark-submit sparkr-shell-main "$@"


exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"



case SparkSubmitAction.SUBMIT => submit(appArgs)

 * Submit the application using the provided parameters.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 */
private def submit(args: SparkSubmitArguments): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
Usage: RRunner <main R file> [app arguments]
sun.java.command=com.aliyun.odps.cupid.runtime.Main --class org.apache.spark.deploy.RRunner --primary-r-file testOdpsRdd.R --arg testOdpsRdd.R
new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
如何让 Spark worker 识别 SparkR 代码呢？在 R 语言中，环境变量 R_PROFILE_USER 用来初始化 R 运行环境。SparkR 相关代码被打包提交到计算集群以后，计算节点上首先将这个变量指向初始化脚本 ${SPARK_HOME}/sparkr/SparkR/profile/general.R，该脚本识别路径，并把解压后的 SparkR 代码安装到当前 R 环境中。下面是其代码：

.First <- function() {
  packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
  .libPaths(c(packageDir, .libPaths()))
// In YARN mode for an R app, add the SparkR package archive to archives
// that can be distributed with the job
if (args.isR && clusterManager == YARN) {
  val rPackagePath = RUtils.localSparkRPackagePath
  if (rPackagePath.isEmpty) {
    printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
  val rPackageFile =
    RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
  if (!rPackageFile.exists()) {
    printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
  val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)

  // Assigns a symbol link name "sparkr" to the shipped package.
  args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
// If we're running a R app, set the main class to our specific R runner

if (args.isR && deployMode == CLIENT) {
  if (args.primaryResource == SPARKR_SHELL) {
    args.mainClass = "org.apache.spark.api.r.RBackend"
  } else {
    // If a R file is provided, add it to the child arguments and list of files to deploy.
    // Usage: RRunner <main R file> [app arguments]
    args.mainClass = "org.apache.spark.deploy.RRunner"
    args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
    args.files = mergeFileLists(args.files, args.primaryResource)
if (isYarnCluster && args.isR) {
  // In yarn-cluster mode for a R app, add primary resource to files
  // that can be distributed with the job
  args.files = mergeFileLists(args.files, args.primaryResource)
// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = "org.apache.spark.deploy.Client"
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
  if (isUserJar(args.primaryResource)) {
    childClasspath += args.primaryResource
  }
  if (args.jars != null) {
    childClasspath ++= args.jars.split(",")
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    if (args.pyFiles != null) {
      childArgs += ("--py-files", args.pyFiles)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SPARK_INTERNAL) {
      childArgs += ("--jar", args.primaryResource)
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg)
  }
class SparkContext(object)


233 if not SparkContext._gateway:
234    SparkContext._gateway=gateway or launch_gateway()
235    SparkContext._jvm=SparkContext._gateway.jvm
207         # Create a temporary directory inside spark.local.dir:
208         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
209         self._temp_dir = \
210             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir, "pyspark") \
211                 .getAbsolutePath()

