首页 > 代码库 > 如何快速做一个山寨的实时“大数据”处理

如何快速做一个山寨的实时“大数据”处理

 
 

前言

为啥写这篇文章?因为我现在做的这套实时计算系统在公司里很难玩下去了。去年年初来到ctrip,主要就是做两个实时应用,一个是实时报警,功能是做出来了,但应用效果不好;一个是XXX(敏感应用,不敢写出来,以XXX代替),也是实现了功能需求,但想继续按自己的思路往下走是不可能了,我捉急的表达能力很难让上头去理解实时计算和传统的request-response方式的应用不同点在哪里,为啥要这么干,也很难明白上头喜欢的系统是什么样的,真是挠破头。我的方式看来是做不下去了,但总觉得还是有点价值,所以写下来,看看有没有也想山寨的同学,可以作为参考,下面一段是扯淡,想看实际内容的请跳到系统结构。

至于为什么起这个标题:

  • 如何:
    会介绍我目前的系统是怎么做到现在这个程度的;以及本来想尝试的一些开发方向。
  • 快速:
    • 这套系统基本上是我去年一个人搭起来的,最初完全不知道如何去做实时数据处理,所以架构翻了好几回(感谢ctrip给的机会),因此到最近的一个整个版本的话,核心代码也就小几千行(这个要感谢开源社区),找一个熟练的java工程师来做的话,最多在几个星期内就能搭起来,应该说建设的成本不会太大。
    • 因为起手就有两个项目,所以一开始就瞄着通用的实时计算框架/平台,过去一年还零散开发了两三个额外的应用,这几个应用基本上都是共享一套代码,差别只存在于配置。因此当底层设施建好后,可以很快的开发一个应用,我们曾经有个应用大概花了半天,如果能做的完善的话,新应用的开发和部署可以做到分钟级,因为现实场景中大部分应用的数据获取和计算本质上差别很小,非常相似。
  • 山寨:
    介绍的系统和那些大公司的系统肯定不在一个层次上,高手们可以绕道了。本文适合于像我一样的三流程序员,如果不想花费太大的代价,而关注点在于如何应用现有的开源产品(我们是storm+esper)来做可以cover一部分情况的实时数据处理,可以参考下本文。
  • 实时
    上面说了开发和部署的目标是分钟级。而整个系统的响应速度和新规则的上线速度是在秒级,但实在是没法做到毫秒级,毕竟是山寨的。用简单的方式做到实时计算,牺牲了一定的可靠性,如果真有需求的话,需要另外的手段去弥补。
    另外最近被人用实时操作系统的定义challenge了对实时系统的理解。我还是想表达一个观点,互联网里的实时系统应该还具备一个特征,即是关注latest的数据,实时系统不能完全等于响应快,还对应于数据新。如果是历史时刻的某一数据特征,那是offline的系统去考虑的,混到实时系统中只会对系统设计拖后腿

  • 大是相对的,跟巨头们的数据量没法比,只是ctrip这样的流量还是没有压力的。老实点,放在引号里。
  • 数据
    业内大数据讲的太多,太空,连“数据”是什么都没有明确的定义。我自己心中的数据要有如下的特征(个人观点,所以在引号里):
    • 可描述。各个公司应该都差不多,基本数据来源于各种各样的log,但最好还是能用统一且简单的方式去描述这种数据,而不是光秃秃的没有任何schema的text,不然在其上就比较难做文章(如果只是简单使用的话),后面几点特征也很难保证。当然这种格式会比较简单,以防止带来太多限制
    • 可计算。提供一些通用的操作可以对数据进行变换和处理,由数据产生数据,并能支持递归的迭代。这样才能在raw data上得到一些更有意义的数据,从而去做更有价值的事情。
    • 多条记录Over单条记录。在计算中,对于某一个实体(例如一个ip或是一个uid)而言,其单条记录往往用处不大,更应该关注其一组记录的特征值。所以需要大量的聚合、统计操作。目前我们做的这套系统重点就在这,本文大部分内容会与此相关。
    • 群体记录Over单个实体的记录。在计算中,单个实体也没有太大的价值,更重要的是看整个群体的特征,用群体的特征再去比对个体。找到人民大众的特征,再去找里面的遵纪守法者(正常值,可用在推荐)和离经叛道者(异常点,多用于报警)。我一直想做到这个,也有大概的思路,但没法继续尝试了,后文会简要介绍一些。
    • 可度量和监控。要对数据有统一的度量和监控,从而对数据有个掌控,也方便找出异常点
    • 数据Over规则。估计大多数公司的应用和ctrip的一样,业务方只会提出一系列规则和几个原始的数据源,期望一个万能的规则引擎去把所有数据采进来并按照规则的逻辑去实现。大部分业务方会是天然以规则为中心的,根据具体的case,人肉找出一定的规律,然后再看过程中用到了什么数据源,从而给出需求给pd。然而在大数据量的条件下,这是不成立的,一是在大量数据下做复杂的逻辑计算,从开发和运行的效率都很难保证高效;二是大数据量下,人肉很难去发现规则了,而且老的规则也很难维护,因为规则这东西难以度量和演化,也难做自动化生成。所以必须转向数据为中心,抽出不同的特征维度,然后才可能用高大上的机器学习等方法去自动化生成规则或模型,这个才是王道;现在很成熟的hadoop,storm,也都是以数据为中心,尽量简化计算模式。
    • 数据特征Over业务特征。数据处理系统只应该跟数据相关,考虑的是数据类型(double,string等少量类型)和数据特征(单记录大小、吞吐量、流速等),这样才能让系统的适用性最大化。虽然主流的说法是设计要按照业务来,但个人还是比较顽固,认为其本质是业务里的数据特征,业务本身太繁琐,对底层系统也没多大意义,应该把业务底下的数据特征提取抽象出来。目前我所做的系统只适合于较短时间内的数据的大量的、实时的计算,不考虑大时间跨度的数据,也暂时忍受短暂的失效。现在的几个应用基本拥有这个特征,所以可以号称开发应用只要配置就好了,虽然大家就是不相信。
  • 不是数据大
    现在很多同学认为跟H*的系统挂上钩,或是用个nosql,就吹是大数据了。其本质还是以前基于数据库的业务驱动的应用换了个dao而已,顶多称为数据大。实际上“数据”和“大”都值得商榷。虽然是山寨,也要跟他们坚决划清界限。
  • 处理
    火热的大数据包罗万象,这里只涉及到实时环境下的数据处理,不怎么涉及到存储、展现等其他方面

