0


[RabbitMQ] 延迟队列+事务+消息分发

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

1. 延迟队列

1.1 概念

延迟队列就是在消息发送以后,并不想让消费者立刻拿到消息,而是等待特定的时间之后,消费者才可以拿到消息进行消费.
RabbitMQ本身并没有直接支持延迟队列的功能,但是可以通过TTL+死信队列的方式结合模拟出延迟队列的功能.
假设一个应用中需要每条消息都为10s延迟,生产者通过

normal_exchange

这个交换器将发送的消息存储在

normal_queue

这个队列,之后为这个队列或者队列中的消息的ttl设置为10s.但是消费者订阅的队列并不是

normal_queue

这个队列,而是

dlx_queue

这个队列,当消息从

normal_queue

这个队列中的消息经历10s过期之后存入

dlx_queue

这个队列中,消费者就恰好消费到了延迟10s之后的消息.
在这里插入图片描述

1.2 TTL+死信队列实现

代码实现:

  1. 先看TTL+死信队列实现延迟队列 定义正常队列和死信队列,绑定正常队列和死信交换机.
@BeanpublicQueuenormalQueue(){returnQueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE).deadLetterRoutingKey("dlx").build();}@BeanpublicDirectExchangenormalExchange(){returnExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).durable(true).build();}@BeanpublicBindingnormalBinding(@Qualifier("normalQueue")Queue queue,@Qualifier("normalExchange")DirectExchange exchange
){returnBindingBuilder.bind(queue).to(exchange).with("normal");}@BeanpublicQueuedlxQueue(){returnQueueBuilder.durable(Constant.DLX_QUEUE).build();}@BeanpublicDirectExchangedlxExchange(){returnExchangeBuilder.directExchange(Constant.DLX_EXCHANGE).durable(true).build();}@BeanpublicBindingdlxBinding(@Qualifier("dlxQueue")Queue queue,@Qualifier("dlxExchange")DirectExchange exchange
){returnBindingBuilder.bind(queue).to(exchange).with("dlx");}

编写生产者:
发送一条10s过期的消息,再发送一条20s过期的消息.

@RequestMapping("/delay")publicStringdelay(){Message message1 =newMessage("发送delay消息10s".getBytes(StandardCharsets.UTF_8));
    message1.getMessageProperties().setExpiration("10000");
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message1);Message message2 =newMessage("发送delay消息20s".getBytes(StandardCharsets.UTF_8));
    message2.getMessageProperties().setExpiration("20000");
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message2);return"发送成功";}

消费者:

@ComponentpublicclassDelayListener{@RabbitListener(queues =Constant.DLX_QUEUE)publicvoidlistener(Message message){long deliveryTag = message.getMessageProperties().getDeliveryTag();String msg =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println("接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+msg);}}

调用接口之后观察控制台接收消息的结果:等待10s和20s之后分别接收到消息
在这里插入图片描述
延迟队列希望达到的效果就是延迟一定的时间之后才收到消息,TTL刚好给消息设置延迟时间,成为死信,成为死信之后就会被投递到死信队列中,这样消费者就可以一直消费死信队列的消息就可以了.
但是这样的模式也会存在一定的问题
我们可以先发送20s的数据,再发送10s的数据:

@RequestMapping("/delay")publicStringdelay(){Message message2 =newMessage("发送delay消息20s".getBytes(StandardCharsets.UTF_8));
    message2.getMessageProperties().setExpiration("20000");
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message2);Message message1 =newMessage("发送delay消息10s".getBytes(StandardCharsets.UTF_8));
    message1.getMessageProperties().setExpiration("10000");
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message1);return"发送成功";}

通过控制台观察死信队列消费情况:
在这里插入图片描述
我们发现10s过期的消息和20s过期的消息同时被消费者收到.10s过期的消息和20s过期的消息同时进入了死信队列.
这是由于在消息过期之后,消息不会被马上丢弃,消息只在消息被消费者消费的时候,即出队列的时候检测消息是否过期(扫描队头的消息是否过期),由于20s的消息在10s消息的前面,队列会优先扫描20s过期的消息,10s过期的消息还暂时不会被扫描到,当队列扫描到20s的消息过期的时候,10s的消息才会被扫描到,队列这才会认为10s的这条消息已经过期了,所以他和20s的消息便同时进入了死信队列中.
所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是⼀致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每⼀种不同延迟时间的消息建立单独的消息队列.

1.3 延迟队列插件

RabbitMQ官方也提供了一个延迟的插件来实现延迟队列的功能.

1.3.1 安装延迟队列

  1. 下载并上传插件 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/d2984e15f9194af7850697735d5953f0.png) 下载ez文件.下载到Windows环境之后通过Xshell上传到服务器.(这里需要注意的是,我们下载的插件版本需要和我们操作系统上安装的RabbitMQ的版本一致) 我们在上传到服务器中的时候,我们需要把该文件上传到/usr/lib/rabbitmq/plugins目录中,RabbitMQ本身不会在此安装任何内容,如果没有这个路径,可以自己进行创建.在这里插入图片描述
  2. 启动插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange在这里插入图片描述
  3. 验证插件 在RabbitMQ管理平台查看,新建交换机的时候是否有延迟消息的选项,如果有就说明延迟消息插件已经正常运行了.在这里插入图片描述

