首页 > 代码库 > 分布式机器学习的故事

分布式机器学习的故事

王益博士,称得上机器学习领域的资深从业者,本人之前有幸拜读过王益博士的一些paper和slides,对其从事的“分布式机器学习”方向尤感兴趣。

王益博士之前写过一篇《分布式机器学习的故事》,总结了自己从业多年的经验和感悟。可惜的是,这篇原始博文已经删除了,现在能找到的是原始的六篇讲稿素材:A New Era;Infrequent itemset mining;Application Driven;Implement Your MapReduce;Deep Learning;Peacock and Latent Topic Modeling。后来王益博士在52ml、知乎等平台重新发了几篇讲稿的译文,算是非常体贴了。

这个系列透漏出一个核心思想:互联网服务将超越人工服务;集体智能超越人工智能;大数据是行为数据;大数据必然长尾,长尾数据无噪声;追求“大”比求“快”重要;开发框架、而不是套用框架;工程技法和数学同样重要;远离 Java、远离 Python;有所谓好的系统,无所谓好的算法。虽然在验证一个新的并行算法的正确性的时候,我们可以利用现有框架,尽量快速实现,但是任何一个有价值的机器学习思路,都值得拥有自己独特的架构。所以重点在有一个分布式操作系统,方便大家开发自己需要的架构(框架),来支持相应的算法。

这个系列确实精彩,因此,自己将花些时间对其进行整理,并张贴在这里算是自己做个记录。当然,如果涉及到任何侵权不合理的纠纷,本人将及时撤下文章。

更多信息可参考http://cxwangyi.github.io/notes/2014-01-20-distributed-machine-learning.html

  • 一个新时代
      • 起源
      • 价值
  • 评价标准
  • pLSA和MPI
  • LDA和MapReduce
  • Rephil和MapReduce
  • Docker改变世界
      • Docker
      • boot2docker
      • CoreOS
      • Go语言
      • 交叉编译
      • 极简版搜索引擎
      • 创建集装箱
      • 分布式系统的部署
      • 集成测试
      • 自动部署
  • 论文发表了

一个新时代

起源

分布式机器学习是随着“大数据”概念兴起的。在有大数据之前,有很多研究工作为了让机器学习算法更快,而利多多个处理器。这类工作通常称为“并行计算”或者“并行机器学习”,其核心目标是把计算任务拆解成多个小的任务,分配到多个处理器上做计算。分布式计算或者分布式机器学习除了要把计算任务分布到多个处理器上,更重要的是把数据(包括训练数据以及中间结果)分布开来。因为在大数据时代,一台机器的硬盘往往装不下全部数据,或者即使装下了,也会受限于机器的I/O通道的带宽,以至于访问速度很慢。为了更大的存储容量、吞吐量以及容错能力,我们都希望把数据分布在多台计算机上。

那么什么样的数据大到一台机器甚至几百台机器的硬盘都装不下呢?要知道,现在很多服务器的硬盘空间都是数TB的了!其实这样的大数据有很多。比如搜索引擎要爬下很多很多的网页,对其内容做分析并建立索引。有多少网页呢?这个数字很难估计,因为这是随时间变化的。在Web 2.0出现之前,全球网页数量的增长相对稳定,因为网页都是专业人员编辑的。而由于各种Web 2.0工具帮助用户建立自己的网页,比如博客、甚至微博,所以网页数量呈指数速度递增。

另一种典型的大数据是电商网站上的用户行为数据。比如在亚马逊或者淘宝上,每天都很多用户看到了很多推荐的商品,并且点击了其中一些。这些用户点击推荐商品的行为会被亚马逊和淘宝的服务器记录下来,作为分布式机器学习系统的输入。输出是一个数学模型,可以预测一个用户喜欢看到哪些商品,从而在下一次展示推荐商品的时候,多展示那些用户喜欢的。

类似的,在互联网广告系统中,展示给用户的广告、以及用户点击的广告也都会被记录下来,作为机器学习系统的数据,训练点击率预估模型。在下一次展示推荐商品时,这些模型会被用来预估每个商品如果被展示之后,有多大的概率被用户点击。其中预估点击率高的商品,往往展示在预估点击率低的商品之前,从而赢得实际上比较高的点击率。

从上面的例子我们可以看出来,这些大数据之所以大,是因为它们记录的是数十亿互联网用户的行为。而人们每天都会产生行为,以至于百度、阿里、腾讯、奇虎、搜狗这样的公司的互联网服务每天收集到很多很多块硬盘才能装下的数据。而且这些数据随时间增加,永无止境。虽然对“大数据”的具体定义见人见智,但是互联网用户的行为数据,毫无疑问地被公认为大数据了。

价值

机器学习的应用由来已久。大家可能还记得十几年前IBM推出的语音识别和输入系统ViaVoice。这个系统使用的声学模型和语言模型是用人工收集整理和标注的数据训练的。当年因为IBM财大气粗,收集和整理了很多数据,所以ViaVoice的识别准确率在同类产品中遥遥领先。但是,ViaVoice很难保证能识别各种口音的人。所以IBM的工程师们设计了一个自动适应的功能——通过让用户标注没能正确识别的语音对应的文本,ViaVoice可以针对主任的口音做特别的优化。

今天,大家可以通过互联网使用Google的语音识别系统。我们会发现,不管使用者口音如何,Google的语音识别系统几乎都能准确识别,以至于几乎不再需要“适应主人的口音”。而且Google的系统支持的语言种类也更多。这其中的奥妙就在于“大数据”。

在Google发布语音识别引擎之前,先有语音搜索服务。在语音搜索服务之前,有一个打电话查询的服务。实际上,正式这个电话服务收集了很多用户的语音输入。这部分数据经过人工标注,称为了训练语言模型和声学模型的第一批数据。随后发布的语音搜索收集了世界各地更多互联网用户的声音,加上半自动标注系统的引入,训练数据大大丰富了。训练数据越多,能覆盖的口音和语种越多,机器学习得到的模型的识别准确率也就越高。以至于当Google发布语音识别引擎之初,识别率就远高于依赖人工标注训练数据的IBM ViaVoice。随着语音识别服务被很多手机应用和桌面应用使用,它能采集更多用户的语音输入,模型的准确性会不断得到提高。

