摘要
本文将从三个方面详细介绍在使用RabbitMQ时如何确保消息不丢失的方法:
- RabbitMQ接收到消息后,默认会先把消息暂存到内存中,并不会立即进行持久化操作,此时若MQ宕机了,消息就会丢失。所以需要通过持久化机制来保证消息的持久化。
- RabbitMQ的消息传递会经过两个过程,第一个是Producer到Exchange的过程,第二个是Exchange到Queue的过程,在这两个过程中可能会导致消息丢失。所以需要通过 confirm 机制来避免消息丢失的问题。
- RabbitMQ消费者处理消息成功后可以向MQ发送ack回执,MQ收到ack后会在队列中删除该消息,从而确保消息不会丢失。若消费者在处理消息中出现异常,则会发送nack回执,MQ收到nack后会再次投递消息。
正文
1. 通过持久化机制保证消息持久化
1.1 声明队列
- 在使用RabbitMQ之前,我们需要声明一个队列。如果队列已经存在,声明不会有任何影响。
@Bean
public Queue myQueue() {
return new Queue("queue-name", true, true, false);
}
- new Queue() 方法中的第一个参数是队列名称。
- 第二个参数 durable 表示队列是持久化的,默认为 false,设置为 true 后,即使RabbitMQ服务器停止或重启,队列也不会丢失元数据。
1.2 声明交换机
- 交换机负责将消息路由到一个或多个队列中。声明交换机时,也要注意其持久化特性。
@Bean
public DirectExchange myExchange() {
return new DirectExchange("exchange-name", true, false);
}
- 第二个参数 durable 表示交换机是持久化的,默认为 false,设置为 true 后,即使RabbitMQ服务器停止或重启,交换机也不会丢失元数据。
1.3 持久化消息
- 生产者发送消息时,可以将消息的 deliveryMode 属性设置为 MessageDeliveryMode.PERSISTENT,来创建持久化消息。持久化消息传递到队列后,将在服务器重启后保留,以确保消息不会丢失。
- deliveryMode属性值默认为 MessageDeliveryMode.NON_PERSISTENT,此时消息在传递过程中不会写入磁盘,所以消息可能会丢失。
- deliveryMode属性值设置为 MessageDeliveryMode.PERSISTENT 时,在传递过程中RabbitMQ会将消息写入磁盘,从而确保在MQ宕机后,消息也不会丢失。
Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
rabbitTemplate.convertAndSend("exchange-name", "routing-key", message);
- 需要注意,将消息设置为持久化会增加磁盘 I/O 开销。
2. 通过confirm机制避免消息丢失
我们需要手动开启 Confirm 机制,可以在配置文件中添加以下配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # MQ异步回调方式接收回执消息,效率高于simple
# publisher-confirm-type: none # 关闭confirm机制
# publisher-confirm-type: simple # 同步阻塞并等待MQ的回执消息,效率很低
publisher-returns: true
2.1 Publisher-Confirm机制
- **Publisher-Confirm **是RabbitMQ的一种机制,用于确保消息已经成功被Exchange接收并处理。当消息成功到达Exchange并被处理,RabbitMQ会向生产者发送 ack 回执。若出现Exchange不存在等异常情况,导致消息无法被处理,RabbitMQ会向生产者发送 nack 回执。
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
if (ack) {
log.info("message delivery to exchange, message id: {}", correlationData.getId());
// 处理消息确认
} else {
log.error("message not delivery to exchange, cause: {}", cause);
// 处理消息未确认
}
});
2.2 Publisher-Returns机制
- **Publisher-Returns **机制与 Publisher-Confirm 类似,用于处理消息无法路由到队列时的异常情况。当RabbitMQ无法将消息正确路由到队列时,则将该消息返回给消息生产者。与 Publisher-Confirm 不同的是,如果能正确路由到队列,则不会返回消息。
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.error("message routing failed: exchange({}), routing({}), replyCode({}), replyText({}), message({})",
returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),
returnedMessage.getReplyText(), returnedMessage.getMessage());
// 处理消息发送到Queue失败
});
2.3 处理方式
- 通常可以在失败时进行报警或者重试机制来确保消息可以发送成功。
- 生产者确认机制需要额外的网络和系统的资源开销,尽量不要使用。如果业务需要,对于发送失败的消息可以进行有限次数的重试操作,超过次数仍然失败,可以记录异常日志消息。
3. 通过消费者ack避免消息丢失
RabbitMQ消费者处理消息成功后可以向MQ发送 ack 回执,MQ收到 ack 后会在队列中删除该消息,从而确保消息不会丢失。若消费者在处理消息中出现异常,则会发送 nack 回执,MQ收到 nack 后会再次投递消息。
AMQP的 ack 回执处理方式有以下几种:
- none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用。
- manual:手动模式。需要自己在业务代码中调用api,发送 ack 或 nack/reject ,存在业务入侵,但对比自动模式更加灵活。
- auto:自动模式。SpringAMQP利用AOP对消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,会自动返回 nack或reject。
最好使用手动 ack 回执,这样消费者在处理完消息后,可以决定是否确认收到消息。如果在处理消息时发生错误,可以选择重新将消息放回队列,或者拒绝这条消息,这样可以防止丢失并且减少重复处理的情况。
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #手动ack
retry:
enabled: true #开启重试机制
结论
虽然我们进行消息持久化机制、生产者 Confirm 异步回调机制、消费者手动 ack 回执机制,等一系列操作,但是由于 RabbitMQ 的持久化过程是异步的,所以无法保证消息在传递过程中做到100%不丢失。
若业务需要做到消息100%不丢失,可以引入本地消息表,通过轮询(或其他方式)的方式来进行消息的重新投递。
版权归原作者 CodeWarrior丶 所有, 如有侵权,请联系我们删除。