首页 > 代码库 > Spark_总结四

Spark_总结四

 


Spark_总结四

1.Spark SQL  

技术分享

    Spark SQL 和 Hive on Spark 两者的区别?
        spark on hive:hive只是作为元数据存储的角色,解析,优化,执行都是spark做的    
        hive on spark: hive既作为存储的角色,又作为计算角色的一部分,hive将sql解析Spark任务,底层是Spark引擎hive2.0以后推荐使用Spark引擎,转化为Spark任务,hvie2.0以前都是转化为MR任务)
            
    Spark SQL 转化的过程(底层架构)
技术分享
 
【SQL/HQL-->解析器-->分析器-->优化器-->CostModel消耗模型(选出消耗最低的,就是效率最高的),最终将传入的SQL转换为RDD的计算】
 
须知:
        若想使用SparkSQL必须创建SQLContext 必须是传入SparkContext 不能是SparkConf
技术分享
 
1.DataFrame与RDD的区别?   ||   什么是DataFrame?

技术分享

区别:
      Spark core是基于RDD的编程,Spark SQL是基于DataFrame的编程,DataFrame的底层就是封装的RDD,只不过DataFrame底层RDD的泛型是ROW(DataFrame <==> RDD<ROW>),另外,DataFrame中有对列的描述,但是RDD没有对列的描述。      
What is DataFrame:
      DataFrame RDD 类似,DataFrame 是一个分布式数据容器,更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息(比如对列的描述), 即 schema。同时,与 Hive 类似,DataFrame 也支持嵌套数据类型(struct、 array 和 map)。 从 API 易用性的角度上 看,DataFrameAPI 提供的是一套高层的关系操作函数式的 RDDAPI 更加友好,门槛更低。 

3.创建DataFrame的来源和方   ||   如何对DataFrame中封装的数据进行操作?

3.1创建DataFrame的来源和方
技术分享
 

3.2如何对DataFrame中封装的数据进行操作?

   当我们的DataFrame构建好之后,里面封装了我们的数据,需要对数据进行操作即对DataFrame进行操作,有两种方式
3.2.1   通过方法
        sqlContext.read()    返回DataFrameReader对象
        sqlContext.read().json("student.json")   读取一个json文件(这个json文件中的内容不能是嵌套的)读进来变成DataFrame,
        df.select("age").show(),如果没有show,这个程序就不会执行,这个show就类似与Spark中Action类型的算子,触发执行
示例代码:
  1. package com.hzf.spark.exercise;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaSparkContext;
  4. import org.apache.spark.sql.DataFrame;
  5. import org.apache.spark.sql.SQLContext;
  6. publicclassTestSparkSQL02{
  7. publicstaticvoid main(String[] args){
  8. SparkConf conf =newSparkConf().setAppName("DataFrameOps").setMaster("local");
  9. JavaSparkContext sc =newJavaSparkContext(conf);
  10. SQLContext sqlContext =newSQLContext(sc);
  11. DataFrame df = sqlContext.read().json("people.json");
  12. /*
  13. * 操作DataFrame的第一种方式
  14. * */
  15. //类似 SQL的select from table;
  16. df.show();
  17. //desc table
  18. df.printSchema();
  19. //select age from table;
  20. df.select("age").show();
  21. //select name from table;
  22. df.select("name").show();
  23. //select name,age+10 from table;
  24. df.select(df.col("name"),df.col("age").plus(10)).show();
  25. //select * from table where age > 20
  26. df.filter(df.col("age").gt(20)).show();
  27. }
  28. }
result:
技术分享
 