从上面例子我们可以看出,因为互联网服务收集的数据是万万千千用户的行为的体现,而人类行为是人类智能的结果。所以如果我们能设计分布式机器学习系统,能从大数据中归纳规律,我们实际上就在归纳整个人类的知识库。这个听起来很神奇,实际上在上面的例子里,Google已经做到了。在这一系列的最后一节里,我们会介绍我们开发的一个语义学习系统,它从上千亿条文本数据中,归纳汉语中上百万的“语义”。随后,只要用户输入任何一段文本,这个系统可以利用训练好的模型在一毫秒之内,理解文本中表达的“语义”。这个理解过程确保消除文本中的歧义,从而让搜索引擎、广告系统、推荐系统等应用更好地理解用户需求。

简言之,互联网使得人类第一次有机会收集全人类的行为数据。从而为机器学习这一持续了数十年的研究方向提供了全新的机会——分布式机器学习——从互联网数据中归纳这个人类的知识,从而让机器比任何一个个人都要“聪明”。

评价标准

在后文中会详细介绍的各个大规模机器学习系统,基本都有三个特点:

  1. 可扩展。可扩展的意思是“投入更多的机器,能处理更大的数据”。而传统的并行计算要的是:“投入更多机器,数据大小不变,计算速度更快”。这是我认识中“大数据”和传统并行计算研究目标不同的地方。如果只是求速度快,那么multicore和GPU会比分布式机器学习的ROI更高。
    • 有一个框架(比如MPI或者MapReduce或者自己设计的),支持fault recovery。Fault recovery是可扩展的基础。现代机群系统都是很多用户公用的,其中任何一个进程都有可能被更高优先级的进程preempted。一个job涉及数千个进程(task processes),十分钟里一个进程都不挂的概率很小。而如果一个进程挂了,其他进程都得重启,那么整个计算任务可能永远都不能完成。
  2. 数学模型要根据架构和数据做修改。这里有两个原因:
    • 因为大数据基本都是长尾分布的,而papers里的模型基本都假设数据是指数分布的(想想用SVD做component analysis其实假设了Gaussian distributed,latent Dirichlet allocation假设了multimonial distribution。)。真正能处理大数据的数学模型,都需要能更好的描述长尾数据。否则,模型训练就是忽视长尾,而只关注从“大头”数据部分挖掘“主流”patterns了。
    • 很多机器学习算法(比如MCMC)都不适合并行化。所以往往需要根据模型的特点做一些算法的调整。有时候会是approximation。比如AD-LDA算法是一种并行Gibbs sampling算法,但是只针对LDA模型有效,对其他大部分模型都不收敛,甚至对LDA的很多改进模型也不收敛。
  3. 引入更多机器的首要目的不是提升性能,而是能处理更大的数据。用更多的机器,处理同样大小的数据,期待speedup提高——这是传统并行计算要解决的问题——是multicore、SMP、MPP、GPU还是Beowolf cluster上得分布式计算不重要。在大数据情况下,困难点在问题规模大,数据量大。此时,引入更多机器,是期待能处理更大数据,总时间消耗可以不变甚至慢一点。分布式计算把数据和计算都分不到多台机器上,在存储、I/O、通信和计算上都要消除瓶颈。

上述三个特点,会在实践中要求“一个有价值的算法值得也应该有自己独特的框架”。

在开始说故事之前,再先正名几个概念:Message Passing和MapReduce是两个有名的并行程序编程范式(paradigm),也就是说,并行程序应该怎么写都有规范了——只需要在预先提供的框架(framework)程序里插入一些代码,就能得到自己的并行程序。Message Passing范式的一个框架叫做MPI。MapReduce范式的框架也叫MapReduce。而MPICH2和Apache Hadoop分别是这MPI和MapReduce两个框架的实现(implementations)。另一个本文会涉及的MapReduce实现是我用C++写的MapReduce Lite。后面还会提到BSP范式,它的一个著名的实现是Google Pregel。

MPI这个框架很灵活,对程序结构几乎没有太多约束,以至于大家有时把MPI称为一组接口(interface)——MPI的I就是interface的意思。

这里,MPICH2和Hadoop都是很大的系统——除了实现框架(允许程序员方便的编程),还实现了资源管理和分配,以及资源调度的功能。这些功能在Google的系统里是分布式操作系统负责的,而Google MapReduce和Pregel都是在分布式操作系统基础上开发的,框架本身的代码量少很多,并且逻辑清晰易于维护。当然Hadoop已经意识到这个问题,现在有了YARN操作系统。(YARN是一个仿照UC Berkeley AMPLab的Mesos做的系统。关于这个“模仿”,又有另一个故事。)

pLSA和MPI

大数据的首要目标是“大”而不是“快”

我2007年毕业后加入Google做研究。我们有一个同事叫张栋,他的工作涉及pLSA模型的并行化。这个课题很有价值,因为generalized matrix decomposition实际上是collaborative filtering的generalization,是用户行为分析和文本语义理解的共同基础。几年后的今天,我们都知道这是搜索、推荐和广告这三大互联网平台产品的基础。

当时的思路是用MPI来做并行化。张栋和宿华合作,开发一套基于MPI的并行pLSA系统。MPI是1980年代流行的并行框架,进入到很多大学的课程里,熟悉它的人很多。MPI这个框架提供了很多基本操作:除了点对点的Send, Recv,还有广播Bdcast,甚至还有计算加通信操作,比如AllReduce。

MPI很灵活,描述能力很强。因为MPI对代码结构几乎没有什么限制——任何进程之间可以在任何时候通信——所以很多人不称之为框架,而是称之为“接口”。

但是Google的并行计算环境上没有MPI。当时一位叫白宏杰的工程师将MPICH2移植到了Google的分布式操作系统上。具体的说,是重新实现MPI里的Send, Recv等函数,调用分布式操作系统里基于HTTP RPC的通信API。

MPI的AllReduce操作在很多机器学习系统的开发里都很有用。因为很多并行机器学习系统都是各个进程分别训练模型,然后再合适的时候(比如一个迭代结束的时候)大家对一下各自的结论,达成共识,然后继续迭代。这个“对一下结论,达成共识”的过程,往往可以通过AllReduce来完成。

如果我们关注一下MPI的研究,可以发现曾经有很多论文都在讨论如何高效实现AllReduce操作。比如我2008年的博文里提到一种当时让我们都觉得很聪明的一种算法。这些长年累月的优化,让MPICH2这样的系统的执行效率(runtime efficiency)非常出色。

