0


RabbitMQ消息确认机制详解


最近在学cloud,有不少组件都与消息队列有关,当时学的太浅了,一会忘光了,现在人麻了。

之前学的是尚硅谷的RabbitMQ,除了听点有趣的口音现在感觉没啥东西。

消息应答是消费者端的,发布确认是生产者端的,本质都是消息的确认。


1 消息应答

RabbitMQ中,消费者在获取到消息后,会向RabbitMQ服务端发送ACK确认,这时消息会立刻删除。

  • 自动应答:消息发送后立即被认为已经传送成功。- 优点:较高的消息吞吐量- 缺点:消息可能丢失,消费者在接受到前或在处理中,出现连接关闭、down机。消息堆积,内存耗尽:消费者处理消息可能很慢,而消息不停地发送。
  • 手动应答:消息发送后需要等到手动应答后才会向服务端确认并删除消息- 优点:安全性较好,避免消息丢失- 缺点:消息吞吐量不好

1.1 手动应答 & 没有集成springboot的版本:

使用basicConsume在消费时可以设置是自动应答和手动应答

说明一下这个方法,会启动一个消费者,并返回服务端生成的消费者标识consumerTag。(一个消费者一个,区分deliveryTag,是由信道标记的,一条消息一个,是一个从0开始单调递增的整数)

Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag. Provide access only to basic.deliver and shutdown signal callbacks (which is sufficient for most cases). See methods with a Consumer argument to have access to all the application callbacks.

方式1:针对单个消费者的单独设置:消费者消费消息,关闭自动确认,设置消息接收回调函数消息退回回调函数

boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

deliverCallback 是消费者获取到消息就会自动调用的方法,函数式接口,可以匿名实现。

官方释义:Called when a basic.deliver is received for this consumer.

可以在这个方法中捕捉异常,有异常发生则拒绝消息,没有则确认接收

boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, (consumerTag, delivery) -> {
    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
    String msg = new String(delivery.getBody());
    try {
        System.out.println(1 / 0); // 模拟出现异常
        log.info("接收到消息:{}", msg);
        // 手动确认
        channel.basicAck(deliveryTag, false);// multiple表示多个一起确认
    } catch (Exception e) {
        /* 手动拒绝消息
        * channel.basicNack
        * @param deliveryTag:消息的tag
        * @param multiple:多条处理
        * @param requeue:重新入队重发
        * 如果出现异常的情况下,根据实际情况重发
        *      requeue:
        *          false:拒绝后,消息会从队列中移除,如果有死信队列则会打入死信队列
        *          true:拒绝后,消息会重新入队
        **/
        channel.basicNack(deliveryTag, false, false);
    }
}, consumerTag -> {});// 消费者取消时的回调

basicNack也可以替换为basicReject,少一个multiple参数,即不能批量拒签消息。


cancelCallback 是消费者取消订阅消息时调用的方法,函数式接口,可以匿名实现。

官方释义:Called when the consumer is cancelled for reasons other than by a call to Channel.basicCancel. For example, the queue has been deleted. See Consumer.handleCancelOk for notification of consumer cancellation due to Channel.basicCancel.

当消费者因除调用Channel.basicCancel之外的原因被取消订阅时调用。例如,队列已被删除。参考Consumer.handleCancelOk,用于通知由于Channel.basicCancel而取消的消费者。

这个用的较少,另外Channel.basicCancel会要求代理重新发送未确认的消息,与调用basicRecover(true)一样,消息将被排队,并可能被传递给不同的消费者。有些方法和参数不清楚可以参考RabbitMq基础篇-09-channel接口常用几种参数详解_channel.basicnack-CSDN博客


方式2:定义一个统一的 consumer,consumer中可以定义相应的方法和监听器

使用接口com.rabbitmq.client.Consumer的实现类com.rabbitmq.client.DefaultConsumer实现自定义消息监听器,接口中有多个不同的方法可以根据系统的需要实现;

boolean autoAck = false;
DefaultConsumer consuemr = new DefaultConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

可以自己定义一个consumer类继承DefaultConsumer并重写相关方法,或者用匿名内部类对象都行。方法有:

其中handleDelivery就是收到消息后调用的方法,类似于deliveryCallback

DefaultConsumer consumer = new DefaultConsumer(channel) {
    //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
    @Override
    public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        // 消息应答逻辑
    }
};

1.2 手动应答 & 集成springboot的版本:

首先在配置文件中先进行配置

server:
  port: 9999
spring:
  application:
    name: rabbitmq-test
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    port: 5672
    username: admin
    password: 123456
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
        prefetch: 1 # 预取值
  • acknowledge-mode 有三个值- AcknowledgeMode = NONE (default): 不确认,默认消息成功接收,并不断地向消费者推消息,如果出现异常则消息丢失- AcknowledgeMode = AUTO: 自动确认,类似上面的自动确认。根据情况确认,如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认。- AcknowledgeMode = MANUAL: 手动确认,消费者收到消息后,手动调用basicAck/basicNack/basicReject后,RabbitMQ收到这些消息后,才认为本次投递成功。手动确认模式可以使用prefetch,限制通道上未完成的(正在进行中的)发送的消息数量。
  • 这里说明一下 direct 和 simple,是两种监听器线程模型,影响不大,不放心两个都配置一下,相区分一下可以参考RabbitMq监听器simple和direct_direct.consumers-per-queue-CSDN博客

