1.发送者可靠性
1.1 发送者重连
RabbitMQ 的发送者重连机制是一种应对网络不稳定或连接中断情况的策略,它能够自动尝试重新建立与RabbitMQ服务器的连接,以确保消息能够成功发送。
发送者重连通常涉及到一些配置参数,如连接超时时间、重试间隔、最大重试次数等。例如,在Spring框架的配置文件中,可以设置
connection-timeout
、
retry
、
initial-interval
、
multiplier
和
max-attempts
等参数来定义重连的行为。重连机制是阻塞式的,即在等待重连的过程中,当前线程会被阻塞,这可能会影响业务性能。如果业务性能有要求,建议合理配置这些参数或考虑使用异步线程来执行发送消息的代码。
1.2 发送者确认
发送者确认机制允许生产者知道消息是否成功到达了RabbitMQ的交换器或队列。这可以通过实现
ConfirmCallback
接口来完成,当消息成功到达交换器时,会触发该回调。另外,通过实现
ReturnCallback
接口,可以在消息无法从交换器路由到队列时收到通知。在Spring AMQP中,可以通过设置
publisher-confirm-type
和
publisher-returns
属性来开启发送者确认和返回通知功能。
以下是一个发送者确认的代码示例:
首先,确保你的项目中已经添加了Spring Boot和Spring AMQP的依赖。
<!-- pom.xml -->
<dependencies>
<!-- Spring Boot Starter AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
然后,你可以创建一个配置类来配置RabbitTemplate,并开启发送者确认机制:
// RabbitMqConfig.java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 开启发送者确认模式
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送到交换机成功: " + correlationData);
} else {
System.out.println("消息发送到交换机失败: " + cause);
}
}
});
// 开启消息返回机制
template.setReturnCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息路由失败: exchange='" + returnedMessage.getExchange() +
"', routingKey='" + returnedMessage.getRoutingKey() +
"', message: " + new String(returnedMessage.getMessage().getBody()));
}
});
template.setMandatory(true); // 如果路由失败,消息将返回给回调
return template;
}
}
接下来,你可以创建一个服务类来发送消息,并使用发送者确认机制:
// MessageSenderService.java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageSenderService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public MessageSenderService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String exchange, String routingKey, String message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData(message));
}
}
在上述代码中,
CorrelationData
是一个用于关联发送消息和确认回调的数据结构,你可以使用它来传递额外的业务标识,比如订单ID或其他业务相关信息。
最后,你可以在控制器或其他业务逻辑中调用
MessageSenderService
的
sendMessage
方法来发送消息
// SomeController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SomeController {
private final MessageSenderService messageSenderService;
@Autowired
public SomeController(MessageSenderService messageSenderService) {
this.messageSenderService = messageSenderService;
}
@PostMapping("/send")
public String sendMessage(@RequestBody String message) {
String exchange = "your_exchange";
String routingKey = "your_routing_key";
messageSenderService.sendMessage(exchange, routingKey, message);
return "消息发送中...";
}
}
这个示例展示了如何在Spring框架中配置和使用RabbitMQ的发送者确认机制。当消息成功发送到交换器时,
ConfirmCallback
会被调用;如果消息无法路由到队列,
ReturnCallback
会被触发。这样,生产者就可以得到关于消息发送结果的反馈。
2.MQ可靠性
2.1 mq数据持久化
在RabbitMQ中,实现数据持久化主要涉及三个关键部分:交换器(Exchange)的持久化、队列(Queue)的持久化以及消息(Message)的持久化。以下是具体的实现方式:
1.交换器的持久化:在声明交换器时,设置
durable
参数为
true
,这代表交换器是持久化的,服务重启之后也会存在。例如,在Java客户端中,可以这样声明一个持久化的交换器:
channel.exchangeDeclare("exchangeName", "direct", true);
``` [^15^]
2.队列的持久化:在声明队列时,同样需要设置
durable
参数为
true
,这样队列就会在服务重启后依然存在。例如:
channel.queueDeclare("queueName", true, false, false, null);
``` [^17^]
3.消息的持久化:消息的持久化是指消息本身需要持久化存储。在发送消息时,可以通过设置消息的
deliveryMode
属性为
2
(即
MessageDeliveryMode.PERSISTENT
)来实现消息的持久化。例如,使用Java客户端发送消息时:
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish("exchangeName", "routingKey", props, message.getBytes());
``` [^18^]
RabbitMQ的数据持久化涉及的存储机制,包括队列索引(
rabbit_queue_index
)和消息存储(
rabbit_msg_store
)。队列索引负责维护队列中落盘消息的信息,而消息存储则以键值对的形式存储消息,并被所有队列共享。消息存储分为
msg_store_persistent
和
msg_store_transient
,分别负责持久化消息和非持久化消息的存储。
2.2 Lazy queue
惰性队列(Lazy Queue)是RabbitMQ从3.6.0版本开始引入的一个特性,主要设计目标是支持更长的队列,即能够存储更多的消息。惰性队列在接收到消息时,会直接将消息存储到磁盘中,而不是首先存储到内存中,这样可以减少内存的消耗,尤其适用于消息量特别大时的场景。只有在消费者消费到相应的消息时,消息才会被加载到内存中。
惰性队列特别适合于处理大规模消息堆积的情况,例如消费者长时间离线或处理能力下降导致消息积压。在3.12版本之后,惰性队列已经成为RabbitMQ的默认队列类型,官方推荐使用此版本或更高版本以利用惰性队列的优势,包括提高系统稳定性和适应大规模消息堆积的场景。
要声明一个惰性队列,可以通过在声明队列时设置
x-queue-mode
参数为
lazy
来实现。例如,在Spring AMQP中,可以这样声明一个惰性队列:
@Bean
public Queue lazyQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-queue-mode", "lazy");
return new Queue("lazyQueue", true, false, false, args);
}
3.消费者可靠性
3.1 消费者确认机制
消费者处理消息结束后,向MQ返回一个回执
ack:成功处理消息
nack:消息处理失败 mq需要再次投递
reject: 消息处理失败并拒绝该消息,mq从队列中删除该消息
目前SPringAMQP已经帮我们做好了消费者确认机制的相关方法
在Spring AMQP中,消费者确认(Acknowledgements)机制是确保消息被正确处理的关键部分。Spring AMQP提供了灵活的配置选项来控制确认的行为。以下是三种主要的确认模式:
- none (不处理):- 在这种模式下,Spring AMQP不会发送任何确认给RabbitMQ。这通常用于使用非持久化消息的场景,其中消息在队列中不会被存储,因此不需要确认。- 使用
none
模式时,消息一旦被发送给消费者,就会立即从队列中移除,不论消费者是否成功处理了消息。 - manual (手动模式):- 这是最灵活的确认模式。消费者在处理完每条消息后,需要显式地发送一个确认回执给RabbitMQ。如果处理失败,消费者可以发送一个否定确认(Negative Acknowledgement,简称NAK)。- 手动模式允许消费者控制何时以及如何确认消息,提供了对消息处理流程的细粒度控制。- 在Spring框架中,可以通过实现
AcknowledgeCallback
接口或使用Channel
的basicAck
和basicNack
方法来手动发送确认。 - auto (自动模式):- 自动确认模式下,Spring AMQP会在消息被传递给消费者后自动发送确认给RabbitMQ。这意味着一旦消息被接收,就会立即从队列中移除。- 这种模式适用于消息处理非常快速且可靠,或者消息丢失不会造成严重影响的场景。- 自动模式简化了编程模型,因为消费者不需要显式地处理确认逻辑。
@Bean
public Queue queue() {
return new Queue("myQueue", true); // durable=true for a persistent queue
}
@Bean
public RabbitListenerContainerFactory<SimpleRabbitListenerContainer> myFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置为手动确认模式
// factory.setAcknowledgeMode(AcknowledgeMode.AUTO); // 设置为自动确认模式
// factory.setAcknowledgeMode(AcknowledgeMode.NONE); // 设置为不处理确认模式
return factory;
}
@RabbitListener(queues = "myQueue", containerFactory = "myFactory")
public void listenAndProcess(String message) {
System.out.println("Received: " + message);
// 处理消息...
// 如果处理成功,发送确认
// channel.basicAck(deliveryTag, false);
// 如果处理失败,可以选择发送否定确认或拒绝
// channel.basicNack(deliveryTag, false, true);
// channel.basicReject(deliveryTag, false);
}
在这个例子中,我们创建了一个队列并配置了RabbitListenerContainerFactory,设置了确认模式。然后在
@RabbitListener
注解的方法中,消费者可以接收消息并根据处理结果发送相应的确认或否定确认。
通过这样的配置,Spring AMQP提供了一种灵活的方式来处理消息确认,允许开发者根据业务需求选择最合适的确认策略。
3.2 消费者失败重试策略
在Spring AMQP中,消费者出现异常时,可以通过配置实现本地重试,而不是无限地将消息重新入队(requeue)到MQ中。这可以通过
MessageRecoverer
接口的不同实现来处理重试失败的消息:
- RejectAndDontRequeueRecoverer:当重试次数耗尽后,直接拒绝(reject)消息,这会导致消息被丢弃,不重新入队 。
- ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回负确认(nack),消息将重新入队,等待再次消费 。
- RepublishMessageRecoverer:当重试次数耗尽,将失败的消息重新发布到指定的交换机。这是一种较为优雅的处理方式,特别是当需要将失败消息发送到一个专门处理异常消息的队列时。
下面是一个实现MessageRecover的例子:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RecoveredMessage;
public class CustomMessageRecoverer implements MessageRecoverer {
@Override
public void recover(RecoveredMessage recoveredMessage) {
// 这里的逻辑可以根据你的业务需求来定制
// 例如,记录日志
log.error("Message recovery for: {}", recoveredMessage);
// 将消息发送到死信队列
rabbitTemplate.send(deadLetterExchange, deadLetterRoutingKey, recoveredMessage.getMessage());
}
// 可以添加其他自定义方法
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
public CustomMessageRecoverer customMessageRecoverer() {
return new CustomMessageRecoverer();
}
@Bean
public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 设置自定义的MessageRecoverer
factory.setMessageRecoverer(customMessageRecoverer());
// 其他配置...
return factory;
}
}
3.3 业务幂等性
在Java业务开发中,保证幂等性是非常重要的,特别是在处理网络请求、数据库操作、消息队列等场景时。幂等性意味着对同一请求的多次处理将产生相同的结果,而不会重复影响系统状态。以下是一些常见的策略来保证Java业务的幂等性:
其实说白了保证业务幂等性就两种方法:1.唯一(不限于唯一标识或者同一时间唯一业务执行) 2.具体业务逻辑调整。
- 唯一标识符:为每个业务操作生成一个唯一的标识符(如UUID或订单ID),在执行操作前检查该标识符是否已经存在或被处理过。
- 数据库唯一约束:在数据库层面,使用唯一索引(UNIQUE KEY)来保证不会因为重复的请求而插入重复的数据。
- 缓存标记:在执行操作前,将请求的标识符存储在缓存(如Redis)中,操作完成后移除。在处理请求时,首先检查缓存中是否存在该标识符。
- 状态机:使用状态机管理业务状态,确保每个状态只能被处理一次。如果接收到重复的请求,状态机将根据当前状态返回相应的响应。
- 检查请求参数:在处理请求之前,检查请求参数是否满足幂等性条件,例如,检查参数值是否在允许的范围内或是否已经存在。
- Token机制:为每个请求生成一个Token,并在服务器端进行校验。一旦请求处理完成,Token即失效,重复使用相同Token的请求将被拒绝。
- 乐观锁:在更新数据库记录时,使用乐观锁(通过版本号或时间戳)来确保在读取记录到更新记录这段时间内记录没有被其他操作修改。
- 分布式锁:在分布式系统中,使用分布式锁(如Redisson、Zookeeper等)来保证同一时间只有一个操作实例在执行。
- 消息队列的确认机制:在使用消息队列(如RabbitMQ、Kafka)时,利用消息确认机制确保每条消息只被消费一次。
- 幂等性接口设计:设计接口时,考虑幂等性,例如,使用GET方法进行查询操作,使用PUT或DELETE方法进行更新或删除操作。
- 业务规则校验:在业务逻辑中加入幂等性校验,比如检查是否已经执行过相同的操作,或者操作是否满足幂等性条件。
- 日志记录:记录操作日志,当接收到重复请求时,可以通过日志记录来判断是否已经处理过该请求。
- 重试机制与回退策略:在可能发生重复请求的场景下,实现重试机制,并设置合理的回退策略。
- 服务降级:在系统负载较高时,通过服务降级策略避免处理重复请求,保证核心业务的稳定性。
- 依赖注入和中间件支持:利用Java的依赖注入框架和中间件来实现幂等性控制,例如,Spring框架提供的@Transactional注解可以保证方法的幂等性。
版权归原作者 翔山代码 所有, 如有侵权,请联系我们删除。