后续系统设计都是以这些为出发点的

背景知识

storm

storm是目前比较火热的实时处理系统,虽然不能和H系的比,但资料也还是不少,我这就默认大家已经知道storm的概况了,具体的资料就不举了!

国内而言,阿里系对storm的应用比较多,网上有很多的文章;在ctrip,也有另外一个team在用storm做前端的用户行为分析,感觉是蛮成功的,应该算公司里拿得出手的项目之一了。还有很多公司也是用storm在做一些实时的业务开发。

storm本身只是提供了一个实时处理的框架,帮我们解决了大部分的可靠性,数据流向组织,并发性等基本问题,但是实际的数据处理还是要根据业务需求去开发代码适配。因此它只是解决了实时计算的组织和管理,而对计算本身是没有支持的,直接用是达不到我想要的“不写代码只配置”的效果,所以我把重心放在esper上,storm只作为外部的容器,帮我做数据的简单处理和sharding,节点的自动分配和重启,数据源的组织和数据结果的分配等等外围的功能,计算就交给esper了。

esper

esper 绝壁是个好东西,功能强大,但门槛太高。资料的话官网上看下例子,也有一篇很详尽的文档,这是html版,有兴趣的同学可以上官网下pdf版。
简单的介绍的话,esper是一个用于针对数据流进行流式处理的lib库(java和.net都有),他跟传统的数据库不同之处在于:数据库是数据先写进来,再定义一个sql,然后去拉取数据并计算一次得到结果;esper是定义一个计算规则,并作为一个计算节点,然后再把数据不停的推给他,由计算节点不停的做增量计算,并出来一系列的结果。
Alt text
上图是传统的数据库(也有用nosql的)的方式,目前很多项目应该都是采取这种设计,比如我要做一个5分钟的数据统计,就需要不停的跑sql去拉数据做统计和计算(当然可以提前做一些预聚合,数据库也应该有触发器之类的功能,不过只能作为优化,也很难去掌控)。这种pull的方式容易理解,也有已经成熟到烂街的数据库技术支持,pull的时候靠分库分表来sharding,只是要自己写点聚合的代码,对单个查询来说很快,因为只要一次数据库查询,一跳就完了,数据的查询和存储都由数据库来保证高效。但对于数据量大,然后又要实时更新的场景来说,这种低效的方式是走不通的,图上就可以看到因为数据和计算节点的分离,势必造成冗余的数据被多次拉取(或者要写复杂的代码去优化),而且实时度靠轮询来达到。这种设计要么只能适合于数据量和计算量比较小的场景,要么只能适合于人傻钱多的场景。但它好理解。

也有说拿redis做counter的,但这种方式还是解决不了数据和计算分离的问题;在者,对于功能稍复杂的计算就力不从心了,而且redis的逻辑抽象程度远不如sql,开发工作比sql都要大很多。具体的不展开了
Alt text
此图中下方每个方框代表一个用esper实现的实时计算引擎,配置好运算规则后,我们主动去把数据喂过去,引擎不停的做增量计算,每次有新结果就通过回调通知我们。这个图是公司内部写ppt时临时画的,不大恰当。因为esper是个lib,单实例的可靠性和性能没法scale,所以我们是架在storm上,由storm去自动部署和分配多个esper进程,并在前端做sharding来达到高扩展性。

select avg(price) from StockTickEvent.win:time(30 sec) 