3.2.2   通过注册临时表,传入SQL语句(推荐使用)
示例代码:
  1. package com.hzf.spark.exercise;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaSparkContext;
  4. import org.apache.spark.sql.DataFrame;
  5. import org.apache.spark.sql.SQLContext;
  6. publicclassTestSparkSQL01{
  7. publicstaticvoid main(String[] args){
  8. SparkConf conf =newSparkConf().setAppName("DataFrameOps").setMaster("local");
  9. JavaSparkContext sc =newJavaSparkContext(conf);
  10. SQLContext sqlContext =newSQLContext(sc);
  11. DataFrame df = sqlContext.read().json("people.json");
  12. //将DataFrame中封装的数据注册为一张临时表,对临时表进行sql操作
  13. df.registerTempTable("people");
  14. DataFrame sql = sqlContext.sql("SELECT * FROM people WHERE age IS NOT NULL");
  15. sql.show();
  16. }
  17. }
result:
技术分享
 
3.3创建DataFrame的几种方式,来源(json,jsonRDD,parquet,非json格式,mysql)
 
<1>读取Json格式文件-->DataFrame:Json 文件中不能有嵌套的格式
      加载json格式文件-->DataFrame有两种方式
           方式一:DataFrame df = sqlContext.read().format("json").load("people.json");
           方式二:DataFrame df = sqlContext.read().json("people.json");
数据集:
技术分享
示例代码:
  1. package com.bjsxt.java.spark.sql.json;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import org.apache.spark.SparkConf;
  5. import org.apache.spark.SparkContext;
  6. import org.apache.spark.api.java.JavaPairRDD;
  7. import org.apache.spark.api.java.JavaRDD;
  8. import org.apache.spark.api.java.JavaSparkContext;
  9. import org.apache.spark.api.java.function.Function;
  10. import org.apache.spark.api.java.function.PairFunction;
  11. import org.apache.spark.sql.DataFrame;
  12. import org.apache.spark.sql.Row;
  13. import org.apache.spark.sql.RowFactory;
  14. import org.apache.spark.sql.SQLContext;
  15. import org.apache.spark.sql.types.DataTypes;
  16. import org.apache.spark.sql.types.StructField;
  17. import org.apache.spark.sql.types.StructType;
  18. import scala.Tuple2;
  19. /**
  20. * JSON数据源
  21. * @author Administrator
  22. *
  23. */
  24. publicclassJSONDataSource{
  25. publicstaticvoid main(String[] args){
  26. SparkConf conf =newSparkConf()
  27. .setAppName("JSONDataSource")
  28. // .set("spark.default.parallelism", "100")
  29. .setMaster("local");
  30. JavaSparkContext sc =newJavaSparkContext(conf);
  31. SQLContext sqlContext =newSQLContext(sc);
  32. DataFrame studentScoresDF = sqlContext.read().json("student.json");
  33. studentScoresDF.registerTempTable("student_scores");
  34. DataFrame goodStudentScoresDF = sqlContext.sql(
  35. "select name,count(score) from student_scores where score>=80 group by name");
  36. List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(newFunction<Row,String>(){
  37. privatestaticfinallong serialVersionUID =1L;
  38. @Override
  39. publicString call(Row row)throwsException{
  40. return row.getString(0);
  41. }
  42. }).collect();
  43. for(String str: goodStudentNames){
  44. System.out.println(str);
  45. }
  46. }
  47. }
result:
技术分享
 
<2>jsonRDD-->DataFrame
技术分享
 
<3>读取Parquet格式文件-->DataFrame自动推测分区,合并 Schema。
经验:将Spark中的文本转换为Parquet以提升性能

    parquet是一个基于列的存储格式,列式存储布局可以加速查询,因为它只检查所有需要的列并对它们的值执行计算,因此只读取一个数据文件或表的小部分数据。Parquet 还支持灵活的压缩选项,因此可以显著减少磁盘上的存储

   如果在 HDFS 上拥有基于文本的数据文件或表,而且正在使用 Spark SQL 对它们执行查询,那么强烈推荐将文本数据文件转换为 Parquet 数据文件,以实现性能和存储收益。当然,转换需要时间,但查询性能的提升在某些情况下可能达到 30 倍或更高,存储的节省可高达 75%!

parquet的压缩比高,将一个普通的文本转化为parquet格式,如何去转?
       val lineRDD = sc.textFile()
       DF.save(parquet) //将RDD转化为DF
