架构
一个典型的kafka集群包含若干Producer(可以是应用节点产生的消息,也可以是通过Flume收集日志产生的事件),若干个Broker(kafka支持水平扩展)、若干个Consumer Group,以及一个zookeeper集群。kafka通过zookeeper管理集群配置及服务协同。
Producer使用push模式将消息发布到broker,consumer通过监听使用pull模式从broker订阅并消费消息。
多个broker协同工作,producer和consumer部署在各个业务逻辑中。三者通过zookeeper管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。
图上有一个细节是和其他mq中间件不同的点,producer 发送消息到broker的过程是push,而consumer从broker消费消息的过程是pull,主动去拉数据。而不是broker把数据主动发送给consumer
producer配置信息
1.acks配置表示producer发送消息到broker上以后的确认值。有三个可选项
Ø 0:表示producer不需要等待broker的消息确认。这个选项时延最小但同时风险最大(因为当server宕机时,数据将会丢失)。
Ø 1:表示producer只需要获得kafka集群中的leader节点确认即可,这个选择时延较小同时确保了leader节点确认接收成功。
Ø all(-1):需要ISR中所有的Replica给予接收确认,速度最慢,安全性最高,但是由于ISR可能会缩小到仅包含一个Replica,所以设置参数为all并不能一定避免数据丢失
2.batch.size
生产者发送多个消息到broker上的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是16384byte,也就是16kb,意味着当一批消息大小达到指定的batch.size的时候会统一发送
3.linger.ms
Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms就是为每次发送到broker的请求增加一些delay,以此来聚合更多的Message请求。 这个有点想TCP里面的Nagle算法,在TCP协议的传输中,为了减少大量小数据包的发送,采用了Nagle算法,也就是基于小包的等-停协议。
batch.size和linger.ms这两个参数是kafka性能优化的关键参数,很多同学会发现batch.size和linger.ms这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配置的时候,只要满足其中一个要求,就会发送请求到broker上
4.max.request.size
设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为1MB。
consumer配置信息
1.group.id
consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费.
如下图所示,分别有三个消费者,属于两个不同的group,那么对于firstTopic这个topic来说,这两个组的消费者都能同时消费这个topic中的消息,对于此事的架构来说,这个firstTopic就类似于ActiveMQ中的topic概念。如右图所示,如果3个消费者都属于同一个group,那么此事firstTopic就是一个Queue的概念
2.enable.auto.commit
消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到,还可以配合auto.commit.interval.ms控制自动提交的频率。
当然,我们也可以通过consumer.commitSync()的方式实现手动提交
3.auto.offset.reset
这个参数是针对新的groupid中的消费者而言的,当有新groupid的消费者 来消费指定的topic时,对于该参数的配置,会有不同的语义
auto.offset.reset = latest 情况下,新的消费者将会从其他消费者最后消费的offset处开始消费Topic下的消息
auto.offset.reset = earliest 情况下,新的消费者会从该topic最早的消息开始消费
auto.offset.reset = none 情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。
4.max.poll.records
此设置限制每次调用poll返回的消息数,这样可以更容易的预测每次poll间隔要处理的最大值。通过调整此值,可以减少poll间隔
topic和partition
Topic
在kafka中,topic是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到kafka集群的消息都有一个类别。物理上来说,不同的topic的消息是分开存储的,
每个topic可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。
Partition
每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的。
Topic&Partition的存储
Partition是以文件的形式存储在文件系统中,比如创建一个名为firstTopic的topic,其中有3个partition,那么在kafka的数据目录(/tmp/kafka-log)中就有3个目录,firstTopic-0~3
kafka消息分发策略
kafka消息分发策略
消息是kafka中最基本的数据单元,在kafka中,一条消息由key、value两部分构成,在发送一条消息时,我们可以指定这个key,那么producer会根据key和partition机制来判断当前这条消息应该发送并存储到哪个partition中。我们可以根据需要进行扩展producer的partition机制。
消息默认的分发机制
默认情况下,kafka采用的是hash取模的分区算法。如果Key为null,则会随机分配一个分区。这个随机是在这个参数”metadata.max.age.ms”的时间范围内随机选择一个。对于这个时间段内,如果key为null,则只会发送到唯一的分区。这个值默认情况下是10分钟更新一次。
关于Metadata,这个之前没讲过,简单理解就是Topic/Partition和broker的映射关系,每一个topic的每一个partition,需要知道对应的broker列表是什么,leader是谁、follower是谁。这些信息都是存储在Metadata这个类里面。
消息的消费原理
在实际生产过程中,每个topic都会有多个partitions,多个partitions的好处在于,一方面能够对broker上的数据进行分片有效减少了消息的容量从而提升io性能。另外一方面,为了提高消费端的消费能力,一般会通过多个consumer去消费同一个topic,也就是消费端的负载均衡机制。
一个consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的consumer来消费。
同一个group中的消费者对于一个topic中的多个partition,存在一定的分区分配策略。在kafka中,存在两种分区分配策略,一种是Range(默认)、另一种另一种还是RoundRobin(轮询)。通过partition.assignment.strategy这个参数来设置。
Rangestrategy(范围分区)
Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。假设我们有10个分区,3个消费者,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程,10 / 3 = 3,而且除不尽,那么消费者线程C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:
C1-0 将消费0, 1, 2, 3 分区
C2-0 将消费4, 5, 6 分区
C3-0 将消费7, 8, 9 分区
可以看出,C1-0 消费者线程比其他消费者线程多消费了分区(主题越多越明显),这就是Range strategy的一个很明显的弊端。
RoundRobinstrategy(轮询分区)
轮询分区策略是把所有partition和所有consumer线程都列出来,然后按照hashcode进行排序。最后通过轮询算法分配partition给消费线程。如果所有consumer实例的订阅是相同的,那么partition会均匀分布。
在我们的例子里面,假如按照hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
C1-0 将消费T1-5, T1-2, T1-6 分区;
C1-1 将消费T1-3, T1-1, T1-9 分区;
C2-0 将消费T1-0, T1-4 分区;
C2-1 将消费T1-8, T1-7 分区;
使用轮询分区策略必须满足两个条件
1. 每个主题的消费者实例具有相同数量的流
2. 每个消费者订阅的主题必须是相同的
当出现以下几种情况时,kafka会进行一次分区分配操作,也就是kafkaconsumer的rebalance
1. 同一个consumergroup内新增了消费者
2. 消费者离开当前所属的consumergroup,比如主动停机或者宕机
3. topic新增了分区(也就是分区数量发生了变化)
kafkaconsuemr的rebalance机制规定了一个consumergroup下的所有consumer如何达成一致来分配订阅topic的每个分区。而具体如何执行分区策略,就是前面提到过的两种内置的分区策略。而kafka对于分配策略这块,提供了可插拔的实现方式,也就是说,除了这两种之外,我们还可以创建自己的分配机制。
谁来执行Rebalance以及管理consumer的group呢?
Kafka提供了一个角色:coordinator来执行对于consumer group的管理,Kafka提供了一个角色:coordinator来执行对于consumer group的管理,当consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信
如何确定coordinator
consumer group如何确定自己的coordinator是谁呢,消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator
JoinGroup的过程
在rebalance之前,需要保证coordinator是已经确定好了的,整个rebalance的过程分为两个步骤,Join和Sync
join: 表示加入到consumergroup中,在这一步中,所有的成员都会向coordinator发送joinGroup的请求。一旦所有成员都发送了joinGroup请求,那么coordinator会选择一个consumer担任leader角色,并把组成员信息和订阅信息发送消费者
完成分区分配之后,就进入了SynchronizingGroupState阶段,主要逻辑是向GroupCoordinator发送SyncGroupRequest请求,并且处理SyncGroupResponse响应,简单来说,就是leader将消费者对应的partition分配方案同步给consumergroup 中的所有consumer每个消费者都会向coordinator发送syncgroup请求,不过只有leader节点会发送分配方案,其他消费者只是打打酱油而已。当leader把方案发给coordinator以后,coordinator会把结果设置到SyncGroupResponse中。这样所有成员都知道自己应该消费哪个分区。
保存消费端的消费位置
什么是offset
每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的;对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。
offset在哪里维护?
在kafka中,提供了一个__consumer_offsets_*的一个topic,把offset信息写入到这个topic中。__consumer_offsets——按保存了每个consumergroup某一时刻提交的offset信息。__consumer_offsets 默认有50个分区。
计算公式
  ✍ Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ;
