文章目录
kafka客户端
分配分区策略
kafka中提供了消费者客户端参数
partition.assignment.strategy
来设置消费者与订阅主题之间的分区分配策略。默认情况此参数的值为RangeAssignor,此外还有两种分配策略:RoundRobinAssignor和StickyAssignor。消费者客户端参数可以配置多个分配策略,彼此之间以逗号分隔。
RangeAssignor分配策略
按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀的分配给所有消费者。对于每个主题,RangeAssignor策略会将消费组内所有订阅这个主题的消费者按照名称的字典序进行排序,然后为每个消费者划分固定的分区范围,如果不够分配,那么字典序考前的消费者会被多分配一个分区。
RoundRobinAssignor分配策略
将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询的方式以此分配给每个消费者。如果同一个消费组内所有的订阅信息都是相同的,那么RoundRobinAssignor分配策略会是均匀的。如果同一个消费者内的消费者订阅的消息是不同的,那么在执行分区的时候就并不是完全的轮询分配,有可能导致分区分配得不均匀。
StickyAssignor分配策略
这种分配的目的是:1.分区的分配要尽可能均匀。2.分区的分配尽可能与上次分配的保持相同。1的优先级大于2.
消费者协调器和组协调器
如果消费者客户端中配置了多个分区分配策略,那么以哪个为准,多个消费者之间的分区分配是需要协同的,这一切都是交给消费者消费者协调器和组协调器来完成的。
消费者协调器和组协调器的概念是针对消费者客户端而言。
zookeeper集群中两个比较严重的问题:
- 羊群效应:zookeeper中一个被监听的节点变化,大量的water通知被发送到客户端,导致在通知期间的其它操作延迟,也有可能发生类似死锁的问题。
- 脑裂问题:消费者进行再均衡操作时每个消费者都与Zookeeper进行通信以判断消费者或broker变化的情况,由于zookeeper本身特性,可能导致再同一时刻各个消费者获取的状态不一致,这样会导致异常问题发生。
再均衡的原理
新的消费者客户端进行了重新设计,将全部消费组分成多个子集,每个消费者的子集在服务端对应一个GroupCoordinator对其进行管理。GroupCoordinator是kafka服务端用于管理消费组的组件。而消费者客户端的ConsumerCoordinator组件负责与GroupCoordinator进行交付。
ConsumerCoordinator与GroupCoordinator之间最重要的职责就是负责执行消费者在均衡的操作,包括前面提及的分区分配工作也是在均衡期间完成的。
会触发在均衡的操作:
- 有新的消费者加入消费组。
- 有消费者宕机下线,消费者并不一定需要真正下线,长时间GC、网络延迟导致消费者长期未向GroupCoordinator发送心跳。
- 有消费者主动退出消费组(LeaveGroup请求)
- 消费组对应的GroupCoordinator节点发生了变更。
- 消费组内所订阅的任意主题或者主题的分区发生了变化。
当有消费者加入消费组时,消费者、消费组及组协调器之间经历的截断。
- FIND_COORDINATOR:消费者需要确定它所属消费组对应的GroupCoordinator所在的broker,并创建与该broker相互通信的网络连接。如果消费者已经保存了与消费者对应的GroupCooridinator信息,并且正确建立了连接,则可以进入第二阶段。否则,向集群中某个节点(负载最小的节点)发送FindCoordinatorRequest请求来查找对应的GroupCoordinator。
- JOIN_GROUP:在成功找到消费组对应的GroupCoordinator之后就进入加入消费组的截断,消费者会发送JoinGroupRequest请求,并处理相应。 如果原有的消费者重新加入消费组,那么在真正发送JoinGroupRequest请求之前还要执行一些准备工作: 1. 如果消费者参数
enable.auto.commit
设置为true,自动提交位移功能,那么在请求加入消费组之前需向GroupCoordinator提交消费位移。阻塞执行。2. 如果消费者添加了自定义的再均衡监听器,那么此时会调用onPartitionsRevoked()方法再重新加入消费组之前实施自定义的规则逻辑,比如清除一些状态、提交位移。3. 重新加入消费组,之前和GroupCoordinator节点之间的心跳检测就不需要了,所以成功加入消费组之前需要禁止心跳检测的运作。
选举消费组中的leader
GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader。分两种情况,如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader。如果存在leader,取GroupCoordinator中保存消费者信息hashmap的第一个key对应的消费者元信息。
选举分区分配策略
每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者选举的分配策略中选举出一个彼此都信服的策略来进行整体的分区分配。这个分区分配的选举是根据消费组内各个消费者投票决定的。最终选举的分配策略基本上是被各个消费者支持最多的策略,具体选举过程:
- 收集各个消费者支持的所有分配策略,组成候选集。
- 每个消费者从候选集中找出第一个自身支持的策略,为这一策略投一票。
- 计算候选集中各个策略的选票数,选票最多的策略即为消费者的分配策略。
如果有的消费者不支持选出来的分配策略,会抛出异常,就是这个消费者没有配置这个分配策略就会不支持。
- SYNC_GROUP leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,通过GroupCoordinator进行转发同步分配方案。在第三阶段,也就是同步阶段,各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案。 服务端在收到消费者发送的SyncGroupRequest请求之后会交由GroupCoordinator来负责具体的逻辑处理。GroupCoordinator同样会先对SyncGroupRequest请求做合法性校验,在此之后会将从leader 消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入Kafka的__consumer_offsets主题中,最后发送响应给各个消费者以提供给各个消费者各自所属的分配方案
当消费者收到所属的分配方案之后会调用PartitionAssignor中的onAssignment()方法。随后再调用ConsumerRebalanceListener中的OnPartitionAssigned()方法。之后开启心跳任务,消费者定期向服务端的GroupCoordinator发送HeartbeatRequest来确定彼此在线。
- HeartBeat 进入这个截断,消费组中的所有消费者都进入正常工作状态。再正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了GroupCoordinator,并且GroupCoordinator将其保存到了Kafka内部的__consumer_offsets主题中,此时消费者可以通过OffsetFetchRequest请求获取上次提交的消费位移并从此处继续消费。
消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长,则整个会话就被判定为过期,GroupCoordinator 也会认为这个消费者已经死亡,就会触发一次再均衡行为。消费者的心跳间隔时间由参数heartbeat.interval.ms指定,默认值为3000,即3秒,这个参数必须比session.timeout.ms参数设定的值要小,一般情况下heartbeat.interval.ms的配置值不能超过session.timeout.ms配置值的1/3。这个参数可以调整得更低,以控制正常重新平衡的预期时间。
如果一个消费者发生崩溃,并停止读取消息,那么 GroupCoordinator 会等待一小段时间,确认这个消费者死亡之后才会触发再均衡。在这一小段时间内,死掉的消费者并不会读取分区里的消息。这个一小段时间由session.timeout.ms参数控制,该参数的配置值必须在broker端参数group.min.session.timeout.ms(默认值为 6000,即 6 秒)和 group.max.session.timeout.ms(默认值为300000,即5分钟)允许的范围内。还有一个参数 max.poll.interval.ms,它用来指定使用消费者组管理时 poll()方法调用之间的最大延迟,也就是消费者在获取更多消息之前可以空闲的时间量的上限
_consumer_offsets
位移提交的内容最终会保存到kafka的内部主题_consumer_offsets中。一般情况下,当集群中第一次由消费者消费消息时会自动创建主题_consumer_offsets,它的副本因子受
offsets.topic.replication.factor
约束,默认为3.分区数可以通过
offsets.topic.num.partitions
参数设置,默认为50.客户端提交消费位移使用OffsetCommitRequest请求实现。请求体中包含retention_time表示当前提交的消费位移所能保留的时长,不过对于消费者而言这个值保持为-1.表示按照broker端的配置
offsets.retention.minutes
来确定保留时长。默认为7天,超过这个时长后消费位移的消息就会被删除(使用墓碑消息或日志压缩策略)。
有些定时消费的任务在执行完某次消费任务之后保存了消费位移,之后隔了一段时间再次执行消费任务,如果这个间隔时间超过offsets.retention.minutes的配置值,那么原先的位移信息就会丢失,最后只能根据客户端参数 auto.offset.reset 来决定开始消费的位置,遇到这种情况时就需要根据实际情况来调配offsets.retention.minutes参数的值。
OffsetCommitRequest中的其余字段大抵也是按照分区的粒度来划分消费位移的。
如果有若干消费者消费了某个主题中的消息,并且也提交了相应的消费位移,那么再删除这个主题之后会一并将这些消费位移消息删除。
相关链接
- [1]kafka基础知识
- [2]kafka基础知识
- [3]kafka基础知识
- [4]kafka基础知识
- [5]kafka基础知识
- [6]kafka基础知识
版权归原作者 YEidlog 所有, 如有侵权,请联系我们删除。