基于MPI框架开发的pLSA模型虽然效率高,并且可以处理相当大的数据,但是还是不能处理Google当年级别的数据。原因如上节『概念』中所述——MPICH2没有自动错误恢复功能,而且MPI这个框架定义中提供的编程灵活性,让我们很难改进框架,使其具备错误恢复的能力。

具体的说,MPI允许进程之间在任何时刻互相通信。如果一个进程挂了,我们确实可以请分布式操作系统重启之。但是如果要让这个“新生”获取它“前世”的状态,我们就需要让它从初始状态开始执行,接收到其前世曾经收到的所有消息。这就要求所有给“前世”发过消息的进程都被重启。而这些进程都需要接收到他们的“前世”接收到过的所有消息。这种数据依赖的结果就是:所有进程都得重启,那么这个job就得重头做。

一个job哪怕只需要10分钟时间,但是这期间一个进程都不挂的概率很小。只要一个进程挂了,就得重启所有进程,那么这个job就永远也结束不了了.

虽然我们很难让MPI框架做到fault recovery,我们可否让基于MPI的pLSA系统支持fault recovery呢?原则上是可以的——最简易的做法是checkpointing——时不常的把有所进程接收到过的所有消息写入一个分布式文件系统(比如GFS)。或者更直接一点:进程状态和job状态写入GFS。Checkpointing是下文要说到的Pregel框架实现fault recovery的基础。

但是如果一个系统自己实现fault recovery,那还需要MPI做什么呢?做通信?——现代后台系统都用基于HTTP的RPC机制通信了,比如和Google的Stubby、Facebook的Thrift、腾讯的Poppy还有Go语言自带的rpc package。做进程管理?——在开源界没有分布式操作系统的那些年里有价值;可是今天(2013年),Google的Borg、AMPLab的Mesos和Yahoo!的YARN都比MPICH2做得更好,考虑更全面,效能更高。

LDA和MapReduce

“可扩展”的基础——数据并行。

因为MPI在可扩展性上的限制, 我们可以大致理解为什么Google的并行计算架构上没有实现经典的MPI。同时,我们自然的考虑Google里当时最有名的并行计算框架MapReduce。

MapReduce的风格和MPI截然相反。MapReduce对程序的结构有严格的约束——计算过程必须能在两个函数中描述:map和reduce;输入和输出数据都必须是一个一个的records;任务之间不能通信,整个计算过程中唯一的通信机会是map phase和reduce phase之间的shuffuling phase,这是在框架控制下的,而不是应用代码控制的。

pLSA模型的作者Thomas Hoffmann提出的机器学习算法是EM。EM是各种机器学习inference算法中少数适合用MapReduce框架描述的——map phase用来推测(inference)隐含变量的分布(distributions of hidden variables),也就是实现E-step;reduce phase利用上述结果来更新模型参数,也即是M-step。

但是2008年的时候,pLSA已经被新兴的LDA掩盖了。LDA是pLSA的generalization:一方面LDA的hyperparameter设为特定值的时候,就specialize成pLSA了。从工程应用价值的角度看,这个数学方法的generalization,允许我们用一个训练好的模型解释任何一段文本中的语义。而pLSA只能理解训练文本中的语义。(虽然也有ad hoc的方法让pLSA理解新文本的语义,但是大都效率低,并且并不符合pLSA的数学定义。)这就让继续研究pLSA价值不明显了。

另一方面,LDA的inference很麻烦,没法做精确inference,只有近似算法,比如variational inference。如果EM算法中的E-step采用variational infernece,那么EM算法就被称为variational EM。EM本来就是一个比较容易陷入局部最优的算法,E-step用了variational inference,总体效果就更差了。另一种近似inference算法是Gibbs sampling。虽然我们可以在E-step里用Gibbs sampling替换variational inference,但是Thomas Griffiths发现一个有趣的特点——稍微修改LDA之后,就可以利用Dirichlet和multinomial分布的共轭性,把模型参数都积分积掉。没有参数了,也就所谓M-step里的“更新参数”了。这样,只需要做Gibbs sampling即可。这个路子发表在PNAS上,题目是Finding Scientific Topics。

Gibbs sampling也有一个问题:作为一种Markov Chain Monte Carlo(MCMC)算法,顾名思义,Gibbs sampling是一个顺序过程,按照定义不能被并行化。幸运的是,2007年的时候,UC Irvine的David Newman团队发现,对于LDA这个特定的模型,Gibbs sampling可以被并行化。具体的说,把训练数据拆分成多份,用每一份独立的训练模型。每隔几个Gibbs sampling迭代,这几个局部模型之间做一次同步,得到一个全局模型,并且用这个全局模型替换各个局部模型。这个研究发表在NIPS上,题目是:Distributed Inference for Latent Dirichlet Allocation。

上述做法,在2012年Jeff Dean关于distributed deep leearning的论文中,被称为data parallelism(数据并行)。如果一个算法可以做数据并行,很可能就是可扩展(scalable)的了。

David Newman团队的发现允许我们用多个map tasks并行的做Gibbs sampling,然后在reduce phase中作模型的同步。这样,一个训练过程可以表述成一串MapReduce jobs。我用了一周时间在Google MapReduce框架上实现实现和验证了这个方法。后来在同事Matthew Stanton的帮助下,优化代码,提升效率。但是,因为每次启动一个MapReduce job,系统都需要重新安排进程(re-schedule);并且每个job都需要访问GFS,效率不高。在当年的Google MapReduce系统中,1/3的时间花在这些杂碎问题上了。后来实习生司宪策在Hadoop上也实现了这个方法。我印象里Hadoop环境下,杂碎事务消耗的时间比例更大。

随后白红杰在我们的代码基础上修改了数据结构,使其更适合MPI的AllReduce操作。这样就得到了一个高效率的LDA实现。我们把用MapReduce和MPI实现的LDA的Gibbs sampling算法发表在这篇论文里了。

当我们踌躇于MPI的扩展性不理想而MapReduce的效率不理想时,Google MapReduce团队的几个人分出去,开发了一个新的并行框架Pregel。当时Pregel项目的tech lead访问中国。这个叫Grzegorz Malewicz的波兰人说服了我尝试在Pregel框架下验证LDA。但是在说这个故事之前,我们先看看Google Rephil——另一个基于MapReduce实现的并行隐含语义分析系统。

