RabbitMQ可靠性投递以及延迟队列
如何确保MQ消息的可靠性?
一、对于生产者可靠性
1.1 生产者重试机制
当生产者发送消息时,连接不上RabbitMQ或网络出现问题,可能会导致消息丢失,为了解决问题AMPQ提供了消息的重试机制。
配置yaml:
spring:rabbitmq:virtual-host: ITMQ
username: admin
password: root
host: localhost
port:5672connection-timeout: 1s # 连接时间1stemplate:retry:enabled:true# 开启重试机制initial-interval: 1000ms # 失败后的第一次等待时间multiplier:1# initial-interval * multiplier (等待时长倍数)max-attempts:2# 最大重试次数
spring.rabitmq.connection.timeout:1s #表示连接MQ的时常为1s
spring.rabbtmq.template.retry.enabled:true #表示开启mq的重试机制
spring.rabbtmq.template.retry.initial-interval:1000ms # 连接失败后的第一次等待时间1s
spring.rabbtmq.template.retry.multiplier:1 连接失败后下次等待的时长倍数
spring.rabbtmq.template.retry.max-attempts: 3 # 最大重试次数
此重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
1.2 生产者确认机制
当投递消息时候,消息丢失的情况:
- 生产者发送消息时连接不上RabbitMQ
- 生产者发送消息连接上RabbitMQ,但未找到
Exchange
- 生产者发送消息连接上RabbitMQ,找到
Exchange
,但未找到Queue
- 生产者发送消息连接上RabbitMQ,投递信息时,
MQ宕机
或进程异常
针对上述情况,MQ提供了生产者消息确认机制,
PublisherConfirm
和
Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回调。
在生产者中配置开启生产者确认:
spring:rabbitmq:virtual-host: ITMQ
username: admin
password: root
host: localhost
port:5672connection-timeout: 1s # 连接时间1stemplate:retry:enabled:true# 开启重试机制initial-interval: 1000ms # 失败后的第一次等待时间multiplier:1# 失败后下次等待的时常倍数max-attempts:3# 最大重试次数publisher-returns:true# 开启publisher return机制publisher-confirms:true# 开启publisher confirms机制
创建配置类RabbitMQConfig:
/**
* @Author: lfw
* @CreateTime: 2024-06-19 17:40
* @Description: TODO
*/@Configuration@Slf4jpublicclassRabbitConfig{@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 设置消息转换器
rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());// 无论是否发送 都强制调用 回调方法
rabbitTemplate.setMandatory(true);initTemplate(rabbitTemplate);return rabbitTemplate;}privatevoidinitTemplate(RabbitTemplate rabbitTemplate){
rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){System.out.println("ConfirmCallback=========>"+"相关数据:"+ correlationData);System.out.println("ConfirmCallback=========>"+"确认情况:"+ ack);System.out.println("ConfirmCallback=========>"+"原因:"+ cause);}});
rabbitTemplate.setReturnCallback(newRabbitTemplate.ReturnCallback(){@OverridepublicvoidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){System.out.println("ReturnCallback:=========>"+"消息:"+ message);System.out.println("ReturnCallback:=========>"+"回应码:"+ replyCode);System.out.println("ReturnCallback:=========>"+"回应信息:"+ replyText);System.out.println("ReturnCallback:=========>"+"交换机:"+ exchange);System.out.println("ReturnCallback:=========>"+"路由键:"+ routingKey);}});}}
经过测试,可以总结四种场景:
1、找不到
exchange
,触发
ConfirmCallback
2、找到
exchange
,没有找到
queue
,触发
ConfirmCallback
和
RetrunCallback
两个回调函数
3、
exchange
和
queue
都没找到,触发
ConfirmCallback
4、消息成功推送,则触发
ConfirmCallback
二、对于消费者可靠性
消费者处理消息结束后,应该向RabbitMQ发送一个
回执
,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack
:成功处理消息,RabbitMQ从队列中删除该消息nack
:消息处理失败,RabbitMQ需要再次投递消息reject
:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
当消费者中业务逻辑出错抛出异常等场景,需要我们自己处理消息,比如:重新投递消息或丢弃消息
消息的确认模式有三种:
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。(RabbitMQ默认消息模式)manual
:手动模式。需要自己在业务代码中调用ack
或reject
auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果: - 如果是业务异常,会自动返回nack
;- 如果是消息处理或校验异常,自动返回reject
;
下面进行演示这三种确认模式情况:
1、演示none确认模式:
在配置文件中添加如下配置:
spring.rabbitmq.listener.simple.acknowledge-mode=none
用注解的方式快速声明一个交换机和队列,向这个交换机发送一条消息,
一旦消费者接收到了消息,则会被立即删除。(自行断电调试)
2、演示auto确认模式:
在配置文件中添加如下配置:
spring.rabbitmq.listener.simple.acknowledge-mode=auto
上述我们修改一下代码,在业务代码中抛出一个异常和一个非业务异常
当业务代码中,抛出了业务异常,会自动重新投递消息。
如果是非业务异常(消息处理或校验异常),会删除该消息,不会再次投递。
3、演示manual确认模式:
在配置文件中添加如下配置:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
在创建一个交换机(这里用的是直连)和一个队列:
@ConfigurationpublicclassManualConfig{@BeanpublicQueuemQueue(){returnnewQueue("m.queue",true);}@BeanpublicDirectExchangemExchange(){returnnewDirectExchange("m.exchange");}@BeanpublicBindingmBinding(){returnBindingBuilder.bind(mQueue()).to(mExchange()).with("m");}}
在生产中发生一个消息给m.exchange:
@GetMapping("/test10")publicResult<Void>test10(){String uuid =UUID.randomUUID().toString().substring(0,5);// 给消息加上一个唯一idCorrelationData correlationData =newCorrelationData(uuid);
rabbitTemplate.convertAndSend("m.exchange","m","hello",correlationData);returnnewResult<Void>(200,"发送成功");}
在消费中,我们监听此队列,并调用相关API:
@RabbitListener(queues ={"m.queue"})publicvoidreceiveMsg(String msg,Message message,Channel channel)throwsIOException{try{System.out.println("收到m.queue队列的消息====> "+ msg);// 其他业务代码//int a = 1/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){System.err.println("消息消费失败");// 重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过
try catch
机制捕获,消息处理成功时返回ack,处理失败时返回nack.
相关API详细参数说明:
1、void basicAck(
long deliveryTag, boolean multiple
) throws IOException;
long deliveryTag
:代表了条消息的唯一标识ID。
boolean multiple
: 是否批处理,一般为 false,当该参数为 true 时,则可以一次性确认deliveryTag小于等于传入值的所有消息。
2、void basicNack(
long deliveryTag, boolean multiple, boolean requeue
) throws IOException;
long deliveryTag
:代表了条消息的唯一标识ID。
boolean multiple
: 是否批处理,一般为 false,当该参数为 true 时,则可以一次性确认deliveryTag小于等于传入值的所有消息。
boolean requeue
: true重新投递,false删除消息
3、void basicReject(long deliveryTag, boolean requeue) throws IOException;
long deliveryTag
:代表了条消息的唯一标识ID。
boolean requeue
: true重新投递,false删除消息
reject和nack的区别是:reject只能拒绝单条消息,nack批量拒绝多条消息。
三、TTL过期时间
对于死信队列,先来陈述一下TTL过期时间。
TTL就是给队列或者消息加上过期时间,在过期时间内如果没有消费者消费则会被路由到死信队列中。
1、给队列创建TTL过期时间:
@BeanpublicQueuettlQueue(){HashMap<String,Object> map =newHashMap<>();
map.put("x-message-ttl",5000);//5000ms = 5 s // 设置过期时间,如果没有消费者消费,则会被移除returnnewQueue("ttl.queue",true,false,false, map);}@BeanpublicDirectExchangettlExchange(){returnnewDirectExchange("ttl.exchange");}@BeanpublicBindingttlBinding(){returnBindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}
当给ttl.exchange发送消息时,消息抵达到
ttl.queue
,如果此时在5s中没有消费者消费这一条消息,则此条消息就会被抛弃。
2、给消息加上TTL过期时间
@GetMapping("/test7")publicResult<Void>ttlMessage(){MessagePostProcessor messagePostProcessor =newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setExpiration("5000");// 消息过期为5s
message.getMessageProperties().setContentEncoding("UTF-8");return message;}};
rabbitTemplate.convertAndSend("test.exchange","test","hello,ttl");returnnewResult<Void>(200,"发送成功");}
通过MessagePostProcessor 对象,设置过期时间,如果该消息抵达到
queue
,如果无人消费这一条消息,则会被抛弃。
注意: 如果同时使用了这两种,则会根据最小的TTL来决定。比如说队列过期时间5s,消息过期时间10s,则在5s无人消费则消息被抛弃。
四、死信队列
DLX ,全称
Dead-Letter-Exchange
,可以被称为死信交换机。当消息在一个队列中变成死信后,它被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就被称之为
死信队列
。
要创建死信队列,需要声明参数
x-dead-letter-exchange
和
x-dead-letter-routing-key
(fanout类型的交换机不用声明)。其中,变成死信原因可能是因为:
- 1、消息过期
- 2、队列达到最大长度
- 3、消费者使用
basic.reject
或basic.nack
声明消费失败,x-dead-letter-exchange
: 指定死信交换机x-dead-letter-routing-key
:指定死信交换机和死信队列绑定的router-key
用Java代码实现(用消息过期演示):
@BeanpublicQueuettlQueue(){HashMap<String,Object> map =newHashMap<>();
map.put("x-message-ttl",5000);//5000ms = 5 s // 设置过期时间,如果没有消费者消费,则会被移除
map.put("x-dead-letter-exchange","dead.exchange");// 消息过期则路由到死信队列
map.put("x-dead-letter-routing-key","dead");// 消息过期则路由到死信队列,路由key deadreturnnewQueue("ttl.queue",true,false,false, map);}@BeanpublicDirectExchangettlExchange(){returnnewDirectExchange("ttl.exchange");}@BeanpublicBindingttlBinding(){returnBindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}// 死信队列@BeanpublicQueueDeadQueue(){returnnewQueue("dead.queue",true);}// 死信交换机@BeanpublicDirectExchangeDeadExchange(){returnnewDirectExchange("dead.exchange");}// 死信交换机和死信队列绑定@BeanpublicBindingDeadBinding(){returnBindingBuilder.bind(DeadQueue()).to(DeadExchange()).with("dead");}
向
ttl.exchange
发送消息,当消息过期时,就会传递到
dead.exchange
,最终路由到
dead.queue
。
五、下订单扣减库存实战
gitee地址:https://gitee.com/abandoned-and-pierced/springcloud-mq
版权归原作者 Peisi: 所有, 如有侵权,请联系我们删除。