0


RabbitMQ之“延时队列”

延时队列

RabbitMQ是目前最为流行的消息队列之一,它的高可靠性、高可用性和高性能使得它成为众多应用场景下的首选。在实际应用中,我们经常需要实现延时队列来解决一些业务问题,比如订单超时未支付自动取消等。本文将介绍如何使用RabbitMQ实现延时队列

下面先来解释一下

延时队列(也可以称为延迟队列,其实都是一个意思):

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

延时消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息

延时任务:设置在一定时间之后才执行的任务

死信:

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

1、消费者使用basic.reject或basic.nack声明消息消费失败,并且消息的requeue参数设置为false

2、消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费

3、要投递的队列消息堆积满了,最早的消息可能成为死信

延时队列可以用于以下场景:

  1. 订单处理:在电商网站中,订单处理是一个常见的业务流程。如果订单需要立即处理,可以使用RabbitMQ的延时队列来实现延迟处理。例如,可以将订单发送到一个延时队列中,并设置一个延迟时间(例如30分钟),然后在延迟时间到达后,将订单从队列中取出并进行处理。
  2. 消息推送:在移动应用或Web应用程序中,可以使用RabbitMQ的延时队列来实现消息推送。例如,可以将用户订阅的消息发送到一个延时队列中,并设置一个延迟时间(例如1小时),然后在延迟时间到达后,将消息从队列中取出并推送给用户。
  3. 定时任务:在分布式系统中,可以使用RabbitMQ的延时队列来实现定时任务。例如,可以将需要定期执行的任务发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将任务从队列中取出并执行。
  4. 数据备份:在数据库中,可以使用RabbitMQ的延时队列来实现数据备份。例如,可以将需要备份的数据发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将数据从队列中取出并进行备份。
  5. 优惠券发放:您可以设置一个延时队列,将优惠券发放任务添加到队列中,设置一定的延时时间,以保证优惠券在特定时间后才能被消费。
  6. 动态路由:您可以使用延时队列来实现动态路由的功能,将消息发送到延时队列中,并设置一定的路由规则,以实现消息在特定时间后被路由到不同的目标队列中。

业务场景:

我们通常会在电商网站中(或者app比如:京东,淘宝)进行下单,购买商品,但是我们由于没哟及时支付,会出现订单超时未支付自动取消的情况

下面用一张简单的图片来设计一下业务场景:

那我们该如何去实现延时队列呢,下面用一张图片给大家解释一下

话不多说,上代码!!!

作者在这里只创建了一个交换机,这个交换机可以同时绑定两个队列(有两个队列,一个队列设置了它的ttl(消息过期时间),同时设置了消息过期后的路由交换机和路由的routeKey,如果不设置过期策略那么消息过期之后就会进入死信队列,另外一个队列是普通队列,监听的时候只用去监听普通队列,达到延迟队列的效果。跟上图效果一样,消息通过这个交换机到达设置了过期时间的的队列,这个延迟队列没有消费者进行消费,当消息过期之后,会通过这个交换机路由到正常的队列,然后进行消费)

导入依赖

       <dependencies>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
       </dependencies>

配置类

package com.atguigu.gulimall.auth.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author YanShuLing
 * @Package:com.atguigu.gulimall.auth.config
 * @Project: brook
 * @Description TODO
 * @name:RabbitMQConfig
 * @Date 2024/3/8:9:56
 */
@Configuration
public class RabbitMqConfig {

    //创建了一个简单的队列
    @Bean
    public Queue createOrderReleaseQueue(){
        return new Queue("gmall.order.release.queue");
    }

    //这个是一个延时队列
    @Bean
    public Queue createOrderDeadQueue(){

        Map<String,Object> map = new HashMap<>();
        //队列消息的过期时间为十秒
        map.put("x-message-ttl",10000);
        //交换机
        map.put("x-dead-letter-exchange","gmall-order-exchange");
        //路由key
        map.put("x-dead-letter-routing-key","gmall.order.release.queue");

        return new Queue("gmall.order.dead.queue",true,false,false,map);
    }
    //交换机
    @Bean
    public Exchange createOrderExchange(){
        return new DirectExchange("gmall-order-exchange");
    }
    //交换机和正常队列绑定
    @Bean
    public Binding createOrderReleaseBind(){
       return new Binding("gmall.order.release.queue",Binding.DestinationType.QUEUE,
                "gmall-order-exchange","gmall.order.release.queue",null
                );
    }
    //交换机和延迟队列绑定
    @Bean
    public Binding createOrderDeadBind(){
        return new Binding("gmall.order.dead.queue",Binding.DestinationType.QUEUE,
                "gmall-order-exchange","gmall.order.dead.queue",null
        );
    }
}