Rephil和MapReduce

描述长尾数据的数学模型
Google Rephil是Google AdSense背后广告相关性计算的头号秘密武器。但是这个系统没有发表过论文。只是其作者(博士Uri Lerner和工程师Mike Yar)在2002年在湾区举办的几次小规模交流中简要介绍过。所以Kevin Murphy把这些内容写进了他的书《Machine Learning: a Probabilitic Perspecitve》里。在吴军博士的《数学之美》里也提到了Rephil。

Rephil的模型是一个全新的模型,更像一个神经元网络。这个网络的学习过程从Web scale的文本数据中归纳海量的语义——比如“apple”这个词有多个意思:一个公司的名字、一种水果、以及其他。当一个网页里包含”apple”, “stock”, “ipad”等词汇的时候,Rephil可以告诉我们这个网页是关于apple这个公司的,而不是水果。

这个功能按说pLSA和LDA也都能实现。为什么需要一个全新的模型呢?

从2007年至今,国内外很多团队都尝试过并行化pLSA和LDA。心灵手巧的工程师们,成功的开发出能学习数万甚至上十万语义(latent topics)的训练系统。但是不管大家用什么训练数据,都会发现,得到的大部分语义(相关的词的聚类)都是非常类似,或者说“重复”的。如果做一个“去重”处理,几万甚至十万的语义,就只剩下几百几千了。

这是怎么回事?

如果大家尝试着把训练语料中的低频词去掉,会发现训练得到的语义和用全量数据训练得到的差不多。换句话说,pLSA和LDA模型的训练算法没有在意低频数据。

为什么会这样呢?因为pLSA和LDA这类概率模型的主要构造单元都是指数族分布(exponential family)。比如pLSA假设一个文档中的语义的分布是multinomial的,每个语义中的词的分布也是multinomial的。因为multinomial是一种典型的指数族分布,这样整个模型描述的海量数据的分布,不管哪个维度上的marginalization,都是指数族分布。在LDA中也类似——因为LDA假设各个文档中的语义分布的multinomial distributions的参数是符合Dirichlet分布的,并且各个语义中的词的分布的multinomial distributions的参数也是符合Dirichlet分布的,这样整个模型是假设数据是指数族分布的。

可是Internet上的实际数据基本都不是指数族分布的——而是长尾分布的。至于为什么是这样?可以参见2006年纽约时报排名畅销书The Long Tail: Why the Future of Business is Selling Less of More。或者看看其作者Chris Anderson的博客The Long Tail。

长尾分布的形状大致如下图所示:

其中x轴表示数据的类型,y轴是各种类型的频率,少数类型的频率很高(称为大头,图中红色部分),大部分很低,但是大于0(称为长尾,图中黄色部分)。一个典型的例子是文章中词的分布,有个具体的名字Zipf’s law,就是典型的长尾分布。而指数族分布基本就只有大头部分——换句话说,如果我们假设长尾数据是指数族分布的,我们实际上就把尾巴给割掉了。

割掉数据的尾巴——这就是pLSA和LDA这样的模型做的——那条长尾巴覆盖的多种多样的数据类型,就是Internet上的人生百态。理解这样的百态是很重要的。比如百度和Google为什么能如此赚钱?因为互联网广告收益。传统广告行业,只有有钱的大企业才有财力联系广告代理公司,一帮西装革履的高富帅聚在一起讨论,竞争电视或者纸媒体上的广告机会。互联网广告里,任何人都可以登录到一个网站上去投放广告,即使每日广告预算只有几十块人民币。这样一来,刘备这样织席贩屡的小业主,也能推销自己做的席子和鞋子。而搜索引擎用户的兴趣也是百花齐放的——从人人爱戴的陈老师苍老师到各种小众需求包括“红酒木瓜汤”(一种丰胸秘方,应该出丰胸广告)或者“苹果大尺度”(在搜索范冰冰主演的《苹果》电影呢)。把各种需求和各种广告通过智能技术匹配起来,就酝酿了互联网广告的革命性力量。这其中,理解各种小众需求、长尾意图就非常重要了。

实际上,Rephil就是这样一个能理解百态的模型。因为它把Google AdSense的盈利能力大幅提升,最终达到Google收入的一半。两位作者荣获Google的多次大奖,包括Founders’ Award。

而切掉长尾是一个很糟糕的做法。大家还记得小说《1984》里有这样一个情节吗?老大哥要求发布“新话”——一种新的语言,删掉自然英语中大部分词汇,只留下那些主流的词汇。看看小说里的人们生活的世界,让人浑身发毛,咱们就能体会“割尾巴”的恶果了。没有看过《1984》的朋友可以想象一下水木首页上只有“全站十大”,连“分类十大”都删掉之后的样子。

既然如此,为什么这类模型还要假设数据是指数族分布的呢?——实在是不得已。指数族分布是一种数值计算上非常方便的数学元素。拿LDA来说,它利用了Dirichlet和multinomial两种分布的共轭性,使得其计算过程中,模型的参数都被积分给积掉了(integrated out)。这是AD-LDA这样的ad hoc并行算法——在其他模型上都不好使的做法——在LDA上好用的原因之一。换句话说,这是为了计算方便,掩耳盗铃地假设数据是指数族分布的。

实际上,这种掩耳盗铃在机器学习领域很普遍。比如有个兄弟听了上面的故事后说:“那我们就别用概率模型做语义分析了,咱们还用矩阵分解吧?SVD分解怎么样?” 很不好意思的,当我们把SVD分解用在语义分析(称为LSA,latent semantic analysis)上的时候,我们还是引入了指数族分布假设——Gaussian assumption或者叫normality assumption。这怎么可能呢?SVD不就是个矩阵分解方法吗?确实传统SVD没有对数据分布的假设,但是当我们用EM之类的算法解决存在missing data的问题——比如LSA,还有推荐系统里的协同过滤(collaborative filtering)——这时不仅引入了Gaussian assumption,而且引入了linearity assumption。当我们用其他很多矩阵分解方法做,都存在同样的 问题。

