0


RabbitMQ-高级

RabbitMQ-高级

消息队列的高级使用:主要是考虑处理实际使用过程各种异常情况,包括但不限于:

  • 消息可靠性
  • 重复消费 - 保证消息可靠,必然会对同一条消息进行多次发送、备份,此时需要保证消息的幂等性

消息可靠性:

  • 消息的传递过程中,共有三个参与方 - 发送者- MQ- 消费者
  • MQ与任意参与方直接的消息通过网络进行传递 - 因为网络中断、不稳定、延迟等,消息可能出现丢失
  • MQ的消息默认保存在内存中(3.1.2版本前),突发断电等情况可能出现消息丢失
  • 实际开发过程中,部分重要消息是万万不能丢失的🙅‍

需要注意的是:

  • 发现了解MQ保证消息可靠性的理念、实现很有必要 - 实际开发过程中往往需要调用第三方服务 - 同样也会遇到调用失败的问题- 有时,调用第三方服务的过程还是异步执行的,- 如何确保调用成功,并成功处理响应,MQ的实现可以作为一个参考 或者也可以直接引入MQ,解耦调用过程

可靠性

保证消息不丢失,需要从三方面考虑:

  • 发送者
  • MQ
  • 消费者

发送者可靠性

实际上,开启本功能耗费性能比较高,同时MQ和网络也远远没有想象的那么脆弱

  • 需要根据业务对数据可靠性的要求,按需开启 绝对正确的废话~

发送者重连

  • 保证发送者与MQ连接正常

发送者确认

  • 保证发送者将消息发送给MQ
发送者重连

有时因为网络波动原因,可能会出现连接MQ失败的情况,可以通过配置开启连接失败后的重连机制

配置文件:

spring:rabbitmq:connection-timeout: 1s # MQ的连接超时时间template:retry:enabled:true# 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier:1# 失败后下次的等待时长倍数max-attempts:3# 最大重试次数
  • 下次等待时长 0 -> 2 -> 5- initial-interval * multiplier
  • 重连过程是阻塞的,需要合理进行配置,否则会影响其他业务性能
  • 必须保证可靠性时:可以异步执行发送消息的业务
发送者确认

SpringAMQP提供了

Publisher Confirm【消息发送到MQ】、Publisher Return【消息】

两种确认机制。当发送者发送消息给MQ后,MQ会返回确认结构给发送者,有以下情况:

  • 返回ACK: - 消息投递到了MQ,但是路由失败(比如交换机未配置路由,这是程序员的失误)。此时通过Publisher Return返回路由异常的原因,然后返回ACK,告知投递成功- 临时消息【不需要持久化】投递到了MQ,并且入队成功,返回ACK- 持久消息投递到了MQ,并且入队完成持久化,返回ACK
  • 其他情况都会返回NACK,告知投递失败

spring:rabbitmq:publisher-confirm-type: correlated
    publisher-returns:true
  • 确认机制有三种类型: - none:关闭确认机制- simple:同步阻塞等待MQ回执消息- correlated:MQ异步调用方式返回回执消息
组件配置

返回确认机制是一个比较耗费性能的机制,一般不建议开启

@Slf4j@ConfigurationpublicclassMqConfig{@ResourcepublicRabbitTemplate rabbitTemplate;@PostConstructpublicvoidinit(){

        rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
                log.error("监听到消息return callback");
                log.debug("交换机: {}", returnedMessage.getExchange());
                log.debug("路由key: {}", returnedMessage.getRoutingKey());
                log.debug("文本信息: {}", returnedMessage.getMessage());
                log.debug("状态码: {}", returnedMessage.getReplyCode());
                log.debug("状态文本: {}", returnedMessage.getReplyText());}});}}

示例:

2024-07-0316:50:57.922 ERROR 6928---[nectionFactory1]com.learndemo.config.MqConfig: 监听到消息return callback
2024-07-0316:50:57.922 DEBUG 6928---[.55.94.206:5672]com.learndemo.LearnDemoApplicationTests: 发送消息成功,收到 ack!2024-07-0316:50:57.922 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 交换机: learn.direct
2024-07-0316:50:57.923 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 路由key: q
2024-07-0316:50:57.923 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 文本信息:(Body:'hello'MessageProperties[headers={spring_returned_message_correlation=061a4ee2-ba31-4e5b-8ebb-dc938671a0b2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])2024-07-0316:50:57.923 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 状态码:3122024-07-0316:50:57.923 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 状态文本: NO_ROUTE

每个消息都要单独指定回调对象:

@ResourcepublicRabbitTemplate rabbitTemplate;@TestvoidtestPublisherConfirm()throwsInterruptedException{// 1.创建CorrelationDataCorrelationData cd =newCorrelationData();// 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(newListenableFutureCallback<CorrelationData.Confirm>(){@OverridepublicvoidonFailure(Throwable ex){// 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);}@OverridepublicvoidonSuccess(CorrelationData.Confirm result){// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){// result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");}else{// result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息
    rabbitTemplate.convertAndSend("learn.direct","q","hello", cd);// 给与时间接收回调Thread.sleep(20000);}

异常示例:

  • 2024-07-03 16:49:18.620 ERROR 12588 --- [nectionFactory1] c.g.learndemo.LearnDemoApplicationTests : 发送消息失败,收到 nack, reason : channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'learn.direct' in vhost '/learn', class-id=60, method-id=40)

数据持久化

MQ为了保证性能,数据一般保存在内存之中

  • 一旦宕机,数据【数据不仅仅指消息:还包括交换机、队列等信息】会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息堆积

可以通过三个方面进行持久化设计:

  • 交换机持久化 -
  • 队列持久化 -
  • 消息持久化 - 消息如果在发送时设置不进行初始化 - 如果内存已满,会阻塞【PageOut】将消息进行持久化- 设置进行持久化,持久化过程不会阻塞-
@TestpublicvoidtestSendMessage(){// 1. 自定义构建消息Message message =MessageBuilder.withBody("Hello SpringAMQP".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)// 不进行持久化.build();// 2. 发送消息for(int i =0; i <10000; i++){
        rabbitTemplate.convertAndSend("simple.queue", message);}}
