0


RabbitMQ生产者和消费者可靠性机制、延迟队列(含下订单实战)

RabbitMQ可靠性投递以及延迟队列

如何确保MQ消息的可靠性?

一、对于生产者可靠性

1.1 生产者重试机制

当生产者发送消息时,连接不上RabbitMQ或网络出现问题,可能会导致消息丢失,为了解决问题AMPQ提供了消息的重试机制。
配置yaml:

spring:rabbitmq:virtual-host: ITMQ
    username: admin
    password: root
    host: localhost
    port:5672connection-timeout: 1s # 连接时间1stemplate:retry:enabled:true# 开启重试机制initial-interval: 1000ms # 失败后的第一次等待时间multiplier:1# initial-interval * multiplier (等待时长倍数)max-attempts:2# 最大重试次数
spring.rabitmq.connection.timeout:1s  #表示连接MQ的时常为1s
spring.rabbtmq.template.retry.enabled:true #表示开启mq的重试机制
spring.rabbtmq.template.retry.initial-interval:1000ms # 连接失败后的第一次等待时间1s
spring.rabbtmq.template.retry.multiplier:1 连接失败后下次等待的时长倍数
spring.rabbtmq.template.retry.max-attempts: 3 # 最大重试次数

此重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

1.2 生产者确认机制

在这里插入图片描述
当投递消息时候,消息丢失的情况:

  • 生产者发送消息时连接不上RabbitMQ
  • 生产者发送消息连接上RabbitMQ,但未找到Exchange
  • 生产者发送消息连接上RabbitMQ,找到Exchange,但未找到Queue
  • 生产者发送消息连接上RabbitMQ,投递信息时,MQ宕机进程异常

针对上述情况,MQ提供了生产者消息确认机制,

PublisherConfirm

Publisher Return

两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回调

在生产者中配置开启生产者确认:

spring:rabbitmq:virtual-host: ITMQ
    username: admin
    password: root
    host: localhost
    port:5672connection-timeout: 1s # 连接时间1stemplate:retry:enabled:true# 开启重试机制initial-interval: 1000ms # 失败后的第一次等待时间multiplier:1# 失败后下次等待的时常倍数max-attempts:3# 最大重试次数publisher-returns:true# 开启publisher return机制publisher-confirms:true# 开启publisher confirms机制

创建配置类RabbitMQConfig:

/**
 * @Author: lfw
 * @CreateTime: 2024-06-19  17:40
 * @Description: TODO
 */@Configuration@Slf4jpublicclassRabbitConfig{@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 设置消息转换器
        rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());// 无论是否发送 都强制调用 回调方法
        rabbitTemplate.setMandatory(true);initTemplate(rabbitTemplate);return rabbitTemplate;}privatevoidinitTemplate(RabbitTemplate rabbitTemplate){
        rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){System.out.println("ConfirmCallback=========>"+"相关数据:"+ correlationData);System.out.println("ConfirmCallback=========>"+"确认情况:"+ ack);System.out.println("ConfirmCallback=========>"+"原因:"+ cause);}});
        rabbitTemplate.setReturnCallback(newRabbitTemplate.ReturnCallback(){@OverridepublicvoidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){System.out.println("ReturnCallback:=========>"+"消息:"+ message);System.out.println("ReturnCallback:=========>"+"回应码:"+ replyCode);System.out.println("ReturnCallback:=========>"+"回应信息:"+ replyText);System.out.println("ReturnCallback:=========>"+"交换机:"+ exchange);System.out.println("ReturnCallback:=========>"+"路由键:"+ routingKey);}});}}

经过测试,可以总结四种场景:
1、找不到

exchange

,触发

ConfirmCallback

2、找到

exchange

,没有找到

queue

,触发

ConfirmCallback

RetrunCallback

两个回调函数
3、

exchange

queue

都没找到,触发

ConfirmCallback

4、消息成功推送,则触发

ConfirmCallback

二、对于消费者可靠性

消费者处理消息结束后,应该向RabbitMQ发送一个

回执

,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

当消费者中业务逻辑出错抛出异常等场景,需要我们自己处理消息,比如:重新投递消息或丢弃消息
消息的确认模式有三种:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。(RabbitMQ默认消息模式)
  • manual:手动模式。需要自己在业务代码中调用ackreject
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果: - 如果是业务异常,会自动返回nack;- 如果是消息处理或校验异常,自动返回reject;

下面进行演示这三种确认模式情况:
1、演示none确认模式:
在配置文件中添加如下配置:

spring.rabbitmq.listener.simple.acknowledge-mode=none

用注解的方式快速声明一个交换机和队列,向这个交换机发送一条消息,
一旦消费者接收到了消息,则会被立即删除。(自行断电调试)
在这里插入图片描述

2、演示auto确认模式:
在配置文件中添加如下配置:

spring.rabbitmq.listener.simple.acknowledge-mode=auto

上述我们修改一下代码,在业务代码中抛出一个异常和一个非业务异常
在这里插入图片描述
在这里插入图片描述

当业务代码中,抛出了业务异常,会自动重新投递消息。
如果是非业务异常(消息处理或校验异常),会删除该消息,不会再次投递。

3、演示manual确认模式:
在配置文件中添加如下配置:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

在创建一个交换机(这里用的是直连)和一个队列:

@ConfigurationpublicclassManualConfig{@BeanpublicQueuemQueue(){returnnewQueue("m.queue",true);}@BeanpublicDirectExchangemExchange(){returnnewDirectExchange("m.exchange");}@BeanpublicBindingmBinding(){returnBindingBuilder.bind(mQueue()).to(mExchange()).with("m");}}