掩耳盗铃的做法怎么能存在得如此自然呢?这是因为指数族分布假设(尤其是Gaussian assumption)有过很多成功的应用,包括通信、数据压缩、制导系统等。这些应用里,我们关注的就是数据中的低频部分;而高频部分(或者说距离mean比较远的数据)即使丢掉了,电话里的声音也能听懂,压缩还原的图像也看得明白,导弹也还是能沿着“最可能”靠谱的路线飞行。我们当然会假设数据是指数族分布的,这样不仅省计算开销,而且自然的忽略高频数据,我们还鄙夷地称之为outlier或者noise。

可是在互联网的世界里,正是这些五花八门的outliers和noise,蕴含了世间百态,让数据不可压缩,从而产生了“大数据”这么个概念。处理好大数据的公司,赚得盆满钵满,塑造了一个个传奇。这里有一个听起来比较极端的说法大数据里无噪声——很多一开始频率很低,相当长尾,会被词过滤系统认为是拼写错误的queries,都能后来居上成为主流。比如“神马”,“酱紫”。

Rephil系统实现的模型是一个神经元网络模型(neural network)。它的设计的主要考虑,就是要能尽量好的描述长尾分布的文本数据和其中蕴含的语义。Rephil模型的具体技术细节因为没有在论文中发表过,所以不便在这里透露。但是Rephil模型描述长尾数据的能力,是下文将要介绍的Peacock系统的原动力,虽然两者在模型上完全不同。

Rephil系统是基于Google MapReduce构建的。如上节所述,MapReduce在用来实现迭代算法的时候,效率是比较低的。这也是Peacock要设计全新框架的原动力——使其比MapReduce高效,但同时像MapReduce一样支持fault recovery。

Docker改变世界

Docker最近很火。Docker实现了“集装箱”——一种介于“软件包”和“虚拟机”之间的概念——并被寄予厚望,以期革新Internet服务以及其他大数据处理系统的开发、测试、和部署流程。

为了使用Docker,需要了解不少工具及其设计思路;而这些工具的文档分布在不同的网站。为了方便大家学习,本文以开发一个极简的搜索引擎为例,展示Docker带来的革新。

说是革新,其实是Google已经用了十年的方式,只是最近才因为Docker开源项目而广为人知。Eric Brewer(Google VP of Infrastructure)在Dockercon14活动上的演讲回顾了这段历程。目前,Google每周会执行20亿个集装箱。可以说,最近十年是各互联网公司和高校都在奋力模仿Google的计算技术的十年。了解这一模仿的过程,可以帮助我们深入理解分布式系统(包括现在常说的“大数据系统”)中若干重要问题。为此,本文以技术教程为主线,穿插了一些关于Hadoop和Mesos等“模仿”项目的介绍,简要追溯它们勇敢而艰难的“邯郸学步”的历程。最后,本文会介绍Google最近公布的“正确答案”——Kubernetes——Google核心技术Borg的开源版本。

Docker

Docker是一个软件系统,实现了一种称为“集装箱”的概念。集装箱类似Google机群管理系统Borg中的包(package)。

通常我们说的“包”是软件包——比如Ubuntu/Debian Linux里常见的.deb文件——安装的时候,安装程序会把被依赖的包也装上。可是执行的时候呢?得根据具体情况配置,然后依次启动互相依赖的多个程序。比如,启动一个Web服务之前,要启动Apache和MySQL;而且他们仨都得有合理的配置,确保它们能一起工作,来实现这个Web服务。

但是Docker集装箱以及Borg中的包更像虚拟机。虚拟机里包括程序和配置,所以可以被执行——也就是执行其中的程序。因为程序是配置好的,所以虚拟机可以被扔到各种环境上去执行——包括开发机、做演示用的笔记本电脑、用VirtualBox虚拟的机群、测试机群、预发布环境和产品环境。近几年随着“云计算”概念的普及,虚拟机被广泛使用,作为分布式计算的基础调度单元。

Docker作为一个软件系统,可以用来创建“集装箱镜像”(container image)和执行这些镜像。就像VirtualBox是一个软件系统,可以用来创建和执行虚拟机。但是集装箱比虚拟机“轻”——一个虚拟机包括一组虚拟硬件、操作系统,用来执行用户程序;而集装箱里没有虚拟的硬件,也没有操作系统,它用主机(host)的硬件和操作系统来执行程序。

那么在集装箱里跑程序和直接在主机上跑有什么区别呢?一个区别是,集装箱有一套网络端口空间(port space)。一个集装箱里的进程可以各自开端口,也可以连接对方的端口进行通信。但是这些端口是集装箱之外的进程看不到的。我们也可以让集装箱把某些内部端口号展示给外部,比如把集装箱内的端口5000映射到外部的8080。这样,当我们用主机上的程序(比如浏览器)访问本机(主机)的8080端口时,实际上访问的是集装箱里的5000端口。这项对外公开集装箱内部端口的技术,称为端口转发(port forwarding),和虚拟机的端口转发概念一样。另一个区别在于,集装箱里有虚拟的文件系统。这样我们可以把要执行的程序拷贝进集装箱。也可以把主机上的某些目录映射成集装箱虚拟文件系统的某些目录。

集装箱这个想法已经在深刻地改变传统分布式系统的开发、测试和部署的流程了。传统的做法是,开发者写一个Makefile(或者其他描述,比如CMakeList、POM等)来说明如何把源码编译成二进制文件。随后,开发人员会在开发机上配置并且执行二进制文件,来作测试。测试人员会在测试机群上配置和执行,来作验证。而运维人员会在数据中心里的预发布环境和产品环境上配置和执行,这就是部署。因为开发机、测试机群、和产品环境里机器的数量和质量都不同,所以配置往往很不同。加上每个新版本的软件系统,配置方式难免有所差异,所以经常造成意外错误。以至于绝大部分团队都选择趁夜深人静、用户不活跃的时候,上线新版本,苦不堪言。

而利用集装箱概念的开发流程里,开发者除了写Makefile,还要写一个Dockerfile,来描述如何把二进制文件安装进一个集装箱镜像(container image),并且做好配置。而一个镜像就像一台配置好的虚拟机,可以在机群上启动多个实例(instance),而每个实例通常称为一个集装箱(container)。在自测的时候,开发者在开发机上执行一个或者多个集装箱;在验证时,测试人员在测试机群上执行集装箱;在部署时,运维人员在产品环境执行集装箱。因为执行的都是同样地集装箱,所以不容易出错。

