0


仿RabbitMq实现消息队列正式篇(虚拟机篇)

@TOC

虚拟机模块

要管理的数据

  • 交换机数据管理句柄
  • 队列数据管理句柄
  • 绑定信息数据管理模块
  • 消息数据管理模块

要管理的操作

  • 声明 / 删除交换机:在删除交换机的时候,要删除相关的绑定信息
  • 声明 / 删除队列 :在删除队列的时候,要删除相关的绑定信息以及消息数据
  • 队列的绑定 / 解绑 :绑定的时候,必须交换机和队列是存在的
  • 获取指定队列的消息
  • 对指定队列的指定消息进行确认
  • 获取交换机相关的所有绑定信息:一条消息要发布给指定交换机的时候,交换机获取所有绑定信息,来确认消息要发布到哪个队列

基于这个,那我们先完成虚拟机模块管理的相关数据

消息管理模块

syntax = "proto3";  
package mymq;

enum ExchangeType  # 交换类型
{
    UNKNOWTYPE = 0;
    DIRECT = 1;
    FANOUT = 2;
    TOPIC = 3; 
};

enum DeliveryMode # 是否持久化
{
    UNKNOWMODE = 0;
    UNDURABLE = 1;
    DURABLE = 2;
};
 
message BasicProperties #消息的属性
{
    string id = 1;                         # 消息ID, UUID
    DeliveryMode delivery_mode = 2;        # 持久化模块
    string rounting_key = 3;               # routing_key绑定信息
};

message Message
{
    message Payload
    {
        BasicProperties properties = 1;     # 消息属性
        string body = 2;                    # 有效载荷数据
        string valid = 3;                   # 消息是否有效
    };
    Payload payload = 1;                    # 真正持久化的只有这一个字段
    uint32 offset = 2;                      # 消息的位置
    uint32 length = 3;                      # 消息的长度
};

要管理的数据

消息信息

  1. ID:消息的唯一标识
  2. 持久化标识:表示是否对消息进行持久化(还取决于队列的持久化标志)
  3. routing_key:决定了当前消息要发布的队列(消息发布到交换机后,根据绑定队列binding_key决定是否发布到指定队列)
消息主体
  1. 存储偏移量:消息以队列为单元存储在文件中,这个偏移量,是当前消息相对于文件起始位置的偏移量

  2. 消息长度:从偏移量位置取出指定长度的消息(解决了粘包问题)

  3. 是否有效标志:标识当前消息是否已经删除(删除一条消息,并不是用后面的消息覆盖掉前面的数据,而是重置有效标志位,当一个文件中的有效消息占据总消息比例不到50%,且数据量超过2000,则进行垃圾回收,重新整理文件数据存储)

     当系统重启时,也需要重新加载有效消息即可。
    

消息的管理

管理方法

以队列为单元进行管理(因为消息的所有操作都是以队列为单元)

管理数据
  1. 消息链表
  2. 待确定消息hash
  3. 持久化消息hash
  4. 持久化的有效消息数据
  5. 持久化的总的消息数据
管理操作
  • 向队列新增消息
  • 获取队首消息
  • 对消息进行确认
  • 恢复队列历史消息
  • 垃圾回收
  • 删除队列相关消息文件
队列消息管理
  • 初始化队列消息结构
  • 移除队列消息结果
  • 向队列新增消息
  • 对队列消息进行确认
  • 恢复队列历史消息
#ifndef __M_MSG_H__
#define __M_MSG_H__

#include "../mqcommon/logger.hpp"
#include "../mqcommon/helper.hpp"
#include "../mqcommon/msg.pb.h"
#include <iostream>
#include <mutex>
#include <memory>
#include <unordered_map>
#include <list>

namespace mymq
{

    // 消息的持久化管理
    //  a. 管理数据
    //  i. 队列消息⽂件存储的路径
    //  ii. 队列消息的存储⽂件名
    //  iii. 队列消息的临时交换⽂件名
    // b. 管理操作
    // i. ⽇志消息存储在⽂件中(4B⻓度+(属性+内容+有效位)序列化消息,连续存储即可)
    // ii. 提供队列消息⽂件创建/删除功能
    // iii. 提供队列消息的新增持久化/删除持久化
    // iv. 提供持久化内容的垃圾回收(其实就是重新加载出所有有效消息返回,并重新⽣成新的消息
    // 存储⽂件)
#define DATAFILE_SUBFIX ".mqd"
#define TMPFILE_SUBFIX ".mqb.tmp"
    using MessagePtr = std::shared_ptr<mymq::Message>;

    // 消息持久化
    class MessageMapper
    {
    public:
        MessageMapper(std::string &basedir, const std::string &qname)
            : _qname(qname)
        {
            if (basedir.back() != '/')
            {
                basedir.push_back('/');
            }
            _datafile = basedir + qname + DATAFILE_SUBFIX;
            _tmpfile = basedir + qname + TMPFILE_SUBFIX;
            if (FileHelper(basedir).exists() == false)
            {
                FileHelper::createDirectory(basedir);
            }
            createMsgFile();
        }

        bool createMsgFile()
        {
            if (FileHelper(_datafile).exists() == true)
            {
                return true;
            }
            bool ret = FileHelper::createFile(_datafile);
            if (ret == false)
            {
                ELOG("创建队列数据文件:%s 失败", _datafile.c_str());
                return false;
            }
            return true;
        }

        void removeMsgFile()
        {
            FileHelper::removeFile(_datafile);
            FileHelper::removeFile(_tmpfile);
        }

        bool insert(MessagePtr &msg)
        {
            return insert(_datafile, msg);
        }

