死信的概念
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解
一般来说,生产者将消息投递到交换机或者直接到队列里了,消费者从队列取出消息 进行消费,但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景
1.为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中
2.用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源
1.消息 TTL 过期
2.队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3.消息被拒绝
延迟队列
延迟队列概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
延迟队列使用场景
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议等
springboot实现
代码架构图
配置类
/**
* 配置文件类代码
*/
@Configuration
public class TTLQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 声明 xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明 xExchange
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明队列 A ttl 为 10s 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, //@Qualifier 通过容器名进行捆绑
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
}
生产者(在web界面输入消息)
/**
* 发送延迟消息
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class sendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendmsg/{msg}")
public void sendMsg(@PathVariable("msg") String msg) {
log.info("当前时间:{},发送信息给两个TTL队列:{}", new Date(), msg);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s:" + msg);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s:" + msg);
}
}
消费者
/**
* 队列ttl的消费者
*/
@Component
@Slf4j
public class DeadLetterQueueConsume {
@RabbitListener(queues = "QD")
public void receiveD(Message msg) {
log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg.getBody());
System.out.println(new String(msg.getBody()));
}
}
结果
发起请求
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000 http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。
问题和新需求
如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
延时队列优化
代码架构图
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间
配置文件类代码
/**
* 配置文件类代码
*/
@Configuration
public class TTLQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
//新的普通队列QC
public static final String QUEUE_C = "QC";
@Bean("queueC")
public Queue queueC() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
// 声明 xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明 xExchange
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明死信队列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
消息生产者代码
@Slf4j
@RestController
@RequestMapping("/ttl")
public class sendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送消息和ttl
@GetMapping("/sendttlandmsg/{msg}/{ttlTime}")
public void sendMsg(@PathVariable("msg") String msg, @PathVariable("ttlTime") String ttlTime) {
log.info("当前时间:{},发送时长为{}毫秒的信息给队列QC:{}", new Date(), ttlTime, msg);
rabbitTemplate.convertAndSend("X", "XC", msg, message -> {
//设置发送消息的延迟时间
message.getMessageProperties().setExpiration(ttlTime);
return message;
});
}
}
消费者
/**
* 队列ttl的消费者
*/
@Component
@Slf4j
public class DeadLetterQueueConsume {
@RabbitListener(queues = "QD")
public void receiveD(Message msg) {
log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg.getBody());
System.out.println(new String(msg.getBody()));
}
}
发起请求
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000 http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
存在的问题
如果不安装插件会导致延迟效果不能满足我们的需求,比如先发送了一个延迟20s的消息,后发送了一个延迟为2s的消息,如果第一个消息未到达则后一个消息会被阻塞
解决:安装延时队列插件
1.在官网上下载http:// https://www.rabbitmq.com/community-plugins.html,下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
2.进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins rabbitmq-plugins enable rabbitmq_delayed_message_exchange
代码架构图
配置文件类代码
/**
* 基于插件的延迟队列配置类
*/
@Configuration
public class DelayQueueConfig {
//队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
//声明交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
/**
* 1.交换机名称
* 2.交换机类型
* 3.是否需要持久化
* 4.是否需要自动删除
* 5.其他参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
true, false, args);
}
//声明队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//绑定
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
消息生产者代码
/**
* 发送延迟消息
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class sendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 基于插件:生产者,发送消息和延迟时间
*/
@GetMapping("/sendDelaymsg/{msg}/{delayTime}")
public void sendMsg(@PathVariable("msg") String msg, @PathVariable("delayTime") Integer delayTime) {
log.info("当前时间:{},发送时长为{}毫秒的信息给延迟队列队列delayed.queue:{}", new Date(), delayTime, msg);
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME
, DelayQueueConfig.DELAYED_ROUTING_KEY, msg, message -> {
//设置发送消息的延迟时间
message.getMessageProperties().setDelay(delayTime);
return message;
});
}
}
消费者
/**
* 消费者:基于插件的延迟消息
*/
@Component
@Slf4j
public class DelayQueueConsume {
@RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message msg) {
String message = new String(msg.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), message);
}
}
结果
发起请求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000 http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
第二个消息被先消费掉了,符合预期
总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。
版权归原作者 小李不在ovo 所有, 如有侵权,请联系我们删除。