首页 > 代码库 > Spark SQL and DataFrame Guide(1.4.1)——之DataFrames

Spark SQL and DataFrame Guide(1.4.1)——之DataFrames

Spark SQL是处理结构化数据的Spark模块。它提供了DataFrames这样的编程抽象。同一时候也能够作为分布式SQL查询引擎使用。

DataFrames

DataFrame是一个带有列名的分布式数据集合。等同于一张关系型数据库中的表或者R/Python中的data frame,只是在底层做了非常多优化;我们能够使用结构化数据文件、Hive tables,外部数据库或者RDDS来构造DataFrames。

1. 開始入口:

入口须要从SQLContext类或者它的子类開始,当然须要使用SparkContext创建SQLContext;这里我们使用pyspark(已经自带了SQLContext即sc):

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

还能够使用HiveContext,它能够提供比SQLContext很多其它的功能。比如能够使用更完整的HiveQL解析器写查询,使用Hive UDFs。从Hive表中读取数据等。

使用HiveContext并不须要安装hive,Spark默认将HiveContext单独打包避免对hive过多的依赖

2.创建DataFrames
使用JSON文件创建:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()

注意:
这里你可能须要将文件存入HDFS(这里的文件在Spark安装文件夹中,1.4版本号)

hadoop fs -mkdir examples/src/main/resources/
hadoop fs -put /appcom/spark/examples/src/main/resources/*         /user/hdpuser/examples/src/main/resources/

3.DataFrame操作

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Create the DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
df.select(df[‘name‘], df[‘age‘] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
df.filter(df[‘age‘] > 21).show()
## age name
## 30  Andy

# Count people by age
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1

4.使用编程执行SQL查询
SQLContext能够使用编程执行SQL查询并返回DataFrame。

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM table")

5.和RDD交互

将RDD转换成DataFrames有两种方法:

  • 利用反射来判断包括特定类型对象的RDD的schema。这样的方法会简化代码而且在你已经知道schema的时候非常适用。
  • 使用编程接口。构造一个schema并将其应用在已知的RDD上。

一、利用反射判断Schema
Spark SQL能够将含Row对象的RDD转换成DataFrame。并判断数据类型。通过将一个键值对(key/value)列表作为kwargs传给Row类来构造Rows。

key定义了表的列名,类型通过看第一列数据来判断。

(所以这里RDD的第一列数据不能有缺失)未来版本号中将会通过看很多其它数据来判断数据类型。像如今对JSON文件的处理一样。

# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print teenName

二、编程指定Schema
通过编程指定Schema须要3步:

  1. 从原来的RDD创建一个元祖或列表的RDD。
  2. 用StructType 创建一个和步骤一中创建的RDD中元祖或列表的结构相匹配的Schema。

  3. 通过SQLContext提供的createDataFrame方法将schema 应用到RDD上。

# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)

# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")

# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print name
<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

Spark SQL and DataFrame Guide(1.4.1)——之DataFrames