这种流程更合理的划分了开发者和其他角色的工作边界,也大大简化了测试和部署工作。

boot2docker

上节提到,Docker虚拟了网络地址空间和文件系统。实际上,它还虚拟了进程ID空间(pid space)等系统数据结构。这些功能是一个叫dockerd的daemon程序借助Linux内核中的control groups(又叫cgroups)功能实现的。

dockerd负责执行集装箱;就像VirtualBox负责执行虚拟机一样。而cgroup是Google的两个工程师Paul Menage和Rohit Seth贡献给Linux社区的。cgroups的使用始于2006年。但是从他们的工作记录看,主要工作持续到2008和2009年。据说,Google开发它就是为了方便在自己的机群上部署各种Internet应用和离线处理系统。具体一点儿的故事,请看这篇Information Week上的帖子。

因为cgroups功能只有Linux内核有,所以Docker目前只能运行在Linux上。可是,现在很多开发者都在用Mac。为了能让这些开发者方便的测试自己创作的集装箱镜像,Docker的开发者写了boot2docker——利用VirtualBox虚拟一个Linux主机,并且在上面安装dockerd。而命令行控制程序docker执行在Mac主机上,被配置成和虚拟Linux主机上的dockerd协作。

boot2docker的安装方式很简单:照着这个流程,下载并执行一个安装包即可。因为boot2docker利用了VirtualBox,所以安装它之前需要先装VirtualBox。Homebrew也提供了安装boot2docker的选项,但是可能因为bug导致dockerd和docker版本不同,没法协同工作。

在利用boot2docker在Mac上开始工作之前,还有几个注意事项。当我们在Linux主机上启动一个集装箱的时候,我们可以让Docker把主机的某些目录映射成集装箱内的目录。这样集装箱里的程序和主机上的程序共享数据,是一种方便的调试方式。但是在用boot2docker的时候,“主机”不是Mac,而是虚拟Linux主机。此时如果想把Mac上的目录映射到集装箱,先得将其通过VirtualBox映射到Linux主机。

另一个注意事项和端口转发有关。当我们把集装箱内的某个端口映射为主机的某个端口时,只是映射到了虚拟Linux主机;如果想让Mac上的程序能访问,还得把虚拟机端口通过VirtualBox映射成Mac上的端口。这些注意事项,在下文中会有详细解释。

CoreOS

实际开发中的测试机群和产品环境通常都是用的Linux服务器。要在上面执行集装箱,也需要安装Docker。因为Docker的开发者提供各种Linux软件包,所以通常输入一个命令,即可安装Docker。比如在Ubuntu/Debian Linux里,这个命令是:

sudo apt-get install docker.io

但是目前最常用的用来执行Docker集装箱的Linux发行版本既不是Ubuntu、Debian也不是RedHat、Fedora,而是CoreOS。这个发行版本根本没有软件包管理程序,所以也不能通过输入某个命令来安装软件。但是CoreOS预装了Docker,所以可以制作集装箱镜像,或者下载别人发布的集装箱镜像来执行。目前,Amazon AWS和Google Compute Engine这两大云计算平台都提供预装了CoreOS的虚拟机。

实际上,Google数据中心里运行的Linux系统和CoreOS有很多相似之处。我记得2010年我刚离开Google加入腾讯的时候,一位腾讯的同事好奇地问:“Google的机群里用的Linux用什么软件包管理程序?是apt-get吗?还是yum?”我回答:“其实服务器上运行的Linux是不需要包管理的,只有桌面Linux系统才需要”。这位同事很难相信。其实,要不是因为“见了一回猪跑”,我也想不到会是这样。

CoreOS和其他Linux发行版本相比,执行效率高、内存耗费省;此外,利用双磁盘分区技术,即便是更新Linux内核也不需要重启。CoreOS还有很多独特之处,使得它在问世后很短的时间里就被Amazon和Google采用。如果想进一步了解这些特性,请看这个对Docker作者的访谈。

Go语言

接下来,我们看看如何在Mac上用Go语言写一个极简化的搜索引擎,并且封装成集装箱镜像。

我们选择Go语言为例,而不是更常见的Java、Python、Perl、Ruby、Scala等,有很现实的原因——后面这些语言写的程序,在执行时都需要某些运行环境的支持。比如,Java程序依赖Java虚拟机,Python程序需要Python解释器,这些加上预装的程序库需要占用几百MB的集装箱空间。而用Go写的程序默认是全静态编译的,执行时不需要任何环境支持,不需要预装库,甚至连Linux系统动态库都不依赖。鉴于一家公司的系统往往由成千上万的集装箱构成;每个集装箱少几百MB,能为公司省出很大一笔开销。那些每月要向Amazon或者Goolgle付账的公司,对此必然印象深刻。这是Go语言在很多创业公司拓展迅猛的一个原因。

如果我们用C或者C++开发,也可以生成全静态链接的二进制程序文件。但是在Web时代,C/C++的开发效率不如Go。Google里倒是普遍使用C++,但是Google里有一套精心设计、积攒多年的C++库,这是外界没有的。外界普遍得使用第三方库,并往往因此挠头。比如,不同的第三方库(Thrift和boost)各有各的线程池机制,很难统一管理多线程。C++11倒是有了标准线程管理,但是把很多库统一到C++11是一项开销极大的工作。Go语言是专门为分布式系统开发设计的,根本就没有线程的概念,在语法上用goroutine代替了,线程池实现在Go runtime里,被编译进每个二进制程序。

交叉编译

因为集装箱用主机的操作系统和硬件来执行程序,而Docker只支持Linux,所以Go程序必须被编译成Linux二进制文件,才能通过Docker运行。而我们在Mac上开发,需要利用交叉编译技术来生成Linux二进制文件。

为了得到一个支持交叉编译的Go语言编译器,我们需要从源码安装Go,并且需要做一些额外的安装工作。具体过程如下:

  1. 安装Xcode,从而获得C编译器。
  2. 下载Go编译器的源码包。比如Go 1.3在这里。
  3. 解压和编译
  4. tar xzvf go1.3.src.tar.gz
  5. cd go/src
  6. ./all.bash
  7. 编译各种平台下的Go标准库
  8. git clone git://github.com/davecheney/golang-crosscompile.git
  9. source golang-crosscompile/crosscompile.bash
  10. go-crosscompile-build-all

