0


spring设置kafka超时时间没有生效的解决方法(解决rebalancing问题)

一、前言

最近生产kafka遇到一个问题,总是隔几分钟就

rebalancing

,导致没有消费者、消息堆积;
平衡好后,正常消费消息几分钟后,就又开始

rebalancing

,消息再次堆积,一直循环。

登录kafka服务器,用命令查看kafka组:

//组名是commonGroup,java里设置的
./kafka-consumer-groups.sh --bootstrap-server 10.123.123.123:9092 --group commonGroup --describe

就会发现报错:

warning: Consumer group 'commonGroup' is rebalancing.

此时组里的所有topic都会没有消费者。

再查看消费者(java后台)的日志,会发现大量的rebalancing语句,与重新加入分组的语句:

//这个是心跳发送失败报错的日志,因为此时在rebalancing
2022-08-25 17:55:41.801 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=commonGroup] Attempt to heartbeat failed since group is rebalancing

//这个是重新加入分组的日志,重新加入了commonGroup组里的topic为examTake的第13个分区(生产topic分了14个区)
2022-08-30 16:29:27.434 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  o.s.kafka.listener.KafkaMessageListenerContainer - partitions assigned: [examTake-13]

这个现象会导致消息堆积2-3分钟,然后消息会统一被消费一波,然后继续堆积2-3分钟消息;
因为kafka不知道为什么总是rebalancing,每次平衡需要2-3分钟时间,此时没有消费者;
平衡好后,消息被消费者消费一波,就又开始rebalancing。

用户明显感觉到系统变慢,需要想办法解决这个问题。

二、可能的原因

百度发现,kafka rebalancing发生的情况,主要有这几种:

1.有消费者新增/减少

如果启动了新的java程序,增加了消费者、或者有消费者挂了,kafka就会重新平衡;

但是排查后发现,所有消费者日志打印正常,没有挂掉的,也没有新增消费者,所以不是这个问题。

2.有消费者在规定时间内未发送心跳包

spring里可以配置kafka的session超时时间(默认10秒):

spring.kafka.properties.session.timeout.ms = 10000

以及心跳包发送时间间隔(默认隔3秒发送一次):

spring.kafka.properties.heartbeat.interval.ms = 3000

如果有消费者在session规定时间内没有发送心跳包,kafka就会认为该消费者不可用,开始rebalancing。

但是排查后发现,项目里配置的超时时间是15秒,心跳包间隔时间没有配置(默认3秒),感觉不应该有消费者15秒内一次心跳包也发不出去(消费者日志打印正常,没有挂掉的),所以不确定是不是这个问题。

3.有消费者在规定时间内没有处理完消息

spring里可以配置消费者一次拉取的消息数(默认500,低版本kafka好像不支持修改):

spring.kafka.consumer.max-poll-records=500

以及消费消息的超时时间(默认5分钟):

spring.kafka.properties.max.poll.interval.ms=300000

如果有消费者在规定时间内没有处理完消息,那么也会引起kafka的rebalancing。

但是排查后发现,kafka里的待消费消息数很低时(几条-几十条),仍然会隔几分钟就rebalancing一次,然后消费者会很快把消息全部消费完,就算是这样kafka后续还是会rebalancing。这样看来也不是这个问题。

三、设置kafka超时时间没有生效的解决方法

1.问题描述

虽然感觉不像是这几个原因导致kafka反复重新平衡的,但是还是得尝试解决。

因此,按照网上的方法,在spring项目里的application.properties中进行了配置,增加了超时时间:

//心跳超时时间(session超时时间)增加成25秒(之前项目设置了15秒)
spring.kafka.properties.session.timeout.ms = 25000

//每次拉取的消息减少为20(之前是默认值500)
spring.kafka.consumer.max-poll-records=20

//消息消费超时时间增加为10分钟
spring.kafka.properties.max.poll.interval.ms=600000

但是配置了之后,启动项目,发现这些配置都没有生效,kafka打印的参数还是之前的:

max.poll.interval.ms = 300000
max.poll.records = 500
session.timeout.ms = 15000

尤其是

max.poll.records