        bool remove(MessagePtr &msg)
        {
            // 1. 将msg中的有效标志位修改为‘0’
            msg->mutable_payload()->set_valid("0");
            // 2,对msg进行序列化
            std::string body = msg->SerializeAsString();
            if (body.size() != msg->length())
            {
                ELOG("不能修改文件中的数据信息,因为生成的数据与原数据长度不一样");
                return false;
            }
            // 3. 将序列化后的消息,写入到数据在文件中的指定位置(覆盖原有的数据)
            FileHelper helper(_datafile);
            bool ret = helper.write(body.c_str(), msg->offset(), body.size());
            if (ret == false)
            {
                DLOG("向队列数据文件写入失败!");
                return false;
            }
            return true;
        }

        std::list<MessagePtr> gc()
        {
            std::list<MessagePtr> result;
            bool ret = load(result);
            if (ret == false)
            {
                ELOG("加载有效数据失败\n");
                return result;
            }
            // 2. 将有效数据,进行序列化存储到临时文件中
            FileHelper::createFile(_tmpfile);
            for (auto e : result)
            {
                DLOG("向临时文件中添加数据 : %s", e->payload().body().c_str());
                ret = insert(_tmpfile, e);
                if (ret == false)
                {
                    ELOG("向临时文件中添加数据失败");
                    return result;
                }
            }
            // 3. 删除源文件
            ret = FileHelper::removeFile(_datafile);
            if (ret == false)
            {
                ELOG("删除原文件失败");
                return result;
            }
            // 4. 修改临时文件的名字,改成原文件的名称
            ret = FileHelper(_tmpfile).rename(_datafile.c_str());
            if (ret == false)
            {
                ELOG("修改文件名称失败");
                return result;
            }
            // 5. 返回新的有效数据
            return result;
        }

    private:
        bool load(std::list<MessagePtr> &result)
        {
            // 1. 加载出文件中的所有的有效数据:存储格式:4字节长度 | 数据 | 4字节长度 | 数据 ......
            FileHelper data_file_helper(_datafile);
            size_t offset = 0;   // 用来定位
            size_t msg_size = 0; // 用来 4 字节长度的数据长度
            size_t file_size = data_file_helper.size();
            while (offset < file_size)
            {
                bool ret = data_file_helper.read((char*)&msg_size, offset, sizeof(size_t));
                if (ret == false)
                {
                    DLOG("读取消息长度失败!");
                }
                offset += sizeof(size_t);
                std::string msg_body(msg_size, '\0');
                ret = data_file_helper.read(&msg_body[0], offset, msg_size);
                // 为什么使用 &msg_body[0]
                // &msg_body[0] 返回的是指向 std::string 内部字符数组首元素的指针,即指向字符串内容的指针。
                // 适用于需要直接操作字符串内容的场景,例如文件读写操作

                // &msg_body:
                // &msg_body 返回的是 std::string 对象本身的指针,即指向 std::string 对象的指针。
                // 不适用于直接操作字符串内容的场景,而是用于操作整个 std::string 对象本身。
                if (ret == false)
                {
                    DLOG("读取消息数据失败");
                    return false;
                }
                offset += msg_size;
                MessagePtr msgp = std::make_shared<Message>();
                msgp->mutable_payload()->ParseFromString(msg_body);

                // 如果是无效消息,则直接处理下一个
                if (msgp->payload().valid() == "0")
                {
                    ELOG("加载到无效消息:%s", msgp->payload().body().c_str());
                    continue;
                }
                // 有效消息则保存下来
                result.push_back(msgp);
            }
            return true;
        }

        bool insert(const std::string &filename, MessagePtr &msg)
        {
            // 新增数据都是添加在文件末尾的
            //  1. 进行消息的序列化,获取到格式化后的消息
            // 因为使用的是 sqlite 数据库,存储的是二进制信息,所以需要进行序列化
            std::string body = msg->payload().SerializeAsString();
            // 2. 获取文件长度
            FileHelper helper(filename);
            size_t fsize = helper.size();
            size_t msg_size = body.size();
            // 写入逻辑:1. 先写入 4 字节数据长度,2. 再写入指定长度数据
            bool ret = helper.write((char *)&msg_size, fsize, sizeof(size_t));
            if (ret == false)
            {
                DLOG("向队列数据文件写入数据长度失败!");
                return false;
            }
            // 将数据写入文件的指定位置
            ret = helper.write(body.c_str(), fsize + sizeof(size_t), msg_size);
            if (ret == false)
            {
                DLOG("向队列数据文件写入数据失败!");
                return false;
            }
            // 更新msg中的实际存储信息
            msg->set_offset(fsize + sizeof(size_t));
            msg->set_length(msg_size);
            return true;
        }

    private:
        std::string _qname;
        std::string _datafile;
        std::string _tmpfile;
    };

    // 管理数据
    class QueueMessage
    {
    public:
        using ptr = std::shared_ptr<QueueMessage>;
        // 初始化
        QueueMessage(std::string &basedir, const std::string &qname)
            : _mapper(basedir, qname),
              _qname(qname),
              _valid_count(0),
              _total_count(0)
        {}

        bool recovery()
        {
            // 恢复历史消息
            std::unique_lock<std::mutex> lock(_mutex);
            _msgs = _mapper.gc();
            for (auto &msg : _msgs)
            {
                _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
            }
            _valid_count = _total_count = _msgs.size();
            return true;
        }