在生产中发生一个消息给m.exchange:

@GetMapping("/test10")publicResult<Void>test10(){String uuid =UUID.randomUUID().toString().substring(0,5);// 给消息加上一个唯一idCorrelationData correlationData  =newCorrelationData(uuid);
        rabbitTemplate.convertAndSend("m.exchange","m","hello",correlationData);returnnewResult<Void>(200,"发送成功");}

在消费中,我们监听此队列,并调用相关API:

@RabbitListener(queues ={"m.queue"})publicvoidreceiveMsg(String msg,Message message,Channel channel)throwsIOException{try{System.out.println("收到m.queue队列的消息====> "+ msg);// 其他业务代码//int a = 1/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){System.err.println("消息消费失败");// 重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过

try catch

机制捕获,消息处理成功时返回ack,处理失败时返回nack.

相关API详细参数说明:
1、void basicAck(

long deliveryTag, boolean multiple

) throws IOException;

long deliveryTag

:代表了条消息的唯一标识ID。

boolean multiple

: 是否批处理,一般为 false,当该参数为 true 时,则可以一次性确认deliveryTag小于等于传入值的所有消息。
2、void basicNack(

long deliveryTag, boolean multiple, boolean requeue

) throws IOException;

long deliveryTag

:代表了条消息的唯一标识ID。

boolean multiple

: 是否批处理,一般为 false,当该参数为 true 时,则可以一次性确认deliveryTag小于等于传入值的所有消息。

boolean requeue

: true重新投递,false删除消息

3、void basicReject(long deliveryTag, boolean requeue) throws IOException;

long deliveryTag

:代表了条消息的唯一标识ID。

boolean requeue

: true重新投递,false删除消息

reject和nack的区别是:reject只能拒绝单条消息,nack批量拒绝多条消息。

三、TTL过期时间

对于死信队列,先来陈述一下TTL过期时间。
TTL就是给队列或者消息加上过期时间,在过期时间内如果没有消费者消费则会被路由到死信队列中。
1、给队列创建TTL过期时间:

@BeanpublicQueuettlQueue(){HashMap<String,Object> map =newHashMap<>();
        map.put("x-message-ttl",5000);//5000ms = 5 s // 设置过期时间,如果没有消费者消费,则会被移除returnnewQueue("ttl.queue",true,false,false, map);}@BeanpublicDirectExchangettlExchange(){returnnewDirectExchange("ttl.exchange");}@BeanpublicBindingttlBinding(){returnBindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}

当给ttl.exchange发送消息时,消息抵达到

ttl.queue

,如果此时在5s中没有消费者消费这一条消息,则此条消息就会被抛弃。

2、给消息加上TTL过期时间

@GetMapping("/test7")publicResult<Void>ttlMessage(){MessagePostProcessor messagePostProcessor =newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
                message.getMessageProperties().setExpiration("5000");// 消息过期为5s
                message.getMessageProperties().setContentEncoding("UTF-8");return message;}};
        rabbitTemplate.convertAndSend("test.exchange","test","hello,ttl");returnnewResult<Void>(200,"发送成功");}

通过MessagePostProcessor 对象,设置过期时间,如果该消息抵达到

queue

,如果无人消费这一条消息,则会被抛弃。
注意: 如果同时使用了这两种,则会根据最小的TTL来决定。比如说队列过期时间5s,消息过期时间10s,则在5s无人消费则消息被抛弃。

四、死信队列

DLX ,全称

Dead-Letter-Exchange

,可以被称为死信交换机。当消息在一个队列中变成死信后,它被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就被称之为

死信队列


在这里插入图片描述
要创建死信队列,需要声明参数

x-dead-letter-exchange

x-dead-letter-routing-key

(fanout类型的交换机不用声明)。其中,变成死信原因可能是因为:

  • 1、消息过期
  • 2、队列达到最大长度
  • 3、消费者使用basic.rejectbasic.nack声明消费失败,x-dead-letter-exchange: 指定死信交换机x-dead-letter-routing-key:指定死信交换机和死信队列绑定的router-key

用Java代码实现(用消息过期演示):

@BeanpublicQueuettlQueue(){HashMap<String,Object> map =newHashMap<>();
        map.put("x-message-ttl",5000);//5000ms = 5 s // 设置过期时间,如果没有消费者消费,则会被移除
        map.put("x-dead-letter-exchange","dead.exchange");// 消息过期则路由到死信队列
        map.put("x-dead-letter-routing-key","dead");// 消息过期则路由到死信队列,路由key deadreturnnewQueue("ttl.queue",true,false,false, map);}@BeanpublicDirectExchangettlExchange(){returnnewDirectExchange("ttl.exchange");}@BeanpublicBindingttlBinding(){returnBindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}// 死信队列@BeanpublicQueueDeadQueue(){returnnewQueue("dead.queue",true);}// 死信交换机@BeanpublicDirectExchangeDeadExchange(){returnnewDirectExchange("dead.exchange");}// 死信交换机和死信队列绑定@BeanpublicBindingDeadBinding(){returnBindingBuilder.bind(DeadQueue()).to(DeadExchange()).with("dead");}

ttl.exchange

发送消息,当消息过期时,就会传递到

dead.exchange

,最终路由到

dead.queue

五、下订单扣减库存实战

gitee地址:https://gitee.com/abandoned-and-pierced/springcloud-mq

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/qq_52810280/article/details/139860912
版权归原作者 Peisi: 所有, 如有侵权,请联系我们删除。

“RabbitMQ生产者和消费者可靠性机制、延迟队列(含下订单实战)”的评论:

还没有评论