Kafka error: The coordinator is not aware of this member

Problem description

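In my project I reused a consumer-group-id that was already in use, so the same id ended up shared by a consumer that subscribes to a topic as a group member and by consumers whose partitions are assigned manually. When a manually assigned consumer then commits its offset, the group coordinator does not recognize it as a member of that group and rejects the commit. Note that the max.poll.interval.ms hint in the CommitFailedException message is misleading in this case: the log shows Generation{generationId=-1, memberId='', protocol='null'}, meaning the committing consumer never joined the group at all.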
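
To make the failure mode concrete, here is a minimal sketch of the membership check behind this error. It is a simplified model written for illustration, not Kafka's broker code: the class GroupCommitModel, its methods, and the generated member id format are all hypothetical, and only the error names (NONE, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION) mirror real Kafka error codes. The assumption it encodes is that a commit carrying no member id and generation -1, as reported in the log for this error (Generation{generationId=-1, memberId='', protocol='null'}), is accepted only while the group has no active members.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified, hypothetical model of a group coordinator's check on offset commits. */
class GroupCommitModel {
    private final Set<String> members = new HashSet<>();
    private int generationId = 0;

    /** A subscribing consumer joins the group and gets a member id (format is made up). */
    String join(String clientId) {
        String memberId = clientId + "-member";
        members.add(memberId);
        generationId++; // every join starts a new generation in this model
        return memberId;
    }

    int generationId() {
        return generationId;
    }

    /** Returns "NONE" if the commit is accepted, otherwise the name of the error. */
    String commit(String memberId, int generation) {
        // Assumption: a commit without membership (generation -1) is only
        // accepted while the group has no active members.
        if (generation < 0 && members.isEmpty()) {
            return "NONE";
        }
        if (!members.contains(memberId)) {
            return "UNKNOWN_MEMBER_ID";
        }
        if (generation != generationId) {
            return "ILLEGAL_GENERATION";
        }
        return "NONE";
    }

    public static void main(String[] args) {
        GroupCommitModel group = new GroupCommitModel();

        // Manually assigned consumer commits while the group is empty.
        System.out.println(group.commit("", -1)); // prints NONE

        // A subscribing consumer joins the same group id.
        String memberId = group.join("consumer-test-group-16");

        // The manually assigned consumer commits again and is now rejected.
        System.out.println(group.commit("", -1)); // prints UNKNOWN_MEMBER_ID

        // The real member can still commit.
        System.out.println(group.commit(memberId, group.generationId())); // prints NONE
    }
}
```

In this model the manually assigned consumer works only until another consumer joins the same group id, which matches the symptom described here: the error appears once both kinds of consumer run side by side.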

2022-12-14 16:33:31.908 ERROR 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Offset commit failed on partition REBALANCE-ONE-TOPIC-1 at offset 13: The coordinator is not aware of this member.
2022-12-14 16:33:31.908  INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2022-12-14 16:33:31.908  INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 16:33:31.908  INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 16:33:31.909 ERROR 16020 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:157) ~[spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1301) [spring-kafka-2.8.9.jar:2.8.9]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_251]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_251]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_251]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1204) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1046) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1492) ~[kafka-clients-3.1.2.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3062) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3057) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:3043) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2835) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1329) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) [spring-kafka-2.8.9.jar:2.8.9]
    ... 4 common frames omitted

How to reproduce the error

Register two test topics

@Bean
public NewTopic testone() {
    return TopicBuilder.name("test-topic-group").partitions(2).replicas(1).build();
}

@Bean
public NewTopic testtwo() {
    return TopicBuilder.name("test-topic-standalone").partitions(1).replicas(1).build();
}

Write the consumers (all sharing the group id test-group)

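import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * Three listeners that share the group id "test-group".
 * The log message literals are left untranslated so that they match the captured
 * log output (接收kafka消息 means "received Kafka message").
 */
@Component
@Slf4j
public class TestErrorConsumerListener {

    /**
     * Partition 0 of test-topic-group, assigned manually via topicPartitions.
     * This consumer does not join the group; it only commits offsets under the group id.
     */
    @KafkaListener(groupId = "test-group", topicPartitions = {
            @TopicPartition(topic = "test-topic-group", partitions = "0",
                    partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
    })
    public void one(String value) {
        log.info("one:接收kafka消息:[{}]", value);
    }

    /**
     * Partition 1 of test-topic-group, assigned manually in the same way.
     */
    @KafkaListener(groupId = "test-group", topicPartitions = {
            @TopicPartition(topic = "test-topic-group", partitions = "1",
                    partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
    })
    public void two(String value) {
        log.info("two:接收kafka消息:[{}]", value);
    }

    /**
     * Subscribes to test-topic-standalone via topics, so this consumer joins
     * "test-group" as a regular group member.
     */
    @KafkaListener(topics = "test-topic-standalone", groupId = "test-group")
    public void three(String value) {
        log.info("three:接收kafka消息:[{}]", value);
    }
}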

Start the project and send messages

this.template.send("test-topic-group", message);
this.template.send("test-topic-standalone", message);

Error log

2022-12-14 19:04:18.835  INFO 13416 --- [tainer#13-0-C-1] c.l.p.e.k.c.TestErrorConsumerListener    : one:接收kafka消息:[111111111]
2022-12-14 19:04:18.841  INFO 13416 --- [tainer#14-0-C-1] c.l.p.e.k.c.TestErrorConsumerListener    : three:接收kafka消息:[111111111]
2022-12-14 19:04:19.336  INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] Discovered group coordinator 192.168.136.136:9092 (id: 2147482646 rack: null)
2022-12-14 19:04:19.338 ERROR 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] Offset commit failed on partition test-topic-group-0 at offset 1: The coordinator is not aware of this member.
2022-12-14 19:04:19.338  INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2022-12-14 19:04:19.338  INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 19:04:19.338  INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 19:04:19.342 ERROR 13416 --- [tainer#13-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:157) ~[spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1301) [spring-kafka-2.8.9.jar:2.8.9]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_251]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_251]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_251]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1204) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1046) ~[kafka-clients-3.1.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1492) ~[kafka-clients-3.1.2.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3062) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3057) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:3043) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2835) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1329) [spring-kafka-2.8.9.jar:2.8.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) [spring-kafka-2.8.9.jar:2.8.9]
    ... 4 common frames omitted

Check the consumer group state on the Kafka server

$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test-group --describe

GROUP           TOPIC                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
test-group      test-topic-standalone 0          1               1               0               consumer-test-group-16-d1a0c068-a68d-456c-a958-97b44219ef1b /192.168.136.1  consumer-test-group-16
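
The describe output lists only the subscribing consumer (consumer-test-group-16 on test-topic-standalone) as a member of test-group; the two manually assigned consumers on test-topic-group do not appear. A likely way out is to stop sharing one group id between the two styles. The sketch below is a small, hypothetical pre-flight check (GroupIdUsageCheck, Listener, and Mode are names invented here, and test-group-manual is an example id, not something from the project) that flags any group id used by both manually assigned and subscribing listeners.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: finds group ids shared by manual-assignment and subscribing listeners. */
class GroupIdUsageCheck {
    enum Mode { ASSIGN, SUBSCRIBE }

    /** Plain description of one listener; not a Spring type. */
    static final class Listener {
        final String name;
        final String groupId;
        final Mode mode;

        Listener(String name, String groupId, Mode mode) {
            this.name = name;
            this.groupId = groupId;
            this.mode = mode;
        }
    }

    /** Returns the group ids used with both modes, in first-seen order. */
    static List<String> mixedGroupIds(List<Listener> listeners) {
        Map<String, EnumSet<Mode>> modesByGroup = new LinkedHashMap<>();
        for (Listener listener : listeners) {
            modesByGroup
                    .computeIfAbsent(listener.groupId, id -> EnumSet.noneOf(Mode.class))
                    .add(listener.mode);
        }
        List<String> mixed = new ArrayList<>();
        for (Map.Entry<String, EnumSet<Mode>> entry : modesByGroup.entrySet()) {
            if (entry.getValue().size() > 1) {
                mixed.add(entry.getKey());
            }
        }
        return mixed;
    }

    public static void main(String[] args) {
        // The three listeners from the reproduction, all on "test-group".
        List<Listener> before = Arrays.asList(
                new Listener("one", "test-group", Mode.ASSIGN),
                new Listener("two", "test-group", Mode.ASSIGN),
                new Listener("three", "test-group", Mode.SUBSCRIBE));
        System.out.println(mixedGroupIds(before)); // prints [test-group]

        // Possible fix: a separate (example) group id for the manually assigned listeners.
        List<Listener> after = Arrays.asList(
                new Listener("one", "test-group-manual", Mode.ASSIGN),
                new Listener("two", "test-group-manual", Mode.ASSIGN),
                new Listener("three", "test-group", Mode.SUBSCRIBE));
        System.out.println(mixedGroupIds(after)); // prints []
    }
}
```

In the listener class this would correspond to changing the groupId of the two topicPartitions listeners, or alternatively switching them to topics so that all three consumers join the group; which option fits depends on whether fixed partition assignment is actually required.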

This article is reposted from: https://blog.csdn.net/lijiewen2017/article/details/128322053
Copyright belongs to the original author, 背风衣人. In case of infringement, please contact us for removal.
