ok了家人们,小手一指,rap开始!
愿世上无bug
一.惰性队列(Lazy Queue)
1.什么是惰性队列
(1)RabbitMQ从3.6.0版本开始引入惰性队列,它是一种以惰性模式运行的经典队列。当设置“惰性”队列模式时,经典队列中的消息将尽可能早地移动到磁盘。这些消息只有在消费者请求时才被加载到RAM中。
(2)就是说生产者发送消息到RabbitMq时,这些消息会直接写入到磁盘上,而不是先加载到内存中;当消费者需要消费消息时,RabbitMq会从磁盘上加载这些消息到内存中,然后交给消费者,所以惰性队列只有在消费者实际消费时,才会将消息从磁盘加载到内存中。
2.什么情况下,我们要使用惰性队列
(1)惰性队列能够支持更多的消息存储,当消费者由于各种原因(如消费者下线,宕机等)而不能进行消费,造成消息的堆积时,这个时候惰性队列是非常必要的。
(2)RabbitMq内存资源有限的时候,我们可以使用惰性队列避免因消息过多而导致的内存溢出。
(3)对于普通队列,写入磁盘同时在内存中会存一份备份,对于想提高持久化效率,我们可以用惰性队列,它会将消息直接存入文件系统,这样可以减少内存的消耗。
注意:当我们用惰性队列时,我们要能接受性能开销,存入与取出是都要经过磁盘的,这就避免不了I/O,而且时效性很差。
3.如何实现惰性队列
(1)用Policy策略方式(推荐)
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
rabbitmqctl set_policy Lazy "^lazy-queue$" "{""queue-mode"":""lazy""}" --apply-to queues
(Windows)
(2)queue.declare设置参数方式
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 替换为你的RabbitMQ服务器地址
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个惰性队列
Map<String, Object> args1 = new HashMap<>();
args1.put("x-queue-mode", "lazy"); // 设置队列为惰性模式
channel.queueDeclare("lazy_queue", false, false, false, args1);
在SpringBoot项目我们可以用@Bean或者注解的方式配置
@Configuration
public class MqConfig {
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy_queue").lazy().build();
}
//...我们也可以配置交换机、队列绑定交换机等...
}
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {
log.info("接收到 lazy.queue 的消息:{}", msg);
}
我们也可以直接用RabbitMq的webUI界面
注意:当策略参数和队列参数都指定队列模式时,则队列参数比策略值具有优先级,如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的,否则会报错。
4.内存利用率
官方提供了简单的测试数据显示了正常队列和惰性队列之间RAM(主存)利用率的差异。
一百万条数据,每条数据大小为 1kb,普通队列进程内存为257MB,而惰性队列是159KB;普通消息占用的内存为368MB,而惰性队列为0,因为消息都在磁盘上。
5.队列的切换
(1)从普通队列转换为惰性队列时,这个操作的性能影响与队列需要将消息分页到磁盘时的性能影响相同。
(2)从普通队列转换到惰性队列期间,队列首先将RAM(主存)中的所有消息分页到磁盘。当该操作正在进行时,它将不再接受来自发布通道的任何消息(会阻塞)。pageout(分页)完成后,队列将开始接受发布、ack和其他命令。
(3)从惰性队列切换到普通队列时,它将执行与服务器重启后恢复队列时相同的过程。
二.优先级队列(Priority Queue)
1.什么是优先级队列
(1)优先级队列是一种特殊类型的队列,它根据消息的优先级进行排序和发送,在这种队列中,高优先级的消息将先被消费。
(2)优先级可以设置为1
255,但建议设置为15,如果最大值5满足不了需求建议1~10,(3)更高的优先级需要更多的CPU和内存资源,因为RabbitMq需要在内部为每个优先级维护一个子队列,从1到你配置最大值。(4)优先级队列与普通队列有相同特性,如支持持久化,分页和镜像等功能,但需要注意的是消息的过期机制,过期的消息是从队列的头部开始过期的,即使你设置了队列级别的TTL,低优先级的过期消息仍然会被高优先级的未过期消息阻塞,导致无法传递,但它会出现在队列统计信息中;
(5)另外,如果你队列设置了最大长度限制,RabbitMq会按正常流程从队列中删除消息,无论是高优先级还是低优先级。
2.优先级队列的应用
(1)订单催付,客户下单后,商家希望在客户设定的时间内完成付款,如果为付款,我们需要推送催付短信。对于大客户的订单,我们就可以使用优先队列去处理。
(2)任务调度,后台调度任务中,有些任务可以稍后执行,有些需要立即执行,所以我们可以用优先队列发送一些紧急任务。
3.如何实现优先级队列
优先级队列是不能用策略配置的,因为策略是动态的,可以在已经声明队列之后进行更改。但是优先级队列声明之后,队列永远不会改变他们的优先级数量。
下边是java代码实现及RabbitMq管理页面配置优先级
(1)java代码
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 替换为你的RabbitMQ服务器地址
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个优先级队列
Map<String, Object> args1 = new HashMap<>();
args1.put("x-max-priority", 10); // 设置队列为优先级队列
channel.queueDeclare("priority_queue", true, false, false, args1);
基于@Bean
@Bean
public Queue priorityQueue(){
return QueueBuilder.durable("priority_queue")
.maxPriority(10).build();
}
RabbitMq的webUI页面
队列的优先级配置完成,下面是生产者发送消息实现消息的优先级
//测试偶数为优先级
@GetMapping("/priority")
public String priorityQueue() {
//消息确认与返回
rabbitTemplate.setConfirmCallback(mqProductCallBack);
rabbitTemplate.setReturnsCallback(mqProductCallBack);
String msg = "我爱上早八,成天笑哈哈";
for (int i = 0; i < 10; i++) {
//创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
//消息发送
if (i % 2 == 0) {
rabbitTemplate.convertAndSend("priority_queue", msg + i,
//Lambda表达式,实现MessagePostProcessor接口
message -> {
//持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//设置优先级
message.getMessageProperties().setPriority(5);
return message;
}, correlationData);
} else {
rabbitTemplate.convertAndSend("priority_queue", msg + i, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}, correlationData);
}
}
return "消息发送成功了!!!";
}
MqProductCallBack: 消息确认代码
@Component
public class MqProductCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
/**
* @param correlationData 对象内部只有一个id属性,用来表示当前消息的唯一性
* @param ack 消息投递到broker的状态,true成功,false失败
* @param cause 投递失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack)
System.out.println("消息投递收到确认,correlationData=" + correlationData.getId());
if (!ack)
System.out.println("消息ID="+correlationData.getId()+"投递失败,失败原因:"+cause);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息返回结果:"+ JSONUtil.toJsonStr(returnedMessage));
}
消费者控制台
注意: 在消费者消费的速度大于生产者发送的速度,且broker中没有消息堆积的话,那么消费者接收这些消息之前,消息可能不会在优先级队列中等待;也就是还没等队列进行优先级排列,消息就被消费下来,优先级就没有了意义。(我们测试可以先发送,在启动消费者)
一键三连~我嘻嘻
欢迎大佬补充或者指正~我不嘻嘻
版权归原作者 一拳打穿地球o 所有, 如有侵权,请联系我们删除。