0


生产者确认机制

配置文件

  1. server.port=8081
  2. logging.level.com.chensir=debug
  3. #host
  4. spring.rabbitmq.host=121.40.100.66
  5. #默认5672
  6. spring.rabbitmq.port=5672
  7. #用户名
  8. spring.rabbitmq.username=guest
  9. #密码
  10. spring.rabbitmq.password=guest
  11. #连接到代理时用的虚拟主机
  12. spring.rabbitmq.virtual-host=/
  13. #每个消费者每次可最大处理的nack消息数量 默认是250个 可在服务界面看到;意思是每批投递的数量
  14. spring.rabbitmq.listener.simple.prefetch=1
  15. #表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
  16. spring.rabbitmq.listener.simple.acknowledge-mode=auto
  17. #spring.rabbitmq.listener.simple.acknowledge-mode=manual
  18. #是否开启自动重试 默认为false 不开启
  19. spring.rabbitmq.listener.simple.retry.enabled=true
  20. #最大重试次数
  21. spring.rabbitmq.listener.simple.retry.max-attempts=5
  22. #最大重试时间间隔
  23. spring.rabbitmq.listener.simple.retry.max-interval=20000ms
  24. #重试时间间隔
  25. spring.rabbitmq.listener.simple.retry.initial-interval=2000ms
  26. # 最大重试间隔*乘数
  27. #应用于上一重试间隔的乘数 第一次(重试时间间隔)2s 4s 8s 16s 32s 此处32s>20s 以后都以20s为间隔 总的次数为最大重试次数
  28. spring.rabbitmq.listener.simple.retry.multiplier=2
  29. #尝试次数超过上面的设置之后是否丢弃;默认是true(false不丢弃时需要写相应代码将消息加入死信队列中;与参数acknowledge-mode有关系)
  30. spring.rabbitmq.listener.simple.default-requeue-rejected=false
  31. # 下方配置为生产者确认机制的配置
  32. #mandatory为true开启强制消息投递,若消息未被路由至任何一个queue则回退消息到RabbitTemplate.ReturnCallback中的returnedMessage方法;
  33. spring.rabbitmq.template.mandatory = true
  34. #开启回调机制,交换机发布确认模式;交换机收到或者未收到消息,都会调用回调实现类的回调确认方法;
  35. spring.rabbitmq.publisher-confirm-type=correlated
  36. # 开启回退消息,当交换机无法将消息路由出去,便会将消息回退给生产者
  37. spring.rabbitmq.publisher-returns=true

生产者代码

  1. package com.chensir.provider;
  2. import cn.hutool.json.JSONUtil;
  3. import com.chensir.config.RabbitConfig;
  4. import com.chensir.model.User;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.core.ReturnedMessage;
  8. import org.springframework.amqp.rabbit.connection.CorrelationData;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.stereotype.Component;
  12. import javax.annotation.PostConstruct;
  13. import java.util.UUID;
  14. /**
  15. * 生产者
  16. */
  17. @Component
  18. @Slf4j
  19. public class DirectProviderOk implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
  20. @Autowired
  21. private RabbitTemplate rabbitTemplate;
  22. //@PostConstruct 注解是专门数据初始化的注解, 只有其他组件注入后,初始化方法才会执行,
  23. @PostConstruct
  24. public void init() {
  25. rabbitTemplate.setConfirmCallback(this);
  26. rabbitTemplate.setReturnsCallback(this);
  27. }
  28. public void sendOk() {
  29. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  30. System.out.println();
  31. System.out.println("callbackSender UUID: " + correlationData.getId());
  32. User user = User.builder()
  33. .id(1)
  34. .sex("男")
  35. .name("chen")
  36. .build();
  37. String s = JSONUtil.toJsonStr(user);
  38. rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_KEY, RabbitConfig.ROUTING_KEY_03 + 1, s,
  39. m -> {
  40. return m;
  41. }, correlationData);
  42. }
  43. /**
  44. * 交换机确认回调方法
  45. * <p>
  46. * correlationData 保存回调消息的ID及相关信息
  47. * ack 表示交换机是否收到消息
  48. * cause 失败的原因, 成功为null
  49. */
  50. @Override
  51. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  52. String id = correlationData != null ? correlationData.getId() : "";
  53. if (ack) {
  54. log.info("交换机已经收到了ID为:{}的消息", id);
  55. } else {
  56. log.info("交换机还未收到ID为:{}的消息,由于原因:{}", id, cause);
  57. }
  58. }
  59. /**
  60. * 消息没被交换机成功投递到队列时回调
  61. * <p>
  62. * 可以在当消息传递过程中不可达目的地时将消息返回给生产者,只有不可达目的地的时候 才进行回退,也就是回调
  63. *
  64. * @param returnedMessage
  65. */
  66. @Override
  67. public void returnedMessage(ReturnedMessage returnedMessage) {
  68. log.info("消息{},被交换机:{}退回,原因是:{},路由key是:{}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
  69. }
  70. }

结果

未成功投递交换机回调:

  1. callbackSender UUID: 59ea9e71-41b6-4ee7-854d-b41820b47290
  2. 2023-08-29 18:52:06.655 ERROR 26064 --- [.40.100.52:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DirectExchange-011' in vhost '/', class-id=60, method-id=40)
  3. 2023-08-29 18:52:06.657 INFO 26064 --- [nectionFactory2] com.chensir.provider.DirectProviderOk : 交换机还未收到ID为:59ea9e71-41b6-4ee7-854d-b41820b47290的消息,由于原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DirectExchange-011' in vhost '/', class-id=60, method-id=40)

交换机未成功投递队列回调:

  1. callbackSender UUID: d989577a-3fc7-43f0-bc4b-337f8c0bf556
  2. 2023-08-29 18:53:14.217 INFO 9324 --- [nectionFactory1] com.chensir.provider.DirectProviderOk : 消息{"id":1,"name":"chen","sex":"男"},被交换机:DirectExchange-01退回,原因是:312,路由key是:NO_ROUTE
  3. 2023-08-29 18:53:14.219 INFO 9324 --- [nectionFactory2] com.chensir.provider.DirectProviderOk : 交换机已经收到了ID为:d989577a-3fc7-43f0-bc4b-337f8c0bf556的消息

本文转载自: https://blog.csdn.net/weixin_45326523/article/details/132568076
版权归原作者 骑着蜗牛打天下 所有, 如有侵权,请联系我们删除。

“生产者确认机制”的评论:

还没有评论