select a.id, count(*) from pattern [
        every a=Status -> (timer:interval(10 sec) and not Status(id=a.id)
] group by id

esper声称自己是SEP(stream event processing)和cep(complex event processing)。上面从官网抄了两个比较有代表性的例子来分别说明。

  • 第一个很直观,是从StockTickEvent中计算最近30秒的平均price,当你配置好规则和StockTickEvent事件的schema,并把数据组装成具体的StockTickEvent事件源源不断push给引擎后,引擎会根据新数据的到来和时间的流逝(意味着老数据的expire)不断做计算,每当值发生变化了就会通过回调来通知调用方。从这个例子可以看出,esper采用了类似sql的写法,其称之为epl,基本包含了sql的大部分用法,还算比较亲切,只是用的时候思维要转换一下,这是一个持续的计算过程,而不是sql那种一锤子买卖
  • 第二个例子稍微复杂些,展示了esper描述事件之间关系的能力,这个例子是说当某个Status事件发生10秒之内,没有相同id的另一个Status事件发生,即通知调用方。

这两个例子还只是揭露了esper能力和复杂度冰山一角。对于流式的数据处理和事件间pattern的描述,它提供了很多的底层支持和选项。基本我们的需求都能得到满足,一种功能还能用好多种写法来实现,越用就越是觉得它强。
然而,强大的东西不容易掌控。我是在前公司paypal时听说这个开源软件的,当时老板的另外一个team花了很大的力气和资源在上面,希望做实时大数据,我离开不久听说这个team就散了,大概因为没出好的成果,员工图谱上,我和cto之间那条线上的老板们都陆续黯然离开了;而我自己,是在来到ctrip才接触,因为大概知道前同事的不顺利,所以是带着敬畏心在做,小心翼翼,尽量让它更易用,结果虽然功能是出来了,奖也拿了,还是被骂,结局也离屎差不多了。所以想用它的要当心啊,不祥之物啊:)
不过失败不能白失败,经验教训要总结下的,希望能成为它人的成功之母:

  • 要在esper上封装一层给使用。esper过于复杂,包罗万象,是个大杂烩,学习曲线开始时比较陡峭;而且从上面两张图比较,它的思维是跟传统的数据库正好相反,所以要想说服用户能理解并直接去用就很困难。前同事推销给paypal的analyst,结果人家不感冒,我在ctrip时也向用户推销过,希望用户能直接配epl规则来完成功能,结局是一年多下来还是只有我一个人能写一点。所以我花了很大力气去尝试在这上层去封装,希望能封装出更简单,更好用的一层。现在我大概有四个应用,已经近千条epl,每个都比上面的例子要长好多,如果直接在epl上操作是不可想象的事情。
  • 既然要做封装,不是做所有功能的封装。刚开始用esper,会觉得很好玩,比如我一个报警的cooldown功能就有好多种写法。后来逐渐简单化,尝试只用sep那套sql like的语法(不包含pattern那种扩展),一是已经满足目前遇到所需功能;二是容易驾驭,在上面做封装;三是正如我之前所说,想做数据为中心而不是规则为中心,所以只用了它简单而有效的聚合功能,摈弃了复杂和强大的CEP功能。
  • 一定要做好监控。前公司用的商业版,带一个称为“dashboard”的功能,听说没什么用;我自己用的开源版,没有这方面的支持,所以很受困扰。因为当那么大的数据量部署上去然后源源不断的跑时,你只能看到你的输入(源数据)和输出(esper最后通过回调传出来的数据),里面一大坨计算过程是黑盒的,完全搞不清状况,调试什么的基本不可能。还是需要想办法能让其输出一些中间信息,做到一定的监控,方便使用。最近用户提了点监控和报警的需求,已经想到了简单的solution,争取能在挂掉前实现上。
  • 界面是很重要的。之前都是一个人瞎玩,重点放在后端,一方面是前端基本没经验,另一方面是后端还在摸石头,所以忽视了前段展示。结局是我以为实现了功能就完了,所以只做到了restful api一层。但没有界面就向人展示理念,不讨喜,虽然自己觉得还是有独到之处的,结果还是留下了很烂的印象。这是一个很好的教训。
  • 说服别人采用异步批处理的思维。无论是storm还是esper,都不是request-response的同步调用方式。storm的调用方一般通过queue去push数据,计算节点是自己不断运行的,由源源不断到来的数据trigger,多个物理上可能分离的节点处理后,产生最终结果,最后一个节点可能执行些落地的操作,但不会往回传,虽然有drpc,但也只是模拟了一个rpc的表壳,内部还是通过不同的queue去连接各个计算节点的,而且是分布式系统:queue+分布式节点+分布式数据来源的归并同步,单个数据的处理很难做到毫秒级,但它本质是一种批处理的方式,是跑量的,可以做到海量数据的秒级处理;esper也是,它的操纵对象是数据流,内部是事件驱动,所以也是一种异步批处理的方式,如果用同步调的话很难达到如此高效。其实从硬件到软件(网络),再到现有的大数据处理系统,基本都是异步批处理的思路。所以当最外层使用的时候,最好是去适应这种新环境下的新特征,而不是说老的数据库能做到100ms,你这咋不行,太差了。这个思维方式不一样,不转换很难理解。

dashboard

我同事在opentsdb(后端是hbase)的思路上开发了一个叫dashboard的系统,可以对海量metrics进行实时的存储和查询。,同事对整个前后端都进行了重写和改进,细节上有蛮多独到之处,我所有程序的监控,包括storm的监控和想要做的计算平台的监控,都是基于此。这应该是ctrip里非常好的一个系统了。

系统结构

下面分别从逻辑和物理上描述整个系统的结构。

逻辑结构

