0


RabbitMQ保证消息的可靠性

一、背景

消息丢失:下图是消息从生产者发送到消费者接收的关系图。通过图片可以看出,消息在生产者、MQ、消费者这三个环节都有可能丢失。
在这里插入图片描述

1.1 生产者丢失

  • 生产者发送消息时连接MQ失败
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • 消息到达MQ后,处理消息的进程发生异常

1.2 MQ丢失

  • 消息到达MQ,保存到队列后,尚未消费就突然宕机

1.3 消费者丢失

  • 消息接收后尚未处理突然宕机
  • 消息接收后处理过程中抛出异常

1.4 总结(三方面入手)

  • 确保生产者成功把消息发送到MQ
  • 确保MQ不会丢失消息
  • 确保消费者成功处理消息

二、解决方案

配置

packagecom.qiangesoft.rabbitmq.producer;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * 消息发送配置
 *
 * @author qiangesoft
 * @date 2024-05-08
 */@ConfigurationpublicclassMessageConfig{publicstaticfinalStringEXCHANGE="simple.exchange";publicstaticfinalStringQUEUE="simple.queue";publicstaticfinalStringROUTING_KEY="simple";@BeanpublicDirectExchangesimpleExchange(){returnExchangeBuilder.directExchange(EXCHANGE)// 持久化交换机.durable(true).build();}@BeanpublicQueuesimpleQueue(){returnQueueBuilder// 持久化队列.durable(QUEUE)// 避免消息堆积、懒加载.lazy().build();}@BeanpublicBindingsimpleBinding(Queue simpleQueue,DirectExchange simpleExchange){returnBindingBuilder.bind(simpleQueue).to(simpleExchange).with(ROUTING_KEY);}}

2.1 生产者

2.1.1 生产者重试机制

背景:生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
解决方案:配置连接超时时间、重试机制。

spring:rabbitmq:# 设置MQ的连接超时时间connection-timeout: 1s
    template:# 连接重试机制retry:enabled:true# 失败后的初始等待时间initial-interval: 1000ms
        # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermultiplier:1# 最大重试次数max-attempts:3

2.1.2 生产者确认机制

背景:

  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • 消息到达MQ后,处理消息的进程发生异常

解决方案:配置Publisher Confirm机制、Publisher Return机制。

spring:rabbitmq:# 开启publisher confirm机制,并设置confirm类型,确保消息到达交换机publisher-confirm-type: correlated
    # 开启publisher return机制,确保消息到达队列publisher-returns:true
定义ConfirmCallback
packagecom.qiangesoft.rabbitmq.producer;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageBuilder;importorg.springframework.amqp.core.MessageDeliveryMode;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.util.concurrent.ListenableFutureCallback;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjava.nio.charset.StandardCharsets;importjava.util.UUID;/**
 * 生产者
 *
 * @author qiangesoft
 * @date 2024-05-08
 */@Slf4j@RequestMapping("/producer")@RestControllerpublicclassProducerController{@AutowiredpublicRabbitTemplate rabbitTemplate;@GetMapping("/send")publicvoidsend(String content){CorrelationData correlation =getCorrelationData();Message message =MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8)).setMessageId(UUID.randomUUID().toString())// 消息持久化.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 正常发送
        rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE,MessageConfig.ROUTING_KEY, message, correlation);}privatestaticCorrelationDatagetCorrelationData(){// 异步回调返回回执,开启publisher confirm机制【确保消息到达交换机】CorrelationData correlation =newCorrelationData();
        correlation.getFuture().addCallback(newListenableFutureCallback<>(){@OverridepublicvoidonFailure(Throwable ex){
                log.error("消息发送异常,ID:{},原因:{}", correlation.getId(), ex.getMessage());}@OverridepublicvoidonSuccess(CorrelationData.Confirm result){
                log.info("触发【publisher confirm】机制");if(result.isAck()){
                    log.info("消息发送成功到达交换机,ID:{}", correlation.getId());}else{// 消息发送失败
                    log.error("消息发送失败未到达交换机,ID:{},原因:{}", correlation.getId(), result.getReason());}}});return correlation;}}
定义ReturnCallback
packagecom.qiangesoft.rabbitmq.producer;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.BeansException;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.context.annotation.Configuration;/**
 * 消息路由失败回退配置
 *
 * @author qiangesoft
 * @date 2024-05-08
 */@Slf4j@ConfigurationpublicclassReturnsCallbackConfigimplementsApplicationContextAware{@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 消息路由失败退回,设置ReturnsCallback【消息到达交换机没有达到队列】
        rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returned){
                log.info("触发【publisher return】机制");
                log.error("消息投递失败未到达队列,应答码:{},原因:{},交换机:{},路由键:{},消息:{}", returned.getReplyCode(), returned.getReplyText(),
                        returned.getExchange(), returned.getRoutingKey(), returned.getMessage());}});}}

2.2 MQ

  • Exchange持久化
  • Queue持久化
  • Message持久化

2.2.1 Exchange

@BeanpublicDirectExchangesimpleExchange(){returnExchangeBuilder.directExchange(EXCHANGE)// 持久化交换机.durable(true).build();}

2.2.2 Queue

@BeanpublicQueuesimpleQueue(){returnQueueBuilder// 持久化队列.durable(QUEUE)// 避免消息堆积、懒加载.lazy().build();}

2.2.3 Message

Message message =MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8)).setMessageId(UUID.randomUUID().toString())// 消息持久化.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 发送
rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE,MessageConfig.ROUTING_KEY, message, correlation);

2.3 消费者

2.3.1 消费者确认机制

spring:rabbitmq:listener:simple:# 自动ackacknowledge-mode: auto

2.3.2 消费者重试机制

spring:rabbitmq:listener:simple:# 消费者失败重试机制retry:enabled:true# 初始的失败等待时长为1秒initial-interval: 1000ms
          # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmultiplier:1# 最大重试次数max-attempts:3# true无状态;false有状态。如果业务中包含事务,这里改为falsestateless:true

2.3.3 失败处理策略

packagecom.qiangesoft.rabbitmq.consumer;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.retry.MessageRecoverer;importorg.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * 消息消费失败配置
 * ps:配置处理失败消息的交换机和队列
 *
 * @author qiangesoft
 * @date 2024-05-08
 */@ConfigurationpublicclassErrorMessageConfig{publicstaticfinalStringEXCHANGE="error.exchange";publicstaticfinalStringQUEUE="error.queue";publicstaticfinalStringROUTING_KEY="error";@BeanpublicDirectExchangeerrorMessageExchange(){returnnewDirectExchange(EXCHANGE);}@BeanpublicQueueerrorQueue(){returnnewQueue(QUEUE,true);}@BeanpublicBindingerrorBinding(Queue errorQueue,DirectExchange errorMessageExchange){returnBindingBuilder.bind(errorQueue).to(errorMessageExchange).with(ROUTING_KEY);}@BeanpublicMessageRecovererrepublishMessageRecoverer(RabbitTemplate rabbitTemplate){returnnewRepublishMessageRecoverer(rabbitTemplate,EXCHANGE,ROUTING_KEY);}}

本文转载自: https://blog.csdn.net/weixin_39311781/article/details/138580195
版权归原作者 PG_强哥 所有, 如有侵权,请联系我们删除。

“RabbitMQ保证消息的可靠性”的评论:

还没有评论