1.3.2 基于插件延迟队列实现

  1. 声明交换机和队列 在交换机的声明之后加上delay()选项,这里需要注意的是,虽然我们叫的是延迟队列,但是我们是在交换机上声明延迟的.
publicstaticfinalStringDELAY_EXCHANGE="delay_exchange";publicstaticfinalStringDELAY_QUEUE="delay_queue";
@BeanpublicDirectExchangedelayExchange(){returnExchangeBuilder.directExchange(Constant.DELAY_EXCHANGE).delayed().durable(true).build();}@BeanpublicQueuedelayQueue(){returnQueueBuilder.durable(Constant.DELAY_QUEUE).build();}@BeanpublicBindingdelayBinding(@Qualifier("delayExchange")DirectExchange exchange,@Qualifier("delayQueue")Queue queue){returnBindingBuilder.bind(queue).to(exchange).with("delay");}
  1. 生产者 发送两条消息,并设置延迟时间,这里我们先设置20s的,再设置10s的,看看上面的问题有没有得到解决.我们前面使用ttl+死信队列的方式实现消息延迟的时候,我们设置消息设置的是过期时间(setExpiration),我们在这里设置的时候设置的是延迟时间(setDelayLong).
@RequestMapping("/delay")publicStringdelay(){Message message2 =newMessage("发送delay消息20s".getBytes(StandardCharsets.UTF_8));
    message2.getMessageProperties().setDelayLong(20000L);
    rabbitTemplate.convertAndSend(Constant.DELAY_EXCHANGE,"delay",message2);Message message1 =newMessage("发送delay消息10s".getBytes(StandardCharsets.UTF_8));
    message1.getMessageProperties().setDelayLong(10000L);
    rabbitTemplate.convertAndSend(Constant.DELAY_EXCHANGE,"delay",message1);return"发送成功";}
  1. 消费者