目前的系统设计,对一个应用,包含了四大部分
Alt text

  1. data source system. Data sources负责管理系统的输入数据,所有输入源通过配置的形式给出,系统尽可能自动化的提取数据,并转化为内部的数据流
  2. variable engine. variable部分负责对原始数据流进行实时计算,以封装的esper引擎对原始数据流进行拆分/合并/过滤/聚合操作,加工后得到新的,更有价值的数据流。这一部分focus在计算,具体详见后文。
  3. rule engine. 针对variable部分处理后的数据流,我们需要过滤出符合用户需求的部分,需要进行阈值比较,数据cooldown等工作,最后产生可直接供用户使用的数据(称为Alert)。
  4. dispatch engine
    对每一个rule的Alert,可能有不同的action(邮件、DB、调用特定url、mq),这一部分管理如何输出数据。

所以整套系统是一个典型的输入(part 1) --> 处理(part 2&3) --> 输出(part 4)的结构,每个应用只需给出四大部分的配置,就可以得到一个实时事件处理应用。
这里需要补充说明的是:

  1. data source、variable、rule本质上都是数据流,每个元素的config信息会描述它是什么(shema,有哪些字段,字段是什么类型,仅限于string,double,long,object等简单类型),它从何来(它的source是什么,datasource来自于外部,variable来至于datasource或本身,rule来自于variable),它如何得到(对source的计算方式)
  2. data source这块应该仅局限于外部数据的集成和简单处理。这点之前没想清楚,也把一些计算功能混进去了,结果挖了个坑。
  3. rule单列开来,是因为给外部使用时需要一些额外的配置信息。我们的系统中基本往外的数据都是以alert/alarm为形式的,为了方便辨识,增加了一些名称、标示符等属性,方便数据的外部集成。
  4. variable目前在公司内部称为counter,但个人还是倾向于叫variable,主要是因为:
    • 大多数analyst的的规则和模型都是建立在变量上。我设计的初衷是为了能摆脱肉眼对原始数据进行判断的规则分析模式,希望能从多个数据源中抽象出供analyst去做挖掘分析的可精确描述、独立性较强的变量。当然这种变量在实时系统中是一个随时间变化的数据流(永远是latest值,不停变化)。虽然失败了,但理由还在。
    • counter这个名字只反映了聚合这一种计算类型,当时妥协改了这个名字,后面后悔了,因为要描述这个系统更难了
    • 对于整个计算过程的配置来说,都是OPERATION(input1, input2)--> output1的最简单的范式,input和output都是系统里的数据流,用variable的叫法更贴切。

Data Source System

对于data source来说,框架部分开发出不同类型数据源的数据抽取驱动,可以对以下数据类型数据源进行数据拉取:

  1. db(mysql/sqlserver)
  2. hbase
  3. dashboard api拉取
  4. dashboard数据直连
  5. mq
  6. url拉取
  7. other

对于每一种数据源,大致只需要定义元数据信息,就可以完成外部的数据拉取并到系统内部数据(一般称为event,包括name、key、timestamp、value四大固有属性和其他属性)格式的转换:

  1. 连接信息(dashboard url,db connection & sql, mq connection & queue name)
  2. event转换信息。对每一种通用的数据源,可以配置一些参数来自动完成源数据-->event的转换。

目前大部分数据源都是根据应用写死,但长期希望抽象出特征来,可以通过配置自动完成,只剩下少数特别的通过开发完成。
这一份的结果是我们可以从每个datasource源源不断的得到数据,因此就是一条条数据流,经过alignment后(时间对齐),要求所有事件都以相同的时钟下汇聚成一个逻辑上的总的数据流,这是系统最大的limitation)

Variable Engine

对于进入系统的数据流(stream),我们可以对此进行一些操作(包括但不限于split/join/aggreation/filtering),实时形成新的数据流,得到一系列variable(可以对应为BI中的维度,风控模型中的variable),如下图所示

Alt text

进入这部分系统时有两个数据流DS1, DS2;经过处理后,得到6个Variable(每个Variable主要包括name,key,timestamp,value几个固有属性和其他用户定义的属性),每个variable其实也是随时间变化的数据流,通过加工后的数据会更有利于作进一步的决策;最后在某一个时间点,对所有variable进行切片,可以得到一系列的latest值,这样就可以做为决策规则(模型)的输入依据,这一部分由rule management部分完成。

实际上,从数学的角度看,这部分工作希望以variable(数据流/维度,counter只是其中一种)这种数据流作为基本单元,完成一个带少量操作符的代数系统,从而整个计算过程可以由这样一些基本的操作符去搭建一个DAG,而不是从头到尾全部由程序员编码实现功能。

操作部分基本由esper完成,通过封装,将esper实现相关的部分封装掉,只提供逻辑上的运算符给用户/admin,减轻使用负担。目前只提供少量基本的操作类型:
a. (已实现)单线聚合/过滤。提供基于单个数据流(variable或data source)的按时间聚合、按条件过滤。通过此操作可以实现split/aggregate/filter等逻辑操作
b. (已实现)双线merge。对两条数据流(variable)进行合并操作,实现简单的join(根据key和timestamp严格匹配)。
c. (未实现)多线merge。将多条数据流(variable)根据key去所有的latest值,进行合并计算。
以上是系统中现在和将要实现的操作符,目前看效果不是很好,比如操作符a带的功能太多,希望一个操作符就能解决多个问题,对用户并不贴切,应该拆分为aggregate,filter(split用多个filter来实现)。

