Spark customization, part 6: the SQL version of start.scala
The previous version of start.scala was built on HiveContext; this one is based on SQLContext, so no compilation (custom Spark build) is needed.
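For reference, the only structural change from the HiveContext post is how the SQL context is constructed inside start.scala. A minimal sketch for the Spark 1.x shell (where sc is already defined); the commented-out line is roughly what the earlier, HiveContext-based version would use, and that variant requires a Spark build with Hive support:

// SQLContext needs nothing beyond the SparkContext the shell already provides
val ssc = new org.apache.spark.sql.SQLContext(sc)
// the previous, HiveContext-based start.scala would instead do something like:
// val ssc = new org.apache.spark.sql.hive.HiveContext(sc)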
# cat testperson.txt    # fields are separated by the Tab key
zs 10 30.0
li 12 32.0
# spark-shell -i start.scala
scala> help
Follow the prompts and run the examples step by step.
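To show what those prompts lead to, here is a minimal example session against the sample file above (a sketch: it assumes testperson.txt sits in the current working directory and that the start.scala listed below has loaded without errors; the REPL's own echo lines are omitted):

scala> "create table testperson (name string,age int,weight double)" from "testperson.txt"

scala> "select * from testperson" go
zs      10      30.0
li      12      32.0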
The full start.scala (SQLContext version) is as follows:

import org.apache.spark.sql.SchemaRDD

// separators used by the whole mini-DSL when printing and saving rows
var FIELD_SEPERATOR = "\t"
var RECORD_SEPERATOR = "\n"
// keeps the SchemaRDD produced by the most recent command
var lastrdd : SchemaRDD = null

// path helpers: normalize local/HDFS paths, delete output dirs, test for non-empty output
object MyFileUtil extends java.io.Serializable {
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.fs.FileSystem
  import org.apache.hadoop.fs.FileStatus
  import scala.collection.mutable.ListBuffer

  // turn a bare path into an explicit file:// or hdfs: URI
  def regularFile(filepath:String):String = {
    if(filepath == "") {
      filepath
    } else if(filepath.startsWith("hdfs:")) {
      filepath
    } else if(filepath.startsWith("file:")) {
      filepath
    } else if(filepath.startsWith("/")) {
      "file://" + filepath
    } else {
      val workdir = System.getProperty("user.dir")
      "file://" + workdir + "/" + filepath
    }
  }

  // refuse to delete suspiciously short paths
  var SAFEMINPATH_LENGTH : Int = 24

  def getFileSystem(filepath:String) = {
    if(filepath.startsWith("hdfs:")) {
      FileSystem.get(new org.apache.hadoop.conf.Configuration())
    } else if(filepath.startsWith("file:")) {
      FileSystem.getLocal(new org.apache.hadoop.conf.Configuration())
    } else {
      throw new Exception("file path invalid")
    }
  }

  def deletePath(filepath:String) = {
    if(filepath.length < SAFEMINPATH_LENGTH)
      throw new Exception("file path is too short")
    var fs : FileSystem = getFileSystem(filepath)
    if (fs.exists(new Path(filepath))) {
      fs.delete(new Path(filepath), true)
    }
  }

  def listFile(fs:FileSystem, path:Path, pathlist:ListBuffer[Path], statuslist:ListBuffer[FileStatus]=null) {
    if ( fs.exists(path) ) {
      val substatuslist = fs.listStatus(path)
      for(substatus <- substatuslist){
        if(statuslist != null)
          statuslist.append(substatus)
        if(substatus.isDir()){
          listFile(fs,substatus.getPath(),pathlist)
        }else{
          pathlist.append(substatus.getPath())
        }
      }
    }
  }

  // true if the file (or directory tree) under filepath contains any bytes
  def hasContext(filepath:String) = {
    val realpath = regularFile(filepath)
    val fs = getFileSystem(realpath)
    val pathlist = ListBuffer[Path]()
    val statuslist = ListBuffer[FileStatus]()
    listFile(fs,new Path(filepath),pathlist,statuslist)
    var length:Long = 0
    for( status <- statuslist )
      length += status.getLen()
    length > 0
  }
}

// MySchemaRDD adds go/result/saveto to SchemaRDD via an implicit conversion;
// it is compiled through the REPL so the implicit is visible to later commands
org.apache.spark.repl.Main.interp.command("""
class MySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) extends java.io.Serializable {
  // print every row, fields joined by FIELD_SEPERATOR
  def go() = {
    var startstr = ""
    var endstr = RECORD_SEPERATOR
    val result = rdd.collect
    result.foreach( x =>
      print(x.mkString(startstr,FIELD_SEPERATOR,endstr))
    )
  }

  // return the rows as Array[Row]
  def result() = {
    rdd.collect
  }

  // write the rows either to HDFS (as Hadoop text output) or to a local file
  def saveto(output: String) = {
    import org.apache.hadoop.io.{NullWritable,Text}
    var startstr = ""
    var endstr = RECORD_SEPERATOR
    if(output.startsWith("hdfs:")) {
      val outputpath = MyFileUtil.regularFile(output)
      MyFileUtil.deletePath(outputpath)
      rdd.map(x =>
        (NullWritable.get(), new Text(x.mkString(FIELD_SEPERATOR)))
      ).saveAsHadoopFile[
        org.apache.hadoop.mapred.TextOutputFormat[NullWritable, Text]
      ](outputpath)
    } else {
      val outputpath = MyFileUtil.regularFile(output)
      MyFileUtil.deletePath(outputpath)
      val result = rdd.collect()
      val writer = new java.io.FileWriter(output)
      result.foreach(x =>
        writer.write(x.mkString(startstr,FIELD_SEPERATOR,endstr))
      )
      writer.close()
    }
  }
}
object MySchemaRDD {
  implicit def toMySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) = new MySchemaRDD(rdd)
}
""")

// plain SQLContext instead of the HiveContext used in the previous post
val ssc = new org.apache.spark.sql.SQLContext(sc)
import ssc._
import MySchemaRDD._

// build the source code that defines a case class for the table and registers it
def getRegisterString(rddname:String,classname:String,tablename:String,tabledef:String) : String = {
  val members = tabledef.trim.split(",").map(_.trim.split(" ").filter(""!=)).map(x => (x(0).trim,x(1).trim.head.toString.toUpperCase+x(1).trim.tail))
  val classmemberdef = members.map(x => (x._1+":"+x._2)).mkString(",")
  val convertstr = members.map(x => x._2).zipWithIndex.map(x => "t("+x._2+").to"+x._1).mkString(",")
  return s"""
    case class ${classname}(${classmemberdef})
    val schemardd = ${rddname}.map(_.split("${FIELD_SEPERATOR}")).map(t=>${classname}(${convertstr}))
    ssc.registerRDDAsTable(schemardd,"${tablename}")
  """
}

// MyCommandTranslator turns a plain string into the DSL: "..." go / result / saveto / from / isok
org.apache.spark.repl.Main.interp.command("""
class MyCommandTranslator(cmd:String) extends java.io.Serializable {

  def go()(implicit f: SchemaRDD => MySchemaRDD) = {
    lastrdd = sql(cmd)
    lastrdd.go()
  }

  def saveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
    lastrdd = sql(cmd)
    lastrdd.saveto(output)
  }

  def result()(implicit f: SchemaRDD => MySchemaRDD) = {
    lastrdd = sql(cmd)
    lastrdd.result()
  }

//  def hqlgo()(implicit f: SchemaRDD => MySchemaRDD) = {
//    lastrdd = hql(cmd)
//    lastrdd.go()
//  }
//
//  def hqlsaveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
//    lastrdd = hql(cmd)
//    lastrdd.saveto(output)
//  }
//
//  def hqlresult()(implicit f: SchemaRDD => MySchemaRDD) = {
//    lastrdd = hql(cmd)
//    lastrdd.result()
//  }

  // register the RDD named cmd as a table, optionally generating a case class from tabledef
  def defineas(tabledef:String) = {
    if( tabledef != "" ) {
      org.apache.spark.repl.Main.interp.command(
        getRegisterString(cmd,cmd.toUpperCase,cmd,tabledef)
      )
    } else {
      org.apache.spark.repl.Main.interp.command(
        s"ssc.registerRDDAsTable(${cmd},\"${cmd}\")"
      )
    }
  }

  // "create table name (col type,...)" from "somefile": load the file and register it as a table
  def from(filepath:String) {
    if( cmd.trim.startsWith("create table ") ) {
      val tablename = cmd.trim.substring(13).trim().split(" ")(0)
      val leftstr = cmd.substring(13).trim().substring(tablename.length).trim()
      val tabledef = leftstr.substring(1,leftstr.length-1).trim()
      val realfile = MyFileUtil.regularFile(filepath)
      org.apache.spark.repl.Main.interp.command(
        "val "+tablename+" = sc.textFile(\""+realfile+"\")"
      )
      new MyCommandTranslator(tablename).defineas(tabledef)
    } else {
      println("usage:")
      println("\"create table sometablename (field1 string,field2 int...)\" from \"somefile or hdfs:somepath\"")
    }
  }

  // a table name is "ok" if it has rows; a path is "ok" if it has content on disk/HDFS
  def isok() = {
    if(cmd.contains(".") || cmd.contains("/")) {
      MyFileUtil.hasContext(cmd)
    } else {
      val res = sql(s"select count(*) from ${cmd}").result()
      val count = res(0).getLong(0)
      count > 0
    }
  }
}
object MyCommandTranslator {
  implicit def stringToTranslator(cmd:String) = new MyCommandTranslator(cmd)

  // pretty-print an Array[Row] returned by result()
  def show(tabledata:Array[org.apache.spark.sql.Row]) = {
    tabledata.foreach( x => println(x.mkString("\t")))
  }
}
""")

def to = MyCommandTranslator
import MyCommandTranslator._

// a one-row dummy table, handy for quick tests
val onetable = sql("select 1 as id")
ssc.registerRDDAsTable(onetable,"onetable")

def help = {
  println("""example:
    "create table testperson (name string,age int,weight double)" from "testperson.txt"
    "select * from testperson" go
    "select * from testperson" saveto "somelocalfile.txt"
    "select * from testperson" saveto "hdfs:/basedir/parentdir/testperson"
    "testperson" isok
    "somelocalfile.txt" isok
    "hdfs:/basedir/parentdir/testperson" isok
    val data = "select * from testperson" result
    to show data
  """)
}