0


RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

RabbitMQ–基础–8.1–消息确认机制–接受确认机制(ACK)


代码位置

https://gitee.com/DanShenGuiZu/learnDemo/tree/master/rabbitMq-learn/rabbitMq-03

1、场景和问题

1.1、需求

消费者收到Queue中的消息,但没有处理完成就宕机的情况,这种情况下就可能会导致消息丢失。

为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。

如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。

这里不存在Timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。

1.2、消息确认消息引发的问题

如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的问题,Queue中堆积的消息会越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑。

2、channel.basicConsume(queueName,autoAck,callback)方法

在这里插入图片描述

2.1、参数

  1. queueName:队列名称
  2. autoAck:设置是否自动确认 1. true:自动确认,消息一旦被消费者接收,队列中的消息就会被删除2. false:手动确认
  3. callback:设置消费者的回调函数,用来处理 RabbitMQ 推送过来的消息

3、消息确认机制 ACK

  1. 为了保证消息从队列可靠地达到消费者
  2. 当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。
  3. ACK分两种情况 1. 自动 ACK2. 手动 ACK

3.1、自动 ACK

  1. autoAck=false。
  2. RabbitMQ 会自动把发送出去的消息设置为确认,然后从内存或磁盘中删除,而不管消费者是否真正地消费了这些消息

3.2、手动 ACK

  1. autoAck=false。
  2. RabbitMQ 会等待消费者显示地回复确认信号后才从内存或磁盘中移去消息

3.2.1、好处

  1. autoAck=false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程挂掉后消息丢失的问题。因为,RabbitMQ 会一直等待持有消息,直到消费者显示调用 Basic.Ack 命令为止。

3.2.2、原理

  1. autoAck=false,队列中的消息分成了两部分 1. 等待投递给消费者的消息2. 已经投递给消费者,但还没有收到消费者确认信号的消息。
  2. 如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者。这样就保证消息不丢失了。

3.3、使用场景

  1. 如果消息不太重要,丢失也没有影响,那么autoAck=ture。
  2. 如果消息非常重要,不容丢失,那么autoAck=ture。

4、代码实现(手动 ACK)

4.1、代码结构

在这里插入图片描述

4.2、生产者


package com.example.rabbitmq03.business.test7;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    // 简单模式
    public static void main(String[] args) {
        // 1. 获取连接
        Connection connection = null;
        try {
            connection = RabbitMqUtil.getConnection("生产者");
        } catch (Exception e) {
            System.out.println("获取连接时,出现异常");
        }
        
        Channel channel = null;
        try {
            // 2. 通过连接获取通道 Channel
            channel = connection.createChannel();
            String queueName = "code_simple_queue1";
            // 3. 通过通道创建声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 4. 准备消息内容
            String message = "你好";
            // 5. 发送消息给队列 Queue
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("消息发送完成~~~发送的消息为:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                RabbitMqUtil.close(connection, channel);
            } catch (Exception e) {
                System.out.println("关闭时,出现异常");
            }
        }
    }
}

4.3、消费者

修改的地方
在这里插入图片描述


package com.example.rabbitmq03.business.test7;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.*;
public class Consumer {

    public static void main(String[] args) throws Exception{
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");

        final Channel channel = connection.createChannel();
        String queueName = "code_simple_queue1";

        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 消息体
                String msg = new String(body,"utf-8");
                System.out.println("收到消息:" + msg);
                /**
                 * @param1:deliveryTag:用来标识消息的id
                 * @param2:multiple:是否批量。true:将一次性 ACK 所有小于 deliveryTag 的消息
                 */
                // 手动确认
                channel.basicAck(deliveryTag, false);
            }
        };

        // 监听队列  手动 ACK
        channel.basicConsume(queueName, false, consumer);

        System.out.println("开始接收消息~~~");
        System.in.read();

        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}

4.4、测试

在这里插入图片描述
在这里插入图片描述

5、自动 ACK 带来的问题

5.1、执行生产者,产生一条记录

在这里插入图片描述

5.2、设置修改消费者为自动ACK

package com.example.rabbitmq03.business.test7;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.*;

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");
        
        final Channel channel = connection.createChannel();
        String queueName = "code_simple_queue1";
        
        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                int result = 1 / 0;
                // body 消息体
                String msg = new String(body, "utf-8");
                System.out.println("收到消息:" + msg);
                
            }
        };
        // 监听队列 自动 ACK
        channel.basicConsume(queueName, true, consumer);
        
        System.out.println("开始接收消息~~~");
        System.in.read();
        
        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}

5.3、执行消费者

在这里插入图片描述
在这里插入图片描述

消费者代码报错,没有收到消息,但是队列的消息少了,原因就是,MQ将异常内部消化了。
在这里插入图片描述


本文转载自: https://blog.csdn.net/zhou920786312/article/details/127425682
版权归原作者 勤径苦舟 所有, 如有侵权,请联系我们删除。

“RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)”的评论:

还没有评论