文章目录
1. 项目介绍
什么是RabbitMQ?
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
主要用途:
- 异步处理无需即时返回且耗时的操作,提高系统吞吐量。
- 解耦生产者和消费者,提高系统灵活性。
- 实现分布式系统的集成。
RabbitMQ实际上是实现了一个基于AMQP的生产者消费者模型。生产者消费者模型是后端开发的常用编程方式,它有诸多好处:
- 解耦合。
- 并发处理。
- 支持忙闲不均。
- 削谷填峰等。
在实际的后端开发中,尤其是分布式系统里,跨主机之间使用生产者消费者模型,也是很普遍的需求。因此我们以AMQP为核心封装一个独立的服务器程序。这样的服务程序我们就称为消息队列(Message Queue)。市面上成熟的消息队列有很多:
- Rabbit
- Kafka
- RocketMQ
- ActiveMQ等
2. 开发环境
- Linux(ubuntu-22.04)
- VSCode
- g++/gdb
- Makefile
3. 技术选型
- 主开发语言:C++
- 序列化框架:ProtoBuf二进制序列化
- 网络通信:自定义应用层协议 + muduo库(对长TCP长连接的封装,并使用epoll的事件驱动模式实现高并发服务器与客户端)
- 数据管理数据库:SQLite3
- 单元测试框架:Gtest
3.1ProtoBuf使用介绍:
3.2 Muduo库
Muduo是由陈硕大佬开发,基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。它是一款基于主从Reactor模型的网络库,使用的线程模型是one loop per thread,所谓one loop per thread指的是:
- 一个线程只能有一个事件循环(EventLoop),用于响应计时器和IO事件。
- 一个文件描述符只能由一个线程进行读写,也就是说一个TCP连接必须归属于某个EventLoop管理。
3.3 SQLite3
什么是SQLIte?
SQLite是一个进程内的轻量级数据库,它实现了自给自足的、无服务器的、零配置的、事务性的SQL数据库引擎。我们不需要再系统中配置。SQLite引擎不是一个独立的进程。可以按应用程序需求进行静态或动态链接。SQLite直接访问其存储文件。
为什么要用SQLite?
- 不需要一个单独的服务器进程或操作的系统(无服务器的)。
- SQLite不需要配置。
- 一个完整的SQLite数据库存储在一个单一的跨平台的磁盘文件。
- SQLite非常小,是轻量级的。完全配置时小于400KiB,省略可选功能时小于250KiB。
- SQLite自给自足,不需要任何外部的依赖
- SQLite事务完全兼容ACID,允许从多个进程或线程安全访问。
- SQLite支持SQL92标准的大多数查询语言功能。
- SQLite使用ANSI-C编写,并提供了简单和易于使用的API
- SQLite可在UNIX(Linux,Mac OS-X,Android,iOS)和Windows(Win32,WinCE,WinRT)中运行。
3.4 Gtest
什么是Gtest
Gtest是一个跨平台的C++单元测试框架,由google公司发布。gtest是为了在不同平台上为编写C++单元测试而生成的。提供了丰富的断言、致命和非致命、参数化等等。
4. 需求分析
4.1 核心概念
- 生产者(Producer)
- 消费者(Consumer)
- 中间人(Broker)
- 发布(Publish)
- 订阅(Subscribe)
- 一个生产者,一个消费者
- N个生产者,N个消费者 其中Broker Server为核心部分,负责消息的存储和转发。 而在AMQP模型中,也就是消息中间件服务器Broker中,又存在以下概念:
- 虚拟机(VirtualHost):类似于MySQl的“database”,是一个逻辑上的集合。一个BrokerServer上可以存在多个VirtualHost。
- 交换机(Exchange):生产者先把消息发送到Broker中的Exchange上,再根据不同的匹配规则,把消息转发给不同的Queue。
- 队列(Queue):真正用来存储消息的部分,每个消费者自己决定从哪个Queue上读取消息。
- 绑定(Binding):Exchange和Queue之间的关联关系,两者可理解为“多对多”关系,使用一个关联表就可以把这两个概念关联起来。(一个Exchange可以绑定多个Queue,一个Queue也可以被多个Exchange绑定)
- 消息(Message):传递的内容。
上述数据结构,既需要再内存中存储,也需要在硬盘中存储
- 内存存储:方便使用
- 硬盘存储:重启数据不丢失
4.2 核心API
对于Broker来说,要通过以下核心API实现消息队列的基本功能。
- 创建交换机(DeclareExchange)
- 销毁交换机(DeleteExchange)
- 创建队列(DeclareQueue)
- 销毁队列(DeleteQueue)
- 创建绑定(Bind)
- 解除绑定(UnBind)
- 发布消息(BasicPublish)
- 订阅消息(BasicConsume)
- 确认消息(BasicAck)
- 取消订阅(BasicCancel)
而Producer和Consumer则通过网络远程调用这些API,实现生产者消费者模型。
4.3 交换机类型
对于RAbbitMQ,主要支持四种交换机类型:
- Direct
- Fanout
- Topic
- Header
其中Header比较复杂且少见。常用的是前三种类型,项目中也主要实现这三种:
- Direct:生产者发送消息时,直接指定该交换机绑定的队列名。
- Fanout:生产者发送的消息会被复制到该交换机所有队列中,也就是广播。
- Topic:队列与交换机绑定时,指定一个字符串bindingKey,发送的消息里有指定字符串routingKey。当routingKey与bindingKey满足一定的匹配条件时,把消息投递到对应队列。
4.4 持久化
Exchange、Queue、Binding、Message等数据都有持久化需求,当程序重启 / 主机重启,保证上述内容不丢失。
4.5 网络通信
生产者和消费者都是客户端程序,Broker是服务端程序,通过网络进行通信。
在通信过程中,客户端要提供对应的API实现对服务器的操作。
- 创建Connection
- 关闭Connection
- 创建Channel(OpenChannel)
- 关闭Channel(CloseChannel)
- 创建交换机(DeclareExchange)
- 销毁交换机(DeleteExchange)
- 创建队列(DeclareQueue)
- 销毁队列(DeleteQueue)
- 创建绑定(QueueBind)
- 解除绑定(QueueUnBind)
- 发布消息(BasicPublish)
- 订阅消息(BasicConsume)
- 确认消息(BasicAck)
- 取消订阅(BasicCancel)
在Broker的基础上,还要增加Connection操作和Channel操作:
- Connection对应一个TCP连接
- Channel是Connection中的逻辑通道
一个Connection中可以包含多个Channel(一个Connection能被多个Channel使用)。Channel和Channel之间数据独立,不会互相干扰。这样能更好地复用TCP连接,达到长连接的效果,避免频繁创建关闭TCP连接。
4.6 消息应答
被消费的消息,需要消费者客户端进行应答。应答模式分为两种:
- 自动应答:消费者只要消费了消息,就算应答完毕。Broker直接删除这个消息。
- 手动应答:消费者手动调用应答接口,Broker收到应答请求后,才真正删除这个消息(未应答时,消息位于待确认队列,没有被真正删除)。
手动应答目的是为了保证消费者处理成功了,在一些对数据可靠性要求较高的场景比较常见。
5. 模块划分
5.1 服务端模块
5.1.1 持久化数据管理中心模块
在数据管理模块中管理交换机、队列、队列绑定、消息等数据。
1.交换机管理:
- 管理信息:名称、类型(Direct等)、是否持久化标志、是否(无人使用时)自动删除标志、其它参数(作为扩展)。
- 管理操作:恢复历史数据、声明、删除、获取、判断是否存在。
2. 队列管理:
- 管理信息:名称、是否持久化标志、是否独占标志、是否(无人使用时)自动删除标志、其它参数。
- 管理操作:恢复历史数据、声明、删除、获取、判断是否存在。
3. 绑定管理:
- 管理信息:交换机名称、队列名称、绑定主题(bindingKey)。
- 管理操作:恢复历史数据、绑定、解绑、解除交换机关联绑定信息、解除队列关联绑定信息、获取交换机关联绑定信息。、获取指定绑定信息等。
4. 消息管理:
- 管理信息:
- 属性:消息ID、路由主题(routingKey)、持久化模式标志
- 消息内容
- 有效标志(配合持久化需要)
- 持久化位置(消息在文件中的偏移量)
- 持久化消息长度(该消息存储在文件中的长度)
- 管理操作:恢复历史消息、向指定队列新增消息、获取指定队列队首消息、确认移除消息。
这几个概念数据都要在内存和硬盘中存储。
- 以内存存储为主,保证快速查找信息进行处理。
- 以硬盘存储为辅,保证服务器重启后,之前的信息可以正常保持。
5.1.2 虚拟机管理模块
因为交换机、队列、绑定、都是以虚拟机为单元整体进行操作。因此虚拟机是对以上数据管理模块的整合模块。
1. 虚拟机管理信息:
- 交换机数据管理模块句柄
- 队列数据管理模块句柄
- 绑定数据管理模块句柄
- 消息数据管理模块句柄
2. 虚拟机对外操作:
- 提供虚拟机内交换机声明,交换机删除操作(删除时同时需要删除交换机关联的绑定信息)
- 提供虚拟机队列声明、队列删除操作(删除时同时需要删除队列关联的绑定信息以及消息管理)
- 提供虚拟机内交换机 - 队列绑定、解绑操作
- 获取交换机相关绑定信息
- 发布消息、消费消息、确认消息等
3. 虚拟机管理操作:
- 创建虚拟机
- 查询虚拟机
- 删除虚拟机
5.1.3 交换路由模块
当客户端发送一条消息到交换机后,这条消息应该转发给该交换机绑定的哪些队列中?
由交换路由模块决定的。绑定信息中有bindingKey,而每条发布的消息中有routingKey。能否入队取决于两个要素:交换机类型和Key
- 直接交换(Direct):将消息入队到绑定信息中bindingKey与消息routingKey一致的队列中
- 广播交换(Fanout):将消息入队到该交换机的所有绑定队列中
- 主题交换(Topic):将消息入队到绑定信息中bindingKey与消息routingKey匹配成功的队列中
bindingKey
由数字字母下划线构成,并使用 . 分成若干部分。
如: news.music.# 用于表示交换机绑定的当前队列是一个用于发布音乐新闻的队列。
支持 ***** 和 # 两种通配符,但 *** #** 只能作为 . 切分出来的单独部分,不能和其他数字字母混用。如:
- __a.*.b__是合法的,而 *a.a.b 是不合法的
- ***** 可以匹配任意一个单词(注意不是字母)
- # 可以匹配任意零个或多个单词(注意不是字母)
- 注意: ***** 和 # 不能相邻,因为 # 完全可以替代 ***** 的功能
routingKey
由数字字母下划线构成,并且可以使用 . 划分成若干部分。如:
- news.music.pop ,表示当前发布的消息是一个流行音乐的新闻。
5.1.4 消费者管理模块
消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅一个队列的消息,当队列中有消息后,会将消息轮询推送给订阅该队列的消费者(负载均衡)。
因此操作流程为:从队列关联的消息管理中取出消息,从队列关联的消费者中取出一个消费者,将消息推送给消费者。
- 消费者信息:
- 消费者标识tag
- 订阅队列名称
- 自动应答标志(决定一条消息推送给消费者后,是否需要等待收到确认后再删除消息)
- 消息处理回调函数指针(一个消息发布后被Push到线程池,调用传入Push的事件处理函数,函数内部选择队列关联的消息和消费者,接着调用消费者的消息处理回调函数将消息发送给消费者客户端)
void(const std::string &tag,const BasicProperties& bp,const std::string &body)
- 消费者管理:添加、删除、获取指定队列消费者,移除队列所有消费者等操作。
5.1.5 信道管理模块
在AMQP模型中,除了通信连接Connection概念外,还有一个Channel概念。Channel是针对Connection连接的一个更细粒度的通信信道,多个Channel可以使用同一个Connection进行通信,同时一个Connection之间的Channel之间互相独立。
信道模块是再次将上述模块进行整合提供服务的模块。
- 管理信息:
- 信道ID
- 信道关联的消费者 / 生产者
- 信道关联的连接
- 信道关联的虚拟机
- 工作线程池(一条消息被发布到队列后,需要将消息推送给订阅了该队列的消费者,该工作由线程池完成)
- 管理操作:
- 交换机的声明 / 删除
- 队列的声明 / 删除
- 交换机 - 队列的绑定 / 解绑
- 消息的订阅 / 取消订阅
- 消息的发布 / 确认消息
5.1.6 连接管理模块
本项目的服务器是通过muduo库实现底层通信的,muduo库并不能提供我们所需的所有操作,我们需要连接管理模块实现对muduo库的二次封装(同时对信道管理模块的封装),以完成我们的需求。
- 管理信息:连接关联的信道管理句柄、连接关联的muduo库Connection、信道管理所需的模块句柄。
- 管理操作:新增、删除、获取连接,打开 / 关闭信道
5.1.7 Broker服务器管理模块
综合以上所有模块,搭建网络通信服务器,实现与客户端的网络通信,识别客户端的请求并提供请求处理服务。
- 管理信息:虚拟机、消费者、连接管理句柄,工作线程池句柄,muduo库通信所需元素。
- 提供服务:综合打开 / 关闭信道、消息订阅 / 取消订阅等请求处理接口。
5.2 客户端模块
5.2.1 消费者管理
消费者在客户端存在感比较薄弱,在用户使用角度中,只要创建一个信道,就可以通过信道完成所有操作。对于消费者的感官更多的是在订阅时传入了一个消费者标识。尤其是本项目的简单实现是一个信道只能订阅一个队列,也就是说一个信道只能创建一个消费者,一一对应更弱化了消费者的存在。
- 消费者信息:消费者标识、订阅队列名称、自动应答标志、消息处理回调函数
5.2.2 信道请求模块
与服务端信道类似,客户端也有Channel的概念。
- 信道管理信息:
- 信道ID
- 信道关联的连接
- 信道关联的消费者
- 请求对应的消息响应队列(这里使用hash表,以快速查找指定相应)
- 互斥锁&条件变量(大部分的请求都是阻塞操作,发送请求后需要等到响应后才能继续。但muduo库的通信是异步的,因此需要我们在收到响应后,通过判断是否是等待的指定响应来进行同步)
信道管理操作:
- 信道的创建 / 删除
- 交换机的声明 / 删除
- 队列的声明 / 删除
- 交换机 - 队列的绑定 / 解绑
- 添加 / 取消订阅
- 消息的发布 / 确认等
5.2.3 通信连接模块
向用户提供一个实现网络通信的Connection对象,内部可创建更细粒化的Channel对象与服务器通信。
- 管理信息:
- 连接关联的实际用于通信的muduo::net::Connection连接对象
- 连接关联的信道管理句柄(实现信道的增删查)
- 连接关联的Event Loop异步循环工作线程
- 异步工作线程池(对服务器发来的消息进行处理)
- 操作管理:
- 管道的创建 / 删除
5.3 项目模块关系图
6. 项目效果简单演示
- 启动服务器
- 发布客户端发布自定义消息(字符串或任务,这里简单起见使用字符串)
- 消费者客户端进行消费(命令行指定要订阅的队列)
7. 总结
本项目模拟RabbitMQ实现简化版的消息队列组件,内部实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布、订阅以及消息推送功能。
本篇博客,简单搭建了项目总体框架,难点在于要理清各数据结构间的关系以及回调函数较为复杂。
项目链接:项目源码
版权归原作者 风间琉璃大师 所有, 如有侵权,请联系我们删除。