1. 上文传送门:
微服务: 00-rabbitmq出现的异常以及解决方案
微服务: 01-rabbitmq的应用场景及安装(docker)
微服务: 02-rabbitmq在springboot中如何使用(上篇)
微服务: 03-rabbitmq在springboot中如何使用(下篇)
微服务: 04-springboot中rabbitmq配置,消息回收,序列化方式
2. 前言简介:
上面文章是安装, 基础测试案例, 下面是进阶的第一篇文章
本篇主要介绍自动确认进入死信队列
手动确认在下一篇介绍
2.1 问: 消费端重复循环异常如何解决?
使用 RabbitMQ 的重试机制,当消费者处理消息失败时,可以将消息重新放回队列中,并设置一定的重试次数和重试时间间隔。如果超过了重试次数仍然处理失败,则可以将消息放入死信队列中,方便后续处理
2.2 为什么要使用死信队列
无法被消费的消息, 这样的消息如果没有后续的处理,就会一直在正常队列里重复重试, 所以加入私信队列后 进行人工补偿, 也可以分等级 哪些重要消息需要优先处理
2.3 案例思路
-> ps: 以下案例经过测试(思路一/二实现原理一样)
-> 2.3.1 思路一
两个交换机(一个正常exchange 一个死信exchange)
两个队列(正常queue,死信queue) 一个队列一个路由键
正常交换机绑定正常queue 绑定路由键 同理
-> 2.3.2 思路二
一个交换机 两个不同路由键 然后绑定**(本文使用)**
3. 案例代码
3.1 简单介绍案例
服务A发起请求到 rabbitmq, 服务B监听并处理消息, 报错了, 重试了五次还不行后 , 进入死信队列
3.2 声明交换机 队列 以及绑定路由键
@Bean
public DirectExchange pzyExchange() {
// return new DirectExchange("pzy_exchange", true, false);
return ExchangeBuilder.directExchange("pzy_exchange").build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order_dead_queue").build();
}
@Bean
public Binding deadBinding(Queue deadLetterQueue, DirectExchange pzyExchange) {
return BindingBuilder.bind(deadLetterQueue).to(pzyExchange).with("orderDeadRoutingKey");
}
@Bean
public Binding normalBinding(Queue normalLetterQueue, DirectExchange pzyExchange) {
return BindingBuilder.bind(normalLetterQueue).to(pzyExchange).with("orderNormalRoutingKey");
}
@Bean
public Queue normalLetterQueue() {
Map<String, Object> arguments = new HashMap<>(3);
arguments.put("x-dead-letter-exchange", "pzy_exchange");
arguments.put("x-dead-letter-routing-key", "orderDeadRoutingKey");
arguments.put("x-max-length", 200);
return QueueBuilder.durable("order_normal_queue").withArguments(arguments).build();
}
3.3 修改配置文件
微服务: 04-springboot中rabbitmq配置,消息回收,序列化方式
3.4 发送消息
rabbitTemplate.convertAndSend("pzy_exchange", "orderNormalRoutingKey", "我是消息");
3.5 接收消息(自动确认)
自动确认, 在重试结束后, 自动进入死信队列
(手动确认的下文在介绍)
@RabbitListener(queues = "#{normalLetterQueue.name}")
public void oldHoldInNewPurchaseData(Message message, String message1, Channel channel) throws IOException {
log.info("获取到mq消息,消息内容为{}", message);
log.info("mq接收到的对象===>{}", aixiPurchaseRequestDTO);
// System.out.println(map);
try {
// int i = new Random().nextInt(5);
// if (i > 1) throw new OrderServiceException(ResponseEnum.E40008, "后端测试报错回收机制!");//60% 概率报错
// if (true) throw new OrderServiceException(ResponseEnum.E40008, "后端测试报错回收机制!");//100% 报错
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
log.error("异常===> {}", e.getMessage());
throw new OrderServiceException(ResponseEnum.E40008, e.getMessage());
} finally {
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
3.6 开启测试代码
第一个判断(60%): 是查看是否在第一次报错后续消费成功后 是否进入死信队列
第二个判断(100%): 是测试重试机制(次数)过后 是否进入死信队列
抛出 throw new RuntimeException("测试报错!") 就可以
int i = new Random().nextInt(5);
if (i > 1) throw new OrderServiceException(ResponseEnum.E40008, "后端测试报错回收机制!");//五分之三概率报错
//if (true) throw new OrderServiceException(ResponseEnum.E40008, "后端测试报错回收机制!");//五分之三概率报错
3.7 测试结果
---> 3.7.1 查看交换机
有没有绑定两个路由键(没有查看绑定配置)
---> 3.7.2 查看队列
---> 3.7.3 主要查看方向
查看是不是最后一次报错后 才进入死信队列
报错异常是不是无限制循环报错
---> 3.7.4 另外遇到问题 别慌
先把交换机和队列都删了 然后再重试
[如果生产者和消费者配置是独立的 看看队列声明时是否相同, 不同删了队列重启项目]
版权归原作者 pingzhuyan 所有, 如有侵权,请联系我们删除。