
Spark Study Notes: Reading and Writing HDFS

Using Spark to read and write parquet files stored in HDFS.

Parquet part files in the output folder:

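When Spark writes parquet, the output path is a folder containing part-*.parquet files (plus a _SUCCESS marker) rather than a single file, and spark.read.parquet is given the folder path and reads every part inside. A minimal sketch of reading such a folder; the HDFS URI and path are hypothetical, so adjust the namenode address and path to your setup:

import org.apache.spark.sql.SparkSession

object ReadParquetFolder {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("read parquet folder")
      .getOrCreate()

    // Pass the folder, not an individual part file; Spark reads all part-*.parquet files in it
    val df = spark.read.parquet("hdfs://localhost:9000/user/mi/baikeshow")
    df.printSchema()
    println(df.count())

    spark.stop()
  }
}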

The build.sbt file:

name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "mysql" % "mysql-connector-java" % "5.1.31",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.hbase" % "hbase-common" % "1.3.0",
  "org.apache.hbase" % "hbase-client" % "1.3.0",
  "org.apache.hbase" % "hbase-server" % "1.3.0",
  "org.apache.hbase" % "hbase" % "1.2.1"
)
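For the parquet example below only spark-core and spark-sql are strictly required; mysql-connector-java is needed only if the JDBC write is enabled, and the HBase artifacts are not used here. spark-core already pulls in a Hadoop client transitively, but if your HDFS cluster runs a different Hadoop version you can pin it explicitly. A sketch of such a pin; the 2.7.3 version number is only an example and should match your cluster:

// Optional: align the Hadoop client with the cluster version (2.7.3 is just an example)
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.3"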

 

The Scala implementation:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties


/**
  * Created by mi on 17-4-11.
  */

// Not used in this example
case class resultset(name: String,
                     info: String,
                     summary: String)

// A single name:value entry inside the nested info field
case class IntroItem(name: String, value: String)

// Schema of the rows read from the parquet file
case class BaikeLocation(name: String,
                         url: String = "",
                         info: Seq[IntroItem] = Seq(),
                         summary: Option[String] = None)

// Flattened schema used for writing: the nested info is collapsed into one String
case class MewBaikeLocation(name: String,
                            url: String = "",
                            info: Option[String] = None,
                            summary: Option[String] = None)


object MysqlOpt {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MysqlOpt").setMaster("local")
    val sc = new SparkContext(conf)
    // The SQLContext is only kept for its implicits (Encoders); the SparkSession below does the I/O
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Connection info for the optional MySQL write
    val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
    val table = "baike_pages"

    // Read the parquet folder (and optionally write it to MySQL)
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("spark session example")
      .getOrCreate()
    val parquetDF = sparkSession.read.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow")
//    parquetDF.collect().take(20).foreach(println)
    //parquetDF.show()

    // BaikeLocation is the case class matching the schema of the parquet file being read
    val ds = parquetDF.as[BaikeLocation].map { line =>
      // Flatten the nested info entries into a single "name:value,..." String
      val info = line.info.map(item => item.name + ":" + item.value).mkString(",")
      // Note: keep the fields wrapped in a case class, otherwise the column names are lost
      MewBaikeLocation(name = line.name, url = line.url, info = Some(info), summary = line.summary)
    }.cache()

    ds.show()
//    ds.take(2).foreach(println)

    // Optionally write to MySQL (commented out)
    //    val prop = new Properties()
    //    prop.setProperty("user", "root")
    //    prop.setProperty("password", "123456")
    //    ds.write.mode(SaveMode.Append).jdbc(url, "baike_location", prop)

    // Write back out as a parquet folder; repartition(10) controls the number of part files
    ds.repartition(10).write.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow1")

  }

}
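The paths above point at the local filesystem; to read and write HDFS itself, the same calls simply take hdfs:// URIs. A sketch of that, assumed to be appended at the end of the main method above so sparkSession, ds and the implicits are still in scope; the namenode address localhost:9000 and the paths are hypothetical:

    // Read the original nested data directly from HDFS (hypothetical path)
    val hdfsDF = sparkSession.read.parquet("hdfs://localhost:9000/user/mi/baikeshow")

    // Write the flattened Dataset back to HDFS; repartition(10) controls how many part files are produced
    ds.repartition(10).write.mode(SaveMode.Overwrite).parquet("hdfs://localhost:9000/user/mi/baikeshow1")

    // Read the result back and check the flattened schema and row count
    val checkDS = sparkSession.read.parquet("hdfs://localhost:9000/user/mi/baikeshow1").as[MewBaikeLocation]
    checkDS.printSchema()
    println(checkDS.count())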

 
