
A Tour of SparkSQL

1. Prepare the data: employee.txt

1001,Gong Shaocheng,1
1002,Li Dachao,1
1003,Qiu Xin,1
1004,Cheng Jiangzhong,2
1005,Wo Binggang,3

Put the data into HDFS:

[root@jfp3-1 spark-studio]# hdfs dfs -put employee.txt /user/spark_studio

 

2. Start the Spark shell

[root@jfp3-1 spark-1.0.0-bin-hadoop2]# ./bin/spark-shell --master spark://192.168.0.71:7077

3. Write the script

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// employeeId is kept as String to match the raw text fields;
// departmentId is parsed to Int so it can be compared numerically.
case class Employee(employeeId: String, name: String, departmentId: Int)

// Create an RDD of Employee objects and register it as a table.
val employees = sc.textFile("hdfs://jfp3-1:8020/user/spark_studio/employee.txt")
  .map(_.split(","))
  .map(p => Employee(p(0), p(1), p(2).trim.toInt))
employees.registerAsTable("employee")

// SQL statements can be run by using the sql methods provided by sqlContext.
val fsis = sql("SELECT name FROM employee WHERE departmentId = 1")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
fsis.map(t => "Name: " + t(0)).collect().foreach(println)

4. Run it

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@17124319

scala> import sqlContext._
import sqlContext._

scala> case class Employee(employeeId: String, name: String, departmentId: Int)
defined class Employee

scala> val employees = sc.textFile("hdfs://jfp3-1:8020/user/spark_studio/employee.txt").map(_.split(",")).map(p => Employee(p(0), p(1), p(2).trim.toInt))
14/06/18 09:54:25 INFO MemoryStore: ensureFreeSpace(138763) called with curMem=0, maxMem=309225062
14/06/18 09:54:25 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.5 KB, free 294.8 MB)
employees: org.apache.spark.rdd.RDD[Employee] = MappedRDD[3] at map at <console>:19

scala> employees.registerAsTable("employee")

scala> val fsis = sql("SELECT name FROM employee WHERE departmentId = 1")
14/06/18 09:54:44 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/06/18 09:54:44 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/06/18 09:54:44 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/06/18 09:54:44 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
fsis: org.apache.spark.sql.SchemaRDD =
SchemaRDD[6] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#1:1]
 Filter (departmentId#2:2 = 1)
  ExistingRdd [employeeId#0,name#1,departmentId#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174

scala> fsis.map(t => "Name: " + t(0)).collect().foreach(println)
14/06/18 09:55:27 INFO FileInputFormat: Total input paths to process : 1
14/06/18 09:55:27 INFO SparkContext: Starting job: collect at <console>:20
14/06/18 09:55:27 INFO DAGScheduler: Got job 0 (collect at <console>:20) with 2 output partitions (allowLocal=false)
14/06/18 09:55:27 INFO DAGScheduler: Final stage: Stage 0(collect at <console>:20)
14/06/18 09:55:27 INFO DAGScheduler: Parents of final stage: List()
14/06/18 09:55:27 INFO DAGScheduler: Missing parents: List()
14/06/18 09:55:27 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[9] at map at <console>:20), which has no missing parents
14/06/18 09:55:27 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[9] at map at <console>:20)
14/06/18 09:55:27 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/06/18 09:55:27 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 1: jfp3-2 (NODE_LOCAL)
14/06/18 09:55:27 INFO TaskSetManager: Serialized task 0.0:0 as 3508 bytes in 2 ms
14/06/18 09:55:27 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 2: jfp3-3 (NODE_LOCAL)
14/06/18 09:55:27 INFO TaskSetManager: Serialized task 0.0:1 as 3508 bytes in 0 ms
14/06/18 09:55:28 INFO TaskSetManager: Finished TID 1 in 1266 ms on jfp3-3 (progress: 1/2)
14/06/18 09:55:28 INFO TaskSetManager: Finished TID 0 in 1276 ms on jfp3-2 (progress: 2/2)
14/06/18 09:55:28 INFO DAGScheduler: Completed ResultTask(0, 1)
14/06/18 09:55:28 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/18 09:55:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/06/18 09:55:28 INFO DAGScheduler: Stage 0 (collect at <console>:20) finished in 1.284 s
14/06/18 09:55:28 INFO SparkContext: Job finished: collect at <console>:20, took 1.386154401 s
Name: Gong Shaocheng
Name: Li Dachao
Name: Qiu Xin

5. Save the data in Parquet format and run SQL against it
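The transcript below reads employee.parquet directly, but the step that writes that file is not shown. Assuming the employees SchemaRDD from step 3 is still registered, the file was presumably produced with something along these lines (the path matches the one queried below; the exact line is a reconstruction, not the original):

```scala
// Sketch: persist the employees SchemaRDD to HDFS as Parquet using the
// Spark 1.0 SchemaRDD API. The output path is assumed from the query below.
employees.saveAsParquetFile("hdfs://jfp3-1:8020/user/spark_studio/employee.parquet")
```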

scala> val parquetFile = sqlContext.parquetFile("hdfs://jfp3-1:8020/user/spark_studio/employee.parquet")
14/06/18 10:24:36 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/06/18 10:24:36 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/06/18 10:24:36 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/06/18 10:24:36 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
parquetFile: org.apache.spark.sql.SchemaRDD =
SchemaRDD[13] at RDD at SchemaRDD.scala:98
== Query Plan ==
ParquetTableScan [employeeId#9,name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None

scala> parquetFile.registerAsTable("parquetFile")

scala> val telcos = sql("SELECT name FROM parquetFile WHERE departmentId = 3")
14/06/18 10:24:37 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/06/18 10:24:37 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/06/18 10:24:37 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/06/18 10:24:37 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/06/18 10:24:37 INFO MemoryStore: ensureFreeSpace(180579) called with curMem=138763, maxMem=309225062
14/06/18 10:24:37 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 176.3 KB, free 294.6 MB)
telcos: org.apache.spark.sql.SchemaRDD =
SchemaRDD[14] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#10:0]
 Filter (departmentId#11:1 = 3)
  ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None

scala> telcos.collect().foreach(println)
14/06/18 10:24:40 INFO FileInputFormat: Total input paths to process : 2
14/06/18 10:24:40 INFO ParquetInputFormat: Total input paths to process : 2
14/06/18 10:24:40 INFO ParquetFileReader: reading summary file: hdfs://jfp3-1:8020/user/spark_studio/employee.parquet/_metadata
14/06/18 10:24:40 INFO deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
14/06/18 10:24:40 INFO deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
14/06/18 10:24:40 INFO SparkContext: Starting job: collect at <console>:20
14/06/18 10:24:40 INFO DAGScheduler: Got job 2 (collect at <console>:20) with 2 output partitions (allowLocal=false)
14/06/18 10:24:40 INFO DAGScheduler: Final stage: Stage 2(collect at <console>:20)
14/06/18 10:24:40 INFO DAGScheduler: Parents of final stage: List()
14/06/18 10:24:40 INFO DAGScheduler: Missing parents: List()
14/06/18 10:24:40 INFO DAGScheduler: Submitting Stage 2 (SchemaRDD[14] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#10:0]
 Filter (departmentId#11:1 = 3)
  ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None), which has no missing parents
14/06/18 10:24:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (SchemaRDD[14] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#10:0]
 Filter (departmentId#11:1 = 3)
  ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None)
14/06/18 10:24:40 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/06/18 10:24:40 INFO TaskSetManager: Starting task 2.0:0 as TID 4 on executor 2: jfp3-3 (NODE_LOCAL)
14/06/18 10:24:40 INFO TaskSetManager: Serialized task 2.0:0 as 3116 bytes in 1 ms
14/06/18 10:24:40 INFO TaskSetManager: Starting task 2.0:1 as TID 5 on executor 0: jfp3-4 (NODE_LOCAL)
14/06/18 10:24:40 INFO TaskSetManager: Serialized task 2.0:1 as 3116 bytes in 1 ms
14/06/18 10:24:40 INFO DAGScheduler: Completed ResultTask(2, 0)
14/06/18 10:24:40 INFO TaskSetManager: Finished TID 4 in 200 ms on jfp3-3 (progress: 1/2)
14/06/18 10:24:42 INFO DAGScheduler: Completed ResultTask(2, 1)
14/06/18 10:24:42 INFO TaskSetManager: Finished TID 5 in 2162 ms on jfp3-4 (progress: 2/2)
14/06/18 10:24:42 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/06/18 10:24:42 INFO DAGScheduler: Stage 2 (collect at <console>:20) finished in 2.177 s
14/06/18 10:24:42 INFO SparkContext: Job finished: collect at <console>:20, took 2.210887848 s
[Wo Binggang]

 

6. DSL syntax support
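The transcript below only shows all.collect(); the line that defines all is missing. Judging from the query plan it produces (a filter on departmentId >= 1 projecting name over the employee RDD), it was presumably built with the Spark 1.0 Scala DSL along these lines (a reconstruction, not the original line):

```scala
// Hypothetical reconstruction: select the names of all employees whose
// departmentId is at least 1, using the symbol-based Spark 1.0 DSL
// (enabled by import sqlContext._). 'departmentId and 'name are columns.
val all = employees.where('departmentId >= 1).select('name)
```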

scala> all.collect().foreach(println)
14/06/18 10:37:45 INFO SparkContext: Starting job: collect at <console>:24
14/06/18 10:37:45 INFO DAGScheduler: Got job 6 (collect at <console>:24) with 2 output partitions (allowLocal=false)
14/06/18 10:37:45 INFO DAGScheduler: Final stage: Stage 6(collect at <console>:24)
14/06/18 10:37:45 INFO DAGScheduler: Parents of final stage: List()
14/06/18 10:37:45 INFO DAGScheduler: Missing parents: List()
14/06/18 10:37:45 INFO DAGScheduler: Submitting Stage 6 (SchemaRDD[33] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#19:1]
 Filter (departmentId#20:2 >= 1)
  ExistingRdd [employeeId#18,name#19,departmentId#20], MapPartitionsRDD[30] at mapPartitions at basicOperators.scala:174), which has no missing parents
14/06/18 10:37:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage 6 (SchemaRDD[33] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#19:1]
 Filter (departmentId#20:2 >= 1)
  ExistingRdd [employeeId#18,name#19,departmentId#20], MapPartitionsRDD[30] at mapPartitions at basicOperators.scala:174)
14/06/18 10:37:45 INFO TaskSchedulerImpl: Adding task set 6.0 with 2 tasks
14/06/18 10:37:45 INFO TaskSetManager: Starting task 6.0:0 as TID 200 on executor 2: jfp3-3 (NODE_LOCAL)
14/06/18 10:37:45 INFO TaskSetManager: Serialized task 6.0:0 as 3541 bytes in 0 ms
14/06/18 10:37:45 INFO TaskSetManager: Starting task 6.0:1 as TID 201 on executor 1: jfp3-2 (NODE_LOCAL)
14/06/18 10:37:45 INFO TaskSetManager: Serialized task 6.0:1 as 3541 bytes in 1 ms
14/06/18 10:37:45 INFO TaskSetManager: Finished TID 200 in 33 ms on jfp3-3 (progress: 1/2)
14/06/18 10:37:45 INFO DAGScheduler: Completed ResultTask(6, 0)
14/06/18 10:37:45 INFO DAGScheduler: Completed ResultTask(6, 1)
14/06/18 10:37:45 INFO TaskSetManager: Finished TID 201 in 37 ms on jfp3-2 (progress: 2/2)
14/06/18 10:37:45 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
14/06/18 10:37:45 INFO DAGScheduler: Stage 6 (collect at <console>:24) finished in 0.039 s
14/06/18 10:37:45 INFO SparkContext: Job finished: collect at <console>:24, took 0.052556716 s
[Gong Shaocheng]
[Li Dachao]
[Qiu Xin]
[Cheng Jiangzhong]
[Wo Binggang]