        bool insert(const BasicProperties *bp, const std::string &body, bool queue_is_durable)
        {
            // 构造消息对象
            MessagePtr msg = std::make_shared<Message>();
            msg->mutable_payload()->set_body(body);
            if (bp != nullptr)
            {
                DeliveryMode mode = queue_is_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;
                msg->mutable_payload()->mutable_properties()->set_id(bp->id());
                msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
                msg->mutable_payload()->mutable_properties()->set_rounting_key(bp->rounting_key());
            }
            else
            {
                DeliveryMode mode = queue_is_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;
                msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
                msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
                msg->mutable_payload()->mutable_properties()->set_rounting_key("");
            }
            std::unique_lock<std::mutex> lock(_mutex);
            // 2. 判断消息是否需要持久化
            if (msg->payload().properties().delivery_mode() == DURABLE)
            {
                msg->mutable_payload()->set_valid("1");
                // 3. 进行持久化存储
                bool ret = _mapper.insert(msg);
                if (ret == false)
                {
                    ELOG("持久化存储信息:%s失败了!", body.c_str());
                }

                _valid_count += 1;
                _total_count += 1;
                _durable_msgs.insert(make_pair(msg->payload().properties().id(), msg));
            }
            // 4. 内容的管理
            _msgs.push_back(msg);
            return true;
        }

        MessagePtr front()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            if (_msgs.size() == 0)
            {
                return MessagePtr();
            }
            // 获取一条队首消息:从_msgs中获取数据
            MessagePtr msg = _msgs.front();
            _msgs.pop_front();
            // 将该消息对象,向待确认的hash表中添加一份,等到收到消息确认后进行删除
            _waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
            return msg;
        }

        // 每次删除消息后,判断是否需要垃圾回收
        bool remove(const std::string &msg_id)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            // 1. 从待确认队列中找到信息
            auto it = _waitack_msgs.find(msg_id);
            if (it == _waitack_msgs.end())
            {
                DLOG("没有找到要删除的消息:%s!", msg_id.c_str());
                return true;
            }
            // 2. 根据消息的持久化模式,决定是否删除持久化消息
            if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
            {
                // 删除持久化消息
                _mapper.remove(it->second);
                _durable_msgs.erase(msg_id);
                _valid_count -= 1;
                gc();
            }
            // 4. 删除内存中的信息
            _waitack_msgs.erase(msg_id);
            return true;
        }

        size_t getable_count()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _msgs.size();
        }

        size_t total_count()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _total_count;
        }

        size_t durable_count()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _durable_msgs.size();
        }
        size_t waitack_count()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _waitack_msgs.size();
        }
        void clear()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _mapper.removeMsgFile();
            _msgs.clear();
            _durable_msgs.clear();
            _waitack_msgs.clear();
            _valid_count = 0;
            _total_count = 0;
        }

    private:
        bool GCCheck() {
                //持久化的消息总量大于2000, 且其中有效比例低于50%则需要持久化
                if (_total_count > 2000 && _valid_count * 10  / _total_count < 5) {
                    return true;
                }
                return false;
        }

        void gc()
        {
            // 1. 进行垃圾回收,获取到垃圾回收后,有效的消息信息链表
            if(GCCheck() == false)
            {
                return; 
            }
            std::list<MessagePtr> msgs = _mapper.gc();
            for(auto& msg : msgs)
            {
                auto it = _durable_msgs.find(msg->payload().properties().id());
                if(it == _durable_msgs.end())
                {
                    DLOG("垃圾回收之后,有一条持久化消息,在内存中没有进行管理");
                    _msgs.push_back(msg); // 做法:重新添加到推送链表的末尾
                    _durable_msgs.insert(make_pair(msg->payload().properties().id(), msg));
                    continue;
                }
                // 2. 更新每一条消息的实际存储位置
                it->second->set_offset(msg->offset());
                it->second->set_length(msg->length());
            }
            _valid_count = _total_count = msgs.size();
        }

    private:
        std::mutex _mutex;
        std::string _qname;
        size_t _valid_count;
        size_t _total_count;
        MessageMapper _mapper;
        std::list<MessagePtr> _msgs;
        std::unordered_map<std::string, MessagePtr> _durable_msgs; // 持久化消息hash
        std::unordered_map<std::string, MessagePtr> _waitack_msgs; // 待确认消息hash
    };

    // 实现一个对外的总体消息管理类
    // 消息的总体对外管理
    // i. 初始化新建队列的消息管理结构,并创建消息存储⽂件
    // ii. 删除队列的消息管理结构,以及消息存储⽂件
    // iii. 向指定队列新增消息
    // iv. 获取指定队列队⾸消息
    // v. 确认指定队列待确认消息(删除)
    // vi. 判断指定队列消息是否为空

    class MessageManager
    {
    public:
        using ptr = std::shared_ptr<MessageManager>;
        MessageManager(const std::string basedir)
            :_basedir(basedir)
        {}

        void clear()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            for(auto& it : _queue_msgs)
            {
                it.second->clear();
            }
        }

        void initQueueMessage(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if(it != _queue_msgs.end())
                {
                    return ;
                }
                qmp = std::make_shared<QueueMessage>(_basedir, qname);
                _queue_msgs.insert(std::make_pair(qname, qmp));
            }
            qmp->recovery();
        }

        void destroyQueueMessage(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    return ;
                }
                qmp = it->second;
                _queue_msgs.erase(it);
            }
            qmp->clear();
        }

        bool insert(const std::string& qname, BasicProperties* bp, const std::string& body, bool _queue_is_durable)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    ELOG("向队列 %s 新增消息失败,没有找到消息管理句柄", qname.c_str());
                    return false;
                }
                qmp = it->second;
            }
            return qmp->insert(bp, body, _queue_is_durable); 
        }

        MessagePtr front(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    DLOG("获取 %s 队首数据失败:没有找到对应的消息句柄", qname.c_str() ); 
                    return MessagePtr();
                }
                qmp = it->second;
            }
            //取出对应的相关队列的首部消息
            return qmp->front();
        }

        void ack(const std::string& qname, const std::string& msg_id)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    DLOG("确认队列 %s 消息 %s 失败:没有找到消息管理句柄", qname.c_str(), msg_id.c_str());
                    return ;
                }
                qmp = it->second;
            }
            qmp->remove(msg_id);
            return ;
        }

        size_t getable_count(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    DLOG("获取队列 %s 待推送消息数量失败:没有找到消息管理句柄", qname.c_str());
                    return 0;
                }
                qmp = it->second;
            }
            return qmp->getable_count();
        }
        size_t total_count(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
                    return 0;
                }
                qmp = it->second;
            }
            return qmp->total_count();
        }
        size_t durable_count(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
                    return 0;
                }
                qmp = it->second;
            }
            return qmp->durable_count();
        }
        size_t waitack_count(const std::string& qname)
        {
                 QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
                    return 0;
                }
                qmp = it->second;
            }
            return qmp->waitack_count();
        }
    private:
        std::mutex _mutex;
        std::string _basedir;
        std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;
    };

}

