使用延迟消息插件,解决无法优先发送延时时间较短的消息的问题。
- 插件安装
下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
将插件放入:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins下
进入目录:cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启:systemctl restart rabbitmq-server
- 配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.CustomAutowireConfigurer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Declares the RabbitMQ topology used for plugin-based delayed delivery:
 * one custom exchange of type {@code x-delayed-message}, one queue, and the
 * binding that connects them.
 */
@Configuration
public class DelayConfig {

    /** Name of the queue bound to the delayed exchange. */
    public static final String DELAY_QUEUE = "DELAY_QUEUE";

    /** Name of the delayed exchange. */
    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";

    /** Routing key used both for the binding and by publishers. */
    public static final String DELAY_ROUNTING_KEY = "DELAY_ROUNTING_KEY";

    /**
     * Declares the delayed exchange.
     *
     * @return a durable, non-auto-delete custom exchange of type
     *         {@code x-delayed-message} whose {@code x-delayed-type} argument is {@code direct}
     */
    @Bean
    public CustomExchange delayEchange() {
        final Map<String, Object> exchangeArgs = new HashMap<>();
        exchangeArgs.put("x-delayed-type", "direct");

        // Named flags instead of bare literals so the constructor call reads on its own.
        final boolean durable = true;
        final boolean autoDelete = false;
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", durable, autoDelete, exchangeArgs);
    }

    /**
     * Declares the queue that is bound to the delayed exchange.
     *
     * @return a durable, non-exclusive, non-auto-delete queue named {@link #DELAY_QUEUE}
     */
    @Bean
    public Queue delayQueue() {
        // Same settings the single-argument Queue constructor applies, spelled out explicitly.
        return new Queue(DELAY_QUEUE, true, false, false);
    }

    /**
     * Binds the queue to the delayed exchange under {@link #DELAY_ROUNTING_KEY}.
     *
     * @param delayQueue   the queue bean declared by {@link #delayQueue()}
     * @param delayEchange the exchange bean declared by {@link #delayEchange()}
     * @return the binding, with no extra arguments
     */
    @Bean
    public Binding delayBindingQueue(@Qualifier("delayQueue") Queue delayQueue,
                                     @Qualifier("delayEchange") CustomExchange delayEchange) {
        return BindingBuilder
                .bind(delayQueue)
                .to(delayEchange)
                .with(DELAY_ROUNTING_KEY)
                .noargs();
    }
}
- 生产者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST entry point that publishes messages to the delayed exchange
 * (plugin-based delay queue).
 */
@Slf4j
@RestController
@RequestMapping("/delayed")
public class SendDelayMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code message} to {@link DelayConfig#DELAY_EXCHANGE} with the
     * given delay set on the message properties.
     *
     * @param message text taken from the URL path; it is appended to a fixed prefix
     * @param ttlTime delay value taken from the URL path and passed to {@code setDelay}
     *                (presumably milliseconds, per the plugin's {@code x-delay} header — confirm)
     */
    @GetMapping("/sendExpireMsg/{message}/{ttlTime}")
    public void sendExpireMsg(@PathVariable String message, @PathVariable Integer ttlTime) {
        log.info("发送定时消息");

        final String payload = "消息来自定时消息:" + message;
        rabbitTemplate.convertAndSend(
                DelayConfig.DELAY_EXCHANGE,
                DelayConfig.DELAY_ROUNTING_KEY,
                payload,
                outgoing -> {
                    // Attach the per-message delay just before the message is sent.
                    outgoing.getMessageProperties().setDelay(ttlTime);
                    return outgoing;
                });
    }
}
- 消费者
import cn.my.config.rabbitmq.delaybyplugin02.DelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
 * Listener for messages arriving on the plugin-based delay queue.
 */
@Slf4j
@Component
public class DelayConsume {

    // NOTE(review): injected but never used in this class; kept so bean wiring is unchanged.
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Logs the body of every message received from {@link DelayConfig#DELAY_QUEUE}.
     *
     * @param msg the raw AMQP message handed over by the listener container
     */
    @RabbitListener(queues = DelayConfig.DELAY_QUEUE)
    public void receiveDelay(Message msg) {
        // Decode with an explicit charset rather than the JVM's platform default, which
        // would garble non-ASCII payloads on hosts whose default is not UTF-8.
        // Presumably the producer encodes as UTF-8 (the default of Spring AMQP's
        // SimpleMessageConverter) — confirm if a custom converter is configured.
        String message = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("接收到了插件延迟队列消息:{}", message);
    }
}
标签:
rabbitmq
本文转载自: https://blog.csdn.net/weixin_53604412/article/details/127690083
版权归原作者 替罪的羊 所有, 如有侵权,请联系我们删除。
版权归原作者 替罪的羊 所有, 如有侵权,请联系我们删除。