0


2024.2.23 模拟实现 RabbitMQ —— 实现消费消息逻辑

引言

函数式接口

  • Lambda 表达式的本质是匿名函数
  • Java 函数无法脱离类而存在,所以 Java 通过引入函数式接口以支持 Lambda 表达式

特性:

  1. 函数式接口为一个 interface 类
  2. 该类中有且仅有一个方法
  3. 该类需加上 @FunctionalInterface 注解

注意:

  • 上述三点其实就是 Lambda 的本质,即底层实现

消费者订阅消息 实现思路

1、让 broker server 把有哪些消费者管理好

  • 消费者调用 basicConsume 方法就是订阅某个指定队列的消息

注意:

  • 消费者是以队列为纬度订阅的
  • 一个队列可以有多个消费者

  • 约定 消费者之间按照 轮询 的方式进行消费

代码编写:

  • 定义一个 ConsumerEnv 类,用来描述一个消费者
  • 该类中也会包含一些消费者消费过程中用到的数据
import lombok.Data;

/*
* 表示一个消费者(完整的执行环境)
* */
@Data
public class ConsumerEnv {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
//    通过这个回调来处理收到的消息
    private Consumer consumer;

    public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        this.consumerTag = consumerTag;
        this.queueName = queueName;
        this.autoAck = autoAck;
        this.consumer = consumer;
    }
}
  • 给每个队列对象(MSGQueue 对象)添加属性 List,用于存储该队列的 消费者对象
//    当前队列都有哪些消费者订阅了
    private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
//    记录当前取到了第几个消费者,方便实现轮询策略
    private AtomicInteger consumerSeq = new AtomicInteger(0);
//    添加一个新的订阅者
    public void addConsumerEnv(ConsumerEnv consumerEnv) {
            consumerEnvList.add(consumerEnv);
    }
//    订阅者的删除暂时不考虑
//    挑选一个订阅者,用来处理当前的消息 (按照轮询的方式)
    public ConsumerEnv chooseConsumer() {
        if(consumerEnvList.size() == 0) {
//            该队列没有人订阅
            return null;
        }
//        计算一下当前要取的元素的下标
        int index = consumerSeq.get() % consumerEnvList.size();
        consumerSeq.getAndDecrement();
        return consumerEnvList.get(index);
    }

2、消费者 订阅队列消息,并使用该消息完成明确好的业务逻辑

  • 所谓 消费者 消费消息****,其实就是让线程池 执行对应消费者中的回调函数
  • 通过回调函数,将消息的内容通过参数传递
  • 回调函数中的内容由消费者编写,具体里面要干啥,取决于消费者自己的业务逻辑

代码编写:

  • 此处我们使用 函数式接口 的方式,让消费者在订阅消息时,明确使用该消息进行的业务逻辑是什么
import com.example.demo.mqserver.core.BasicProperties;

/*
* 只是一个单纯的函数式接口(回调函数),收到消息之后要处理消息时调用的方法
* */
@FunctionalInterface
public interface Consumer {
//    Delivery 的意思是 "投递",这个方法预期是在每次服务器收到消息之后,来调用
//    通过这个方法把消息推送给对应的消费者
//    (注意!!这里的方法名和参数,也都是参考 RabbitMQ 展开的)
    void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body);
}
  • 在 VirtualHost 中实现消费者订阅某个队列的消息
//    订阅消息
//    添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者
//    consumerTag:表示消费者的身份标识
//    autoAck:消息被消费完成后,应答的方式,为 true 自动应答,为 false 手动应答
//    consumer:是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子
    public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
//        构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把这个 Consumer 对象添加到该队列中
        queueName = virtualHostName + queueName;
        try {
            consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);
            System.out.println("[VirtualHost] basicConsume 成功! queueName = " + queueName);
            return true;
        }catch (Exception e) {
            System.out.println("[VirtualHost] basicConsume 失败! queueName = " + queueName);
            e.printStackTrace();
            return false;
        }
    }