#endif

交换机数据管理

要管理的数据

  • 交换机名称:唯一标识
  • 交换机类型:决定了消息的转发方式
  • 每一个队列绑定中有个binding_key, 每条消息中有一个routing_key
  • 直接交换:binding_key和routing_key相同,则将消息放入队列
  • 广播交换:将消息放入交换机绑定的所有队列中
  • 主题交换:routing_key与多个绑定队列的binding_key有个匹配机制,匹配成功了则放入
  • 持久化标志:决定了当前交换机信息是否需要持久化存储
  • 自动化删除标志:指的是关联了当前交换机的所有客户端都退出了,是否要自动删除交换机

要管理的操作

  • 创建交换机:本质上需要的是声明 ----- 强断言的思想,没有的话,就创建
  • 删除交换机:每个交换机都会绑定一个或多个队列(意味着会有一个或多个绑定信息,因此删除交换机需要删除相关绑定信息
  • 获取指定名称的交换机
  • 获取当前交换机名称

注意交换机有持久化的标识,所以在进行管理操作的时候,需要分情况进行,在内存中管理,以及在硬盘中管理

代码展示

#include "../mqcommon/helper.hpp"
#include "../mqcommon/logger.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include <google/protobuf/map.h>
#include <iostream>
#include <unordered_map>
#include <memory>
#include <mutex>

namespace mymq
{
    // 1. 定义交换机类
    struct Exchange
    {
        using ptr = std::shared_ptr<Exchange>; 
        // 1. 交换机名称
        std::string name;
        // 2. 交换机类型
        ExchangeType type;
        // 3. 交换机持久化标志
        bool durable;
        // 4. 是否自动删除标志
        bool auto_delete;
        // 5. 其他参数
        google::protobuf::Map<std::string, std::string> args;
        //因为后面的函数需要构造一个Exchange的空对象
        Exchange()
        {}
        
        Exchange(const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete, const google::protobuf::Map<std::string, std::string> eargs)
            : name(ename),
              type(etype),
              durable(edurable),
              auto_delete(eauto_delete),
              args(eargs)
        {}

        // args存储键值对,在存储数据库的时候,会组织一个格式字符串进行存储 key=value&key=value.....
        // 内部解析str_args字符串,将容器存储到成员中
        void setArgs(const std::string &str_args)
        {
            //key=val&key=val
            std::vector<std::string> sub_args;
            StrHelper::split(str_args, "&", sub_args);
            for(auto& str : sub_args)
            {
                size_t pos = str.find('=');
                std::string key = str.substr(0, pos);
                std::string val = str.substr(pos + 1);
                args[key] = val;
            }
        }
        // 将args中的内容进行序列化后,返回一个字符串
        std::string getArgs()
        {
            std::string result;
            for(auto e = args.begin(); e != args.end(); e++)
            {
                result = e->first + "=" + e->second + "&" + result;
            }
            return result; 
        }
    };

    // 2. 定义交换机数据持久化管理类 -- 数据存储在sqlite数据库中
    class ExchangeMapper
    {
    public:
        ExchangeMapper(const std::string &dbfile)
            :_sqlite_helper(dbfile)
        {
            std::string path = FileHelper::parentDirectory(dbfile);
            FileHelper::createDirectory(path);
            assert(_sqlite_helper.open());
            createTable();
        }

        void createTable()
        {
            #define CREATE_TABLE "create table if not exists exchange_table(\
            name varchar(32) primary key, \
            type int,\
            durable int, \
            auto_delete int ,\
            args varchar(128));"
            bool ret = _sqlite_helper.exec(CREATE_TABLE, nullptr, nullptr);
            if(ret == false)
            {
                DLOG("创建交换机数据库失败");
                abort();  //直接异常退出程序
            }
        }

        void removeTable()
        {
            #define DROP_TABLE "drop table if exists exchange_table;"
            bool ret = _sqlite_helper.exec(DROP_TABLE, nullptr, nullptr);
            if(ret == false)
            {
                DLOG("删除交换机数据库失败");
                abort();  //直接异常退出程序
            }
        }

        bool insert(Exchange::ptr &exp)
        {
            #define INSERT_TABLE "insert into exchange_table values('%s', %d, %d, %d, '%s');"
            std::string args_str = exp->getArgs();
            char sql_str[4096] = {0};
            sprintf(sql_str, INSERT_TABLE, exp->name.c_str(), exp->type, exp->durable, exp->auto_delete, args_str.c_str());    
            bool ret = _sqlite_helper.exec(sql_str, nullptr, nullptr);
            if(ret == false)
            {
                DLOG("插入数据库失败");
                return false;
            }
            return true;
        }

        void remove(const std::string &name)
        {
            std::stringstream ss;
            ss << "delete from exchange_table where name = ";
            ss << "'" << name << "';";
            _sqlite_helper.exec(ss.str(), nullptr, nullptr);
        }

        using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;
        ExchangeMap recovery()
        {
            ExchangeMap result;
            std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";
            _sqlite_helper.exec(sql.c_str(), SelectCallBack, &result);
            return result;
        }
    private:
        static int SelectCallBack(void* args, int numcol, char** row, char** fields)
        {
            ExchangeMap* result = (ExchangeMap*)args;
            auto exp = std::make_shared<Exchange>();
            exp->name = row[0];
            exp->type = (mymq::ExchangeType)std::stoi(row[1]);
            exp->durable = (bool)std::stoi(row[2]);
            exp->auto_delete = (bool)std::stoi(row[3]);
            if(row[4])
            {
                exp->setArgs(row[4]);
            }
            result->insert(std::make_pair(exp->name, exp));
            return 0;
        }
    private:
        SqliteHelper _sqlite_helper;
    };

    // 3. 定义交换机在内存管理类
    class ExchangeManager
    {
    public:
        using ptr = std::shared_ptr<ExchangeManager>;
        ExchangeManager(const std::string &dbfile)
            :_mapper(dbfile)
        {
            _exchanges = _mapper.recovery();
        }
        // 声明交换机
        bool declareExchange(const std::string &name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string> &args)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _exchanges.find(name);
            if(it != _exchanges.end())
            {
                //如果交换机已经存在,那就直接返回,不需要重复新增
                return true;
            }
            auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);
            if(durable == true)
            {
                bool ret = _mapper.insert(exp);
                if(ret == false)
                {
                    return false;
                }
            }
            _exchanges.insert(std::make_pair(name, exp));
            return true;
        }
        // 删除交换机
        void deleteExchange(const std::string &name)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _exchanges.find(name);
            if(it == _exchanges.end())
            {
                //如果交换机不存在,那就直接返回
                return ;
            }
            if(it->second->durable == true)
                _mapper.remove(name);
            _exchanges.erase(name);
        }

        Exchange::ptr selectExchange(const std::string& name)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _exchanges.find(name);
            if(it == _exchanges.end())
            {
                //如果交换机不存在,那就直接返回
                return nullptr;
            }
            return it->second;
        }

        // 判断交换机是否存在
        bool exists(const std::string &name)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _exchanges.find(name);
            if(it == _exchanges.end())
            {
                //如果交换机不存在,那就直接返回
                return false;
            }
            return true;
        }

        size_t size()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _exchanges.size();
        }

        // 清理所有交换机数据
        void clear()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _mapper.removeTable();
            _exchanges.clear();
        }

    private:
        std::mutex _mutex;
        ExchangeMapper _mapper;
        std::unordered_map<std::string, Exchange::ptr> _exchanges;
    };

}

