0


RabbitMQ之延迟队列

    延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。

    例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?
  1. 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很低效,很多时候做的都是些无用功;

  2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在了;

  3. 还可以用延迟队列,锁座成功后会发送1条延迟消息,这条消息15分钟后才会被消费,消费的过程就是检查这个座位是否已经是“已付款”状态;

     你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚22点准时参加会有。系统还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚21:45分的时候(提前15分钟)就会通知提醒参会人员做好参会准备,会议马上开始...
    
     同样的,这也可以通过轮询“会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息,而这条消息比较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。不过遗憾的是,在AMQP协议和RabbitMQ中都没有相关的规定和实现。不过,我们似乎可以借助上一小节介绍的“死信队列”来变相的实现。
    

可以使用rabbitmq_delayed_message_exchange插件实现。

    这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,二基于插件存放消息在延时交换机里(x-delayed-message exchange)。

  • 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
  • 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
  • 队列(queue)再把消息发送给监听它的消费者(customer)

下载插件

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

安装插件:

    将插件拷贝到rabbitmq-server的安装路径:*/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins *

启用插件

rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启rabbitmq-server

systemctl restart rabbitmq-server

SpringBoot代码案例

(1)xml配置文件与properties连接配置

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
spring.application.name=delayed_exchange
spring.rabbitmq.host=192.168.80.121
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# 设置手动确认消息
#spring.rabbitmq.listener.simple.acknowledge-mode=manual

(2)SpringBootApplication主入口类

package com.lagou.rabbit.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Demo02RabbitmqDelayedApplication {

    public static void main(String[] args) {
        SpringApplication.run(Demo02RabbitmqDelayedApplication.class, args);
    }

}

(3)RabbitMQ的对象配置

package com.lagou.rabbit.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableRabbit
@ComponentScan("com.lagou.rabbit.demo")
public class RabbitConfig {

    @Bean
    public Queue queue() {
        return new Queue("queue.delayed", true, false, false, null);
    }

    @Bean
    public Exchange exchange() {
        Map<String, Object> arguments = new HashMap<>();
        // 使用x-delayed-type指定交换器的类型
        arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
        // 使用x-delayed-message表示使用delayed exchange插件处理消息
        return new CustomExchange("ex.delayed", "x-delayed-message", true, false, arguments);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
    }

    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    @Bean
    @Autowired
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

(4)使用推消息模式接收延迟队列的广播

package com.lagou.rabbit.demo.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

import java.io.IOException;

@Component
public class MyMeetingListener {
    @RabbitListener(queues = "queue.delayed")
    public void onMessage(Message message, Channel channel) throws IOException {
        System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));

        // 消息确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

(5)开发RestController,用于向延迟队列发送消息,并指定延迟的时长

package com.lagou.rabbit.demo.controller;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RestController
public class DelayedController {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @RequestMapping("/meeting/{second}")
    public String bookMeeting(@PathVariable Integer second) {

        // RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
        // 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
        // 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
        // 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
        MessageProperties messageProperties = MessagePropertiesBuilder.newInstance()
                // 设置消息的过期时间
                .setHeader("x-delay", (second - 10) * 1000)
                .setContentEncoding("utf-8")
                .build();

        Message message = MessageBuilder.withBody("还有10s开始开会了".getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();

        // 如果不设置message的properties,也可以使用下述方法设置x-delay属性的值
        // rabbitTemplate.convertAndSend("ex.delayed", "key.delayed", message, msg -> {
        // 使用定制的属性x-delay设置过期时间,也就是提前5s提醒
         // 当消息转换完,设置消息头字段
        // msg.getMessageProperties().setHeader("x-delay", (seconds - 5) * 1000);
            // return msg;
        // });

        amqpTemplate.send("ex.delayed","key.delayed",message);

        return "会议订好了";
    }
}

(6)访问,然后查看输出


本文转载自: https://blog.csdn.net/weixin_52851967/article/details/128134264
版权归原作者 悠然予夏 所有, 如有侵权,请联系我们删除。

“RabbitMQ之延迟队列”的评论:

还没有评论