首页 > 代码库 > 说说阿里增量计算框架Galaxy :增量计算模型 (二)

说说阿里增量计算框架Galaxy :增量计算模型 (二)

背景

在前一篇文章中,介绍到了Galaxy的增量计算性质,其state是框架内部管理的,以及与Storm的简单对比。这篇文章将讲述更多Galaxy增量模型的事情,并介绍这套增量模型之上实现的Galaxy SQL和Galaxy Operator,同时会从增量角度对比Spark Streaming。


Galaxy MRM增量与Spark Streaming

MRM模型全称为MapReduceMerge,比MapReduce做了一个Merge操作。merge阶段可与state交互,读写某个key的oldValue,并且这个merge接口还具备rollback语义。在流计算场景下,数据按时间或条数切成不同的批,批内可以做普遍意义下的MapReduce操作,批之间需要merge阶段做跨批聚合的计算。大家可以对比Spark Streaming的UpdateStateByKey操作,在一个DStream内,各个时间段内的RDD(即各批)可以通过这个接口更新一次任务内的state。而galaxy的merge本质上是一次add的过程,对应的rollback是一次delete的过程,从数据库的语义看,两个过程合起来相当于是update操作,而这俩过程都是根据一个primary key来做的,所以这件事情与spark streaming的updateStateByKey做的事情是一样的,但是细看的话,两者还是存在很大的差异。

技术分享


galaxy的state暴露给计算task是线程级别独享的,spark streaming的state是任务内全局共享的。线程级别独享的优点,就在于同一批数据,按key shuffle之后来到不同的merge计算节点,各自不会阻塞各自的计算过程,而spark streaming的updateStateByKey操作会阻塞其他rdd的计算,虽然spark streaming能做到DStream内各个RDD并发执行,但是只要有state操作,最终还是落到了时间序列上的阻塞。本时间点StateRDD的计算需要依赖前一时间点父StateRDD的计算结果,而批内各个key对state操作是互相阻塞和影响的,所以着眼在这层barrier上的话,galaxy的merge过程更加精细,add和delete过程是分开的,批内的key是落到不同线程上计算而state是线程内独享的。


Galaxy有三种Model,分别是MapOnlyModel,MapReduceModel,MapReduceMergeModel。即,你可以使用M Model和MR Model做普通的流计算或小批计算,当需要跨批操作的时候就使用MRM Model。Model之间是随意组合串联的,接口相比MapReduce其实是相当灵活甚至过于灵活的,灵活的弊端是计算模型上带来复杂性。


Galaxy SQL

Galaxy SQL是一种StreamSQL,而且是目前业界没有的。从语法上Galaxy SQL贴近HiveSQL,但又有些流计算语义上(无限数据流)不能支持的语法,比如limit, order by。

Intel那边搞了一个Spark Streaming + Spark SQL的结合,叫StreamSQL。利用Spark SQL里的SchemaRDD,为Spark Streaming流进来的RDD带上了Schema元信息。借助Spark Streaming支持的操作,这种StreamSQL可以做滑窗效果的sql计算。但是真正跨批的增量语义(不仅仅是固定的window跨批计算),是支持不了的。Galaxy SQL可以做真正的增量流式SQL。


举个最简单的例子,

insert into t2 
  select t1.a as k, count(t1.b) as cnt from t1 group by t1.a;

select count(cnt) from t2 group by t2.cnt;


第一句sql中,根据t1的a字段分组,求了个count值。第二句sql中,t2表分组的字段变为t1表里count出来的cnt值。大家可以想象,在流计算场景里,第一次a求count出来的值可能是100,下一个时间点,同一个a的key,count出来的值就是200了,这时候,100这个cnt已经丢到t2表里计算出结果了,现在100已经更新到200了,200这个新的值的计算是简单的,但问题是如何把t2里之前100的计算结果撤销呢?


可以仔细想想,StreamSQL是做不了这样的sql的,本质上是因为spark streaming不支持这样的操作。Galaxy计算框架的merge阶段可以做rollback操作,回滚之前"错误"的状态,使得Galaxy SQL可以做分布式流式SQL。


Galaxy Operator

Galaxy Operator是Galaxy MRM编程接口之上的一层DAG封装,兼具易用性和表达能力。

算子层最终将映射成多个Galaxy的MRM Model,使用户可以更加关注计算逻辑,屏蔽较复杂的MRM Model,特别是merge阶段。

技术分享

算子层相当于是物理执行计划,本身可以做节点合并、谓词下推等优化的工作,即物理执行计划的优化。从本质上,我认为类似Hive、Spark Catalyst里对执行计划的优化工作,在算子层这个DAG里都是可以做的。通过算子这一层,理论上任何DSL都是可以映射之后在Galaxy计算框架上运行的。


算子层提供五类正交的基础算子:map, reduce,merge,shuffle,union。五类基础算子可以互相组合,衍生成更高级的算子。

技术分享

需要注意的是,reduce类的算子 ,针对的是本批内数据的聚合。增量语义下的reduce与批量语义下MapReduce中的reduce并不一样,增量语义下的reduce针对的是本批,MapReduce中的reduce对应跨批的数据,更加类似增量语义下的merge。merge类的算子 ,针对的是跨批的聚合操作。merge()对应的是MRM模型里的Merge phase,可与OldValue交互,是增量场景中的特性操作。通常用于实现count、sum等UDAF操作,也可以实现top、distinct、类join的操作。


union类的算子 ,针对的是多流合并的场景。union()操作是将多条流合并成一条流输出,要求各流的columns对齐且一致。mix()操作也是多流合并成一条,但内部标明了数据来自左流还是右流,各流的column可以不一致,后续可以衔接集合性的批内或跨批操作。mix()是专门为集合性操作而设计的接口。


功能上,算子层可以类比Spark RDD。Spark RDD 核心价值 有二:其一,在api层面,规避MapReduce模型的抽象和不舒适的原生接口,提供多种transformations和actions,方便开发者理解和使用,即easy to use;其二,在计算层面,通过持久化RDD做到了批量计算过程中对中间数据的复用,使Spark诞生之初以适合迭代型计算的内存计算框架闻名,即reuse data。反观Galaxy算子层,一方面,算子层与Spark RDD一样,在api设计上具备FlumeJava的设计理念,兼具易用性和表达能力;另一方面,Galaxy之增量计算模型是 "有状态的计算" ,天然做到了实时数据各批之间"状态"的reuse(在merge phase)。


后续

之后有时间,希望可以介绍下Galaxy的任务模型、对于state的管理和容错等方面的内容。


说说阿里增量计算框架Galaxy :增量计算模型 (二)