0


RabbitMQ高级

文章目录


MQ的一些常见问题

1.消息可靠性问题:如何确保发送的消息至少被消费一次

2.延迟消息问题:如何实现消息的延迟投递

3.高可用问题:如何避免单点的MQ故障而导致的不可用问题

4.消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题

一.消息可靠性

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

  • -发送时丢失:- 生产者发送的消息未送达exchange- 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

1.生产者消息确认

生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认

消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack

  • publisher-return,发送者回执 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

在这里插入图片描述

简单来说:在publisher-confirm下的nack是消息投递到交换机失败返回的信息;在publisher-confirm下的ack是消息成功到达了消费者;在publisher-return下的ack是消息到达了交换机但是路由失败的返回信息


SpringAMQP实现生产者确认

一.想要实现生产者消息确认机制,需要在配置文件编写开启代码,即在微服务的application.yml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated 
    publisher-returns: true 
    template:
      mandatory: true

配置说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:- simple:同步等待confirm结果,直到超时- correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback(推荐使用)
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

二.配置ReturnCallback

ReturnCallback是消息到达交换机但是没有成功进行路由的回调函数(作用于全局)

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", 
                    replyCode, replyText, exchange, routingKey, message.toString());
        });
    }
}

对于代码中的ApplicationContext是负责管理和组织Spring应用中的各个组件,如bean、配置文件等.

通过实现ApplicationContextAware这个接口,bean可以获取对ApplicationContext的引用,并因此获得访问应用上下文中的其他bean、资源和容器特性的能力,所以说实现了接口等同于获取到了bean容器,就可以获取到 rabbitTemplate并进行设置唯一的ReturnCallback

在回调函数中,消息路由失败会返回很多信息,其中使用路由键,消息的交换机的名称可以实现重发消息

三.在生产者类中发送消息并同时实现ConfirmCallback

ConfirmCallback同样是回调函数,与ReturnCallback不同的是ConfirmCallback可以创建多次

ConfirmCallback是对消息还没有进入到交换机就丢失的一种消息返回策略,当丢失后,执行回调函数并可以记录消息的失败原因和UUID,成功也是同理

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 消息体
    String message = "hello, spring amqp!";
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 添加callback
    correlationData.getFuture().addCallback(
            result -> {
                if(result.isAck()){ 
                    // ack,消息成功
                    log.debug("消息发送成功, ID:{}", correlationData.getId());
                }else{
                    // nack,消息失败
                    log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 发送消息
    rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}

需要注意的是:在手动添加交换机的过程中,想要使用通配符"#"的话,应该设置交换机为topic类型!

总结:

SpringAMQP中处理消息确认的几种情况:

  • publisher-comfirm:- 消息成功发送到exchange,返回ack- 消息发送失败,没有到达交换机,返回nack- 消息发送过程中出现异常,没有收到回执
  • 消息成功发送到exchange,但没有路由到queue,调用ReturnCallback

2.消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

1.交换机持久化:

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 
    return new DirectExchange("simple.direct", true, false);
}

2.队列持久化:

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

3.消息持久化,SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定

Message msg = MessageBuilder
        .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 
        .build();

但在springamqp中,其实已经在声明交换机和队列的时候将其持久化了,发送消息的方法convertAndSend()内部也将消息做了持久化,了解消息持久化的设置方法可以将以后不是很重要的交换机,队列,消息设置为非持久化


3.消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto(推荐):自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,抛出异常后,会不断重新发送消息即失败重试机制
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

配置方式是修改application.yml文件,添加下面配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

4.消费者失败重试

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

在这里插入图片描述

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐使用)

第三种处理策略是将重试失败的消息重新投递到指定的交换机,然后在投递到指定的队列中,形成了一个交换机-队列的错误消息存放容器,在这个容器中存放不仅有错误消息,还有错误消息头的异常栈信息

在这里插入图片描述

实现方式:

1.定义接收失败消息的交换机、队列及其绑定关系:

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true); 
}
@Bean
public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}

2.定义RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); 
}

总结:如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

二.死信交换机

初识死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

在这里插入图片描述

如何给队列绑定死信交换机?

  • 给队列设置dead-letter-exchange属性,指定一个交换机
  • 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey

1.TTL

TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间

在这里插入图片描述

如何实现?

1.声明一组死信交换机和队列,基于注解方式:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "dl.queue", durable = "true"),
        exchange = @Exchange(name = "dl.direct"),
        key = "dl" 
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.queue的延迟消息:{}", msg);
}

2.队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化 
            .ttl(10000) // 设置队列的超时时间,10秒
            .deadLetterExchange("dl.direct") // 指定死信交换机
            .deadLetterRoutingKey("dl") // 指定死信RoutingKey
            .build();
}
@Bean
public Binding simpleBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

3.发送消息时,给消息本身设置超时时间(主要是测试当ttl队列和消息都设置了超时时间最终以哪个为主)

@Test
public void testTTLMsg() {
    // 创建消息
    Message message = MessageBuilder
            .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
            .setExpiration("5000") 
            .build();
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
}

可以发现都设置超时时间的情况下,以时间短的ttl为准

总结:

消息超时的两种方式是?

  • 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
  • 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
  • 两者共存时,以时间短的ttl为准

2.延迟队列

==利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。==这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

实现延迟队列的效果需要依赖RabbitMQ额外的插件:DelayExchange,具体的使用不进行描述,这里说明使用springamqp实现延迟队列

DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可

例如(基于注解方式):

在这里插入图片描述

这里是使用注解方式实现了延迟交换机的实现,下面采用声明bean的方式来实现延迟交换机:

在这里插入图片描述

声明好延迟交换机后,需要在发送消息添加消息头(header:x-delay,值为延迟的时间,单位为毫秒),来指定延迟时间

在这里插入图片描述

总结:

延迟队列插件的使用步骤:

  • 声明一个交换机,添加delayed属性为true
  • 发送消息时,添加x-delay头,值为超时时间

三.惰性队列

消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

解决消息堆积有三种种思路:

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积,提高堆积上限

惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

在这里插入图片描述

这里主要采用springamqp声明惰性队列:

  • @Bean的方式:

在这里插入图片描述

  • 注解方式:

在这里插入图片描述

总结:

惰性队列的优点:

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定

惰性队列的缺点:

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

四.MQ集群

集群分类
RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。


1.普通集群

普通集群,或者叫标准集群(classic cluster),具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机,队列中的消息就会丢失

在这里插入图片描述

最重要的是可以多个节点间可以共享数据(元信息)!


2.镜像集群

镜像集群:本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点
  • 一个队列的主节点可能是另一个队列的镜像节点
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主

在这里插入图片描述
镜像集群不是针对某个节点来区分主从节点的,是针对单个队列来确定是否属于这个队列的主从节点!


3.仲裁队列(推荐)

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致

搭建十分简单,只需要在普通集群的基础上,在rabbitMQ的可视化界面操作声明队列即可(声明队列选择类型为Quorum)
在这里插入图片描述

SpringAMQP创建仲裁队列:

在这里插入图片描述

SpringAMQP连接集群,只需要在yaml中配置即可:
在这里插入图片描述


本文转载自: https://blog.csdn.net/m0_71484388/article/details/135140459
版权归原作者 p1sto 所有, 如有侵权,请联系我们删除。

“RabbitMQ高级”的评论:

还没有评论