众所周知,消息队列分为生产者、Broker、消费者
生产者
发送原理
消息发送涉及到两个线程,一个是主线程,另一个是sender线程。主线程创建一个双端队列RecordAccumulator,主线程将消息不断发送给双端队列,sender从双端队列拉取消息发给Broker。
具体流程:
想将外部数据传入Broker,先在main线程创建producer,一般没有拦截器,然后序列化。通过分区器,确定每个数据发送到哪个分区,实际上先发送到一个缓存队列中(内存中),队列大小默认32m,每批次是16k(黄色小框->内存池)。达到batchSize或者超等待时间,sender线程主动拉取数据发送,发送以节点的方式,通过selector发送。如果broker没有应答,最多同时缓存5个发送请求。
发送成功,就清理请求和双端队列的数据,失败就重试(默认int最大值,2^31-1)。
那么分区器是如何知道发往哪个分区的呢?有常见的3种策略DefaultPartitioner。1、生产者发送消息的时候,指定了分区,那就直接发往该分区。2、基于key的哈希,将消息的key通过hash算法,求的分区号,然后发送。3、如果既没有分区号,又没有key。那就采用轮询的方式选择一个分区发送。发满了就发下一个分区。
还可以通过继承Patitioner,重写patition方法。并让properties关联自定义分区器。
在应答机制中,
ack=0,速度最快,但丢失率高
ack=1,速度中等,leader虽然写入磁盘,但如果中途leader挂了,follower还没同步leader的数据,就会丢失数据。(用于日志)
ack=-1/all,速度最慢,要求leader和在部分follower写入磁盘后才应答。为了防止某个follwer突然挂了给不了leader响应,出现了一个动态的ISR(In-Sync Replicas,同步副本),它实现了如果follwer挂了,如果超时就踢出ISR。安全性最高。(用钱上)
最少一次要求ISR最小副本数量大于等于2。但是数据有可能重复(在ack=-1准备应答的一瞬间,leader挂了,那么生产者没收到ack,就重发数据,导致数据重复存入)
最多一次ack=0数据不重复,但是会丢失
那如何保证数据不丢失的情况下,数据不重复呢?幂等和事务。
幂等如何判断呢?<PID,Partition,SeqNumber>相不相同,PID是Kafka每次重启会分配新的,Partition分区号,SeqNumber是单调递增。可以保证单分区单会话内不重复。开启enable.idempotence(默认开启)
如果Kafka挂了再重启,还是会产生重复数据,那就需要事务。
事务如何判断呢?开启事务必须开启幂等。在broker中有事务协调器(处理事务)和存储事务信息的主题(用于将事务信息存储到磁盘)。
由图可知,broker0和broker1上都有事务,如何确定用哪个事务协调器呢?
默认50个存储事务的特殊主题的分区,事务来了后,需要生产者为事务提供 一个id(为了跨多个分区的事务),然后对50求模,计算该事务属于哪个分区(可以有partitio0、partition1...),假设就是TopicA-Partition0的分区。那么这个分区的Leader副本所在的broker里的事务协调器就是负责它的。
流程如下
确保了数据的正确和不重复,如何确保数据的顺序呢?
因为生产者在发送的Broker时,有可能会失败,失败就会重试,但是最多可以缓存5个请求,可能1,2两个请求成功了,3失败了,4成功了,然后3重试成功了。数据就变成1243,乱序了。
解决办法:
Broker
Zookeeper存储
first是主题
Broker的工作原理
首先是信息的注册。broker启动后,往zk的ids写入有哪些broker。然后每个Broker中都有Controller,哪个Controller先写入zk中的controller,哪个就是主导。由这个Broker来监听brokers的节点变化,例如leader、follower挂了。(如果Controller自己挂了,还是一样,重新抢,谁拿到谁就是主导)。然后Controller会觉得分区Leader的选举,ISR中存活,AR前面的优先。然后将leader和ISR的信息写到zk的topics/主题/partition/分区号。然后其他controller从那同步相关信息。
然后生产者往Leader的发送数据,follower主动和leader同步数据(通过log的方式,底层是一个个Segment,默认1G,查询根据index来检索)
如果leader挂了,Controller捕捉到后,从topics里拉取信息,获取最新ISR,然后根据一样的选举规则,选取新的leader。然后更新topics里的信息。
那么leader、follower故障了,里面同步的数据如何处理?
首先是followe挂了如何处理:
如果此时broker2挂了。就将它踢出ISR。broker0和broker1正常工作。
当然broker2有恢复的可能,当broker2重新上线后,先丢弃HW之后的数据,从HW开始同步。知道该follower2的LEO大于等于该分区的HW,才将他加入到ISR。
如果是leader挂了:
首先是从ISR中选一个新的leader。其他follower向leader的HW看起,将高于leader的HW的部分数据都删除,从新的leader同步数据。
那么这就涉及到原leader挂了,数据发送完了,如果是ack0的情况下,数据就丢失了。如果是ack1,原leader写入磁盘,其他follower同步了,数据也丢失了。如果是ack=-1的情况下,那么这个ack肯定是发不过去的,但有可能原leader已经写入磁盘,follower还没备份,生产者就会重新发送这个数据,当然重发了就可能导致乱序的问题,也涉及到采用事务或者幂等的问题。
Broker的数据是如何存储的?(难)
当文件超过过期时间(一般7天),就会采用删除或者压缩的方法。
删除(默认):log.cleanup.policy=delete。但是文件是按照追加的方式写入的,也就是说一个segment里,可能有些数据先进来,有些数据后进来。那么一个segment先进来的数据过期了,后进来的没过期,删不删呢?所以就统一按照最后进来的时间戳为这个segment的时间戳判断过不过期。还有按照大小log.retention.bytes来删除的,比如设置所有日志最大为8T,此时满了,就会把最早的给删除了。
压缩:log.cleanup.policy=compact。压缩方法:对于相同的key,不同的value,保留最新的value。
如何高效读写
1、分布式集群、分区技术、并行度高
2、读数据采用index稀疏矩阵,可以快速定位消费的数据
3、顺序写磁盘,生产的数据,追加写入log文件中,为顺序写。之所以快就是省去了大量磁头寻址的时间。
4、页缓存 + 零拷贝
消费者
Kafka消费者采用拉取消息的方式主动拉取,但是如果Broker里没有数据,那么消费者就会陷入循环,空拉。
初始化
既然消费者分组,每组有多个消费者,每个消费者怎么消费呢?
首先用这个消费者组的id对50(默认值和Broker的事务主题数一样)求模,值是多少就对应于那个分区。(和事务找对应的协调者一样)此时这个分区的coordinator就是消费者的负责人。
选出之后,消费者都发送请求,例如组名相同是cjd。求hash的时候都会发送到同一个coordinator中。coordinator再随机从消费者中选择一个作为Leader。然后将收集到的Topic信息发给Leader消费者。它会制定消费方案(谁消费几号分区/分区分配策略)。然后把计划发给coordinator,coordinator再将消费方案群发给其他消费者。如果消费者断开了,和coordinator没有心跳请求了,就会将这个消费者移除,并触发再平衡,或者消费者处理时间太长了,也认为挂了,依然触发再平衡。
工作原理
首先消费者要想工作,就要先创建一个消费者网络客户端,用于与Broker通讯。会设置最小抓取大小,超时时间和最大抓取大小。
然后就向Broker发送拉取请求。拉取的消息会放到一个消息队列。然后消费者从消息队列里拉取数据,默认一次拉取500条。拉取的数据经过反序列化、拦截器。
Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。
当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?
超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。
分区分配策略
有四种主流的:Range、RoundRobin、Sticky、CooperativeSticky
通过partition.assignment.strategy修改,默认策略是Range+CooperativeSticky。可以使用多个策略。
Range:针对一个主题。分区按照序号排序,消费者也按照顺序排序(底层编号)。分区数/消费者数,决定每个消费者消费几个分区。容易造成数据倾斜。
如果Consumer0突然挂了,45s内又有一批数据来到,那么其他消费者正常接收到对应分区的数据,012分区的数据会在45s后全部数据给到另一个消费者。如果45s后才有数据发送来,那么分区会再平衡,Consumer1会消费0123,Consumer2会消费456。
RoundRobin:针对所有Topic而言。将所有主题的所有分区和所有消费者进行排序。然后轮询分配分区给各个消费者。
如果Consumer0突然挂了,45s内又有一批数据来到,那么其他消费者正常接收到对应分区的数据,036分区的数据会在45s后全部数据按照轮询的方法给其他消费者。如果45s后才有数据发送来,那么分区会再平衡。
Sticky:粘性分区,执行一次新的分配之前,会考虑上一次的分配结果。
offset的维护位置
offset是存储在 _consumer_offsets主题里的。里面的结构是采用key和value的形式。key是group.id+topic+分区号,value就是offset的值。每个一段时间,kafka内部会对这个topic进行压缩,只保留每个key的最新value。
自动提交会导致重复提交,手动提交会导致漏提交。
重复消费就是,如果每隔5s提交,offset记录到2时,消费者挂了,重新连接上来的时候会从2继续开始消费,那就会导致重复消费3,4.
漏消费就是,如果手动提交,数据写入到消费者的时候,还没写入到磁盘,此时消费者挂了,但是offset将自己的位置提交了,下次消费的时候,消费者就从2开始,01的数据就丢失了。
那么如何解决呢?
事务:要做到完全数据一致。下游要支持事务,上游生产者也要使用事务。
数据积压
Kafka为什么快?
一般而言,离不开三点:网络、磁盘、复杂度
细分一点:网络就是网络模型、磁盘是顺序写、页缓存+零拷贝、复杂度比如压缩、批量、缓存
网络模型
底层还是NIO的,和Netty一样的Reactor线程模型
Reacotr 模型主要分为三个角色
Acceptor:负责监听来自客户端的请求连接。Acceptor线程通过Java NIO的Selector机制监听ServerSocketChannel上的请求连接事件,一旦有连接请求到来,就进行处理。
Processor:处理读写事件。当数据可读或可写时,Processor线程会进行相应的读写操作,并将请求放入RequestChannel的请求队列中。每个Processor线程都有自己的Selector和三个队列(newConnections、inflightResponse、responseQueue),用于管理连接和响应。
KafkaRequestHandler线程池:执行具体的业务逻辑处理。从RequestChannel的请求队列中获取请求,并调用KafkaApis进行业务逻辑处理。处理完成后,将响应结果放入对应Processor线程的responseQueue中。
磁盘
顺序写:减少了磁盘寻道和旋转的次数。具体实现就是每个Partition划分为多个segment,然后数据会追加到segment后。
零拷贝:避免在用户态和内核态来回拷贝数据。
页缓存:
复杂度
批量发送:发送消息端有batch.size、linger.ms、缓存队列。
压缩:经过分区器后可以压缩,Snappy、GZIP
分区并发:分区分配策略,也不是分区越多越好,分区越多log日志的index和数据文件越多。
版权归原作者 炒鸡蛋学Java 所有, 如有侵权,请联系我们删除。