0


【中间件】RabbitMQ 自定义重试次数(针对同一模块不同消费者)

最近遇到了关于 RabbitMQ 的问题,打比方说:某个微服务模块中,RabbitMQ 的大部分消费者需要重试两次,而小部分消费者由于特殊原因并不需要进行重试。这就涉及到自定义重试次数的话题了,但在网上找了一圈没发现相关的,但是功夫不负有心人,最后还是解决了这个问题,接下来给大家分享一下~

1 默认配置重试次数

一般来说,关于 RabbitMQ 的重试次数是直接在配置文件中进行定义(比如 application.yml),那么所有的消费者都将遵循这个配置条件,比如 👇

  1. spring.application.name=spirng-boot-rabbitmq
  2. spring.rabbitmq.host=127.0.0.1
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=admin
  5. spring.rabbitmq.password=admin
  6. spring.rabbitmq.listener.simple.retry.enabled=true # 开启消费者重试机制
  7. spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重试次数
  8. spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重试时间间隔

该配置中的 max-attempts 决定了消费者的重试次数,不过有一点需求注意:max-attempts 指的是尝试次数,就是说最开始消费的那一次也是计算在内的,那么 max-attempts: 3 便是重试两次,另外一次是正常消费~

同时消费者遵循的是本模块的 **RabbitMQ **配置,并不会读取生产者的配置。打比方说,生产者模块配置重试 3 次,而消费者模块配置重试 1 次,那么生产者给消费者发送消息,消费者进行消费,如果触发了重试,消费者也只会重试一次,它只遵循消费者模块的配置!!

如上,默认配置重试次数就算完成了,但是并没有实现针对不同消费者的自定义重试功能,请继续看第二章内容。

2 自定义重试次数

以应用广泛的订阅模式为例,由于消费者和生产者配置不一,注意消费者和生产者不在同一模块!因此分开阐述:

2.1 消费者

主要配置是在消费者这!!

① 配置文件

对于消费者来说,该配置不仅起到了连接作用,同时也启动了重试机制,默认重试 2 次。

  1. spring.rabbitmq.host=127.0.0.1
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=admin
  4. spring.rabbitmq.password=admin
  5. spring.rabbitmq.listener.simple.retry.enabled=true # 开启消费者重试机制
  6. spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重试次数
  7. spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重试时间间隔

② 配置队列,绑定交换机

  1. package com.yinyu.consumer.rabbitmq;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
  9. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  10. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  11. import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
  12. import org.springframework.retry.interceptor.RetryOperationsInterceptor;
  13. @Configuration
  14. public class FanoutRabbitConfig {
  15. @Autowired
  16. private ConnectionFactory connectionFactory;
  17. //自定义工厂
  18. @Bean
  19. public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
  20. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  21. factory.setConnectionFactory(connectionFactory);
  22. factory.setMessageConverter(jsonMessageConverter());
  23. factory.setConcurrentConsumers(3);
  24. factory.setMaxConcurrentConsumers(10);
  25. factory.setAdviceChain(retries());
  26. return factory;
  27. }
  28. @Bean
  29. public RetryOperationsInterceptor retries() {
  30. return RetryInterceptorBuilder.stateless()
  31. .maxAttempts(1) //设置最大尝试次数为1(不重试)
  32. .backOffOptions(1000, 3.0, 10000)
  33. .recoverer(new RejectAndDontRequeueRecoverer()).build();
  34. }
  35. @Bean
  36. public MessageConverter jsonMessageConverter() {
  37. return new Jackson2JsonMessageConverter();
  38. }
  39. @Bean
  40. public Queue retryTest1() {
  41. return new Queue("yinyu.retryTest1");
  42. }
  43. @Bean
  44. public Queue retryTest2() {
  45. return new Queue("yinyu.retryTest2");
  46. }
  47. @Bean
  48. public Exchange topicExchange() {
  49. return new TopicExchange("yinyu");//交换机命名
  50. }
  51. //队列绑定交换机
  52. @Bean
  53. public List<Binding> allActivateBinding() {
  54. return Arrays.asList(BindingBuilder.bind(
  55. BindingBuilder.bind(retryTest1()).to(topicExchange()).with("yinyu.retryTest1").noargs(),
  56. BindingBuilder.bind(retryTest2()).to(topicExchange()).with("yinyu.retryTest2").noargs());
  57. }
  58. }

