0


Spring Boot整合RabbitMq 重写confirm不生效

Spring Boot整合RabbitMQ,重写confirmcallback,但是不生效,大神帮忙看下

配置文件:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

队列交换机配置

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange1992";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue1992";
    public static final String CONFIRM_ROUTING_KEY = "key1992";
    //声明业务 Exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .build();
    }
    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
    // 声明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    }
} 

重写confirm

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        log.info("初始化confirm函数");
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData!=null?correlationData.getId():"";
        if (ack){
            log.info("消息及已经收到ID为:{}的消息",id);
        } else {
            log.info("消息未收到");
        }
    }
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("交换机:{},路由:{},的消息:{},被退回,原因是:{}",
                returned.getExchange(),returned.getRoutingKey(),returned.getMessage(),
                returned.getReplyCode()+returned.getReplyText());
    }
}

消息生产者

@Slf4j
@RestController
@RequestMapping("/confirm1")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMsg/{msg}")
    public void sendMsg(@PathVariable String msg){
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,
                msg.getBytes(StandardCharsets.UTF_8),new CorrelationData("1"));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY+1,
                msg.getBytes(StandardCharsets.UTF_8),new CorrelationData("2"));
    }
}

消息消费者

@Slf4j
@Component
public class ConfirmConsumer {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfimMsg(Message message){
        String msg1 = new String(message.getBody());
        log.info("接收到的队列confirm.queue消息:{}",msg1);
    }
}

测试结果

本次问题出在进行连接启动时进行了设置rabbittemplate对象创建时间,而且设置的是ConfigurableBeanFactory.SCOPE_PROTOTYPE,因此在启动成功后,设置的ConfirmCallBack失效。

总结:若出现Confirm不生效主要考虑一下几种情况

1.连接配置中是否设置了

connectionFactory.setPublisherConfirms(true);

2.是否配置了confirm开启参数

spring.rabbitmq.publisher-confirm-type=correlated

3.是否在启动时,设置rabbitTemplate创建时间,若已经设置,需要每次对象调用时,重新设置ConfirmCallBack

标签: rabbitmq spring boot

本文转载自: https://blog.csdn.net/FX_XU/article/details/127677458
版权归原作者 FX_XU 所有, 如有侵权,请联系我们删除。

“Spring Boot整合RabbitMq 重写confirm不生效”的评论:

还没有评论