首页 > 代码库 > Kafka

Kafka

Kafka架构

技术分享

Kafka的整体架构非常简单,是显式分布式架构,producer、broker(kafka)和consumer都可以有多个。 Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。 broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基 于简单,高性能,且与编程语言无关的TCP协议。

kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。


几个基本概念

Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。

Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。为了实现扩展性,一个非常大的topic可以分布到多个 broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息 都会被分配一个有序的id(offset)。

技术分享

Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。

Producers:消息和数据生产者,向Kafkas的一个topic发布消息的过程叫做producers。

Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。

Broker:缓存代理,Kafa集群中的一台或多台服务器统称为broker。


消息发送流程

技术分享

  1. Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面

  2. kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。

  3. Consumer从kafka集群pull数据,并控制获取消息的offset

  4. Consumer Group (CG):消息系统有两类,一是广播,二是订阅发布。广播是把消息发送给所有的消费者;发布订阅是把消息只发送给订阅者。Kafka通过Consumer Group组合实现了这两种机制: 实现一个topic消息广播(发给所有的consumer)和单播(发给任意一个consumer)。一个 topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个 consumer(这是实现一个Topic多Consumer的关键点:为一个Topic定义一个CG,CG下定义多个Consumer)。如果需要实现 广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还 可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。典型的应用场景是,多个Consumer来读取一个Topic(理想情况 下是一个Consumer读取Topic的一个Partition),那么可以让这些Consumer属于同一个Consumer Group即可实现消息的多Consumer并行处理,原理是Kafka将一个消息发布出去后,ConsumerGroup中的Consumers可以通 过Round Robin的方式进行消费(Consumers之间的负载均衡使用Zookeeper来实现)


Kafka的设计

吞吐量

高吞吐是kafka需要实现的核心目标之一,为此kafka做了以下一些设计:

  1. 数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能

  2. zero-copy:减少IO操作步骤

  3. 数据批量发送

  4. 数据压缩

  5. Topic划分为多个partition,提高parallelism

负载均衡

  1. producer根据用户指定的算法,将消息发送到指定的partition

  2. 存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上

  3. 多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over

  4. 通过zookeeper管理broker与consumer的动态加入与离开

拉取系统

由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:

  1. 简化kafka设计

  2. consumer根据消费能力自主控制消息拉取速度

  3. consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等

可扩展性

当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。


Kafka的设计要点

1、直接使用linux 文件系统的cache,来高效缓存数据。

2、采用linux Zero-Copy提高发送性能。传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少 为2次。根据测试结果,可以提高60%的数据发送性能。Zero-Copy详细的技术细节可以参考:https://www.ibm.com /developerworks/linux/library/j-zerocopy/

3、数据在磁盘上存取代价为O(1)。kafka以topic来进行消息管理,每个topic包含多个part(ition),每个part对应一 个逻辑log,有多个segment组成。每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储 位置,避免id到位置的额外映射。每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。发布者发到某个topic的消息 会被均匀的分布到多个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加该消 息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息 订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

4、显式分布式,即所有的producer、broker和consumer都会有多个,均为分布式的。Producer和broker之间没有负载均衡机制。broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和 consumer都会得到通知。


Topic、Partition和Replica的关系

技术分享

如上图,一个Topic有四个Partition,每个Partition两个replication。


Kafka的应用场景

1.消息队列

比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。 消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统, 如ActiveMR或RabbitMQ。

2.行为跟踪

Kafka的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的topic里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。

3.元信息监控

作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。

4.日志收集

日志收集方面,其实开源产品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉 文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的 系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

5.流处理

这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的 数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文 章的内容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的 结果返还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。Strom和Samza是非常著名的实现这种类型数据转换的框架。

6.事件源

事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。

7.持久性日志(commit log)

Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。


kafka集群演示

kafka 基础配置