③ 消费者文件

用于接收消息,设置了一个对照组,一个自定义配置,一个默认配置

  1. package com.yinyu.consumer.rabbitmq;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class ReceiverA {
  7. @RabbitListener(queues = "yinyu.retryTest1", containerFactory = "listenerContainerFactory")
  8. public void retryReceiver1(Map<String,String> map) {
  9. log.info("retryTest 自定义配置开始, key: {}", map.get("key"));
  10. if (!Objects.equals(map.get("key"), "yinyu")){
  11. throw new RuntimeException("value 值匹配不准确,请重新进行请求!!");
  12. }
  13. log.info("retryTest 结束");
  14. }
  15. @RabbitListener(queues = "yinyu.retryTest2")
  16. public void retryReceiver2(Map<String,String> map) {
  17. log.info("retryTest 默认配置开始, key: {}", map.get("key"));
  18. if (!Objects.equals(map.get("key"), "yinyu")){
  19. throw new RuntimeException("value 值匹配不准确,请重新进行请求!!");
  20. }
  21. log.info("retryTest 结束");
  22. }
  23. }

2.2 生产者

生产者不需要过多的配置,它的作用是发送消息

① 配置文件

写在 application.properties 中,对于生产者来说奇起的是连接 **rabbitmq **作用,如果它是调用其他模块的消费者,那么这个重试配置是不起作用的。

  1. spring.rabbitmq.host=127.0.0.1
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=admin
  4. spring.rabbitmq.password=admin
  5. spring.rabbitmq.listener.simple.retry.enabled=true # 开启消费者重试机制
  6. spring.rabbitmq.listener.simple.retry.max-attempts=5 # 最大重试次数
  7. spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重试时间间隔

② 生产者文件

用于发送消息,

  1. package com.yinyu.producer.rabbitmq;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import java.text.SimpleDateFormat;
  6. import java.util.Date;
  7. @Component
  8. public class Sender {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. public void retryProducer1(){
  12. Map<String,String> map = new HashMap<>();
  13. map.put("key","yinyu自定义");
  14. rabbitTemplate.convertAndSend("yinyu", "yinyu.retryTest1", map);
  15. }
  16. public void retryProducer2(){
  17. Map<String,String> map = new HashMap<>();
  18. map.put("key","yinyu默认");
  19. rabbitTemplate.convertAndSend("yinyu", "yinyu.retryTest2", map);
  20. }
  21. }

③ 测试文件

  1. package com.yinyu.producer.rabbitmq;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringRunner;
  7. @RunWith(SpringRunner.class)
  8. @SpringBootTest
  9. public class SenderTest {
  10. @Autowired
  11. private Sender sender;
  12. //测试自定义配置
  13. @Test
  14. public void testCustomConfig() {
  15. sender.retryProducer1();
  16. }
  17. //测试默认配置
  18. @Test
  19. public void testDefaultConfig() {
  20. sender.retryProducer2();
  21. }
  22. }

2.3 启动测试文件

有条件的各位可以启动一下生产者的测试文件中这两个方法,最终结果:

  • retryProducer1 发送消息后,retryReceiver1 消费消息,虽然报错,但没有重试(遵循自定义配置)
  • retryProducer2 发送消息后,retryReceiver2 消费消息,报错且重试 4 次(遵循默认配置)

完美实现自定义重试次数的需求!!


本文转载自: https://blog.csdn.net/weixin_51407397/article/details/131358848
版权归原作者 尹煜 所有, 如有侵权,请联系我们删除。

“【中间件】RabbitMQ 自定义重试次数(针对同一模块不同消费者)”的评论:

还没有评论