首页 > 代码库 > 整理对Spark SQL的理解

整理对Spark SQL的理解

Catalyst


Catalyst是与Spark解耦的一个独立库,是一个impl-free的执行计划的生成和优化框架。

目前与Spark Core还是耦合的,对此user邮件组里有人对此提出疑问,见mail。

 

以下是Catalyst较早时候的架构图,展示的是代码结构和处理流程。


Catalyst定位

其他系统如果想基于Spark做一些类sql、标准sql甚至其他查询语言的查询,需要基于Catalyst提供的解析器、执行计划树结构、逻辑执行计划的处理规则体系等类体系来实现执行计划的解析、生成、优化、映射工作。

对应上图中,主要是左侧的TreeNodelib及中间三次转化过程中涉及到的类结构都是Catalyst提供的。至于右侧物理执行计划映射生成过程,物理执行计划基于成本的优化模型,具体物理算子的执行都由系统自己实现。

 

Catalyst现状

在解析器方面提供的是一个简单的scala写的sql parser,支持语义有限,而且应该是标准sql的。

在规则方面,提供的优化规则是比较基础的(和Pig/Hive比没有那么丰富),不过一些优化规则其实是要涉及到具体物理算子的,所以部分规则需要在系统方那自己制定和实现(如spark-sql里的SparkStrategy)。

Catalyst也有自己的一套数据类型。


下面介绍Catalyst里几套重要的类结构。


TreeNode体系

TreeNode是Catalyst执行计划表示的数据结构,是一个树结构,具备一些scala collection的操作能力和树遍历能力。这棵树一直在内存里维护,不会dump到磁盘以某种格式的文件存在,且无论在映射逻辑执行计划阶段还是优化逻辑执行计划阶段,树的修改是以替换已有节点的方式进行的。

 

TreeNode,内部带一个children: Seq[BaseType]表示孩子节点,具备foreach、map、collect等针对节点操作的方法,以及transformDown(默认,前序遍历)、transformUp这样的遍历树上节点,对匹配节点实施变化的方法。

提供UnaryNode,BinaryNode, LeafNode三种trait,即非叶子节点允许有一个或两个子节点。

TreeNode提供的是范型。

TreeNode有两个子类继承体系,QueryPlan和Expression。QueryPlan下面是逻辑和物理执行计划两个体系,前者在Catalyst里有详细实现,后者需要在系统自己实现。Expression是表达式体系,后面章节都会展开介绍。


Tree的transformation实现:

传入PartialFunction[TreeType,TreeType],如果与操作符匹配,则节点会被结果替换掉,否则节点不会变动。整个过程是对children递归执行的。


执行计划表示模型


逻辑执行计划

QueryPlan继承自TreeNode,内部带一个output: Seq[Attribute],具备transformExpressionDown、transformExpressionUp方法。

在Catalyst中,QueryPlan的主要子类体系是LogicalPlan,即逻辑执行计划表示。其物理执行计划表示由使用方实现(spark-sql项目中)。

LogicalPlan继承自QueryPlan,内部带一个reference:Set[Attribute],主要方法为resolve(name:String): Option[NamedeExpression],用于分析生成对应的NamedExpression。

LogicalPlan有许多具体子类,也分为UnaryNode, BinaryNode, LeafNode三类,具体在org.apache.spark.sql.catalyst.plans.logical路径下。




逻辑执行计划实现

LeafNode主要子类是Command体系:


各command的语义可以从子类名字看出,代表的是系统可以执行的non-query命令,如DDL。

 

UnaryNode的子类:



BinaryNode的子类:



物理执行计划

另一方面,物理执行计划节点在具体系统里实现,比如spark-sql工程里的SparkPlan继承体系。



物理执行计划实现

每个子类都要实现execute()方法,大致有以下实现子类(不全)。

LeadNode的子类:



UnaryNode的子类:



BinaryNode的子类:


提到物理执行计划,还要提一下Catalyst提供的分区表示模型


执行计划映射