这里,我们用到了Dave Cheney写的一个Bash脚本程序。这个程序支持生成以下平台上的Go语言标准库:

  1. darwin/386
  2. darwin/amd64
  3. freebsd/386
  4. freebsd/amd64
  5. freebsd/arm
  6. linux/386
  7. linux/amd64
  8. linux/arm
  9. windows/386
  10. windows/amd64
  11. nacl/amd64
  12. nacl/386

并行计算最常用的目标平台是linux/amd64——64bit的Linux系统,也是CoreOS的平台格式。下文中我们会演示如何在Mac下用这个编译器生成Linux平台的二进制代码文件。

极简版搜索引擎

在这篇帖子里,作者Adriaan de Jonge用一个最简单的http server作为例子,说明如何在Mac下用Docker运行一个程序。

这篇帖子对我很有帮助。只是这个例子程序太过简单了——通常一个互联网产品包含不只一个程序——现代互联网产品几乎都采用micro service架构,一个http server和多个RPC server协同工作。之外,还会有一些daemon程序,不时向RPC server提供不断更新的数据。比如在搜索引擎里,一个indexer程序会不断将cralwer程序爬下来的网页内容加以整理,并且发送给搜索引擎服务。

本节里我们介绍的极简版的搜索引擎就包括两个程序——search engine server和向它提供索引内容的indexer daemon。search engine server首先是一个http server,可以通过浏览器访问——对每个输入的query,返回相应的结果。同时,它还是一个RPC server,接受从indexer daemon发来的更新后的索引内容。这两个程序的源码在这里。

为了下载和构建这个例子程序,请输入如下命令:

mkdir -p /tmp/learn-docker
cd /tmp/learn-docker
export GOPATH=pwd
go get github.com/wangkuiyi/helloworld/indexer
go get github.com/wangkuiyi/helloworld/searchengine

此时,在 /tmp/learn-docker/bin 目录里应该有两个二进制程序文件 indexer和searchengine。这两个文件都是Darwin/AMD64格式的。我们可以在Mac主机上运行它俩:

./bin/searchengine -addr=”:10000” &
./bin/indexer -searchengine=”localhost:10000”

这样首先启动了searchengine,并且让它的http和rpc服务都监听本机(Mac主机)的10000端口;随后启动了indexer,它每秒钟通过RPC调用告诉searchengine更新索引内容。

启动成功之后,我们可以在浏览器里访问如下网址:http://localhost:10000/?q=news,从而看到searchengine返回的搜索结果(如下图):

当然,我们也可以用命令行程序,比如wget和curl,来访问searchengine服务。这样我们可以很方便的写一个集成测试(regression test)程序。比如这个。

创建集装箱

接下来,我们看看如何把这两个程序打包进Docker集装箱镜像,然后在Mac主机(实际上是boot2docker创建的Linux虚拟机)上运行集装箱。接下来我们会看到:这些集装箱不用修改,也就能在Amazon AWS和Google Compute Engine上运行,从而完成发布。

首先,我们需要从源码生成Linux/AMD64二进制程序文件。用上文介绍的方法,得到一个支持交叉编译的Go编译器之后,编译示范程序很简单:

GOOS=linux GOARCH=amd64 go install \
github.com/wangkuiyi/helloworld/indexer \
github.com/wangkuiyi/helloworld/searchengine

可以看到,我们只是通过环境变量设置了一下目标操作系统和架构。

随后,我们要创建一个Docker集装箱镜像,把编译好的两个程序放进去。因为如上文介绍的,Go程序执行时不需要特殊的运行环境,所以这个集装箱镜像里,除了一些metadata和我们的程序之外,什么都不需要。以至于我们可以从Docker Hub网站上下载一个空的镜像,在里面安装我们的程序即可。为此,我们需要写一个Dockerfile:

FROM scratch
ADD bin/linux_amd64/searchengine /searchengine
ADD bin/linux_amd64/indexer /indexer

这里的第一行是让Docker自动从Docker Hub上下载名为scratch的镜像;第二行说把本地文件bin/linux_amd64/searchengine装进这个镜像的根目录,成为/searchengine;第三行拷贝indexer。

有了Dockerfile我们就能用docker命令创建一个镜像了。下面命令创建一个镜像,并命名为wangkuiyi/helloworld:

cp GOPATH/src/github.com/wangkuiyi/helloworld/Dockerfile<script type="math/tex" id="MathJax-Element-1">GOPATH/src/github.com/wangkuiyi/helloworld/Dockerfile </script>GOPATH/
docker build -t wangkuiyi/helloworld $GOPATH

此时,我们可以用docker images命令看到我们创建的镜像:

yiwang@yiwang-mn1-> docker images
REPOSITORY TAG IMAGE ID CREATED VIRTUAL SIZE
wangkuiyi/helloworld latest 255460c3d095 3
hours ago 13.86 MB

分布式系统的部署

最简单的使用Docker的部署方案是:启动一个集装箱,在其中运行一个searchengine进程和一个indexer进程。这和上文中介绍的在Mac主机上运行的方式是一样的,但这不符合分布式系统的一般部署原则。

通常,为了提高处理速度、提升吞吐量和系统容错能力,每个程序都会启动为多个进程,运行在不同的机器上。比如,indexer程序的每个进程处理一部分数据(比如一个cralwer进程的输出)。这样的并行处理提升建立索引的效率。这种情况下,每个进程及其处理的数据被称为一个shard。(shard应该怎么翻译?我不知道)。

类似地,searchengine进程也会启动为多个进程,每个进程的内存空间里都装着同样地索引结构,所以都能提供同样地服务,从而提升吞吐量。如果这些进程运行在不同的机器上,那么哪怕某些机器挂了,还有活着的进程能不间断地提供搜索服务。这样的每个进程被称为一个replication。

其实每个indexer shard也可以是一组多个进程,其中每个进程是隶属本shard的一个replication。从而同时提升indexer的处理速度和容错能力。

这么多进程应该启动在哪些机器上呢?要靠人来决定,可就忙不过来咯;得靠机群管理系统。Google Borg就是这样一套系统。