3、队列收到消息,并将消息推送给订阅该队列的消费者

  • 为了能够让 线程池 知道要执行哪个回调函数及其参数中的 消息 来自于哪个队列
  • 我们定义一个单独的扫描线程,用于感知哪个队列收到了新消息

问题一:

  • 为啥搞了扫描线程,还要再搞个线程池呢?
  • 既让该扫描线程获取 消息和 消费者的回调函数,又让其执行回调函数不就行了?

回答:

  • 由于消费者编写的回调函数,具体是干啥的,我们并不知道
  • 如果是比较耗时的业务逻辑的话,此时仅由一个线程来完成上述这些操作,就可能周转不开了****!

问题二:

  • 当前有多个队列,但扫描线程就一个,扫描线程如何知道是哪个队列中 来了新消息呢?

方案一:

  • 直接让扫描线程不停的循环遍历所有对列****,如果发现有新的元素就立即处理

方案二:

  • 引入一个阻塞队列,哪个队列新增了一个消息,就将哪个队列的名字放入 阻塞队列中
  • 此时 扫描线程 仅需要盯住这阻塞队列即可
  • 阻塞队列中队列名相当于 "令牌",扫描线程从阻塞队列中取队列名,进而再根据队列名,从对应的队列中取一个消息

回答:

  • 此处我们采用方案二!

代码编写:

  • 此处我们实现一个 ConsumerManager 类,用于实现消费消息的核心逻辑
import com.example.demo.common.Consumer;
import com.example.demo.common.ConsumerEnv;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.VirtualHost;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/*
* 通过这个类来实现消费消息的核心逻辑
* */
public class ConsumerManager {
//    持有上层的 VirtualHost 对象的引用,用来操作数据
    private VirtualHost parent;
//    指定一个线程池,负责执行具体的回调任务
    private ExecutorService workerPool = Executors.newFixedThreadPool(4);
//    存放令牌的阻塞队列
    private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
//    扫描线程
    private Thread scannerThread = null;

    public ConsumerManager(VirtualHost p) {
        this.parent = p;

        scannerThread = new Thread(() -> {
            while (true) {
                try {
//                    1、拿到令牌
                    String queueName = tokenQueue.take();
//                    2、根据令牌找到队列
                    MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
                    if(queue == null) {
                        throw new MqException("[ConsumerManager] 取令牌后发现,该队列名不存在!queueName = " + queueName);
                    }
//                    3、从队列中消费消费一个消息
                    synchronized (queue) {
                        consumeMessage(queue);
                    }
                } catch (InterruptedException | MqException e) {
                    e.printStackTrace();
                }
            }
        });
//        把线程设为后台线程
        scannerThread.setDaemon(true);
        scannerThread.start();
    }

//    这个方法的调用时机就是发送消息的时候
    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }

    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
