问题描述:
生成环境,程序监听kafka,消费数据后推送给外部接口。kafka中的数据,每个周期大概有170条消息,然后发现监听程序的日志会持续打印2个小时以上,有时时间更长。
问题分析:
因为数据量并不大,理论上应该很快就消费处理完毕,分析日志发现,有数据被重复消费,此时首先想到的原因就是 kafka中数据又被重新写入,但查看kafka后数据量还是170条 所以问题还是出在了消费端。
消费端重复消费的原因大概分为二个方面:
1、offset未提交,例如:消费者一次poll后,未消费完毕宕机或者重启,导致已消费消息的offset未及时提交;
2、消费者处理逻辑太慢,导致kafka触发re-blance,认为你消费失败了,重新将消息分配给其他消费者处理
因为我的程序并未重启,所有是判断是第二种原因,
仔细分析日志后发现每条消息的处理时间都比较长大概在50秒-2分钟,看了kafka的配置,max-poll-records是5000,max.poll.interval.ms(默认间隔时间为300s),也就是170条数据被一次拉去缓存,然后300s内肯定消费不完,也就不会去重新拉去消息。超出300s后kafka会认为此次消费失败了,会让消费者重新拉去消息,最终导致重复消费。
处理方案:
1、修改max-poll-records=10 (根据实际处理时间计算得出)
2、增加同消费者组 的消费者 (几个分区就创建几个消费者,增加消费者后可设置max-poll-records=10*消费者数)
3、修改处理逻辑,我的逻辑中主要耗时是查数据库,给相关表增加索引,
还有其他处理方案,
1、可以将处理逻辑改为异步,我没有这么做是因为考虑到后期数据量会比较大(上百万),改为异步后要把所有的数据缓存在线程池的任务缓存队列中,占用过多内存。
2、max.poll.interval.ms也可以修改长一些,但我没有修改,因为这些默认配置基本是经过开发团队测试评审出的最优配置,尽量不要改太多
还有一种处理方式是增加幂等性校验,就是判断数据是否已经消费过了,消费过则不处理。
版权归原作者 CodeChenc 所有, 如有侵权,请联系我们删除。