0


【RabbitMQ】Springboot实现延迟队列+死信队列

死信的概念

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解

一般来说,生产者将消息投递到交换机或者直接到队列里了,消费者从队列取出消息 进行消费,但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景

1.为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中

2.用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的来源

1.消息 TTL 过期

2.队列达到最大长度(队列满了,无法再添加数据到 mq 中)

3.消息被拒绝

延迟队列

延迟队列概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列使用场景

1.订单在十分钟之内未支付则自动取消

2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

3.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议等

springboot实现

代码架构图

配置类

/**
 * 配置文件类代码
 */
@Configuration
public class TTLQueueConfig {
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    // 声明 xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    // 声明 xExchange
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    // 声明队列 A 绑定 X 交换机
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, //@Qualifier 通过容器名进行捆绑
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //声明队列 B 绑定 X 交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
}

生产者(在web界面输入消息)

/**
 * 发送延迟消息
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class sendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendmsg/{msg}")
    public void sendMsg(@PathVariable("msg") String msg) {
        log.info("当前时间:{},发送信息给两个TTL队列:{}", new Date(), msg);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s:" + msg);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s:" + msg);
    }
}

消费者

/**
 * 队列ttl的消费者
 */
@Component
@Slf4j
public class DeadLetterQueueConsume {

    @RabbitListener(queues = "QD")
    public void receiveD(Message msg) {
        log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg.getBody());
        System.out.println(new String(msg.getBody()));
    }
}

结果

发起请求

http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000 http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。

问题和新需求

如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

延时队列优化

代码架构图

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间

配置文件类代码

/**
 * 配置文件类代码
 */
@Configuration
public class TTLQueueConfig {
    public static final String X_EXCHANGE = "X";
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String DEAD_LETTER_QUEUE = "QD";
    //新的普通队列QC
    public static final String QUEUE_C = "QC";

    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
    // 声明 xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    // 声明 xExchange
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    //声明死信队列 QD
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }
    //声明死信队列 QD 绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

消息生产者代码

@Slf4j
@RestController
@RequestMapping("/ttl")
public class sendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //发送消息和ttl
    @GetMapping("/sendttlandmsg/{msg}/{ttlTime}")
    public void sendMsg(@PathVariable("msg") String msg, @PathVariable("ttlTime") String ttlTime) {
        log.info("当前时间:{},发送时长为{}毫秒的信息给队列QC:{}", new Date(), ttlTime, msg);
        rabbitTemplate.convertAndSend("X", "XC", msg, message -> {
            //设置发送消息的延迟时间
            message.getMessageProperties().setExpiration(ttlTime);
            return message;
        });
    }
}

消费者

/**
 * 队列ttl的消费者
 */
@Component
@Slf4j
public class DeadLetterQueueConsume {

    @RabbitListener(queues = "QD")
    public void receiveD(Message msg) {
        log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg.getBody());
        System.out.println(new String(msg.getBody()));
    }
}

发起请求

http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000 http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

存在的问题

如果不安装插件会导致延迟效果不能满足我们的需求,比如先发送了一个延迟20s的消息,后发送了一个延迟为2s的消息,如果第一个消息未到达则后一个消息会被阻塞

解决:安装延时队列插件

1.在官网上下载http:// https://www.rabbitmq.com/community-plugins.html,下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。

2.进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins rabbitmq-plugins enable rabbitmq_delayed_message_exchange

代码架构图

配置文件类代码

/**
 * 基于插件的延迟队列配置类
 */
@Configuration
public class DelayQueueConfig {
    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    //声明交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        /**
         * 1.交换机名称
         * 2.交换机类型
         * 3.是否需要持久化
         * 4.是否需要自动删除
         * 5.其他参数
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
                true, false, args);
    }

    //声明队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //绑定
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange) {

        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

消息生产者代码

/**
 * 发送延迟消息
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class sendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 基于插件:生产者,发送消息和延迟时间
     */
    @GetMapping("/sendDelaymsg/{msg}/{delayTime}")
    public void sendMsg(@PathVariable("msg") String msg, @PathVariable("delayTime") Integer delayTime) {
        log.info("当前时间:{},发送时长为{}毫秒的信息给延迟队列队列delayed.queue:{}", new Date(), delayTime, msg);
        rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME
                , DelayQueueConfig.DELAYED_ROUTING_KEY, msg, message -> {
                    //设置发送消息的延迟时间
                    message.getMessageProperties().setDelay(delayTime);
                    return message;
                });
    }
}

消费者

/**
 * 消费者:基于插件的延迟消息
 */
@Component
@Slf4j
public class DelayQueueConsume {
    @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message msg) {
        String message = new String(msg.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), message);
    }
}

结果

发起请求:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000 http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

第二个消息被先消费掉了,符合预期

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。


本文转载自: https://blog.csdn.net/m0_64337991/article/details/122772729
版权归原作者 小李不在ovo 所有, 如有侵权,请联系我们删除。

“【RabbitMQ】Springboot实现延迟队列+死信队列”的评论:

还没有评论