队列数据管理

要管理的数据

  1. 队列名称:唯一的标识
  2. 持久化存储标志:决定了是否将队列信息持久化存储起来,决定了重启后,这个队列是否存在
  3. 是否独占标志:独占就指的是,只有当前客户端自己能够订阅队列消息
  4. 自动删除标志:当订阅了当前队列的所有客户端退出后,是否删除队列

要管理的操作

  1. 创建队列
  2. 删除队列
  3. 获取指定队列信息
  4. 获取队列数据
  5. 获取所有队列名称:当系统重启后,需要重新加载数据,加载历史消息(消息以队列为单元存储在文件中)而加载消息需要直到队列名称,因为后边消息存储的时候,存储文件以队列名称进行的取名

代码展示

#ifndef __M_QUEUE_H__
#define __M_QUEUE_H__

#include "../mqcommon/helper.hpp"
#include "../mqcommon/logger.hpp"
#include "../mqcommon/msg.pb.h"
#include <iostream>
#include <unordered_map>
#include <memory>
#include <mutex>

namespace mymq
{
    struct MsgQueue
    {
        using ptr = std::shared_ptr<MsgQueue>;   // 放在这个位置合适吗?
        std::string name;
        bool durable;
        bool exclusive;
        bool auto_delete;
        google::protobuf::Map<std::string, std::string> args;

        MsgQueue()
        {}

        MsgQueue(const std::string qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map<std::string, std::string> &qargs)
            :name(qname),
            durable(qdurable),
            exclusive(qexclusive),
            auto_delete(qauto_delete),
            args(qargs)
        {}

        void setArgs(const std::string& str_args)
        {
            std::vector<std::string> sub_args;
            StrHelper::split(str_args, "&", sub_args);
            for(auto sub_string : sub_args)
            {
                size_t pos = sub_string.find("=");
                std::string key = sub_string.substr(0, pos);
                std::string value = sub_string.substr(pos + 1);
                args[key] = value;
            }
        }

        std::string getArgs()
        {
            std::string result;
            for(auto str = args.begin(); str != args.end(); str++)
            {
                result = str->first + "=" + str->second + result;
            }
            return result;
        }
    };

    using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;
    class MsgQueueMapper
    {
    public:
        MsgQueueMapper(const std::string& dbfile)
            :_sqlite_helper(dbfile)
        {
            std::string path = FileHelper::parentDirectory(dbfile);
            bool ret = FileHelper::createDirectory(path);
            if(ret == false)
            {
                ELOG("创建父目录失败: %s", strerror(errno));
            }
            _sqlite_helper.open();  //创建sqlite目录
            createTable();
        }

