首页 > 代码库 > Spark学习散点总结

Spark学习散点总结

使用Spark 时,通常会有两种模式。
一、在交互式编程环境(REPL, a.k.a spark-shell)下实现一些代码,测试一些功能点。
二、像MapReduce 那样提前编写好源代码并编译打包(仅限 Java 或 Scala,Python 不需要),然后将程序代码通过spark-submit 命令提交到 YARN 集群完成计算。

spark-shell

启动 spark-shell 通常需要指定 master、executor 内存、executor 数量等参数。由于 YARN 集群有审计机制,每个人提交的 spark application 需要指定 name 参数,同时确保 name 是以个人的 LDAP 用户名为后缀。另外,如果你不确定 driver 是否有足够的内存能容纳一个 RDD 的计算结果,建议不要使用 RDD 的 collect 方法而使用其 take 方法,否则会使 driver 发生 OOM。

  1.scala交互式编程环境

  通过命令启动sprak-shell

/opt/tige/spark2/bin/spark-shell --master yarn-client --queue root.default --driver-memory 4g --executor-memory 8g--conf spark.dynamicAllocation.maxExecutors=10 --name spark_test_{your username} 

    启动spark后系统自动创建sc和sqlContext(HiveContext实例),可以使用它们来创建RDD或者DataFarme

  2.使用Python交互式编程环境

  通过命令pyspark

/opt/tiger/spark_deploy/spark2/bin/ipyspark --master yarn-client --queue root.default --driver-memory 4g --executor-memory 8g --num-executors 8 --name spark_test_${your LDAP user name}

spark-submit

首先我们需要使用 Spark 的 API 实现一个拥有入口(main)的程序,然后通过 spark-submit 提交到 YARN 集群。
  1. Scala 版本的 WordCount

    import org.apache.spark.{SparkConf, SparkContext}
      
    object WordCount extends App {
        val sparkConf = new SparkConf()
        sparkConf.setAppName("spark_test_${your LDAP user name}")
        sparkConf.setMaster("yarn-client")
        sparkConf.set("spark.driver.memory", "4g")
        sparkConf.set("spark.executor.memory", "8g")
        sparkConf.set("spark.dynamicAllocation.initialExecutors", "3")
        sparkConf.set("spark.dynamicAllocation.maxExecutors", "10")
        val sc = new SparkContext(sparkConf)
        val words = sc.textFile("/path/to/text/file")
        val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _).collect()
        wordCount.foreach(println)
    }

    完成代码编写与编译打包之后就可以通过 spark-submit 来提交应用了,命令如下:

    /opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client --class WordCount your_spark_test.jar

     

  2. python版本的WordCount
    from pyspark import SparkContext, SparkConf
    from operator import add
      
    if __name__ == __main__:
        conf = SparkConf()
        conf.setMaster(yarn-client)
        conf.setAppName(spark_test_${your LDAP user name})
        conf.set("spark.driver.memory", "4g")
        conf.set("spark.executor.memory", "8g")
        conf.set("spark.dynamicAllocation.initialExecutors", "3")
        conf.set("spark.dynamicAllocation.maxExecutors", "10")
        sc = SparkContext(conf=conf)
      
        words = sc.textFile("/path/to/text/file")
        wordCount = words.map(lambda word: (word, 1)).reduceByKey(add).collect()
        for key, value in wordCount:
            print key, value
    假设上面这段 Python 代码的文件名为 your_spark_test.py,那么提交这段代码到 YARN 集群的命令如下:
    /opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client your_spark_test.py

     

Spark学习散点总结