
SparkSQL Quick Start

Using SparkSQL on Text Files

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class PageViews(track_time: String, url: String, session_id: String, referer: String, ip: String, end_user_id: String, city_id: String)

val page_views = sc.textFile("hdfs://hadoop000:8020/sparksql/page_views.dat").map(_.split("\t")).map(p => PageViews(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))
page_views.registerTempTable("page_views")

val sql1 = sql("SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10")
sql1.collect()

val sql2 = sql("SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10")
sql2.collect()

 

Using SparkSQL on Parquet Files

SparkSQL can read data from Parquet files, and when writing to Parquet it preserves the schema as metadata. Because Parquet is a columnar format, queries avoid reading columns they do not need, which improves query performance and reduces GC pressure.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class Person(name: String, age: Int)

val people = sc.textFile("hdfs://hadoop000:8020/sparksql/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.saveAsParquetFile("hdfs://hadoop000:8020/sparksql/resources/people.parquet")

// Read the Parquet file back in; the schema is preserved in the file's metadata
val parquetFile = sqlContext.parquetFile("hdfs://hadoop000:8020/sparksql/resources/people.parquet")
parquetFile.registerAsTable("parquetFile")

val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect()

 

Using SparkSQL on JSON Files

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val path = "hdfs://hadoop000:8020/sparksql/resources/people.json"
val people = sqlContext.jsonFile(path)
people.printSchema()
people.registerTempTable("people")

val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.collect()

val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
anotherPeople.collect()

 

Using the SparkSQL DSL

With the DSL we can run SQL-style operations directly on the RDD we read in, without registering it as a table; Scala symbols are used to refer to the columns of the table.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class Person(name: String, age: Int)

val people = sc.textFile("hdfs://hadoop000:8020/sparksql/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

// Columns are referenced through Scala symbols such as 'age and 'name
val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.toDebugString
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

 

Using SparkSQL on Existing Hive Tables

Access via spark-shell

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

sql("SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10").collect().foreach(println)
sql("SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10").collect().foreach(println)

Access via spark-sql

Copy hive-site.xml into $SPARK_HOME/conf first.
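For example, assuming hive-site.xml lives under $HIVE_HOME/conf (adjust the source path to your Hive installation):

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/

Then start spark-sql and run the queries directly: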

SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10;
SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10;

Access via hive-thriftserver:

1) Start the hive-thriftserver:

cd $SPARK_HOME/sbin
./start-thriftserver.sh

To start it on a specific port: ./start-thriftserver.sh --hiveconf hive.server2.thrift.port=14000

2) Start the beeline client:

cd $SPARK_HOME/bin
./beeline -u jdbc:hive2://hadoop000:10000/default -n spark

SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10;
SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10;
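If the thrift server was started on a custom port (14000 in the example above), point beeline at that port instead; a sketch assuming the same host and user:

./beeline -u jdbc:hive2://hadoop000:14000/default -n spark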

 

Caching Tables in SparkSQL

Notes for Spark 1.2 and later:

1) Both SchemaRDD.cache and SQLContext.cacheTable now cache the data in memory using the columnar format;

2) SQLContext.cacheTable/uncacheTable are eager rather than lazy; there is no longer any need to trigger an action manually before the data is cached;

3) LAZY or EAGER caching can be chosen explicitly with CACHE [LAZY] TABLE tbl [AS SELECT ...];

After cacheTable, watch the Storage tab of the Web UI to see the cached table appear.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

sql("cache table page_views")
sql("select session_id, count(session_id) as c from page_views group by session_id order by c desc limit 10").collect().foreach(println)
sql("uncache table page_views")
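
The cacheTable/uncacheTable calls mentioned in notes 1) and 2) can also be invoked programmatically instead of through SQL statements; a minimal sketch against the same page_views table:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

// Eagerly caches the table in the in-memory columnar format (eager as of Spark 1.2)
cacheTable("page_views")

// Queries now read from the columnar cache; the table should show up under Storage in the Web UI
sql("select session_id, count(session_id) as c from page_views group by session_id order by c desc limit 10").collect().foreach(println)

// Release the cached data
uncacheTable("page_views")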

 

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

// Eager caching: the result of the CTAS is cached immediately
sql("CACHE TABLE page_views_cached_eager AS SELECT * FROM page_views")
sql("select session_id, count(session_id) as c from page_views_cached_eager group by session_id order by c desc limit 10").collect().foreach(println)
uncacheTable("page_views_cached_eager")

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

// Lazy caching: the table is only materialized in the cache when it is first queried
sql("CACHE LAZY TABLE page_views_cached_lazy AS SELECT * FROM page_views")
sql("select count(*) as c from page_views_cached_lazy").collect().foreach(println)
sql("select session_id, count(session_id) as c from page_views_cached_lazy group by session_id order by c desc limit 10").collect().foreach(println)
uncacheTable("page_views_cached_lazy")

 