        void createTable()
        {
            std::stringstream sql;
            sql << "create table if not exists queue_table(";
            sql << "name varchar(32) primary key, ";
            sql << "durable int, ";
            sql << "exclusive int, ";
            sql << "auto_delete int, ";
            sql << "args varchar(128));";
            _sqlite_helper.exec(sql.str(), nullptr, nullptr);
        }

        void removeTable()
        {
            std::stringstream sql;
            sql << "drop table if exists queue_table;";
            _sqlite_helper.exec(sql.str(), nullptr, nullptr);
        }

        bool insert(MsgQueue::ptr& queue)
        {
            std::stringstream sql;
            sql << "insert into queue_table values(";
            sql << "'" << queue->name << "',";
            sql << queue->durable << ", ";
            sql << queue->exclusive << ", ";
            sql << queue->auto_delete << ", ";
            sql << "'" << queue->getArgs() << "');";
            return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
        }

        void remove(const std::string& name)
        {
            std::stringstream sql;
            sql << "delete from queue_table where name = ";
            sql << "'" << name << "';";
            _sqlite_helper.exec(sql.str(), nullptr, nullptr);
        }

        QueueMap recovery()
        {
            QueueMap result;
            std::stringstream sql;
            sql << "select name, durable, exclusive, auto_delete, args from queue_table;";
            _sqlite_helper.exec(sql.str(), SelectCallBack, &result);
            return result;
        }

    private:
        static int SelectCallBack(void* args, int numcol, char** row, char** fields)
        {
            QueueMap* result = (QueueMap*)args;
            MsgQueue::ptr mqp = std::make_shared<MsgQueue>();
            mqp->name = row[0];
            mqp->durable = std::stoi(row[1]);
            mqp->exclusive = std::stoi(row[2]);
            mqp->auto_delete = std::stoi(row[3]);
            if(row[4]) mqp->setArgs(row[4]);
            result->insert(std::make_pair(mqp->name, mqp));
            return 0;
        }
    private:
        SqliteHelper _sqlite_helper;
    };

    class MsgQueueManager
    {
    public:
        using ptr = std::shared_ptr<MsgQueueManager>;
        MsgQueueManager(const std::string& dbfile)
            :_mapper(dbfile)
        {
            _msg_queue = _mapper.recovery();
        }

        bool declareQueue(const std::string& name, bool durable, bool exclusive, bool auto_delete, const google::protobuf::Map<std::string, std::string>& args)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _msg_queue.find(name);
            if(it != _msg_queue.end())
            {
                // 队列存在,直接返回true
                return true;
            }
            MsgQueue::ptr mqp = std::make_shared<MsgQueue>();
            mqp->name = name;
            mqp->durable = durable;
            mqp->exclusive = exclusive;
            mqp->auto_delete = auto_delete;
            mqp->args = args;
            if(durable ==true) //需要进行持久化存储
            {
                bool ret = _mapper.insert(mqp);
                if(ret == false) return false;
            }
            _msg_queue.insert(make_pair(mqp->name, mqp));
            return true;
        }

        void deleteQueue(const std::string& name)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _msg_queue.find(name);
            if(it == _msg_queue.end())
            {
                DLOG("未找到相关信息");
                return ;
            }
            if(it->second->durable == true)
            {
                _mapper.remove(name);
            }
            _msg_queue.erase(name);
        }

        MsgQueue::ptr selectQueue(const std::string& name)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _msg_queue.find(name);
            if(it == _msg_queue.end())
            {
                DLOG("未找到相关信息");
                return MsgQueue::ptr();
            }
            return it->second;
        }

        QueueMap AllQueues()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _msg_queue;
        }

        bool exists(const std::string& name)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _msg_queue.find(name);
            if(it == _msg_queue.end())
            {
                return false;
            }
            return true;
        }

        size_t size()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _msg_queue.size();
        }

        void clear()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _msg_queue.clear();
        }

    private:
        std::mutex _mutex;
        MsgQueueMapper _mapper;
        QueueMap _msg_queue;
    };
}

#endif

绑定数据管理模块

描述将哪个队列与哪个交换机绑定到了一起

管理的数据

  • 交换机名称
  • 队列名称
  • binding_key:绑定密钥----描述在交换机的主题交换&直接交换的消息发布匹配规则

由数字, 字符,_,#, * 组成 binding_key, 例如:news.music.#

管理的操作

  1. 添加绑定

  2. 解除绑定

  3. 获取交换机相关的所有绑定信息

     删除交换机的时候,要删除相关绑定信息
    
     当消息发布到交换机中,交换机得通过这些信息来将消息发布到指定队列
    
  4. 获取队列相关的所有绑定信息

     删除队列的时候,要删除相关的绑定信息
    
  5. 获取绑定信息数量

代码展示

#ifndef _M_BINDING_H_
#define _M_BINDING_H_

#include "../mqcommon/helper.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/logger.hpp"
#include <iostream>
#include <unordered_map>
#include <memory>
#include <mutex>

namespace mymq
{
    struct Binding
    {
        using ptr = std::shared_ptr<Binding>;
        std::string exchange_name;
        std::string msgqueue_name;
        std::string binding_key;

        Binding() {}