parquet操作示例
   是否指定format--若存储时,指定format为json格式,那么则生成json格式文件,否则不指定format,默认文件以parquet形式进行存储 
测试一:指定format为json格式,存储在本地
测试数据:   top.txt
技术分享
测试代码

技术分享

测试结果
技术分享
 
技术分享
 
测试二:不指定format,那么文件默认以parquet形式进行存储,存储在本地
 
测试数据:   people.json
技术分享
测试代码
技术分享
测试结果
技术分享
技术分享
 
测试三:读取本地parquet存储格式的文件
测试代码
技术分享
测试结果
技术分享
 
测试四:读取hdfs上parquet形式的文件
技术分享
测试代码
技术分享
测试结果
技术分享
 
<4> RDD(非json格式变成DataFrame)
读取txt 文件-->DataFrame从 txt 文件读取,然后转为 RDD,最后转为 DataFrame
                                       RDD 转为 DataFrame 有两种方式
                        (1)反射机制
                              注意点:自定义的类一定要是 public,并且要实现序列化接口 Serializable
                                             取数据的时候,在 JavaAPI 中会有顺序问题(因为 DataFrame 转为 RDD<Row> 的时候,会进行一次字典排序改变 Row 的位置,而Scala 的 API 则没有这个问题)
                        (2)动态创建 Schema,先将 RDD 中的每一行类型变 为 RDD<Row> 类型,然后创建 DataFrame 的元数据-->构建 StructType,用于最后 DataFrame 元数据的描述,基于现有的 StructType 以及 RDD<Row> 来构造 DataFrame。(如果列的信息比较长可以存到数据库里) 
<4.1>反射机制
数据
技术分享
示例代码:
自定义类
技术分享
  1. package com.bjsxt.java.spark.sql.createdf;
  2. import java.util.List;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.Function;
  7. import org.apache.spark.sql.DataFrame;
  8. import org.apache.spark.sql.Row;
  9. import org.apache.spark.sql.SQLContext;
  10. /**
  11. * 使用反射的方式将RDD转换成为DataFrame
  12. * 1、自定义的类必须是public
  13. * 2、自定义的类必须是可序列化的
  14. * 3、RDD转成DataFrame的时候,他会根据自定义类中的字段名进行排序。
  15. * @author zfg
  16. *
  17. */
  18. publicclass RDD2DataFrameByReflection {
  19. publicstaticvoid main(String[] args){
  20. SparkConf conf =newSparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
  21. JavaSparkContext sc =newJavaSparkContext(conf);
  22. SQLContext sqlcontext =newSQLContext(sc);
  23. JavaRDD<String> lines = sc.textFile("Peoples.txt");
  24. JavaRDD<Person> personsRdd = lines.map(newFunction<String,Person>(){
  25. privatestaticfinallong serialVersionUID =1L;
  26. @Override
  27. publicPerson call(String line)throwsException{
  28. String[] split = line.split(",");
  29. Person p =newPerson();
  30. p.setId(Integer.valueOf(split[0].trim()));
  31. p.setName(split[1]);
  32. p.setAge(Integer.valueOf(split[2].trim()));
  33. return p;
  34. }
  35. });
  36. //传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
  37. //在底层通过反射的方式或得Person的所有field,结合RDD本身,就生成了DataFrame
  38. DataFrame df = sqlcontext.createDataFrame(personsRdd,Person.class);
  39. //命名table的名字为person
  40. df.registerTempTable("personTable");
  41. DataFrame resultDataFrame = sqlcontext.sql("select * from personTable where age > 7");
  42. resultDataFrame.show();
  43. //将df转成rdd
  44. JavaRDD<Row> resultRDD = resultDataFrame.javaRDD();
  45. JavaRDD<Person> result = resultRDD.map(newFunction<Row,Person>(){
  46. privatestaticfinallong serialVersionUID =1L;
  47. @Override
  48. publicPerson call(Row row)throwsException{
  49. Person p =newPerson();
  50. p.setAge(row.getInt(0));
  51. p.setId(row.getInt(1));
  52. p.setName(row.getString(2));
  53. return p;
  54. }
  55. });
  56. List<Person> personList = result.collect();
  57. for(Person person : personList){
  58. System.out.println(person.toString());
  59. }
  60. }
  61. }
