0


生产者确认机制

配置文件

server.port=8081

logging.level.com.chensir=debug
#host
spring.rabbitmq.host=121.40.100.66

#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#每个消费者每次可最大处理的nack消息数量 默认是250个 可在服务界面看到;意思是每批投递的数量
spring.rabbitmq.listener.simple.prefetch=1
#表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#是否开启自动重试 默认为false 不开启
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#最大重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=20000ms
#重试时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=2000ms
# 最大重试间隔*乘数
#应用于上一重试间隔的乘数 第一次(重试时间间隔)2s 4s 8s 16s 32s 此处32s>20s 以后都以20s为间隔 总的次数为最大重试次数
spring.rabbitmq.listener.simple.retry.multiplier=2
#尝试次数超过上面的设置之后是否丢弃;默认是true(false不丢弃时需要写相应代码将消息加入死信队列中;与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false

# 下方配置为生产者确认机制的配置

#mandatory为true开启强制消息投递,若消息未被路由至任何一个queue则回退消息到RabbitTemplate.ReturnCallback中的returnedMessage方法;
spring.rabbitmq.template.mandatory = true
#开启回调机制,交换机发布确认模式;交换机收到或者未收到消息,都会调用回调实现类的回调确认方法;
spring.rabbitmq.publisher-confirm-type=correlated
# 开启回退消息,当交换机无法将消息路由出去,便会将消息回退给生产者
spring.rabbitmq.publisher-returns=true

生产者代码

package com.chensir.provider;

import cn.hutool.json.JSONUtil;
import com.chensir.config.RabbitConfig;
import com.chensir.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.UUID;

/**
 * 生产者
 */
@Component
@Slf4j
public class DirectProviderOk implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //@PostConstruct  注解是专门数据初始化的注解, 只有其他组件注入后,初始化方法才会执行,
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    public void sendOk() {

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        System.out.println();
        System.out.println("callbackSender UUID: " + correlationData.getId());

        User user = User.builder()
                .id(1)
                .sex("男")
                .name("chen")
                .build();

        String s = JSONUtil.toJsonStr(user);

        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_KEY, RabbitConfig.ROUTING_KEY_03 + 1, s,
                m -> {
                    return m;
                }, correlationData);

    }

    /**
     * 交换机确认回调方法
     * <p>
     * correlationData   保存回调消息的ID及相关信息
     * ack   表示交换机是否收到消息
     * cause   失败的原因, 成功为null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到了ID为:{}的消息", id);
        } else {
            log.info("交换机还未收到ID为:{}的消息,由于原因:{}", id, cause);
        }

    }

    /**
     * 消息没被交换机成功投递到队列时回调
     * <p>
     * 可以在当消息传递过程中不可达目的地时将消息返回给生产者,只有不可达目的地的时候  才进行回退,也就是回调
     *
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息{},被交换机:{}退回,原因是:{},路由key是:{}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());

    }
}

结果

未成功投递交换机回调:

callbackSender UUID: 59ea9e71-41b6-4ee7-854d-b41820b47290
2023-08-29 18:52:06.655 ERROR 26064 --- [.40.100.52:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DirectExchange-011' in vhost '/', class-id=60, method-id=40)
2023-08-29 18:52:06.657  INFO 26064 --- [nectionFactory2] com.chensir.provider.DirectProviderOk    : 交换机还未收到ID为:59ea9e71-41b6-4ee7-854d-b41820b47290的消息,由于原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DirectExchange-011' in vhost '/', class-id=60, method-id=40)

交换机未成功投递队列回调:

callbackSender UUID: d989577a-3fc7-43f0-bc4b-337f8c0bf556
2023-08-29 18:53:14.217  INFO 9324 --- [nectionFactory1] com.chensir.provider.DirectProviderOk    : 消息{"id":1,"name":"chen","sex":"男"},被交换机:DirectExchange-01退回,原因是:312,路由key是:NO_ROUTE
2023-08-29 18:53:14.219  INFO 9324 --- [nectionFactory2] com.chensir.provider.DirectProviderOk    : 交换机已经收到了ID为:d989577a-3fc7-43f0-bc4b-337f8c0bf556的消息

本文转载自: https://blog.csdn.net/weixin_45326523/article/details/132568076
版权归原作者 骑着蜗牛打天下 所有, 如有侵权,请联系我们删除。

“生产者确认机制”的评论:

还没有评论