对于每个应用来说,直接拿底层框架的操作符进行配置可以基本满足需求。在资源充足的情况下,每个应用可以在框架的variable体系开发一套更高一级抽象层的操作符,来方便应用的使用。参见实时报警的实现

同时,variable部分会提供dump操作,定期的将variable值dump到hbase中,供后续查询。这个功能主要是为了事后分析和离线查询,目前还没有在生产启用。最近打算先简单写一路进dashboard,以利用其实时查询和展现的能力。至于是否以后要扩展,要看最后项目的发展了

Rule Engine

Rule这块会直接对上一部的variable进行操作,通过用户提供的阈值来得出有价值的信息(暂且称为alert),并且根据后续用户配置的action操作分发到外部处理的地方。
Alt text

目前,规则管理准备实现以下三种:

  1. (已实现)单variable固定阈值。阈值可以写死在规则里,这种方式简单,但不够灵活,适合那种比较稳定的规则
  2. (已实现)单variable浮动阈值。阈值通过api由用户管理,对每个variable[key],以分钟为精度,进行阈值报警。这种方式使用比较麻烦,单可以提供一定的灵活性。
  3. (未实现)多variable组合报警。多个variable通过给定的公式来报警。这个操作可以作为后续更复杂的规则引擎的基础。目前是实现了两个variable的组合报警,方便用户使用。对于BI提供的规则和模型来说,使用多variable的组合是天然的手段,目前还没达到这个阶段,所以也没加上。

这里需要注明的事,rule和variable有部分重合,rule的一些功能后续也能扩展到variable里实现,两者的区别在于:

  • variable是一种较为稳定的逻辑对象,对它有很好的管理:它可以作为计算单元在整个variable引擎中作为计算输入;通过variable dump功能可以有事后分析和查询的好处。因此,当需要对结果数据进行更细致的分析后后续处理时,可以建立一个variable
  • 当只需要对结果数据进行分发到后续的handler时,可以用rule,逻辑上更易懂一些。

Dispatch Engine

Dispatch引擎会将rule engine产生的alerts进行分发,供用户进行进一步访问。这一快将与data source一块形成相对应的关系,目标是通过配置将alerts推送到制定的地方:

  • db
  • mq
  • url gateway
  • hbase

App Management

App 管理作为后续计划(nice to have),应该是没可能实现了,本打算实现以下功能。

  1. 根据data source system, variable engine, rule engine, dispatch engine,再加上一些额外的配置,自动生成storm topology
  2. 提供app(其实就是topology)的界面化部署、启动、停止
  3. 整合各个模块的监控,并开发console模块
    写的简单,还是有蛮多细节要考虑的,有精力的可以去尝试

这一部分主要设计了逻辑上数据流的定义,以及其整个生命周期,下一段讲一下我们简单的物理结构是如何去分别实现各个功能的。

物理结构

之前已经提到过,我们是storm+esper的形式,esper负责内部绝大部分的计算,storm负责外围的组织。整个系统实现起来非常简单,如图
Alt text

外部数据过来有两种方式:

  1. 外部数据主动push。然而由于storm更适合用主动向外拉的方式,所以我们中间用了一个mq 中转,目前是一个activemq,打算往分布式queue迁移。
  2. 外部数据被pull。最初专门写了一个puller的程序,去拉取各种数据,再导入到storm中。后来发现多此一举了,storm本身就提供了高并发和故障恢复,于是逐渐把数据采集转移到storm上,组织好的话,代码仍然可以保持简洁性,并轻松获得高并发的特性。现在基本上只要不向外提供接口服务的程序都已经往storm集群上迁了

storm的多个spout/bolt节点获取外部数据后,成为统一的格式event(包含name,key,timestamp和其它特定属性),并sharding(需要key)到不同esper 节点,作进一步计算。

Alt text

每一个esper节点,都已经是运行在单个机器的单个进程上了,所以在这里将不同来源的数据流对齐(需要timestamp),并作一些优化处理(比如去除冗余的对象)。最后这些event就分配到esper引擎了。
variable计算引擎一方面将这些外部数据转化为简单变量(一种我自己约定格式的esper数据流),这些简单变量在一些操作符下可以生成其它变量,有最简单的操作符

  1. filtering. insert into outputVar select * from inputVar where $condition
  2. aggregation.insert into outputVar select count/sum/avg(*) as value,... from inputVar:time:win(5 min)
  3. split. 跟filter类似,用两个或多个filter来实现即可insert into outputVar1 select * from inputVar where $condition;insert into outputVar2 select * from inputVar where not $condition
  4. join(根据实际需要有多种join方式,这里列出一种)。insert into outputVar select inputVar1.*, inputVar2.* from inputVar1.latest(key), inputVar2.latest(key) where inputVar1.key = inputVar2.key,这个伪epl表示将两个原始数据流,以其key作为 单位,将最新值拼接起来,这里还可作一些运算。举个例子,有一个变量源数据来源于pc访问,另一个变量数据来源于移动app(可能用户再家里同时用手机和pc访问),按ip/uid去做join,就可以得到这个ip的完整视图

