Spring Boot整合RabbitMQ,重写confirmcallback,但是不生效,大神帮忙看下
配置文件:
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
队列交换机配置
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange1992";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue1992";
public static final String CONFIRM_ROUTING_KEY = "key1992";
//声明业务 Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
.build();
}
// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
}
}
重写confirm
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
log.info("初始化confirm函数");
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData!=null?correlationData.getId():"";
if (ack){
log.info("消息及已经收到ID为:{}的消息",id);
} else {
log.info("消息未收到");
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("交换机:{},路由:{},的消息:{},被退回,原因是:{}",
returned.getExchange(),returned.getRoutingKey(),returned.getMessage(),
returned.getReplyCode()+returned.getReplyText());
}
}
消息生产者
@Slf4j
@RestController
@RequestMapping("/confirm1")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}")
public void sendMsg(@PathVariable String msg){
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,
msg.getBytes(StandardCharsets.UTF_8),new CorrelationData("1"));
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY+1,
msg.getBytes(StandardCharsets.UTF_8),new CorrelationData("2"));
}
}
消息消费者
@Slf4j
@Component
public class ConfirmConsumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfimMsg(Message message){
String msg1 = new String(message.getBody());
log.info("接收到的队列confirm.queue消息:{}",msg1);
}
}
测试结果
本次问题出在进行连接启动时进行了设置rabbittemplate对象创建时间,而且设置的是ConfigurableBeanFactory.SCOPE_PROTOTYPE,因此在启动成功后,设置的ConfirmCallBack失效。
总结:若出现Confirm不生效主要考虑一下几种情况
1.连接配置中是否设置了
connectionFactory.setPublisherConfirms(true);
2.是否配置了confirm开启参数
spring.rabbitmq.publisher-confirm-type=correlated
3.是否在启动时,设置rabbitTemplate创建时间,若已经设置,需要每次对象调用时,重新设置ConfirmCallBack
版权归原作者 FX_XU 所有, 如有侵权,请联系我们删除。