SparkSQL Quick Start
Using SparkSQL with Text Files
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class PageViews(track_time: String, url: String, session_id: String, referer: String, ip: String, end_user_id: String, city_id: String)

val page_views = sc.textFile("hdfs://hadoop000:8020/sparksql/page_views.dat")
  .map(_.split("\t"))
  .map(p => PageViews(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))
page_views.registerTempTable("page_views")

val sql1 = sql("SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10")
sql1.collect()

val sql2 = sql("SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10")
sql2.collect()
Using SparkSQL with Parquet Files
SparkSQL can read data from Parquet files, and when writing to Parquet it preserves the schema as metadata. The columnar storage format avoids reading columns that are not needed, which improves query efficiency and reduces GC pressure.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class Person(name: String, age: Int)

val people = sc.textFile("hdfs://hadoop000:8020/sparksql/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
people.saveAsParquetFile("hdfs://hadoop000:8020/sparksql/resources/people.parquet")  // write

val parquetFile = sqlContext.parquetFile("hdfs://hadoop000:8020/sparksql/resources/people.parquet")  // read
parquetFile.registerAsTable("parquetFile")

val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect
Using SparkSQL with JSON Files
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val path = "hdfs://hadoop000:8020/sparksql/resources/people.json"
val people = sqlContext.jsonFile(path)
people.printSchema()
people.registerTempTable("people")

val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.collect

val anotherPeopleRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
anotherPeople.collect
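A small follow-up sketch (the table name anotherPeople used here is illustrative, not from the original): the schema inferred from nested JSON can also be registered as a temp table and queried with dotted field paths.

anotherPeople.registerTempTable("anotherPeople")
// address.city drills into the nested struct inferred from the JSON document above
sql("SELECT name, address.city FROM anotherPeople").collect().foreach(println)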
Using the SparkSQL DSL
With the DSL we can run SQL-style operations directly on the RDD we read in, without registering it as a table; Scala symbols stand for the columns of the table.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class Person(name: String, age: Int)

val people = sc.textFile("hdfs://hadoop000:8020/sparksql/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))

val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.toDebugString
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
Using SparkSQL with Existing Hive Tables
Accessing via spark-shell:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

sql("SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10").collect().foreach(println)
sql("SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10").collect().foreach(println)
Accessing via spark-sql:
First copy hive-site.xml into $SPARK_HOME/conf.
SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10;
SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10;
Accessing via the hive-thriftserver:
1) Start the hive-thriftserver:
cd $SPARK_HOME/sbin
start-thriftserver.sh
To start on a specific port: start-thriftserver.sh --hiveconf hive.server2.thrift.port=14000
2) Start the beeline client:
cd $SPARK_HOME/bin
beeline -u jdbc:hive2://hadoop000:10000/default -n spark
SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10;
SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10;
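beeline is itself just a JDBC client, so any JDBC program can talk to the Thrift server the same way. A minimal Scala sketch, assuming the Hive JDBC driver (org.apache.hive.jdbc.HiveDriver) and its dependencies are on the classpath, and reusing the host, port and user from the beeline example above:

import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://hadoop000:10000/default", "spark", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT session_id, count(*) c FROM page_views GROUP BY session_id ORDER BY c DESC LIMIT 10")
while (rs.next()) {
  println(rs.getString("session_id") + "\t" + rs.getLong("c"))
}
rs.close()
stmt.close()
conn.close()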
Caching Tables in SparkSQL
Notes for Spark 1.2 and later:
1) Both SchemaRDD.cache and SQLContext.cacheTable cache the data in memory in a columnar format (see the sketch after these notes);
2) SQLContext.cacheTable/uncacheTable are now eager rather than lazy; there is no longer any need to trigger an action manually before the data is cached;
3) CACHE [LAZY] TABLE tb1 [AS SELECT ...] lets you choose LAZY or EAGER caching explicitly;
After calling cacheTable, watch the Storage tab of the Web UI to see the change.
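A small sketch of the programmatic API from note 1, assuming page_views is already registered as a table in the HiveContext (as in the SQL examples that follow):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// Cache the table in the columnar in-memory format; eager as of Spark 1.2
hiveContext.cacheTable("page_views")
hiveContext.sql("select count(*) from page_views").collect().foreach(println)
// Release the in-memory copy when it is no longer needed
hiveContext.uncacheTable("page_views")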
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

sql("cache table page_views")
sql("select session_id, count(session_id) as c from page_views group by session_id order by c desc limit 10").collect().foreach(println)
sql("uncache table page_views")
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

sql("CACHE TABLE page_views_cached_eager AS SELECT * FROM page_views")
sql("select session_id, count(session_id) as c from page_views_cached_eager group by session_id order by c desc limit 10").collect().foreach(println)
uncacheTable("page_views_cached_eager")
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

sql("CACHE LAZY TABLE page_views_cached_lazy AS SELECT * FROM page_views")
sql("select count(*) as c from page_views_cached_lazy").collect().foreach(println)
sql("select session_id, count(session_id) as c from page_views_cached_lazy group by session_id order by c desc limit 10").collect().foreach(println)
uncacheTable("page_views_cached_lazy")