Spark Cluster Testing

1. Testing with the Spark Shell

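The Spark Shell is a tool well suited to rapid prototyping of Spark programs, and it also helps you get familiar with Scala. Even if you do not know Scala well, you can still use it. The Spark Shell lets you interact with a Spark cluster and submit queries, which makes debugging easier and gives beginners a convenient way to start using Spark.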

 

Test case 1:

[Spark@Master spark]$ MASTER=spark://Master:7077 bin/spark-shell    # connect to the cluster
Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/12/01 11:11:03 INFO spark.SecurityManager: Changing view acls to: Spark,
14/12/01 11:11:03 INFO spark.SecurityManager: Changing modify acls to: Spark,
14/12/01 11:11:03 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Spark, ); users with modify permissions: Set(Spark, )
14/12/01 11:11:03 INFO spark.HttpServer: Starting HTTP Server
14/12/01 11:11:03 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/12/01 11:11:03 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36942
14/12/01 11:11:03 INFO util.Utils: Successfully started service HTTP class server on port 36942.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
14/12/01 11:11:10 INFO spark.SecurityManager: Changing view acls to: Spark,
14/12/01 11:11:10 INFO spark.SecurityManager: Changing modify acls to: Spark,
14/12/01 11:11:10 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Spark, ); users with modify permissions: Set(Spark, )
14/12/01 11:11:11 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/12/01 11:11:11 INFO Remoting: Starting remoting
14/12/01 11:11:11 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@Master:45322]
14/12/01 11:11:11 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@Master:45322]
14/12/01 11:11:11 INFO util.Utils: Successfully started service sparkDriver on port 45322.
14/12/01 11:11:11 INFO spark.SparkEnv: Registering MapOutputTracker
14/12/01 11:11:11 INFO spark.SparkEnv: Registering BlockManagerMaster
14/12/01 11:11:12 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20141201111112-e9cc
14/12/01 11:11:12 INFO util.Utils: Successfully started service Connection manager for block manager on port 52705.
14/12/01 11:11:12 INFO network.ConnectionManager: Bound socket to port 52705 with id = ConnectionManagerId(Master,52705)
14/12/01 11:11:12 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB
14/12/01 11:11:12 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/12/01 11:11:12 INFO storage.BlockManagerMasterActor: Registering block manager Master:52705 with 267.3 MB RAM
14/12/01 11:11:12 INFO storage.BlockManagerMaster: Registered BlockManager
14/12/01 11:11:12 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-87ad77b3-40b1-4320-958f-b1d632f2b4f5
14/12/01 11:11:12 INFO spark.HttpServer: Starting HTTP Server
14/12/01 11:11:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/12/01 11:11:12 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:51107
14/12/01 11:11:12 INFO util.Utils: Successfully started service HTTP file server on port 51107.
14/12/01 11:11:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/12/01 11:11:12 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/12/01 11:11:12 INFO util.Utils: Successfully started service SparkUI on port 4040.
14/12/01 11:11:12 INFO ui.SparkUI: Started SparkUI at http://Master:4040
14/12/01 11:11:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/01 11:11:14 INFO client.AppClient$ClientActor: Connecting to master spark://Master:7077...
14/12/01 11:11:14 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/12/01 11:11:14 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.

scala>
14/12/01 11:11:15 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141201111115-0000
14/12/01 11:11:15 INFO client.AppClient$ClientActor: Executor added: app-20141201111115-0000/0 on worker-20141201031041-Slave1-49261 (Slave1:49261) with 1 cores
14/12/01 11:11:15 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141201111115-0000/0 on hostPort Slave1:49261 with 1 cores, 512.0 MB RAM
14/12/01 11:11:15 INFO client.AppClient$ClientActor: Executor added: app-20141201111115-0000/1 on worker-20141201031041-Slave2-33833 (Slave2:33833) with 1 cores
14/12/01 11:11:15 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141201111115-0000/1 on hostPort Slave2:33833 with 1 cores, 512.0 MB RAM
14/12/01 11:11:15 INFO client.AppClient$ClientActor: Executor updated: app-20141201111115-0000/0 is now RUNNING
14/12/01 11:11:15 INFO client.AppClient$ClientActor: Executor updated: app-20141201111115-0000/1 is now RUNNING
14/12/01 11:11:19 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@Slave1:41369/user/Executor#-1591583962] with ID 0
14/12/01 11:11:19 INFO storage.BlockManagerMasterActor: Registering block manager Slave1:57062 with 267.3 MB RAM
14/12/01 11:11:19 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@Slave2:47569/user/Executor#-1622351454] with ID 1
14/12/01 11:11:20 INFO storage.BlockManagerMasterActor: Registering block manager Slave2:52207 with 267.3 MB RAM

scala> val file = sc.textFile("hdfs://Master:9000/data/test1")
14/12/01 11:12:12 INFO storage.MemoryStore: ensureFreeSpace(163705) called with curMem=0, maxMem=280248975
14/12/01 11:12:12 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 159.9 KB, free 267.1 MB)
14/12/01 11:12:12 INFO storage.MemoryStore: ensureFreeSpace(12910) called with curMem=163705, maxMem=280248975
14/12/01 11:12:12 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.6 KB, free 267.1 MB)
14/12/01 11:12:12 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on Master:52705 (size: 12.6 KB, free: 267.3 MB)
14/12/01 11:12:12 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
file: org.apache.spark.rdd.RDD[String] = hdfs://Master:9000/data/test1 MappedRDD[1] at textFile at <console>:12

scala> val count = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
14/12/01 11:12:43 INFO mapred.FileInputFormat: Total input paths to process : 1
count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:14

scala> count.collect()
14/12/01 11:12:59 INFO spark.SparkContext: Starting job: collect at <console>:17
14/12/01 11:12:59 INFO scheduler.DAGScheduler: Registering RDD 3 (map at <console>:14)
14/12/01 11:12:59 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:17) with 2 output partitions (allowLocal=false)
14/12/01 11:12:59 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at <console>:17)
14/12/01 11:12:59 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/12/01 11:12:59 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/12/01 11:12:59 INFO scheduler.DAGScheduler: Submitting Stage 1 (MappedRDD[3] at map at <console>:14), which has no missing parents
14/12/01 11:12:59 INFO storage.MemoryStore: ensureFreeSpace(3424) called with curMem=176615, maxMem=280248975
14/12/01 11:12:59 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 267.1 MB)
14/12/01 11:12:59 INFO storage.MemoryStore: ensureFreeSpace(2051) called with curMem=180039, maxMem=280248975
14/12/01 11:12:59 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.0 KB, free 267.1 MB)
14/12/01 11:12:59 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on Master:52705 (size: 2.0 KB, free: 267.3 MB)
14/12/01 11:12:59 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
14/12/01 11:12:59 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[3] at map at <console>:14)
14/12/01 11:12:59 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
14/12/01 11:12:59 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, Slave2, NODE_LOCAL, 1174 bytes)
14/12/01 11:12:59 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, Slave1, NODE_LOCAL, 1174 bytes)
14/12/01 11:13:00 INFO network.ConnectionManager: Accepted connection from [Slave1/192.168.8.30:43475]
14/12/01 11:13:00 INFO network.SendingConnection: Initiating connection to [Slave1/192.168.8.30:57062]
14/12/01 11:13:00 INFO network.ConnectionManager: Accepted connection from [Slave2/192.168.8.31:43976]
14/12/01 11:13:00 INFO network.SendingConnection: Connected to [Slave1/192.168.8.30:57062], 1 messages pending
14/12/01 11:13:00 INFO network.SendingConnection: Initiating connection to [Slave2/192.168.8.31:52207]
14/12/01 11:13:00 INFO network.SendingConnection: Connected to [Slave2/192.168.8.31:52207], 1 messages pending
14/12/01 11:13:00 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on Slave1:57062 (size: 2.0 KB, free: 267.3 MB)
14/12/01 11:13:00 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on Slave2:52207 (size: 2.0 KB, free: 267.3 MB)
14/12/01 11:13:00 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on Slave1:57062 (size: 12.6 KB, free: 267.3 MB)
14/12/01 11:13:00 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on Slave2:52207 (size: 12.6 KB, free: 267.3 MB)
14/12/01 11:13:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 8197 ms on Slave2 (1/2)
14/12/01 11:13:07 INFO scheduler.DAGScheduler: Stage 1 (map at <console>:14) finished in 8.626 s
14/12/01 11:13:07 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/12/01 11:13:07 INFO scheduler.DAGScheduler: running: Set()
14/12/01 11:13:07 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/12/01 11:13:07 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 8585 ms on Slave1 (2/2)
14/12/01 11:13:07 INFO scheduler.DAGScheduler: failed: Set()
14/12/01 11:13:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/01 11:13:07 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/12/01 11:13:07 INFO scheduler.DAGScheduler: Submitting Stage 0 (ShuffledRDD[4] at reduceByKey at <console>:14), which is now runnable
14/12/01 11:13:07 INFO storage.MemoryStore: ensureFreeSpace(2112) called with curMem=182090, maxMem=280248975
14/12/01 11:13:07 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.1 KB, free 267.1 MB)
14/12/01 11:13:07 INFO storage.MemoryStore: ensureFreeSpace(1327) called with curMem=184202, maxMem=280248975
14/12/01 11:13:07 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1327.0 B, free 267.1 MB)
14/12/01 11:13:07 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on Master:52705 (size: 1327.0 B, free: 267.3 MB)
14/12/01 11:13:07 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
14/12/01 11:13:07 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (ShuffledRDD[4] at reduceByKey at <console>:14)
14/12/01 11:13:07 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/12/01 11:13:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 2, Slave2, PROCESS_LOCAL, 948 bytes)
14/12/01 11:13:07 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 3, Slave1, PROCESS_LOCAL, 948 bytes)
14/12/01 11:13:07 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on Slave1:57062 (size: 1327.0 B, free: 267.3 MB)
14/12/01 11:13:07 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on Slave2:52207 (size: 1327.0 B, free: 267.3 MB)
14/12/01 11:13:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@Slave1:36991
14/12/01 11:13:08 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 143 bytes
14/12/01 11:13:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@Slave2:50333
14/12/01 11:13:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 2) in 149 ms on Slave2 (1/2)
14/12/01 11:13:08 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:17) finished in 0.179 s
14/12/01 11:13:08 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 3) in 181 ms on Slave1 (2/2)
14/12/01 11:13:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/01 11:13:08 INFO spark.SparkContext: Job finished: collect at <console>:17, took 8.947687849 s
res0: Array[(String, Int)] = Array((spark,1), (hadoop,2), (hbase,1))

scala>
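
The word count entered in the shell session above is a three-step pipeline: split each line into words, pair every word with 1, then sum the ones per word. As a minimal sketch of the same logic on ordinary Scala collections (no Spark dependency), the following may help. The object name LocalWordCount and the two input lines are hypothetical, since the actual contents of /data/test1 are not shown here; groupBy plus a per-group sum stands in for reduceByKey, which plain collections do not have.

```scala
// Plain-Scala sketch of the word count pipeline; it does not use Spark.
object LocalWordCount {
  // Split lines into words, pair each word with 1, then sum the counts per word.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))    // same role as RDD.flatMap in the shell session
      .map(word => (word, 1))              // same role as RDD.map
      .groupBy { case (word, _) => word }  // collect the pairs that share a key
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // stand-in for reduceByKey(_+_)

  def main(args: Array[String]): Unit = {
    // Hypothetical input; the real contents of /data/test1 are not known.
    val lines = Seq("hadoop spark", "hadoop hbase")
    wordCount(lines).toSeq.sortBy(_._1).foreach(println)
  }
}
```

With this hypothetical input the counts are hadoop 2, hbase 1 and spark 1. On a cluster the grouping step is where the shuffle between Stage 1 and Stage 0 in the log takes place.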

Test case 2:

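Run the example program bundled with Spark.

Note that the log below shows both tasks being scheduled on localhost and contains no "Connecting to master" line, which suggests this run executed in local mode rather than on the cluster. In Spark 1.1.0, bin/run-example appears to take the master from the MASTER environment variable (as used in test case 1), not from a trailing command-line argument.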

[Spark@Master spark]$ bin/run-example org.apache.spark.examples.SparkPi 2 spark://192.168.8.29:7077
Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/12/01 11:01:24 INFO spark.SecurityManager: Changing view acls to: Spark,
14/12/01 11:01:24 INFO spark.SecurityManager: Changing modify acls to: Spark,
14/12/01 11:01:24 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Spark, ); users with modify permissions: Set(Spark, )
14/12/01 11:01:24 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/12/01 11:01:25 INFO Remoting: Starting remoting
14/12/01 11:01:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@Master:60670]
14/12/01 11:01:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@Master:60670]
14/12/01 11:01:25 INFO util.Utils: Successfully started service sparkDriver on port 60670.
14/12/01 11:01:25 INFO spark.SparkEnv: Registering MapOutputTracker
14/12/01 11:01:25 INFO spark.SparkEnv: Registering BlockManagerMaster
14/12/01 11:01:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20141201110125-9987
14/12/01 11:01:25 INFO util.Utils: Successfully started service Connection manager for block manager on port 35768.
14/12/01 11:01:25 INFO network.ConnectionManager: Bound socket to port 35768 with id = ConnectionManagerId(Master,35768)
14/12/01 11:01:25 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB
14/12/01 11:01:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/12/01 11:01:25 INFO storage.BlockManagerMasterActor: Registering block manager Master:35768 with 267.3 MB RAM
14/12/01 11:01:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/12/01 11:01:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-68503776-9126-4e30-89a3-83a560210e14
14/12/01 11:01:25 INFO spark.HttpServer: Starting HTTP Server
14/12/01 11:01:25 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/12/01 11:01:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:33890
14/12/01 11:01:25 INFO util.Utils: Successfully started service HTTP file server on port 33890.
14/12/01 11:01:26 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/12/01 11:01:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/12/01 11:01:26 INFO util.Utils: Successfully started service SparkUI on port 4040.
14/12/01 11:01:26 INFO ui.SparkUI: Started SparkUI at http://Master:4040
14/12/01 11:01:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/01 11:01:27 INFO spark.SparkContext: Added JAR file:/home/Spark/husor/spark/lib/spark-examples-1.1.0-hadoop2.4.0.jar at http://Master:33890/jars/spark-examples-1.1.0-hadoop2.4.0.jar with timestamp 1417402887362
14/12/01 11:01:27 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@Master:60670/user/HeartbeatReceiver
14/12/01 11:01:27 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:35
14/12/01 11:01:27 INFO scheduler.DAGScheduler: Got job 0 (reduce at SparkPi.scala:35) with 2 output partitions (allowLocal=false)
14/12/01 11:01:27 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce at SparkPi.scala:35)
14/12/01 11:01:27 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/12/01 11:01:27 INFO scheduler.DAGScheduler: Missing parents: List()
14/12/01 11:01:27 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at SparkPi.scala:31), which has no missing parents
14/12/01 11:01:28 INFO storage.MemoryStore: ensureFreeSpace(1728) called with curMem=0, maxMem=280248975
14/12/01 11:01:28 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1728.0 B, free 267.3 MB)
14/12/01 11:01:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[1] at map at SparkPi.scala:31)
14/12/01 11:01:28 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/12/01 11:01:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1223 bytes)
14/12/01 11:01:28 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
14/12/01 11:01:28 INFO executor.Executor: Fetching http://Master:33890/jars/spark-examples-1.1.0-hadoop2.4.0.jar with timestamp 1417402887362
14/12/01 11:01:28 INFO util.Utils: Fetching http://Master:33890/jars/spark-examples-1.1.0-hadoop2.4.0.jar to /tmp/fetchFileTemp7489373377783107634.tmp
14/12/01 11:01:28 INFO executor.Executor: Adding file:/tmp/spark-ad7b4d7f-9793-406b-b3a9-21bd79fddf9f/spark-examples-1.1.0-hadoop2.4.0.jar to class loader
14/12/01 11:01:28 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 701 bytes result sent to driver
14/12/01 11:01:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1223 bytes)
14/12/01 11:01:28 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
14/12/01 11:01:29 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 701 bytes result sent to driver
14/12/01 11:01:29 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 765 ms on localhost (1/2)
14/12/01 11:01:29 INFO scheduler.DAGScheduler: Stage 0 (reduce at SparkPi.scala:35) finished in 0.936 s
14/12/01 11:01:29 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 177 ms on localhost (2/2)
14/12/01 11:01:29 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/01 11:01:29 INFO spark.SparkContext: Job finished: reduce at SparkPi.scala:35, took 1.3590325 s
Pi is roughly 3.13872
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
14/12/01 11:01:29 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
14/12/01 11:01:29 INFO ui.SparkUI: Stopped Spark web UI at http://Master:4040
14/12/01 11:01:29 INFO scheduler.DAGScheduler: Stopping DAGScheduler
14/12/01 11:01:30 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/12/01 11:01:30 INFO network.ConnectionManager: Selector thread was interrupted!
14/12/01 11:01:30 INFO network.ConnectionManager: ConnectionManager stopped
14/12/01 11:01:30 INFO storage.MemoryStore: MemoryStore cleared
14/12/01 11:01:30 INFO storage.BlockManager: BlockManager stopped
14/12/01 11:01:30 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
14/12/01 11:01:30 INFO spark.SparkContext: Successfully stopped SparkContext
14/12/01 11:01:30 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/12/01 11:01:30 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
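
For readers wondering where the "Pi is roughly ..." line comes from: the bundled SparkPi example estimates pi by Monte Carlo sampling, drawing random points in a square and counting how many fall inside the inscribed unit circle. The following is a minimal plain-Scala sketch of that idea, not the bundled source itself. The object name LocalPi, the fixed seed, and the sample count are assumptions made for illustration, and nothing here runs on Spark.

```scala
import scala.util.Random

// Plain-Scala sketch of a Monte Carlo pi estimate; it does not use Spark.
object LocalPi {
  // Sample points uniformly in the square [-1, 1] x [-1, 1] and return
  // 4 * (fraction of points that land inside the unit circle).
  def estimatePi(samples: Int, seed: Long): Double = {
    val rng = new Random(seed) // fixed seed (an assumption) so runs are repeatable
    val inside = (1 to samples).count { _ =>
      val x = rng.nextDouble() * 2 - 1
      val y = rng.nextDouble() * 2 - 1
      x * x + y * y < 1
    }
    4.0 * inside / samples
  }

  def main(args: Array[String]): Unit = {
    // 200000 samples is an illustrative choice.
    println("Pi is roughly " + estimatePi(200000, 42L))
  }
}
```

In the Spark version the sampling loop is spread over the partitions (two in the run above, matching the argument 2) and the per-partition counts are combined with reduce.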

 

2. Writing a Spark program in IntelliJ IDEA (with the Scala plugin), packaging it as a .jar file, and submitting it to the Spark cluster

The code for com.husor.Test.WordCount.scala is as follows:

package com.husor.Test

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

/**
 * Created by huxiu on 2014/11/27.
 */
object WordCount {
  def main(args: Array[String]) {
    println("Test is starting......")

    if (args.length < 2) {
      System.err.println("Usage: HDFS_InputFile <File> HDFS_OutputDir <Directory>")
      System.exit(1)
    }

    //System.setProperty("hadoop.home.dir", "d:\\winutil\\")

    // Note: setSparkHome receives the literal string "SPARK_HOME" here,
    // not the value of the SPARK_HOME environment variable.
    val conf = new SparkConf().setAppName("WordCount")
                              .setSparkHome("SPARK_HOME")
    val spark = new SparkContext(conf)
    //val spark = new SparkContext("local","WordCount")

    val file = spark.textFile(args(0))

    // Print the results to the console
    //file.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)
    //val wordcounts = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

    // Count the words and write the result to the output directory
    val wordCounts = file.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    wordCounts.saveAsTextFile(args(1))

    spark.stop()
    println("Test is Succeed!!!")
  }
}

 

The corresponding run script, runSpark.sh, is as follows:

#!/bin/bash
set -x
spark-submit --class com.husor.Test.WordCount --master spark://Master:7077 \
--executor-memory 512m --total-executor-cores 1 /home/Spark/husor/spark/SparkTest.jar hdfs://Master:9000/data/test1 \
hdfs://Master:9000/user/huxiu/SparkWordCount

Make runSpark.sh executable (chmod +x runSpark.sh), then run it. The output is as follows:

[Spark@Master spark]$ ./runSpark.sh
+ spark-submit --class com.husor.Test.WordCount --master spark://Master:7077 --executor-memory 512m --total-executor-cores 1 /home/Spark/husor/spark/SparkTest.jar hdfs://Master:9000/data/test1 hdfs://Master:9000/user/huxiu/SparkWordCount
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Test is starting......
14/12/01 12:10:50 INFO spark.SecurityManager: Changing view acls to: Spark,
14/12/01 12:10:50 INFO spark.SecurityManager: Changing modify acls to: Spark,
14/12/01 12:10:50 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Spark, ); users with modify permissions: Set(Spark, )
14/12/01 12:10:50 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/12/01 12:10:50 INFO Remoting: Starting remoting
14/12/01 12:10:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@Master:37899]
14/12/01 12:10:51 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@Master:37899]
14/12/01 12:10:51 INFO util.Utils: Successfully started service sparkDriver on port 37899.
14/12/01 12:10:51 INFO spark.SparkEnv: Registering MapOutputTracker
14/12/01 12:10:51 INFO spark.SparkEnv: Registering BlockManagerMaster
14/12/01 12:10:51 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20141201121051-6189
14/12/01 12:10:51 INFO util.Utils: Successfully started service Connection manager for block manager on port 34131.
14/12/01 12:10:51 INFO network.ConnectionManager: Bound socket to port 34131 with id = ConnectionManagerId(Master,34131)
14/12/01 12:10:51 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB
14/12/01 12:10:51 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/12/01 12:10:51 INFO storage.BlockManagerMasterActor: Registering block manager Master:34131 with 267.3 MB RAM
14/12/01 12:10:51 INFO storage.BlockManagerMaster: Registered BlockManager
14/12/01 12:10:51 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-83b486ec-2237-4f71-be00-0418e485151f
14/12/01 12:10:51 INFO spark.HttpServer: Starting HTTP Server
14/12/01 12:10:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/12/01 12:10:51 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:34902
14/12/01 12:10:51 INFO util.Utils: Successfully started service HTTP file server on port 34902.
14/12/01 12:10:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/12/01 12:10:51 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/12/01 12:10:51 INFO util.Utils: Successfully started service SparkUI on port 4040.
14/12/01 12:10:51 INFO ui.SparkUI: Started SparkUI at http://Master:4040
14/12/01 12:10:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/01 12:10:52 INFO spark.SparkContext: Added JAR file:/home/Spark/husor/spark/SparkTest.jar at http://Master:34902/jars/SparkTest.jar with timestamp 1417407052941
14/12/01 12:10:53 INFO client.AppClient$ClientActor: Connecting to master spark://Master:7077...
14/12/01 12:10:53 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/12/01 12:10:53 INFO storage.MemoryStore: ensureFreeSpace(163705) called with curMem=0, maxMem=280248975
14/12/01 12:10:53 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 159.9 KB, free 267.1 MB)
14/12/01 12:10:53 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141201121053-0006
14/12/01 12:10:53 INFO client.AppClient$ClientActor: Executor added: app-20141201121053-0006/0 on worker-20141201031041-Slave1-49261 (Slave1:49261) with 1 cores
14/12/01 12:10:53 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141201121053-0006/0 on hostPort Slave1:49261 with 1 cores, 512.0 MB RAM
14/12/01 12:10:54 INFO client.AppClient$ClientActor: Executor updated: app-20141201121053-0006/0 is now RUNNING
14/12/01 12:10:54 INFO storage.MemoryStore: ensureFreeSpace(12910) called with curMem=163705, maxMem=280248975
14/12/01 12:10:54 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.6 KB, free 267.1 MB)
14/12/01 12:10:54 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on Master:34131 (size: 12.6 KB, free: 267.3 MB)
14/12/01 12:10:54 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
14/12/01 12:10:54 INFO mapred.FileInputFormat: Total input paths to process : 1
14/12/01 12:10:55 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/12/01 12:10:55 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/12/01 12:10:55 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/12/01 12:10:55 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/12/01 12:10:55 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/12/01 12:10:55 INFO spark.SparkContext: Starting job: saveAsTextFile at WordCount.scala:35
14/12/01 12:10:55 INFO scheduler.DAGScheduler: Registering RDD 3 (map at WordCount.scala:34)
14/12/01 12:10:55 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at WordCount.scala:35) with 2 output partitions (allowLocal=false)
14/12/01 12:10:55 INFO scheduler.DAGScheduler: Final stage: Stage 0(saveAsTextFile at WordCount.scala:35)
14/12/01 12:10:55 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/12/01 12:10:55 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/12/01 12:10:55 INFO scheduler.DAGScheduler: Submitting Stage 1 (MappedRDD[3] at map at WordCount.scala:34), which has no missing parents
14/12/01 12:10:55 INFO storage.MemoryStore: ensureFreeSpace(3400) called with curMem=176615, maxMem=280248975
14/12/01 12:10:55 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 267.1 MB)
14/12/01 12:10:55 INFO storage.MemoryStore: ensureFreeSpace(2055) called with curMem=180015, maxMem=280248975
14/12/01 12:10:55 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.0 KB, free 267.1 MB)
14/12/01 12:10:55 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on Master:34131 (size: 2.0 KB, free: 267.3 MB)
14/12/01 12:10:55 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
14/12/01 12:10:55 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[3] at map at WordCount.scala:34)
14/12/01 12:10:55 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
14/12/01 12:10:57 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@Slave1:38410/user/Executor#898843507] with ID 0
14/12/01 12:10:57 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, Slave1, NODE_LOCAL, 1222 bytes)
14/12/01 12:10:57 INFO storage.BlockManagerMasterActor: Registering block manager Slave1:44906 with 267.3 MB RAM
14/12/01 12:10:58 INFO network.ConnectionManager: Accepted connection from [Slave1/192.168.8.30:43149]
14/12/01 12:10:58 INFO network.SendingConnection: Initiating connection to [Slave1/192.168.8.30:44906]
14/12/01 12:10:58 INFO network.SendingConnection: Connected to [Slave1/192.168.8.30:44906], 1 messages pending
14/12/01 12:10:58 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on Slave1:44906 (size: 2.0 KB, free: 267.3 MB)
14/12/01 12:10:58 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on Slave1:44906 (size: 12.6 KB, free: 267.3 MB)
14/12/01 12:10:59 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, Slave1, NODE_LOCAL, 1222 bytes)
14/12/01 12:11:00 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 159 ms on Slave1 (1/2)
14/12/01 12:11:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 2454 ms on Slave1 (2/2)
14/12/01 12:11:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/01 12:11:00 INFO scheduler.DAGScheduler: Stage 1 (map at WordCount.scala:34) finished in 4.444 s
14/12/01 12:11:00 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/12/01 12:11:00 INFO scheduler.DAGScheduler: running: Set()
14/12/01 12:11:00 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/12/01 12:11:00 INFO scheduler.DAGScheduler: failed: Set()
14/12/01 12:11:00 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/12/01 12:11:00 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[5] at saveAsTextFile at WordCount.scala:35), which is now runnable
14/12/01 12:11:00 INFO storage.MemoryStore: ensureFreeSpace(57552) called with curMem=182070, maxMem=280248975
14/12/01 12:11:00 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 56.2 KB, free 267.0 MB)
14/12/01 12:11:00 INFO storage.MemoryStore: ensureFreeSpace(19863) called with curMem=239622, maxMem=280248975
14/12/01 12:11:00 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.4 KB, free 267.0 MB)
14/12/01 12:11:00 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on Master:34131 (size: 19.4 KB, free: 267.2 MB)
14/12/01 12:11:00 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
14/12/01 12:11:00 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[5] at saveAsTextFile at WordCount.scala:35)
14/12/01 12:11:00 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/12/01 12:11:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 2, Slave1, PROCESS_LOCAL, 996 bytes)
14/12/01 12:11:00 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on Slave1:44906 (size: 19.4 KB, free: 267.2 MB)
14/12/01 12:11:00 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@Slave1:51850
14/12/01 12:11:00 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 133 bytes
14/12/01 12:11:00 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 3, Slave1, PROCESS_LOCAL, 996 bytes)
14/12/01 12:11:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 2) in 412 ms on Slave1 (1/2)
14/12/01 12:11:00 INFO scheduler.DAGScheduler: Stage 0 (saveAsTextFile at WordCount.scala:35) finished in 0.710 s
14/12/01 12:11:00 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 3) in 308 ms on Slave1 (2/2)
14/12/01 12:11:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/01 12:11:00 INFO spark.SparkContext: Job finished: saveAsTextFile at WordCount.scala:35, took 5.556490798 s
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
14/12/01 12:11:00 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
14/12/01 12:11:00 INFO ui.SparkUI: Stopped Spark web UI at http://Master:4040
14/12/01 12:11:00 INFO scheduler.DAGScheduler: Stopping DAGScheduler
14/12/01 12:11:00 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
14/12/01 12:11:00 INFO 
cluster.SparkDeploySchedulerBackend: Asking each executor to shut down14/12/01 12:11:01 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(Slave1,44906)14/12/01 12:11:01 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(Slave1,44906)14/12/01 12:11:01 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(Slave1,44906)14/12/01 12:11:02 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!14/12/01 12:11:02 INFO network.ConnectionManager: Selector thread was interrupted!14/12/01 12:11:02 INFO network.ConnectionManager: ConnectionManager stopped14/12/01 12:11:02 INFO storage.MemoryStore: MemoryStore cleared14/12/01 12:11:02 INFO storage.BlockManager: BlockManager stopped14/12/01 12:11:02 INFO storage.BlockManagerMaster: BlockManagerMaster stopped14/12/01 12:11:02 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.14/12/01 12:11:02 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.14/12/01 12:11:02 INFO spark.SparkContext: Successfully stopped SparkContextTest is Succeed!!!14/12/01 12:11:02 INFO Remoting: Remoting shut down14/12/01 12:11:02 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.[Spark@Master spark]$ hdfs dfs -cat /user/huxiu/SparkWordCount/part-0000114/12/01 12:11:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable(spark,1)(hadoop,2)(hbase,1)[Spark@Master spark]$ hdfs dfs -ls /user/huxiu/SparkWordCount/14/12/01 12:11:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicableFound 3 items-rw-r--r--   2 Spark huxiu          0 2014-12-01 12:11 /user/huxiu/SparkWordCount/_SUCCESS-rw-r--r--   2 Spark huxiu          0 2014-12-01 12:11 /user/huxiu/SparkWordCount/part-00000-rw-r--r--   2 Spark huxiu         31 2014-12-01 12:11 /user/huxiu/SparkWordCount/part-00001[Spark@Master spark]$ hdfs dfs -cat /user/huxiu/SparkWordCount/part-0000014/12/01 12:11:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
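
The counts printed from part-00001 can be cross-checked locally with standard shell tools. The sketch below is not part of the original job: the sample text is a hypothetical stand-in for the real input file, chosen only so that the tallies match the output above, and the function name `word_counts` is made up for illustration. It prints each result in the same `(word,count)` form that appears in the HDFS output.

```shell
#!/usr/bin/env bash
# Hypothetical sample input; the real job read its input from HDFS.
sample_input='hadoop spark
hadoop hbase'

# word_counts: read text on stdin, put one word per line, drop empty lines,
# count how often each word occurs, and print the result as (word,count).
word_counts() {
  tr -s '[:space:]' '\n' \
    | grep -v '^$' \
    | sort \
    | uniq -c \
    | awk '{ printf "(%s,%d)\n", $2, $1 }'
}

printf '%s\n' "$sample_input" | word_counts
```

Unlike the Spark output, the lines here come out sorted by word, because `uniq -c` needs sorted input; the set of (word,count) pairs is what should match.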

Note:

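While the job is running, you may see the message "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory". In this case the workers had plenty of memory, yet the job still could not acquire any resources. Checking the firewall confirmed the cause: the client machine allowed access only on port 80 and blocked every other port. Executors have to connect back to the driver on the client, which listens on dynamically assigned ports, so with those ports blocked the executors cannot reach the driver and the job never receives resources.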

Solution:

Stop the firewall on every node (service iptables stop), then run the runSpark.sh script shown above on the Spark on YARN cluster.
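
Before stopping the firewall, it can help to confirm that blocked ports really are the cause. The following is a minimal sketch, assuming bash (it relies on bash's `/dev/tcp` redirection) and the `timeout` utility from coreutils. The helper name `port_open` is hypothetical; the host `Master` and port 7077 are taken from the `spark://Master:7077` URL used earlier.

```shell
#!/usr/bin/env bash
# port_open HOST PORT: succeed if a TCP connection to HOST:PORT can be
# opened within 3 seconds. Hypothetical helper; relies on bash's /dev/tcp
# pseudo-device and on coreutils `timeout`.
port_open() {
  local host="$1" port="$2"
  timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null
}

# Example: check the standalone master port from spark://Master:7077.
if port_open Master 7077; then
  echo "Master:7077 reachable"
else
  echo "Master:7077 NOT reachable (firewall, DNS, or service down?)"
fi
```

To test the direction that matters for this error, run the same check from a worker node against the driver host and the port reported in the driver log (the `sparkDriver@...` address). A failed connection there points to the firewall rather than to memory.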