broker.id=1    #broker标识,id为正数,且全局不得重复
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/kafka-logs
num.partitions=1                       #每个partition的备份个数
num.recovery.threads.per.data.dir=1
log.retention.hours=168            #segment文件保留的最长时间,超过将被删除7*24
log.segment.bytes=1073741824    #日志文件中每个segment文件的尺寸,默认为1G
log.flush.interval.messages=10000    #partiton中buffer中,消息的条数,达到阀值,将触发flush到磁盘中
log.flush.interval.ms=1000    #消息buffer时间,达到阀值,将触发flush到磁盘
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000

kafka start脚本

#!/bin/sh
export JAVA_HOME=/usr/java/jdk7
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

KAFKA_CFG="/opt/kafka/config/server.cfg"

#Identify the broker id
echo "broker.id=${BROKER_ID}" >> ${KAFKA_CFG}

#Identify the service port
echo "listeners=PLAINTEXT://:${KAFKA_PORT}" >> ${KAFKA_CFG}

# Add additional ZooKeeper servers into the server.properties file
echo "${ZK_CFG}" >> ${KAFKA_CFG}
# Start Kafkaexec 
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.cfg

kafka Dockerfile 

FROM jdk7
RUN locale-gen en_US.UTF-8ENV LANG en_US.UTF-8
RUN /bin/cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
ADD ./kafka /opt/kafka
ADD ./server.cfg /opt/kafka/config/
ADD ./kafka_start.sh /opt/kafka/bin/
RUN chmod +x /opt/kafka/bin/kafka_start.sh
RUN mkdir /opt/kafka/kafka-logs

CMD /opt/kafka/bin/kafka_start.sh

kafka Build 

docker build --no-cache=true -t "ttxsgoto.com/kafka:v2016" .

kafka Run

#kafka1
docker run --net=host -d --name kafka1 -e BROKER_ID=1 -e KAFKA_PORT=9091 -e ZK_CFG=zookeeper.connect=192.168.229.133:2181,192.168.229.133:2182,192.168.229.133:2183 ttxsgoto.com/kafka:v2016

#kafka2
docker run --net=host -d --name kafka2 -e BROKER_ID=2 -e KAFKA_PORT=9092 -e ZK_CFG=zookeeper.connect=192.168.229.133:2181,192.168.229.133:2182,192.168.229.133:2183 ttxsgoto.com/kafka:v2016

#kafka3
docker run --net=host -d --name kafka3 -e BROKER_ID=3 -e KAFKA_PORT=9093 -e ZK_CFG=zookeeper.connect=192.168.229.133:2181,192.168.229.133:2182,192.168.229.133:2183 ttxsgoto.com/kafka:v2016

建立topic

export JAVA_HOME=/usr/java/jdk7
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

./kafka-topics.sh --create --zookeeper 192.168.229.133:2181 --replication-factor 1 --partitions 1 --topic ttxsgoto
./kafka-topics.sh --create --zookeeper 192.168.229.133:2182 --replication-factor 1 --partitions 1 --topic ttxsgoto
./kafka-topics.sh --create --zookeeper 192.168.229.133:2183 --replication-factor 1 --partitions 1 --topic ttxsgoto

查看topic

export JAVA_HOME=/usr/java/jdk7
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

./kafka-topics.sh --list --zookeeper 192.168.229.133:2181
./kafka-topics.sh --list --zookeeper 192.168.229.133:2182
./kafka-topics.sh --list --zookeeper 192.168.229.133:2183

生产数据

export JAVA_HOME=/usr/java/jdk7
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

./kafka-console-producer.sh --broker-list 192.168.229.133:9091 --topic ttxsgoto
./kafka-console-producer.sh --broker-list 192.168.229.133:9092 --topic ttxsgoto
./kafka-console-producer.sh --broker-list 192.168.229.133:9093 --topic ttxsgoto

消费数据

export JAVA_HOME=/usr/java/jdk7
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

./kafka-console-consumer.sh --zookeeper 192.168.229.133:2181 --topic ttxsgoto
./kafka-console-consumer.sh --zookeeper 192.168.229.133:2182 --topic ttxsgoto
./kafka-console-consumer.sh --zookeeper 192.168.229.133:2183  --topic ttxsgoto


本文出自 “天天向上goto” 博客,请务必保留此出处http://ttxsgoto.blog.51cto.com/4943095/1855580

Kafka