0


RabbitMQ---延迟消息

RabbitMQ---延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后才收到消息。

延迟任务:设置在一定时间之后才执行的任务。

延迟消息有以下三种实现方案:

  1. 死信交换机
  2. 延迟消息插件

一、延迟队列

TTL

  • TTL 全称 Time To Live(存活时间/过期时间)。
  • 当消息到达存活时间后,还没有被消费,会被自动清除。
  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

死信交换机

成为死信(dead letter)的条件:

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false(消费者拒接消费消息,并且不重回队列;)
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 队列消息堆积已满,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

1、声明延迟队列

package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.security.auth.login.CredentialNotFoundException;

@Configuration
public class DlxExchangeConfiguration {

    /**
     * 声明 TTL 队列
     * 1. 指定消息的 TTL
     * 2. 指定死信交换机
     * 3. 指定死信交换机的 RoutingKey
     */
    @Bean
    public Queue ttlQueue() {
        return QueueBuilder
                .durable("ttl.queue") // 指定队列的名称
                //.ttl(10000) // 指定 TTL 为 10 秒,这里可设置过期时间,但我这里测试在发送消息时设置过期时间
                .deadLetterExchange("dlx.direct") // 指定死信交换机
                .deadLetterRoutingKey("dlx") // 指定死信交换机的 RoutingKey
                .build();
    }

    /**
     * 声明TTl交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("ttl.direct");
    }

    /**
     * 声明ttl交换机与队列的关联关系
     * @return
     */
    @Bean
    public Binding directBinding(){
        return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("ttl");
    }

    /**
     * 声明死信交换机
     * @return
     */
    @Bean
    public DirectExchange dlxDirect(){
        return new DirectExchange("dlx.direct");
    }

    /**
     * 声明死信队列
     * @return
     */
    @Bean
    public Queue dlxQueue(){
        return new Queue("dlx.queue");
    }

    /**
     * 声明死信交换机与队列关联关系
     * @return
     */
    @Bean
    public Binding tlxBinding(){
        return BindingBuilder.bind(dlxQueue()).to(dlxDirect()).with("dlx");
    }

}

2、发送消息

@Test
    void testSendTTLMessage(){
        rabbitTemplate.convertAndSend("ttl.direct", "ttl", "hello", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        });
        log.info("消息发送成功!");
    }

3、死信队列消费消息

@RabbitListener(queues = "dlx.queue")  //监听的队列:dlx.queue
public void listenDlxQueue(String msg){
    log.info("dlx.queue的消息:【"+msg+"】");
}

4、结果比对

4.1、发送时间,设置10s过期

4.2、死信队列消费消息时间

二、延迟消息插件

    RabbitMQ官方推出的插件,原生支持延迟消息的功能。其原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

本地RabbitMQ官网下载rabbitmq_delayer_message_exchange插件地址:

https://www.rabbitmq.com/community-plugins.html

代码实现:
声明延迟交换机方式一:

 @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct")
                .delayed()  //设置delay的属性为true
                .durable(true)
                .build();
    }

方式二:

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(value = "delay.direct",delayed = "true"), //delayed  这是延迟交换机开启
            key = "delay"
    ))
    public void listenDelayMessage(String msg){
        log.info("接收到delay.queue的延迟消息:{}",msg);
    }

测试发送消息

@Test
    void testSendDelayMessage(){
        rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(1000);
                return message;
            }
        });
        log.info("消息发送成功!");
    }

案例

电商项目设置30分钟后检测订单支付状态,并完成取消超时订单

设置30分钟检查订单状态存在两个问题:

  1. 如果并发较高,30分钟可能堆积消息过多,对MQ压力过大
  2. 大多数订单再下单后很短时间内就会支付,但是却需要在MQ内等待30分钟,浪费资源。

解决措施:设置多个延迟消息交换机,如设置不同的等待时间:10s、10s、10s、15s、15s....,这些时间相加得到30分钟,不同延迟时间过滤掉大部分的消息,给MQ减压

