0


RabbitMq生产者发送消息确认

RabbitMq生产者发送消息失败现象

一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitMq服务器的问题之后,待RabbitMq服务器正常之后,需要对这些消息进行重新投递。正常来说RabbitMq做了集群之后是不会出现这种问题,整个集群挂断的概率也是非常小。

错误信息

当项目启动后,然后把交换机Exchange删除后,然后生产者发送消息时会提示交换机不存在。Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)

SpringBoot代码示例

SpringBoot的application.properties需要新增spring.rabbitmq.publisher-confirm-type配置要求值是correlated。默认值是none表示无需触发交换机收到消息的回调接口。correlated表示消息发布后会触发交换机收到消息的回调接口。

# springboot整合rabbitMq的配置
spring.rabbitmq.host=192.168.15.200
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated

队列和交换机配置类

package springbootrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ConfirmConfig {

    // 普通交换机名称
    public static final String EXCHANGE_NAME = "confirm_exchange";
    // 队列名称
    public static final String QUEUE_NAME = "confirm_queue";

    public static final String ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
    }
}

生产者消息发送确认配置类

package springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

// 1.先实现RabbitTemplate.ConfirmCallback接口,从写confirm回调函数
@Slf4j
@Component
public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback {

    // 2.注入
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *
     * @param correlationData 消息
     * @param b 发送成功是true,失败是false
     * @param s 发送失败时的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("交换机已经收到id为{}的消息", id);
        } else {
            log.error("交换机未收到id为{}的消息, 原因是:{}", id, s);
            // 消息缓存或入库,邮件提醒运维
        }
    }

    // 3.然后在springBoot对象初始化之后再执行rabbitTemplate.setConfirmCallback(this);设置回调函数,避免使用默认的ConfirmCallback
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }
}

生产者类

package springbootrabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springbootrabbitmq.config.ConfirmConfig;
import springbootrabbitmq.config.TtlQueueConfig;

import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMsg/{message}")
    public String sendMsg(@PathVariable String message) {
        log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message);
        return "success";
    }
    @GetMapping("/sendMsg2/{message}")
    public String sendMsg2(@PathVariable String message) {
        log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
        CorrelationData data = new CorrelationData();
        data.setId("1111");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
        return "success";
    }
}

消费者类

package springbootrabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import springbootrabbitmq.config.ConfirmConfig;

import java.util.Date;

@Component
@Slf4j
public class ConfirmConsumer {
    //监听器接收消息
    @RabbitListener(queues = ConfirmConfig.QUEUE_NAME)
    public void receiveD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{}, 收到一条消息:{} ", new Date().toString(), msg);
    }
}

首先正常发送,然后再删除交换机然后再发送。测试结果如下

2023-01-29 21:07:12.367  INFO 79848 --- [nio-8080-exec-1] s.controller.ConfirmController           : 当前时间:Sun Jan 29 21:07:12 CST 2023, 发送一条消息:12 到队列
2023-01-29 21:07:12.399  INFO 79848 --- [nectionFactory1] s.config.RabbitMqCallBack                : 交换机已经收到id为1111的消息
2023-01-29 21:07:12.403  INFO 79848 --- [ntContainer#0-1] s.consumer.ConfirmConsumer               : 当前时间:Sun Jan 29 21:07:12 CST 2023, 收到一条消息:12 
2023-01-29 21:08:01.282  INFO 79848 --- [nio-8080-exec-2] s.controller.ConfirmController           : 当前时间:Sun Jan 29 21:08:01 CST 2023, 发送一条消息:123 到队列
2023-01-29 21:08:01.289 ERROR 79848 --- [168.15.200:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)
2023-01-29 21:08:01.290 ERROR 79848 --- [nectionFactory2] s.config.RabbitMqCallBack                : 交换机未收到id为1111的消息, 原因是:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)

消息回退

如果不开启消息回退,默认是消息即使无法发送到队列(如路由键错误等场景),也不会进行提醒,生产者不知道消息能否成功发送到队列。

解决方案

当消息无法到达队列的时候进行提醒

消息回退代码示例

配置,开启消息不可达目的地时的回调

spring.rabbitmq.publisher-returns=true

配置类,实现RabbitTemplate.ReturnCallback接口

package springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

// 1.先实现RabbitTemplate.ConfirmCallback接口,从写confirm回调函数
@Slf4j
@Component
public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    // 2.注入
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *
     * @param correlationData 消息
     * @param b 发送成功是true,失败是false
     * @param s 发送失败时的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("交换机已经收到id为{}的消息", id);
        } else {
            log.error("交换机未收到id为{}的消息, 原因是:{}", id, s);
            // 消息缓存或入库,邮件提醒运维
        }
    }

    // 3.然后在springBoot对象初始化之后再执行rabbitTemplate.setConfirmCallback(this);设置回调函数,避免使用默认的ConfirmCallback
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    // 当消息传递过程中不可达到目的地时将消息返回给生产者,只有不可达到目的地时才会调用这个方法
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息无法被写入队列:{}, 退回原因:{}, 路由Key: {}", message, replyText, routingKey);
        // 邮件发送,缓存或存到数据库
    }
}

生产者

@GetMapping("/sendMsg3/{message}")
    public String sendMsg3(@PathVariable String message) {
        log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
        CorrelationData data = new CorrelationData();
        data.setId("1111");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY+"222", message +"222", data);
        return "success";
    }

消费者与上一个消费者相同

测试结果如下:调用:http://127.0.0.1:8080/confirm/sendMsg3/123生产者的接口可以看到当路由键错误导致交换机无法把消息投递到队列时会回调returnedMessage方法。

2023-01-29 21:27:48.910  INFO 74512 --- [nio-8080-exec-1] s.controller.ConfirmController           : 当前时间:Sun Jan 29 21:27:48 CST 2023, 发送一条消息:123 到队列
2023-01-29 21:27:48.934  INFO 74512 --- [nectionFactory1] s.config.RabbitMqCallBack                : 交换机已经收到id为1111的消息
2023-01-29 21:27:48.941 ERROR 74512 --- [nectionFactory1] s.config.RabbitMqCallBack                : 消息无法被写入队列:(Body:'123222' MessageProperties [headers={spring_returned_message_correlation=1111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), 退回原因:NO_ROUTE, 路由Key: key1222
2023-01-29 21:27:48.943  INFO 74512 --- [nectionFactory2] s.config.RabbitMqCallBack                : 交换机已经收到id为1111的消息
2023-01-29 21:27:48.946  INFO 74512 --- [ntContainer#0-1] s.consumer.ConfirmConsumer               : 当前时间:Sun Jan 29 21:27:48 CST 2023, 收到一条消息:123 

本文转载自: https://blog.csdn.net/qq_28227405/article/details/128793568
版权归原作者 才_先生 所有, 如有侵权,请联系我们删除。

“RabbitMq生产者发送消息确认”的评论:

还没有评论