可是在很多年的时间里,外界都不知道Borg。有一些项目试图模仿Google的计算架构,比如Hadoop意图模仿MapReduce。Google MapReduce是一个构建在Borg之上的并行计算框架。但是Hadoop的开发者没有开发类似Borg的系统,而是让Hadoop(计算框架)兼任资源管理和调度的功能,导致系统复杂,代码乱作一团。

实际上,在Hadoop开始的若干年里,甚至没有像Google MapReduce那样让每个job有一个master进程来管理;而是让机群上所有job里的所有进程都向一个叫Job Tracker的进程汇报心跳(heartbeat),以至于一个Hadoop机群不能太大,否则Job Tracker会处理不过来。而且Job Tracker作为性能和稳定性的双重瓶颈,一旦累坏了,整个机群上所有job就都挂了。Hadoop的开发者直到2011年左右才意识到这一点,并发布了一篇文章,开始计划开发“下一代Hadoop”,现在被称为YARN的系统。

YARN的功能和Google Borg有类似之处,但是真正引发外界对Google Borg关注的,是加州大学伯克利分校和Twitter的合作项目Mesos。这是一个试图复制Borg的尝试。当Mesos在Twitter运行起来的时候,很多从Google加入Twitter的工程师都很兴奋——终于重新能“高效工作”了!这里的故事,可以参见这篇Wired文章。Mesos系统设计思路描述在这篇论文里。其第一作者Ben Hindman曾经在Google实习,后来在Twitter任职。

实际上,即便Mesos也没有能很相似地模仿Google Borg。至少在程序的发布和部署上。Mesos没有和Google Borg等效的打包和执行包的功能。而这个功能能为外界所访问,正是靠了本文着重介绍的Docker。Docker和Google Borg一样,使用Google工程师为Linux内核贡献的cgroups功能来实现集装箱机制。

借助Docker,Google终于于本月(2014年7月)开源了Borg——但是是用Go语言重写的Borg,称为Kubernetes——Google Borg是用C++开发的。感谢开源社区不懈的推动!

集成测试

基于上一节的介绍,我们能想象,如果每个集装箱只执行一个进程,那么机群管理系统在部署和调度应用时受到的限制最少。反过来想,如果我们在一个集装箱里同时运行一个indexer进程和一个searchengine进程,那么我们实际上引入了一个不必要的约束——indexer进程和searchengine进程一一对应。而且如果机群中有一台机器,可以承担运行一个进程的负载,但是不能承担同时运行两个进程,那么这台机器上就没法部署上述“大”集装箱了。

所以,在Google Borg和Google Kubernetes里,都建议每个集装箱里只执行一个进程。

基于“打包一次,兼顾测试和发布”的原则,我们可以想象,对于一个应用(或者叫做产品,比如上述的极简搜索引擎),最常见的打包方式是产生一个集装箱镜像,但是每个集装箱里只执行一个程序的一个进程。

上文中,我们已经用一个Dockerfile把两个程序:indexer和searchengine都装进一个镜像wangkuiyi/hellworld了。接下来,我们尝试在Mac主机上启动两个集装箱,分别执行一个indexer和一个searchengine进程:

docker run -d -p 8080:8080 –name searchengine wangkuiyi/helloworld /searchengine
VBoxManage modifyvm “boot2docker-vm” –natpf1 “tcp-port8080,tcp,,8080,,8080”
docker run -d –name indexer –link searchengine:se wangkuiyi/helloworld /indexer -searchengine=se:8080

这里,第一行启动了一个集装箱,并且起名叫searchengine,执行的镜像是wangkuiyi/helloworld。-d的意思是在后台执行,类似一个shell命令后面跟上一个&符号的效果。-p 8080:8080的意思是:“这个集装箱里有个程序会监听8080端口(如果看看searchengine的源码,会发现8080是其默认端口),把这个端口映射到主机(boot2docker创建的Linux虚拟机)的8080端口”。

第二个命令让VirtualBox把Linux虚拟机的8080端口映射为Mac主机的8080端口。这样就可以在Mac主机上启动一个浏览器,通过访问本机的8080端口,来访问集装箱里的searchengine服务。(如果你在Linux主机上开发,就不需要boot2docker虚拟一个Linux主机了,也就不需要这个命令了。)

上述第三个命令启动了一个名为indexer的集装箱,执行的也是wangkuiyi/helloworld镜像。在这个集装箱里启动了一个indexer进程;这个进程会去连接se:8080这个网络地址,并通过RPC调用,向这个目标地址发送更新的索引数据。se这个IP地址是怎么来的呢?这是–link seachengine:se参数的效果——这个参数使得Docker在启动indexer集装箱之前,修改了其中/etc/hosts文件,在其中增加了一行:

xxx.xxx.xxx.xxx se

这里 http://xxx.xxx.xxx.xxx 指代集装箱searchengine(–link searchengine:se中冒号左边的部分)的虚拟IP地址,se(–link searchengine:se中冒号右边的部分)也就是其域名了。Docker就是通过–link这个参数,让不同集装箱内的多个进程可以互相通信的。

此时,在本机打开一个浏览器窗口并访问http://localhost:8080/?q=news,可以看到和上图完全一样的结果。

自动部署

到目前为止,我们都是手动调用docker命令来操作docker的。而得到的效果——在Mac主机上启动极简搜索引擎——和不用Docker是一样的。大家不禁会问,为什么要引入Docker呢?

其实,实际使用Docker时,我们不会手动敲docker命令,而是会利用fleet或者Kubernetes来部署和启动集装箱。这样只需要写一个非常简明的部署配置文件,就可以在开发机、集成测试机群、预发布机群、和产品环境中完成部署了。这篇文章为了说明Docker的设计思路和使用方法已经很长了,所以关于fleet和Kubernetes的介绍,我准备放在《Docker:分布式系统的软件工程革命(下)》中。

谢谢大家看到这里!

论文发表了

Peacock系统在腾讯广点通广告系统中上线两年多之后,论文发表在ACM Transactions on Intelligent Systems and Technology的2015年8月刊上。感谢论文持笔者,苏州大学的曾嘉教授。感谢给我们最多指导和建议的香港科技大学计算机系系主任杨强教授。写一篇简单的侧记,表达对此项工作中各位合作者的感激,也展示一下工业机器学习系统研发过程的不易。和学术研究不同,工业级别大规模机器学习系统的研发更多的是一个社会学问题。

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    分布式机器学习的故事