
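How to add a command to SparkSQL that Hive supports but SparkSQL does not yet support

This post uses ANALYZE as the example.

For details on how ANALYZE is used in Hive, see: https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables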

 

A brief introduction to using ANALYZE in Hive

Suppose a table, Table1, has 4 partitions:

Partition1: (ds='2008-04-08', hr=11)
Partition2: (ds='2008-04-08', hr=12)
Partition3: (ds='2008-04-09', hr=11)
Partition4: (ds='2008-04-09', hr=12)

 

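ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS;

Result: statistics are gathered for Partition3 (ds='2008-04-09', hr=11) only.

ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS;

Result: statistics are gathered for Partition3 and Partition4.

ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS;

Result: statistics are gathered for all 4 partitions.

ANALYZE TABLE Table1 COMPUTE STATISTICS;

The command above, with no PARTITION clause, can be used for a non-partitioned table.

Note: if Table1 is a partitioned table, ANALYZE must be given a partition specification; otherwise the Semantic Analyzer reports an error.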
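The way a partial partition specification selects partitions can be sketched in a few lines of Scala. This is only an illustration of the matching rule described above, not Hive's implementation; the names PartitionSpecSketch, Partition, PartitionSpec, select and table1 are hypothetical and exist only for this example.

```scala
object PartitionSpecSketch {
  // A concrete partition: column name -> value.
  type Partition = Map[String, String]

  // A partition spec as written in PARTITION(...):
  // Some(value) pins a column, None leaves it open (a bare column name such as `hr`).
  type PartitionSpec = Map[String, Option[String]]

  // Keeps the partitions that agree with every pinned column of the spec.
  def select(partitions: Seq[Partition], spec: PartitionSpec): Seq[Partition] =
    partitions.filter { partition =>
      spec.forall {
        case (column, Some(value)) => partition.get(column).contains(value)
        case (_, None)             => true
      }
    }

  // The four partitions of Table1 from the example above.
  val table1: Seq[Partition] = Seq(
    Map("ds" -> "2008-04-08", "hr" -> "11"),
    Map("ds" -> "2008-04-08", "hr" -> "12"),
    Map("ds" -> "2008-04-09", "hr" -> "11"),
    Map("ds" -> "2008-04-09", "hr" -> "12")
  )
}
```

With this sketch, a spec that pins ds to 2008-04-09 and leaves hr open selects the third and fourth entries of table1, which mirrors the second ANALYZE example.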

 

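How to add code to SparkSQL to support Hive's ANALYZE

The parts that mention AnalyzeTable or TOK_ANALYZE are the code newly added for ANALYZE (shown in red in the original post); everything else is existing code included for context.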

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

private[hive] case class AnalyzeTable(tableName: String) extends Command

private[hive] object HiveQl {
  ...
      val tree = getAst(sql)
      if (nativeCommands contains tree.getText) {
        NativeCommand(sql)
      } else {
  ...
    ifExists) =>
      val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
      DropTable(tableName, ifExists.nonEmpty)
    // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
    case Token("TOK_ANALYZE",
           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
           isNoscan) =>
      // Reference: https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
      if (partitionSpec.nonEmpty) {
        // Analyze partitions will be treated as a Hive native command.
        NativePlaceholder
      } else if (isNoscan.isEmpty) {
        // If users do not specify "noscan", it will be treated as a Hive native command.
        NativePlaceholder
      } else {
        val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
        AnalyzeTable(tableName)
      }
  ...
}
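To make the three branches of the TOK_ANALYZE rule easier to follow, here is a small self-contained sketch that can be run without Spark or Hive. It is an approximation under stated assumptions: Token, Plan, NativePlaceholder and AnalyzeTable are simplified stand-ins for the real classes, analyzeTree is a hypothetical helper that builds an AST by hand, and the tree shapes (including the TOK_PARTSPEC node name used in testing) are assumed rather than taken from Hive's parser output.

```scala
object AnalyzeParseSketch {
  // Simplified stand-in for a Hive AST node: node text plus child nodes.
  case class Token(text: String, children: List[Token] = Nil)

  // Simplified stand-ins for the logical plans named in the excerpt.
  sealed trait Plan
  case object NativePlaceholder extends Plan
  final case class AnalyzeTable(tableName: String) extends Plan

  // Same decision order as the excerpt: partitions first, then noscan.
  def toPlan(tree: Token): Option[Plan] = tree match {
    case Token("TOK_ANALYZE",
           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
           isNoscan) =>
      if (partitionSpec.nonEmpty) Some(NativePlaceholder)   // partitions: leave to Hive
      else if (isNoscan.isEmpty) Some(NativePlaceholder)    // no "noscan": leave to Hive
      else Some(AnalyzeTable(tableNameParts.map(_.text).mkString(".")))
    case _ => None // not an ANALYZE statement; out of scope for this sketch
  }

  // Hypothetical helper: builds an ANALYZE tree by hand for experimentation.
  def analyzeTree(nameParts: List[String], partitionSpec: List[Token], noscan: Boolean): Token = {
    val tabName = Token("TOK_TABNAME", nameParts.map(part => Token(part)))
    val tab = Token("TOK_TAB", tabName :: partitionSpec)
    Token("TOK_ANALYZE", tab :: (if (noscan) List(Token("noscan")) else Nil))
  }
}
```

Only the combination "no partition spec, noscan present" produces AnalyzeTable; every other ANALYZE form falls through to Hive as a native command.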

 

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil

case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil

case describe: logical.DescribeCommand =>
  val resolvedTable = context.executePlan(describe.table).analyzed
  resolvedTable match {
  ...
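The strategy step maps each logical command to its physical counterpart and returns an empty list for plans it does not handle. The sketch below imitates that shape with plain case classes so it can be run on its own; StrategySketch, LogicalPlan, PhysicalPlan, Unhandled and plan are hypothetical names, and none of this is Spark's actual class hierarchy.

```scala
object StrategySketch {
  // Stand-ins for logical plan nodes.
  sealed trait LogicalPlan
  case class DropTable(tableName: String, ifExists: Boolean) extends LogicalPlan
  case class AnalyzeTable(tableName: String) extends LogicalPlan
  case class Unhandled(description: String) extends LogicalPlan

  // Stand-ins for physical (executable) plan nodes, grouped like the
  // `execution` package in the excerpt.
  sealed trait PhysicalPlan
  object execution {
    case class DropTable(tableName: String, ifExists: Boolean) extends PhysicalPlan
    case class AnalyzeTable(tableName: String) extends PhysicalPlan
  }

  // One logical node in, zero or one physical nodes out.
  def plan(logical: LogicalPlan): Seq[PhysicalPlan] = logical match {
    case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
    case AnalyzeTable(tableName)        => execution.AnalyzeTable(tableName) :: Nil
    case _                              => Nil // left for another strategy
  }
}
```

Adding a new command therefore takes one extra case line in the strategy, provided the logical and physical node classes already exist.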

 

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

/**
 * :: DeveloperApi ::
 * Analyzes the given table in the current database to generate statistics, which will be
 * used in query optimizations.
 *
 * Right now, it only supports Hive tables and it only updates the size of a Hive table
 * in the Hive metastore.
 */
@DeveloperApi
case class AnalyzeTable(tableName: String) extends LeafNode with Command {

  def hiveContext = sqlContext.asInstanceOf[HiveContext]

  def output = Seq.empty

  override protected[sql] lazy val sideEffectResult = {
    hiveContext.analyze(tableName)
    Seq.empty[Any]
  }

  override def execute(): RDD[Row] = {
    sideEffectResult
    sparkContext.emptyRDD[Row]
  }
}
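The command above places its work inside a lazy val, so the side effect runs the first time the value is forced and is not repeated on later calls to execute(). The following stand-alone sketch shows that run-once behaviour. SideEffectSketch and AnalyzeCommand are hypothetical names, and the analyze function passed in is only a placeholder for hiveContext.analyze.

```scala
object SideEffectSketch {
  // `analyze` is a placeholder for the real work (hiveContext.analyze in the excerpt).
  class AnalyzeCommand(tableName: String, analyze: String => Unit) {

    // Evaluated at most once, on first access.
    lazy val sideEffectResult: Seq[Any] = {
      analyze(tableName)
      Seq.empty[Any]
    }

    // Forces the lazy val; repeated calls reuse the cached result
    // and do not call `analyze` again.
    def execute(): Seq[Any] = sideEffectResult
  }
}
```

Calling execute() twice on the same AnalyzeCommand instance invokes the supplied analyze function only once.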

 

How to use ANALYZE in SparkSQL once it has been added:

ANALYZE TABLE tableName COMPUTE STATISTICS noscan