生产者(作者写了一个发送验证码的代码):

    @PostMapping("/createOrder")
    public R createOrder(String mobile){
        //生成随机的四位数(验证码)
        String code = RandomUtil.randomNumbers(4);
        //redis给这个验证码设置过期时间为5分钟   
        redisTemplate.opsForValue().set("send_sms_"+mobile,code,5, TimeUnit.MINUTES);
        
        String content =   StrFormatter.format(Constants.SMS_TEMPLATE,code);
        //给这个消息生成一个唯一标识,为了解决消息重复消费问题
        String messageId = IdUtil.randomUUID();
        //生产者发送消息,第一个参数是路由交换机,第二个参数是路由键,作者设置了跟死信队列一样的 
        名称,无伤大雅
        rabbitRemplate.convertAndSend("gmall-order-exchange","gmall.order.dead.queue",
        JSON.toJSONstring(new SmsParamVo(mobile,content,messageId)));

        //发送验证码,日志打印
   log.info("发送延迟消息给ttl队列,当前时间:{},消息内容:{}",new Date().toString(),content);
//      smsService.sendSms(mobile,content);
        return R.ok("成功");
    }

消费者:用来监听消息

//消费者监听队列为gmall.order.release.queue队列的消息
@RabbitListener(queues = {"gmall.order.release.queue"})
@Component
@Slf4j
public class SmsListener {

    private final SmsService smsService;

    private final RedisTemplate redisTemplate;

    public SmsListener(SmsService smsService, RedisTemplate redisTemplate) {
        this.smsService = smsService;
        this.redisTemplate = redisTemplate;
    }

    @RabbitHandler
    public void sendSms(String string, Channel channel, Message message){

        //消息标签
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            SmsParamVo smsParamVo = JSON.parseObject(string, SmsParamVo.class);
            if(redisTemplate.hasKey(smsParamVo.getMsgId())){
                //拿到消息的唯一标签,如果是已经消费过的消息,直接拒绝签收
                channel.basicReject(deliveryTag,false);
                return;
            }
     //打印日志
    log.info("发送延迟消息给ttl队列,当前时间:{},消息内容:{}",new Date().toString(),smsPar
mVo);
            //调用发送短信
//            smsService.sendSms(smsParamVo.getMobile(),smsParamVo.getContext());
            redisTemplate.opsForValue().set(smsParamVo.getMsgId(),smsParamVo.getMsgId(),12, TimeUnit.HOURS);
            //确认签收,消息会从队列中删除
            channel.basicAck(deliveryTag,false);

        } catch (IOException e) {
            try {
                if(deliveryTag<=3){
                    //如果是由于某种特殊原因,消息没有发送成功,然后重回队列,
                    channel.basicNack(deliveryTag,false,true);
                }
                //当重试次数达到一定的数量,就放进死信队列
                channel.basicNack(deliveryTag,false,false);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
            throw new RuntimeException(e);
        }

    }
    
}

测试发送之前,我们先来到rabbitMq可视化界面观察一下

下面我们来测试一下,作者使用的是Postman

看看后台日志打印,我们可以看到我们已经实现了延迟消息的效果

还有一种方式也可以实现延迟消息

那就是延迟消息插件,RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列

1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件

RabbitMQ官网下载插件的网址:Community Plugins | RabbitMQ

2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明

这里作者的版本是3.9.13所以,作者就下载3.9版本的

选择3.9版本

            ![](https://img-blog.csdnimg.cn/direct/7d0ed958f5bb4628abfd7c5873ff7908.png)

3、把这个插件传输到服务器上

4、拷贝下载好的插件到容器中

docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez 

可以看到我已经copy到容器内部了

5、安装延迟队列插件

进入RabbitMQ安装目录的目录下

//进入容器内部
docker exec -it rabbitmq /bin/bash

进入安装目录

cd /opt/rabbitmq/plugins

使用如下命令启用延迟插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如下我们就安装好了,然后我们重启rabbitmq容器

使用 exit 命令退出容器

使用docker restart rabbitmq 重启容器

我们来rabbitmq的可视化界面查看

这样说明我们的延迟插件就安装好啦!

到此就结束啦!希望可以帮到你,可以帮作者点个关注和小心心嘛!你们的支持就是我最大的动力,以后也会努力更新的哦!

标签: rabbitmq

本文转载自: https://blog.csdn.net/2302_77971734/article/details/136886578
版权归原作者 颜书凌 所有, 如有侵权,请联系我们删除。

“RabbitMQ之“延时队列””的评论:

还没有评论