0


Kafka消息能正常发送,但是无法消费问题排查

这里是小奏,觉得文章不错可以关注公众号小奏技术

kafka version

  • kafka_2.13-3.5.0

背景

线上的

kafka

集群要进行扩容,原先的2

broker

,扩容之后变成了新增3个

broker

,然后下掉了原先老的

broker

新集群看着没问题,但是出现了一个问题。

消息发送都正常,但是消息消费异常

排查

消费端打印log

消费者启动后一直打印如下关键log

2024-06-1409:54:23DEBUGo.a.k.c.c.i.ConsumerCoordinator:887-[Consumer clientId=consumer-gid_abaca-1, groupId=gid_abaca]ReceivedFindCoordinator response ClientResponse(receivedTimeMs=1718330063896, latencyMs=285, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=consumer-gid_xiaozou-1, correlationId=61, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='gid_xiaozou', nodeId=-1, host='', port=-1, errorCode=15, errorMessage='')]))2024-06-1409:54:23DEBUGo.a.k.c.c.i.ConsumerCoordinator:914-[Consumer clientId=consumer-gid_abaca-1, groupId=gid_abaca]Group coordinator lookup failed:2024-06-1409:54:23DEBUGo.a.k.c.c.i.ConsumerCoordinator:276-[Consumer clientId=consumer-gid_abaca-1, groupId=gid_abaca]Coordinator discovery failed, refreshing metadata
org.apache.kafka.common.errors.CoordinatorNotAvailableException:The coordinator is not available.

可以看到关键的log是

CoordinatorNotAvailableException: The coordinator is not available.

查看消费者状态

进入到

kafka

进去

bin

目录执行如下脚本

./kafka-consumer-groups.sh --bootstrap-server <bootstrap.servers>--describe--group<your-consumer-group-id>

输出如下信息

Error:Executing consumer group command failed due toorg.apache.kafka.common.errors.TimeoutException:Call(callName=describeGroups(api=FIND_COORDINATOR), deadlineMs=1718330591913, tries=42094, nextAllowedTryMs=1718330592017) timed out at 1718330591917 after 42094attempt(s)java.util.concurrent.ExecutionException:org.apache.kafka.common.errors.TimeoutException:Call(callName=describeGroups(api=FIND_COORDINATOR), deadlineMs=1718330591913, tries=42094, nextAllowedTryMs=1718330592017) timed out at 1718330591917 after 42094attempt(s)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:543)
        at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.map(JavaCollectionWrappers.scala:313)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:542)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:558)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:367)
        at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:72)
        at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
        at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)Caused by:org.apache.kafka.common.errors.TimeoutException:Call(callName=describeGroups(api=FIND_COORDINATOR), deadlineMs=1718330591913, tries=42094, nextAllowedTryMs=1718330592017) timed out at 1718330591917 after 42094attempt(s)Caused by:org.apache.kafka.common.errors.DisconnectException:CancelleddescribeGroups(api=FIND_COORDINATOR) request withcorrelation id 42096 due tonode6 being disconnected

看着是查询不到

comsumer group

的信息,然后报了

TimeoutException

异常,没有什么有用的信息

什么情况会出现 The coordinator is not available

网上有一种说法是如果

broker

配置文件中的

offsets.topic.replication.factor

必须小于等于

broker

的数量,否则会出现

The coordinator is not available

的问题。

比如

broker

的数量是3,

offsets.topic.replication.factor

是4,就会出现这个问题。

实际我这边是不会出现这个问题,因为我的

broker

数量是3,

offsets.topic.replication.factor

配置的是2

删除topic重建是否可行

删除

topic

也没用,还是会出现

The coordinator is not available

的问题

是否是__consumer_offsets出了问题

kafka

的消费位点主要通过

__consumer_offsets

这个topic来管理

由于本次为了迁移简单,同时线上的业务数据可以丢弃,所以并没有进行

topic

分区迁移,而是直接删除了

topic

,然后重建

topic

然后我们查看

__consumer_offsets

这个

topic

的分区情况

点进

topic

详情发现 partition 还是仅在老的

broker

中,即已经下掉的topic。这就导致新的

broker

没有

__consumer_offsets

这个

topic

解决方式

删除掉

__consumer_offsets

这个

topic

,然后系统会自动重建,就可以解决了。

如果想更平滑的方式也可以考虑对

__consumer_offsets

进行分区迁移,注意重建后使用原来的

comsumer group

好像消费还是会有问题,如果使用原来的

comsumer group

还是消费异常,换个

comsumer group

就好了

总结

如果kafka能正常发送消息,但是消费异常,一般是消费位点出现了问题,即管理消费位点

__consumer_offsets

的这个

toipc

目前来看新增了3个

broker

kafka并没有自动对

__consumer_offsets

进行分区迁移,需要手动进行迁移

所以后续出现消费相关的问题可以优先检查

__consumer_offsets

这个

topic

的情况,毕竟

kafka

得消费位点都依赖于

__consumer_offsets

这个topic

标签: kafka 分布式

本文转载自: https://blog.csdn.net/qq_42651904/article/details/139697463
版权归原作者 weihubeats 所有, 如有侵权,请联系我们删除。

“Kafka消息能正常发送,但是无法消费问题排查”的评论:

还没有评论