1.Kafka的基础架构
Kafka像其他Mq一样,也有自己的基础架构,主要存在生产者Producer、Kafka集群Broker、消费者Consumer、注册消息Zookeeper.
Producer:消息生产者,向Kafka中发布消息的角色。
Consumer:消息消费者,即从Kafka中拉取消息消费的客户端。
Consumer Group:消费者组,消费者组则是一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费
Broker:经纪人,一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic。
Topic:主题,可以理解为一个队列,通常为一种,生产者和消费者都是面向一个Topic
Partition:分区,为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)
Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常的工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower
Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。
2.Kafka工作流程Kafka中消息是以topic进行分类的,Producer生产消息,Consumer消费消息,都是面向topic的。Topic是逻辑上的概念,Partition是物理上的概念,每个Partition对应着一个log文件,该log文件中存储的就是producer生产的数据,topic=N*partition;partition=logProducer生产的数据会被不断的追加到该log文件的末端,且每条数据都有自己的offset,consumer组中的每个consumer,都会实时记录自己消费到了哪个offset,以便出错恢复的时候,可以从上次的位置继续消费。生产者不断的向log文件追加消息文件,为了防止log文件过大导致定位效率低下,Kafka的log文件以1G为一个分界点,当.log文件大小超过1G的时候,此时会创建一个新的.log文件,.log中存放的是真实的数据。同时为了快速定位大文件中消息位置,Kafka采取了分片和索引的机制来加速定位。在kafka的存储log的地方,会存在消费的偏移量以及具体的分区信息,分区信息的话主要包括.index和.log文件组成,Segment 是 Kafka 文件存储的最小单位。Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用0填充。如 00000000000000368769.index 和 00000000000000368769.log。以上面的一对 Segment File 为例,说明一下索引文件和数据文件对应关系:Kafka索引文件.indexKafka的日志文件通常非常庞大,每条消息不是固定长度的,读取和处理可能会耗费大量时间和资源,为了提升读取和处理速度,Kafka为每个日志文件创建了两个索引文件,分别是偏移量索引文件(文件后缀".index")和时间戳索引文件(文件后缀".timeindex")。这两种索引文件都是稀疏索引,并不保证每个消息在索引文件中都有对应的索引项,因此可以大幅减少索引文件大小,从而实现索引文件的缓存加载,提升查询速度。- 偏移量索引文件".index"偏移量索引文件是用来建立消息偏移量offset到消息在日志文件存储的物理地址之间的映射关系,当写入的消息长度超过一定量(由参数指定)时,偏移量索引文件就会增加一个偏移量索引项,该索引项包括该消息的offset以及其在物理文件中的位置。由于日志文件名前缀为存储消息的baseoffset,当消费者想要读取消息时,先获取partition中的日志文件名列表顺序排序,根据消息的Offset(假设为x)使用二分法找到对应的日志文件,找到对应的日志文件之后,可以在对应偏移索引文件中通过二分查找来快速定位不大于x的最大索引条目项(假设其offset为y),并得到y在日志数据文件中存放的位置p,从p开始顺序扫描日志文件直到找到offset为x的那条消息。- 时间戳索引文件".timeindex"文件存储了消息的时间戳与消息的offset偏移量之间的映射关系,它根据时间戳将消息分片,并记录每片中最后一条消息的时间戳和对应的offset偏移量,用于按照时间顺序进行消息的快速查找。当 Kafka 写入的消息长度超过一定量(由参数指定)或新消息的时间戳和上一个索引条目的时间戳超过一定时长(参数指定),时间戳索引文件就会增加一个时间戳索引项。当需要查询指定时间戳的日志消息时,使用二分法先找到时间戳索引文件中不大于目标时间戳的最大索引项x,得到该索引项对应的偏移量y,再根据y查询偏移量索引文件去读取消息所在的日志文件位置p。
3.生产者#### 3.1同步发送与异步发送****同步发送:生产者发送消息后阻塞,收到ACK后继续发送,如果超过一定时间没有收到重新发送异步发送:生产者发送消息后就可以执行后续任务,broker在收到消息后异步调用生产者提供的回调方法#### 3.2生产者分区策略- 指明partition,直接将指明的值作为partition的值- 没有指明partition的情况下,但是存在值key,此时将key的hash值与topic的partition总数进行取余得到partition值- 值与partition均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与topic可用的partition总数取余得到partition值,即round-robin算法。#### 3.3生产者ISR需要注意的的概念:- 分区中的所有副本统称为AR(Assigned Replicas)。- 所有Leader副本加上和Leader副本保持同步的Follower副本组成ISR(In-Sync Replicas)。- 所有没有保持同步的Follower副本组成OSR(Out-of-Sync Replicas)。- AR = ISR + OSR。正常情况下,所有Follower副本都应该和Leader副本一致,即AR=ISR。- 当Leader故障时,在ISR集合中的Follower才有资格被选举为新的Leader。#### 3.4HW和LEO在 Kafka 中,HW(High Watermark)和 LEO(Log End Offset)是与数据复制和消费有关的两个重要概念。- HW(High Watermark):HW 是指在分区中,已经被所有追随者(Follower)副本复制的消息的位置。HW 是每个分区的属性,它表示已经提交的消息。只有在 HW 之前的消息才被认为是已经提交的,这些消息已经被写入分区的所有追随者副本,并且被认为是安全的,不会丢失。HW 是为了确保数据一致性和可靠性而引入的。消费者能见到的最大的offset,ISR队列中最小的LEO- LEO(Log End Offset):LEO 是指在分区中当前最新消息的位置。LEO 表示分区日志中的最后一条消息的偏移量。LEO 包括已经被写入但尚未被所有追随者副本复制的消息,以及正在等待被写入的消息。LEO 是一个动态的属性,它会随着新消息的写入而逐渐增加。HW 和 LEO 之间的关系非常重要,它们可以帮助确保数据的可靠性和一致性:HW 之前的消息是已经提交的消息,它们在数据复制中是安全的,不会丢失。LEO 之前的消息是已经写入但尚未被所有追随者副本复制的消息。这些消息可能会在 HW 之前被提交,也可能会在之后被提交。一旦 HW 追赶上 LEO,表示所有的消息都已经提交,分区的数据一致性得到了保障#### 3.5follower故障和leader故障follower故障:follower发生故障后会被临时踢出ISR,等待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等待该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。leader故障:leader发生故障之后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据的一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复#### 3.6生产者ack机制对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没有必要等到ISR中所有的follower全部接受成功。Kafka为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡选择不同的配置。ack参数配置- ack=0:producer不等待broker的ack,这一操作提供了最低的延迟,broker接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据- ack=1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘)- ack=-1(all):producer等待broker的ack,partition的leader和ISR的follower全部落盘成功才返回ack,但是如果在follower同步完成后,broker发送ack之前,如果leader发生故障,会造成数据重复。(这里的数据重复不是因为没有收到,所以继续重发导致的数据重复) 可以设置follower同步个数#### 3.7发送缓冲区- kafka不会来一条消息就发送,kafka会默认创建一个消息缓冲区,用来存放要发送的消息,默认32M- kafka会在缓冲区每次拉取16k数据发送到broker- 如果数据不足16k每隔10ms发送一次数据缓冲区大小,每次拉取数据与时间间隔可配置#### 3.8消费者参数配置### 4.消费者#### 4.1单播与多播- 单播消息:同一个partion,一个消费者组只有一个消费者可以消费- 多播消息:同一个partion,可以供不同消费者组消费#### 4.2消费者长轮询默认情况消费者每次poll500条消息,可以设置长轮询时间如1s,在时间范围内直到poll到500条消息,或者到达1s时间,消费者开始消费可以设置两次poll的最大时间间隔,默认30s,两次poll超过30s,集群会认为消费者消费能力弱,踢出该消费者,触发rebalance,重新分配消费主题#### 4.3自动提交与手动提交- 自动提交:消费者poll到消息后会自动向broker的)consumer_offsets主题提交当前主题-分区消费的偏移量。如果poll下来,提交offset会出现消息丢失- 手动同步提交:消费完消息后调用同步提交方法,broker返回ack之前阻塞。- 手动异步提交:消费完消息后,直接执行后续业务,提供回调方法供broker调用#### 4.4消费者分区分配策略一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由那个consumer消费的问题。Kafka的两种分配策略- round-robin循环RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。三个消费者都订阅了ABC的情况:对所有主题进行排序(TopicA-P0,TopicA-P1,TopicA-P2,TopicB-P0,TopicB-P1,TopicB-P2,TopicB-P3,TopicC-P0,TopicC-P1,TopicC-P2),对消费者排序(Consumer0,Consumer1,Consumer2),分配流程轮训分配结果如下步骤主题分区消费者1TopicA-P0Consumer02TopicA-P1Consumer13TopicA-P2Consumer24TopicB-P0Consumer05TopicB-P1Consumer16TopicB-P2Consumer27TopicB-P3Consumer08TopicC-P0Consumer19TopicC-P1Consumer210TopicC-P2Consumer0消费者0订阅AB消费者1订阅BC的情况下,分配流程及结果如下, 步骤2,按照排序结果应该分配TopicA-P1,由于Consumer1没有订阅topicA,只能分配topicB给Consumer1
步骤7,由于只剩topic-C,只能分配给Consumer1
步骤主题分区消费者1TopicA-P0Consumer02TopicB-P0Consumer13TopicA-P1Consumer04TopicB-P1Consumer15TopicA-P2Consumer06TopicB-P2Consumer17TopicC-P0Consumer18TopicC-P1Consumer19TopicC-P2Consumer1- rangeRange对每个Topic进行独立的分区分配。对于每一个Topic,首先对分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序,之后尽量均衡的将分区分配给消费者。这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。** 示例1** TopicATopicA-P0TopicA-P1TopicA-P2Consumer0Consumer1Consumer2 TopicB TopicB-P0 TopicB-P1 TopicB-P2 TopicB-P3Consumer0Consumer1Consumer2Consumer0TopicC TopicC-P0 TopicC-P1 TopicC-P2 Consumer0Consumer1Consumer2** 示例2** TopicATopicA-P0TopicA-P1TopicA-P2Consumer0Consumer1Consumer0 TopicB TopicB-P0 TopicB-P1 TopicB-P2 Consumer0Consumer1Consumer0 TopicC TopicC-P0 TopicC-P1 TopicC-P2 Consumer0Consumer1Consumer0
4.5 消费者参数配置## 5.kafka常见问题#### 5.1kafka中的幂等性(不重复)将服务器的ACK级别设置为-1(all),可以保证producer到Server之间不会丢失数据。将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次。ack=all可以保证数据不丢失,但是不能保证数据不重复,而ack=0可以保证数据不重复,但是不能保证数据不丢失,对于重要的数据,则要求数据不重复也不丢失,即Exactly Once即精确的一次。在0.11版本的Kafka之前,只能保证数据不丢失,在下游对数据的重复进行去重操作,多余多个下游应用的情况,则分别进行全局去重,对性能有很大影响。0.11版本的kafka,引入了一项重大特性:幂等性,幂等性指代Producer不论向Server发送了多少次重复数据,Server端都只会持久化一条数据。启用幂等性,即在Producer的参数中设置enable.idempotence=true即可,Kafka的幂等性实现实际是将之前的去重操作放在了数据上游来做,开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个Partition的消息会附带Sequence Number,而Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息的时候,Broker只会持久化一条。但PID在重启之后会发生变化,同时不同的Partition也具有不同的主键,所以无法保证跨分区跨会话幂等性。其他方法保证幂等性- 写数据库业务判断唯一性- 写 Redis,天然幂等性。##### 5.2kafka可靠性(不丢失)- 消费端弄丢了数据唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。- kafka丢失数据这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。所以此时一般是要求起码设置如下 4 个参数:- 给 topic 设置
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。- 在 Kafka 服务端设置min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。- 在 producer 端设置acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。- 在 producer 端设置retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。- 生产者丢失数据设置ack=all##### 5.3kafka保证消费顺序- 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。- 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性#### 5.4消息积压业务涉及考虑过载场景,如监控消息时间,很久之前的消息直接跳过消费者异步处理增加分区,扩容消费者#### 5.5使用消息队列的好处(目的)1. 异步2. 削峰3. 限流## 参考看完这篇Kafka,你也许就会了Kafka-CSDN博客
Kafka【入门】就这一篇! - 知乎 (zhihu.com)
kafka的消费者分区分配策略_kafka分区分配策略-CSDN博客GitHub - shishan100/Java-Interview-Advanced: 中华石杉--互联网Java进阶面试训练营
版权归原作者 weixin_47542905 所有, 如有侵权,请联系我们删除。