0


Rabbitmq 延迟队列---插件

    解决没法优先发送延时时间短的消息。
  • 插件安装
下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
将插件放入:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins下
进入目录:cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
安装:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启:systemctl restart rabbitmq-server
  • 配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.CustomAutowireConfigurer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayConfig {
    //死信队列
    public static final String DELAY_QUEUE = "DELAY_QUEUE";
    //死信交换机
    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
    //死信routingKey
    public static final String DELAY_ROUNTING_KEY = "DELAY_ROUNTING_KEY";

    //声明交换机
    @Bean
    public CustomExchange delayEchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        //1.交换机名称
        //2.交换机类型
        //3.是否需要持久化
        //4.是否需要自动删除
        //5.其他参数
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);
    }

    //声明队列
    @Bean
    public Queue delayQueue() {
        //创建队列
        return new Queue(DELAY_QUEUE);
    }

    //绑定队列
    @Bean
    public Binding delayBindingQueue(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayEchange") CustomExchange delayEchange) {
        return BindingBuilder.bind(delayQueue).to(delayEchange).with(DELAY_ROUNTING_KEY).noargs();
    }

}
  • 生产者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 *  延迟队列----插件
 *
 *
 **/
@Slf4j
@RestController
@RequestMapping("/delayed")
public class SendDelayMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //延迟队列
    @GetMapping("/sendExpireMsg/{message}/{ttlTime}")
    public void sendExpireMsg(@PathVariable String message, @PathVariable Integer ttlTime) {
        log.info("发送定时消息");
        rabbitTemplate.convertAndSend(DelayConfig.DELAY_EXCHANGE, DelayConfig.DELAY_ROUNTING_KEY, "消息来自定时消息:" + message, msg -> {
            //发消息的时候,延迟时长
            msg.getMessageProperties().setDelay(ttlTime);
            return msg;
        });
    }

}
  • 消费者
import cn.my.config.rabbitmq.delaybyplugin02.DelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DelayConsume {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    //接收消息
    @RabbitListener(queues = DelayConfig.DELAY_QUEUE)
    public void receiveDelay(Message msg) {

        String message = new String(msg.getBody());
        log.info("接收到了插件延迟队列消息:" + message);
    }
}
标签: rabbitmq

本文转载自: https://blog.csdn.net/weixin_53604412/article/details/127690083
版权归原作者 替罪的羊 所有, 如有侵权,请联系我们删除。

“Rabbitmq 延迟队列---插件”的评论:

还没有评论