@TOC
虚拟机模块
要管理的数据
- 交换机数据管理句柄
- 队列数据管理句柄
- 绑定信息数据管理模块
- 消息数据管理模块
要管理的操作
- 声明 / 删除交换机:在删除交换机的时候,要删除相关的绑定信息
- 声明 / 删除队列 :在删除队列的时候,要删除相关的绑定信息以及消息数据
- 队列的绑定 / 解绑 :绑定的时候,必须交换机和队列是存在的
- 获取指定队列的消息
- 对指定队列的指定消息进行确认
- 获取交换机相关的所有绑定信息:一条消息要发布给指定交换机的时候,交换机获取所有绑定信息,来确认消息要发布到哪个队列
基于这个,那我们先完成虚拟机模块管理的相关数据
消息管理模块
syntax = "proto3";
package mymq;
enum ExchangeType # 交换类型
{
UNKNOWTYPE = 0;
DIRECT = 1;
FANOUT = 2;
TOPIC = 3;
};
enum DeliveryMode # 是否持久化
{
UNKNOWMODE = 0;
UNDURABLE = 1;
DURABLE = 2;
};
message BasicProperties #消息的属性
{
string id = 1; # 消息ID, UUID
DeliveryMode delivery_mode = 2; # 持久化模块
string rounting_key = 3; # routing_key绑定信息
};
message Message
{
message Payload
{
BasicProperties properties = 1; # 消息属性
string body = 2; # 有效载荷数据
string valid = 3; # 消息是否有效
};
Payload payload = 1; # 真正持久化的只有这一个字段
uint32 offset = 2; # 消息的位置
uint32 length = 3; # 消息的长度
};
要管理的数据
消息信息
- ID:消息的唯一标识
- 持久化标识:表示是否对消息进行持久化(还取决于队列的持久化标志)
- routing_key:决定了当前消息要发布的队列(消息发布到交换机后,根据绑定队列binding_key决定是否发布到指定队列)
消息主体
存储偏移量:消息以队列为单元存储在文件中,这个偏移量,是当前消息相对于文件起始位置的偏移量
消息长度:从偏移量位置取出指定长度的消息(解决了粘包问题)
是否有效标志:标识当前消息是否已经删除(删除一条消息,并不是用后面的消息覆盖掉前面的数据,而是重置有效标志位,当一个文件中的有效消息占据总消息比例不到50%,且数据量超过2000,则进行垃圾回收,重新整理文件数据存储)
当系统重启时,也需要重新加载有效消息即可。
消息的管理
管理方法
以队列为单元进行管理(因为消息的所有操作都是以队列为单元)
管理数据
- 消息链表
- 待确定消息hash
- 持久化消息hash
- 持久化的有效消息数据
- 持久化的总的消息数据
管理操作
- 向队列新增消息
- 获取队首消息
- 对消息进行确认
- 恢复队列历史消息
- 垃圾回收
- 删除队列相关消息文件
队列消息管理
- 初始化队列消息结构
- 移除队列消息结果
- 向队列新增消息
- 对队列消息进行确认
- 恢复队列历史消息
#ifndef __M_MSG_H__
#define __M_MSG_H__
#include "../mqcommon/logger.hpp"
#include "../mqcommon/helper.hpp"
#include "../mqcommon/msg.pb.h"
#include <iostream>
#include <mutex>
#include <memory>
#include <unordered_map>
#include <list>
namespace mymq
{
// 消息的持久化管理
// a. 管理数据
// i. 队列消息⽂件存储的路径
// ii. 队列消息的存储⽂件名
// iii. 队列消息的临时交换⽂件名
// b. 管理操作
// i. ⽇志消息存储在⽂件中(4B⻓度+(属性+内容+有效位)序列化消息,连续存储即可)
// ii. 提供队列消息⽂件创建/删除功能
// iii. 提供队列消息的新增持久化/删除持久化
// iv. 提供持久化内容的垃圾回收(其实就是重新加载出所有有效消息返回,并重新⽣成新的消息
// 存储⽂件)
#define DATAFILE_SUBFIX ".mqd"
#define TMPFILE_SUBFIX ".mqb.tmp"
using MessagePtr = std::shared_ptr<mymq::Message>;
// 消息持久化
class MessageMapper
{
public:
MessageMapper(std::string &basedir, const std::string &qname)
: _qname(qname)
{
if (basedir.back() != '/')
{
basedir.push_back('/');
}
_datafile = basedir + qname + DATAFILE_SUBFIX;
_tmpfile = basedir + qname + TMPFILE_SUBFIX;
if (FileHelper(basedir).exists() == false)
{
FileHelper::createDirectory(basedir);
}
createMsgFile();
}
bool createMsgFile()
{
if (FileHelper(_datafile).exists() == true)
{
return true;
}
bool ret = FileHelper::createFile(_datafile);
if (ret == false)
{
ELOG("创建队列数据文件:%s 失败", _datafile.c_str());
return false;
}
return true;
}
void removeMsgFile()
{
FileHelper::removeFile(_datafile);
FileHelper::removeFile(_tmpfile);
}
bool insert(MessagePtr &msg)
{
return insert(_datafile, msg);
}
bool remove(MessagePtr &msg)
{
// 1. 将msg中的有效标志位修改为‘0’
msg->mutable_payload()->set_valid("0");
// 2,对msg进行序列化
std::string body = msg->SerializeAsString();
if (body.size() != msg->length())
{
ELOG("不能修改文件中的数据信息,因为生成的数据与原数据长度不一样");
return false;
}
// 3. 将序列化后的消息,写入到数据在文件中的指定位置(覆盖原有的数据)
FileHelper helper(_datafile);
bool ret = helper.write(body.c_str(), msg->offset(), body.size());
if (ret == false)
{
DLOG("向队列数据文件写入失败!");
return false;
}
return true;
}
std::list<MessagePtr> gc()
{
std::list<MessagePtr> result;
bool ret = load(result);
if (ret == false)
{
ELOG("加载有效数据失败\n");
return result;
}
// 2. 将有效数据,进行序列化存储到临时文件中
FileHelper::createFile(_tmpfile);
for (auto e : result)
{
DLOG("向临时文件中添加数据 : %s", e->payload().body().c_str());
ret = insert(_tmpfile, e);
if (ret == false)
{
ELOG("向临时文件中添加数据失败");
return result;
}
}
// 3. 删除源文件
ret = FileHelper::removeFile(_datafile);
if (ret == false)
{
ELOG("删除原文件失败");
return result;
}
// 4. 修改临时文件的名字,改成原文件的名称
ret = FileHelper(_tmpfile).rename(_datafile.c_str());
if (ret == false)
{
ELOG("修改文件名称失败");
return result;
}
// 5. 返回新的有效数据
return result;
}
private:
bool load(std::list<MessagePtr> &result)
{
// 1. 加载出文件中的所有的有效数据:存储格式:4字节长度 | 数据 | 4字节长度 | 数据 ......
FileHelper data_file_helper(_datafile);
size_t offset = 0; // 用来定位
size_t msg_size = 0; // 用来 4 字节长度的数据长度
size_t file_size = data_file_helper.size();
while (offset < file_size)
{
bool ret = data_file_helper.read((char*)&msg_size, offset, sizeof(size_t));
if (ret == false)
{
DLOG("读取消息长度失败!");
}
offset += sizeof(size_t);
std::string msg_body(msg_size, '\0');
ret = data_file_helper.read(&msg_body[0], offset, msg_size);
// 为什么使用 &msg_body[0]
// &msg_body[0] 返回的是指向 std::string 内部字符数组首元素的指针,即指向字符串内容的指针。
// 适用于需要直接操作字符串内容的场景,例如文件读写操作
// &msg_body:
// &msg_body 返回的是 std::string 对象本身的指针,即指向 std::string 对象的指针。
// 不适用于直接操作字符串内容的场景,而是用于操作整个 std::string 对象本身。
if (ret == false)
{
DLOG("读取消息数据失败");
return false;
}
offset += msg_size;
MessagePtr msgp = std::make_shared<Message>();
msgp->mutable_payload()->ParseFromString(msg_body);
// 如果是无效消息,则直接处理下一个
if (msgp->payload().valid() == "0")
{
ELOG("加载到无效消息:%s", msgp->payload().body().c_str());
continue;
}
// 有效消息则保存下来
result.push_back(msgp);
}
return true;
}
bool insert(const std::string &filename, MessagePtr &msg)
{
// 新增数据都是添加在文件末尾的
// 1. 进行消息的序列化,获取到格式化后的消息
// 因为使用的是 sqlite 数据库,存储的是二进制信息,所以需要进行序列化
std::string body = msg->payload().SerializeAsString();
// 2. 获取文件长度
FileHelper helper(filename);
size_t fsize = helper.size();
size_t msg_size = body.size();
// 写入逻辑:1. 先写入 4 字节数据长度,2. 再写入指定长度数据
bool ret = helper.write((char *)&msg_size, fsize, sizeof(size_t));
if (ret == false)
{
DLOG("向队列数据文件写入数据长度失败!");
return false;
}
// 将数据写入文件的指定位置
ret = helper.write(body.c_str(), fsize + sizeof(size_t), msg_size);
if (ret == false)
{
DLOG("向队列数据文件写入数据失败!");
return false;
}
// 更新msg中的实际存储信息
msg->set_offset(fsize + sizeof(size_t));
msg->set_length(msg_size);
return true;
}
private:
std::string _qname;
std::string _datafile;
std::string _tmpfile;
};
// 管理数据
class QueueMessage
{
public:
using ptr = std::shared_ptr<QueueMessage>;
// 初始化
QueueMessage(std::string &basedir, const std::string &qname)
: _mapper(basedir, qname),
_qname(qname),
_valid_count(0),
_total_count(0)
{}
bool recovery()
{
// 恢复历史消息
std::unique_lock<std::mutex> lock(_mutex);
_msgs = _mapper.gc();
for (auto &msg : _msgs)
{
_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
}
_valid_count = _total_count = _msgs.size();
return true;
}
bool insert(const BasicProperties *bp, const std::string &body, bool queue_is_durable)
{
// 构造消息对象
MessagePtr msg = std::make_shared<Message>();
msg->mutable_payload()->set_body(body);
if (bp != nullptr)
{
DeliveryMode mode = queue_is_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;
msg->mutable_payload()->mutable_properties()->set_id(bp->id());
msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
msg->mutable_payload()->mutable_properties()->set_rounting_key(bp->rounting_key());
}
else
{
DeliveryMode mode = queue_is_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;
msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
msg->mutable_payload()->mutable_properties()->set_rounting_key("");
}
std::unique_lock<std::mutex> lock(_mutex);
// 2. 判断消息是否需要持久化
if (msg->payload().properties().delivery_mode() == DURABLE)
{
msg->mutable_payload()->set_valid("1");
// 3. 进行持久化存储
bool ret = _mapper.insert(msg);
if (ret == false)
{
ELOG("持久化存储信息:%s失败了!", body.c_str());
}
_valid_count += 1;
_total_count += 1;
_durable_msgs.insert(make_pair(msg->payload().properties().id(), msg));
}
// 4. 内容的管理
_msgs.push_back(msg);
return true;
}
MessagePtr front()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_msgs.size() == 0)
{
return MessagePtr();
}
// 获取一条队首消息:从_msgs中获取数据
MessagePtr msg = _msgs.front();
_msgs.pop_front();
// 将该消息对象,向待确认的hash表中添加一份,等到收到消息确认后进行删除
_waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
return msg;
}
// 每次删除消息后,判断是否需要垃圾回收
bool remove(const std::string &msg_id)
{
std::unique_lock<std::mutex> lock(_mutex);
// 1. 从待确认队列中找到信息
auto it = _waitack_msgs.find(msg_id);
if (it == _waitack_msgs.end())
{
DLOG("没有找到要删除的消息:%s!", msg_id.c_str());
return true;
}
// 2. 根据消息的持久化模式,决定是否删除持久化消息
if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
{
// 删除持久化消息
_mapper.remove(it->second);
_durable_msgs.erase(msg_id);
_valid_count -= 1;
gc();
}
// 4. 删除内存中的信息
_waitack_msgs.erase(msg_id);
return true;
}
size_t getable_count()
{
std::unique_lock<std::mutex> lock(_mutex);
return _msgs.size();
}
size_t total_count()
{
std::unique_lock<std::mutex> lock(_mutex);
return _total_count;
}
size_t durable_count()
{
std::unique_lock<std::mutex> lock(_mutex);
return _durable_msgs.size();
}
size_t waitack_count()
{
std::unique_lock<std::mutex> lock(_mutex);
return _waitack_msgs.size();
}
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeMsgFile();
_msgs.clear();
_durable_msgs.clear();
_waitack_msgs.clear();
_valid_count = 0;
_total_count = 0;
}
private:
bool GCCheck() {
//持久化的消息总量大于2000, 且其中有效比例低于50%则需要持久化
if (_total_count > 2000 && _valid_count * 10 / _total_count < 5) {
return true;
}
return false;
}
void gc()
{
// 1. 进行垃圾回收,获取到垃圾回收后,有效的消息信息链表
if(GCCheck() == false)
{
return;
}
std::list<MessagePtr> msgs = _mapper.gc();
for(auto& msg : msgs)
{
auto it = _durable_msgs.find(msg->payload().properties().id());
if(it == _durable_msgs.end())
{
DLOG("垃圾回收之后,有一条持久化消息,在内存中没有进行管理");
_msgs.push_back(msg); // 做法:重新添加到推送链表的末尾
_durable_msgs.insert(make_pair(msg->payload().properties().id(), msg));
continue;
}
// 2. 更新每一条消息的实际存储位置
it->second->set_offset(msg->offset());
it->second->set_length(msg->length());
}
_valid_count = _total_count = msgs.size();
}
private:
std::mutex _mutex;
std::string _qname;
size_t _valid_count;
size_t _total_count;
MessageMapper _mapper;
std::list<MessagePtr> _msgs;
std::unordered_map<std::string, MessagePtr> _durable_msgs; // 持久化消息hash
std::unordered_map<std::string, MessagePtr> _waitack_msgs; // 待确认消息hash
};
// 实现一个对外的总体消息管理类
// 消息的总体对外管理
// i. 初始化新建队列的消息管理结构,并创建消息存储⽂件
// ii. 删除队列的消息管理结构,以及消息存储⽂件
// iii. 向指定队列新增消息
// iv. 获取指定队列队⾸消息
// v. 确认指定队列待确认消息(删除)
// vi. 判断指定队列消息是否为空
class MessageManager
{
public:
using ptr = std::shared_ptr<MessageManager>;
MessageManager(const std::string basedir)
:_basedir(basedir)
{}
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
for(auto& it : _queue_msgs)
{
it.second->clear();
}
}
void initQueueMessage(const std::string& qname)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it != _queue_msgs.end())
{
return ;
}
qmp = std::make_shared<QueueMessage>(_basedir, qname);
_queue_msgs.insert(std::make_pair(qname, qmp));
}
qmp->recovery();
}
void destroyQueueMessage(const std::string& qname)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it == _queue_msgs.end())
{
return ;
}
qmp = it->second;
_queue_msgs.erase(it);
}
qmp->clear();
}
bool insert(const std::string& qname, BasicProperties* bp, const std::string& body, bool _queue_is_durable)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it == _queue_msgs.end())
{
ELOG("向队列 %s 新增消息失败,没有找到消息管理句柄", qname.c_str());
return false;
}
qmp = it->second;
}
return qmp->insert(bp, body, _queue_is_durable);
}
MessagePtr front(const std::string& qname)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it == _queue_msgs.end())
{
DLOG("获取 %s 队首数据失败:没有找到对应的消息句柄", qname.c_str() );
return MessagePtr();
}
qmp = it->second;
}
//取出对应的相关队列的首部消息
return qmp->front();
}
void ack(const std::string& qname, const std::string& msg_id)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it == _queue_msgs.end())
{
DLOG("确认队列 %s 消息 %s 失败:没有找到消息管理句柄", qname.c_str(), msg_id.c_str());
return ;
}
qmp = it->second;
}
qmp->remove(msg_id);
return ;
}
size_t getable_count(const std::string& qname)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it == _queue_msgs.end())
{
DLOG("获取队列 %s 待推送消息数量失败:没有找到消息管理句柄", qname.c_str());
return 0;
}
qmp = it->second;
}
return qmp->getable_count();
}
size_t total_count(const std::string& qname)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it == _queue_msgs.end())
{
DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
return 0;
}
qmp = it->second;
}
return qmp->total_count();
}
size_t durable_count(const std::string& qname)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it == _queue_msgs.end())
{
DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
return 0;
}
qmp = it->second;
}
return qmp->durable_count();
}
size_t waitack_count(const std::string& qname)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it == _queue_msgs.end())
{
DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
return 0;
}
qmp = it->second;
}
return qmp->waitack_count();
}
private:
std::mutex _mutex;
std::string _basedir;
std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;
};
}
#endif
交换机数据管理
要管理的数据
- 交换机名称:唯一标识
- 交换机类型:决定了消息的转发方式
- 每一个队列绑定中有个binding_key, 每条消息中有一个routing_key
- 直接交换:binding_key和routing_key相同,则将消息放入队列
- 广播交换:将消息放入交换机绑定的所有队列中
- 主题交换:routing_key与多个绑定队列的binding_key有个匹配机制,匹配成功了则放入
- 持久化标志:决定了当前交换机信息是否需要持久化存储
- 自动化删除标志:指的是关联了当前交换机的所有客户端都退出了,是否要自动删除交换机
要管理的操作
- 创建交换机:本质上需要的是声明 ----- 强断言的思想,没有的话,就创建
- 删除交换机:每个交换机都会绑定一个或多个队列(意味着会有一个或多个绑定信息,因此删除交换机需要删除相关绑定信息
- 获取指定名称的交换机
- 获取当前交换机名称
注意交换机有持久化的标识,所以在进行管理操作的时候,需要分情况进行,在内存中管理,以及在硬盘中管理
代码展示
#include "../mqcommon/helper.hpp"
#include "../mqcommon/logger.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include <google/protobuf/map.h>
#include <iostream>
#include <unordered_map>
#include <memory>
#include <mutex>
namespace mymq
{
// 1. 定义交换机类
struct Exchange
{
using ptr = std::shared_ptr<Exchange>;
// 1. 交换机名称
std::string name;
// 2. 交换机类型
ExchangeType type;
// 3. 交换机持久化标志
bool durable;
// 4. 是否自动删除标志
bool auto_delete;
// 5. 其他参数
google::protobuf::Map<std::string, std::string> args;
//因为后面的函数需要构造一个Exchange的空对象
Exchange()
{}
Exchange(const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete, const google::protobuf::Map<std::string, std::string> eargs)
: name(ename),
type(etype),
durable(edurable),
auto_delete(eauto_delete),
args(eargs)
{}
// args存储键值对,在存储数据库的时候,会组织一个格式字符串进行存储 key=value&key=value.....
// 内部解析str_args字符串,将容器存储到成员中
void setArgs(const std::string &str_args)
{
//key=val&key=val
std::vector<std::string> sub_args;
StrHelper::split(str_args, "&", sub_args);
for(auto& str : sub_args)
{
size_t pos = str.find('=');
std::string key = str.substr(0, pos);
std::string val = str.substr(pos + 1);
args[key] = val;
}
}
// 将args中的内容进行序列化后,返回一个字符串
std::string getArgs()
{
std::string result;
for(auto e = args.begin(); e != args.end(); e++)
{
result = e->first + "=" + e->second + "&" + result;
}
return result;
}
};
// 2. 定义交换机数据持久化管理类 -- 数据存储在sqlite数据库中
class ExchangeMapper
{
public:
ExchangeMapper(const std::string &dbfile)
:_sqlite_helper(dbfile)
{
std::string path = FileHelper::parentDirectory(dbfile);
FileHelper::createDirectory(path);
assert(_sqlite_helper.open());
createTable();
}
void createTable()
{
#define CREATE_TABLE "create table if not exists exchange_table(\
name varchar(32) primary key, \
type int,\
durable int, \
auto_delete int ,\
args varchar(128));"
bool ret = _sqlite_helper.exec(CREATE_TABLE, nullptr, nullptr);
if(ret == false)
{
DLOG("创建交换机数据库失败");
abort(); //直接异常退出程序
}
}
void removeTable()
{
#define DROP_TABLE "drop table if exists exchange_table;"
bool ret = _sqlite_helper.exec(DROP_TABLE, nullptr, nullptr);
if(ret == false)
{
DLOG("删除交换机数据库失败");
abort(); //直接异常退出程序
}
}
bool insert(Exchange::ptr &exp)
{
#define INSERT_TABLE "insert into exchange_table values('%s', %d, %d, %d, '%s');"
std::string args_str = exp->getArgs();
char sql_str[4096] = {0};
sprintf(sql_str, INSERT_TABLE, exp->name.c_str(), exp->type, exp->durable, exp->auto_delete, args_str.c_str());
bool ret = _sqlite_helper.exec(sql_str, nullptr, nullptr);
if(ret == false)
{
DLOG("插入数据库失败");
return false;
}
return true;
}
void remove(const std::string &name)
{
std::stringstream ss;
ss << "delete from exchange_table where name = ";
ss << "'" << name << "';";
_sqlite_helper.exec(ss.str(), nullptr, nullptr);
}
using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;
ExchangeMap recovery()
{
ExchangeMap result;
std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";
_sqlite_helper.exec(sql.c_str(), SelectCallBack, &result);
return result;
}
private:
static int SelectCallBack(void* args, int numcol, char** row, char** fields)
{
ExchangeMap* result = (ExchangeMap*)args;
auto exp = std::make_shared<Exchange>();
exp->name = row[0];
exp->type = (mymq::ExchangeType)std::stoi(row[1]);
exp->durable = (bool)std::stoi(row[2]);
exp->auto_delete = (bool)std::stoi(row[3]);
if(row[4])
{
exp->setArgs(row[4]);
}
result->insert(std::make_pair(exp->name, exp));
return 0;
}
private:
SqliteHelper _sqlite_helper;
};
// 3. 定义交换机在内存管理类
class ExchangeManager
{
public:
using ptr = std::shared_ptr<ExchangeManager>;
ExchangeManager(const std::string &dbfile)
:_mapper(dbfile)
{
_exchanges = _mapper.recovery();
}
// 声明交换机
bool declareExchange(const std::string &name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string> &args)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _exchanges.find(name);
if(it != _exchanges.end())
{
//如果交换机已经存在,那就直接返回,不需要重复新增
return true;
}
auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);
if(durable == true)
{
bool ret = _mapper.insert(exp);
if(ret == false)
{
return false;
}
}
_exchanges.insert(std::make_pair(name, exp));
return true;
}
// 删除交换机
void deleteExchange(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _exchanges.find(name);
if(it == _exchanges.end())
{
//如果交换机不存在,那就直接返回
return ;
}
if(it->second->durable == true)
_mapper.remove(name);
_exchanges.erase(name);
}
Exchange::ptr selectExchange(const std::string& name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _exchanges.find(name);
if(it == _exchanges.end())
{
//如果交换机不存在,那就直接返回
return nullptr;
}
return it->second;
}
// 判断交换机是否存在
bool exists(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _exchanges.find(name);
if(it == _exchanges.end())
{
//如果交换机不存在,那就直接返回
return false;
}
return true;
}
size_t size()
{
std::unique_lock<std::mutex> lock(_mutex);
return _exchanges.size();
}
// 清理所有交换机数据
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeTable();
_exchanges.clear();
}
private:
std::mutex _mutex;
ExchangeMapper _mapper;
std::unordered_map<std::string, Exchange::ptr> _exchanges;
};
}
队列数据管理
要管理的数据
- 队列名称:唯一的标识
- 持久化存储标志:决定了是否将队列信息持久化存储起来,决定了重启后,这个队列是否存在
- 是否独占标志:独占就指的是,只有当前客户端自己能够订阅队列消息
- 自动删除标志:当订阅了当前队列的所有客户端退出后,是否删除队列
要管理的操作
- 创建队列
- 删除队列
- 获取指定队列信息
- 获取队列数据
- 获取所有队列名称:当系统重启后,需要重新加载数据,加载历史消息(消息以队列为单元存储在文件中)而加载消息需要直到队列名称,因为后边消息存储的时候,存储文件以队列名称进行的取名
代码展示
#ifndef __M_QUEUE_H__
#define __M_QUEUE_H__
#include "../mqcommon/helper.hpp"
#include "../mqcommon/logger.hpp"
#include "../mqcommon/msg.pb.h"
#include <iostream>
#include <unordered_map>
#include <memory>
#include <mutex>
namespace mymq
{
struct MsgQueue
{
using ptr = std::shared_ptr<MsgQueue>; // 放在这个位置合适吗?
std::string name;
bool durable;
bool exclusive;
bool auto_delete;
google::protobuf::Map<std::string, std::string> args;
MsgQueue()
{}
MsgQueue(const std::string qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map<std::string, std::string> &qargs)
:name(qname),
durable(qdurable),
exclusive(qexclusive),
auto_delete(qauto_delete),
args(qargs)
{}
void setArgs(const std::string& str_args)
{
std::vector<std::string> sub_args;
StrHelper::split(str_args, "&", sub_args);
for(auto sub_string : sub_args)
{
size_t pos = sub_string.find("=");
std::string key = sub_string.substr(0, pos);
std::string value = sub_string.substr(pos + 1);
args[key] = value;
}
}
std::string getArgs()
{
std::string result;
for(auto str = args.begin(); str != args.end(); str++)
{
result = str->first + "=" + str->second + result;
}
return result;
}
};
using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;
class MsgQueueMapper
{
public:
MsgQueueMapper(const std::string& dbfile)
:_sqlite_helper(dbfile)
{
std::string path = FileHelper::parentDirectory(dbfile);
bool ret = FileHelper::createDirectory(path);
if(ret == false)
{
ELOG("创建父目录失败: %s", strerror(errno));
}
_sqlite_helper.open(); //创建sqlite目录
createTable();
}
void createTable()
{
std::stringstream sql;
sql << "create table if not exists queue_table(";
sql << "name varchar(32) primary key, ";
sql << "durable int, ";
sql << "exclusive int, ";
sql << "auto_delete int, ";
sql << "args varchar(128));";
_sqlite_helper.exec(sql.str(), nullptr, nullptr);
}
void removeTable()
{
std::stringstream sql;
sql << "drop table if exists queue_table;";
_sqlite_helper.exec(sql.str(), nullptr, nullptr);
}
bool insert(MsgQueue::ptr& queue)
{
std::stringstream sql;
sql << "insert into queue_table values(";
sql << "'" << queue->name << "',";
sql << queue->durable << ", ";
sql << queue->exclusive << ", ";
sql << queue->auto_delete << ", ";
sql << "'" << queue->getArgs() << "');";
return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
}
void remove(const std::string& name)
{
std::stringstream sql;
sql << "delete from queue_table where name = ";
sql << "'" << name << "';";
_sqlite_helper.exec(sql.str(), nullptr, nullptr);
}
QueueMap recovery()
{
QueueMap result;
std::stringstream sql;
sql << "select name, durable, exclusive, auto_delete, args from queue_table;";
_sqlite_helper.exec(sql.str(), SelectCallBack, &result);
return result;
}
private:
static int SelectCallBack(void* args, int numcol, char** row, char** fields)
{
QueueMap* result = (QueueMap*)args;
MsgQueue::ptr mqp = std::make_shared<MsgQueue>();
mqp->name = row[0];
mqp->durable = std::stoi(row[1]);
mqp->exclusive = std::stoi(row[2]);
mqp->auto_delete = std::stoi(row[3]);
if(row[4]) mqp->setArgs(row[4]);
result->insert(std::make_pair(mqp->name, mqp));
return 0;
}
private:
SqliteHelper _sqlite_helper;
};
class MsgQueueManager
{
public:
using ptr = std::shared_ptr<MsgQueueManager>;
MsgQueueManager(const std::string& dbfile)
:_mapper(dbfile)
{
_msg_queue = _mapper.recovery();
}
bool declareQueue(const std::string& name, bool durable, bool exclusive, bool auto_delete, const google::protobuf::Map<std::string, std::string>& args)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _msg_queue.find(name);
if(it != _msg_queue.end())
{
// 队列存在,直接返回true
return true;
}
MsgQueue::ptr mqp = std::make_shared<MsgQueue>();
mqp->name = name;
mqp->durable = durable;
mqp->exclusive = exclusive;
mqp->auto_delete = auto_delete;
mqp->args = args;
if(durable ==true) //需要进行持久化存储
{
bool ret = _mapper.insert(mqp);
if(ret == false) return false;
}
_msg_queue.insert(make_pair(mqp->name, mqp));
return true;
}
void deleteQueue(const std::string& name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _msg_queue.find(name);
if(it == _msg_queue.end())
{
DLOG("未找到相关信息");
return ;
}
if(it->second->durable == true)
{
_mapper.remove(name);
}
_msg_queue.erase(name);
}
MsgQueue::ptr selectQueue(const std::string& name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _msg_queue.find(name);
if(it == _msg_queue.end())
{
DLOG("未找到相关信息");
return MsgQueue::ptr();
}
return it->second;
}
QueueMap AllQueues()
{
std::unique_lock<std::mutex> lock(_mutex);
return _msg_queue;
}
bool exists(const std::string& name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _msg_queue.find(name);
if(it == _msg_queue.end())
{
return false;
}
return true;
}
size_t size()
{
std::unique_lock<std::mutex> lock(_mutex);
return _msg_queue.size();
}
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_msg_queue.clear();
}
private:
std::mutex _mutex;
MsgQueueMapper _mapper;
QueueMap _msg_queue;
};
}
#endif
绑定数据管理模块
描述将哪个队列与哪个交换机绑定到了一起
管理的数据
- 交换机名称
- 队列名称
- binding_key:绑定密钥----描述在交换机的主题交换&直接交换的消息发布匹配规则
由数字, 字符,_,#, * 组成 binding_key, 例如:news.music.#
管理的操作
添加绑定
解除绑定
获取交换机相关的所有绑定信息
删除交换机的时候,要删除相关绑定信息 当消息发布到交换机中,交换机得通过这些信息来将消息发布到指定队列
获取队列相关的所有绑定信息
删除队列的时候,要删除相关的绑定信息
获取绑定信息数量
代码展示
#ifndef _M_BINDING_H_
#define _M_BINDING_H_
#include "../mqcommon/helper.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/logger.hpp"
#include <iostream>
#include <unordered_map>
#include <memory>
#include <mutex>
namespace mymq
{
struct Binding
{
using ptr = std::shared_ptr<Binding>;
std::string exchange_name;
std::string msgqueue_name;
std::string binding_key;
Binding() {}
Binding(const std::string &ename, const std::string &msgqname, const std::string &key)
: exchange_name(ename),
msgqueue_name(msgqname),
binding_key(key)
{
}
};
// 队列与绑定信息是一一对应的(因为是给某一个交换机去绑定队列,因此一个交换机可能会有多个队列的绑定信息)
// 因此先定义一个队列名,与绑定信息的映射信息,这个是为了方便通过队名查找绑定信息
using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
// 然后定义一个交换机名称与队列绑定信息之间的映射关系,这个map包含了所有的绑定信息,并且以交换机为单元进行区分
using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
// 采用上面两种结构,则删除交换机相关绑定信息的时候,不仅要删除交换机映射,还需要删除对应队列中的映射,否则对象得不到释放
class BindingMapper
{
public:
BindingMapper(const std::string& dbfile)
:_sqlite_helper(dbfile)
{
std::string path = FileHelper::parentDirectory(dbfile);
FileHelper::createDirectory(path);
_sqlite_helper.open();
createTable();
}
void createTable()
{
std::stringstream sql;
sql << "create table if not exists binding_table(";
sql << "exchange_name varchar(32), ";
sql << "msgqueue_name varchar(32), ";
sql << "binding_key varchar(128));";
assert(_sqlite_helper.exec(sql.str(), nullptr, nullptr));
}
void removeTable()
{
std::stringstream sql;
sql << "drop table if exists binding_table; ";
assert(_sqlite_helper.exec(sql.str(), nullptr, nullptr));
}
bool insert(Binding::ptr& binding)
{
std::stringstream sql;
sql << "insert into binding_table values(";
sql << "'" << binding->exchange_name <<"',";
sql << "'" << binding->msgqueue_name << "',";
sql << "'" << binding->binding_key << "');";
return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
}
bool remove(const std::string& ename, const std::string& qname)
{
std::stringstream sql;
sql << "delete from binding_table where ";
sql << "exchange_name = '" << ename << "' and ";
sql << "msgqueue_name = '" << qname << "';";
return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
}
void removeExchangeBindings(const std::string& ename)
{
std::stringstream sql;
sql << "delete from binding_table where ";
sql << "exchange_name = '" << ename << "';";
_sqlite_helper.exec(sql.str(), nullptr, nullptr);
}
void removeMsgQueueBindings(const std::string& qname)
{
std::stringstream sql;
sql << "delete from binding_table where ";
sql << "msgqueue_name = '" << qname << "';";
_sqlite_helper.exec(sql.str(), nullptr, nullptr);
}
BindingMap recovery()
{
BindingMap result;
std::string sql = "select exchange_name, msgqueue_name, binding_key from binding_table;";
_sqlite_helper.exec(sql, SelectCallBack, &result);
return result;
}
private:
static int SelectCallBack(void* args, int numcol, char** row, char** fileds)
{
BindingMap* result = (BindingMap*)args;
Binding::ptr bq = std::make_shared<Binding>();
bq->exchange_name = row[0];
bq->msgqueue_name = row[1];
bq->binding_key = row[2];
//为了防止 交换相关的绑定信息,因此不能直接创建队列映射,这样会覆盖历史数据
// 因此得先获取交换机对应的映射对象,往里边添加数据
// 但是,若这时候没有交换机对应的映射信息,因此这里的获取要使用引用(会保证不存在则自动创建)
MsgQueueBindingMap& qbm = (*result)[bq->exchange_name];
qbm.insert(std::make_pair(bq->msgqueue_name, bq));
return 0;
}
private:
SqliteHelper _sqlite_helper;
};
class BindingManager
{
public:
using ptr = std::shared_ptr<BindingManager>;
BindingManager(const std::string dbfile)
:_mapper(dbfile)
{
_bindmap = _mapper.recovery();
}
bool bind(const std::string& ename, const std::string& qname, const std::string& key ,bool durable)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _bindmap.find(ename);
if(it != _bindmap.end() && it->second.find(qname) != it->second.end())
{
return true;
}
//绑定是否是持久化
Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);
// 磁盘数据持久化
if(durable == true)
{
bool ret = _mapper.insert(bp);
if(ret == false)
{
//未添加成功
return false;
}
}
// 一个交换机对应着多条绑定信息,所以先确定交换机,再将相关绑定信息插进去
MsgQueueBindingMap& mqbm = _bindmap[ename];
mqbm.insert(std::make_pair(bp->msgqueue_name, bp));
return true;
}
void unBind(const std::string& ename, const std::string& qname)
{
std::unique_lock<std::mutex> lock(_mutex);
auto itBindMap = _bindmap.find(ename);
if(itBindMap == _bindmap.end())
{
return;
}
auto itMsgQueueBindings = itBindMap->second.find(qname);
if(itMsgQueueBindings == itBindMap->second.end())
{
return ;
}
_mapper.remove(ename, qname);
_bindmap[ename].erase(qname);
}
void removeExchangeBindings(const std::string& ename)
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeExchangeBindings(ename);
_bindmap.erase(ename);
}
void removeMsgqueueBindings(const std::string& qname)
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeMsgQueueBindings(qname);
for(auto it = _bindmap.begin(); it != _bindmap.end(); it++)
{
it->second.erase(qname);
}
}
MsgQueueBindingMap getExchangeBindings(const std::string& ename)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _bindmap.find(ename);
if(it == _bindmap.end())
{
return MsgQueueBindingMap();
}
return it->second;
}
Binding::ptr getBinding(const std::string& ename, const std::string& qname)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it_bindmap = _bindmap.find(ename);
if(it_bindmap == _bindmap.end())
{
return Binding::ptr();
}
auto it_MsgQueueBindings = it_bindmap->second.find(qname);
if(it_MsgQueueBindings == it_bindmap->second.end())
{
return Binding::ptr();
}
return it_MsgQueueBindings->second;
}
bool exists(const std::string& ename, const std::string& qname)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it_bindmap = _bindmap.find(ename);
if(it_bindmap == _bindmap.end())
{
return false;
}
auto it_MsgQueueBindings = it_bindmap->second.find(qname);
if(it_MsgQueueBindings ==it_bindmap->second.end())
{
return false;
}
return true;
}
size_t size()
{
std::unique_lock<std::mutex> lock(_mutex);
size_t total_size = 0;
for(auto it = _bindmap.begin(); it != _bindmap.end(); it++)
{
total_size += it->second.size();
}
return total_size;
}
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeTable();
_bindmap.clear();
}
private:
std::mutex _mutex;
BindingMapper _mapper;
BindingMap _bindmap;
};
}
#endif
现在虚拟机模块的所有需要管理的数据全部都安排成功了,接下来是虚拟机的代码展示
虚拟机代码展示
#ifndef __M_HOST_H__
#define __M_HOST_H__
#include "mq_exchange.hpp"
#include "mq_queue.hpp"
#include "mq_binding.hpp"
#include "mq_message.hpp"
namespace mymq
{
class VirtualHost
{
public:
using ptr = std::shared_ptr<VirtualHost>;
VirtualHost(const std::string& hname, const std::string& basedir, const std::string& dbfile)
:_host_name(hname),
_emp(std::make_shared<ExchangeManager>(dbfile)),
_mqmp(std::make_shared<MsgQueueManager>(dbfile)),
_bmp(std::make_shared<BindingManager>(dbfile)),
_mmp(std::make_shared<MessageManager>(basedir))
{}
bool declareExchange(const std::string& name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string>& args)
{
return _emp->declareExchange(name, type, durable, auto_delete, args);
}
void deleteExchange(const std::string& name)
{
//删除交换机的时候,需要将交换机相关的绑定信息也删除掉
_bmp->removeExchangeBindings(name);
return _emp->deleteExchange(name);
}
bool existsExchange(const std::string& name)
{
return _emp->exists(name);
}
Exchange::ptr selectExchange(const std::string& ename)
{
return _emp->selectExchange(ename);
}
bool declareQueue(const std::string qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map<std::string, std::string> args)
{
//初始化队列消息句柄(消息的存储管理)
//队列的创建
_mmp->initQueueMessage(qname);
return _mqmp->declareQueue(qname, qdurable, qexclusive, qauto_delete, args);
}
void deleteQueue(const std::string& qname)
{
//删除的时候队列相关的数据有两个:队列的消息,队列的绑定消息
//删除队列里的消息
_mmp->destroyQueueMessage(qname);
//删除队列的绑定消息
_bmp->removeMsgqueueBindings(qname);
//删除队列
_mqmp->deleteQueue(qname);
}
bool existsQueue(const std::string qname)
{
return _mqmp->exists(qname);
}
QueueMap allqueue()
{
return _mqmp->AllQueues();
}
bool bind(const std::string& ename, const std::string& qname, const std::string& key)
{
Exchange::ptr ep = _emp->selectExchange(ename);
if(ep.get() == nullptr)
{
DLOG("进行队列绑定失败,交换机%s不存在", ename.c_str());
return false;
}
MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
if(mqp.get() == nullptr)
{
DLOG("进行队列绑定失败,队列%s不存在", qname.c_str());
}
return _bmp->bind(ename, qname,key, ep->durable && mqp->durable);
}
void unBind(const std::string& ename, const std::string& qname)
{
return _bmp->unBind(ename, qname);
}
MsgQueueBindingMap exchangeBindings(const std::string& ename)
{
return _bmp->getExchangeBindings(ename);
}
bool existsBinding(const std::string& ename, const std::string& qname)
{
return _bmp->exists(ename, qname);
}
// 消息发布函数
bool basicPublish(const std::string& qname, BasicProperties* bp, const std::string& body)
{
MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
if(mqp.get() == nullptr)
{
DLOG("发布消息失败, 队列%s不存在", qname.c_str());
return false;
}
return _mmp->insert(qname, bp, body, mqp->durable);
}
//获取消息
MessagePtr basicConsume(const std::string& name)
{
return _mmp->front(name);
}
// 消息应答
void basicAck(const std::string& qname, const std::string& msgid)
{
return _mmp->ack(qname, msgid);
}
void clear()
{
_emp->clear();
_mqmp->clear();
_bmp->clear();
_mmp->clear();
}
private:
std::string _host_name;
ExchangeManager::ptr _emp;
MsgQueueManager::ptr _mqmp;
BindingManager::ptr _bmp;
MessageManager::ptr _mmp;
};
}
#endif
版权归原作者 疏 石 兰 兮 所有, 如有侵权,请联系我们删除。