result:
技术分享
技术分享

<4.2>动态创建Schema方式
数据
技术分享
示例代码:
  1. package com.bjsxt.java.spark.sql.createdf;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import org.apache.spark.SparkConf;
  5. import org.apache.spark.api.java.JavaRDD;
  6. import org.apache.spark.api.java.JavaSparkContext;
  7. import org.apache.spark.api.java.function.Function;
  8. import org.apache.spark.sql.DataFrame;
  9. import org.apache.spark.sql.Row;
  10. import org.apache.spark.sql.RowFactory;
  11. import org.apache.spark.sql.SQLContext;
  12. import org.apache.spark.sql.types.DataTypes;
  13. import org.apache.spark.sql.types.StructField;
  14. import org.apache.spark.sql.types.StructType;
  15. publicclass RDD2DataFrameByProgrammatically {
  16. publicstaticvoid main(String[] args){
  17. SparkConf conf =newSparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
  18. JavaSparkContext sc =newJavaSparkContext(conf);
  19. SQLContext sqlcontext =newSQLContext(sc);
  20. /**
  21. * 在RDD的基础上创建类型为Row的RDD
  22. */
  23. JavaRDD<String> lines = sc.textFile("Peoples.txt");
  24. JavaRDD<Row> rowRDD = lines.map(newFunction<String,Row>(){
  25. privatestaticfinallong serialVersionUID =1L;
  26. @Override
  27. publicRow call(String line)throwsException{
  28. String[] split = line.split(",");
  29. returnRowFactory.create(Integer.valueOf(split[0]),split[1],Integer.valueOf(split[2]));
  30. }
  31. });
  32. /**
  33. * 动态构造DataFrame的元数据,一般而言,有多少列以及每列的具体类型可能来自于Json,也可能来自于DB
  34. */
  35. ArrayList<StructField> structFields =newArrayList<StructField>();
  36. structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType,true));
  37. structFields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
  38. structFields.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));
  39. //构建StructType,用于最后DataFrame元数据的描述
  40. StructType schema =DataTypes.createStructType(structFields);
  41. /**
  42. * 基于已有的MetaData以及RDD<Row> 来构造DataFrame
  43. */
  44. DataFrame df = sqlcontext.createDataFrame(rowRDD, schema);
  45. /**
  46. *注册成为临时表以供后续的SQL操作查询
  47. */
  48. df.registerTempTable("persons");
  49. /**
  50. * 进行数据的多维度分析
  51. */
  52. DataFrame result = sqlcontext.sql("select * from persons where age > 7");
  53. result.show();
  54. /**
  55. * 对结果进行处理,包括由DataFrame转换成为RDD<Row>
  56. */
  57. List<Row> listRow = result.javaRDD().collect();
  58. for(Row row : listRow){
  59. System.out.println(row);
  60. }
  61. }
  62. }
result:
技术分享
技术分享

<5> 读取MySql 中表里的数据-->DataFrame
       Spark Build-in内置支持的json jdbc mysql,hive...如果数据库支持jdbc连接,Spark 就可以基于这个数据库尽行数据的处理