消息的存储
消息的保存路径
首先我们需要了解的是,kafka是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个offset值来表示它在分区中的偏移量。Kafka中存储的一般都是海量的消息数据,为了避免日志文件过大,Log并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>比如创建一个名为firstTopic的topic,其中有3个partition,那么在kafka的数据目录(/tmp/kafka-log)中就有3个目录,firstTopic-0~3。
多个分区在集群中的分配
如果我们对于一个topic,在集群中创建多个partition,那么partition是如何分布的呢?
1.将所有N Broker和待分配的i个Partition排序
2.将第i个Partition分配到第(i mod n)个Broker上
消息写入的性能
为了规避随机读写带来的时间消耗,kafka采用顺序写的方式存储数据。即使是这样,但是频繁的I/O操作仍然会造成磁盘的性能瓶颈,所以kafka还有一个性能策略:零拷贝
消息从发送到落地保存,broker维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过socket发送给消费者。虽然这个操作描述起来很简单,发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。
▪操作系统将数据从磁盘读入到内核空间的页缓存
▪应用程序将数据从内核空间读入到用户空间缓存中
▪应用程序将数据写回到内核空间到socket缓存中
▪操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出
这个过程涉及到4次上下文切换以及4次数据复制,并且有两次复制操作是由CPU完成。但是这个过程中,数据完全没有进行变化,仅仅是从磁盘复制到网卡缓冲区。通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的同时也会减少上下文切换次数。
现代的unix操作系统提供一个优化的代码路径,用于将数据从页缓存传输到socket;在Linux中,是通过sendfile系统调用来完成的。Java提供了访问这个系统调用的方法:FileChannel.transferTo API。使用sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
消息的存储原理
kafka通过分段的方式将Log分为多个LogSegment。LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件。其中日志文件是用来记录消息的,索引文件是用来保存消息的索引。
LogSegment:
假设kafka以partition为最小存储单位,当kafka producer不断发送消息,必然会引起partition文件的无限扩张,这样对于消息文件的维护以及被消费消息的清理待来非常大的挑战。所以kafka以segment为单位又把partition进行细分。每个partition相当于一个巨型文件被平均分配到多个大小相等的segment数据文件中(每个segment文件中的消息不一定相等),这种特性方便已经被消费的消息的清理,提高磁盘的利用率。
segment file由2大部分组成,分别为index file 和 data file,此两个文件一一对应,成对出现。后缀".index"、".log"分别表示为segment索引文件、数据文件。
segment文件命名规则:partiton全局的第一个segment从0开始,后续每个 segment文件名为上一个segment文件最后一条消息的offset值进行递增。数值最大为64位long大小,20位数字字符长度,没有数字用0填充。
在partition中如何通过offse查找message?
算法:
(1)根据offset的值,查找segment段中的index索引文件,由于索引文件命名是以上一个文件的最后一个offset进行命名的,所以,使用二分查找算法能够根据offset快速定位到指定的索引文件。
(2)找到索引文件后,根据offset进行定位,找到索引文件中的符合范围的索引(kafka采用稀疏索引的方式来提高查找性能)
(3)得到position以后,再到对应的log文件中,从position处开始查找对应的消息,将每条消息的offset与目标offset进行比较,直到找到消息
日志的清除策略以及压缩策略
日志清除策略:
日志的分段存储,一方面能够减少单个文件内容的大小,另一方面,方便kafka进行日志清理。日志的清理策略有两个:
(1)根据消息的保留时间,当消息在kafka中保存的时间超过了指定时间,就会触发清理过程
(2)根据topic存储的数据大小,当topic所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。kafka会启动一个后台线程吗,定期检查是否存在可以删除的消息
通过log.retention,bytes和log.retention.hours这两个参数来设置,当其中任意一个达到要求,都会执行删除,默认的保留时间是7天
日志压缩策略:
可以有效减少日志文件的大小,缓解磁盘紧张的情况。在很多实际场景中,消息的key和value的值之间的对应关系是不断变化的,就像数据库中的数据不会断被修改一样,消费者只关心key对应的最新的value,因此我们可以kafka的日志压缩功能,服务端会在后台启动clearner线程池,定期将相同的key进行合并,只保留最新的value值。
partition的高可用副本机制
kafka的每个topic都可以分为多个partition,并且多个partition会均匀分布在集群的各个节点下。虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说,都是单点的,当其中一个partition不可用的时候,那么这部分消息就没办法消费。kafka为了提高partition的可靠性而提供了副本的概念(replica),通过副本机制来实现冗余备份。
每个分区可以有多个副本,并在副本集合中会存在一个leader的副本,所有的读写请求都是由leader副本来进行处理。剩余的其他副本都作为follower副本,follower副本会从leader副本同步消息日志。这个有点类似zookeeper中的leader和follower的概念,但是具体的实现方式还是有较大差异。副本集存在一主多从的关系。
一般情况下,同一个分区的多个副本会被均匀分配到集群的不同broker上,当leader副本所在的broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制来提高kafka集群的可用性。
副本分配算法:
将所有的N Broker和待分配的i个Partition排序
将第i个Partition分配到第(i mod n)个broker上
将地i个Partition的第j个副本分配到第((i+j)mod n)个broker上
kafka提供了数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。kafka确保从同步副本列表选举一个副本为leader;leader负责维护和跟踪ISR(in-Sync replicas, 副本同步队列)中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。
kafka副本机制中的几个概念:
kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同可分为3类:
leader副本:响应clients端读写请求的副本
follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求
ISR副本:包含了leader副本和所有与leader副本保持同步的follower副本–如何判定是否与leader同步涉及每个卡发卡副本对象都有的两个重要属性:LEO和HW。
LEO:即日志末端位移(log and offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,leader LEO和follower LEO的更新是有区别的。
HW:即提到的水位值。对于同一个副本对象而言,其HW不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的。同理, leader副本和follower副本的HW更新是有区别的。HW标记了一个特殊的offset,当消费者处理消息的时候,只能拉取到HW之前的消息,HW之后的消息对消费者来说不可见的。
副本协同机制:
消息的读写操作都只会由leader节点来接收和处理。follower副本只负责同步数据以及当leader副本所在的broker挂了以后,会从follower副本中选取新的leader。
写请求首先与leader副本处理,之后follower副本会从leader上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍。但是如果一个follower副本出现异常,那这个时候,leader就会把它踢出去。kafka通过ISR集合来维护一个分区副本信息。
ISR:
ISR表示目前“可用且消息量与leader相差不多的副本集合,这是整个副本集合的一个子集”。ISR集合中的副本必须满足两个条件:
1.副本所在节点必须维持着与zookeeper的连接
2.副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值
一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样避免了部分数据被写进Leader,还没来得及被任何follower赋值宕机了,而造成数据丢失(Consumer无法消费这些消息)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过acks来设置。这种机制确保了只要ISR只要有一个或以上的follower,一条被commit的消息就不会丢失。
数据的同步过程:数据的处理过程:
Producer在发布消息到某个partiton时,先通过zookeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(即该Partition有多少个Replica),Producer只讲该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。
如何处理所有的Replicca不工作的情况:
在ISR中至少有一个follower时,kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了
1.等待ISR中的任一个Replica“活”过来,并且选它作为Leader
2.选择第一个活过来的Replica(不一定ISR中的)作为Leader
这就需要在可用性和一致性当中作出一个折中。
ISR的设计原理:
在分布式存储中,冗余备份是一种常见的设计方式,常用的模式有同步复制和异步复制。
如果采用同步复制,那么需要要求所有能工作的follower副本都复制完,这条消息才会被认为提交成功,一旦有一个follower副本出现故障,就会导致HW无法完成递增,消息就无法提交,消费者就获取不到消息。这种情况下,故障的Follower副本会拖慢整个系统的性能,甚至导致系统不可用。
如果采用异步复制,leader副本收到生产者推送的消息后,就认为这次消息提交成功。follower副本则异步从leader副本同步。这种设计虽然避免了同步复制的问题,但是假设所有follower副本的同步速度都比较慢,保存的信息量远远落后于leader副本。此时leader副本所在的broker突然宕机,则会重新选举新的leader副本,而新的leader副本中没有原来leader副本的消息,这就出现了消息的丢失。
kafka权衡了同步和异步两种策略,采用ISR集合,巧妙地解决了两种方案的缺陷:当follower副本延迟过高,leader副本则会把该follower副本踢出ISR集合,消息依然可以快速提交。当leader副本所在的broker突然宕机,会优先将ISR集合中的follower副本选举为leader,新leader副本包含了HW之前的全部消息,这样就避免了消息的丢失。
版权归原作者 Code Lisa 所有, 如有侵权,请联系我们删除。