How to add a command to SparkSQL that Hive supports but SparkSQL does not yet support
ANALYZE is used as the running example.
For details on how ANALYZE is used in Hive, see: https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
A brief introduction to ANALYZE in Hive
Suppose a table has 4 partitions:
Partition1: (ds='2008-04-08', hr=11)
Partition2: (ds='2008-04-08', hr=12)
Partition3: (ds='2008-04-09', hr=11)
Partition4: (ds='2008-04-09', hr=12)
ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS;
Result: partition 3 (ds='2008-04-09', hr=11)
ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS;
Result: partitions 3 and 4
ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS;
Result: all 4 partitions
ANALYZE TABLE Table1 COMPUTE STATISTICS;
For a non-partitioned table, the command above (without a PARTITION clause) is used.
Note: if Table1 is a partitioned table, a partition specification is required when running ANALYZE; otherwise the Semantic Analyzer reports an error.
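To make the partition semantics concrete, here is a minimal sketch of issuing the same commands through Spark's HiveContext. This assumes Spark 1.1-era APIs and an existing SparkContext named sc; Table1 and its partitions are the example table above.

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Fully specified partition: only partition 3 is analyzed.
hiveContext.sql("ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS")

// hr left open: partitions 3 and 4 are analyzed.
hiveContext.sql("ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS")

// Both partition columns left open: all 4 partitions are analyzed.
hiveContext.sql("ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS")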
How to add code to SparkSQL to support Hive's ANALYZE functionality
The snippets below show the code newly added for ANALYZE (marked in red in the original post) in the context of each file.
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
private[hive] case class AnalyzeTable(tableName: String) extends Command

private[hive] object HiveQl {
  // ...
  val tree = getAst(sql)
  if (nativeCommands contains tree.getText) {
    NativeCommand(sql)
  } else {
    // ...
  }

  // ...
  case Token("TOK_DROPTABLE", Token("TOK_TABNAME", tableNameParts) :: ifExists) =>
    val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
    DropTable(tableName, ifExists.nonEmpty)

  // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan".
  case Token("TOK_ANALYZE",
         Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
         isNoscan) =>
    // Reference: https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
    if (partitionSpec.nonEmpty) {
      // Analyzing explicit partitions is treated as a Hive native command.
      NativePlaceholder
    } else if (isNoscan.isEmpty) {
      // If the user does not specify "noscan", it is treated as a Hive native command.
      NativePlaceholder
    } else {
      val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
      AnalyzeTable(tableName)
    }
  // ...
}
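The parser change reduces to a three-way decision on the AST. The following self-contained sketch uses simplified stand-ins (plain Seq parameters instead of Spark's actual ASTNode/Token machinery; these types are assumptions for illustration) to show the same dispatch logic:

// Illustrative stand-ins, not Spark's real plan classes.
sealed trait ParsedPlan
case object NativePlaceholder extends ParsedPlan            // fall back to Hive
case class AnalyzeTable(tableName: String) extends ParsedPlan

def planAnalyze(
    tableNameParts: Seq[String],
    partitionSpec: Seq[String],   // non-empty if a PARTITION(...) clause was given
    isNoscan: Seq[String]         // non-empty if "noscan" was given
  ): ParsedPlan = {
  if (partitionSpec.nonEmpty) {
    NativePlaceholder              // partitioned ANALYZE goes to Hive
  } else if (isNoscan.isEmpty) {
    NativePlaceholder              // without "noscan", Hive scans the data itself
  } else {
    AnalyzeTable(tableNameParts.mkString("."))
  }
}

// planAnalyze(Seq("db", "Table1"), Nil, Seq("noscan"))   => AnalyzeTable("db.Table1")
// planAnalyze(Seq("Table1"), Seq("ds"), Seq("noscan"))   => NativePlaceholder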
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
case DropTable(tableName, ifExists) =>
  execution.DropTable(tableName, ifExists) :: Nil
case AnalyzeTable(tableName) =>
  execution.AnalyzeTable(tableName) :: Nil
case describe: logical.DescribeCommand =>
  val resolvedTable = context.executePlan(describe.table).analyzed
  resolvedTable match {
    // ...
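For background, a strategy in this file is essentially a partial function from logical plans to a list of candidate physical plans, and `:: Nil` builds a single-element list. A minimal self-contained sketch of that shape, with illustrative stand-in types rather than Spark's real classes:

// Simplified stand-ins for logical and physical plan nodes.
sealed trait LogicalPlan
case class LogicalAnalyzeTable(tableName: String) extends LogicalPlan

sealed trait PhysicalPlan
case class ExecAnalyzeTable(tableName: String) extends PhysicalPlan

// A single-element list means exactly one physical candidate, mirroring
// the "execution.AnalyzeTable(tableName) :: Nil" shape above.
val analyzeStrategy: PartialFunction[LogicalPlan, Seq[PhysicalPlan]] = {
  case LogicalAnalyzeTable(name) => ExecAnalyzeTable(name) :: Nil
}

// analyzeStrategy(LogicalAnalyzeTable("Table1")) => List(ExecAnalyzeTable("Table1"))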
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
/**
 * :: DeveloperApi ::
 * Analyzes the given table in the current database to generate statistics, which will be
 * used in query optimizations.
 *
 * Right now, it only supports Hive tables and it only updates the size of a Hive table
 * in the Hive metastore.
 */
@DeveloperApi
case class AnalyzeTable(tableName: String) extends LeafNode with Command {

  def hiveContext = sqlContext.asInstanceOf[HiveContext]

  def output = Seq.empty

  override protected[sql] lazy val sideEffectResult = {
    hiveContext.analyze(tableName)
    Seq.empty[Any]
  }

  override def execute(): RDD[Row] = {
    sideEffectResult
    sparkContext.emptyRDD[Row]
  }
}
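The key pattern here is that the side effect runs at most once: sideEffectResult is a lazy val, so hiveContext.analyze(tableName) executes on first access, and execute() then returns an empty result. A stripped-down sketch of that pattern, with stand-in types instead of Spark's LeafNode/Command traits:

// Stand-in for a command node whose work is a one-time side effect.
abstract class SideEffectCommand {
  protected def run(): Unit
  // `lazy val` guarantees the side effect happens only on first access.
  protected lazy val sideEffectResult: Seq[Any] = { run(); Seq.empty[Any] }
  def execute(): Seq[Any] = sideEffectResult
}

class AnalyzeTableCommand(tableName: String) extends SideEffectCommand {
  // Stands in for hiveContext.analyze(tableName).
  protected def run(): Unit = println(s"analyzing $tableName")
}

// new AnalyzeTableCommand("Table1").execute()  // prints once, returns Seq()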
How to use ANALYZE in SparkSQL after these additions:
ANALYZE TABLE tableName COMPUTE STATISTICS noscan
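Once the size statistic is in the metastore, the optimizer can use it, for example to pick a broadcast join when a table falls under spark.sql.autoBroadcastJoinThreshold. A short sketch, reusing the hiveContext from the earlier example; the table names here are illustrative:

// Updates the table's size statistic in the Hive metastore via the new code path.
hiveContext.sql("ANALYZE TABLE small_table COMPUTE STATISTICS noscan")

// With the size known, a join against a small table may be planned as a
// broadcast join instead of a shuffle join.
hiveContext.sql("SELECT * FROM big_table b JOIN small_table s ON b.key = s.key").collect()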