示例代码:
  1. package com.bjsxt.java.spark.sql.jdbc;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaSparkContext;
  4. import org.apache.spark.sql.DataFrame;
  5. import org.apache.spark.sql.DataFrameReader;
  6. import org.apache.spark.sql.SQLContext;
  7. /**
  8. * JDBC数据源
  9. *
  10. * @author Administrator
  11. *
  12. */
  13. publicclassJDBCDataSource{
  14. publicstaticvoid main(String[] args){
  15. SparkConf conf =newSparkConf().setAppName("JDBCDataSource").setMaster("local");
  16. JavaSparkContext sc =newJavaSparkContext(conf);
  17. SQLContext sqlContext =newSQLContext(sc);
  18. // 方法1、分别将mysql中两张表的数据加载为DataFrame
  19. /*
  20. * Map<String, String> options = new HashMap<String, String>();
  21. * options.put("url", "jdbc:mysql://hadoop1:3306/testdb");
  22. * options.put("driver", "com.mysql.jdbc.Driver");
  23. * options.put("user","spark");
  24. * options.put("password", "spark2016");
  25. * options.put("dbtable", "student_info");
  26. * DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();
  27. * options.put("dbtable", "student_score");
  28. * DataFrame studentScoresDF = sqlContext.read().format("jdbc") .options(options).load();
  29. */
  30. // 方法2、分别将mysql中两张表的数据加载为DataFrame
  31. DataFrameReader reader = sqlContext.read().format("jdbc");
  32. reader.option("url","jdbc:mysql://node4:3306/testdb");
  33. reader.option("driver","com.mysql.jdbc.Driver");
  34. reader.option("user","root");
  35. reader.option("password","123");
  36. reader.option("dbtable","student_info");
  37. DataFrame studentInfosDF = reader.load();
  38. reader.option("dbtable","student_score");
  39. DataFrame studentScoresDF = reader.load();
  40. // 将两个DataFrame转换为JavaPairRDD,执行join操作
  41. studentInfosDF.registerTempTable("studentInfos");
  42. studentScoresDF.registerTempTable("studentScores");
  43. String sql ="SELECT studentInfos.name,age,score "
  44. +" FROM studentInfos JOIN studentScores"
  45. +" ON (studentScores.name = studentInfos.name)"
  46. +" WHERE studentScores.score > 80";
  47. DataFrame sql2 = sqlContext.sql(sql);
  48. sql2.show();
  49. }
  50. }
result:
技术分享

4. 如何将DataFrame中的值写入到外部存储中去?

存储模式SaveMode.Overwrite || Ignore || Append   ||   ErrorifExit)
<1> 读取本地json格式文件,并以json形式写入到hdfs(不指定format,默认是parquet)
测试代码
技术分享
测试结果
技术分享
技术分享
 
补充:
1.什么是下推过滤器?
       在join之前过滤,而不是join之后进行过滤
 
2.select * from table 在SparkSQL和Hive on MR中的区别?
      SparkSQL 中 select * from table 在spark中是要具体执行spark任务的,而在 Hive on MR 中 select * from table直接读取数据,所以SparkSQL 中执行select * from不一定比Hive on MR中的快
 
3.如何将一个DataFrame变成一个RDD?
    JavaRDD<ROW> rdd = resultFrame.javaRDD()
 

5.整合Spark和Hive?

    6.1Spark 目录下面的 conf 下放一个配置文件 hive-site.xml 文件。
    6.2在 hive 的服务端启动 MetaStore Server因为 HiveContext 会用到 metastore 服务。(在 Spark-shell 里面使用 HiveContext 的时候,要记住导入 HiveContext)】(hive --service metastore)
    6.3启动hdfs【因为hive的数据是存在hdfs上的Spark集群(start-all.sh spark-start-all.sh)
    6.4进入Spark shell,测试Spark 和 Hive是否整合成功
  1. scala>import org.apache.spark.sql.hive.HiveContext
  2. scala>val hiveContext =newHiveContext(sc)
  3. scala>hiveContext.sql("show tables").show
    6.5整合测试(详见Spark_some配置)注意!将代码提交到Spark集群上运行时,需要将hdfs-site.xml拷贝到SPARK_HOME/conf下

6.SqlContext和HiveContext的关系?

       SQLcontext 是 HiveContext 的父类
       在集群中运行的时候用 HiveContext,可以基于 Hive 来操作 Hive 表,对源数据进行CRUD的操作。 
  
    
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Spark_总结四