Catalyst还提供了一个QueryPlanner[Physical <: TreeNode[PhysicalPlan]]抽象类,需要子类制定一批strategies: Seq[Strategy],其apply方法也是类似根据制定的具体策略来把逻辑执行计划算子映射成物理执行计划算子。由于物理执行计划的节点是在具体系统里实现的,所以QueryPlanner及里面的strategies也需要在具体系统里实现。



在spark-sql项目中,SparkStrategies继承了QueryPlanner[SparkPlan],内部制定了LeftSemiJoin, HashJoin,PartialAggregation, BroadcastNestedLoopJoin, CartesianProduct等几种策略,每种策略接受的都是一个LogicalPlan,生成的是Seq[SparkPlan],每个SparkPlan理解为具体RDD的算子操作。

比如在BasicOperators这个Strategy里,以match-case匹配的方式处理了很多基本算子(可以一对一直接映射成RDD算子),如下:

case logical.Project(projectList, child) =>
  execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
  execution.Filter(condition, planLater(child)) :: Nil
case logical.Aggregate(group, agg, child) =>
  execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil
case logical.Sample(fraction, withReplacement, seed, child) =>
  execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil

Expression体系


Expression,即表达式,指不需要执行引擎计算,而可以直接计算或处理的节点,包括Cast操作,Projection操作,四则运算,逻辑操作符运算等。

具体可以参考org.apache.spark.sql.expressionspackage下的类。


Rules体系


凡是需要处理执行计划树(Analyze过程,Optimize过程,SparkStrategy过程),实施规则匹配和节点处理的,都需要继承RuleExecutor[TreeType]抽象类。

RuleExecutor内部提供了一个Seq[Batch],里面定义的是该RuleExecutor的处理步骤。每个Batch代表着一套规则,配备一个策略,该策略说明了迭代次数(一次还是多次)。

protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

Rule[TreeType <: TreeNode[_]]是一个抽象类,子类需要复写apply(plan: TreeType)方法来制定处理逻辑。

RuleExecutor的apply(plan: TreeType): TreeType方法会按照batches顺序和batch内的Rules顺序,对传入的plan里的节点迭代处理,处理逻辑为由具体Rule子类实现。


Hive相关


Hive支持方式


Spark SQL对hive的支持是单独的spark-hive项目,对Hive的支持包括HQL查询、hive metaStore信息、hive SerDes、hive UDFs/UDAFs/ UDTFs,类似Shark。

只有在HiveContext下通过hive api获得的数据集,才可以使用hql进行查询,其hql的解析依赖的是org.apache.hadoop.hive.ql.parse.ParseDriver类的parse方法,生成Hive AST。

 

实际上sql和hql,并不是一起支持的。可以理解为hql是独立支持的,能被hql查询的数据集必须读取自hive api。下图中的parquet、json等其他文件支持只发生在sql环境下(SQLContext)。



Hive on Spark


Hive官方提出了Hive onSpark的JIRA。Shark结束之后,拆分为两个方向:



从这里看,对Hive的兼容支持将转移到Hive on Spark上,之前Shark的经验将在Hive社区的这个支持上体现。我理解,目前SparkSQL里的那种Hive支持方式,只是为了在Spark环境下集成操纵Hive数据,它的hql执行是调用Hive客户端Driver,跑在hadoop MR上的,本身不是Hive on Spark的实现,只是为了使用RDD间接操作Hive数据集。

 

所以如果想要把现有Hive任务迁移到Spark上,应该使用Shark或者等待Hive on Spark。

Spark SQL里的Hive支持不是hive on spark的实现,而更像一个读写Hive数据的客户端。且其hql支持只包含hive数据,与sql环境是互相独立的。

 

以上两节是Spark SQL Hive、Shark、Hive on Spark的区别和理解。


SQL Core


Spark SQL的核心是把已有的RDD,带上Schema信息,然后注册成类似sql里的”Table”,对其进行sql查询。这里面主要分两部分,一是生成SchemaRD,二是执行查询。


生成SchemaRDD


如果是spark-hive项目,那么读取metadata信息作为Schema、读取hdfs上数据的过程交给Hive完成,然后根据这俩部分生成SchemaRDD,在HiveContext下进行hql()查询。

 

对于Spark SQL来说,