这些基本的操作符都非常简单,不会太多,开发起来也比较容易。只是要定义一下配置项,以及运算到底层esper语句的映射。我们目前新加一个基本variable操作的话,后端只用新增一个文件即可,但前端比较难做,还没有想到很好的解决方法。
当然,对于一些特殊的应用,简单的操作符可能抽象度较低,难以直接使用,可以在简单操作之上进一步封装,详细的参考case study里面实时报警的就可以了

变量出来之后,就可以通过rule来过滤出我们感兴趣的内容了:

  1. 单变量规则:insert into rule1 select * from inputVar where value > 5
  2. 多变量规则:insert into rule1 select ... from inputVar1.latest(key), inputVar2.latest(key) where inputVar1.key = inputVar2.key and (inputVar1.value 5 or inputVar2.value <10)

最后产生的结果,通过各种方式送到系统下游的各个handler。由于storm这种灵活的代码运行框架,这一点很容易做到,不详细叙述了。

case study

实时报警

ctrip内部用dashboard系统对各个应用的内部状态进行监控,包括物理的/逻辑的,利用hbase的特性,获得了实时聚合和展示的能力。我们的实时报警项目,就希望能自动化的去拉取数据,找出异常点。
公司内部也有其他报警系统,会对订单这些作一些同环比,阈值报警,系统跑的很好很有效。我们这边的报警系统更多的在于系统监控方面,会有一些更大的挑战,一是数量更大,我可能要对每个hostip,每个url,甚至是他们的组合作为单位去检查并报警;二是简单的阈值规则对上层业务来说有价值,对底层来说没太大指导作用,超出定义的阈值是比较常见的情况,结局就是大量的报警邮件发出来了,但没人关心,自己反倒成了公司内部最大的垃圾邮件制造商,所以需要各种更加复杂类型的报警:

  1. 连续上升超过50%的报警。原始数据-->简单变量-->(单变量聚合,取相邻两个值的比例合成一个新变量)-->比例变量-->(阈值为大于1.5即报警)-->done
  2. 同环比报警。我们的数据源拉取配了时间参数,可以拉取当前时间的数据,也能拉取一段时间前这个时段的数据,于是两条原始数据流-->两个简单变量-->(双变量join后,新数据/老数据)-->同环比变量-->阈值报警-->done
  3. 多个metrics值组合报警,比如一个metrics描述了>10s延时的统计,另一个metrics描述了所有延时的统计,我想知道占比的情况。两个metrics对应的原始数据流-->简单变量-->(双变量join,分子/分母)-->比例变量-->阈值报警-->done
  4. 多阈值报警。单阈值的话,规则过于刚性,容易误报。所以添加了阈值管理功能,通过开放api接口,用户可以以时间、key为单位定义不同的阈值,这样可以达到分时段报警(多个time的阈值),差异报警(不同key对应于不同阈值),基线报警(每天的每一分钟预先算好阈值并写入)。这得益于esper可以轻易的调用java的方法,只做简单的开发就可以扩充esper的功能。
  5. 。。。

这里可以看出,尽管有各种复杂的规则类型,但基本的操作是相同的,所以只要在基本操作上封装一层即可。下面是我的一条规则配置,由于没有太多资源专门管理这些五花八门的规则,我将所有配置项揉到了一起,看着复杂些,但管理成本稍微低些(因为各个部分的配置相互间可以排列组合,分开来成本太高)。

{
    "namespace": "ns",
    "group": "group,
    "name": "rulename", // 这三项只是名字标识,方便rule管理
    "config": {
      "dashboardUrl": "http://xxxxx",  // dashboard url,返回json数据
      "timeAdjustment": 300,   // 表示取多久之前的数据,同环比就在这个配置项有不同
      "dataType": "Single", // 是否拉取到的值直接报警,还是要做同环比,或者是多值计算
      "period": 0,  // 以下四项配置同环比的参数
      "ops": "-",
      "oldCondition": "",
      "newCondition": "",
      "secondUrl": "",  // 以下四项配置配置双metrics计算 
      "secondTimeAdjustment": 0,
      "dualOps": "-",
      "firstCondition": "",
      "secondCondition": "",
      "valueType": "", // 以下三项配置是对之前的原始值或计算值直接检测,还是要看变化率(差或比值)
      "formerPointCondition": "",
      "latterPointCondition": "",
      "triggerType": "fixed",  // 以下四项配置阈值类型,要么直接给阈值,要么给个标识符,用户自己去写复杂的阈值
      "lower": 0,
      "upper": 30000,
      "thresholdName": "",
      "conditionWindow": 0, // 以下两项控制规则多次命中才报警,减少误报
      "conditionCount": 0,
      "cooldown": 600, // 报警cooldown,防止短期内重复报警
      "to": "wenlu@ctrip.com", // 报警邮件配置
      "cc": "",
      "bcc": "",
      "mailInterval": 60,
      "cats": "",
      "catsEnv": "",
      "catsLevel": "info",
      "catsMessage": "",
      "catsDevid": "",
      "catsName": "",
      "catsPriority": "info",
      "type": "MetricsAlertingRule", // 表示规则类型,不同值会触发不同操作
      "desc": ""
    },
    "type": "MetricsAlertingRule",
    "desc": "",
    "status": "on",
    "app_subid": "auto@hotel_product_common_utility_logging_responsetime_30s" // 自动生成底层数据时用给的标识符
  }
View Code

 

