0


Kafka消息失败后无限重复消费现象的排查

文章目录

背景

项目中用到了kafka消息队列,在开发测试过程中发现了消息端设置的最大重试次数失效的情况,具体信息如下:

  • consumer: 3
  • partition:1
  • maxRetryTimes:15
  • spring-kafka: 2.2.15.RELEASE
  • kafka-client: 2.0.1

相关代码

消费者config文件

@Configuration@EnableKafka@Slf4jpublicclassKafkaConsumerConfig{@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>demoContainerFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String>
                factory =newConcurrentKafkaListenerContainerFactory<>();// 设置消费者工厂
        factory.setConsumerFactory(demoContainerFactory());// 消费者组中线程数量
        factory.setConcurrency(3);//  当使用批量监听器时需要设置为true
        factory.setBatchListener(false);// 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);// 最大重试次数3次SeekToCurrentErrorHandler seekToCurrentErrorHandler =newSeekToCurrentErrorHandler((consumerRecord, e)->{
            log.error("消费消息异常.抛弃这个消息,{}", consumerRecord.toString(), e);},3);
        factory.setErrorHandler(seekToCurrentErrorHandler);return factory;}

消费者业务代码

@Component@Slf4jpublicclassDemoSingleConsumer{@AutowiredprivateDemoHandler demoHandler;/**
     * 监听 topic 进行单条消费
     */@KafkaListener(topics ={KafkaConstants.TOPIC}, groupId =KafkaConstants.GROUPID,
            containerFactory ="demoContainerFactory", errorHandler ="listenErrorHandler")publicvoidkafkaListener(ConsumerRecord<String,String> message){
        log.info("消费消息开始 msg={}",JSONUtil.toJSONString(message.value()));SendMessage message =JSONUtil.parseObject(message.value(),ASendMessage.class);try{
            demoHandler.process(message);}catch(Throwable e){
            log.error("消息消费异常,messageBody={}",JSONObject.toJSONString(message.value()), e);}}

现象

上述代码的生产者启动后,手动给对应topic生产一个消息“clear”,由于定义的消息体是一个json,显然这次生产的消息不符合彼此的协议,因此会报如下的错。

message:clear
exception:Listener method 'public void com.demo.DemoConsumer.kafkaListener(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.apache.kafka.clients.consumer.Consumer<java.lang.String, java.lang.String>,java.lang.String,int)' threw exception; nested exception is com.alibaba.fastjson.JSONException: syntax error, expect{, actual error, pos 0, fastjson-version 1.2.72

上述报错我们是可以理解的,但是kafka消费者一直重复上述的消息,即毒丸问题。因为消费者反序列化是在

spring poll()

之前的,所以spring是没法处理的,即这个问题会一直存在。

在用户应用程序中无法处理毒药的影响很大。让我们来看看发生了什么:

  • 消费者应用程序正在使用Kafka主题。
  • 在某个时间点,应用程序无法对记录进行反序列化(遇到毒丸)。
  • 消费者不能处理毒丸。
  • 因为使用者偏移量没有向前移动,所以阻止了主题分区的使用。
  • 消费者将一次又一次地(非常迅速地)尝试反序列化记录,但是永远不会成功。因此,您的消费者应用程序将陷入一个无穷循环,尝试对失败的记录进行反序列化。
  • 对于每次失败,都会在您的日志文件中写入一行…糟糕!

现在我们已经知道重复消费的原因了,即消费者反序列化失败,接下来就来解决问题。

解决方法

解决问题最好的方法就是先查看官方文档,其次才是在各种论坛上搜索。这里先给出官网的方法,这里给出官网链接

When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.

官网给出的方法是给消费者设置

ErrorHandlingDeserializer

来处理反序列化时的异常,接下来是官网给出的配置。

...// other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE,"com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE,"com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES,"com.example")returnnewDefaultKafkaConsumerFactory<>(props);

Reference

  1. kafkatemplate无法注入_kafka消费无限重试问题排查
  2. kafka专题:kafka的消息丢失、重复消费、消息积压等线上问题汇总及优化
  3. Kafka常见的导致重复消费原因和解决方案
  4. Kafka auto.offset.reset值详解
  5. Springboot整合Kafka-自动,手动提交偏移量
  6. Kafka在消费者反序列化时出现问题
  7. Apache Kafak如何处理消息反序列化失败等毒丸现象?
  8. Spring 整合Apache Kafka 处理事件流
标签: kafka java zookeeper

本文转载自: https://blog.csdn.net/EJoft/article/details/123114256
版权归原作者 EJoft 所有, 如有侵权,请联系我们删除。

“Kafka消息失败后无限重复消费现象的排查”的评论:

还没有评论