@ComponentpublicclassDelayListener{@RabbitListener(queues =Constant.DLX_QUEUE)publicvoidlistener(Message message){long deliveryTag = message.getMessageProperties().getDeliveryTag();String msg =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println("接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+msg);}}
  1. 运行程序,观察控制台日志和RabbitMQ管理界面在这里插入图片描述 我们看到delay_exchange的交换机类型是"x-delay-message". 调用接口,发送消息,观察控制台日志:在这里插入图片描述 我们发现我们上述的问题得到了解决,我们首先收到了延迟10s的消息,后收到了延迟20s的消息.

1.4 常见面试题

延迟队列作为RabbitMQ的高级特性,也是面试的一大重点

  1. 介绍一下RabbitMQ的延迟队列 延迟队列就是在消息发送以后,并不想让消费者立刻拿到消息,而是等待特定的时间之后,消费者才可以拿到消息进行消费. 但是RabbitMQ本身并没有直接实现延迟队列,有以下的两种方法来实现: - 通过ttl+死信队列的方式来实现- 但是通过这种方式实现存在一定的问题,如果延迟时间长的消息先到达队列,延迟时间短的后到达队列,延迟时间短的不会即时被消费者收到.- 可以通过官方提供的延迟插件实现延迟功能.
  2. 应用场景 - 订单在10min之内未支付自动取消.- 用户在注册成功之后,3天之后发起调查问卷- 用户发起退款,24小时之内商家未处理,则默认同意退款.
  3. 二者对比 - 基于死信实现的延迟队列 优点就是灵活,不需要额外的插件来支持,缺点就是存在消息顺序问题,需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性.- 基于插件实现的延迟队列 优点就是通过插件可以直接创建延迟队列,简化延迟消息的实现,避免了DLX存在消息顺序问题. 缺点就是需要依赖特定的插件,有运维的工作,其次RabbitMQ的版本必须和插件的版本对应.

2. 事务

RabbitMQ是基于AMQP协议实现的,该协议实现了事务的机制,因此RabbitMQ也支持事务的机制,Spring AMQP也提供了对事务的额相关操作.RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败.

2.1 配置事务

配置事务管理器的时候,分为两步,首先创建RabbitTemplate,使用

setChannelTransacted(true)

开启RabbitTemplate的信道事务.之后创建事务管理器

RabbitTransactionManager

.

@ConfigurationpublicclassTransactionConfig{@BeanpublicRabbitTransactionManagertransactionManager(ConnectionFactory connectionFactory){returnnewRabbitTransactionManager(connectionFactory);}@BeanpublicRabbitTemplatetransactionTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}}

2.2 配置队列

@BeanpublicQueuetansactionQueue(){returnQueueBuilder.durable("trans_queue").build();}

2.3 生产者

在生产者中,我们需要在方法之上加上

@Transactional

才可以生效.我们在异常发生之前发送一条消息,在异常发生之后发送一条消息,查看数据是否会被回滚.

@RequestMapping("/trans")@TransactionalpublicStringtrans(){
    transactionTemplate.convertAndSend("","trans_queue","trans1...");int i =5/0;
    transactionTemplate.convertAndSend("","trans_queue","trans2...");return"发送成功";}

测试:
在这里插入图片描述
在发送消息之后,报出了500的错误码.
在这里插入图片描述
我们看到trans_queue中没有接收到消息,说明第一条消息被回滚了.

3. 消息分发

3.1 概念

RabbitMQ队列拥有多个消费者的时候,队列会把收到的消息分派给不同的消费者,每条消息只会发送给订阅列表里的一个消费者.这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者处理消息即可.
默认的情况下,消费者是轮训进行分发的,而不管消费者是否已经消费并已经确认了消息.这种方式是不太合理的,试想一下,如果某些消费者的消费速度较慢,而某些消费者的消费速度很快,这就会导致某些消费者的消息发生积压,某些消费者则很空闲,进而导致应用的整体吞吐量下降.所以我们就可以使用消息分发来解决问题.

3.2 应用场景

消息分发的常见应用场景如下:

  1. 限流
  2. 非公平分发

3.2.1 限流

RabbitMQ提供了限流的机制,可以控制消费端一次只能拉取N个请求.
通过设置prefetchCount参数,同时也必须设置消息应答方式为手动应答.
prefetchCount: 控制消费者从队列中预取消息的数量,以此来实现流控制和负载均衡.
代码示例:

  1. 首先我们需要在配置文件中加入prefetch参数
listener:simple:acknowledge-mode: manual #需要设置为手动应答retry:initial-interval: 1000ms
          enabled:truemax-attempts:5prefetch:5# 表示一个队列最多可以有5条未确认的消息
  1. 配置队列和交换机
@BeanpublicDirectExchangeQOSExchange(){returnExchangeBuilder.directExchange(Constant.QOS_EXCHANGE).durable(true).build();}@BeanpublicQueueQOSQueue(){returnQueueBuilder.durable(Constant.QOS_QUEUE).build();}@BeanpublicBindingQOSBinding(@Qualifier("QOSQueue")Queue queue,@Qualifier("QOSExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("qos");}
  1. 消费者监听队列 首先我们先不对消息进行手动ack
@ComponentpublicclassQOSListener{@RabbitListener(queues =Constant.QOS_QUEUE)publicvoidlistener(Message message,Channel channel)throwsIOException{long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+newString(message.getBody(),"UTF-8"));//        channel.basicAck(deliveryTag,true);}}
  1. 生产者发送消息,一次性发送20条消息
@RequestMapping("/qos")publicStringqos(){for(int i =0;i <20;i++){
        rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE,"qos","message"+i);}return"发送成功";}
  1. 测试 调用接口,发送消息:在这里插入图片描述在这里插入图片描述 我们看到,由于没有对消息进行手动应答,我们控制台只收到了5条消息.在这里插入图片描述 由于5条消息还没有ack掉,所以剩下的15条消息就在队列中发生了堆积. 之后我们对消息进行手动ack.观察控制台:在这里插入图片描述 我们看到消息全部被应答了.

3.2.2 负载均衡

我们也可以使用此配置来实现负载均衡.
如图所示的情况下,一个消费者处理任务非常快,另一个非常慢,就会造成一个消费者很忙,一个消费者很闲.这是因为RabbitMQ只是在消息进入队列的时候分派消息,它不考虑消费者未确认消息的数量.
在这里插入图片描述
我们可以设置

prefetch=1

的方式来实现负载均衡.告诉RabbitMQ一次只给一个消费者发送消息,也就是说,在一个消费者对前一条消息进行确认之前,不会对该消费者发送新的消息,相反,它会将它分配给一个不处在繁忙阶段的消息队列.
代码示例:

  1. 配置prefetch参数为1,将消息应答机制设置为手动应答
listener:simple:acknowledge-mode: manual #需要设置为手动应答retry:initial-interval: 1000ms
          enabled:truemax-attempts:5prefetch:1# 表示一个队列最多可以有1条未确认的消息
  1. 设置两个消费者,其中消费较慢的消费者使用Thread.sleep(100)来模拟消费慢.
@ComponentpublicclassQOSListener{@RabbitListener(queues =Constant.QOS_QUEUE)publicvoidlistener(Message message,Channel channel)throwsIOException,InterruptedException{Thread.sleep(100);long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+newString(message.getBody(),"UTF-8"));
        channel.basicAck(deliveryTag,true);}@RabbitListener(queues =Constant.QOS_QUEUE)publicvoidlistener2(Message message,Channel channel)throwsIOException{long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者2接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+newString(message.getBody(),"UTF-8"));
        channel.basicAck(deliveryTag,true);}}
  1. 测试 调用接口,向两个消费者发送消息在这里插入图片描述 我们可以很明显的看到,消费者2消费消息的速度比消费者1快很多.

deliveryTag有重复是因为两个消费者使用的是不同的Channel,每个Channel上的deliveryTag 是独立计数的.

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/2301_80050796/article/details/144120861
版权归原作者 LileSily 所有, 如有侵权,请联系我们删除。

“[RabbitMQ] 延迟队列+事务+消息分发”的评论:

还没有评论