关于springboot配置rabbitmq里的交换机、队列、绑定啥的就不赘述了,弄个配置类就行。


方式1:简便版:@RabbitListener注解+配置文件

@RabbitListener(queues = TASK_QUEUE_NAME)
    public void consumer(Message message,Channel channel) throws IOException{
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String msg = new String(message.getBody());
        try {
            System.out.println(1 / 0); // 模拟出现异常
            log.info("接收到消息:{}", msg);
            // 手动确认
            channel.basicAck(deliveryTag, false);// multiple表示多个一起确认
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, false);
            log.info("消息消费失败,Message:{}", msg);
        }
    }

如果我把重新入队requeue设为true


这条消息不停地被拒签,回到队列,broker又重新投递,又拒签,不断循环,而且如果设置了持久化,这个消息在重启服务后还是会死循环。实际应该设定重试次数,超过指定次数的再做其他处理。

说明一下**@RabbitListener和@RabbitHandler**

@RabbitListener用在类上和方法上都可以,如果是用在类上要配合@RabbitHandler标在方法上注明是哪个方法来接收消息。


方式2:定义ChannelAwareMessageListener + SimpleMessageListenerContainer监听器

两种其实是比较相似的,它的重试策略是在Container自己配置的,可能能玩的花样更多,但是感觉挺麻烦的不如写配置文件,不去深究了。可以参考这篇博文,写的很详细。RabbitMQ高级特性(一):消息的可靠投递(发送方确认+消费方确认)HaleyTiger的技术博客51CTO博客


1.3 介绍一下重试机制

当采用自动确认AUTO时,配置文件中配置,针对异常可以进行重试:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初次的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数

重试次数用完会抛出异常org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted,默认情况下消息会被丢弃,即消息只在消费者本地重试,重试成功消费者就成功消费,失败就不管了,队列里已经除名。

如果是手动确认,emm,我看到很多网上说retry重试次数就失效了会一直重试,其实不是因为机制失效了,是因为你在手动确认的时候都会用try-catch来分别作ack和nach的操作,而spring的重试机制时抛出异常时重试,你都catch了肯定不能用了,它只是不断地拒签回队投递拒签回队,就没有触发重试。

但是没有try-catch似乎手动确认也失去意义了,而且一抛出异常就重试的话,都根本执行不到basicAck或BasicNack。

暂时没想到什么简便的办法,我只能想到自己写其他代码来计数,有大佬告知一下嘛?


上面说了重试完消息会默认丢弃,有什么挽救办法吗?有的。

事实上,消息在重试机会耗尽后,会调用MessageRecoverer接口的recover方法。MessageRecoverer接口有如下三个实现类:

RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列(默认) RepublishMessageRecoverer:重新发布消息 ImmediateRequeueMessageRecoverer:立即把消息重新放入队列

前面消息丢失时因为默认执行RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列。我们可以使用RepublishMessageRecoverer,望文生义,这个就是恢复消息,将它发布到其他队列。被拒绝的消息会成为死信,我们可以把多次重试异常的数据也转发到私信队列上或者定义一个异常队列也行。不再细说了,可以参考RabbitMq手动确认+消息重试_rabbittemplate.convertsendandreceive-CSDN博客

RabbitMQ重试机制_rabbitmq 重试次数-CSDN博客。


2 消息发布确认

与消息应发不同,消息发布确认主要是要确保消息成功发送到了RabbitMQ的server,即broker。

主要有异步和同步两种方式,异步确认是更推荐的,生产者可以先不管上条消息是否被确认就先发布下条消息,等到确认的消息或者是失败消息返回时再通过回调函数处理,这样处理效率更高。


2.1 未集成springboot版本

发布确认默认不开启,需要在生产者端设置

//开启发布确认
channel.confirmSelect();

2.1.1 同步确认

在一个线程内发送消息,同时又进行确认。

主要是利用channel的方法waitForConfirms()或者waitForConfirmOrDie(),可以带参数timeout表示等待(多长时间)确认,没确认或者消息没有死亡(成为死信)都会返回false。

channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if (flag) {
    System.out.println("消息发送成功");
}

可以设置批量确认

for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        //服务端返回 false 或超时时间内未返回,生产者可以消息重发
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("消息发送成功");
        }
    }

这种方式的很简单,但是缺点也很明显,效率很低。


2.1.2 异步确认

主要是利用确认监听器监听broker返回的确认消息

channel.addConfirmListener(ackCallback, nackCallback);
for (int i = 0; i < MESSAGE_COUNT; i++) {
    String message = "消息" + i;
    channel.basicPublish("", queueName, null, message.getBytes());
}

其中有两个ConfirmCallback类型的参数ackCallback和nackCallback,是函数式接口的实现对象,分别代表消息被确认时的回调,和消息没有被确认(发送失败)的回调。

