0


RabbitMQ如何保证消息不丢失

摘要

本文将从三个方面详细介绍在使用RabbitMQ时如何确保消息不丢失的方法:

  1. RabbitMQ接收到消息后,默认会先把消息暂存到内存中,并不会立即进行持久化操作,此时若MQ宕机了,消息就会丢失。所以需要通过持久化机制来保证消息的持久化。
  2. RabbitMQ的消息传递会经过两个过程,第一个是Producer到Exchange的过程,第二个是Exchange到Queue的过程,在这两个过程中可能会导致消息丢失。所以需要通过 confirm 机制来避免消息丢失的问题。
  3. RabbitMQ消费者处理消息成功后可以向MQ发送ack回执,MQ收到ack后会在队列中删除该消息,从而确保消息不会丢失。若消费者在处理消息中出现异常,则会发送nack回执,MQ收到nack后会再次投递消息。

正文

1. 通过持久化机制保证消息持久化

1.1 声明队列

  • 在使用RabbitMQ之前,我们需要声明一个队列。如果队列已经存在,声明不会有任何影响。
@Bean
public Queue myQueue() {
    return new Queue("queue-name", true, true, false);
}
  • new Queue() 方法中的第一个参数是队列名称。
  • 第二个参数 durable 表示队列是持久化的,默认为 false,设置为 true 后,即使RabbitMQ服务器停止或重启,队列也不会丢失元数据。

1.2 声明交换机

  • 交换机负责将消息路由到一个或多个队列中。声明交换机时,也要注意其持久化特性。
@Bean
public DirectExchange myExchange() {
    return new DirectExchange("exchange-name", true, false);
}
  • 第二个参数 durable 表示交换机是持久化的,默认为 false,设置为 true 后,即使RabbitMQ服务器停止或重启,交换机也不会丢失元数据。

1.3 持久化消息

  • 生产者发送消息时,可以将消息的 deliveryMode 属性设置为 MessageDeliveryMode.PERSISTENT,来创建持久化消息。持久化消息传递到队列后,将在服务器重启后保留,以确保消息不会丢失。
  • deliveryMode属性值默认为 MessageDeliveryMode.NON_PERSISTENT,此时消息在传递过程中不会写入磁盘,所以消息可能会丢失。
  • deliveryMode属性值设置为 MessageDeliveryMode.PERSISTENT 时,在传递过程中RabbitMQ会将消息写入磁盘,从而确保在MQ宕机后,消息也不会丢失。
Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
rabbitTemplate.convertAndSend("exchange-name", "routing-key", message);
  • 需要注意,将消息设置为持久化会增加磁盘 I/O 开销。

2. 通过confirm机制避免消息丢失

我们需要手动开启 Confirm 机制,可以在配置文件中添加以下配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # MQ异步回调方式接收回执消息,效率高于simple
#    publisher-confirm-type: none       # 关闭confirm机制
#    publisher-confirm-type: simple     # 同步阻塞并等待MQ的回执消息,效率很低
    publisher-returns: true

2.1 Publisher-Confirm机制

  • **Publisher-Confirm **是RabbitMQ的一种机制,用于确保消息已经成功被Exchange接收并处理。当消息成功到达Exchange并被处理,RabbitMQ会向生产者发送 ack 回执。若出现Exchange不存在等异常情况,导致消息无法被处理,RabbitMQ会向生产者发送 nack 回执。
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
    if (ack) {
        log.info("message delivery to exchange, message id: {}", correlationData.getId());
        // 处理消息确认
    } else {
        log.error("message not delivery to exchange, cause: {}", cause);
        // 处理消息未确认
    }
});

2.2 Publisher-Returns机制

  • **Publisher-Returns **机制与 Publisher-Confirm 类似,用于处理消息无法路由到队列时的异常情况。当RabbitMQ无法将消息正确路由到队列时,则将该消息返回给消息生产者。与 Publisher-Confirm 不同的是,如果能正确路由到队列,则不会返回消息。
rabbitTemplate.setReturnsCallback(returnedMessage -> {
    log.error("message routing failed: exchange({}), routing({}), replyCode({}), replyText({}), message({})",
            returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),
            returnedMessage.getReplyText(), returnedMessage.getMessage());
    // 处理消息发送到Queue失败
});

2.3 处理方式

  • 通常可以在失败时进行报警或者重试机制来确保消息可以发送成功。
  • 生产者确认机制需要额外的网络和系统的资源开销,尽量不要使用。如果业务需要,对于发送失败的消息可以进行有限次数的重试操作,超过次数仍然失败,可以记录异常日志消息。

3. 通过消费者ack避免消息丢失

RabbitMQ消费者处理消息成功后可以向MQ发送 ack 回执,MQ收到 ack 后会在队列中删除该消息,从而确保消息不会丢失。若消费者在处理消息中出现异常,则会发送 nack 回执,MQ收到 nack 后会再次投递消息。

AMQP的 ack 回执处理方式有以下几种:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用。
  • manual:手动模式。需要自己在业务代码中调用api,发送 ack 或 nack/reject ,存在业务入侵,但对比自动模式更加灵活。
  • auto:自动模式。SpringAMQP利用AOP对消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,会自动返回 nack或reject。

最好使用手动 ack 回执,这样消费者在处理完消息后,可以决定是否确认收到消息。如果在处理消息时发生错误,可以选择重新将消息放回队列,或者拒绝这条消息,这样可以防止丢失并且减少重复处理的情况。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #手动ack
        retry:
          enabled: true #开启重试机制

结论

虽然我们进行消息持久化机制、生产者 Confirm 异步回调机制、消费者手动 ack 回执机制,等一系列操作,但是由于 RabbitMQ 的持久化过程是异步的,所以无法保证消息在传递过程中做到100%不丢失。

若业务需要做到消息100%不丢失,可以引入本地消息表,通过轮询(或其他方式)的方式来进行消息的重新投递。

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/huhuhu_o/article/details/140093138
版权归原作者 CodeWarrior丶 所有, 如有侵权,请联系我们删除。

“RabbitMQ如何保证消息不丢失”的评论:

还没有评论