首页 > 代码库 > Kafka系列之1—Kafka的总体认识

Kafka系列之1—Kafka的总体认识

Kafka的总体认识

1.非中心的架构模型

2.基于TCP的一套Kafka通信协议

3.消息中间件&存储系统

4.存储逻辑层的高并发保证

5.isr机制降低了保证分布式一致性的代价

1. 非中心的架构模型

我们知道,在分布式系统的架构类型里,既有主从式的架构,也有非中心式的架构,像hadoophbase都采用了主从式的架构模型,主从式的架构优点有很多,但是主从式下为了避免单点故障而采取的各种策略使得主从式架构的优点并不那么理想,kafka作为一个分布式的消息系统,它采用了非中心式的架构模型,每个节点都作为独立的ServerClient提供服务,在集群环境下,多个节点依赖zookeeper维护client在读写访问中的分布式一致性。

在早期0.8.2之前的kafka版本中,kafkazookeeper的依赖非常大,producerserverconsumer都非常依赖zookeeper,虽然zookeeper作为一个轻量级的文件系统(已经成为分布式服务的基础服务,用以提供分布式环境下的一致性),但是大量的与其交互仍然会导致一些性能问题和不稳定的方面,在0.8.2之后的改进中,通过将一些状态保持在kafka自身中,减少与zookeeper的大量交互,为读写提供了更稳定的实现。

2. 基于TCP的一套Kafka通信协议

2.1 概述

kafka的通信协议相当的简单,只有六类核心的客户端请求APIs

  • Metadata:描述当前可用的brokershostport信息,并给出每个broker上分配了哪些partitions

  • Send:发送messagesbroker

  • Fetch:从broker中获取messages,包括获取data、获取集群的元数据信息以及获取某个topicoffset信息;

  • Offsets:获取某个给定topic partition的可用offsets信息;

  • Offset Commit:提交consumer groupoffsets信息;

  • Offset Fetch:获取某个consumer groupoffsets信息集合。

 

这些都会在下面详细描述。另外,0.9版本的kafkaconsumerskafka connect支持一般的group management。这部分的Client Api包括五种requests

  • GroupCoordinator:定位当前consumer groupcoordinator

  • JoinGroup:加入一个consumer group,如果没有就创建一个;

  • SyncGroup:同步同一个group下的所有consumer状态(partition分配到consumer的分布情况);

  • Heartbeat:用来检测group中的成员的存活状态;

  • LeaveGroup:直接离开一个group

 

还有一些用于监控/管理 kafka集群的administrativeAPIs

  • DescribeGroups:用来检测当前的groups

  • ListGroups:列出broker中管理的groups

  • response格式

Request格式如下:

RequestMessage => ApiKey ApiVersionCorrelationId ClientId RequestMessage

 ApiKey => int16

 ApiVersion => int16

 CorrelationId => int32

 ClientId => string

 RequestMessage => MetadataRequest | ProduceRequest | FetchRequest |OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

Response格式如下:

Response => CorrelationIdResponseMessage

CorrelationId => int32

ResponseMessage => MetadataResponse |ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse |OffsetFetchResponse

 

ApiKey

这个int数值是用来表明是哪一种请求,KafkaApis根据这个值来调用相应的处理逻辑

ApiVersion

由于不同的Kafka版本支持的ApiVersion不同,因此要根据KafkaServer支持的ApiVersion来发送对应格式的Request

CorrelationId

客户端提供的一个整型值,在response中会原封不动的返回,它的作用主要是用来匹配clientserver之间的requestresponse

2.3 ApiKey

下面的列表是ApiKey的整型值对应的Request类:

API name

ApiKey Value

ProduceRequest

0

FetchRequest

1

OffsetRequest

2

MetadataRequest

3

Non-user facing control APIs

4-7

OffsetCommitRequest

8

OffsetFetchRequest

9

GroupCoordinatorRequest

10

JoinGroupRequest

11

HeartbeatRequest

12

LeaveGroupRequest

13

SyncGroupRequest

14

DescribeGroupsRequest

15

ListGroupsRequest

16

2.4Response中的Error Codes

Error

Code

重试

描述

NoError

0


没有错误,执行成功!

Unknown

-1


未知的server error

OffsetOutRange

1


请求的offset值超出了server端维护的对应topic/partitionoffset值(可以大于也可以小于)

