
Spark SQL Operations

DSL-Style Syntax

1. View the contents of a DataFrame

scala> df1.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhansgan| 16|
|  2|    lisi| 18|
|  3|  wangwu| 21|
|  4|xiaofang| 22|
+---+--------+---+
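By default, show prints the first 20 rows. It also accepts a row count (the numRows argument is part of the standard DataFrame API); a minimal sketch:

scala> df1.show(2)   // print only the first 2 rows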

2. View selected columns of a DataFrame

scala> df1.select(df1.col("name")).show
+--------+
|    name|
+--------+
|zhansgan|
|    lisi|
|  wangwu|
|xiaofang|
+--------+

  

scala> df1.select(col("name"), col("age")).show
+--------+---+
|    name|age|
+--------+---+
|zhansgan| 16|
|    lisi| 18|
|  wangwu| 21|
|xiaofang| 22|
+--------+---+

 

scala> df1.select("name").show+--------+| name|+--------+|zhansgan|| lisi|| wangwu||xiaofang|+--------+


3. View the DataFrame's schema

scala> df1.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
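The schema is also available programmatically, which is handier in application code than printSchema; these accessors are part of the standard DataFrame API:

scala> df1.schema    // the StructType behind printSchema
scala> df1.columns   // Array(id, name, age)
scala> df1.dtypes    // Array((id,IntegerType), (name,StringType), (age,IntegerType))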

 

 

4. Select name and age, adding 1 to age

scala> df1.select(col("name"), col("age") + 1).show
+--------+---------+
|    name|(age + 1)|
+--------+---------+
|zhansgan|       17|
|    lisi|       19|
|  wangwu|       22|
|xiaofang|       23|
+--------+---------+

  

scala> df1.select(df1("name"), df1("age") + 1).show
+--------+---------+
|    name|(age + 1)|
+--------+---------+
|zhansgan|       17|
|    lisi|       19|
|  wangwu|       22|
|xiaofang|       23|
+--------+---------+
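The same projection can be written as SQL expression strings with selectExpr; the alias age_plus_1 here is just for illustration:

scala> df1.selectExpr("name", "age + 1 as age_plus_1").show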


5. Filter for people older than 20

scala> df1.filter(col("age") > 20).show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  3|  wangwu| 21|
|  4|xiaofang| 22|
+---+--------+---+
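filter also accepts a SQL expression string, and where is an alias for filter; a sketch of both:

scala> df1.filter("age > 20").show
scala> df1.where(col("age") > 20).show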

  

6. Group by age and count the people in each age group

scala> df1.groupBy("age").count().show
+---+-----+
|age|count|
+---+-----+
| 16|    1|
| 18|    1|
| 21|    1|
| 22|    1|
+---+-----+
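Beyond count, any aggregate from org.apache.spark.sql.functions can be used; a sketch computing the average and maximum age over the whole table:

scala> import org.apache.spark.sql.functions._
scala> df1.agg(avg("age"), max("age")).show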

  

SQL Style

Before using SQL-style queries, the DataFrame must first be registered as a temporary table:

df1.registerTempTable("t_person")
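Note that the temporary table is scoped to this SQLContext and is not persisted anywhere. In Spark 2.x, registerTempTable was deprecated in favor of createOrReplaceTempView:

df1.createOrReplaceTempView("t_person")   // Spark 2.x equivalent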

 

1. Query the two oldest people

scala> sqlContext.sql("select * from t_person order by age desc limit 2").show+---+--------+---+| id| name|age|+---+--------+---+| 4|xiaofang| 22|| 3| wangwu| 21|+---+--------+---+

  

2. Show the table's schema

scala> sqlContext.sql("desc t_person").show+--------+---------+-------+|col_name|data_type|comment|+--------+---------+-------+| id| int| || name| string| || age| int| |+--------+---------+-------+

  

DataFrame API Operations

 

package bigdata.spark.sql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Administrator on 2017/4/27.
  */
object SparkSqlDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("SparkSqlDemo")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.textFile("hdfs://m1:9000/persons.txt").map(_.split(" "))
    val rdd2 = rdd1.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // Import implicit conversions, including the one that turns an RDD into a DataFrame
    import sqlContext.implicits._
    // df1 is now a DataFrame
    val df1 = rdd2.toDF
    df1.show
    df1.select("age").show()
    df1.select(df1.col("age")).show
    // Importing df1's members brings its col method into scope,
    // so the unqualified col("age") below resolves to df1.col("age")
    import df1._
    df1.select(col("age")).show
    df1.select(col("age") > 20).show   // projects a boolean column; this is not a filter
    df1.select(col("age") + 1).show
    df1.filter(col("age") > 20).show()
    df1.registerTempTable("t_person")
    sqlContext.sql("select * from t_person").show()
    sqlContext.sql("select * from t_person order by age desc limit 2").show()
    sc.stop()
  }
  // This case class must be defined outside the main method, otherwise an error is thrown
  case class Person(id: Int, name: String, age: Int)
}
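The code reads persons.txt from HDFS and splits each line on a single space, so the file is assumed to hold one person per line in id-name-age order. Based on the output shown earlier, it would look like:

1 zhansgan 16
2 lisi 18
3 wangwu 21
4 xiaofang 22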

  

Specifying a Schema with StructType

package bigdata.spark.sql

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Administrator on 2017/4/27.
  */
object SparkSqlDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("SparkSqlDemo")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.textFile("hdfs://m1:9000/persons.txt").map(_.split(" "))
    val rdd2 = rdd1.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // Build the schema explicitly
    val schema = StructType(
      List(
        // field name, type, nullable
        StructField("id", IntegerType, false),
        StructField("name", StringType, false),
        StructField("age", IntegerType, false)
      )
    )
    // Create the DataFrame from the RDD[Row] and the schema
    val df1 = sqlContext.createDataFrame(rdd2, schema)
    df1.registerTempTable("t_person")
    sqlContext.sql("select * from t_person").show()
    sc.stop()
  }
}
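One caveat with this approach: createDataFrame does not validate the rows against the schema up front, so a Row whose field types do not match the StructType typically fails only at runtime, when an action is executed. A sketch of the failure mode (badRow and badDf are hypothetical names):

val badRow = sc.parallelize(Seq(Row("1", "zhansgan", 16)))  // id as String, schema says IntegerType
val badDf = sqlContext.createDataFrame(badRow, schema)      // no error raised here
// badDf.show()  // fails at runtime with a type mismatch / ClassCastException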

  