这算是一条顶层规则,系统会自动生成一系列底层的data source/variable/rule/action,对顶层的CRUD操作也会自动映射为底层的操作。
这样开发一条新规则只用考虑如何去运用底层的计算平台,可以在更高的抽象层次上去开发,而不是从头到尾重新开发一遍。
总体而言,实时报警的功能是实现了,但运用的不好,而且是给底层人民用的,可视度不高。

XXX应用

XXX应用是敏感项目,会讲的含糊些。
这个项目数据量比较大,基本上对ctrip的每次访问都要促发一系列的运算和规则检验。过滤后,每秒k级数据,目前有几十个变量和规则,意味着每秒上w次的聚合操作和检测。生产上用了三台机器跑storm集群,实时上借助于storm+esper的高效,单台测试机(16g内存+6核cpu),已经能基本扛得住如此量的运行(不过最近随着变量的增加和流量的上升已经比较勉强,需要考虑可能的优化了)。
公司另外一个项目,用的是我之前提的数据库的方式。他们的数据跨度比较大,需要很长时间段的数据,而不局限于当前,总数据量是xxx项目当前时间段数据的几倍,但计算触发频次较低,每秒几十次,如果不算db的话,用了10台服务器,据说cpu使用率20%不到。虽然我很不喜欢拿这两个系统去比较,因为解决的问题和适用的场景不一样,就好像拿乔丹和贝利去比较,但由此得出XXX项目效率比较低这种结论是很难让人心服口服的。esper这种基于本地内存的效率远不是基于db或者是redis这种异地内存的系统可以比拟的,他的滑动窗口的计算效率也不是简单算法就能超过的。如果说可靠性和成熟度倒是可以让人服帖的。

这个项目一方面展示了当前架构能扛得住中等流量的冲击,另一方面本意是尝试让用户能自主的动态的创建变量:
Alt text
上图的拓扑是根据用户的需求演化而来,其实具有一定的普适性,通过一定的过滤找到具体的感兴趣的数据,进行聚合得到统计特征,在此之上才能建出灵活的规则来。
图中最下方有个join变量,目前还没用到,是为了多数据源(多设备或者多数据中心的数据)的数据整合。有种说法是直接从数据源上进行合并,但这样会增加数据源的复杂度,破坏整体的结构,还不能完全覆盖;另一方面,join过滤后的数据远比join原始数据高效的多得多
这一套之前一半用代码,一半靠实时计算系统的规则,最近才抽象出来,本来打算完全迁移到实时计算系统中,把整个图的掌控交给用户,由用户去控制整个DAG的结构构造和节点配置,这样灵活性比较高,通过统一的监控功能可以让用户掌控每一环节的具体信息。这让我想起去年有家国内公司来推销大数据的,他们学术背景是可视计算。现在想来如果这套能实现的话,是不是也有点可视计算的味道了。
不过现实是这条路已经断掉了,一是我们team,尤其是我的前端能力还比较差,另一方面老板觉得这个太复杂,易用性比较差,不能让人家去管理树(内部介绍还停留在树一层),于是改成了与业务适配,拓扑定死的结构,只让用户配底层的聚合变量,内部称为counter。现在只希望能满足用户需求,能多多的建出变量来,这样才有往下一步走的可能。

一些实现细节和演进方向