数据方面,RDD可以来自任何已有的RDD,也可以来自支持的第三方格式,如json file、parquet file。

SQLContext下会把带case class的RDD隐式转化为SchemaRDD

implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
new SchemaRDD(this,
SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))

ExsitingRdd单例里会反射出case class的attributes,并把RDD的数据转化成Catalyst的GenericRow,最后返回RDD[Row],即一个SchemaRDD。这里的具体转化逻辑可以参考ExsitingRdd的productToRowRdd和convertToCatalyst方法。

之后可以进行SchemaRDD提供的注册table操作、针对Schema复写的部分RDD转化操作、DSL操作、saveAs操作等等。

 

Row和GenericRow是Catalyst里的行表示模型

Row用Seq[Any]来表示values,GenericRow是Row的子类,用数组表示values。Row支持数据类型包括Int, Long, Double, Float, Boolean, Short, Byte, String。支持按序数(ordinal)读取某一个列的值。读取前需要做isNullAt(i: Int)的判断。

各自都有Mutable类,提供setXXX(i: int, value: Any)修改某序数上的值。


层次结构




下图大致对比了Pig,Spark SQL,Shark在实现层次上的区别,仅做参考。





查询流程


SQLContext里对sql的一个解析和执行流程:

1.  第一步parseSql(sql: String),simple sql parser做词法语法解析,生成LogicalPlan。

 

2.  第二步analyzer(logicalPlan),把做完词法语法解析的执行计划进行初步分析和映射,

目前SQLContext内的Analyzer由Catalyst提供,定义如下:

new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive =true)

catalog为SimpleCatalog,catalog是用来注册table和查询relation的。

而这里的FunctionRegistry不支持lookupFunction方法,所以该analyzer不支持Function注册,即UDF。

Analyzer内定义了几批规则:

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*),
    Batch("Check Analysis", Once,
      CheckResolution),
    Batch("AnalysisOperators", fixedPoint,
      EliminateAnalysisOperators)
  )

3.  从第二步得到的是初步的logicalPlan,接下来第三步是optimizer(plan)。

Optimizer里面也是定义了几批规则,会按序对执行计划进行优化操作。

  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      LikeSimplification,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil

4.  优化后的执行计划,还要丢给SparkPlanner处理,里面定义了一些策略,目的是根据逻辑执行计划树生成最后可以执行的物理执行计划树,即得到SparkPlan。

    val strategies: Seq[Strategy] =
      CommandStrategy(self) ::
      TakeOrdered ::
      PartialAggregation ::
      LeftSemiJoin ::
      HashJoin ::
      InMemoryScans ::
      ParquetOperations ::
      BasicOperators ::
      CartesianProduct ::
      BroadcastNestedLoopJoin :: Nil

5.  在最终真正执行物理执行计划前,最后还要进行两次规则,SQLContext里定义这个过程叫prepareForExecution,这个步骤是额外增加的,直接new RuleExecutor[SparkPlan]进行的。

    val batches =
      Batch("Add exchange", Once, AddExchange(self)) ::
      Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil

6.  最后调用SparkPlan的execute()执行计算。这个execute()在每种SparkPlan的实现里定义,一般都会递归调用children的execute()方法,所以会触发整棵Tree的计算。


其他特性


内存列存储

SQLContext下cache/uncache table的时候会调用列存储模块。

该模块借鉴自Shark,目的是当把表数据cache在内存的时候做行转列操作,以便压缩。

 

实现类

InMemoryColumnarTableScan类是SparkPlan LeafNode的实现,即是一个物理执行计划。传入一个SparkPlan(确认了的物理执行计)和一个属性序列,内部包含一个行转列、触发计算并cache的过程(且是lazy的)。

 

ColumnBuilder针对不同的数据类型(boolean, byte, double, float, int, long, short, string)由不同的子类把数据写到ByteBuffer里,即包装Row的每个field,生成Columns。与其对应的ColumnAccessor是访问column,将其转回Row。

 