Lazy Queue

3.12版本后,所有的队列都是LazyQueue,无法更改

一种特殊的队列,特点:

  • 接收到的消息不再保存到内存,直接存入磁盘
  • 消费者需要消费消息时才会从磁盘中读取并加载到内存(可以预加载消息,最多 2048 条)

配置

@BeanpublicQueuelazyQueue(){returnQueueBuilder.durable("lazy.queue")// 持久化队列.lazy()// 惰性队列.build();}

消费者可靠性

消费者确认

用于确认消费者是否成功处理了消息。当消费者处理消息结束后,应该向MQ发送一个回执,告知处理结果:

  • ACK: - 消息处理成功,MQ从队列删除该消息
  • NACK: - 消息处理失败,MQ需要再次投递改消息
  • REJECT: - 消息处理失败,但不再进行处理,MQ从队列删除消息

SpringAMQP实现了消息确认功能,可以通过配置文件选择消息处理方式,有三种方式:

  • none: 不推荐- 不进行处理,消费者收到消息里面返回ACK
  • manual: - 手动进行:在代码中调用API发送ACK或者REJECT
  • auto: 推荐 - 对消息处理逻辑使用AOP环绕增强 - 业务执行正常:ACK- 业务异常【抛出异常】:NACK- 消息处理或校验异常【消息相关的特定异常】:REJECT - 如:消息转换异常
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto   # none 关闭、 manual 手动
失败重试机制

消息处理失败,还可以在本地重试几次,如果都失败,执行后续异常逻辑

spring:
  rabbitmq:
    # 消费者确认机制
    listener:
      simple:
        acknowledge-mode: auto
        prefetch:1
        # 开启本地失败重试
        retry:
          enabled:true
          max-attempts:3  # 重试次数,默认3
          initial-interval:1000 # 时间间隔,默认1s
          multiplier:1     # 间隔倍增

开启重试模式后,重试次数耗尽,通过MessageRecoverer接口来处理,包含三种策略:

  • RejectAndDontRequeueRecoverer- 次数耗尽,返回reject,丢弃消息- 默认处理方式,很容易导致消息丢失,极不推荐
  • ImmediateRequeueMessageRecoverer- 次数耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer- 次数耗尽后,将消息投递到指定队列

修改默认策略:

@ConfigurationpublicclassErrorMessageConfig{@ResourceprivateRabbitTemplate rabbitTemplate;@BeanpublicDirectExchangeerrorExchange(){returnnewDirectExchange("error.direct");}@BeanpublicQueueerrorQueue(){returnnewQueue("error.queue");}@BeanpublicBindingerrorQueueBinding(Queue errorQueue,DirectExchange errorExchange){returnBindingBuilder.bind(errorQueue).to(errorExchange).with("error");}/**
    * 配置
    */@BeanpublicMessageRecoverererrorMessageRecoverer(){returnnewRepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}}
@ConfigurationpublicclassErrorMessageConfig{@ResourceprivateRabbitTemplate rabbitTemplate;@BeanpublicMessageRecoverermessageRecoverer(){returnnewImmediateRequeueMessageRecoverer();}}
消费者重复消费

也被称为:幂等性问题,即对同一个请求,处理一次和多次,所造成的影响是一样的

通用方案-唯一的ID:

  • 消息设置唯一的ID
  • 将处理成功的消息ID保存到数据库之中
// 会在消息的properties字段设置消息ID@BeanpublicMessageConvertermessageConverter(){Jackson2JsonMessageConverter messageConverter =newJackson2JsonMessageConverter();
    
    messageConverter.setCreateMessageIds(true);// 自动生成消息IDreturn messageConverter;}// 消费者获取消息ID@RabbitListener(queues ="simple.queue")publicvoidlistener(Message message){// 获取消息IDString messageId = message.getMessageProperties().getMessageId();}
  • 缺点: - 读写数据库,性能不高

特定场景-业务状态:

  • 比如说支付: - 接收到消息后修改为支付成功,只有未支付的订单才能变为支付成功- 状态只能由:A -> B,消息只会被正常 ‘处理一次’- 同一条消息只会对应一种状态转变- 状态转变只会发生一次

本文转载自: https://blog.csdn.net/goal66/article/details/142640152
版权归原作者 goal66 所有, 如有侵权,请联系我们删除。

“RabbitMQ-高级”的评论:

还没有评论