
Spark API in Practice: Hands-On Spark File Operations and Debugging, Hands-On with the Sogou Log File, and a Deeper Dive into the Sogou Log File

 

Here, I start spark-shell with the executor-memory parameter specified explicitly.

 

Start the Hadoop cluster

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ jps
8457 Jps
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh


 

 

Start the Spark cluster

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh


 

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g

On the command line, I set the memory of each executor used by this spark-shell session to 1 GB.
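For reference, the same setting can also be made programmatically through SparkConf. This is only a minimal sketch for a standalone application (the app name here is a hypothetical choice of mine); inside spark-shell the sc context already exists:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://SparkSingleNode:7077")
  .setAppName("ExecutorMemoryDemo")     // hypothetical app name
  .set("spark.executor.memory", "1g")   // same effect as --executor-memory 1g
val sc = new SparkContext(conf)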


 

Read the file from HDFS


scala> val rdd1 = sc.textFile("/README.md")

scala> val rdd1 = sc.textFile("hdfs:SparkSingleNode:9000/README.md")


Both commands return a MapPartitionsRDD.

 

Using toDebugString, you can inspect the RDD's lineage.

rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> rdd1.toDebugString
16/09/26 22:47:01 INFO mapred.FileInputFormat: Total input paths to process : 1
res0: String =
(2) MapPartitionsRDD[1] at textFile at <console>:21 []
 |  /README.md HadoopRDD[0] at textFile at <console>:21 []

scala>
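Besides toDebugString, the lineage can also be walked programmatically through the dependencies API. A quick sketch:

// Each Dependency wraps a parent RDD; for rdd1 the only parent is the HadoopRDD.
val parents = rdd1.dependencies.map(_.rdd)
parents.foreach(println)   // prints something like: /README.md HadoopRDD[0] at textFile at <console>:21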


As you can see, the MapPartitionsRDD is produced from the HadoopRDD.

 

The hadoopFile method produces the HadoopRDD.

The map method produces the MapPartitionsRDD.

 

Tracing the process through the source code:
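For reference, this is how textFile is defined in Spark 1.5's SparkContext (lightly abridged): hadoopFile builds the HadoopRDD of (key, value) records, and the trailing map, which extracts the text of each line, is what produces the MapPartitionsRDD.

// org.apache.spark.SparkContext, Spark 1.5.x (abridged)
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  // hadoopFile(...) -> HadoopRDD; .map(...) -> MapPartitionsRDD
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}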


 

 

scala> val result = rdd1.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect


... at <console>:23, took 15.095588 s
result: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (DataFram...
scala>
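To make the chain of RDD types explicit, here is the same pipeline split into one transformation per step (a sketch; the intermediate variable names are mine):

val words  = rdd1.flatMap(_.split(" "))   // MapPartitionsRDD[String]
val pairs  = words.map((_, 1))            // MapPartitionsRDD[(String, Int)]
val counts = pairs.reduceByKey(_ + _)     // ShuffledRDD[(String, Int)]
val result = counts.collect()             // Array[(String, Int)] -- a plain local array, not an RDD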

 

toDebugString cannot be used this way: collect returns a local Array[(String, Int)], not an RDD, so there is no lineage left to print.

scala> result.toDebugString
<console>:26: error: value toDebugString is not a member of Array[(String, Int)]
result.toDebugString


 

 


scala> val wordcount = rdd1.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10] at reduceByKey at <console>:23

scala> wordcount.toDebugString
res3: String =
(2) ShuffledRDD[10] at reduceByKey at <console>:23 []
 +-(2) MapPartitionsRDD[9] at map at <console>:23 []
    |  MapPartitionsRDD[8] at flatMap at <console>:23 []
    |  MapPartitionsRDD[1] at textFile at <console>:21 []
    |  /README.md HadoopRDD[0] at textFile at <console>:21 []

scala>
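Since wordcount is still an RDD at this point, it can also be written back to HDFS rather than collected to the driver. A minimal sketch (the output path is a hypothetical example and must not already exist):

// Writes one part-NNNNN file per partition under the given directory.
wordcount.saveAsTextFile("hdfs://SparkSingleNode:9000/output/wordcount")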

 


 

Question: why is there no MappedRDD in the lineage? Could it be a version difference? (It appears so: from Spark 1.3 onward the specialized MappedRDD class was removed, and map now returns a MapPartitionsRDD directly.)
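One way to check this in the shell is to ask for the runtime class of a mapped RDD (a quick sketch):

// On Spark 1.3+ this prints "MapPartitionsRDD"; older releases
// returned the since-removed MappedRDD from map.
println(rdd1.map(identity).getClass.getSimpleName)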

 