        Binding(const std::string &ename, const std::string &msgqname, const std::string &key)
            : exchange_name(ename),
              msgqueue_name(msgqname),
              binding_key(key)
        {
        }
    };
    // 队列与绑定信息是一一对应的(因为是给某一个交换机去绑定队列,因此一个交换机可能会有多个队列的绑定信息)
    // 因此先定义一个队列名,与绑定信息的映射信息,这个是为了方便通过队名查找绑定信息
    using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
    // 然后定义一个交换机名称与队列绑定信息之间的映射关系,这个map包含了所有的绑定信息,并且以交换机为单元进行区分
    using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;

    // 采用上面两种结构,则删除交换机相关绑定信息的时候,不仅要删除交换机映射,还需要删除对应队列中的映射,否则对象得不到释放
    class BindingMapper
    {
    public:
        BindingMapper(const std::string& dbfile)
            :_sqlite_helper(dbfile)
        {
            std::string path = FileHelper::parentDirectory(dbfile);
            FileHelper::createDirectory(path);
            _sqlite_helper.open();
            createTable();
        }

        void createTable()
        {
            std::stringstream sql;
            sql << "create table if not exists binding_table(";
            sql << "exchange_name varchar(32), ";
            sql << "msgqueue_name varchar(32), ";
            sql << "binding_key varchar(128));";
            assert(_sqlite_helper.exec(sql.str(), nullptr, nullptr));
        }

        void removeTable()
        {
            std::stringstream sql;
            sql << "drop table if exists binding_table; ";
            assert(_sqlite_helper.exec(sql.str(), nullptr, nullptr));
        }

        bool insert(Binding::ptr& binding)
        {
            std::stringstream sql; 
            sql << "insert into binding_table values(";
            sql << "'" << binding->exchange_name <<"',";
            sql << "'" << binding->msgqueue_name << "',";
            sql << "'" << binding->binding_key << "');";
            
            return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
        }

        bool remove(const std::string& ename, const std::string& qname)
        {
            std::stringstream sql;
            sql << "delete from binding_table where ";
            sql << "exchange_name = '" << ename << "' and ";
            sql << "msgqueue_name = '" << qname << "';";
            return _sqlite_helper.exec(sql.str(), nullptr, nullptr); 
        }

        void removeExchangeBindings(const std::string& ename)
        {
            std::stringstream sql;
            sql << "delete from binding_table where ";
            sql << "exchange_name = '" << ename << "';";
            _sqlite_helper.exec(sql.str(), nullptr, nullptr); 
        }

        void removeMsgQueueBindings(const std::string& qname)
        {
            std::stringstream sql;
            sql << "delete from binding_table where ";
            sql << "msgqueue_name = '" << qname << "';";
            _sqlite_helper.exec(sql.str(), nullptr, nullptr); 
        }

        BindingMap recovery()
        {
            BindingMap result;
            std::string sql = "select exchange_name, msgqueue_name, binding_key from binding_table;";
            _sqlite_helper.exec(sql, SelectCallBack, &result);
            return result;
        }

    private:
        static int SelectCallBack(void* args, int numcol, char** row, char** fileds)
        {
            BindingMap* result = (BindingMap*)args;
            Binding::ptr bq = std::make_shared<Binding>();
            bq->exchange_name = row[0];
            bq->msgqueue_name = row[1];
            bq->binding_key = row[2];
            //为了防止 交换相关的绑定信息,因此不能直接创建队列映射,这样会覆盖历史数据
            // 因此得先获取交换机对应的映射对象,往里边添加数据
            // 但是,若这时候没有交换机对应的映射信息,因此这里的获取要使用引用(会保证不存在则自动创建)
            MsgQueueBindingMap& qbm = (*result)[bq->exchange_name];
            qbm.insert(std::make_pair(bq->msgqueue_name, bq));
            return 0;
        }
    private:
        SqliteHelper _sqlite_helper;
    };

    class BindingManager
    {
    public:
        using ptr = std::shared_ptr<BindingManager>;
        BindingManager(const std::string dbfile)
            :_mapper(dbfile)
        {
            _bindmap = _mapper.recovery();
        }

        bool bind(const std::string& ename, const std::string& qname, const std::string& key ,bool durable)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _bindmap.find(ename);
            if(it != _bindmap.end() && it->second.find(qname) != it->second.end())
            {
                return true;
            }
            //绑定是否是持久化
            Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);
            // 磁盘数据持久化
            if(durable == true)
            {
                bool ret = _mapper.insert(bp);
                if(ret == false)
                {
                    //未添加成功
                    return false;
                }
            }
            // 一个交换机对应着多条绑定信息,所以先确定交换机,再将相关绑定信息插进去
            MsgQueueBindingMap& mqbm = _bindmap[ename];
            mqbm.insert(std::make_pair(bp->msgqueue_name, bp));
            return true;
        }

        void unBind(const std::string& ename, const std::string& qname)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto itBindMap = _bindmap.find(ename);
            if(itBindMap == _bindmap.end())
            {
                return;
            }
            auto itMsgQueueBindings = itBindMap->second.find(qname);
            if(itMsgQueueBindings == itBindMap->second.end())
            {
                return ;
            }
            _mapper.remove(ename, qname);
            _bindmap[ename].erase(qname);
        }

        void removeExchangeBindings(const std::string& ename)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _mapper.removeExchangeBindings(ename);
            _bindmap.erase(ename);
        }

        void removeMsgqueueBindings(const std::string& qname)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _mapper.removeMsgQueueBindings(qname);
            for(auto it = _bindmap.begin(); it != _bindmap.end(); it++)
            {
                it->second.erase(qname);
            }
        }

        MsgQueueBindingMap getExchangeBindings(const std::string& ename)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _bindmap.find(ename);
            if(it == _bindmap.end())
            {
                return MsgQueueBindingMap();
            }
            return it->second;
        }

        Binding::ptr getBinding(const std::string& ename, const std::string& qname)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it_bindmap = _bindmap.find(ename);
            if(it_bindmap == _bindmap.end())
            {
                return Binding::ptr();
            }
            auto it_MsgQueueBindings = it_bindmap->second.find(qname);
            if(it_MsgQueueBindings == it_bindmap->second.end())
            {
                return Binding::ptr();
            }
            return it_MsgQueueBindings->second;
        }

        bool exists(const std::string& ename, const std::string& qname)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it_bindmap = _bindmap.find(ename);
            if(it_bindmap == _bindmap.end())
            {
                return false;
            }
            auto it_MsgQueueBindings = it_bindmap->second.find(qname);
            if(it_MsgQueueBindings ==it_bindmap->second.end())
            {
                return false;
            }
            return true;
        }

        size_t size()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            size_t total_size = 0;
            for(auto it = _bindmap.begin(); it != _bindmap.end(); it++)
            {
                total_size += it->second.size();
            }
            return total_size;
        }

        void clear()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _mapper.removeTable();
            _bindmap.clear();
        }

        
    private:
        std::mutex _mutex;
        BindingMapper _mapper;
        BindingMap _bindmap;
    };
}