InvalidMessage/CorruptMessage

2

YES

消息内容不能通过CRC校验

UnknownTopicOrPartition

3

YES

请求的topicpartition不再发往的broker

InvalidMessageSize

4


消息的大小为负值

LeaderNotAvailable

5

YES

请求发生在leader选举过程中时抛出这个异常,此时请求的partition没有leader无法读写

NotLeaderForPartition

6

YES

在客户端尝试向不是leaderreplica写入信息时抛出,意味着客户端的元数据信息过期了

RequestTimedOut

7

YES

request超过了用户指定的时间,一般是值socket超时

BrokerNotAvailable

8


这个错误不是client遇到的,往往发生在工具类的请求中

ReplicaNotAvailable

9


broker上没有期望的replica(可以被安全的忽视)

MessageSizeTooLarge

10


server有一个最大消息的配置,当clientserver端写入超过配置大小的message时抛出

StaleControllerEpochCode

11


brokerbroker通信时发生的内部错误

OffsetMetadataTooLargeCode

12


如果指定了一个大于配置的offset metadata大小的string

GroupLoadInProgressCode

14

YES

topic partitionleader发生变化后,新的leaderload offsets时,offset fetch request请求时抛出,或者在group  membership(例如heartbeat)的response中返回当coordinatorload group  metadata

GroupCoordinatorNotAvailableCode

15

YES

offsets topic还没创建或者group coordinator没有active

NotCoordinatorForGroupCode

16

YES

offset fetchcommit request的请求发往一个不是coordinator的节点

InvalidTopicCode

17


访问一个不可用的topic或者尝试对内部topic__consumer_offset)进行写入操作时

RecordListTooLargeCode

18


如果producemessage batch超过了配置的segment size

NotEnoughReplicasCode

19

YES

处于in-syncreplicas数量小于配置的produce要求的最小replicasrequiredAcks=-1

NotEnoughReplicasAfterAppendCode

20

YES

message被写入到log后,但是in-syncreplicas数小于需要的

InvalidRequiredAcksCode

21


请求的requiredAcks是不可获得的

IllegalGenerationCode

22


server端的generation idrequest中的generation id不一致

InconsistentGroupProtocolCode

23


当前group能够接受的protocol type中不包含join group时给出的protocol type

InvalidGroupIdCode

24


join groupgroupId为空或者null

UnknownMemberIdCode

25


当前generationgroup中不包含请求的memberId

InvalidSessionTimeoutCode

26


join group时超出了配置的request  session timeout

RebalanceInProgressCode

27


当请求发起时coodinator正在对group进行rebalance,此时client要重新join group

InvalidCommitOffsetSizeCode

28


offset commit超过metadata的最大限制被拒绝时

TopicAuthorizationFailedCode

29


client没有访问请求的topic的权限时

GroupAuthorizationFailedCode

30



ClusterAuthorizationFailedCode

31



 

kafka实现了基于tcp的一种通信协议,只要符合通信协议的规范,即可与kafka server进行通信,因而kafka client是跨语言的

3. 消息中间件&存储系统

kafka既可以被认为是消息中间件,也可以作为存储系统使用

由于kafka可以将producer发送的消息保存起来供consumer消费,因此既可以作为消息中间件使用,也可以作为存储系统来保存数据。

4. 存储逻辑层的高并发保证

kafka在存储逻辑层的设计为高吞吐量提供了可能

kafka存储数据的逻辑单元是partitionproducerconsumer的处理单元也是基于partition的,针对某个topic,可以有多个partition,而多个partition又可以分布在不同的节点上,从而在存储层保证了I/O的并发,为高吞吐量提供了可能。

5. isr机制降低了保证分布式一致性的代价

kafkaisr同步机制使得保证分布式一致性的代价大大降低

kafkaisr机制,允许isr中的replica和主副本之前有一定的差距,这样做保证了响应的及时性,另一方面,由于在isr层面没有考虑严格的分布式一致性,没有使用paxosleader选举策略,使得kafkaleader选举更加容易,没有严格的节点数要求的限制,只要有一个节点是isr中的,就不会丢数据。


本文出自 “数字科技” 博客,请务必保留此出处http://7639538.blog.51cto.com/7629538/1883631

Kafka系列之1—Kafka的总体认识