前言:本项目是仿照RabbitMQ并基于SpringBoot + Mybatis + SQLite3实现的消息队列,该项目实现了MQ的核心功能:生产者、消费者、中间人、发布、订阅等。
源码链接:仿Rabbit MQ实现消息队列
一、核心概念
关于消息队列,有几个重要的核心概念:
- 生产者(Producer) :负责将应用程序产生的数据转换成消息,并将这些消息推送到消息队列服务器上,以便消费者(Consumer)可以接收并处理这些消息。
- 消费者(Consumer):它的主要职责是监听特定的队列或主题,并对到达的消息执行必要的业务逻辑。
- 中间人(Broker):作为生产者(Producer)和消费者(Consumer)之间的中介,负责管理和协调消息的传递过程。
- 发布(Publish):生产者向中间人投递消息的过程。
- 订阅(Subscribe):哪些消费者要从这个中间人获取数据,这个注册的过程称为订阅。
在中间人(Broker)模块,又有以下几个概念:
虚拟机 (VirtualHost):类似于 MySQL 的 "database", 是⼀个逻辑上的集合,⼀个 BrokerServer 上可以存在多个 VirtualHost。
交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则, 把消息转发给不同的 Queue。
队列 (Queue): 真正⽤来存储消息的部分,每个消费者决定⾃⼰从哪个 Queue 上读取消息。
绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多" 关系,使⽤⼀个关联表就可以把这两个概念联系起来。
二、模块划分
明确需要做的工作:
- 实现生产者、消费者、Broker Server这三个部分。
- 针对生产者、消费者,主要实现的是客户端和服务器的网络通信部分。
- 给客户端提供一组 API,让客户端的业务代码来调用,通过网络通信的方式远程调用Broker Server上的方法。
- 实现Broker Server 内部的一些基本概念和API(虚拟主机、交换机、队列、绑定、消息)。
- 持久化(考虑到 SQLite 相比 MySQL 来说比较轻量,因此存储交换机、队列等这些实体用 SQLite,消息的存储使用文件进行管理)。
针对于上述所需要实现的模块,进行划分:
三、创建核心实体类
3.1 创建交换机(Exchange)
nametypedurableautoDeletearguments交换机身份标识交换机类型是否持久化是否自动删除
额外参数选项
@Data
public class Exchange {
//交换机的身份标识(唯一)
private String name;
//交换机类型 direct fanout topic
private ExchangeType type = ExchangeType.DIRECT;
//表示该交换机是否要持久化存储. true 表示需要持久化. false 表示不需要持久化
private boolean durable = false;
//如果当前交换机,无客户端使用,就自动删除
private boolean autoDelete = false;
//表示创建交换机时指定的一些额外的参数选项
private Map<String,Object> arguments = new HashMap<>();
}
此处省略 arguments 存储数据库时的Json转换,只需要使用 ObjectMapper即可实现。关于交换机的类型,此次主要实现了以下三种:DIRECT、FANOUT、TOPIC。并使用枚举类定义:
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private final int type;
private ExchangeType(int type){
this.type = type;
}
public int getType() {
return type;
}
}
3.2 创建队列实体类(MSGQueue)
namedurableexclusiveautoDeleteargumentsconsumerEnvListconsumerSeq队列标识是否持久化是否独占是否自动删除额外参数选项当前订阅的消费者列表记录当前取到第几个消费者
@Data
public class MSGQueue {
//表示队列的身份标识
private String name;
//表示队列是否持久化 true:需要持久化 false:不需要持久化
private boolean durable = false;
//表示是否独占,true:独占 false:都可以使用
private boolean exclusive = false;
//表示无客户端使用是,是否自动删除
private boolean autoDelete = false;
//表示扩展参数
private Map<String,Object> arguments = new HashMap<>();
//当前队列都有哪些消费者订阅了.
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
//记录当前取到的第几个消费者,方便实现轮询策略
private AtomicInteger consumerSeq = new AtomicInteger(0);
}
3.3 创建绑定实体类(Binding)
exchangeNamequeueNamebindingKey交换机名字队列名字绑定(和 routingKey匹配)
@Data
public class Binding {
// 交换机名字
private String exchangeName;
//队列名字
private String queueName;
// 只在交换机类型为 TOPIC 时才有效. ⽤于和消息中的 routingKey 进⾏匹配
private String bindingKey;
}
3.4 创建消息实体类(Message)
消息存储为二进制形式,因此对消息的存储需要进行序列化处理,所以 Message 类要实现Serializable 接口。
basicProperties bodyoffsetBegoffsetEndisValid消息的属性消息的正文消息开头在文件中的偏移量消息末尾在文件中的偏移量是否有效
@Data
public class Message implements Serializable {
private BasicProperties basicProperties = new BasicProperties();
private byte[] body;
//辅助属性,后续消息要存储在文件中
//一个文件存储很多消息 [offsetBeg, offsetEnd)
private transient long offsetBeg = 0;//消息数据的开头距离文件开头的位置偏移(字节)
private transient long offsetEnd = 0;//消息数据的结尾距离文件开头的位置偏移(字节)
private byte isValid = 0x1;//表示该消息在文件中是否是有效的消息,逻辑删除, 0x1:有效 0x0:无效
//创建一个工厂方法,让工厂方法去封装一下创建 Message 对象的过程
//这个方法创建的 Message 会自动生成一个唯一的 MessageId
public static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){
Message message = new Message();
if(basicProperties != null){
message.setBasicProperties(basicProperties);
}
//此处生成的 MessageId 以 "M-" 为前缀, 方便区分
message.setMessageId("M-" + UUID.randomUUID());
message.setRoutingKey(routingKey);
message.body = body;
return message;
}
public String getMessageId(){
return basicProperties.getMessageId();
}
public void setMessageId(String messageId){
basicProperties.setMessageId(messageId);
}
public String getRoutingKey(){
return basicProperties.getRoutingKey();
}
public void setRoutingKey(String routingKey){
basicProperties.setRoutingKey(routingKey);
}
public int getDeliverMode(){
return basicProperties.getDeliverMode();
}
public void setDeliverMode(int deliverMode){
basicProperties.setDeliverMode(deliverMode);
}
}
对于消息的属性,使用一个实体类去表示:
messageIdroutingKeydeliverMode消息的唯一身份标识和(bindingKey匹配)是否持久化
@Data
public class BasicProperties implements Serializable {
//消息的唯一身份标识,使用 UUID 作为 messageId
private String messageId;
/**
* 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名
* 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用)
* 如果当前的交换机类型是 TOPIC, 此时 routingKey 就表示和 bindingKey 进行匹配
*/
private String routingKey;
//表示消息是否持久化, 1: 不持久化; 2: 持久化
private int deliverMode = 1;
}
四、数据库操作
对于 Exchange, MSGQueue, Binding, 需要使⽤数据库进⾏持久化保存,这里使用 SQLite 进行存储,直接去 Maven 中央仓库复制依赖到项目的POM文件,再配置数据库文件即可。
SQLite 只是把数据单纯的存储到⼀个⽂件中,因此在这里设定存储到 “./data/meta.db”文件。
实现创建表以及数据库操作(这里不再展示具体的SQL语句)
@Mapper
public interface MetaMapper {
//三个核心建表方法
void createExchangeTable();
void createQueueTable();
void createBindingTable();
//针对上述三个基本概念进行插入和删除
void insertExchange(Exchange exchange);
List<Exchange> selectAllExchanges();
void deleteExchange(@Param("exchangeName") String exchangeName);
void insertQueue(MSGQueue queue);
List<MSGQueue> selectAllQueues();
void deleteQueue(@Param("queueName") String queueName);
void insertBinding(Binding binding);
List<Binding> selectAllBindings();
void deleteBinding(Binding binding);
}
五、封装对数据库的操作
创建 DataBaseManager 类,通过这个类来封装针对数据库的操作。
public class DataBaseManager {
//数据库初始化
public void init(){...}
//删除数据库
public void deleteDB(){...}
//判断数据库是否存在
private boolean checkDBExists(){...}
//建表操作
private void createTable(){...}
//创建默认数据,RabbitMQ 里默认也带有一个 匿名 的交换机,类型是 DIRECT
private void createDefaultData(){...}
//交换机的数据库操作:增删查
public void insertExchange(Exchange exchange){...}
public void deleteExchange(String exchangeName){...}
public List<Exchange> selectAllExchanges(){...}
//队列的数据库操作:增删查
public void insertQueue(MSGQueue queue){...}
public void deleteQueue(String queueName){...}
public List<MSGQueue> selectAllQueues(){...}
//Binding的数据库操作:增删查
public void insertBinding(Binding binding){...}
public void deleteBinding(Binding binding){...}
public List<Binding> selectAllBindings(){...}
}
六、消息的存储设计
6.1 设计思路及设定
设计思路:消息需要在硬盘上存储,考虑到对于消息的操作并不需要复杂的增删改查,而⽂件的操作效率比数据库会高很多,因此这里设定,用文件来管理消息。
同时,因为队列用来存储消息,因此这里约定:
- 给每个队列分配⼀个目录,目录的名字为 data + 队列名,形如 :**./data/queueName**
- 该目录中包含两个固定名字的⽂件:
queue_data.txt 消息数据⽂件, 用来保存消息内容。
queue_stat.txt 消息统计⽂件, 用来保存消息统计信息。(消息总个数/t有效消息数),这样设计主要时考虑到后续进行垃圾回收,方便判断进行GC的时机。
6.2 设定存储消息的格式
消息数据文件以二进制的形式存储在 queue_data.txt 文件中 ,为了方便进行消息的读取,这里进行这样的设定:
每个消息分成两个部分:前四个字节, 表示 Message 对象的长度(字节数),后面若干字节,表示 Message 内容,消息和消息之间首尾相连。同时每个 Message 基于 Java 标准库进行序列化。Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置。
6.3 实现消息序列化工具
对于实现消息序列化,首先 Message 实体类要实现 Serializable 接口,接下来需要借助ByteArrayOutputStream 和 ObjectOutputStream 实现消息的序列化和反序列化:
public class BinaryTool {
/**
* 把对象序列化成一个字节数组
* @param object
* @return
*/
public static byte[] toBytes(Object object) throws IOException {
// 这个流对象相当于一个变长的字节数组,
// 可以把 object 对象序列化的数据逐步写入导 byteArrayOutputStream 中,
// 再统一转成 byte[]
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
// 此处的 writeObject 就会把 object 进行序列化,生成的字节数据就会写入到 objectOutputStream
// objectOutputStream 又关联到了 byteArrayOutputStream,最终结果写入到 byteArrayOutputStream 里
objectOutputStream.writeObject(object);
}
return byteArrayOutputStream.toByteArray();
}
}
/**
* 把一个字节数组反序列化成一个对象
* @param data
* @return
*/
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){
try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
object = objectInputStream.readObject();
}
}
return object;
}
}
七、实现文件管理消息
创建 MessageFileManager 类,这个类主要去实现消息统计文件的读写、消息数据文件的读写、创建存储消息的文件、消息的垃圾回收等等。
下面这段代码是 MessageFileManager 的基础代码,实现文件的读写和垃圾回收都需要调用下面的方法:
public class MessageFileManager {
//定义一个内部类,来表示该队列的统计信息
static public class Stat{
public int totalCount; // 总消息数量
public int validCount; // 有效消息数量
}
public void init(){
}
//约定消息文件所在的目录和文件名
// 所在路径及文件名: ./data/队列名/queue_data.txt(消息数据文件)
// ./data/队列名/queue_stat.txt(消息统计文件)
/**
* 这个方法用来获取指定队列对应的消息文件所在路径
* @param queueName
* @return
*/
private String getQueueDir(String queueName){
return "./data/" + queueName;
}
/**
* 这个方法用来获取该队列的消息数据文件路径
* @param queueName
* @return
*/
private String getQueueDataPath(String queueName){
return getQueueDir(queueName) + "/queue_data.txt";
}
/**
* 这个方法用来获取指定队列的消息统计文件的路径
* @param queueName
* @return
*/
private String getQueueStatPath(String queueName){
return getQueueDir(queueName) + "/queue_stat.txt";
}
7.1 读取消息统计文件
private Stat readStat(String queueName){
Stat stat = new Stat();
try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
7.2 写消息统计文件
private void writeStat(String queueName, Stat stat){
// 使用 PrintWriter 写文件
try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "\t" + stat.validCount);
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
7.3 创建队列对应的文件和目录
public void createQueueFiles(String queueName) throws IOException {
//1.创建队列对应的消息目录
File baseDir = new File(getQueueDir(queueName));
if(!baseDir.exists()){
boolean success = baseDir.mkdirs();
if(!success){
throw new IOException("创建目录失败! baseDir :" + baseDir.getAbsolutePath());
}
}
//2.创建消息数据文件
File queueDataFile = new File(getQueueDataPath(queueName));
if(!queueDataFile.exists()){
boolean success = queueDataFile.createNewFile();
if(!success){
throw new IOException("创建消息数据文件失败! queueDataFile: " + queueDataFile.getAbsolutePath());
}
}
//3.创建消息统计文件
File queueStatFile = new File(getQueueStatPath(queueName));
if(!queueStatFile.exists()){
boolean success = queueStatFile.createNewFile();
if(!success){
throw new IOException("创建消息统计文件失败! queueStatFile: " + queueStatFile.getAbsolutePath());
}
}
//4.给消息统计文件设置初始值
Stat stat = new Stat();
stat.totalCount = 0;
stat.validCount = 0;
writeStat(queueName,stat);
}
7.4 删除队列对应的文件和目录
public void destroyQueueFiles(String queueName) throws IOException {
//先删除文件,再删除目录
File queueDataFile = new File(getQueueDataPath(queueName));
boolean success1 = queueDataFile.delete();
File queueStatFile = new File(getQueueStatPath(queueName));
boolean success2 = queueStatFile.delete();
File baseDir = new File(getQueueDir(queueName));
boolean success3 = baseDir.delete();
if(!success1 || !success2 || !success3){
//删除失败
throw new IOException("删除队列目录和消息文件失败! baseDir: " + baseDir.getAbsolutePath());
}
}
7.5 判断队列的目录和消息文件是否存在
public boolean checkFileExists(String queueName){
//判断队列的 消息数据文件 和 消息统计文件 是否都存在
File queueDataFile = new File(getQueueDataPath(queueName));
File queueStatFile = new File(getQueueStatPath(queueName));
if(queueDataFile.exists() && queueStatFile.exists()){
return true;
}
return false;
}
7.6 新的消息写入到文件中
步骤:
- 检查要写入的队列对应的文件是否存在
- 对 Message 对象进行序列化
- 获取当前消息数据文件的长度,由此来设置当前要写入的消息的 offsetBeg 和 offsetEnd。
offsetBeg = 消息数据文件长度 + 4
offsetEnd = 消息数据文件长度 + 4 + 该消息序列化后的 byte 数组的长度
- 写入消息数据文件,更新消息统计文件
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
// 检查当前要写入的队列对应的文件是否存在
if(!checkFileExists(queue.getName())){
throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName: " + queue.getName());
}
// 把 Message 对象进行序列化,转成二进制的字节数组
byte[] messageBinary = BinaryTool.toBytes(message);
// 避免出现线程安全问题,即多个消息同时都往一个消息队列里面写消息
synchronized (queue){
// 获取到队列数据文件的长度,计算出该 Message 对象的 offsetBeg 和 offsetEnd
// 把新的 Message 数据写入到数据文件的末尾, 此时 Message 对象的 offsetBeg, 就是当前文件长度 + 4
// offsetEnd 就是当前文件长度 + 4 + message长度
File queueDateFile = new File(getQueueDataPath(queue.getName()));
message.setOffsetBeg(queueDateFile.length() + 4);
message.setOffsetEnd(queueDateFile.length() + 4 + messageBinary.length);
// 写入消息数据文件, 此处是追加写
try (OutputStream outputStream = new FileOutputStream(queueDateFile, true)){
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
// 先写消息长度,占据 4 个字节
dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体
dataOutputStream.write(messageBinary);
}
}
// 更新消息统计文件
Stat stat = readStat(queue.getName());
stat.totalCount += 1;
stat.validCount += 1;
writeStat(queue.getName(),stat);
}
}
7.7 删除队列对应的消息数据文件中的消息
这里的删除采用逻辑删除,即把 Message 对象从文件中读取出来之后,把 valid 属性设置成 0 ,再重新写入并更新消息统计文件。
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
synchronized (queue){
try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()),"rw")){
// 读取对应的 Message 数据.
byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc);
// 转换为 Message 对象
Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
diskMessage.setIsValid((byte) 0x0);
// 重新写入文件
byte[] bufferDest = BinaryTool.toBytes(diskMessage);
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(bufferDest);
}
// 更新统计文件
Stat stat = readStat(queue.getName());
if(stat.validCount > 0){
stat.validCount -= 1;
}
writeStat(queue.getName(),stat);
}
}
7.8 从文件中读取消息到内存中
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
LinkedList<Message> messages = new LinkedList<>();
long currentOffset = 0; // 使用这个变量记录光标的位置
try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
try (DataInputStream dataInputStream = new DataInputStream(inputStream)){
while (true){
//读取一条消息长度
int messageSize = dataInputStream.readInt();
// 按照长度读取消息内容
byte[] buffer = new byte[messageSize];
int actualSize = dataInputStream.read(buffer);
if(messageSize != actualSize){
//不匹配说明文件有问题
throw new MqException("[MessageFileManager] 文件格式错误! queueName: " + queueName);
}
//反序列化
Message message = (Message) BinaryTool.fromBytes(buffer);
//判断是否是无效数据
if (message.getIsValid() != 0x1){
currentOffset += (4 + messageSize);
continue;
}
//有效数据,加入到链表中
message.setOffsetBeg(currentOffset + 4);
message.setOffsetEnd(currentOffset + 4 + messageSize);
currentOffset += (4 + messageSize);
messages.add(message);
}
}catch (EOFException e){
System.out.println("[MessageFileManager] 恢复 Message 数据完成!");
}
return messages;
}
}
7.9 判断垃圾回收时机
这里的数字是拍脑门写的,当消息总数大于 2000 并且 消息的有效个数小于 50% 时,进行垃圾回收。
public boolean checkGC(String queueName){
// 读取消息统计文件的数据
Stat stat = readStat(queueName);
if(stat.totalCount > 2000 && (double)stat.validCount / stat.totalCount < 0.5){
return true;
}
return false;
}
7.10 获取新消息数据文件的路径
public String getQueueDataNewPath(String queueName){
return getQueueDir(queueName) + "/queue_data_new.txt";
}
7.11 消息的垃圾回收
这里我采用了复制算法进行垃圾回收,具体实现步骤:
- 创建一个新的文件,命名为 queue_data_new.txt
- 把之前消息数据文件的有效消息读取并写到新的文件中
- 删除旧的消息数据文件,进行文件重命名,同时记录消息统计文件
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
synchronized (queue){
long gcBeg = System.currentTimeMillis();
//创建一个新的文件
File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
if(queueDataNewFile.exists()){
throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new.txt 已经存在! queueName: " + queue.getName());
}
boolean success = queueDataNewFile.createNewFile();
if(!success){
throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile: " + queueDataNewFile.getAbsolutePath());
}
// 从旧的消息文件中读取所有的有效数据文件
LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
// 把有效的消息全部写入到新的文件中
try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)){
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
for (Message message : messages){
byte[] buffer = BinaryTool.toBytes(message);
//先写四个字节消息的长度
dataOutputStream.writeInt(buffer.length);
// 再写消息的内容
dataOutputStream.write(buffer);
}
}
}
// 删除旧的数据文件,进行文件重命名
File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
success = queueDataOldFile.delete();
if(!success){
throw new MqException("[MessageFileManager] 旧的数据文件删除失败! queueDataOldFile: " + queueDataOldFile.getAbsolutePath());
}
success = queueDataNewFile.renameTo(queueDataOldFile);
if(!success){
throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile: " + queueDataNewFile.getAbsolutePath() +
", queueDataOldFile: " + queueDataOldFile.getAbsolutePath());
}
// 更新消息统计文件
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(),stat);
long gcEnd = System.currentTimeMillis();
System.out.println("[MessageFileManager] gc 执行完毕! queueName: " + queue.getName() +
", time: " + (gcEnd - gcBeg) + "ms");
}
}
八、统一硬盘处理
消息存储在文件中,交换机、绑定、队列存储在数据库中,对此进行统一处理。也就是说使用一个类管理所有硬盘上的数据。
public class DiskDataCenter {
//这个实例用来管理数据库的数据
private DataBaseManager dataBaseManager = new DataBaseManager();
//这个实例用来管理文件中的数据
private MessageFileManager messageFileManager = new MessageFileManager();
public void init(){
dataBaseManager.init();
messageFileManager.init();
}
/**
* 封装交换机、绑定、队列操作
* @param exchange
*/
public void insertExchange(Exchange exchange){
dataBaseManager.insertExchange(exchange);
}
public void deleteExchange(String exchangeName){
dataBaseManager.deleteExchange(exchangeName);
}
public List<Exchange> selectAllExchanges(){
return dataBaseManager.selectAllExchanges();
}
//封装队列操作
public void insertQueue(MSGQueue queue) throws IOException {
messageFileManager.createQueueFiles(queue.getName());
dataBaseManager.insertQueue(queue);
}
public void deleteQueue(String queueName) throws IOException {
messageFileManager.destroyQueueFiles(queueName);
dataBaseManager.deleteQueue(queueName);
}
public List<MSGQueue> selectAllQueues(){
return dataBaseManager.selectAllQueues();
}
//封装绑定操作
public void insertBinding(Binding binding){
dataBaseManager.insertBinding(binding);
}
public void deleteBinding(Binding binding){
dataBaseManager.deleteBinding(binding);
}
public List<Binding> selectAllBindings(){
return dataBaseManager.selectAllBindings();
}
/**
* 封装消息操作
*/
public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
messageFileManager.sendMessage(queue,message);
}
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {
messageFileManager.deleteMessage(queue,message);
if(messageFileManager.checkGC(queue.getName())){
messageFileManager.gc(queue);
}
}
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
return messageFileManager.loadAllMessageFromQueue(queueName);
}
}
九、内存管理
这里主要使用了线程安全的哈希表保存内存中的消息、交换机、绑定、队列等。
public class MemoryDataCenter {
// key 是 exchangeName value 是 Exchange 对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
// key 是 queueName, value 是 MSGQueue 对象
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
// 第一个 key 是 exchangeName, 第二个 key 是 queueName
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
// key 是 messageId, value 是 Message 对象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
// key 是 queueName, value 是 Message 的链表
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
// 第一个 key 是 queueName 第二个 key 是 messageId
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
}
9.1 交换机相关操作的API
- 新增交换机
public void insertExchange(Exchange exchange){
exchangeMap.put(exchange.getName(), exchange);
System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName: " + exchange.getName());
}
- 查询交换机
public Exchange getExchange(String exchangeName){
return exchangeMap.get(exchangeName);
}
- 删除交换机
public void deleteExchange(String exchangeName){
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter] 删除交换机成功! exchangeName: " + exchangeName);
}
9.2 队列相关操作的API
- 新增队列
public void insertQueue(MSGQueue queue){
queueMap.put(queue.getName(), queue);
System.out.println("[MemoryDataCenter] 新队列添加成功! queueName: " + queue.getName());
}
- 查询队列
public MSGQueue getQueue(String queueName){
return queueMap.get(queueName);
}
- 删除队列
public void deleteQueue(String queueName){
queueMap.remove(queueName);
System.out.println("[MemoryDataCenter] 删除队列成功成功! queueName: " + queueName);
}
9.3 绑定相关操作的API
- 新增绑定
public void insertBinding(Binding binding) throws MqException {
//先使用 exchangeName 查一下对应的哈希表是否存在
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());
synchronized (bindingMap){
//再根据 queueName 查一下,如果已经存在,就抛出异常,不存在才能插入
if(bindingMap.get(binding.getQueueName()) != null){
throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName: " +
binding.getExchangeName() + ", queueName: " + binding.getQueueName());
}
bindingMap.put(binding.getQueueName(),binding);
}
System.out.println("[MemoryDataCenter] 新绑定添加成功! queueName: " + binding.getQueueName() + ", exchangeName: "
+ binding.getExchangeName());
}
- 根据交换机名和队列名查询绑定
public Binding getBinding(String exchangeName,String queueName){
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);
if(bindingMap == null){
return null;
}
return bindingMap.get(queueName);
}
- 查询交换机绑定的所有队列
public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){
return bindingsMap.get(exchangeName);
}
- 删除绑定
public void deleteBinding(Binding binding) throws MqException {
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
if(bindingMap == null){
//无法删除
throw new MqException("[MemoryDataCenter] 绑定不存在! exchangeName: " + binding.getExchangeName() +
", queueName: " + binding.getQueueName());
}
bindingMap.remove(binding.getQueueName());
System.out.println("[MemoryDataCenter] 绑定删除成功! queueName: " + binding.getQueueName() + ", exchangeName: "
+ binding.getExchangeName());
}
9.4 消息相关操作的API
- 添加消息
public void addMessage(Message message){
messageMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter] 新消息添加成功! messageId: " + message.getMessageId());
}
- 查询消息
public Message getMessage(String messageId){
return messageMap.get(messageId);
}
- 删除消息
public void removeMessage(String messageId){
messageMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息成功移除! messageId: " + messageId);
}
- 发送消息到指定队列
public void sendMessage(MSGQueue queue, Message message){
//把消息放到指定的数据结构中
LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
synchronized (messages){
messages.add(message);
}
addMessage(message);
System.out.println("[MemoryDataCenter] 消息被投递到队列中! messageId: " + message.getMessageId());
}
- 从队列中获取消息
public Message pollMessage(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
if(messages == null){
return null;
}
synchronized (messages){
if(messages.size() == 0){
return null;
}
//取头元素
Message curMessage = messages.remove(0);
System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId: " + curMessage.getMessageId());
return curMessage;
}
}
- 获取指定队列中的消息个数
public int getMessageCount(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
if(messages == null){
return 0;
}
synchronized (messages){
return messages.size();
}
}
- 添加未确认的消息
public void addMessageWaitAck(String queueName,Message message){
ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());
messageHashMap.put(message.getMessageId(),message);
System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId: " + message.getMessageId());
}
- 删除已经确认的消息
public void removeMessageWaitAck(String queueName, String messageId){
ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
if(messageHashMap == null){
return;
}
messageHashMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId: " + messageId);
}
- 获取未确认的消息
public Message getMessageWaitAck(String queueName, String messageId){
ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
if(messageHashMap == null){
return null;
}
return messageHashMap.get(messageId);
}
- 从硬盘读取数据恢复到内存中
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
// 1. 恢复所有的交换机数据
exchangeMap.clear();
queueMap.clear();
bindingsMap.clear();
messageMap.clear();
queueMessageMap.clear();
List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
for (Exchange exchange : exchanges){
exchangeMap.put(exchange.getName(), exchange);
}
// 2. 恢复所有的队列数据
List<MSGQueue> queues = diskDataCenter.selectAllQueues();
for(MSGQueue queue : queues){
queueMap.put(queue.getName(), queue);
}
// 3. 恢复所有的绑定数据
List<Binding> bindings = diskDataCenter.selectAllBindings();
for(Binding binding : bindings){
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k -> new ConcurrentHashMap<>());
bindingMap.put(binding.getQueueName(),binding);
}
// 4. 恢复所有的消息数据
// 遍历所有的队列 根据每个队列的名字获取到所有的消息
for(MSGQueue queue : queues){
LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
queueMessageMap.put(queue.getName(), messages);
for(Message message : messages){
messageMap.put(message.getMessageId(), message);
}
}
// 针对未确认的消息不需要从硬盘上恢复,一旦在等待 ack 的过程中服务器重启,此时被恢复成未被取走的消息
}
十、虚拟机 VirtualHost
每个虚拟机下都管理着自己的交换机、队列、绑定、消息这些数据,同时供上层API进行调用,本项目目前只支持单个交换机。
public class VirtualHost {
private String virtualHostName;
private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
private DiskDataCenter diskDataCenter = new DiskDataCenter();
private Router router = new Router();
private ConsumerManager consumerManager = new ConsumerManager(this);
private final Object exchangeLocker = new Object();// 操作交换机的锁对象
private final Object queueLocker = new Object(); // 操作队列的锁对象
public VirtualHost(String name){
this.virtualHostName = name;
//对于 MemoryDataCenter 来说, 不需要初始化
//针对 DiskDataCenter 来说,需要进行初始化操作,建库建表和初始数据的设定
//此外,针对硬盘的数据,恢复到内存中
diskDataCenter.init();
try {
memoryDataCenter.recovery(diskDataCenter);
} catch (IOException | MqException | ClassNotFoundException e) {
e.printStackTrace();
System.out.println("[VirtualHost] 恢复内存数据失败!");
}
}
public String getVirtualHostName() {
return virtualHostName;
}
public MemoryDataCenter getMemoryDataCenter() {
return memoryDataCenter;
}
public DiskDataCenter getDiskDataCenter() {
return diskDataCenter;
}
其中,Router 类规定了交换机转发的规则:
public class Router {
/**
* bindingKey 的构造规则
* 1.数字、字母、下划线
* 2.使用 . 分割成若干部分
* 3.允许存在 * 和 # 作为通配符,但是通配符只能作为一个独立的分段
* @param bindingKey
* @return
*/
public boolean checkBindingKey(String bindingKey){}
/**
* routingKey 的构造规则:
* 1.数字、字母、下划线
* 2.使用 . 分割成若干部分
*/
public boolean checkRoutingKey(String routingKey){}
/**
* 判定该消息是否可以转发给这个绑定对应的队列
* @param exchangeType
* @param binding
* @param message
* @return
*/
public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException{}
/**
* 校验 bindingKey 和 routingKey 是否匹配
* @param binding
* @param message
* @return
*/
private boolean routeTopic(Binding binding,Message message){}
}
10.1 创建交换机
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
Map<String,Object> arguments){
//把交换机的名字加上虚拟主机的名字
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
//判定该交换机是否已经存在,直接通过内存查询
Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
if(existsExchange != null){
//该交换机已经存在
System.out.println("[VirtualHost] 交换机已经存在! exchangeName: " + exchangeName);
return true;
}
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(exchangeType);
exchange.setDurable(durable);
exchange.setAutoDelete(autoDelete);
exchange.setArguments(arguments);
if(durable){
diskDataCenter.insertExchange(exchange);
}
memoryDataCenter.insertExchange(exchange);
System.out.println("[VirtualHost] 交换机创建完成! exchangeName: " + exchangeName);
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 交换机创建失败! exchangeName: " + exchangeName);
e.printStackTrace();
return false;
}
}
10.2 删除交换机
public boolean exchangeDelete(String exchangeName){
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
if(toDelete == null){
throw new MqException("[VirtualHost] 交换机不存在,无法删除!");
}
if(toDelete.isDurable()){
diskDataCenter.deleteExchange(exchangeName);
}
memoryDataCenter.deleteExchange(exchangeName);
System.out.println("[VirtualHost] 交换机删除成功! exchangeName: " + exchangeName);
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 交换机删除失败! exchangeName: " + exchangeName);
e.printStackTrace();
return false;
}
}
10.3 创建队列
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
Map<String,Object> arguments){
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker){
//1.判断队列是否存在
MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
if(existsQueue != null){
System.out.println("[VirtualHost] 队列已经存在! queueName: " + queueName);
return false;
}
//2.构造队列对象
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(durable);
queue.setExclusive(exclusive);
queue.setAutoDelete(autoDelete);
queue.setArguments(arguments);
//3.插入硬盘
if(queue.isDurable()){
diskDataCenter.insertQueue(queue);
}
//4.插入内存
memoryDataCenter.insertQueue(queue);
System.out.println("[VirtualHost] 队列创建成功! queueName: " + queueName);
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 创建队列失败! queueName: " + queueName);
e.printStackTrace();
return false;
}
}
10.4 删除队列
public boolean queueDelete(String queueName){
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker){
MSGQueue toDelete = memoryDataCenter.getQueue(queueName);
if (toDelete == null){
throw new MqException("[VirtualHost] 队列不存在,无法删除!");
}
if(toDelete.isDurable()){
diskDataCenter.deleteQueue(queueName);
}
memoryDataCenter.deleteQueue(queueName);
System.out.println("[VirtualHost] 队列删除成功! queueName: " + queueName);
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 队列删除失败! queueName: " + queueName);
e.printStackTrace();
return false;
}
}
10.5 创建绑定
public boolean queueBind(String queueName,String exchangeName,String bindingKey){
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
synchronized (queueLocker){
//1.判断绑定是否存在
Binding existsBinding = memoryDataCenter.getBinding(exchangeName,queueName);
if(existsBinding != null){
throw new MqException("[VirtualHost] 绑定已经存在! queueName: " + queueName + ", exchangeName: "
+ exchangeName);
}
//2.判断绑定是否合法
if(!router.checkBindingKey(bindingKey)){
throw new MqException("[VirtualHost] bindingKey 不合法! bindingKey: " + bindingKey);
}
//3.创建 Binding 对象
Binding binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);
//4.获取到对应的交换机和队列,如果对应的交换机和队列不存在,这样的绑定是无法创建的
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null){
throw new MqException("[VirtualHost] 该绑定对应的队列不存在! queueName: " + queueName);
}
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange == null){
throw new MqException("[VirtualHost] 该绑定对应的交换机不存在! exchangeName: " + exchangeName);
}
if(queue.isDurable() && exchange.isDurable()){
diskDataCenter.insertBinding(binding);
}
memoryDataCenter.insertBinding(binding);
System.out.println("[VirtualHost] 创建绑定成功! queueName: " + queueName + ", exchangeName: "
+ exchangeName);
}
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 绑定创建失败! queueName: " + queueName + ", exchangeName: "
+ exchangeName);
e.printStackTrace();
return false;
}
}
10.6 删除绑定
public boolean queueUnbind(String queueName, String exchangeName){
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
synchronized (queueLocker){
//1.判断绑定是否存在
Binding toDelete = memoryDataCenter.getBinding(exchangeName,queueName);
if(toDelete == null){
throw new MqException("[VirtualHost] 绑定不存在,无法删除! queueName: " + queueName
+ ", exchangeName: " + exchangeName);
}
//2.无论绑定是否持久化,都尝试在硬盘上删一下,就算不存在,这个删除操作也没有副作用
diskDataCenter.deleteBinding(toDelete);
memoryDataCenter.deleteBinding(toDelete);
System.out.println("[VirtualHost] 删除绑定成功! queueName: " + queueName + ", exchangeName: " + exchangeName);
}
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 删除绑定失败!");
return false;
}
}
10.7 发送消息
流程图:
实现步骤:
- 检查 routingKey 是否合法
- 判断交换机是否存在
- 判断交换机的类型,根据不同的类型决定如何进行后续转发
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body){
try {
//1.转换交换机的名字
exchangeName = virtualHostName + exchangeName;
//2.检查 routingKey 是否合法
if(!router.checkRoutingKey(routingKey)){
throw new MqException("[virtualHost] routingKey 非法! routingKey: " + routingKey);
}
//3.查找交换机对象
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange == null){
throw new MqException("[virtualHost] 交换机不存在! exchangeName: " + exchangeName);
}
//4.判定交换机的类型
if(exchange.getType() == ExchangeType.DIRECT){
//按照直接交换机的方式转发消息
//以 routingKey 作为队列的名字,直接把消息写入指定的队列中
//此时,可以无视绑定关系
String queueName = virtualHostName + routingKey;
//5.构造消息对象
Message message = Message.createMessageWithId(routingKey,basicProperties,body);
//6.查找队列名对应的对象
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null){
throw new MqException("[virtualHost] 队列不存在! queueName: " + queueName);
}
//7.队列存在,给队列中写入消息
sendMessage(queue,message);
}else{
//按照 fanout 和 topic 的方式来转发
//5.找到该交换机关联的所有绑定
ConcurrentHashMap<String,Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
for(Map.Entry<String,Binding> entry : bindingsMap.entrySet()){
// 1) 获取到绑定队列,判定对应的队列是否存在
Binding binding = entry.getValue();
MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
if(queue == null){
System.out.println("[virtualHost] basicPublish 发送消息时,发现队列不存在! queueName: " + binding.getQueueName());
continue;
}
// 2) 构造消息对象
Message message = Message.createMessageWithId(routingKey,basicProperties,body);
// 3) 判定这个消息是否能转发给该队列
// 如果是 fanout, 所有绑定的队列都要进行转发
// 如果是 topic, 需要判定 bindingKey 和 routingKey 是否匹配
if(!router.route(exchange.getType(),binding,message)){
continue;
}
// 4) 真正转发消息给队列
sendMessage(queue,message);
}
}
return true;
}catch (Exception e){
System.out.println("[virtualHost] 消息发送失败!");
e.printStackTrace();
return false;
}
}
上述过程中涉及到的被调用的 API:
private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {
//此处发送消息,就是调用之前封装好的 api, 写到内容和硬盘上
int deliverMode = message.getDeliverMode();
//deliverMode 为 1 不持久化
//deliverMode 为 2 要持久化
if(deliverMode == 2){
diskDataCenter.sendMessage(queue,message);
}
//写入内存
memoryDataCenter.sendMessage(queue,message);
//通知消费者可以消费消息了
consumerManager.notifyConsume(queue.getName());
}
10.8 订阅消息
流程图:
/**
* 订阅消息
* 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者
* @param consumerTag 消费者的身份标识
* @param queueName
* @param autoAck 消息被消费完成后,应答的方式 true:自动应答; false:手动应答
* @param consumer 是一个回调函数,此处类型设定为函数式接口,后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 表达式
* @return
*/
public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){
//构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把这个 ConsumerEnv 对象添加到该队列中
queueName = virtualHostName + queueName;
try {
consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);
System.out.println("[virtualHost] basicConsume 成功! queueName: " + queueName);
return true;
}catch (Exception e){
System.out.println("[virtualHost] basicConsume 失败! queueName: " + queueName);
e.printStackTrace();
return false;
}
}
上述过程涉及到了 ConsumerManager 类:
public class ConsumerManager {
//持有上层 VirtualHost 对象的引用,用来操作数据
private VirtualHost parent;
//指定一个线程池,执行具体的回调任务
private ExecutorService workerPool = Executors.newFixedThreadPool(4);
//存放令牌的队列
private BlockingDeque<String> tokenQueue = new LinkedBlockingDeque<>();
//扫描线程
private Thread scannerThread = null;
public ConsumerManager(VirtualHost p){
this.parent = p;
scannerThread = new Thread(() ->{
while (true){
try {
//1.拿到令牌
String queueName = tokenQueue.take();
//2.根据令牌找到队列
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if(queue == null){
throw new MqException("[ConsumerManager] 取令牌时队列名不存在! queueName: " + queueName);
}
//3.从队列中消费一个消息
synchronized (queue){
consumeMessage(queue);
}
} catch (InterruptedException | MqException e) {
e.printStackTrace();
}
}
});
//线程设为后台线程
scannerThread.setDaemon(true);
scannerThread.start();
}
/**
* 调用时机:发送消息的时候
* @param queueName
* @throws InterruptedException
*/
public void notifyConsume(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
//找到对应的队列
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if(queue == null){
throw new MqException("[ConsumerManager] 队列不存在! queueName: " + queueName);
}
ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);
synchronized (queue){
queue.addConsumerEnv(consumerEnv);
int n = parent.getMemoryDataCenter().getMessageCount(queueName);
for(int i = 0; i < n; i++){
//调用一次就消费一条消息
consumeMessage(queue);
}
}
}
/**
* 消费一个消息
* @param queue
*/
private void consumeMessage(MSGQueue queue) {
// 1.按照轮询的方式找个消费者出来
ConsumerEnv lucyDog = queue.chooseConsumer();
if(lucyDog == null){
return;//当前队列没有消费者,暂时不消费
}
//2.从队列中取出一个消息
Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
if(message == null){
return;//当前队列中没有消息,也不需要消费
}
//3.把消息带入到消费者的回调方法中,让线程池去执行
workerPool.submit(() ->{
try {
// 1) 先把消息放到待确认的集合中
parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);
// 2) 执行回调
lucyDog.getConsumer().handleDelivery(lucyDog.getConsumerTag(),message.getBasicProperties(),
message.getBody());
// 3) 如果当前是 自动应答,直接把消息删除
// 如果当前是手动应答,交给后续消费者调用 basicAck 来处理
if(lucyDog.isAutoAck()){
//删除硬盘上的消息
if(message.getDeliverMode() == 2){
//当前这个消息是持久化存储,需要删除硬盘上的消息
parent.getDiskDataCenter().deleteMessage(queue,message);
}
//删除等待应答的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
//删除内存中的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageId());
System.out.println("[ConsumerManager] 消息被成功消费! queueName: " + queue.getName());
}
}catch (Exception e){
e.printStackTrace();
}
});
}
}
ConsumeMessage方法流程图:
10.9 消息确认
该方法只是手动应答的时候才会使用,应答成功, 则把消息删除掉。
public boolean basicAck(String queueName, String messageId){
queueName = virtualHostName + queueName;
try {
Message message = memoryDataCenter.getMessage(messageId);
if(message == null){
//要确认的消息不存在
throw new MqException("[VirtualHost] 要确认的消息不存在! messageId: " + messageId);
}
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null){
throw new MqException("[VirtualHost] 要确认的队列不存在! queueName: " + queueName);
}
//1.删除硬盘上的数据
if(message.getDeliverMode() == 2){
diskDataCenter.deleteMessage(queue,message);
}
//2.删除消息中心中的数据
memoryDataCenter.removeMessage(messageId);
//3.删除待确认集合中的消息
memoryDataCenter.removeMessageWaitAck(queueName,messageId);
System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName: " + queueName + ", messageId: " +
messageId);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] basicAck 失败! 消息被确认失败! queueName: " + queueName + ", messageId: " +
messageId);
e.printStackTrace();
return false;
}
}
十一、网络通信协议设计
生产者和消费者都是客户端程序,并且需要通过网络远程调用 BrokerServer 提供的 API, 这里,我使用 TCP 作为底层协议,在这个基础上自定义应用层协议,简单来说就是约定一下生产者以及消费者和 BrokerServer 之间交互的规范或者是传输数据的格式。
客户端要调用的功能有以下几个部分:
- 创建 channel
- 关闭 channel
- 创建 exchange
- 删除 exchange
- 创建 queue
- 删除 queue
- 创建 binding
- 删除 binding
- 发送 message
- 订阅 message
- 发送 ack
- 返回 message (服务器 -> 客户端)
11.1 设计应用层协议
因为 Message 本身就是二进制数据,因此这里同样使用二进制的方式设定协议。
其中 type 表示请求响应不同的功能,取值如下:
0x1
创建 channel
0x2关闭 channel0x3
创建 exchange
0x4
销毁 exchange
0x5
创建 queue
0x6
销毁 queue
0x7
创建 binding
0x8
销毁 binding
0x9
发送 message
0xa
订阅 message
0xb
返回 ack
0xc
服务器给客户端推送的消息
payload 部分, 会根据不同的 type, 存在不同的格式:
- 对于请求来说, payload 表示这次方法调用的各种参数信息
- 对于响应来说, payload 表示这次方法调用的返回值
11.2 定义Request/Response
/**
* @description:表示一个请求对象,按照自定义协议的格式展开
* @created by 清风 on 2024/7/30 21:21
*/
@Data
public class Request {
private int type;
private int length;
private byte[] payload;
}
/**
* @description:这是一个响应,也是根据自定义应用层协议来的
* @created by 清风 on 2024/7/30 21:22
*/
@Data
public class Response {
private int type;
private int length;
private byte[] payload;
}
11.3 定义参数父类
/**
* @description:使用这个类来表示方法的公共参数/辅助的字段
* 后续每个方法就会有不同的参数,不同的参数使用不同的子类表示
* @created by 清风 on 2024/7/30 21:24
*/
@Data
public class BasicArguments implements Serializable{
// 表示一次请求和一次响应的身份标识,可以把请求和响应对上
protected String rid;
//本次通信使用的 channel 的身份标识
protected String channelId;
}
11.4 定义返回值父类
/**
* @description:表示各个远程的调用的方法的返回值的公共信息
* @created by 清风 on 2024/7/30 21:28
*/
@Data
public class BasicReturns implements Serializable {
//用来标识唯一的请求和响应
protected String rid;
//用来标识一个 channel
protected String channelId;
//表示远程调用方法的返回值
protected boolean ok;
}
11.5 定义其他参数类
针对每个 VirtualHost 提供的方法, 都需要有⼀个类表⽰对应的参数。这里写一个创建交换机的请求参数类,其他的相关请求参数都大同小异,想看的可以去源码链接看。
/**
* @description:这个类是创建交换机的请求参数的类
* @created by 清风 on 2024/7/30 21:33
*/
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
private String exchangeName;
private ExchangeType exchangeType;
private boolean durable;
private boolean autoDelete;
private Map<String,Object> arguments;
}
⼀个创建交换机的请求, 形如:
- 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图片的结构
- 按照 length 长度读取出 payload, 就可以把读到的⼆进制数据转换成 ExchangeDeclareArguments 对象
十二、实现 BrokerServer 类
public class BrokerServer {
private ServerSocket serverSocket = null;
//一个 BrokerServer 上只有一个虚拟主机
private VirtualHost virtualHost = new VirtualHost("default");
//存储当前的所有会话,也就是有哪些客户端正在和服务器进行通信
//此处的 key 是 channelId; value 为对应的 socket 对象
private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();
//引入线程池来处理多个客户端的请求
private ExecutorService executorService = null;
//引入一个 boolean 变量控制服务器是否继续运行
private volatile boolean runnable = true;
}
12.1 启动/停止服务器
public BrokerServer(int port) throws IOException {
serverSocket = new ServerSocket(port);
}
public void start() throws IOException {
System.out.println("[BrokerServer] 启动!");
executorService = Executors.newCachedThreadPool();
try {
while (runnable){
Socket clientSocket = serverSocket.accept();
//把处理连接的逻辑丢给线程池
executorService.submit(() ->{
processConnection(clientSocket);
});
}
}catch (SocketException e){
System.out.println("[BrokerServer] 服务器停止运行!");
}
}
/**
* 停止服务器
*/
public void stop() throws IOException {
runnable = false;
executorService.shutdownNow();//把线程池中的任务都放弃,让线程都销毁
serverSocket.close();
}
12.2 实现处理连接
/**
* 通过这个方法来处理一个客户端的连接
* 在这个连接中,可能会涉及到多个请求和响应
* @param clientSocket
*/
private void processConnection(Socket clientSocket) {
try (InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream()){
//这里需要按照特定格式读取并解析,此时需要用到 DataInputStream 和 DataOutputStream
try (DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
while (true){
//1.读取请求并解析
Request request = readRequest(dataInputStream);
//2.根据请求计算响应
Response response = process(request,clientSocket);
//3.把响应写回给客户端
writeResponse(dataOutputStream,response);
}
}catch (EOFException | SocketException e){
//对于当前代码, DataInputStream 如果读到 EOF, 就会抛出一个 EOFException 异常
//需要借助这个异常来结束循环
System.out.println("[BrokerServer] 连接关闭! 客户端地址: " + clientSocket.getInetAddress().toString()
+ ":" + clientSocket.getPort());
}
}catch (IOException | ClassNotFoundException | MqException e){
System.out.println("[BrokerServer] Connection 出现异常!");
e.printStackTrace();
}finally {
try {
//关闭 socket
clientSocket.close();
//一个 TCP 连接中,可能包含多个 channel, 需要把当前这个 socket 对应的所有 channel 清理掉
clearClosedSession(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
12.3 实现 readRequest / writeResponse
private Request readRequest(DataInputStream dataInputStream) throws IOException {
Request request = new Request();
request.setType(dataInputStream.readInt());
request.setLength(dataInputStream.readInt());
byte[] payload = new byte[request.getLength()];
int n = dataInputStream.read(payload);
if(n != request.getLength()){
throw new IOException("读取请求格式出错!");
}
request.setPayload(payload);
return request;
}
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
dataOutputStream.writeInt(response.getType());
dataOutputStream.writeInt(response.getLength());
dataOutputStream.write(response.getPayload());
dataOutputStream.flush();
}
12.4 实现处理请求
先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
再根据不同的 type, 分别处理不同的逻辑,(主要是调⽤ virtualHost 中不同的方法)
针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客户端
最后构造成统⼀的响应
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
//1.把 request 中的 payload 做一个初步的解析
BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
System.out.println("[Request] rid: " + basicArguments.getRid() + ", channelId: " + basicArguments.getChannelId()
+ ", type: " + request.getType() + ", length: " + request.getLength());
//2.根据 type 的值来进一步区分接下来这次请求要干什么
boolean ok = true;
if(request.getType() == 0x1){
//创建 channel
sessions.put(basicArguments.getChannelId(), clientSocket);
System.out.println("[BrokerServer] 创建 channel 完成! channelId: " + basicArguments.getChannelId());
}else if(request.getType() == 0x2){
//销毁 channel
sessions.remove(basicArguments.getChannelId());
System.out.println("[BrokerServer] 销毁 channel 完成! channelId: " + basicArguments.getChannelId());
}else if(request.getType() == 0x3){
//创建一个交换机
//此时 payload 就是 ExchangeDeclareArguments 对象
ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(),
arguments.isAutoDelete(),arguments.getArguments());
}else if(request.getType() == 0x4){
//销毁交换机
ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
ok = virtualHost.exchangeDelete(arguments.getExchangeName());
} else if (request.getType() == 0x5) {
//创建队列
QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(),
arguments.isAutoDelete(),arguments.getArguments());
}else if(request.getType() == 0x6){
//销毁队列
QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
ok = virtualHost.queueDelete(arguments.getQueueName());
}else if(request.getType() == 0x7){
//创建 Binding
QueueBindArguments arguments = (QueueBindArguments) basicArguments;
ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
}else if(request.getType() == 0x8){
//删除 Binding
QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
}else if(request.getType() == 0x9){
//发送消息
BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
arguments.getBasicProperties(),arguments.getBody());
}else if(request.getType() == 0xa){
//订阅消息
BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
//这个回调函数要做的工作:把服务器收到的消息直接推送给对应的客户端
//先知道当前这个收到的消息要发给哪个客户端
//此处 consumerTag 其实是 channelId
//1.根据 channel 找到 Socket 对象
Socket clientSocket = sessions.get(consumerTag);
if(clientSocket == null || clientSocket.isClosed()){
throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
}
//2.构造响应数据
//此处 response 的 payload 就是 SubScribeReturns
SubScribeReturns subScribeReturns = new SubScribeReturns();
subScribeReturns.setConsumerTag(consumerTag);
subScribeReturns.setBasicProperties(basicProperties);
subScribeReturns.setChannelId(consumerTag);
subScribeReturns.setRid("");
subScribeReturns.setOk(true);
subScribeReturns.setBody(body);
byte[] payload = BinaryTool.toBytes(subScribeReturns);
Response response = new Response();
response.setType(0xc);//0xc 表示服务器给消费者客户端推送的数据
response.setLength(payload.length);
response.setPayload(payload);
//3.把数据写回给客户端
DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
writeResponse(dataOutputStream,response);
}
});
}else if(request.getType() == 0xb){
//调用 basicAck 来确认消息
BasicAckArguments arguments = (BasicAckArguments) basicArguments;
ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
}else {
//当前的 type 是非法的
throw new MqException("[BrokerServer] 未知的 type: " + request.getType());
}
//3.构造响应
BasicReturns basicReturns = new BasicReturns();
basicReturns.setChannelId(basicArguments.getChannelId());
basicReturns.setRid(basicArguments.getRid());
basicReturns.setOk(ok);
byte[] payload = BinaryTool.toBytes(basicReturns);
Response response = new Response();
response.setType(request.getType());
response.setLength(payload.length);
response.setPayload(payload);
System.out.println("[Response] rid: " + basicReturns.getRid() + ", channelId: "
+ basicReturns.getChannelId() + ", type: "
+ response.getType() + ", length: " + response.getLength());
return response;
}
12.5 实现清理过期的会话
/**
* 清理过期的会话
* @param clientSocket
*/
private void clearClosedSession(Socket clientSocket) {
//遍历 sessions 哈希表
List<String> toDeleteChannelId = new ArrayList<>();
for(Map.Entry<String,Socket> entry : sessions.entrySet()){
if(entry.getValue() == clientSocket){
toDeleteChannelId.add(entry.getKey());
}
}
for(String channelId : toDeleteChannelId){
sessions.remove(channelId);
}
System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId: " + toDeleteChannelId);
}
十三、实现客户端
13.1 创建 ConnectionFactory
用来创建连接的工厂类:
@Data
public class ConnectionFactory {
// broker server 的 ip 地址
private String host;
// broker server 的端口号
private int port;
// 访问 broker server 的哪个虚拟主机.
// 下列几个属性暂时不搞了.
// private String virtualHostName;
// private String username;
// private String password;
public Connection newConnection() throws IOException {
Connection connection = new Connection(host, port);
return connection;
}
}
13.2 Connection 和 Channel 定义
- 一个客户端可以创建多个 Connection
- 一个 Connection 对应一个 Socket,一个TCP 连接
- 一个 Connection 可以包含多个 Channel
Connection 定义: (这个类中其他的方法在我的项目源码中自行观看,主要包括处理响应、统一封装写请求和读取响应以及创建 Channel)
@Data
public class Connection {
private Socket socket = null;
// 需要管理多个 channel. 使用一个 hash 表把若干个 channel 组织起来.
private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
private InputStream inputStream;
private OutputStream outputStream;
private DataInputStream dataInputStream;
private DataOutputStream dataOutputStream;
private ExecutorService callbackPool = null;
}
- Socket 是客户端持有的套接字
- InputStream OutputStream DataInputStream ,DataOutputStream 均为 socket 通信的接口
- channelMap 用来管理该连接中所有的 Channel
- callbackPool 是用来在客户端这边执行用户回调的线程池
** Channel 定义(这个类中主要包括的方法就是构造请求参数,和服务器交互进行相关的操作,也就是远程调用服务器提供的 API,可在项目源码自行观看)**
@Data
public class Channel {
private String channelId;
// 当前这个 channel 属于哪个连接.
private Connection connection;
// 用来存储后续客户端收到的服务器的响应.
private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
// 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.
// 此处约定一个 Channel 中只能有一个回调.
private Consumer consumer = null;
public Channel(String channelId, Connection connection) {
this.channelId = channelId;
this.connection = connection;
}
}
十四、样例演示
**生产者: **
/**
* @description:这个类用来表示一个生产者
* 通常这是一个单独的服务器程序
* @created by 清风 on 2024/8/3 19:38
*/
public class DemoProducer {
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("启动生产者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建交换机和队列
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
channel.queueDeclare("testQueue",true,false,false,null);
//创建一个消息并发送
byte[] body = "hello".getBytes();
boolean ok = channel.basicPublish("testExchange","testQueue",null,body);
System.out.println("消息投递完成! ok :" + ok);
Thread.sleep(500);
channel.close();
connection.close();
}
}
消费者:
/**
* @description:这个类表示一个消费者
* 通常这个类也应该是在一个独立的服务器中被执行
* @created by 清风 on 2024/8/3 19:39
*/
public class DemoConsumer {
public static void main(String[] args) throws IOException, MqException, InterruptedException {
System.out.println("启动消费者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
channel.queueDeclare("testQueue",true,false,false,null);
channel.basicConsume("testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("[消费数据] 开始!");
System.out.println("consumerTag: " + consumerTag);
System.out.println("basicProperties: " + basicProperties);
String bodyString = new String(body,0, body.length);
System.out.println("body: " + bodyString);
System.out.println("[消费数据] 结束!");
}
});
//模拟一直等待消费
while (true){
Thread.sleep(500);
}
}
}
**启动项目之后,再先后启动消费者和生产者: **
- 启动项目,建库建表:
- 启动消费者:
- 启动生产者:
- 查看消费者端控制台:
至此,一个简易版本的MQ实现。文章篇幅太长,可能过于繁琐,还请各位读者有不满意的地方多多指教!
版权归原作者 ..清风 所有, 如有侵权,请联系我们删除。