#endif

现在虚拟机模块的所有需要管理的数据全部都安排成功了,接下来是虚拟机的代码展示

虚拟机代码展示

#ifndef __M_HOST_H__
#define __M_HOST_H__

#include "mq_exchange.hpp"
#include "mq_queue.hpp"
#include "mq_binding.hpp"
#include "mq_message.hpp"

namespace mymq
{
    class VirtualHost
    {
        public:
            using ptr = std::shared_ptr<VirtualHost>;
            VirtualHost(const std::string& hname, const std::string& basedir, const std::string& dbfile)
                :_host_name(hname),
                _emp(std::make_shared<ExchangeManager>(dbfile)),
                _mqmp(std::make_shared<MsgQueueManager>(dbfile)),           
                _bmp(std::make_shared<BindingManager>(dbfile)),
                _mmp(std::make_shared<MessageManager>(basedir))
            {}

            bool declareExchange(const std::string& name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string>& args)
            {
                return _emp->declareExchange(name, type, durable, auto_delete, args);
            }

            void deleteExchange(const std::string& name)
            {
                //删除交换机的时候,需要将交换机相关的绑定信息也删除掉
                _bmp->removeExchangeBindings(name);
                return _emp->deleteExchange(name);
            }

            bool existsExchange(const std::string& name)
            {
                return _emp->exists(name);
            }

            Exchange::ptr selectExchange(const std::string& ename)
            {
                return _emp->selectExchange(ename);
            }

            bool declareQueue(const std::string qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map<std::string, std::string> args)
            {
                //初始化队列消息句柄(消息的存储管理)
                //队列的创建
                _mmp->initQueueMessage(qname);
                return _mqmp->declareQueue(qname, qdurable, qexclusive, qauto_delete, args);
            }

            void deleteQueue(const std::string& qname)
            {
                //删除的时候队列相关的数据有两个:队列的消息,队列的绑定消息
                //删除队列里的消息
                _mmp->destroyQueueMessage(qname);
                //删除队列的绑定消息
                _bmp->removeMsgqueueBindings(qname);
                //删除队列
                _mqmp->deleteQueue(qname);
            }

            bool existsQueue(const std::string qname)
            {
                return _mqmp->exists(qname);
            }

            QueueMap allqueue()
            {
                return _mqmp->AllQueues();
            }

            bool bind(const std::string& ename, const std::string& qname, const std::string& key)
            {
                Exchange::ptr ep = _emp->selectExchange(ename);
                if(ep.get() == nullptr)
                {
                    DLOG("进行队列绑定失败,交换机%s不存在", ename.c_str());
                    return false;
                }
                MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
                if(mqp.get() == nullptr)
                {
                    DLOG("进行队列绑定失败,队列%s不存在", qname.c_str());
                }
                return _bmp->bind(ename, qname,key, ep->durable && mqp->durable);
            }

            void unBind(const std::string& ename, const std::string& qname)
            {
                return _bmp->unBind(ename, qname);
            }

            MsgQueueBindingMap exchangeBindings(const std::string& ename)
            {
                return _bmp->getExchangeBindings(ename);
            }

            bool existsBinding(const std::string& ename, const std::string& qname)
            {
                return _bmp->exists(ename, qname);
            }
            
            // 消息发布函数
            bool basicPublish(const std::string& qname, BasicProperties* bp, const std::string& body)
            {
                MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
                if(mqp.get() == nullptr)
                {
                    DLOG("发布消息失败, 队列%s不存在", qname.c_str());
                    return false;
                }
                return _mmp->insert(qname, bp, body, mqp->durable);
            }

            //获取消息
            MessagePtr basicConsume(const std::string& name)
            {
                return _mmp->front(name);
            }

            // 消息应答
            void basicAck(const std::string& qname, const std::string& msgid)
            {
                return _mmp->ack(qname, msgid);
            }

            void clear()
            {
                _emp->clear();
                _mqmp->clear();
                _bmp->clear();
                _mmp->clear();
            }

        private:
            std::string _host_name;
            ExchangeManager::ptr _emp;
            MsgQueueManager::ptr _mqmp;
            BindingManager::ptr _bmp;
            MessageManager::ptr _mmp;
    };
}

#endif
标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/m0_62812354/article/details/141192464
版权归原作者 疏 石 兰 兮 所有, 如有侵权,请联系我们删除。

“仿RabbitMq实现消息队列正式篇(虚拟机篇)”的评论:

还没有评论