0


RabbitMQ常见问题之消息可靠性

文章目录

一、介绍

MQ

的消息可靠性,将从以下四个方面展开并实践:

  1. 生产者消息确认
  2. 消息持久化
  3. 消费者消息确认
  4. 消费失败重试机制

二、生产者消息确认

对于

publisher

,如果

message

到达

exchange

与否,

rabbitmq

提供

publiser-comfirm

机制,如果

message

达到

exchange

但是是否到达

queue

rabbitmq

提供

publisher-return

机制。这两种机制在代码中都可以通过配置来自定义实现。

以下操作都在

publisher

服务方完成。

1. 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
spring:rabbitmq:publisher-confirm-type: correlated
    publisher-returns:truetemplate:mandatory:true

配置说明:

publish-confirm-type

:开启

publisher-confirm

,这里支持两种类型:

  • simple:同步等待confirm结果,直到超时
  • correlated:异步回调,定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback
publish-returns

:开启

publish-return

功能,同样是基于

callback

机制,不过是定义

ReturnCallback
template.mandatory

:定义消息路由失败时的策略。true,则调用

ReturnCallback

; false,则直接丢弃消息

2. 配置ReturnCallBack

每个

RabbitTemplate

只能配置一个

ReturnCallBack

,所以直接给

IoC

里面的

RabbitTemplate

配上,所有人都统一用。
新建配置类,实现

ApplicationContextAware

接口,在接口中

setReturnCallback

@Slf4j@ConfigurationpublicclassCommonConfigimplementsApplicationContextAware{@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{//check if is delay messageif(message.getMessageProperties().getReceivedDelay()!=null&& message.getMessageProperties().getReceivedDelay()>0){return;}
            log.error("消息发送到queue失败,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",
                    replyCode, replyText, exchange, routingKey, message.toString());});}}

3. 配置ConfirmCallBack

ConfirmCallBack在message发送时配置,每个message都可以有自己的ConfirmCallBack。

@TestpublicvoidtestSendMessage2SimpleQueue()throwsInterruptedException{String message ="hello, spring amqp!";// confirm callbackCorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(
                result ->{if(result.isAck()){
                        log.debug("消息到exchange成功, id={}", correlationData.getId());}else{
                        log.error("消息到exchange失败, id={}", correlationData.getId());}},
                throwable ->{
                    log.error("消息发送失败", throwable);});

        rabbitTemplate.convertAndSend("amq.topic","simple.test", message, correlationData);}

4. 测试

将消息发送到一个不存在的

exchange

,模拟消息达到

exchange

失败,触发

ConfirmCallBack

,日志如下。

18:22:03:913 ERROR 23232 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aamq.topic' in vhost '/', class-id=60, method-id=40)
18:22:03:915 ERROR 23232 --- [nectionFactory1] cn.itcast.mq.spring.SpringAmqpTest       : 消息到exchange失败, id=0c0910a3-7937-43ea-9606-e5bbcdda0b5c

将消息发送到一个存在的

exchange

,但

routekey

异常,模拟消息到达

exchange

但没有到达

queue

,触发

ConfirmCallBack

ReturnCallBack

,日志如下。

18:27:22:757  INFO 20184 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#7428de63:0/SimpleConnection@6d60899e [delegate=amqp://[email protected]:5672/, localPort= 53662]
18:27:22:797 DEBUG 20184 --- [ 127.0.0.1:5672] cn.itcast.mq.spring.SpringAmqpTest       : 消息到exchange成功, id=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11
18:27:22:796 ERROR 20184 --- [nectionFactory1] cn.itcast.mq.config.CommonConfig         : 消息发送到queue失败,replyCode=312, reason=NO_ROUTE, exchange=amq.topic, routeKey=simplee.test, message=(Body:'hello, spring amqp!' MessageProperties [headers={spring_returned_message_correlation=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])

三、消息持久化

新版本的

SpringAMQP

默认开启持久化。

RabbitMQ

本身并不默认开启持久化。

队列持久化,通过

QueueBuilder

构建持久化队列,比如

@BeanpublicQueuesimpleQueue(){returnQueueBuilder.durable("simple.queue").build();}

消息持久化,在发送时可以设置,比如

@TestpublicvoidtestDurableMessage(){Message message =MessageBuilder.withBody("hello springcloud".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
    rabbitTemplate.convertAndSend("simple.queue", message);}

四、消费者消息确认

消费者消息确认是指,

consumer

收到消息后会给

rabbitmq

发送回执来确认消息接收状况。

SpringAMQP

允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack, MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode: auto # manual auto none

但是

auto

有个很大的缺陷,因为

rabbitmq

会自动不断给有问题的

listen

反复投递消息,导致不断报错,所以建议使用下一章的操作。

五、消费失败重试机制

当消费者出现异常后,消息会不断

requeue

(重新入队)到队列,再重新发送给消费者,然后再次异常,再次

requeue

,无限循环,导致

mq

的消息处理飙升,带来不必要的压力。

我们可以利用

Spring

retry

机制,在消费者出现异常时利用本地重试,而不是无限制的

requeue

mq

队列。

1. 引入依赖

spring:rabbitmq:listener:simple:prefetch:1retry:enabled:true# 开启消费者失败重试initial-interval:1000#初识的失败等待时长为1秒multiplier:2# 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts:3# 最大重试次数stateless:true# true无状态;false有状态。如果业务中包含事务,这里改为false

2. 配置重试次数耗尽策略

在这里插入图片描述
我们采用

RepublishMessageRecoverer


定义用于接收失败消息的

exchange

queue

以及它们之间的

bindings

然后定义

MessageRecoverer

,比如

@ComponentpublicclassErrorMessageConfig{@BeanpublicMessageRecovererrepublishMessageRecover(RabbitTemplate rabbitTemplate){returnnewRepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");}}

3. 测试

定义处理异常消息的

exchange

queue

,比如

@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="error.queue"),
            exchange =@Exchange(name ="error.exchange"),
            key ="error"))publicvoidlistenErrorQueue(String msg){
        log.info("消费者接收到error.queue的消息:【"+ msg +"】");}

定义如下一个

listener

,来模拟

consumer

处理消息失败触发消息重试。

@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="simple.queue"),
            exchange =@Exchange(name ="simple.exchange"),
            key ="simple"))publicvoidlistenSimpleQueue(String msg){
        log.info("消费者接收到simple.queue的消息:【"+ msg +"】");System.out.println(1/0);
        log.info("consumer handle message success");}

写一个简单的测试,往simple.exchange发送消息,比如

@TestpublicvoidtestSendMessageSimpleQueue()throwsInterruptedException{String message ="hello, spring amqp!";
        rabbitTemplate.convertAndSend("simple.exchange","simple", message);}

运行测试,consumer得到以下日志

18:51:10:164  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到simple.queue的消息:【hello, spring amqp!】
18:51:11:167  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:168  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:176  WARN 24072 --- [ntContainer#0-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.exchange' with routing key error
18:51:13:181  INFO 24072 --- [ntContainer#1-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到error.queue的消息:【hello, spring amqp!】

可以看到

spring

尝试2次重发,一共3次,第一次间隔1秒,第二次间隔2秒,重试次数耗尽,消息被

consumer

传入

error

.

exchange

,注意,是

consumer

传的,不是

simple

.

queue

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/m0_51390969/article/details/135657273
版权归原作者 木子dn 所有, 如有侵权,请联系我们删除。

“RabbitMQ常见问题之消息可靠性”的评论:

还没有评论