首先先查询支付状态,判断是否支付,如果状态显示未支付,则获取下次延迟时间,判断是否有延迟时间,有则重发延迟消息,没有延迟消息则取消订单。如果订单显示已支付,则标记未已支付。

定义延迟时间集合及相关方法:

package com.itheima.consumer.config;

import lombok.Data;

import java.util.Arrays;
import java.util.List;

@Data
public class MultiDelayMessage<T>{
    //消息体
    private T data;

    //记录延迟消息时间的集合
    private List<Long> delayMillis;

    public  MultiDelayMessage(T data, List<Long> delayMillis){
        this.data = data;
        this.delayMillis = delayMillis;
    }

    public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){
        return new MultiDelayMessage<>(data, Arrays.asList(delayMillis));
    }

    //获取并移除下一个延迟时间
    //Returns: 队列中的第一个延迟时间
    public Long removeNextDelay(){
        return delayMillis.remove(0);
    }

    //是否还有下一个延迟时间
    public boolean hasNextDelay(){
        return !delayMillis.isEmpty();
    }
}

定义交换机、队列名称以及key:

public interface MqConstants {

    String DELAY_EXCHANGE = "trade.delay.topic";  //交换机名称
    String DELAY_ORDER_QUEUE = "trade.order.delay.queue"; //队列名称
    String DELAY_PRDER_ROUTING_KEY = "order.query";  //key
}

定义延迟消息体:

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {
    
    private final int delay;
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(delay);
        return null;
    }
}

实现发送延迟消息:

/*
     * 生成订单后,发送延迟检查订单消息
     */
    public void testOrderStatic(){
       try{
           MultiDelayMessage<String> msg = MultiDelayMessage.of("这里是订单ID", 1000L, 1000L, 1000L, 1500L, 1500L); //这里的消息体是订单ID,后面是延迟消息时间集合
           rabbitTemplate.convertAndSend(
                   MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_PRDER_ROUTING_KEY,msg,
                   new DelayMessageProcessor(msg.removeNextDelay().intValue())
           );
       }catch (AmqpException e){
           log.error("延迟消息发送失败!");
       }

    }

消费延迟消息的大概思路:

import com.itheima.constants.DelayMessageProcessor;
import com.itheima.constants.MqConstants;
import com.itheima.constants.MultiDelayMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

@RequiredArgsConstructor
public class OrderStatusCheckListener {

    private RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
            exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE, delayed = "true" ,type = ExchangeTypes.TOPIC),
            key = MqConstants.DELAY_PRDER_ROUTING_KEY
    ))
    public void listenOderDelayMessage(MultiDelayMessage<Long> msg){
        //1、查询订单状态

        //2、判断是否已支付

            //2.1订单不存在或者已经处理(订单显示已支付)----交易服务显示已支付,则表示已支付,直接return

        //(交易服务显示未支付的情况下)3、去支付服务查询真正的支付状态   ---- 这里是在交易服务查询显示未支付,但不一定是未支付,需要去支付服务查询确定一下

        //3.1、判断支付服务的订单支付状态,已支付则标记订单状态为已支付,直接return

        //4、判断是否存在延迟时间
        if (msg.hasNextDelay()){
            //4.1、存在,则重发延迟消息
            Long nestDelay = msg.removeNextDelay();
            rabbitTemplate.convertAndSend(
                    MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_PRDER_ROUTING_KEY,
                    msg, new DelayMessageProcessor(nestDelay.intValue())
            );
            return;
        }
        //5、不存在,取消订单,修改订单状态为取消订单

        //6、恢复库存

    }
}
标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/Mai_Jun_Hao/article/details/135651485
版权归原作者 Mai_Jun_Hao 所有, 如有侵权,请联系我们删除。

“RabbitMQ---延迟消息”的评论:

还没有评论