0


RabbitMQ(四) | 惰性队列 - 解决消息堆积问题

RabbitMQ(四) | 惰性队列 - 解决消息堆积问题


接上一篇:RabbitMQ(三) | 死信交换机、死信队列、TTL、延迟队列(安装DelayExchange插件)


1.消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
在这里插入图片描述

解决消息堆积有两种思路:

  • 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
  • 给消费者开启多线程,提高消费速度
  • 扩大队列容积,提高堆积上限

要提升队列容积,把消息保存在内存中显然是不行的。


2.惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

2.1.基于命令行设置lazy-queue

而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

rabbitmqctl set_policy Lazy "^lazy-queue$"'{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues :策略的作用对象,是所有的队列

2.2.基于@Bean声明lazy-queue

在这里插入图片描述

在consumer服务端的新增惰性队列配置类LazyConfig:

importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.QueueBuilder;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/** 惰性队列配置 */@ConfigurationpublicclassLazyConfig{/** 惰性队列 - 增加了.lazy()属性 */@BeanpublicQueuelazyQueue(){returnQueueBuilder.durable("lazy.queue").lazy()// 开启x-queue-mode为lazy.build();}/** 正常队列 */@BeanpublicQueuenormalQueue(){returnQueueBuilder.durable("normal.queue").build();}}

2.3.基于@RabbitListener声明LazyQueue

在这里插入图片描述
在consumer服务端的消息监听类中新增方法

@RabbitListener(queuesToDeclare =@Queue(
        name ="lazy.queue",
        durable ="true",
        arguments =@Argument(name ="x-queue-mode", value ="lazy")))publicvoidlistenLazyQueue(String msg){
    log.info("接收到 lazy.queue 的消息:{}", msg);}

2.4.发送消息

这里对正常队列和惰性队列进行十万条数据的测试,启动consumer服务,在publisher服务的SpringAmqpTest类中新增下列两个方法:

/** 测试惰性队列 */@TestpublicvoidtestLazyQueue()throwsInterruptedException{// 模拟发送十万条数据long b =System.nanoTime();for(int i =0; i <1000000; i++){// 1.准备消息Message message =MessageBuilder.withBody("hello, LazyQueue".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)// 改成非持久化,可以看一下LazyQueue的效果.build();// 2.发送消息
        rabbitTemplate.convertAndSend("lazy.queue", message);}long e =System.nanoTime();System.out.println(e - b);}/** 测试正常队列 */@TestpublicvoidtestNormalQueue()throwsInterruptedException{// 模拟发送十万条数据long b =System.nanoTime();for(int i =0; i <1000000; i++){// 1.准备消息Message message =MessageBuilder.withBody("hello, Spring".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)// 改成非持久化,可以看一下正常队列的效果.build();// 2.发送消息
        rabbitTemplate.convertAndSend("normal.queue", message);}long e =System.nanoTime();System.out.println(e - b);}

RabbitMQ控制台队列的数据变化:

初始:
在这里插入图片描述

运行两个生产者方法后,看一下两个队列总览

在这里插入图片描述


惰性队列数据变化

全部一进来就会存储到磁盘,内存中只有需要消费的:
在这里插入图片描述


正常队列数据变化

而正常队列都在内存中(这里是没有开启持久化的情况):
在这里插入图片描述


3.总结

消息堆积问题的解决方案?

  • 队列上绑定多个消费者,提高消费速度
  • 给消费者开启多线程,提高消费速度
  • 使用惰性队列,可以在mq中保存更多消息

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定

惰性队列的缺点有哪些?

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

下一篇:RabbitMQ(五) | MQ集群搭建、部署、仲裁队列、集群扩容



本文转载自: https://blog.csdn.net/qq_25112523/article/details/125431996
版权归原作者 蓝图H 所有, 如有侵权,请联系我们删除。

“RabbitMQ(四) | 惰性队列 - 解决消息堆积问题”的评论:

还没有评论