CompressibleColumnBuilder和CompressibleColumnAccessor是带压缩的行列转换builder,其ByteBuffer内部存储结构如下

 *    .--------------------------- Column type ID (4 bytes)
 *    |   .----------------------- Null count N (4 bytes)
 *    |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
 *    |   |   |     .------------- Compression scheme ID (4 bytes)
 *    |   |   |     |   .--------- Compressed non-null elements
 *    V   V   V     V   V
 *   +---+---+-----+---+---------+
 *   |   |   | ... |   | ... ... |
 *  +---+---+-----+---+---------+
 *  \-----------/ \-----------/
 *       header         body

CompressionScheme子类是不同的压缩实现


都是scala实现的,未借助第三方库。不同的实现,指定了支持的column data类型。在build()的时候,会比较每种压缩,选择压缩率最小的(若仍大于0.8就不压缩了)。

这里的估算逻辑,来自子类实现的gatherCompressibilityStats方法。


Cache逻辑

cache之前,需要先把本次cache的table的物理执行计划生成出来。

在cache这个过程里,InMemoryColumnarTableScan并没有触发执行,但是生成了以InMemoryColumnarTableScan为物理执行计划的SparkLogicalPlan,并存成table的plan。

其实在cache的时候,首先去catalog里寻找这个table的信息和table的执行计划,然后会进行执行(执行到物理执行计划生成),然后把这个table再放回catalog里维护起来,这个时候的执行计划已经是最终要执行的物理执行计划了。但是此时Columner模块相关的转换等操作都是没有触发的。

真正的触发还是在execute()的时候,同其他SparkPlan的execute()方法触发场景是一样的。


Uncache逻辑

UncacheTable的时候,除了删除catalog里的table信息之外,还调用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,并进行了unpersist()操作。cacheColumnBuffers主要做了把RDD每个partition里的ROW的每个Field存到了ColumnBuilder内。


UDF(暂不支持)

如前面对SQLContext里Analyzer的分析,其FunctionRegistry没有实现lookupFunction。

在spark-hive项目里,HiveContext里是实现了FunctionRegistry这个trait的,其实现为HiveFunctionRegistry,实现逻辑见org.apache.spark.sql.hive.hiveUdfs


Parquet支持

待整理

http://parquet.io/

 

Specific Docs and Codes:

https://github.com/apache/incubator-parquet-format

https://github.com/apache/incubator-parquet-mr

http://www.slideshare.net/julienledem/parquet-hadoop-summit-2013


JSON支持

SQLContext下,增加了jsonFile的读取方法,而且目前看,代码里实现的是hadoop textfile的读取,也就是这份json文件应该是在HDFS上的。具体这份json文件的载入,InputFormat是TextInputFormat,key class是LongWritable,value class是Text,最后得到的是value部分的那段String内容,即RDD[String]。

 

除了jsonFile,还支持jsonRDD,例子:

http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

 

读取json文件之后,转换成SchemaRDD。JsonRDD.inferSchema(RDD[String])里有详细的解析json和映射出schema的过程,最后得到该json的LogicalPlan。

 

Json的解析使用的是FasterXML/jackson-databind库,GitHub地址,wiki

把数据映射成Map[String, Any]

 

Json的支持丰富了Spark SQL数据接入场景。


JDBC支持

Jdbc support branchis under going


SQL92

Spark SQL目前的SQL语法支持情况见SqlParser类。目标是支持SQL92??

 

1. 基本应用上,sql server 和oracle都遵循sql 92语法标准。

2. 实际应用中大家都会超出以上标准,使用各家数据库厂商都提供的丰富的自定义标准函数库和语法。

3. 微软sql server的sql 扩展叫T-SQL(Transcate SQL).

4. Oracle 的sql 扩展叫PL-SQL.


存在问题


大家可以跟进社区邮件列表,后续待整理

http://apache-spark-developers-list.1001551.n3.nabble.com/sparkSQL-thread-safe-td7263.html 

http://apache-spark-user-list.1001560.n3.nabble.com/Supported-SQL-syntax-in-Spark-SQL-td9538.html


总结


以上整理了对Spark SQL各个模块的实现情况,代码结构,执行流程以及自己对Spark SQL的理解。

理解有偏差的地方欢迎交流讨论 :)


全文完 :)