以下基本是未实现,只是思考过的部分

  1. 要加强监控。之前对variable的监控力度不够,整个系统完全变成黑盒。其实可以通过诸如简单的epl如insert into varInfo select count(*),.... from inputVar.win:time_length(1 min)就可以获得每个变量的运行时统计,直接导入ctrip的dashboard系统,就可以获得监控和展示能力,这样还能导入到实时报警应用里面去,获得实时报警能力(用户提的需求,所以说多让用户参与讨论是很有用的,不要把用户当傻瓜)。对XXX项目而言,这些统计信息甚至能作为业务指标使用,例如如果我需要公司某一业务线的访问量,只要配个变量+监控就ok了。没什么难度,但应该很有用
  2. 提高sharding的灵活性。目前只在前端做一次sharding,都是事先决定的(根据event里的key字段),可以考虑改进这块,根据后续变量的sharding来自动选择、复制和分发数据。
  3. 多迭代。目前esper节点只有一层,如果有更复杂的功能,可以考虑用mq作中转,或是建立多层esper节点,从而实现多道处理。
  4. 多集群配合。几个集群配合起来工作,比如在多数据中心环境下,每个数据中心部署一套算各自的集群,只要把处理过的数据再统一的聚合一遍,就可以获得统一的结果,简单又不失效率;另外第一点也提到了XXX项目的监控数据可以由实时报警去检测异常,但两者核心代码是一致的,实时报警本身的统计也能用来监控自己,这种带点递归的特性还是比较好玩的。
  5. 2-4都描述了复杂化整个计算流程,其实甚至可以通过对整个DAG图进行分析去自动分配到各个节点,自动去做同步的操作。不过这些都过于复杂了,现在还只能想想
  6. 这套实时系统有可靠性的弱点。因为基于内存的计算,如果某一节点不幸挂掉了,内存里的数据就丢了。这个目前没有太好的办法,要么做主从备份(同样的计算分配到不同的机器,只取其中一个的计算结果),要么用storm的可靠性方式,挂了后恢复到某一个时间段开始重放,不过这就要看业务是否能容忍至少一次的一致性。要做到通用的完全的可靠度非常具有挑战性,还是得根据业务对数据可靠性的不同需求才能做出合适的设计。
  7. 由于可靠性的因素,目前这套系统比较推荐用于短时数据的分析,这样即使挂了,storm重启后也不会有很大的影响,很多业务都能容忍。对于时间跨度比较长的数据分析和聚合,按我的想法,需要摒弃掉esper,但数据库那种方式还是太慢,一定要做好流式计算(或者说是online的计算)。我目前的思路是只采用storm,将每个步骤分摊到各个节点:
    • 对于过滤和join比较简单,直接来就可以了
    • 对于基于滑动窗口的聚合,稍微麻烦些,但像count和sum这类都是可以流式计算的,对于每个进了变量窗口的事件,我们只要添加一个进入事件和窗口事件后的退出事件即可,计算节点只要顺序处理这些事件即可。
    • count distinct的比较麻烦些。可以用一些hashmap的技术去优化,不过我更倾向于采用分段作基数估计的方法去取近似值。
      这样子的话项目规模会大不少,效率肯定比单机版的esper差,但可控性增强,方便可靠性和一致性方面的工作。还只是想法,没有详细思考过
  8. 对于历史性的数据,如果跨度非常大,可以仍旧采用老的db的方式,但可以有优化,可以隔一段时间用hadoop跑一下,做些预处理工作,尽量减少在线的计算量。事实上,我们的实时系统本身可以也改造成后台job的形式,计算写基于老数据的变量,如果和实时版的变量相结合,会很大扩展适用范围。
  9. 续上一点,最理想的状况就是把实时变量提供接口暴露出来(下一步有时间会尝试,这下要测下整个系统的延时度了),与其他系统产生的历史数据相集成,能有个通用的rule引擎去统一运用这些数据。这样的话,每个系统各司其职,有的系统跑得慢,但稳定,可以利用较多的数据;有的系统虽然可靠性很难保证,但跑得快,可以提供掌控当前状况的能力。

更大的视角

其实为啥要这么折腾,除了人懒想写点一本万利的代码外,主要是从前公司那里产生了完整的生态系统才是最重要的这一个想法。所以力图往这个方向靠。
Alt text
此图是我在paypal待了一年后最大的感受:别看技术烂,只要生态系统建成了,整个系统能有机的跑起来,就能源源不断的带来收益。
整个系统是一个循环往复的过程:

  1. ops在目前的技术条件下是最重要的,任何的规则系统,总得有人去给你做标注,做出大量的样本数据来,这是做任何分析系统的基础。paypal ops的director就自豪的认为google没做过paypal就是因为没“人”的参与;而在ctrip,目前感觉几个应用最缺的就是这块,没有样本数据很难去做任何自动化,规则只能是出事后每次人工去猜测,后续新规则的制定和老规则的维护都很难跟上。即使是公司技术能力突增,会用机器学习了,也很难发力,你只能用用无监督学习,但找到的特征估计十有八九不是想要的。这一点是致命伤,公司也很难向这个方向投入资源,一是还没发展到这步,二是得考虑划不划算
  2. ops产生的样本数据经由analyst分析,归纳出一些变量,就可以训练处一部分规则和模型,只要有足够的数据,就可以得到还行的成果。再拿paypal举例,基本上看家本领就靠teradata数据仓库,当时听一个pd的team leader说他们最复杂的方法就是logstic regression了,因为这样出来的东西好理解(据说现在也开始高大上,采用神经网络了,帮他们打个广告)。别看不是很复杂,但确实很有效,支撑公司了很久(当然他们的准确率要求不高,因为背后有千把个ops撑腰呢)。切换到ctrip,没有ops作支撑,所以我的用户们定规则都小心翼翼的,每次都只有些raw log去看,然后去定规则,这些规则都无法度量和跟踪维护。这就是我要做这套变量系统的初衷,其实本来我只要把人家的规则实现就好了,但这永远是打补丁的方式,不是可持续发展的道路。原本是希望能有套灵活建变量的系统,这样可以很轻松的让用户去见上百个变量,这样对每个ip/uid,我都能有成百上千个更有意义的数据(或者叫维度),理论上说,形成了一个risk profile。如果有样本的话,可以与BI对接,自动化都能产生规则出来;即使没有样本,通过监控统计,也能比raw log更好的揭示系统状况。
  3. ops产生的规则和模型最终由pd的在线系统去run,从而抓出感兴趣的数据来。这一块纯粹是执行了。不过对我自己实现的系统而言,还期望达到能灵活的自动化的建立变量,而不是每个变量都要去考编码来实现
  4. 整个系统周而复始的运行,不断调整,不断演进。

这这个一套系统才是完整的有机体,本文所述的内容都只是描述了其中最不重要的在线那一块,而建立整个体系和完善离线系统才是更重要的,这个是真正做一个完善有用的实时系统需要解决的。我自己还没做到,就不多说了

总结

想说的都说了,没想到这么长。最后只想说句做实时计算真不容易,越做越觉得能力和经验上的不足益发明显了,已经不是简单搭个开源软件就能搞定的。希望后面能看到别人的做法,有机会能跟风。

什么,你居然看到结束了,请你喝酸梅汤。