首页 > 代码库 > kafka:一个分布式消息系统
kafka:一个分布式消息系统
1.背景
最近因为工作需要,调研了追求高吞吐的轻量级消息系统Kafka,打算替换掉线上运行的ActiveMQ,主要是因为明年的预算日流量有十亿,而ActiveMQ的分布式实现的很奇怪,所以希望找一个适合分布式的消息系统。
以下是内容是调研过程中总结的一些知识和经验,欢迎拍砖。
2.基础知识
2.1.什么是消息队列
首先,我们来看看什么是消息队列,维基百科里的解释翻译过来如下:
队列提供了一种异步通信协议,这意味着消息的发送者和接收者不需要同时与消息保持联系,发送者发送的消息会存储在队列中,直到接收者拿到它。
一般我们把消息的发送者称为生产者,消息的接收者称为消费者;注意定义中的那两个字“异步”,通常生产者的生产速度和消费者的消费速度是不相等的;如果两个程序始终保持同步沟通,那势必会有一方存在空等时间;如果两个程序一持续运行的话,消费者的平均速度一定要大于生产者,不然队列囤积会越来越多;当然,如果消费者没有时效性需求的话,也可以把消息囤积在队列中,集中消费。
说到这里,我们再来谈谈队列的分类,一般我们根据生产者和消费者的不同,可以把队列分为三类:
第一类是在一个应用程序内部(进程之间或者线程之间),相信大家学多线程时都写过“生产者消费者”程序,生产者负责生产,将生产的结果放到缓冲区(如共享数组),消费者从缓冲区取出消费,在这里,这个缓冲区就可以称为“消息队列”。
第二类其实也算在第一类的特例,就像我们喜欢把操作系统和应用程序区别对待来看,操作系统要处理无数繁杂的事物,各进程、线程之间的数据交换少不了消息队列的支持。
第三类是更为通用意义上的“消息队列”,这类队列主要作用于不同应用,特别是跨机器、平台,这令数据的交换更加广泛,一般一款独立的队列产品除了实现消息的传递外,还提供了相应的可靠性、事务、分布式等特性,将生产者、消费者从中解耦。常见的消费队列产品根据开源与否又可分为两类:
专有软件:IBM WebSphere MQ,MSMQ…
开源软件:ActiveMQ、RabbitMQ、Kafka…
2.2.JMS与AMQP
好了,对于上述第三类“消息队列”,要在不同的机器中提供消息队列的功能,那势必要有统一的规范,这时候SUN就跳出来了,作为跨平台的JAVA势必也要支持跨平台的消息传递,基于此,SUN提供了一套消息标准:Java Message Service,缩写JMS,但是这套规范定义的是API层面的标准,在JAVA体系中可以很方便的交换,但对于其他平台就需要,可能需要消息队列产品本身支持多协议(如OpenWire、STMOP)。
而AMQP定义的比JMS更加底层,从名字就能看出来(Advanced Message Queuing Protocol),它定义的是Wire-level的协议,天然具有跨平台、跨语言的特性,基于此实现的消息队列可以与任何支持该协议的平台交互。
一种是JAVA层面的API,一种是Wire-level协议,这是JMS和AMQP最本质的区别;同时两种标准还有两个比较明显的差异:
一是消息传递模型;JMS比较简单,支持两种最通用的Peer-2-Peer、publisher/subscriber;通俗点就是点对点和广播模式;而AMQP定义的更为复杂,其定义了一种exchange&binding机制,由此支持五种模型:direct exchange、fanout exchange、topic exchange、headers exchange、system exchange,本质上与P2P、PUB/SUB一样,但是更加细致些。
二是支持的消息类型,JMS支持多种消息模型:TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage、Message等;而AMQP只有byte数组。
2.3.ActiveMQ
ActiveMQ是基于JMS实现的Provider(可以理解为队列),它支持多种协议,如OpenWire,Stomp,AMQP等,基于此,支持多平台;支持事务,支持分发策略、还有上面的多种消息模型。这里我们不细谈ActiveMQ的各特性,我们着重来看ActiveMQ的分布式模型。
ActiveMQ支持分布式,它支持Master-Slave提供高可用,也支持Broker-Cluster提供负载均衡,但是它的负载基于一种Forwarding Bridge机制。
在这种机制下,任意时刻一条消只会被一个broker持有,producer发送的消息,可能会经过多个broker转发最终才会到达consumer,可以想象,当broker越来越多时,几乎每次消费都要经过转发,效率会明显下降;并且在这种复杂逻辑下,任一broker的加入和移除都显得十分复杂;这两点是我不建议使用ActiveMQ分布式集群的根本原因。
3.Kafka
好,我们最后来谈今天的主角Kafka,这个奇特的名字我始终没有找到典故,也许是开发者暗恋女孩(基友)的名字吧^_^,Kafka由linkin开发,最初的目的是为了应对linkin庞大的活动流数据(登录、浏览、点击、分享、喜欢等),这部分数据容量庞大,但是可靠性要求不高,故而通过牺牲一部分可靠性(这并不是说我们的数据会按百分比丢,我们后面再谈)来提升吞吐量;它砍掉了很多复杂的特性,如事务、分发策略、多种消息模型等;通过自身独特的设计将消息持久化到磁盘上,以此同时支持在线和离线消费;并且其天生为分布式而设计,压根就没有单机模式(或者说单机模式是分布式的特例),能够很好的扩展。实际应用中,Kafka可以用来做消息队列、流式处理(一般结合storm)、日志聚合等。
3.1.架构
我们先宏观的看看Kafka的架构,Producer集群通过zookeeper(实际中写的是broker list)获取所写topic对应的partition列表,然后顺序发送消息(支持自己实现分发策略),broker集群负责消息的存储和传递,支持Master Slaver模型,可分布式扩展;Consumer集群从zookeeper上获取topic所在的partition列表,然后消费,一个partition只能被一个consumer消费。Name Server集群(一般是zookeeper)提供名称服务等协调信息。至于什么是topic,什么是partition,我们接下来看。
3.2.Topic
Topic是生产者生产、消费者消费的队列标识。一个Topic由一个或多个partition组成,每个partition可以单独存在一个broker上,消费者可以往任一partition发送消息,以此实现生产的分布式,任一partition都可以被且只被一个消费者消息,以此实现消费的分布式;因此partition的设计提供了分布式的基础。
同时,从上图我们也能发现这种设计还有一个优点,因为每个partition内的消息是有序的,而一个partition只能被一个消费者消费,因此Kafka能提供partition层面的消息有序,而传统的队列在多个consumer的情况下是完全无法保证有序的。
3.3.消息传递模型
传统的消息队列最少提供两种消息模型,一种P2P,一种PUB/SUB,而Kafka并没有这么做,巧妙的,它提供了一个消费者组的概念,一个消息可以被多个消费者组消费,但是只能被一个消费者组里的一个消费者消费,这样当只有一个消费者组时就等同与P2P模型,当存在多个消费者组时就是PUB/SUB模型。
3.4.消息持久化
很多系统、组件为了提升效率一般恨不得把所有数据都扔到内存里,然后定期flush到磁盘上;可实际上,现代操作系统也是这样,所有的现代操作系统都乐于将空闲内存转作磁盘缓存(页面缓存),想不用都难;对于这样的系统,他的数据在内存中保存了一份,同时也在OS的页面缓存中保存了一份,这样不但多了一个步骤还让内存的使用率下降了一半;因此,Kafka决定直接使用页面缓存;但是随机写入的效率很慢,为了维护彼此的关系顺序还需要额外的操作和存储,而线性的写入可以避免这些,实际上,线性写入(linear write)的速度大约是300MB/秒,但随即写入却只有50k/秒,其中的差别接近10000倍。这样,Kafka以页面缓存为中间的设计在保证效率的同时还提供了消息的持久化,每个消费者自己维护当前读取数据的offser(也可委托给zookeeper),以此可同时支持在线和离线的消费。
3.5.Push vs. Pull
对于消息的消费,ActiveMQ使用PUSH模型,而Kafka使用PULL模型,两者各有利弊,对于PUSH,broker很难控制数据发送给不同消费者的速度,而PULL可以由消费者自己控制,但是PULL模型可能造成消费者在没有消息的情况下盲等,这种情况下可以通过long polling机制缓解,而对于几乎每时每刻都有消息传递的流式系统,这种影响可以忽略。
3.6.可靠性
刚刚说Kafka牺牲了一些可靠性来提升吞吐量,很多同学可能担心消息的丢失,那么我们现在来看看各种情况下的可靠性。
对于如上的模型,我们分开来看,
先来看消息投递可靠性,一个消息如何算投递成功,Kafka提供了三种模式,第一种是啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;第二种是对于Master Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能;第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型。
我们再来看消息在broker上的可靠性,因为消息会持久化到磁盘上,所以如果正常stop一个broker,其上的数据不会丢失;但是如果不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这可以通过配置flush页面缓存的周期、阈值缓解,但是同样会频繁的写磁盘会影响性能,又是一个选择题,根据实际情况配置。
接着,我们再看消息消费的可靠性,Kafka提供的是“At least once”模型,因为消息的读取进度由offset提供,offset可以由消费者自己维护也可以维护在zookeeper里,但是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的情况,这种情况同样可以通过调整commit offset周期、阈值缓解,甚至消费者自己把消费和commit offset做成一个事务解决,但是如果你的应用不在乎重复消费,那就干脆不要解决,以换取最大的性能。
最后,我们再来看zookeeper的可靠性,很明显,他要挂了,一切都完了,地球就毁灭了,人类就灭绝了,星级穿越也挽救不了了……所以增强可靠性的方式就是把zookeeper也部署成集群。
3.7.性能
好了,说了那么多,我们实际来测试下Kafka在各种情况下的性能,为了对比我也测了下单机模式下ActiveMQ的性能,不过由于懒,没有搭建ActiveMQ集群进行测试,但是基于其恶心的Forwarding Bridge模型,我也持悲观态度。
首先,测试环境如下:
Kafka:3 broker;8核/32G;默认配置
ActiveMQ:1 broker;8核/32G;默认配置
Producer: 一台机器通过多线程模拟多producer;8核/32G;默认配置,异步发送
Consumer: 一台机器通过多线程模拟多consumer;8核/32G;默认配置
除了特殊说明,生产和消费同时进行。
然后,我使用如下字符表示各种测试条件:
1T-1P3C-1P1C-1KW-1K:
1T:1个toipc
1P3C:1个partition 3个replication
1P1C:1个producer 1个consumer
1KW:1千万条消息
1K:每个消息1K
我先对ActiveMQ在单机多Producer、多consumer的情况下的测试,结果比我想象中的好,官方的给出的一个数据是1-2K的数据,每秒10-20K个,这样算下来大概30-40MB/S,而测试的结果在多线程的情况下会更好些。
ActiveMQ-thread | Produce | Consume |
1T-XXX-1P1C-1KW-1K | 28.925MB/S | 28.829MB/S |
1T-XXX-3P3C-1KW-1K | 43.711MB/S | 41.791MB/S |
1T-XXX-8P8C-1KW-1K | 52.426MB/S | 52.383MB/S |
然后我又对Kafka进行了相应的测试,用一个partition模拟单机模式,结果和预想的一样,在单机模型下,两者差异不大;而官方给的数据说生产者能达到50MB/S,消费者能达到100MB/S,生产者符合官方数据,而消费者我始终没有压到那么高的速度。
Kafka- thread | Produce | Consume |
1T-1P1C-1P1C-1KW-1K | 29.214MB/S | 29.117MB/S |
1T-1P1C-3P3C-1KW-1K | 46.168MB/S | 43.018MB/S |
1T-1P1C-8P8C-1KW-1K | 52.140MB/S | 51.975MB/S |
接下来的对于Kafka集群,我想同样数量的消息会不会因为topic数目的增多而影响,测试结果如下,表明topic越多,速度会有所下降,也符合预期。
Kafka-topic | Produce | Consume |
1T-3P3C-3P3C-1.2KW-1K | 49.255MB/S | 49.204MB/S |
3T-3P3C-3P3C-0.4KW*3-1K | 46.239MB/S | 45.774MB/S |
然后为了测试partition对性能的影响,进行了如下测试,可以看到partition数量越多,总的生产和消费速度越快;但是意外的是Only produce情况下生产效率没有明显提升反而略慢,这里怀疑和page cache有关,没有深入研究。
Kafka-partition | Produce | Consume | Only Produce | Only Consume |
1T-1P3C-1P1C-1KW-1K | 29.213MB/S | 29.117MB/S | 28.941MB/S | 34.360MB/S |
1T-3P3C-3P3C-1KW-1K | 47.103MB/S | 46.966MB/S | 46.540MB/S | 66.219MB/S |
1T-8P3C-8P8C-1KW-1K | 61.522MB/S | 61.412MB/S | 60.703MB/S | 72.701MB/S |
综上,我们可以看到Kafka的性能和吞吐是可以扩展的。
3.8.风险点
对于我们来说,Kafka主要有两个风险点,第一,要深入使用必须要熟读源码,而kafka源码是用scala写的,我们并没有相应的技术储备,需要学习;第二,kafka技术较新,目前的版本是0.8.1.1,看起来还不太成熟。
4.KG应用
这一块是在公司内部系统的应用,不适合对外,所以这里删去。
5.参考资料
Kafka-DOC:http://kafka.apache.org/documentation.html
ActiveMQ-DOC:http://activemq.apache.org
Understading the differences between AMQP & JMS:http://www.wmrichards.com/amqp.pdf
WIKI-MQ:http://en.wikipedia.org/wiki/Message_queue
WIKI-JMS:http://en.wikipedia.org/wiki/Java_Message_Service
WIKI-AMQP:http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol
kafka:一个分布式消息系统