//        找到对应的队列
       MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
       if(queue == null) {
           throw new MqException("[ConsumerManager] 队列不存在! queueName = " + queueName);
       }
        ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);
       synchronized (queue) {
           queue.addConsumerEnv(consumerEnv);
//           如果当前队列中已经有一些消息了,需要立即就消费掉
           int n = parent.getMemoryDataCenter().getMessageCount(queueName);
           for (int i = 0;i < n;i++) {
//               这个方法调用一次就消费一条消息
               consumeMessage(queue);
           }
       }
    }

    private void consumeMessage(MSGQueue queue) {
//        1、按照轮询的方式,找个消费者出来
        ConsumerEnv luckyDog = queue.chooseConsumer();
        if(luckyDog == null) {
//            当前队列没有消费者,暂时不消费,等后面有消费者出现再说
            return;
        }
//        2、从队列中取出一个消息
        Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
        if(message == null) {
//            当前队列中还没有消息,也不需要消费
            return;
        }
//        3、把消息带入到消费者的回调方法中,丢给线程池执行
        workerPool.submit(() -> {
            try {
//            1) 把消息放入待确认的集合中,这个操作必须要在执行回调之前
                parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
//            2) 真正执行回调函数
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());
//            3) 如果当前是 "自动应答",就可以直接把消息删除了
//               如果当前是 "手动应答",则先不处理,交给后续消费者调用 basicAck 方法来处理
                if(luckyDog.isAutoAck()) {
//                    a.删除硬盘上的消息
                    if(message.getDeliverMode() == 2) {
                        parent.getDiskDataCenter().deleteMessage(queue,message);
                    }
//                    b.删除上面的待确认集合中的消息
                    parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),message.getMessageId());
//                    c.删除内存中消息中心里的消息
                    parent.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费 queueName = " + queue.getName());
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

关于消息确认

  • 为了能够确保消息是被正确的消费掉了,我们需要引入 消息确认 机制
  • 即消费者的回调方法 顺利执行完(未抛异常啥的),那么这条消息的历史使命就算完成了,该消息也就可以被删除了
  • 消息确认机制 也就是为了保证 消息不丢失

具体思路

  1. 在真正执行回调之前,先将该消息放到 "待确认集合" 中,避免因为回调失败,导致消息的丢失
  2. 真正执行回调
  3. 当前消费者采取的是 autoAck=true 自动应答,就认为回调执行完毕不抛异常,就算消费成功,然后就可以删除消息了(硬盘、消息中心、待确认集合)
  4. 当前消费者采取的是 autoAck=false 手动应答,需要消费者这边,在自己的回调方法内部,显式调用 basicAck 这个核心 API

basicAck 代码编写:

public boolean basicAck(String queueName,String messageId) {
        queueName = virtualHostName + queueName;
        try {
//            1、获取到消息和队列
            Message message = memoryDataCenter.getMessage(messageId);
            if(message == null) {
                throw new MqException("[VirtualHost] 要确认的消息不存在!messageId = " + messageId);
            }
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if(queue == null) {
                throw new MqException("[VirtualHost] 要确认的消息不存在!queueName = " + queueName);
            }
//            2、删除硬盘上的数据
            if(message.getDeliverMode() == 2) {
                diskDataCenter.deleteMessage(queue,message);
            }
//            3、删除消息中心的数据
            memoryDataCenter.removeMessage(messageId);
//            4、删除待确认的集合中的消息
            memoryDataCenter.removeMessageWaitAck(queueName,messageId);
            System.out.println("[VirtualHost] basicAck 成功!消息被确认成功!queueName = " + queueName
            + ", messageId = " + messageId);
            return true;
        }catch (Exception e) {
            System.out.println("[VirtualHost] basicAck 失败!消息确认失败!queueName = " + queueName
                    + ", messageId = " + messageId);
            e.printStackTrace();
            return false;
        }
    }

问题一:

  • 执行回调方法的过程中抛异常了会产生什么影响?

回答:

  • 当回调方法抛异常,后续逻辑便会执行不到,此时该消息就会始终在 待确认的集合中
  • RabbitMQ 的做法是另外搞一个扫描线程(其实 RabbitMQ 里面不叫线程,人家是叫进程,但是这个进程不是操作系统的进程,而是 erlang 中的概念)
  • 由该线程负责关注 待确认集合中,每个待确认的消息呆多久了****,如果呆的时间超出了范围就会把这个消息放到一个特定的队列 "死信队列"
  • 当然,死信对列 也是程序员手动配置的,但此处我们并未实现 死信队列逻辑

问题二:

  • 执行回调过程中,broker server 崩溃了,其中的内存数据全没了,此时有什么影响?

回答:

  • 此时硬盘数据还是在的!
  • 正在消费的这个消息,在硬盘中仍然存在
  • broker server 重启之后,这个消息就又被加载回内存了,就像从来没有消费过一样
  • 消费者就有机会重新消费到这个消息
  • 当然重复消费的问题,应该由消费者的业务代码负责保证,broker server 管不了!
标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/weixin_63888301/article/details/136275135
版权归原作者 茂大师 所有, 如有侵权,请联系我们删除。

“2024.2.23 模拟实现 RabbitMQ —— 实现消费消息逻辑”的评论:

还没有评论