0


RabbitMQ如何保证消息传输可靠性

1、保证消息发送到 MQ

RabbitMQ 的消息最终时存储在 Queue 上的,而在 Queue 之前还要经过 Exchange,那么这个过程中就有两个地方可能导致消息丢失。第一个是 Producer 到 Exchange 的过程,第二个是 Exchange 到 Queue 的过程。

上面两个可能丢失的过程,都可以利用 confirm 机制,注册回调来监听是否成功。
Publisher Confirm 是一种机制,用于确保消息已经被 Exchange 成功接收和处理。一旦消息成功到达 Exchange 并被处理,RabbitMQ 会向消息生产者发送确认信号(ACK)。如果由于某种原因(例如,Exchange 不存在或路由键不匹配)消息无法被处理,RabbitMQ 会向消息生产者发送否认信号(NACK)。

// 启用Publisher Confirms
channel.confirmSelect();// 设置Publisher Confirms回调
channel.addConfirmListener(newConfirmListener(){@OverridepublicvoidhandleAck(long deliveryTag,boolean multiple)throwsIOException{System.out.println("Message confirmed with deliveryTag: "+ deliveryTag);// 在这里处理消息确认}@OverridepublicvoidhandleNack(long deliveryTag,boolean multiple)throwsIOException{System.out.println("Message not confirmed with deliveryTag: "+ deliveryTag);// 在这里处理消息未确认}});

Publisher Returns 机制与 Publisher Confirms 类似,但用于处理在消息无法路由到任何队列时的情况。当 RabbitMQ 在无法路由消息时将消息返回给消息生产者,但是如果能正确路由,则不会返回消息。

// 启用Publisher Returns
channel.addReturnListener(newReturnListener(){@OverridepublicvoidhandleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Message returned with replyCode: "+ replyCode);// 在这里处理消息发送到Queue失败的返回}});

通过以上方式,我们注册了两个回调监听,用于在消息发送到 Exchange 或者 Queue 失败时进行异常处理。通常我们可以在失败时进行报警或者重试来保障一定能发送成功。
完整的代码如下:

importcom.Rabbitmq.Client.*;PublicclassPublisherCallbacksExample{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){// 启用Publisher Confirms
            channel.confirmSelect();// 设置Publisher Confirms回调
            channel.addConfirmListener(newConfirmListener(){@OverridepublicvoidhandleAck(long deliveryTag,boolean multiple)throwsIOException{System.out.println("Message confirmed with deliveryTag: "+ deliveryTag);// 在这里处理消息确认}@OverridepublicvoidhandleNack(long deliveryTag,boolean multiple)throwsIOException{System.out.println("Message not confirmed with deliveryTag: "+ deliveryTag);// 在这里处理消息未确认}});// 启用Publisher Returns
            channel.addReturnListener(newReturnListener(){@OverridepublicvoidhandleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("Message returned with replyCode: "+ replyCode);// 在这里处理消息发送到Queue失败的返回}});String exchangeName ="my_exchange";String routingKey ="my_routing_key";String message ="Hello, RabbitMQ!";// 发布消息到Exchange
            channel.basicPublish(exchangeName, routingKey,true,null, message.getBytes());// 等待Publisher Confirmsif(!channel.waitForConfirms()){System.out.println("Message was not confirmed!");}// 关闭通道和连接
            channel.close();}}}

2、保证 MQ 发送给消费者不丢

RabbitMQ 在接收到消息后,默认并不会立即进行持久化,而是先把消息暂存在内存中,这时候如果 MQ 挂了,那么消息就会丢失。所以需要通过持久化机制来保证消息可以被持久化下来。

2.1 队列和交换机的持久化

在声明队列时,可以通过设置 durable 参数为 true 来创建一个持久化队列。持久化队列会在 RabbitMQ 服务器重启后保留,确保队列的元数据不会丢失。
在声明交换机时,也可以通过设置 durable 参数为 true 来创建一个持久化交换机。持久化交换机会在 RabbitMQ 服务器重启后保留,以确保交换机的元数据不会丢失。
绑定关系通常与队列和交换机相关联。当创建绑定关系时,还是可以设置 durable 参数为 true,以创建一个持久化绑定。持久化绑定关系会在服务器重启后保留,以确保绑定关系不会丢失。

@BeanpublicQueueTestQueue(){// 第二个参数durable:是否持久化,默认是falsereturnnewQueue("queue-name",true,true,false);}@BeanpublicDirectExchangemainExchange(){//第二个参数durable:是否持久化,默认是falsereturnnewDirectExchange("main-exchange",true,false);}
2.2 持久化消息

生产者发送的消息可以通过设置消息的 deliveryMode 为 2 来创建持久化消息。持久化消息在发送到持久化队列后,将在服务器重启后保留,以确保消息不会丢失。

DeliveryMode 是一项用于设置消息传递模式的属性,用于指定消息的持久性级别。DeliveryMode 可以具有两个值:
1(非持久化):这是默认的传递模式。如果消息被设置为非持久化,RabbitMQ 将尽力将消息传递给消费者,但不会将其写入磁盘,这意味着如果 RabbitMQ 服务器在消息传递之前崩溃或重启,消息可能会丢失。
2(持久化):如果消息被设置为持久化,RabbitMQ 会将消息写入磁盘,以确保即使在 RabbitMQ 服务器重启时,消息也不会丢失。持久化消息对于重要的消息非常有用,以确保它们不会在传递过程中丢失。

Message message =MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))//kp 消息体,字符集.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();

rabbitTemplate.convertAndSend("simple.queue", message);

通过设置 deliveryMode 类实现消息的持久化。但是需要注意,将消息设置为持久化会增加磁盘 I/O 开销。

2.3 消费者确认机制

有了持久化机制后,那么怎么保证消息在持久化下来之后一定能被消费者消费呢?这里就涉及到消息的消费确认机制。
在 RabbitMQ 中,消费者处理消息成功后可以向 MQ 发送 ack 回执,MQ 收到 ack 回执后才会删除该消息,这样才能确保消息不会丢失。如果消费者在处理消息中出现了异常,那么就会返回 nack 回执,MQ 收到回执之后就会重新投递一次消息,如果消费者一直都没有返回 ACK/NACK 的话,那么他也会在尝试重新投递。

2.4 无法做到 100%不丢

虽然我们通过发送者端进行异步回调、MQ 进行持久化、消费者做确认机制,但是也没办法保证 100%不丢,因为 MQ 的持久化过程其实是异步的。即使我们开了持久化,也有可能在内存暂存成功后,异步持久化之前宕机了,那么这个消息就会丢失。
如果想要做到 100%不丢失,就需要引入本地消息表,来通过轮询的方式来进行消息重投。

标签: rabbitmq

本文转载自: https://blog.csdn.net/u013149491/article/details/135727782
版权归原作者 留梦人 所有, 如有侵权,请联系我们删除。

“RabbitMQ如何保证消息传输可靠性”的评论:

还没有评论