一、发送者的可靠性
1.生产者的重连
因网络问题连接MQ失败,解决在配置文件中配置失败后的重连机制(阻塞式的,影响业务)
spring:
rabbitmq:
host: 192.168.88.129
port: 5672
virtual-host: /hmall
username: hmall
password: #
connection-timeout: 1s #设置MQ的连接超时时间
template:
retry:
enabled:true #开启超时重试机制
initial-interval:1000ms #失败后的初始等待时间
multiplier:1 #失败后下次的等待时长倍数,下次等待时长=initial-interval * multiplier
max-attempts:3 #最大重试次数
2.生产者确认
有两种 Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
其它情况都会返回NACK,告知投递失败
3.解决
在发送者的配置文件中配置
rabbitmg :
publisher-confirm-type:correlated #开publisher confirm机制,并设置confirm类型
publisher-returns:true #开启publisher return机制
#配置说明:
#这里publisher-confirm-type有三种模式可选:
# none:关闭confirm机制
# simple:同步阻塞等待MQ的回执消息
# correlated:MO异步回调方式返回回执消息
写一个配置类
package com.itheima.publisher.Config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
@Override
public void returnedMessage(ReturnedMessage returned) {
log.debug("收到消息的return callback, exchange:{},key:{},msg:{}, code:{}, text:{}",
returned.getExchange(), returned.getRoutingKey(), returned.getMessage(),
returned.getReplyCode(), returned.getReplyText());
}
});
}
}
在每次发消息时
@Test
void testConfirmCallback() throws InterruptedException {
//1.创建cd
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
//2.添加Confirm// Callback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息回调失败", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
log.debug("收到confirm callback回执");
if (result.isAck()) {
//消息发送成功
log.debug("消息发送成功,收到ack");
} else {
//消息发送失败
log.error("消息发送失败,收到nack,原因:{}", result.getReason());
}
}
});
rabbitTemplate.convertAndSend("hmall.direct", "red", "hello", cd);
Thread.sleep(2000);
}
二、消息队列的可靠性
1 分析
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题
一旦MQ宕机,内存中的消息会丢失
内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞
2.解决
2.1数据持久化(交换机持久化,队列持久化和消息持久化)
2.2 Lazy Queue(推荐)
基于注解
@RabbitListener(queuesToDeclare = @Queue(
name="lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenDelayQueues(String msg){
log.info("接收到了delay。queue的消息{}",msg);
}
三、接收者的可靠性
1.消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值
ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
1.1 在接收者的配置文件中配置
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto #确认机制 :none关闭ack,manual手动ack,auto自动ack
2.失败重试机制,当接收者程序抛异常后,会不断地重新发送再抛异常
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次reaueue,无限循环,导致mq的消息处理飙升,带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列
2.1配置
2.2失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(建议)
新建一个队列,投递进去进行人工处理
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
log.debug("加载RepublishMessageRecoverer");
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
3.业务的幂等性
1.创建唯一消息id
消费者和发送者的启动类配一个bean
2.结合业务
四、延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。延迟任务:设置在一定时间之后才执行的任务
1.死信交换机实现
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead LetterExchange,简称DLX)。
2.延迟消息插件
1.执行命令安装插件
docker exec -it mq rabbitmg-plugins enable rabbitmq_delayed_message_exchange
2.使用(使用在延迟时间较短的场景)
2.1定义延迟交换机
版权归原作者 小小薛定谔 所有, 如有侵权,请联系我们删除。