0


RabbitMQ实现轮询形式消息最大发送失败次数,及详细解析

RabbitMQ设置消息最大发送失败次数,达到三次后不确认消息(此处根据业务需求可考虑使不确认的消息进入死信交换机)

配置文件:

spring:
  rabbitmq:
    host: 192.168.1.248
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    publisher-confirm-type: correlated # 生产者的发布确认模式为相关模式
    publisher-returns: true # 开启发布者的returns模式
    listener:
      simple:
        acknowledge-mode: manual # 开启监听者(消费者、接受者)的手动确认模式
    cache:
      channel:
        checkout-timeout: 10000

# 自定义属性
my:
  exchangeName: exchange.reliability
  queueName: queue.reliability

交换机和队列配置:

@Configuration
@Slf4j
public class RabbitConfig {

    @Value("${my.exchangeName}")
    private String exchangeName;

    @Value("${my.queueName}")
    private String queueaName;

    @Bean
    public DirectExchange directExchange() {
        //建造者模式交换机默认就是持久化
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    @Bean
    public Queue queue() {
        //建造者模式队列默认就是持久化
        return QueueBuilder.durable(queueaName).build();
    }

    @Bean
    public Binding binding(DirectExchange directExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("info");
    }

    /**
     * 配置一个消息转换器,json格式的
     *
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

生产者:

@Service
@Slf4j
public class MessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${my.exchangeName}")
    private String exchangeName;

    @PostConstruct
    public void init() {
        //设置确认回调接口(当消息成功到达交换机时,会调用此回调的confirm方法(confirm:Lambda表达式未简化时方法))
        rabbitTemplate.setConfirmCallback(
                // correlationData 相关数据(一般会存一个id属性)
                // ack 确认标志:true表示成功,false表示失败
                // cause 失败原因
                (correlationData, ack, cause) -> {
                    if (!ack) {
                        log.error("消息:{}没有到达交换机,原因为:{}", correlationData.getId(), cause);
                    }
                }
        );

        //设置模版的returnsCallback(当消息无法正确路由到队列时,会调用此回调的returnedMessage方法(returnedMessage:Lambda表达式未简化时方法))
        rabbitTemplate.setReturnsCallback(
                returned -> {
                    String errorMessage = String.format(
                        "消息从交换机%s使用路由键%s没有正确的路由到队列,错误代码:%d,错误原因为:%s",  
                        returned.getExchange(),  
                        returned.getRoutingKey(),  
                        returned.getReplyCode(),  
                        returned.getReplyText()
                    );  
                    log.error(errorMessage);  
                }
        );
    }

    // 发送消息
    public void sendMsg() throws JsonProcessingException {
        Orders orders = Orders.builder()
                .orderId(99)
                .orderName("橙子")
                .orderMoney(new BigDecimal(100))
                .orderTime(new Date())
                .build();

        // 存入消息id
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("order:" + orders.getOrderId());
        
        //队列中的消息默认是持久化的
        rabbitTemplate.convertAndSend(exchangeName, "info", orders,
                //消息头中设置重新发送次数(消息后处理器,用于在消息发送前对其进行修改)
                message -> {
                    message.getMessageProperties().setHeader("x-retry-count", 1); //消息头部设置计数属性,表示第一次发送消息
                    return message;
                }
                , correlationData);
        log.info("消息发送完毕");
    }
}

消费者:

@Component
@Slf4j
public class MessageListener {

    @Value("${my.exchangeName}")
    private String exchangeName;

    @Value("${my.queueName}")
    private String queueName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    // 声明一个消息监听器,当有消息到达指定的队列时,Spring会自动调用这个方法
    @RabbitListener(queues = {"${my.queueName}"})
    // orders 消息体
    // message RabbitMQ的消息对象,包含了消息的元数据(如消息ID、头信息、属性等)
    // channel RabbitMQ的通道对象,用于与RabbitMQ服务器进行通信,入确认消息、拒绝消息等
    public void receiveMsg(Orders orders, Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag(); //获取唯一标识符
        int maxRetryCount = 3; // 最大重试次数,加上第一次发送,共三次
        Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("x-retry-count"); //获取本条消息是第几次发送
        retryCount = (retryCount == null) ? 1 : retryCount; //如果为空(属性不存在),默认设置为第一次发送
        try {
            log.info("插入数据库开始...{}", orders.toString());
            //模拟错误
            int a = 1 / 0;
            log.info("插入数据库完成");
            channel.basicAck(deliveryTag, false); //插入成功,手动确认消息,结束本条消息操作
        } catch (Exception e) {
            // 处理失败,检查重试次数
            if (retryCount < maxRetryCount) { //判断是否达到最大次数
                // 增加重试计数并重新入队
                message.getMessageProperties().getHeaders().put("x-retry-count", retryCount + 1);   //消息头部计数属性加1
                rabbitTemplate.convertAndSend(exchangeName, "info", message);   //重新发送消息
                channel.basicAck(deliveryTag, false); // 插入失败,手动确认消息(上一行代码已经重新发送了一条新的消息,所以,本条消息需要手动确认)
            } else {
                /**
                 * 此处业务逻辑为,仅需把消息发送给消费者,不关注后续操作,所以消息直接不确认
                 * 如果业务需求变更,可把消息发入死信队列等
                 */
                // 达到最大重试次数,执行其他处理逻辑
                log.error("插入数据库失败,原因为:{}", e.toString());
                channel.basicNack(deliveryTag, false, false); // 不确认消息并不重新入队
            }
        }
    }
}
标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/weixin_71992340/article/details/143368275
版权归原作者 乂氼324 所有, 如有侵权,请联系我们删除。

“RabbitMQ实现轮询形式消息最大发送失败次数,及详细解析”的评论:

还没有评论