0


RabbitMQ(六)消息的持久化

目录

在这里插入图片描述

一、简介

1.1 定义

  • 持久化: 是为了提高 RabbitMQ 消息的可靠性,防止在异常情况(重启宕机)下数据的丢失。
  • RabbitMQ 持久化分为三部分:交换机的持久化队列的持久化消息的持久化

1.2 消息丢失的场景

出现消息丢失的场景有:

  • 生产者发送消息丢失:发送消息到 RabbitMQ Server 异常。 可能因为网络问题造成 RabbitMQ 服务端无法收到消息。——解决方案:ConfirmCallback
  • 生产者发送消息丢失:消息无法路由到指定队列。 可能由于代码层面或配置层面错误导致消息路由到指定队列失败。——解决方案:ReturnCallback
  • RabbitMQ Server 存储消息丢失:消息未完全持久化到磁盘。 可能因为 RabbitMQ 宕机导致消息未完全持久化,或队列丢失从而导致消息丢失等持久化问题。——解决方案:集群部署,实现高可用
  • 消费者消费消息丢失:消费者消费消息异常。 可能在消费者接收消息后,还没来得及消费消息,消费者就发生宕机、故障等问题。——解决方案:消费端手动确认消息

二、交换机的持久化

方式一:直接 new

直接实例化对应的

Exchange

实现类即可,默认:持久化的、非自动删除的

@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange(directExchange);}
Exchange

源码:

packageorg.springframework.amqp.core;publicabstractclassAbstractExchangeextendsAbstractDeclarableimplementsExchange{...publicAbstractExchange(String name){// 默认是持久化的、非自动删除的。this(name,true,false);}...}

方式二:channel.exchangeDeclare()

在 RabbitMQ 的原生 API 中,例如:Java 客户端 API,声明持久化交换机时,需要将

durable

参数设置为

true

// 声明交换机:(交换机名称,交换机类型,持久化)
channel.exchangeDeclare("myExchange","direct",true);

方式三:ExchangeBuilder【推荐】

在 Spring AMQP 中,我们可以使用

ExchangeBuilder

来创建持久化的交换机:

importorg.springframework.amqp.core.Exchange;importorg.springframework.amqp.core.ExchangeBuilder;Exchange exchange =ExchangeBuilder.directExchange("myExchange").durable(true)// 交换机持久化.build();

三、队列的持久化

方式一:直接 new

直接实例化

Queue

类即可,默认:持久化的、非独占的、非自动删除的

@BeanpublicQueuemyQueue(){returnnewQueue("myQueue");}
Queue

源码:

packageorg.springframework.amqp.core;publicQueue(String name){// 默认是持久化的、非独占的、非自动删除的。this(name,true,false,false);}

方式二:channel.queueDeclare()

在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化队列时,需要将

durable

参数设置为

true

// 声明队列:(队列名称,持久化,非独占,非自动删除,可选队列参数)
channel.queueDeclare("myQueue",true,false,false,null);

方式三:QueueBuilder【推荐】

在 Spring AMQP 中,可以使用

QueueBuilder

来创建持久化的队列:

importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.QueueBuilder;Queue queue =QueueBuilder.durable("myQueue").build();

四、消息的持久化

即使队列时持久化的,发送到队列中的消息 默认是持久化的。我们也可以手动配置使消息持久化,需要在发送消息时设置消息属性为持久化。

RabbtTemplate.java 源码:

packageorg.springframework.amqp.rabbit.core;publicclassRabbitTemplateextendsRabbitAccessorimplementsBeanFactoryAware,...{...// 没有手动指定MessagePostProcessor时@OverridepublicvoidconvertAndSend(String exchange,String routingKey,finalObject object,@NullableCorrelationData correlationData)throwsAmqpException{send(exchange, routingKey,convertMessageIfNecessary(object), correlationData);}// 初始化MessagePropertiesprotectedMessageconvertMessageIfNecessary(finalObject object){if(object instanceofMessage){return(Message) object;}returngetRequiredMessageConverter().toMessage(object,newMessageProperties());}...}

MessageProperties.java 源码:

packageorg.springframework.amqp.core;publicclassMessagePropertiesimplementsSerializable{...// 消息默认为持久化publicstaticfinalMessageDeliveryModeDEFAULT_DELIVERY_MODE=MessageDeliveryMode.PERSISTENT;privateMessageDeliveryMode deliveryMode =DEFAULT_DELIVERY_MODE;...}

方式一:channel.basicPublish()

在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化消息时,需要将

deliveryMode

参数设置为

2

importcom.rabbitmq.client.AMQP;AMQP.BasicProperties props =newAMQP.BasicProperties.Builder().deliveryMode(2)// 这里2对应MessageDeliveryMode.PERSISTENT.build();// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange","myRoutingKey", props. messageBytes);

或者,我们可以直接使用

MessageProperties.PERSISTENT_TEXT_PLAIN

来进行指定消息的持久化:

importcom.rabbitmq.client.MessageProperties;// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange","myRoutingKey",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

方式二:rabbitTemplate.convertAndSend()【推荐】

在 Spring AMQP 中,可以使用

rabbitTemplate.convertAndSend()

来创建持久化的消息:

rabbitTemplate.convertAndSend("myQueue", body, message ->{
    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;});

通过上述步骤,消息会被持久化存储,只要队列也被正确配置为持久化,即使 RabbitMQ 服务器重启,消息也将被保留下来。

补充:

然而,即使队列和消息都是持久化的,也不能完全保证消息的 100% 不丢失。例如:在消息尚未被刷写到磁盘时,RabbitMQ 服务器突然崩溃,这种情况下的消息仍然可能会丢失。此外,RabbitMQ 不保证消息的立即持久化,而是尽可能快地将消息保存到磁盘


五、持久化问题

思考:将交换机、队列、消息都设置持久化之后就能保证数据不会丢失吗?

答:当然不能,需要从多方面考虑:

  • 场景1: 如果消费者订阅队列的时候将 autoAck(自动确认)设置为 true,虽然消费者接收到了消息,但是没有来得及处理就宕机了,那消息也会丢失。解决方案:将消费者的消息确认机制改为手动确认,带处理完消息之后,手动删除消息。
  • 场景2: 在 RabbitMQ 服务器,如果消息正确被发送,但是 RabbitMQ 服务器中的消息还没来得及持久化,即没有将数据写入磁盘时,如果服务器发生异常,消息也会丢失。解决方案: 可以 通过 RabbitMQ 集群的方式实现消息中间件的高可用。

因此,还需要做好生产者和消费者的

消息确认机制

以及通过

集群

的方式来实现 RabbitMQ 的高可用。

整理完毕,完结撒花~ 🌻

参考地址:

1.RabbitMQ保证消息可靠性,https://www.cnblogs.com/auguse/articles/17712620.html

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/qq_33204709/article/details/135502902
版权归原作者 不愿放下技术的小赵 所有, 如有侵权,请联系我们删除。

“RabbitMQ(六)消息的持久化”的评论:

还没有评论