0


Kafka 消费端消费重试和死信队列

系列文章目录


文章目录


前言

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。
在这里插入图片描述


Spring-Kafka 提供消费重试的机制。当消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 重新消费消息 。

默认情况下,Spring-Kafka 达到配置的重试次数时,【每条消息的失败重试时间,由配置的时间隔决定】Consumer 如果依然消费失败 ,那么该消息就会进入到死信队列。

Spring-Kafka 封装了消费重试和死信队列, 将正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

我们在应用中可以对死信队列中的消息进行监控重发,来使得消费者实例再次进行消费,消费端需要做幂等性的处理。
在这里插入图片描述
引入POM依赖

  1. <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.6</version></dependency>

YML配置文件

这里Kafka也安装到了本地,Kafka安装Windows版:http://www.javacui.com/tool/667.html

  1. # Web配置
  2. server:
  3. servlet:
  4. context-path: /
  5. port: 1088
  6. # kafka 配置
  7. spring:
  8. kafka:
  9. bootstrap-servers: 127.0.0.1:9092
  10. consumer:
  11. enable-auto-commit: true # 是否自动提交offset
  12. auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
  13. # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  14. # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  15. # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  16. auto-offset-reset: latest
  17. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  18. value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  19. # 表示接受反序列化任意的类,也可限定包路径
  20. properties:
  21. spring:
  22. json:
  23. trusted:
  24. packages: '*'
  25. producer:
  26. retries: 0 # 重试次数
  27. # 0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
  28. # 1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;(数据要求快,重要性不高时用)
  29. # -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,数据一般不会丢失,延迟时间长但是可靠性高。(数据重要时用)
  30. acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1,0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。)
  31. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  32. value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  33. listener:
  34. missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错

配置ErrorHandler,用于定制重试次数和间隔时间

  1. packagecom.example.springboot.config.kafka;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Primary;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.listener.ConsumerRecordRecoverer;importorg.springframework.kafka.listener.DeadLetterPublishingRecoverer;importorg.springframework.kafka.listener.ErrorHandler;importorg.springframework.kafka.listener.SeekToCurrentErrorHandler;importorg.springframework.util.backoff.BackOff;importorg.springframework.util.backoff.FixedBackOff;/**
  2. * Spring-Kafka 通过实现自定义的 SeekToCurrentErrorHandler ,当 Consumer 消费消息异常的时候,进行拦截处理:
  3. * 重试小于最大次数时,重新投递该消息给 Consumer
  4. * 重试到达最大次数时,如果Consumer 还是消费失败时,该消息就会发送到死信队列。 死信队列的 命名规则为: 原有 Topic + .DLT 后缀 = 其死信队列的 Topic
  5. */@ConfigurationpublicclassKafkaConfiguration{
  6. privateLogger logger =LoggerFactory.getLogger(getClass());@Bean@PrimarypublicErrorHandlerkafkaErrorHandler(KafkaTemplate<?,?> template){
  7. logger.warn("kafkaErrorHandler begin to Handle");// <1> 创建 DeadLetterPublishingRecoverer 对象ConsumerRecordRecoverer recoverer =newDeadLetterPublishingRecoverer(template);// <2> 创建 FixedBackOff 对象 设置重试间隔 10秒 次数为 1 次// 创建 DeadLetterPublishingRecoverer 对象,它负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。// 注意,正常发送 1 次,重试 1 次,等于一共 2 次BackOff backOff =newFixedBackOff(10*1000L,1L);// <3> 创建 SeekToCurrentErrorHandler 对象returnnewSeekToCurrentErrorHandler(recoverer, backOff);}}

编写生产者代码

  1. packagecom.example.springboot.controller;importcn.hutool.core.date.DateUtil;importcom.example.springboot.model.Blog;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjava.util.Date;@RestController@RequestMapping("/test/kafka")publicclassMessSendController{
  2. @AutowiredprivateKafkaTemplate kafkaTemplate;privatestaticfinalString messTopic ="test";@RequestMapping("/send")publicStringsendMess(){
  3. Blog blog =Blog.builder().id(1).name("测试").isDel(false).birthday(newDate()).build();
  4. kafkaTemplate.send(messTopic, blog);System.out.println("客户端 消息发送完成");returnDateUtil.now();}}

这里请求路径:http://localhost:1088/test/kafka/send 时,会发送一条消息到队列。

编写消费端代码

  1. packagecom.example.springboot.listener;importcom.alibaba.fastjson.JSON;importcom.example.springboot.model.Blog;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;@ComponentpublicclassKafkaMessListener{
  2. privatestaticfinalString messTopic ="test";@KafkaListener(id="KafkaMessLis
标签: kafka 分布式

本文转载自: https://blog.csdn.net/pleaseprintf/article/details/137611745
版权归原作者 Java毕设王 所有, 如有侵权,请联系我们删除。

“Kafka 消费端消费重试和死信队列”的评论:

还没有评论