官方释义:Implement this interface in order to be notified of Confirm events. Acks represent messages handled successfully; Nacks represent messages lost by the broker. Note, the lost messages could still have been delivered to consumers, but the broker cannot guarantee this. For a lambda-oriented syntax, use ConfirmCallback.

实现此接口,以便收到确认事件的通知。ack表示成功处理的消息;Nacks表示代理丢失的消息。注意,丢失的消息仍然可以被传递给消费者,但是broker不能保证这一点。对于面向lambda的语法,请使用ConfirmCallback。

channel.addConfirmListener((deliveryTag1, multiple) ->{},(deliveryTag1, multiple) ->{});

主要有两个参数,deliveryTag和multiple。

很清楚了,用法和应答的也很类似,不再赘述。


2.2 集成springboot版本

首先,要在配置文件当中

spring.rabbitmq.publisher-confirm-type=correlated  # 开启消息确认 设置确认模式
publisher-returns: true  # 开启消息退回
  • NONE 值是禁用发布确认模式,是默认值
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法 用这种即可√
  • SIMPLE 值有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

在springboot,利用RabbitTemplate可以很轻松实现,主要要设置两个回调函数,确认回调函数ConfirmCallback退回回调函数ReturnsCallback(有些低版本是ReturnCallback),两个函数是对全局生效的,所以要在系统驱动的时候加载。

//在配置类内设置就行
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMandatory(true);
    // 消息发布确认回调函数
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {});
    // 消息回退回调函数
    rabbitTemplate.setReturnsCallback(returnedMessage -> {});
    return rabbitTemplate;
}

2.2.1 确认回调函数 ConfirmCallback

确认回调函数ConfirmCallback消息转发到交换机就会回调的函数,转发是否成功都会被调用。

主要有3个参数,correlationData记录了消息的信息,ack是消息发布确认的信息,true表示成功转发到交换机,cause是转发失败的原因。

// 消息发布确认回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack){
        log.info("消息发布成功");
    }else {
        log.info("消息发布失败:" + cause);
    }
});

为了方便消息确认,所以在生产者发送消息时就不能简单地传递string的message,还要传入correlationData,这个对象用起来很容易,只需要传入一个全局唯一的ID,或者不传它默认构造器由UUID随机生成。但不好理解:

官方释义:Base class for correlating publisher confirms to sent messages. Use the org.springframework.amqp.rabbit.core.RabbitTemplate methods that include one of these as a parameter; when the publisher confirm is received, the CorrelationData is returned with the ack/nack. When returns are also enabled, the returned property will be populated when a message can't be delivered - the return always arrives before the confirmation. In this case the #id property must be set to a unique value. If no id is provided it will automatically set to a unique value.

将被发送的消息与发布者确认关联起来的基类。

@GetMapping("/sendMessage/{message}")
public void sendMessage (@PathVariable String message){
    CorrelationData correlationData = new CorrelationData();
    rabbitTemplate.convertAndSend(TASK_EXCHANGE_NAME,BINDING_KEY+"1",message,correlationData);
    log.info("生产者发出消息" + message);
}
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    assert correlationData != null;
    if (ack){
        log.info("消息发布成功,消息的ID:{}",correlationData.getId());
    }else {
        log.info("消息发布失败,消息的ID:{},原因为:" + cause,correlationData.getId());
    }
});

2.2.2 退回回调函数 ReturnsCallback

退回回调函数ReturnsCallback是消息转发到队列失败回调的函数,成功则不会执行。失败的原因有很多,比如路由错误,服务崩溃,队列失效。开启回退除了配置文件外,还需要设置mandatory参数,配置文件,配置类中都可以设置。

rabbitmq:
    template:
          mandatory: true
rabbitTemplate.setMandatory(true);
// 消息回退回调函数
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage (ReturnedMessage returned){
                log.info("转发自{}交换机的消息丢失,应答码:{},原因:{},消息:{}",
                        returned.getExchange(),
                        returned.getReplyCode(),
                        returned.getReplyText(),
                        returned.getMessage());
            }
        });

虽然说是回退,但是消息已经没有地方退,本就还没到队列,也不能还给消费者,所以消息就丢失了,当然,可以在两个回调函数中进行处理,让他们转发到备用的交换机和队列(backup exchange),这边就不细说了。


3 总结

  • 如何确保RabbitMQ消息的可靠性?
  1. 开启生产者确认机制,确保生产者的消息能到达队列;
  2. 设置备份交换机,将无法路由到队列的消息送到备份队列;
  3. 开启持久化功能,确保消息未消费前在队列中不会丢失;
  4. 开启消费者应答机制,设置自动应答或手动应答回调;
  5. 开启消费者失败重试机制,设置重试次数和间隔时间;
  6. 开启消息恢复机制,重试机会耗尽的消息投递到异常交换机,交由人工处理

本文转载自: https://blog.csdn.net/m0_72397750/article/details/135408543
版权归原作者 加糖苏打水 所有, 如有侵权,请联系我们删除。

“RabbitMQ消息确认机制详解”的评论:

还没有评论