RabbitMQ设置消息最大发送失败次数,达到三次后不确认消息(此处根据业务需求可考虑使不确认的消息进入死信交换机)
配置文件:
spring:
rabbitmq:
host: 192.168.1.248
port: 5672
username: admin
password: 123456
virtual-host: powernode
publisher-confirm-type: correlated # 生产者的发布确认模式为相关模式
publisher-returns: true # 开启发布者的returns模式
listener:
simple:
acknowledge-mode: manual # 开启监听者(消费者、接受者)的手动确认模式
cache:
channel:
checkout-timeout: 10000
# 自定义属性
my:
exchangeName: exchange.reliability
queueName: queue.reliability
交换机和队列配置:
@Configuration
@Slf4j
public class RabbitConfig {
@Value("${my.exchangeName}")
private String exchangeName;
@Value("${my.queueName}")
private String queueaName;
@Bean
public DirectExchange directExchange() {
//建造者模式交换机默认就是持久化
return ExchangeBuilder.directExchange(exchangeName).build();
}
@Bean
public Queue queue() {
//建造者模式队列默认就是持久化
return QueueBuilder.durable(queueaName).build();
}
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("info");
}
/**
* 配置一个消息转换器,json格式的
*
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
生产者:
@Service
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@Value("${my.exchangeName}")
private String exchangeName;
@PostConstruct
public void init() {
//设置确认回调接口(当消息成功到达交换机时,会调用此回调的confirm方法(confirm:Lambda表达式未简化时方法))
rabbitTemplate.setConfirmCallback(
// correlationData 相关数据(一般会存一个id属性)
// ack 确认标志:true表示成功,false表示失败
// cause 失败原因
(correlationData, ack, cause) -> {
if (!ack) {
log.error("消息:{}没有到达交换机,原因为:{}", correlationData.getId(), cause);
}
}
);
//设置模版的returnsCallback(当消息无法正确路由到队列时,会调用此回调的returnedMessage方法(returnedMessage:Lambda表达式未简化时方法))
rabbitTemplate.setReturnsCallback(
returned -> {
String errorMessage = String.format(
"消息从交换机%s使用路由键%s没有正确的路由到队列,错误代码:%d,错误原因为:%s",
returned.getExchange(),
returned.getRoutingKey(),
returned.getReplyCode(),
returned.getReplyText()
);
log.error(errorMessage);
}
);
}
// 发送消息
public void sendMsg() throws JsonProcessingException {
Orders orders = Orders.builder()
.orderId(99)
.orderName("橙子")
.orderMoney(new BigDecimal(100))
.orderTime(new Date())
.build();
// 存入消息id
CorrelationData correlationData = new CorrelationData();
correlationData.setId("order:" + orders.getOrderId());
//队列中的消息默认是持久化的
rabbitTemplate.convertAndSend(exchangeName, "info", orders,
//消息头中设置重新发送次数(消息后处理器,用于在消息发送前对其进行修改)
message -> {
message.getMessageProperties().setHeader("x-retry-count", 1); //消息头部设置计数属性,表示第一次发送消息
return message;
}
, correlationData);
log.info("消息发送完毕");
}
}
消费者:
@Component
@Slf4j
public class MessageListener {
@Value("${my.exchangeName}")
private String exchangeName;
@Value("${my.queueName}")
private String queueName;
@Resource
private RabbitTemplate rabbitTemplate;
// 声明一个消息监听器,当有消息到达指定的队列时,Spring会自动调用这个方法
@RabbitListener(queues = {"${my.queueName}"})
// orders 消息体
// message RabbitMQ的消息对象,包含了消息的元数据(如消息ID、头信息、属性等)
// channel RabbitMQ的通道对象,用于与RabbitMQ服务器进行通信,入确认消息、拒绝消息等
public void receiveMsg(Orders orders, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag(); //获取唯一标识符
int maxRetryCount = 3; // 最大重试次数,加上第一次发送,共三次
Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("x-retry-count"); //获取本条消息是第几次发送
retryCount = (retryCount == null) ? 1 : retryCount; //如果为空(属性不存在),默认设置为第一次发送
try {
log.info("插入数据库开始...{}", orders.toString());
//模拟错误
int a = 1 / 0;
log.info("插入数据库完成");
channel.basicAck(deliveryTag, false); //插入成功,手动确认消息,结束本条消息操作
} catch (Exception e) {
// 处理失败,检查重试次数
if (retryCount < maxRetryCount) { //判断是否达到最大次数
// 增加重试计数并重新入队
message.getMessageProperties().getHeaders().put("x-retry-count", retryCount + 1); //消息头部计数属性加1
rabbitTemplate.convertAndSend(exchangeName, "info", message); //重新发送消息
channel.basicAck(deliveryTag, false); // 插入失败,手动确认消息(上一行代码已经重新发送了一条新的消息,所以,本条消息需要手动确认)
} else {
/**
* 此处业务逻辑为,仅需把消息发送给消费者,不关注后续操作,所以消息直接不确认
* 如果业务需求变更,可把消息发入死信队列等
*/
// 达到最大重试次数,执行其他处理逻辑
log.error("插入数据库失败,原因为:{}", e.toString());
channel.basicNack(deliveryTag, false, false); // 不确认消息并不重新入队
}
}
}
}
本文转载自: https://blog.csdn.net/weixin_71992340/article/details/143368275
版权归原作者 乂氼324 所有, 如有侵权,请联系我们删除。
版权归原作者 乂氼324 所有, 如有侵权,请联系我们删除。