首页 > 代码库 > 【Spark深入学习 -15】Spark Streaming前奏-Kafka初体验
【Spark深入学习 -15】Spark Streaming前奏-Kafka初体验
----本节内容-------
1.Kafka基础概念
1.1 出世背景
1.2 基本原理
1.2.1.前置知识
1.2.2.架构和原理
1.2.3.基本概念
1.2.4.kafka特点
2.Kafka初体验
2.1 环境准备
2.2 Kafka小试牛刀
2.2.1单个broker初体验
2.2.2 多个broker初体验
2.3 Kafka分布式集群构建
2.3.1 Kafka分布式集群构建
2.3.2 Kafka主题创建
2.3.3 生产者生产数据
2.3.4消费者消费数据
2.3.5消息的压缩
2.4 Kafka在ZK目录节点
2.4.1 kafka镜像原理
2.4.2 Kafka副本模型
2.4.3 在ZK目录节点内容
2.5 实体间交互流程
2.5.1主题与zk
2.5.2 消费者与zk
2.5.3 broker与生产者
2.5.4 消费者与消费者组
3.参考资料
---------------------
1.Kafka基础概念
1.1 出世背景
Kafka是一个消息系统,是LinkedIn公司开发并开源出来的组件。Kafka原本用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家公司作为多种类型的数据管道和消息系统使用。歪果仁就喜欢整一些看不懂的词汇,比如活动流,比如运营数据,就不能好好说话么。
活动流是啥?简单理解,就是用户使用网站或者系统时产生的数据流,比如点击一个页面,查看一个图片,翻看一网页,搜索一个关键字,网站运营者需要对用户的这些行为进行统计,形成报表。
运营数据是啥?就是计算机产生的监控日志信息,如CPU数据,IO数据等, 这些数据都是动态生成的,Linkein这样的大公司,运营数据量非常大,通常的方式是生成这些数据,写入到log文件中,然后进行统计。活动流数据和运营数据对网站和软件产品非常重要,举几个栗子
1)动态汇总,将用户的信息动态汇总,或者自己监控,或者发给用户的朋友圈
2) 安全,实时监控用户访问信息,防止网络爬虫或者用户扩散垃圾信息,对API使用速率进行实时监控和控制,切断网站某些不正常活动。
3)机器硬件实时监控:对机器运行效率实时监控,对异常情况自动触发告警。
4)报表和批处理:将数据导入Hadoop平台,进行离线报表分析。
LinkedIn处理的时候就碰到几个问题:
1) 日志量大,每天要处理10亿多条数据。
2)高吞吐量。
3)实时性能差。
现有的消息队列系统(messaging and queuing system)却很适合于在实时或近实时(near-real-time)的情况下使用,但它们对很长的未被处理的消息队列的处理很不给力,往往并不将数 据持久化作为首要的事情考虑。这样就会造成一种情况,就是当把大量数据传送给Hadoop这样的离线系统后, 这些离线系统每个小时或每天仅能处理掉部分源数据。Kafka的目的就是要成为一个队列平台,仅仅使用它就能够既支持离线又支持在线使用这两种情况。
1.2 基本架构和原理
1.2.1.前置知识
消息队列
为什么要引入消息队列?举个例子,假如A发送消息给B,如果B在线,那么可以很顺利的通讯发消息,那如果B不在线,那就比较麻烦了,消息队列技术可以很好的解决这个问题。
消息队列技术是分布式应用间交换信息的一种技术,是两个系统通讯的桥梁和媒介,将两个系统解耦,不需要知道对方的位置和信息。 通过消息队列技术2个异构的系统可以进行通讯,尤其是大型系统。消息队列可以保存在磁盘或者内存中。
消息队列技术底层都是socket通讯,socket在很多地方有用到,比如数据库,进程间通讯,jdbc等等底层都是socket通讯。
JMS是消息服务的规范,很多消息中间件技术都遵循JMS规范。
消息队列通讯模式
1)点对点通讯:点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。用人话描述一遍:就是一个人放东西,一个人去东西,这就是点对点。
2)发布/订阅 (Publish/Subscribe) 模式:发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。发布/订阅 模式:就跟贴寻人启事,公告一样的道理,消息往公告上一贴,关心的人就去看看发生什么事,不关心的就当作一堆废纸,不用搭理。
Kafka很厉害,kafka将这两个概念整合到一起,他只有主题模式,但是能实现队列模式的效果,实现方式就是消费者组的引入:
消费者组:将一个或者多个消费者划到一起,取一个名标记,这就是消费者组,对于一个消费者组,
,处在同一个消费者组的消费者,只能有一个消费者消费,
发布订阅模式:每个消费者组,只有一个消费者,那就是发布订阅,每个消费者都有自己的组。
每个组都能消费,那就是发布订阅模式。
队列模式: 所有的消费者都在一个组里面。
消息队列特点
·数据缓冲作用
· 降低耦合
·异构系统高效交互
Kafka是一个消息队列组件,它遵循JMS规范,基本工作流程,生产者生产数据-> kafka集群中转数据->消费者消费数据
1.2.2.架构和原理
生产者生产消息,将消息发送给Kafka集群,Kafka内在是分布式的,一个Kafka集群通常包括多个代理。为了均衡负载,将话题分成多个分区,每个代理存储一或多个分区。消费者从kafka主题中获取消息。多个生产者和消费者能够同时生产和获取消息。
1.Producer根据指定partition方法(round-robin、hash等),将消息发布到指定topic的partition里面
2.kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。
3.Consumer从kafka集群pull数据,并控制获取消息的offset
1.2.3.基本概念
Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。
Topic:每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。
Partition:Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition。
Producer:负责发布消息到 Kafka broker。
Consumer:消息消费者,向 Kafka broker 读取消息的客户端。
Consumer Group:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。
1.2.4.Kafka特点
1)分布式流平台,支持消息的分区(mr的分区类似),支持多个服务器之间消息分区
2)支持发布和订阅数据流,类似于消息系统
3)支持分布式和副本集群方式,来存储数据流
4)实时处理数据流
5) 支持多种源数据,数据库交互、app双向交互
6)水平可伸缩
7) 容错好
8)速度快
9)多种方式存储,持久化存储内存,磁盘秒级
10)海量数据,TB级高吞吐量:支持每秒百万消息,·廉价硬件
11)多客户端支持,很容集成不同平台,java,python,和多源进行协同,它是一个
12)中间件的基因,跨平台和跨语言,开源
2.Kafka初体验
2.1环境准备
1)kafka下载
kafka2.1.2官网下载:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz
2) zookeeper下载
zookeeper3.3.6官网地址:http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz
3) jdk下载
kafka和zookeeper前提是安装好了jdk,注意你电脑是32位还是64为
jdk官网下载地址:http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz
4)关闭防火墙
否则zk启动会报错no route tohost
· 查看防火墙状态,使用root账号执行
service iptables status
·关闭防火墙
service iptables stop
·查看防火墙开机启动状态
chkconfig iptables --list
·关闭防火墙开机启动
chkconfig iptables off
5)配置好host
2.2 Kafka小试牛刀
2.2.1 单个broker初体验
1.安装单节点的kafka
下载下来了之后直接解压就可以运行单节点的Kafka,因为Kafka需要用zookeeper做高可用,如果没有安装zk,也没有关系,使用它自带的配置启动就可以。
1)启动自带配置的zk
启动zk命令: bin/zookeeper-server-start.sh config/zookeeper.properties
zookeeper启动成功
2)启动kafka
kafka启动:bin/kafka-server.start.sh config/server.properites
kafka启动成功
3)创建一个topic
创建topic:bin/kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 1 --topic test
查看topic:bin/kafka-topicts.sh --list --zookeeper kafka01:2181
4)启动生产者
命令:bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test
启动之后,输入2行
5)启动消费者
命令:bin/kafka-console-consumer.sh --zookeeper kafka01:9202 --topic test --from-beginning
消费者接收到2行消息
总结
1.启动zk
命令: bin/zookeeper-server-start.sh config/zookeeper.properties
2.启动kafka
命令:bin/kafka-server.start.sh config/server.properites
参数:指定kafka的配置文件
3.创建主题
命令:bin/kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 1 --topic test
参数:1)create:指定创建动作,2)zookeeper:指定zookeeper客户端;3)replication-factor:指定主题副本个数;4)partitions:指定分区个数;5)topic:指定主题名称
查看topic:bin/kafka-topicts.sh --list --zookeeper kafka01:2181
4.启动生产者
命令:bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic test
参数:1) broker-list:指定broker;2)topic:指定主题名,从参数可以看出来,生产者是不直接和zk交互的
5.启动消费者
命令:bin/kafka-console-consumer.sh --zookeeper kafka01:2181 --topic test --from-beginning
参数:1)zookeeper:指定zookeeper客户端;2)topic:指定主题名,3)from-beginning:从topic头开始读取消息
2.2.2 多个broker初体验
前面已经配置了一个单节点kafka服务,再次扩展演示kafka集群多节点可用性、容错性,也为kafka分布式集群做好铺垫。
1)配置kafka多节点服务
配置多节点服务,拷贝出2分配置server1.properties,server2.properties,修改2处参数
------------------------------server1.properties
broker.id=1
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka1-logs
------------------------------server2.properties
broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka2-logs
-----------------------------
2)启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
3)启动2个kafka服务
bin/kafka-server-start.sh config/server1.properties
bin/kafka-server-start.sh config/server2.properties
4)创建topic主题
bin/kafka-topics.sh --create --zookeeper kafka01:2181 -replication-factor 1 --partitions 1 --test2
5)启动生产者
bin/kafka-console-producer.sh --broker-list kafka02:9093 --topic test2
输入内容
6)启动消费者
bin/kafka-console-consumer.sh --zookeeper kafka01:2181 --topitc test2 --from-beginning
2) 测试多节点可用性
在生产者输入内容,消费者端可以获取到消息
生产者端:
消费者端:
3)测试多节点容错性
杀掉一个kafka服务,然后发送消息测试,消费者是否能政策收到消息
2.3 Kafka分布式集群构建
2.3.1 Kafka分布式集群构建
1.配置zookeeper集群
1).解压后,配置zoo.cfg,如果没有从模板配置文件中拷贝出来
官网建议使用zookeeper3.4.x,3.4.9
http://kafka.apache.org/documentation.html#zk
这里有各种版本的下载地址
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
我们需要的版本3.4.9
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
配置之前创建目录
命令:mkdir -p /usr/local/hadoop/zookeeper/data /usr/local/hadoop/zookeeper/log
修改zoo.cfg配置
命令:vi /usr/local/hadoop/zookeeper/zookeeper-3.4.5-cdh5.4.5/conf/zoo.cfg
内容:
----------------------------
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/hadoop/zookeeper/data
dataLogDir=/usr/local/hadoop/zookeeper/log
clientPort=2181
server.1=kafka01:2287:3387
server.2=kafka02:2288:3387
server.3=kafka03:2289:3387
----------------------------
b.配置myid(在每个节点上都这样配置)
还有一个关键的设置,在每个zk server配置文件的dataDir所对应的目录下,必须创建一个名为myid的文件,其中的内容必须与zoo.cfg中server.x 中的x相同,即:
-----------------------
/usr/local/hadoop/zookeeper/data/myid 中的内容为1,对应server.1中的1
/usr/local/hadoop/zookeeper/data/myid 中的内容为2,对应server.2中的2
/usr/local/hadoop/zookeeper/data/myid 中的内容为3,对应server.3中的3
-----------------------
c.关闭防火墙,并且各节点要配置好jdk
-------------
service iptables stop
chkconfig iptables off
-------------
d.启动zookeeper
/usr/local/hadoop/zookeeper/zookeeper-3.4.5-cdh5.4.5/bin/zkServer.sh start
e.验证服务
命令1:bin/zkServer.sh status
命令2:bin/zkServer.sh start
f.配置环境变量到/etc/profile
ZOOKEEPER_HOME=/usr/local/hadoop/zookeeper/zookeeper-3.3.6
PATH=$ZOOKEEPER_HOME/bin:$PATH
export PATH
KAFKA_HOME=/usr/local/hadoop/kafka/kafka_2.12-0.10.2.1
PATH=$ZOOKEEPER_HOME/bin:$PATH
export PATH
2.配置kafka集群
1)准备目录
命令:mkdir -p /usr/local/hadoop/kafka/log
2)修改server.properties 3个参数,
每个节点的broker.id不一样,本次实验kafka01,kafka02,kafka03对应1,2,3
------------
broker.id=03
log.dirs=/usr/local/hadoop/kafka/log
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
------------
3)启动kafka集群
每一个节点执行,
命令:bin/kafka-server-start.sh config/server.properties
4)验证kafka集群是否正常启动
a.jps查看进程是否启动,正常启动,会有Kafka进程服务
b.查看/usr/local/hadoop/kafka/log下面是否有数据
c.查看zk中是否有kafka目录
2.3.2 Kafka主题创建
1)相关命令
创建命令
bin/kafka-topics.sh --create --zookeeper kafka01:2181 -replication-factor 3 --partitions 2 --topic test3
查看命令
bin/kafka-topics.sh --list --zookeeper kafka01:2181
查看分区个数、副本个数
bin/kafka-topics.sh --describe --zookeeper kafka01:2181 --topic test
2)相关说明
从这里可以看到很多关于主题的信息,总要包含:
· 主题的leader:读写都从这里进行随机选择
· 主题的副本数:副本数,节点列表
· isr:同步复制
·主题的分区数:2个分区
zk保留了副本之间的leader和随从信息,每个副本周期性同步到磁盘
1.每个分区有N个副本,可以承受N-1个节点故障,ZK承受N-1/2个故障,如果3个节点,挂了2个,那就不行了。每个副本都有自己的leader,其余的都是follower,zk存放分区的leader和all replica的信息
2.每个副本存储消息的部分数据在本地的log和offset中,周期性同步到磁盘中去确保消息写入全部
副本或者其中一个
3.leader故障时,消息或者写入本地log,或则在producer在收到ack消息前,从新发送消息给新的leader
这些信息都是保留在zookeeper中的。进一步去zookeeper观察
看到有2个topic
进入到0的目录下查看
再看看kafka的log目录,实际的数据是保存在kafka的log目录下,虽然还没有写数据,但是相关目录已经准备好了相关存放文件和目录了。
2.3.3 生产者生产数据
1)相关命令
bin/kafka-console-producer.sh --broker-list kafka02:9092 --topic test2
这里要注意端口要和配置文件的保持一致,笔者因为前面演示了单机版的多broker(9093端口),端口没有改正,导致消费者没法消费数据,白白浪费了很多时间排查问题。
2)相关说明
从命令可以看出,生产者是和broker直接交互,broker使用zk协同工具来管理多个broker
broker:broker不知道谁消费了消息,并不维护哪个消费者消费了消息
消费者组:每个组中只有一个消费者可以消费消息(所有的消费者都在一个组》队列模式,都有自己的组》订阅模式),通过消费者组同意了
消费者:维护了消费消息的状态,broker不知道谁消费了消息,并不维护哪个消费者消费了消息,消费者自己知道的。
2.2.4 消费者消费数据
1)相关命令
bin/kafka-console-consumer.sh --zookeeper kafka02:2181 --topic test2 --from-beginning
2)相关说明
1.消息缓存与filesystem的存储,数据是立即被即刻写入OS的内核页并且缓存以及清理磁盘(可以配置)
2.消息被消费后,kafka能长时间驻留消息在服务器,允许重复消费
3.对分组消息使用了消息set,防止网络过载
4.在服务器存放消费的信息,kafka是在消费者端存放,消费者保持消息的状态
5.消费者状态默认是在zk中,也允许存到到其他OLTP,比如数据库
6.Kafka中生产和消费是点心的pull-push
生产者pull(write,输入流),消费者push(read,输出流,拉)
7.没有主从模式,所有的broker的地位相同,broker数据均在zk中维护
并在producer之间共享
8.负载均衡策略,loadbalance,允许producer动态发现broker
9.producer生产者维护了一个broker连接池,并能通过zk的callback进行实时更新
10.producer可以选择同步或者异步的方式发送消息给broker
打电话:同步,阻塞的都是同步的,NIO的特点就是非阻塞,IO就是阻塞的
发短息:异步,你收不收,知不知道,我不管,我先去干其他的事情
2.2.5 消息的压缩
Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
kafka producer设置compression.type=snappy
2.4 Kafka在ZK目录节点
2.4.1 kafka镜像原理
将元集群的数据副本化给target kafka集群,目标kafka集群就当作一个消费者消费数据实现数据的备份。
2.4.2 Kafka副本模型
同步模型(同步复制):生产者从zk找leader,并发送message,消息立即写入本地log,follow开始拉取消息,每个follow将消息写入各自本地的log,向leader发送确认回执。leader在收到所有的follow的确认回执和本地副本写入工作均完成后,再向producer发送确认回执。生产者客户端是阻塞的,消费者的数据pull从leader中完成。
异步模型:leader的本地log写入完成马上向生产者发送回执,leader不等待follow的回执,follow行不行,成不成功,不管。
2.4.3 在ZK目录节点内容
/brokers/topics/topic:存储某个topic的partitions所有分配信息
/brokers/topics/[topic]/partitions/[0...N]:partitions状态信息
/brokers/ids/[0...N]:每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL).
/controller_epoch -> int (epoch) :此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;
/controller -> int (broker id of the controller) :存储center controller中央控制器所在kafka broker的信息
2.5 实体间交互流程
zookeeper在kafka扮演重要角色,Kafka使用zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。同时借助zookeeper,kafka能够生产者、消费者和broker在内的所以组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。
2.5.1主题与zk
在kafka中,用户可以自定义多个topic,每个topic又可以划分为多个分区,一半情况下,每个分区存储在一个独立的broker上。所有这些topic与broker的对应关系都有zookeeper进行维护。
在zookeeper中,建立专门的节点来记录这些信息,其节点路径为/brokers/topics/{topic_name},如:
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics
[toptic_t, test, my-replicated-topic, mykafka, mykafka6, mykafka5, mykafka4, test6, mykafka3, test7, mykafka2]
[zk: localhost:2181(CONNECTED) 17] get /brokers/topics/mykafka4
{"version":1,"partitions":{"1":[102,103,104],"2":[103,104,102],"0":[104,102,103]}}
针对topic 的每一个分区与broker的对应关系,zookeeper通过节点 /brokers/topics/topic.name来记录,如:
当broker启动时,会到对应topic节点下注册自己的broker.id到对应分区的isr列表中,如:
[zk: localhost:2181(CONNECTED) 23] get /brokers/topics/mykafka4/partitions/1/state
{"controller_epoch":15,"leader":102,"version":1,"leader_epoch":2,"isr":[102,103,104]}
同样的,当broker退出数,也会触发zookeeper更新其对应topic分区的isr列表,并决定是否需要做消费者的负载均衡。
2.5.2 消费者与zk
l 注册新的消费者分组
当新的消费者组注册到zookeeper中时,zookeeper会创建专用的节点来保存相关信息,其节点路径为ls /consumers/{group_id},其节点下有三个子节点,分别为[ids, owners, offsets]。
Ø ids节点:记录该消费组中当前正在消费的消费者;
Ø owners节点:记录该消费组消费的topic信息;
Ø offsets节点:记录每个topic的每个分区的offset,如:
[zk: localhost:2181(CONNECTED) 54] get /consumers/test-consumer2-group/offsets/mykafka4/0
142
l 注册新的消费者
当新的消费者注册到kafka中时,会在/consumers/{group_id}/ids节点下创建临时子节点,并记录相关信息,如:
[zk: localhost:2181(CONNECTED) 57] ls /consumers/test-consumer2-group/ids/test-consumer2-group_dev103-1433562901087-7b517b97
[]
[zk: localhost:2181(CONNECTED) 58] get /consumers/test-consumer2-group/ids/test-consumer2-group_dev103-1433562901087-7b517b97
{"version":1,"subscription":{"mykafka5":1},"pattern":"white_list","timestamp":"1433562901290"}
l 监听消费者分组中消费者的变化
每个消费者都要关注其所属消费者组中消费者数目的变化,即监听/consumers/{group_id}/ids下子节点的变化。一单发现消费者新增或减少,就会触发消费者的负载均衡。
2.5.3 broker与与zk
为了记录broker的注册信息,在zookeeper上,专门创建了属于kafka的一个节点,其路径为/brokers,如:
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics]
Kafka的每个broker启动时,都会到zookeeper中进行注册,告诉zookeeper其broker.id, 在整个集群中,broker.id应该全局唯一,并在zookeeper上创建其属于自己的节点,其节点路径为/brokers/ids/{broker.id}. 如:
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[102, 103]
创建完节点后,kafka会将该broker的broker.name及端口号记录到改节点,如
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/102
{"jmx_port":-1,"timestamp":"1433209686575","host":"host102","version":1,"port":9092}
另外,改broker节点属性为临时节点,当broker会话失效时,zookeeper会删除该节点,这样,我们就可以很方便的监控到broker节点的变化,及时调整负载均衡等。
2.5.4 消费者与消费者组
a.每个consumer客户端被创建时,会向zookeeper注册自己的信息;
b.此作用主要是为了"负载均衡".
c.同一个Consumer Group中的Consumers,Kafka将相应Topic中的每个消息只发送给其中一个Consumer。
d.Consumer Group中的每个Consumer读取Topic的一个或多个Partitions,并且是唯一的Consumer;
e.一个Consumer group的多个consumer的所有线程依次有序地消费一个topic的所有partitions,如果Consumer group中所有consumer总线程大于partitions数量,则会出现空闲情况;
举例说明:
kafka集群中创建一个topic为report-log 4 partitions 索引编号为0,1,2,3
假如有目前有三个消费者node:注意-->一个consumer中一个消费线程可以消费一个或多个partition.
如果每个consumer创建一个consumer thread线程,各个node消费情况如下,node1消费索引编号为0,1分区,node2费索引编号为2,node3费索引编号为3
如果每个consumer创建2个consumer thread线程,各个node消费情况如下(是从consumer node先后启动状态来确定的),node1消费索引编号为0,1分区;node2费索引编号为2,3;node3为空闲状态
总结:
从以上可知,Consumer Group中各个consumer是根据先后启动的顺序有序消费一个topic的所有partitions的。
如果Consumer Group中所有consumer的总线程数大于partitions数量,则可能consumer thread或consumer会出现空闲状态。
Consumer均衡算法
当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力.
1) 假如topic1,具有如下partitions: P0,P1,P2,P3
2) 加入group中,有如下consumer: C0,C1
3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3
4) 根据(consumer.id + ‘-‘+ thread序号)排序: C0,C1
5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
3.参考资料
1.http://blog.jobbole.com/75328/ 分布式消息系统:Kafka
2.http://blog.csdn.net/lizhitao/article/details/23744675
3.http://blog.csdn.net/opensure/article/details/46048589Kafka文件存储机制那些事
4.http://blog.csdn.net/liuao107329/article/details/70175691 Zookeeper在kafka中的应用
5.https://wenku.baidu.com/view/20781790b52acfc788ebc955.html
【Spark深入学习 -15】Spark Streaming前奏-Kafka初体验