参数,这个都可以点进jar包里了,不应该不生效的:
在这里插入图片描述
在这里插入图片描述

2.解决方法

(1)百度发现,低版本kafka好像不支持修改

max.poll.records

;不过目前项目中不是低版本kafka,应该是可以设置的;而且其它参数总是可以设置的,问题是不知道为什么没有生效。

(2)找了半天,发现项目中有一个KafkaConfig.java,其中部分配置为:

    @Value("${kafka.session.timeout.ms:15000}")
    private String sessionTimeout;

    @Value("${kafka.consumer.max.poll.records:500}")
    private String maxPollRecords;

    @Value("${kafka.max.poll.interval.ms:300000}")
    private String maxPollIntervalMs;

    @Value("${kafka.group.id:commonGroup}")
    private String groupId;
    
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //这个是组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        //这个是心跳(session)超时时间
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        //这个是每次拉取的消息数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //这个是消费消息的超时时间
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaIntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

(3)这下application.properties中配置了kafka参数没有生效的原因找到了,看来是java与application.properties中同时配置了kafka参数的话,会以java中配置的为准。

3.结果

修改java中的kafka配置后,启动日志显示配置生效了:

max.poll.interval.ms = 600000
max.poll.records = 20
session.timeout.ms = 25000

然而,项目用这个配置启动后,kafka反复rebalancing的状况还是没有好,并且rebalancing需要的时间更长了,从2-3分钟延长到了5-10分钟,消息积压时间延长、用户体验更差了。

四、kafka反复rebalancing最终解决方法

1.排查过程

反复排查了整个项目,情况如下:

(1)生产环境最近只发版了一个很小的功能,这个功能不会造成kafka反复rebalancing。

(2)生产环境发版后,有2天时间日志是正常的,kafka没有反复rebalancing,说明之前的kafka配置基本没有问题。

(3)第3天下午开始kafka出现了反复rebalancing问题,但是期间并没有发版,也不是用户访问量突然增多导致的。

(4)尝试调大kafka超时时间,但是没有作用。

(5)重启了kafka,也重启了所有消费者,但是反复rebalancing问题并没有好转。

2.最终解决方法

1.kafka重新平衡是按group的,具体来说就是commonGroup不知道哪里除了问题:

warning: Consumer group 'commonGroup' is rebalancing.

2.因此,决定把这个组里比较重要的几个topic移动出去,换到其它组(java里只需要改一行):

//这里没有显式配置组,用的是上方KafkaConfig.java里的commonGroup组
//@KafkaListener(topics = "${kafka.topic.commit}")

//改为了显式配置组,把这个topic移动到新组 commitGroup
@KafkaListener(topics = "${kafka.topic.commit}", groupId = "commitGroup")

3.把重要的topic移动出去、分到新组后,发现,新组里的topic工作正常,没有反复重新平衡;
旧组commonGroup依然有问题,隔一段时间就会rebalancing。

4.由于旧组里的topic不太重要,因此消费堆积2-3分钟的问题勉强可以接受;
由于旧组里的topic还有很多,因此暂时还没有排查出是哪个topic及其消费者有问题。

5.最后,这个问题就勉强算解决了,后续有时间后再继续研究为什么kafka会反复rebalancing。

五、备注

1.spring设置kafka参数session超时时间时,要小于请求超时时间与处理超时时间,例如:

request.timeout.ms = 30000  session.timeout.ms = 15000    max.poll.interval.ms = 300000

session.timeout.ms < request.timeout.ms

session.timeout.ms < max.poll.interval.ms

2.kafka的topic的分区,最好是有几个消费者、就创建几个分区,这样可以一一对应,一个消费者对应一个分区。

3.kafka的rebalancing是按group的,遇到rebalancing问题,可以把重要的topic移动到其它group里,试试能不能行;最好是一个topic一个group,这样可以快速定位是哪个topic出了问题。

标签: kafka spring java

本文转载自: https://blog.csdn.net/BHSZZY/article/details/126757295
版权归原作者 追逐梦想永不停 所有, 如有侵权,请联系我们删除。

“spring设置kafka超时时间没有生效的解决方法(解决rebalancing问题)”的评论:

还没有评论