Mq所有代码笔记
⼀. 消息队列背景知识
曾经我们学习过 阻塞队列 (BlockingQueue) , 我们说, 阻塞队列最⼤的⽤途, 就是⽤来实现 ⽣产者消费者模型.
⽣产者消费者模型, 存在诸多好处, 是后端开发的常⽤编程⽅式.
- 解耦合
- 削峰填⾕
在实际的后端开发中, 尤其是分布式系统⾥, 跨主机之间使⽤⽣产者消费者模型, 也是⾮常普遍的需求.
因此, 我们通常会把阻塞队列, 封装成⼀个独⽴的服务器程序, 并且赋予其更丰富的功能.
这样的程序我们就称为 消息队列 (Message Queue, MQ)
⼆. 需求分析
核⼼概念(面试题)★
- ⽣产者 (Producer)
- 消费者 (Consumer)
- 中间⼈ (Broker)
- 发布 (Publish)
- 订阅 (Subscribe)
- 消费
⼀个⽣产者, ⼀个消费者
N 个⽣产者, N 个消费者
其中, Broker 是最核⼼的部分. 负责消息的存储和转发.
在 Broker 中, ⼜存在以下概念 ★
- 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可 以存在多个 VirtualHost.
- 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发 给不同的 Queue.
- 队列 (Queue): 真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息.
- 绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关 系. 使⽤⼀个关联表就可以把这两个概念联系起来.
- 消息 (Message): 传递的内容.
所谓的 Exchange 和 Queue 可以理解成 “多对多” 关系, 和数据库中的 “多对多” ⼀样. 意思是:
⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息).
⼀个 Queue 也可以被多个 Exchange 绑定 (⼀个 Queue 中的消息可以来⾃于多个 Exchange).
这些概念, 既需要在内存中存储, 也需要在硬盘上存储.
- 内存存储: ⽅便使⽤.
- 硬盘存储: 重启数据不丢失
核⼼ API ★
对于 Broker 来说, 要实现以下核⼼ API. 通过这些 API 来实现消息队列的基本功能.
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
另⼀⽅⾯, Producer 和 Consumer 则通过⽹络的⽅式, 远程调⽤这些 API, 实现 ⽣产者消费者模型.
交换机类型 (Exchange Type)
对于 RabbitMQ 来说, 主要⽀持四种交换机类型.
• Direct
• Fanout
• Topic
• Header
其中 Header 这种⽅式⽐较复杂, ⽐较少⻅. 常⽤的是前三种交换机类型. 咱们此处也主要实现这三种.
• Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.
• Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
• Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为
routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.
这三种操作就像给 qq 群发红包.
• Direct 是发⼀个专属红包, 只有指定的⼈能领.
• Fanout 是使⽤了魔法, 发⼀个 10 块钱红包, 群⾥的每个⼈都能领 10 块钱.
• Topic 是发⼀个画图红包, 发 10 块钱红包, 同时出个题, 得画的像的⼈, 才能领. 也是每个领到的⼈
都能领 10 块钱
持久化
Exchange, Queue, Binding, Message 都有持久化需求.
当程序重启 / 主机重启, 保证上述内容不丢失
⽹络通信 ★
⽣产者和消费者都是客⼾端程序, broker 则是作为服务器. 通过⽹络进⾏通信.
在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.
Connection 对应⼀个 TCP 连接.
Channel 则是 Connection 中的逻辑通道.
⼀个 Connection 中可以包含多个 Channel.
Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.
这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接
Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥具体的线缆
消息应答
被消费的消息, 需要进⾏应答.
应答模式分成两种.
• ⾃动应答: 消费者只要消费了消息, 就算应答完毕了. Broker 直接删除这个消息.
• ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息.
⼿动应答的⽬的, 是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐较常⻅.
三. 模块划分
四、项目创建
五、创建核⼼类
创建包 mqserver.core
1. 创建 Exchange ★
参数:名称、交换机类型、持久化、自动删除、额外参数
get set方法
HashMap的get和set方法要用 objectMapper转
/*
* 这个类表示一个交换机
*/publicclassExchange{// 此处使用 name 来作为交换机的身份标识. (唯一的)privateString name;// 交换机类型, DIRECT, FANOUT, TOPICprivateExchangeType type =ExchangeType.DIRECT;// 该交换机是否要持久化存储. true 表示需要持久化; false 表示不必持久化.privateboolean durable =false;// 如果当前交换机, 没人使用了, 就会自动被删除.// 这个属性暂时先列在这里, 后续的代码中并没有真的实现这个自动删除功能~~ (RabbitMQ 是有的)privateboolean autoDelete =false;// arguments 表示的是创建交换机时指定的一些额外的参数选项. 后续代码中并没有真的实现对应的功能, 先列出来. (RabbitMQ 也是有的)// 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串.privateMap<String,Object> arguments =newHashMap<>();publicStringgetName(){return name;}publicvoidsetName(String name){this.name = name;}publicExchangeTypegetType(){return type;}publicvoidsetType(ExchangeType type){this.type = type;}publicbooleanisDurable(){return durable;}publicvoidsetDurable(boolean durable){this.durable = durable;}publicbooleanisAutoDelete(){return autoDelete;}publicvoidsetAutoDelete(boolean autoDelete){this.autoDelete = autoDelete;}// 这里的 get set 用于和数据库交互使用.publicStringgetArguments(){// 是把当前的 arguments 参数, 从 Map 转成 String (JSON)ObjectMapper objectMapper =newObjectMapper();try{return objectMapper.writeValueAsString(arguments);}catch(JsonProcessingException e){
e.printStackTrace();}// 如果代码真异常了, 返回一个空的 json 字符串就 okreturn"{}";}// 这个方法, 是从数据库读数据之后, 构造 Exchange 对象, 会自动调用到publicvoidsetArguments(String argumentsJson){// 把参数中的 argumentsJson 按照 JSON 格式解析, 转成// 上述的 Map 对象ObjectMapper objectMapper =newObjectMapper();try{this.arguments = objectMapper.readValue(argumentsJson,newTypeReference<HashMap<String,Object>>(){});}catch(JsonProcessingException e){
e.printStackTrace();}}// 在这里针对 arguments, 再提供一组 getter setter , 用来去更方便的获取/设置这里的键值对.// 这一组在 java 代码内部使用 (比如测试的时候)publicObjectgetArguments(String key){return arguments.get(key);}publicvoidsetArguments(String key,Object value){
arguments.put(key, value);}publicvoidsetArguments(Map<String,Object> arguments){this.arguments = arguments;}}
针对数据库操作方便转成String,重写hashmap的get,set方法
// 这里的 get set 用于和数据库交互使用.publicStringgetArguments(){// 是把当前的 arguments 参数, 从 Map 转成 String (JSON)ObjectMapper objectMapper =newObjectMapper();try{return objectMapper.writeValueAsString(arguments);}catch(JsonProcessingException e){
e.printStackTrace();}// 如果代码真异常了, 返回一个空的 json 字符串就 okreturn"{}";}// 这个方法, 是从数据库读数据之后, 构造 Exchange 对象, 会自动调用到publicvoidsetArguments(String argumentsJson){// 把参数中的 argumentsJson 按照 JSON 格式解析, 转成// 上述的 Map 对象ObjectMapper objectMapper =newObjectMapper();try{this.arguments = objectMapper.readValue(argumentsJson,newTypeReference<HashMap<String,Object>>(){});}catch(JsonProcessingException e){
e.printStackTrace();}}
ExchangeType
packagecom.example.mq.mqserver.core;publicenumExchangeType{DIRECT(0),FANOUT(1),TOPIC(2);privatefinalint type;privateExchangeType(int type){this.type = type;}}
2. 创建 MSGQueue ★
队列参数:名称、持久化、独占、自动删除、参数、都有哪些消费者订阅、目前读取到第几个消费者、
方法:①新增一个订阅者 ②获取一个订阅者 ③get和set方法
/*
* 这个类表示一个存储消息的队列
* MSG => Message
*/publicclassMSGQueue{// 表示队列的身份标识.privateString name;// 表示队列是否持久化, true 表示持久化保存, false 表示不持久化.privateboolean durable =false;// 这个属性为 true, 表示这个队列只能被一个消费者使用(别人用不了). 如果为 false 则是大家都能使用// 这个 独占 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现.privateboolean exclusive =false;// 为 true 表示没有人使用之后, 就自动删除. false 则是不会自动删除.// 这个 自动删除 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现.privateboolean autoDelete =false;// 也是表示扩展参数. 当前也是先列在这里, 先暂时不实现privateMap<String,Object> arguments =newHashMap<>();// 当前队列都有哪些消费者订阅了.privateList<ConsumerEnv> consumerEnvList =newArrayList<>();// 记录当前取到了第几个消费者. 方便实现轮询策略.privateAtomicInteger consumerSeq =newAtomicInteger(0);// 添加一个新的订阅者publicvoidaddConsumerEnv(ConsumerEnv consumerEnv){
consumerEnvList.add(consumerEnv);}// 订阅者的删除暂时先不考虑.// 挑选一个订阅者, 用来处理当前的消息. (按照轮询的方式)publicConsumerEnvchooseConsumer(){if(consumerEnvList.size()==0){// 该队列没有人订阅的returnnull;}// 计算一下当前要取的元素的下标.int index = consumerSeq.get()% consumerEnvList.size();
consumerSeq.getAndIncrement();return consumerEnvList.get(index);}publicStringgetName(){return name;}publicvoidsetName(String name){this.name = name;}publicbooleanisDurable(){return durable;}publicvoidsetDurable(boolean durable){this.durable = durable;}publicbooleanisExclusive(){return exclusive;}publicvoidsetExclusive(boolean exclusive){this.exclusive = exclusive;}publicbooleanisAutoDelete(){return autoDelete;}publicvoidsetAutoDelete(boolean autoDelete){this.autoDelete = autoDelete;}publicStringgetArguments(){ObjectMapper objectMapper =newObjectMapper();try{return objectMapper.writeValueAsString(arguments);}catch(JsonProcessingException e){
e.printStackTrace();}return"{}";}publicvoidsetArguments(String argumentsJson){ObjectMapper objectMapper =newObjectMapper();try{this.arguments = objectMapper.readValue(argumentsJson,newTypeReference<HashMap<String,Object>>(){});}catch(JsonProcessingException e){
e.printStackTrace();}}publicObjectgetArguments(String key){return arguments.get(key);}publicvoidsetArguments(String key,Object value){
arguments.put(key, value);}publicvoidsetArguments(Map<String,Object> arguments){this.arguments = arguments;}}
3. 创建 Binding ★
参数:交换机名称、队列名称、bindingKey
publicclassBinding{privateString exchangeName;privateString queueName;privateString bindingKey;publicStringgetExchangeName(){return exchangeName;}publicvoidsetExchangeName(String exchangeName){this.exchangeName = exchangeName;}publicStringgetQueueName(){return queueName;}publicvoidsetQueueName(String queueName){this.queueName = queueName;}publicStringgetBindingKey(){return bindingKey;}publicvoidsetBindingKey(String bindingKey){this.bindingKey = bindingKey;}}
4. 创建 Message ★
- 消息需要序列化所以要用Serializable。并且offsetBeg和offsetEnd不序列化用transient。
- 参数:属性类、正文数组、开头位置、终点位置、是否有效
- 方法:①创建一个消息类 ②get和set方法 ③toString方法
/*
* 表示一个要传递的消息
* 注意!!! 此处的 Message 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中.
* 此时就需要针对 Message 进行序列化和反序列化.
* 此处使用 标准库 自带的 序列化/反序列化 操作.
*/publicclassMessageimplementsSerializable{// 这两个属性是 Message 最核心的部分.privateBasicProperties basicProperties =newBasicProperties();privatebyte[] body;// 下面的属性则是辅助用的属性.// Message 后续会存储到文件中(如果持久化的话).// 一个文件中会存储很多的消息. 如何找到某个消息, 在文件中的具体位置呢?// 使用下列的两个偏移量来进行表示. [offsetBeg, offsetEnd)// 这俩属性并不需要被序列化保存到文件中~~ 此时消息一旦被写入文件之后, 所在的位置就固定了. 并不需要单独存储.// 这俩属性存在的目的, 主要就是为了让内存中的 Message 对象, 能够快速找到对应的硬盘上的 Message 的位置.privatetransientlong offsetBeg =0;// 消息数据的开头距离文件开头的位置偏移(字节)privatetransientlong offsetEnd =0;// 消息数据的结尾距离文件开头的位置偏移(字节)// 使用这个属性表示该消息在文件中是否是有效消息. (针对文件中的消息, 如果删除, 使用逻辑删除的方式)// 0x1 表示有效. 0x0 表示无效.privatebyte isValid =0x1;// 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程.// 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId// 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.publicstaticMessagecreateMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){Message message =newMessage();if(basicProperties !=null){
message.setBasicProperties(basicProperties);}// 此处生成的 MessageId 以 M- 作为前缀.
message.setMessageId("M-"+UUID.randomUUID());
message.setRoutingKey(routingKey);
message.body = body;// 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.// 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.// 此处只是在内存中创建一个 Message 对象.return message;}publicStringgetMessageId(){return basicProperties.getMessageId();}publicvoidsetMessageId(String messageId){
basicProperties.setMessageId(messageId);}publicStringgetRoutingKey(){return basicProperties.getRoutingKey();}publicvoidsetRoutingKey(String routingKey){
basicProperties.setRoutingKey(routingKey);}publicintgetDeliverMode(){return basicProperties.getDeliverMode();}publicvoidsetDeliverMode(int mode){
basicProperties.setDeliverMode(mode);}publicBasicPropertiesgetBasicProperties(){return basicProperties;}publicvoidsetBasicProperties(BasicProperties basicProperties){this.basicProperties = basicProperties;}publicbyte[]getBody(){return body;}publicvoidsetBody(byte[] body){this.body = body;}publiclonggetOffsetBeg(){return offsetBeg;}publicvoidsetOffsetBeg(long offsetBeg){this.offsetBeg = offsetBeg;}publiclonggetOffsetEnd(){return offsetEnd;}publicvoidsetOffsetEnd(long offsetEnd){this.offsetEnd = offsetEnd;}publicbytegetIsValid(){return isValid;}publicvoidsetIsValid(byte isValid){this.isValid = isValid;}@OverridepublicStringtoString(){return"Message{"+"basicProperties="+ basicProperties +", body="+Arrays.toString(body)+", offsetBeg="+ offsetBeg +", offsetEnd="+ offsetEnd +", isValid="+ isValid +'}';}}
工厂方法:创建消息
// 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程.// 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId// 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.publicstaticMessagecreateMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){Message message =newMessage();if(basicProperties !=null){
message.setBasicProperties(basicProperties);}// 此处生成的 MessageId 以 M- 作为前缀.
message.setMessageId("M-"+UUID.randomUUID());
message.setRoutingKey(routingKey);
message.body = body;// 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.// 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.// 此处只是在内存中创建一个 Message 对象.return message;}
BasicProperties
参数:①消息id ②routingKey ③持久化
publicclassBasicPropertiesimplementsSerializable{// 消息的唯一身份标识. 此处为了保证 id 的唯一性, 使用 UUID 来作为 message idprivateString messageId;// 是一个消息上带有的内容, 和 bindingKey 做匹配.// 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名.// 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用).// 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配. 符合要求的才能转发给对应队列.privateString routingKey;// 这个属性表示消息是否要持久化. 1 表示不持久化, 2 表示持久化. (RabbitMQ 就是这样搞的....)privateint deliverMode =1;// 其实针对 RabbitMQ 来说, BasicProperties 里面还有很多别的属性. 其他的属性暂时先不考虑了.publicStringgetMessageId(){return messageId;}publicvoidsetMessageId(String messageId){this.messageId = messageId;}publicStringgetRoutingKey(){return routingKey;}publicvoidsetRoutingKey(String routingKey){this.routingKey = routingKey;}publicintgetDeliverMode(){return deliverMode;}publicvoidsetDeliverMode(int deliverMode){this.deliverMode = deliverMode;}@OverridepublicStringtoString(){return"BasicProperties{"+"messageId='"+ messageId +'\''+", routingKey='"+ routingKey +'\''+", deliverMode="+ deliverMode +'}';}}
六、数据库设计
对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存.
此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库.
SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件.
1、配置 sqlite★
引⼊ pom.xml 依赖
<dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version></dependency>
配置数据源 application.yml
spring:datasource:url: jdbc:sqlite:./data/meta.db
username:password:driver-class-name: org.sqlite.JDBC
mybatis:
mapper-locations:classpath:mapper/**Mapper.xml
此处我们约定, 把数据库⽂件放到 ./data/meta.db 中.
SQLite 只是把数据单纯的存储到⼀个⽂件中. ⾮常简单⽅便
2、实现创建表 MetaMapper ★
@MapperpublicinterfaceMetaMapper{voidcreateUserTable();voidcreateExchangeTable();voidcreateQueueTable();voidcreateBindingTable();}
本⾝ MyBatis 针对 MySQL / Oracle ⽀持执⾏多个 SQL 语句的, 但是针对 SQLite 是不⽀持的, 只能写成多个⽅法
3、实现mapper方法 ★
<updateid="createExchangeTable">
create table if not exists exchange (
name varchar(50) primary key,
type int,
durable boolean,
autoDelete boolean,
arguments varchar(1024)
);
</update><updateid="createQueueTable">
create table if not exists queue (
name varchar(50) primary key,
durable boolean,
exclusive boolean,
autoDelete boolean,
arguments varchar(1024)
);
</update><updateid="createBindingTable">
create table if not exists binding (
exchangeName varchar(50),
queueName varchar(50),
bindingKey varchar(256)
);
</update>
实现数据库基本操作 ★
给 mapper.MetaMapper 中添加
1voidinsertExchange(Exchange exchange);2voiddeleteExchange(String exchangeName);3voidinsertQueue(MSGQueue msgQueue);4voiddeleteQueue(String queueName);5voidinsertBinding(Binding binding);6voiddeleteBinding(Binding binding);
给 MetaMapper 中添加
<insertid="insertExchange"parameterType="com.example.mq.mqserver.core.Exchange">
insert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});
</insert><selectid="selectAllExchanges"resultType="com.example.mq.mqserver.core.Exchange">
select * from exchange;
</select><deleteid="deleteExchange"parameterType="java.lang.String">
delete from exchange where name = #{exchangeName};
</delete><insertid="insertQueue"parameterType="com.example.mq.mqserver.core.MSGQueue">
insert into queue values(#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});
</insert><selectid="selectAllQueues"resultType="com.example.mq.mqserver.core.MSGQueue">
select * from queue;
</select><deleteid="deleteQueue"parameterType="java.lang.String">
delete from queue where name = #{queueName};
</delete><insertid="insertBinding"parameterType="com.example.mq.mqserver.core.Binding">
insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});
</insert><selectid="selectAllBindings"resultType="com.example.mq.mqserver.core.Binding">
select * from binding;
</select><deleteid="deleteBinding"parameterType="com.example.mq.mqserver.core.Binding">
delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
</delete>
4. 实现 DataBaseManager ★
mqserver.datacenter.DataBaseManager
创建 DataBaseManager 类
属性:①metaMapper
方法:①init(在MqApplication.context里获取mapper的bean;如果数据库目录不存在,则创建数据库目录,创建数据表,插入默认数据)②删除数据库目录(删除db文件 删除目录)③检查数据库目录是否存在 ④创建数据库表方法 ⑤创建一个默认数据插入到数据库种 ⑥针对队列、交换机、绑定创建插入,查询,删除方法。
/*
* 通过这个类, 来整合上述的数据库操作.
*/publicclassDataBaseManager{// 要做的是从 Spring 中拿到现成的对象privateMetaMapper metaMapper;// 针对数据库进行初始化publicvoidinit(){// 手动的获取到 MetaMapper
metaMapper =MqApplication.context.getBean(MetaMapper.class);if(!checkDBExists()){// 数据库不存在, 就进行建建库表操作// 先创建一个 data 目录File dataDir =newFile("./data");
dataDir.mkdirs();// 创建数据表createTable();// 插入默认数据createDefaultData();System.out.println("[DataBaseManager] 数据库初始化完成!");}else{// 数据库已经存在了, 啥都不必做即可System.out.println("[DataBaseManager] 数据库已经存在!");}}publicvoiddeleteDB(){File file =newFile("./data/meta.db");boolean ret = file.delete();if(ret){System.out.println("[DataBaseManager] 删除数据库文件成功!");}else{System.out.println("[DataBaseManager] 删除数据库文件失败!");}File dataDir =newFile("./data");// 使用 delete 删除目录的时候, 需要保证目录是空的.
ret = dataDir.delete();if(ret){System.out.println("[DataBaseManager] 删除数据库目录成功!");}else{System.out.println("[DataBaseManager] 删除数据库目录失败!");}}privatebooleancheckDBExists(){File file =newFile("./data/meta.db");if(file.exists()){returntrue;}returnfalse;}// 这个方法用来建表.// 建库操作并不需要手动执行. (不需要手动创建 meta.db 文件)// 首次执行这里的数据库操作的时候, 就会自动的创建出 meta.db 文件来 (MyBatis 帮我们完成的)privatevoidcreateTable(){
metaMapper.createExchangeTable();
metaMapper.createQueueTable();
metaMapper.createBindingTable();System.out.println("[DataBaseManager] 创建表完成!");}// 给数据库表中, 添加默认的数据.// 此处主要是添加一个默认的交换机.// RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机, 类型是 DIRECT.privatevoidcreateDefaultData(){// 构造一个默认的交换机.Exchange exchange =newExchange();
exchange.setName("");
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
exchange.setAutoDelete(false);
metaMapper.insertExchange(exchange);System.out.println("[DataBaseManager] 创建初始数据完成!");}// 把其他的数据库的操作, 也在这个类中封装一下.publicvoidinsertExchange(Exchange exchange){
metaMapper.insertExchange(exchange);}publicList<Exchange>selectAllExchanges(){return metaMapper.selectAllExchanges();}publicvoiddeleteExchange(String exchangeName){
metaMapper.deleteExchange(exchangeName);}publicvoidinsertQueue(MSGQueue queue){
metaMapper.insertQueue(queue);}publicList<MSGQueue>selectAllQueues(){return metaMapper.selectAllQueues();}publicvoiddeleteQueue(String queueName){
metaMapper.deleteQueue(queueName);}publicvoidinsertBinding(Binding binding){
metaMapper.insertBinding(binding);}publicList<Binding>selectAllBindings(){return metaMapper.selectAllBindings();}publicvoiddeleteBinding(Binding binding){
metaMapper.deleteBinding(binding);}}
5. 测试 DataBaseManager ★
使⽤ Spring ⾃带的单元测试, 针对上述代码进⾏测试验证.
在 test ⽬录中, 创建 DataBaseManagerTests
- 引入DataBaseManeager
- 在每个操作之前setUp:给MqApplication.context赋值;dataBaseManager.init。
- 在每个操作之后teraDown:1.关闭contenxt 2.dataBaseManager.close。
- 准备⼯作
// 加上这个注解之后, 改类就会被识别为单元测试类.@SpringBootTestpublicclassDataBaseManagerTests{privateDataBaseManager dataBaseManager =newDataBaseManager();// 接下来下面这里需要编写多个 方法 . 每个方法都是一个/一组单元测试用例.// 还需要做一个准备工作. 需要写两个方法, 分别用于进行 "准备工作" 和 "收尾工作"// 使用这个方法, 来执行准备工作. 每个用例执行前, 都要调用这个方法.@BeforeEachpublicvoidsetUp(){// 由于在 init 中, 需要通过 context 对象拿到 metaMapper 实例的.// 所以就需要先把 context 对象给搞出来.MqApplication.context =SpringApplication.run(MqApplication.class);
dataBaseManager.init();}// 使用这个方法, 来执行收尾工作. 每个用例执行后, 都要调用这个方法.@AfterEachpublicvoidtearDown(){// 这里要进行的操作, 就是把数据库给清空~~ (把数据库文件, meta.db 直接删了就行了)// 注意, 此处不能直接就删除, 而需要先关闭上述 context 对象!!// 此处的 context 对象, 持有了 MetaMapper 的实例, MetaMapper 实例又打开了 meta.db 数据库文件.// 如果 meta.db 被别人打开了, 此时的删除文件操作是不会成功的 (Windows 系统的限制, Linux 则没这个问题).// 另一方面, 获取 context 操作, 会占用 8080 端口. 此处的 close 也是释放 8080.MqApplication.context.close();
dataBaseManager.deleteDB();}}
在每个方法之前执行(把数据库初始化)
@BeforeEachpublicvoidsetUp(){// 由于在 init 中, 需要通过 context 对象拿到 metaMapper 实例的.// 所以就需要先把 context 对象给搞出来.MqApplication.context =SpringApplication.run(MqApplication.class);
dataBaseManager.init();}
在每个方法之后(删除数据库)
// 使用这个方法, 来执行收尾工作. 每个用例执行后, 都要调用这个方法.@AfterEachpublicvoidtearDown(){// 这里要进行的操作, 就是把数据库给清空~~ (把数据库文件, meta.db 直接删了就行了)// 注意, 此处不能直接就删除, 而需要先关闭上述 context 对象!!// 此处的 context 对象, 持有了 MetaMapper 的实例, MetaMapper 实例又打开了 meta.db 数据库文件.// 如果 meta.db 被别人打开了, 此时的删除文件操作是不会成功的 (Windows 系统的限制, Linux 则没这个问题).// 另一方面, 获取 context 操作, 会占用 8080 端口. 此处的 close 也是释放 8080.MqApplication.context.close();
dataBaseManager.deleteDB();}
测试表的创建(可以正常查询就是正常创建)
@TestpublicvoidtestInitTable(){// 由于 init 方法, 已经在上面 setUp 中调用过了. 直接在测试用例代码中, 检查当前的数据库状态即可.// 直接从数据库中查询. 看数据是否符合预期.// 查交换机表, 里面应该有一个数据(匿名的 exchange); 查队列表, 没有数据; 查绑定表, 没有数据.List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();List<MSGQueue> queueList = dataBaseManager.selectAllQueues();List<Binding> bindingList = dataBaseManager.selectAllBindings();// 直接打印结果, 通过肉眼来检查结果, 固然也可以. 但是不优雅, 不方便.// 更好的办法是使用断言.// System.out.println(exchangeList.size());// assertEquals 判定结果是不是相等.// 注意这俩参数的顺序. 虽然比较相等, 谁在前谁在后, 无所谓.// 但是 assertEquals 的形参, 第一个形参叫做 expected (预期的), 第二个形参叫做 actual (实际的)Assertions.assertEquals(1, exchangeList.size());Assertions.assertEquals("", exchangeList.get(0).getName());Assertions.assertEquals(ExchangeType.DIRECT, exchangeList.get(0).getType());Assertions.assertEquals(0, queueList.size());Assertions.assertEquals(0, bindingList.size());}
测试交换机的插入(创建一个交换机,查询)
privateExchangecreateTestExchange(String exchangeName){Exchange exchange =newExchange();
exchange.setName(exchangeName);
exchange.setType(ExchangeType.FANOUT);
exchange.setAutoDelete(false);
exchange.setDurable(true);
exchange.setArguments("aaa",1);
exchange.setArguments("bbb",2);return exchange;}@TestpublicvoidtestInsertExchange(){// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.Exchange exchange =createTestExchange("testExchange");
dataBaseManager.insertExchange(exchange);// 插入完毕之后, 查询结果List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2, exchangeList.size());Exchange newExchange = exchangeList.get(1);Assertions.assertEquals("testExchange", newExchange.getName());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());Assertions.assertEquals(false, newExchange.isAutoDelete());Assertions.assertEquals(true, newExchange.isDurable());Assertions.assertEquals(1, newExchange.getArguments("aaa"));Assertions.assertEquals(2, newExchange.getArguments("bbb"));}
创建交换机 createTestExchange
privateExchangecreateTestExchange(String exchangeName){Exchange exchange =newExchange();
exchange.setName(exchangeName);
exchange.setType(ExchangeType.FANOUT);
exchange.setAutoDelete(false);
exchange.setDurable(true);
exchange.setArguments("aaa",1);
exchange.setArguments("bbb",2);return exchange;}
测试删除交换机(创建交换机、查询交换机、删除交互机、再查询)
@TestpublicvoidtestDeleteExchange(){// 先构造一个交换机, 插入数据库; 然后再按照名字删除即可!Exchange exchange =createTestExchange("testExchange");
dataBaseManager.insertExchange(exchange);List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2, exchangeList.size());Assertions.assertEquals("testExchange", exchangeList.get(1).getName());// 进行删除操作
dataBaseManager.deleteExchange("testExchange");// 再次查询
exchangeList = dataBaseManager.selectAllExchanges();Assertions.assertEquals(1, exchangeList.size());Assertions.assertEquals("", exchangeList.get(0).getName());}
测试队列的插入和删除
privateMSGQueuecreateTestQueue(String queueName){MSGQueue queue =newMSGQueue();
queue.setName(queueName);
queue.setDurable(true);
queue.setAutoDelete(false);
queue.setExclusive(false);
queue.setArguments("aaa",1);
queue.setArguments("bbb",2);return queue;}@TestpublicvoidtestInsertQueue(){MSGQueue queue =createTestQueue("testQueue");
dataBaseManager.insertQueue(queue);List<MSGQueue> queueList = dataBaseManager.selectAllQueues();Assertions.assertEquals(1, queueList.size());MSGQueue newQueue = queueList.get(0);Assertions.assertEquals("testQueue", newQueue.getName());Assertions.assertEquals(true, newQueue.isDurable());Assertions.assertEquals(false, newQueue.isAutoDelete());Assertions.assertEquals(false, newQueue.isExclusive());Assertions.assertEquals(1, newQueue.getArguments("aaa"));Assertions.assertEquals(2, newQueue.getArguments("bbb"));}@TestpublicvoidtestDeleteQueue(){MSGQueue queue =createTestQueue("testQueue");
dataBaseManager.insertQueue(queue);List<MSGQueue> queueList = dataBaseManager.selectAllQueues();Assertions.assertEquals(1, queueList.size());// 进行删除
dataBaseManager.deleteQueue("testQueue");
queueList = dataBaseManager.selectAllQueues();Assertions.assertEquals(0, queueList.size());}
测试绑定的创建和删除
privateBindingcreateTestBinding(String exchangeName,String queueName){Binding binding =newBinding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey("testBindingKey");return binding;}@TestpublicvoidtestInsertBinding(){Binding binding =createTestBinding("testExchange","testQueue");
dataBaseManager.insertBinding(binding);List<Binding> bindingList = dataBaseManager.selectAllBindings();Assertions.assertEquals(1, bindingList.size());Assertions.assertEquals("testExchange", bindingList.get(0).getExchangeName());Assertions.assertEquals("testQueue", bindingList.get(0).getQueueName());Assertions.assertEquals("testBindingKey", bindingList.get(0).getBindingKey());}@TestpublicvoidtestDeleteBinding(){Binding binding =createTestBinding("testExchange","testQueue");
dataBaseManager.insertBinding(binding);List<Binding> bindingList = dataBaseManager.selectAllBindings();Assertions.assertEquals(1, bindingList.size());// 删除Binding toDeleteBinding =createTestBinding("testExchange","testQueue");
dataBaseManager.deleteBinding(toDeleteBinding);
bindingList = dataBaseManager.selectAllBindings();Assertions.assertEquals(0, bindingList.size());}
七. 消息存储设计
设计思路
1. 为什么要用文件存储
消息需要在硬盘上存储. 但是并不直接放到数据库中, ⽽是直接使⽤⽂件存储.
原因如下:
- 对于消息的操作并不需要复杂的 增删改查 .
- 对于⽂件的操作效率⽐数据库会⾼很多
主流 MQ 的实现(包括 RabbitMQ), 都是把消息存储在⽂件中, ⽽不是数据库中.
文件存储结构
我们给每个队列分配⼀个⽬录. ⽬录的名字为 data + 队列名. 形如 ./data/testQueue
该⽬录中包含两个固定名字的⽂件
• queue_data.txt 消息数据⽂件, ⽤来保存消息内容.
• queue_stat.txt 消息统计⽂件, ⽤来保存消息统计信息.
2. queue_data.txt ⽂件格式 ★
使⽤⼆进制⽅式存储.
每个消息分成两个部分:
• 前四个字节, 表⽰ Message 对象的⻓度(字节数)
• 后⾯若⼲字节, 表⽰ Message 内容
• 消息和消息之间⾸尾相连
每个 Message 基于 Java 标准库的ObjectInputStream/ ObjectOutputStream 序列化.
Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置
3. queue_stat.txt ⽂件格式: ★
使⽤⽂本⽅式存储.
⽂件中只包含⼀⾏, ⾥⾯包含两列(都是整数), 使⽤ \t 分割.
第⼀列表⽰当前总的消息数⽬. 第⼆列表⽰有效消息数⽬.
形如:
2000\t1500
4. 创建 MessageFileManager 类 ★
创建 mqserver.database.MessageFileManager
- stat类 + 读写stat(InputStream-Scanner OutputStreadm-PrintWriter)
- 获取数据目录方法;创建队列数据目录文件方法,并初始化stat文件数据 0 0。以及删除文件目录方法
- sendMessage,把消息写入队列中的方法:①拿到队列目录 ②上锁 ③序列化 ④message初始位置和起始位置 ⑤写入
- 删除message:①上锁 ②找到队列 ③randomaccessfile 读取文件 ④设置无效 ⑤写回文件
/*
* 通过这个类, 来针对硬盘上的消息进行管理
*/publicclassMessageFileManager{// 定义一个内部类, 来表示该队列的统计信息// 有限考虑使用 static, 静态内部类.staticpublicclassStat{// 此处直接定义成 public, 就不再搞 get set 方法了.// 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.publicint totalCount;// 总消息数量publicint validCount;// 有效消息数量}publicvoidinit(){// 暂时不需要做啥额外的初始化工作, 以备后续扩展}// 预定消息文件所在的目录和文件名// 这个方法, 用来获取到指定队列对应的消息文件所在路径privateStringgetQueueDir(String queueName){return"./data/"+ queueName;}// 这个方法用来获取该队列的消息数据文件路径// 注意, 二进制文件, 使用 txt 作为后缀, 不太合适. txt 一般表示文本. 此处咱们也就不改.// .bin / .datprivateStringgetQueueDataPath(String queueName){returngetQueueDir(queueName)+"/queue_data.txt";}// 这个方法用来获取该队列的消息统计文件路径privateStringgetQueueStatPath(String queueName){returngetQueueDir(queueName)+"/queue_stat.txt";}privateStatreadStat(String queueName){// 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容Stat stat =newStat();try(InputStream inputStream =newFileInputStream(getQueueStatPath(queueName))){Scanner scanner =newScanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();return stat;}catch(IOException e){
e.printStackTrace();}returnnull;}privatevoidwriteStat(String queueName,Stat stat){// 使用 PrintWrite 来写文件.// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.try(OutputStream outputStream =newFileOutputStream(getQueueStatPath(queueName))){PrintWriter printWriter =newPrintWriter(outputStream);
printWriter.write(stat.totalCount +"\t"+ stat.validCount);
printWriter.flush();}catch(IOException e){
e.printStackTrace();}}// 创建队列对应的文件和目录publicvoidcreateQueueFiles(String queueName)throwsIOException{// 1. 先创建队列对应的消息目录File baseDir =newFile(getQueueDir(queueName));if(!baseDir.exists()){// 不存在, 就创建这个目录boolean ok = baseDir.mkdirs();if(!ok){thrownewIOException("创建目录失败! baseDir="+ baseDir.getAbsolutePath());}}// 2. 创建队列数据文件File queueDataFile =newFile(getQueueDataPath(queueName));if(!queueDataFile.exists()){boolean ok = queueDataFile.createNewFile();if(!ok){thrownewIOException("创建文件失败! queueDataFile="+ queueDataFile.getAbsolutePath());}}// 3. 创建消息统计文件File queueStatFile =newFile(getQueueStatPath(queueName));if(!queueStatFile.exists()){boolean ok = queueStatFile.createNewFile();if(!ok){thrownewIOException("创建文件失败! queueStatFile="+ queueStatFile.getAbsolutePath());}}// 4. 给消息统计文件, 设定初始值. 0\t0Stat stat =newStat();
stat.totalCount =0;
stat.validCount =0;writeStat(queueName, stat);}// 删除队列的目录和文件.// 队列也是可以被删除的. 当队列删除之后, 对应的消息文件啥的, 自然也要随之删除.publicvoiddestroyQueueFiles(String queueName)throwsIOException{// 先删除里面的文件, 再删除目录.File queueDataFile =newFile(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile =newFile(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir =newFile(getQueueDir(queueName));boolean ok3 = baseDir.delete();if(!ok1 ||!ok2 ||!ok3){// 有任意一个删除失败, 都算整体删除失败.thrownewIOException("删除队列目录和文件失败! baseDir="+ baseDir.getAbsolutePath());}}// 检查队列的目录和文件是否存在.// 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)publicbooleancheckFilesExits(String queueName){// 判定队列的数据文件和统计文件是否都存在!!File queueDataFile =newFile(getQueueDataPath(queueName));if(!queueDataFile.exists()){returnfalse;}File queueStatFile =newFile(getQueueStatPath(queueName));if(!queueStatFile.exists()){returnfalse;}returntrue;}// 这个方法用来把一个新的消息, 放到队列对应的文件中.// queue 表示要把消息写入的队列. message 则是要写的消息.publicvoidsendMessage(MSGQueue queue,Message message)throwsMqException,IOException{// 1. 检查一下当前要写入的队列对应的文件是否存在.if(!checkFilesExits(queue.getName())){thrownewMqException("[MessageFileManager] 队列对应的文件不存在! queueName="+ queue.getName());}// 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组.byte[] messageBinary =BinaryTool.toBytes(message);synchronized(queue){// 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4// offsetEnd 就是当前文件长度 + 4 + message 自身长度.File queueDataFile =newFile(getQueueDataPath(queue.getName()));// 通过这个方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.
message.setOffsetBeg(queueDataFile.length()+4);
message.setOffsetEnd(queueDataFile.length()+4+ messageBinary.length);// 4. 写入消息到数据文件, 注意, 是追加写入到数据文件末尾.try(OutputStream outputStream =newFileOutputStream(queueDataFile,true)){try(DataOutputStream dataOutputStream =newDataOutputStream(outputStream)){// 接下来要先写当前消息的长度, 占据 4 个字节的~~
dataOutputStream.writeInt(messageBinary.length);// 写入消息本体
dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat =readStat(queue.getName());
stat.totalCount +=1;
stat.validCount +=1;writeStat(queue.getName(), stat);}}// 这个是删除消息的方法.// 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置成 0// 1. 先把文件中的这一段数据, 读出来, 还原回 Message 对象;// 2. 把 isValid 改成 0;// 3. 把上述数据重新写回到文件.// 此处这个参数中的 message 对象, 必须得包含有效的 offsetBeg 和 offsetEndpublicvoiddeleteMessage(MSGQueue queue,Message message)throwsIOException,ClassNotFoundException{synchronized(queue){try(RandomAccessFile randomAccessFile =newRandomAccessFile(getQueueDataPath(queue.getName()),"rw")){// 1. 先从文件中读取对应的 Message 数据.byte[] bufferSrc =newbyte[(int)(message.getOffsetEnd()- message.getOffsetBeg())];
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc);// 2. 把当前读出来的二进制数据, 转换回成 Message 对象Message diskMessage =(Message)BinaryTool.fromBytes(bufferSrc);// 3. 把 isValid 设置为无效.
diskMessage.setIsValid((byte)0x0);// 此处不需要给参数的这个 message 的 isValid 设为 0, 因为这个参数代表的是内存中管理的 Message 对象// 而这个对象马上也要被从内存中销毁了.// 4. 重新写入文件byte[] bufferDest =BinaryTool.toBytes(diskMessage);// 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到// 下一个消息的位置了. 因此要想让接下来的写入, 能够刚好写回到之前的位置, 就需要重新调整文件光标.
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(bufferDest);// 通过上述这通折腾, 对于文件来说, 只是有一个字节发生改变而已了~~}// 不要忘了, 更新统计文件!! 把一个消息设为无效了, 此时有效消息个数就需要 - 1Stat stat =readStat(queue.getName());if(stat.validCount >0){
stat.validCount -=1;}writeStat(queue.getName(), stat);}}// 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)// 这个方法, 准备在程序启动的时候, 进行调用.// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.publicLinkedList<Message>loadAllMessageFromQueue(String queueName)throwsIOException,MqException,ClassNotFoundException{LinkedList<Message> messages =newLinkedList<>();try(InputStream inputStream =newFileInputStream(getQueueDataPath(queueName))){try(DataInputStream dataInputStream =newDataInputStream(inputStream)){// 这个变量记录当前文件光标.long currentOffset =0;// 一个文件中包含了很多消息, 此处势必要循环读取.while(true){// 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)// readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.int messageSize = dataInputStream.readInt();// 2. 按照这个长度, 读取消息内容byte[] buffer =newbyte[messageSize];int actualSize = dataInputStream.read(buffer);if(messageSize != actualSize){// 如果不匹配, 说明文件有问题, 格式错乱了!!thrownewMqException("[MessageFileManager] 文件格式错误! queueName="+ queueName);}// 3. 把这个读到的二进制数据, 反序列化回 Message 对象Message message =(Message)BinaryTool.fromBytes(buffer);// 4. 判定一下看看这个消息对象, 是不是无效对象.if(message.getIsValid()!=0x1){// 无效数据, 直接跳过.// 虽然消息是无效数据, 但是 offset 不要忘记更新.
currentOffset +=(4+ messageSize);continue;}// 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd// 进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置// 因此就需要手动计算下文件光标.
message.setOffsetBeg(currentOffset +4);
message.setOffsetEnd(currentOffset +4+ messageSize);
currentOffset +=(4+ messageSize);
messages.add(message);}}catch(EOFException e){// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.// 这个 catch 语句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}}return messages;}// 检查当前是否要针对该队列的消息数据文件进行 GCpublicbooleancheckGC(String queueName){// 判定是否要 GC, 是根据总消息数和有效消息数. 这两个值都是在 消息统计文件 中的.Stat stat =readStat(queueName);if(stat.totalCount >2000&&(double)stat.validCount /(double)stat.totalCount <0.5){returntrue;}returnfalse;}privateStringgetQueueDataNewPath(String queueName){returngetQueueDir(queueName)+"/queue_data_new.txt";}// 通过这个方法, 真正执行消息数据文件的垃圾回收操作.// 使用复制算法来完成.// 创建一个新的文件, 名字就是 queue_data_new.txt// 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.// 删除旧的文件, 再把新的文件改名回 queue_data.txt// 同时要记得更新消息统计文件.publicvoidgc(MSGQueue queue)throwsMqException,IOException,ClassNotFoundException{// 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.synchronized(queue){// 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.long gcBeg =System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile =newFile(getQueueDataNewPath(queue.getName()));if(queueDataNewFile.exists()){// 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.thrownewMqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName="+ queue.getName());}boolean ok = queueDataNewFile.createNewFile();if(!ok){thrownewMqException("[MessageFileManager] 创建文件失败! queueDataNewFile="+ queueDataNewFile.getAbsolutePath());}// 2. 从旧的文件中, 读取出所有的有效消息对象了. (这个逻辑直接调用上述方法即可, 不必重新写了)LinkedList<Message> messages =loadAllMessageFromQueue(queue.getName());// 3. 把有效消息, 写入到新的文件中.try(OutputStream outputStream =newFileOutputStream(queueDataNewFile)){try(DataOutputStream dataOutputStream =newDataOutputStream(outputStream)){for(Message message : messages){byte[] buffer =BinaryTool.toBytes(message);// 先写四个字节消息的长度
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件, 并且把新的文件进行重命名File queueDataOldFile =newFile(getQueueDataPath(queue.getName()));
ok = queueDataOldFile.delete();if(!ok){thrownewMqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile="+ queueDataOldFile.getAbsolutePath());}// 把 queue_data_new.txt => queue_data.txt
ok = queueDataNewFile.renameTo(queueDataOldFile);if(!ok){thrownewMqException("[MessageFileManager] 文件重命名失败! queueDataNewFile="+ queueDataNewFile.getAbsolutePath()+", queueDataOldFile="+ queueDataOldFile.getAbsolutePath());}// 5. 更新统计文件Stat stat =readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();writeStat(queue.getName(), stat);long gcEnd =System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName="+ queue.getName()+", time="+(gcEnd - gcBeg)+"ms");}}}
4.1 创建 common.BinaryTool
ByteArrayOutputStream-ObjectOutputStream
ByteArrayInputStream-ObjectInputStream
// 下列的逻辑, 并不仅仅是 Message, 其他的 Java 中的对象, 也是可以通过这样的逻辑进行序列化和反序列化的.// 如果要想让这个对象能够序列化或者反序列化, 需要让这个类能够实现 Serializable 接口.publicclassBinaryTool{// 把一个对象序列化成一个字节数组publicstaticbyte[]toBytes(Object object)throwsIOException{// 这个流对象相当于一个变长的字节数组.// 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]try(ByteArrayOutputStream byteArrayOutputStream =newByteArrayOutputStream()){try(ObjectOutputStream objectOutputStream =newObjectOutputStream(byteArrayOutputStream)){// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到// ObjectOutputStream 中.// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了
objectOutputStream.writeObject(object);}// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来, 转成 byte[]return byteArrayOutputStream.toByteArray();}}// 把一个字节数组, 反序列化成一个对象publicstaticObjectfromBytes(byte[] data)throwsIOException,ClassNotFoundException{Object object =null;try(ByteArrayInputStream byteArrayInputStream =newByteArrayInputStream(data)){try(ObjectInputStream objectInputStream =newObjectInputStream(byteArrayInputStream)){// 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.
object = objectInputStream.readObject();}}return object;}}
• 使⽤ ByteArrayInputStream / ByteArrayOutputStream 针对 byte[] 进⾏封装, ⽅便后续操作. (这
两个流对象是纯内存的, 不需要进⾏ close).
• 使⽤ ObjectInputStream / ObjectOutputStream 进⾏序列化 / 反序列化操作. 通过内部的
readObject / writeObject 即可完成对应操作.
• 此处涉及到的序列化对象, 需要实现 Serializable 接⼝. 这⼀点咱们的 Message 对象已经实现过了.
对于 serialVersionUID , 此处咱们暂时不需要. ⼤家可以⾃⾏了解 serialVersionUID 的⽤途
实现写⼊消息⽂件【信息写入数据文件】
- 检查消息文件是否存在
- 把Message对象转成二进制数据
- 队列上锁
- 获取数据文件,根据数据文件长度设置beg,end
- 写入文件
- 更新stat
// 这个方法用来把一个新的消息, 放到队列对应的文件中.// queue 表示要把消息写入的队列. message 则是要写的消息.publicvoidsendMessage(MSGQueue queue,Message message)throwsMqException,IOException{// 1. 检查一下当前要写入的队列对应的文件是否存在.if(!checkFilesExits(queue.getName())){thrownewMqException("[MessageFileManager] 队列对应的文件不存在! queueName="+ queue.getName());}// 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组.byte[] messageBinary =BinaryTool.toBytes(message);synchronized(queue){// 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4// offsetEnd 就是当前文件长度 + 4 + message 自身长度.File queueDataFile =newFile(getQueueDataPath(queue.getName()));// 通过这个方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.
message.setOffsetBeg(queueDataFile.length()+4);
message.setOffsetEnd(queueDataFile.length()+4+ messageBinary.length);// 4. 写入消息到数据文件, 注意, 是追加写入到数据文件末尾.try(OutputStream outputStream =newFileOutputStream(queueDataFile,true)){try(DataOutputStream dataOutputStream =newDataOutputStream(outputStream)){// 接下来要先写当前消息的长度, 占据 4 个字节的~~
dataOutputStream.writeInt(messageBinary.length);// 写入消息本体
dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat =readStat(queue.getName());
stat.totalCount +=1;
stat.validCount +=1;writeStat(queue.getName(), stat);}}
- 虑线程安全, 按照队列维度进⾏加锁.
- 使⽤ DataOutputStream 进⾏⼆进制写操作. ⽐原⽣ OutputStream 要⽅便.
- 需要记录 Message 对象在⽂件中的偏移量. 后续的删除操作依赖这个偏移量定位到消息. offsetBeg是原有⽂件⼤⼩的基础上, 再 + 4. 4 个字节是存放消息⼤⼩的空间. (参考上⾯的图).
- 写完消息, 要同时更新统计信息.
创建 common.MqException , 作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常
实践中创建多个异常类, 分别表⽰不同异常种类是更好的做法. 此处我们只是偷懒了
publicclassMqExceptionextendsException{publicMqException(String reason){super(reason);}}
实现删除消息
根据beg 找到消息
此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.
这样删除速度⽐较快. 实际的彻底删除, 则通过我们⾃⼰实现的 GC 来解决
// 这个是删除消息的方法.// 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置成 0// 1. 先把文件中的这一段数据, 读出来, 还原回 Message 对象;// 2. 把 isValid 改成 0;// 3. 把上述数据重新写回到文件.// 此处这个参数中的 message 对象, 必须得包含有效的 offsetBeg 和 offsetEndpublicvoiddeleteMessage(MSGQueue queue,Message message)throwsIOException,ClassNotFoundException{synchronized(queue){try(RandomAccessFile randomAccessFile =newRandomAccessFile(getQueueDataPath(queue.getName()),"rw")){// 1. 先从文件中读取对应的 Message 数据.byte[] bufferSrc =newbyte[(int)(message.getOffsetEnd()- message.getOffsetBeg())];
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc);// 2. 把当前读出来的二进制数据, 转换回成 Message 对象Message diskMessage =(Message)BinaryTool.fromBytes(bufferSrc);// 3. 把 isValid 设置为无效.
diskMessage.setIsValid((byte)0x0);// 此处不需要给参数的这个 message 的 isValid 设为 0, 因为这个参数代表的是内存中管理的 Message 对象// 而这个对象马上也要被从内存中销毁了.// 4. 重新写入文件byte[] bufferDest =BinaryTool.toBytes(diskMessage);// 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到// 下一个消息的位置了. 因此要想让接下来的写入, 能够刚好写回到之前的位置, 就需要重新调整文件光标.
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(bufferDest);// 通过上述这通折腾, 对于文件来说, 只是有一个字节发生改变而已了~~}// 不要忘了, 更新统计文件!! 把一个消息设为无效了, 此时有效消息个数就需要 - 1Stat stat =readStat(queue.getName());if(stat.validCount >0){
stat.validCount -=1;}writeStat(queue.getName(), stat);}}
• 使⽤ RandomAccessFile 来随机访问到⽂件的内容.
• 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在⽂件中的位置. 通过randomAccessFile.seek 操作⽂件指针偏移过去. 再读取.
• 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回⽂件. 注意写的时候要重新设定⽂件指针的位置. ⽂件指针会随着上述的读操作产⽣改变.
• 最后, 要记得更新统计⽂件, 把合法消息 - 1.
实现消息加载
把消息内容从⽂件加载到内存中. 这个功能在服务器重启, 和垃圾回收的时候都很关键
// 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)// 这个方法, 准备在程序启动的时候, 进行调用.// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.publicLinkedList<Message>loadAllMessageFromQueue(String queueName)throwsIOException,MqException,ClassNotFoundException{LinkedList<Message> messages =newLinkedList<>();try(InputStream inputStream =newFileInputStream(getQueueDataPath(queueName))){try(DataInputStream dataInputStream =newDataInputStream(inputStream)){// 这个变量记录当前文件光标.long currentOffset =0;// 一个文件中包含了很多消息, 此处势必要循环读取.while(true){// 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)// readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.int messageSize = dataInputStream.readInt();// 2. 按照这个长度, 读取消息内容byte[] buffer =newbyte[messageSize];int actualSize = dataInputStream.read(buffer);if(messageSize != actualSize){// 如果不匹配, 说明文件有问题, 格式错乱了!!thrownewMqException("[MessageFileManager] 文件格式错误! queueName="+ queueName);}// 3. 把这个读到的二进制数据, 反序列化回 Message 对象Message message =(Message)BinaryTool.fromBytes(buffer);// 4. 判定一下看看这个消息对象, 是不是无效对象.if(message.getIsValid()!=0x1){// 无效数据, 直接跳过.// 虽然消息是无效数据, 但是 offset 不要忘记更新.
currentOffset +=(4+ messageSize);continue;}// 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd// 进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置// 因此就需要手动计算下文件光标.
message.setOffsetBeg(currentOffset +4);
message.setOffsetEnd(currentOffset +4+ messageSize);
currentOffset +=(4+ messageSize);
messages.add(message);}}catch(EOFException e){// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.// 这个 catch 语句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}}return messages;}
• 使⽤ DataInputStream 读取数据. 先读 4 个字节为消息的⻓度, 然后再按照这个⻓度来读取实际消
息内容.
• 读取完毕之后, 转换成 Message 对象.
• 同时计算出该对象的 offsetBeg 和 offsetEnd.
• 最终把结果整理成链表, 返回出去.
• 注意, 对于 DataInputStream 来说, 如果读取到 EOF, 会抛出⼀个 EOFException , ⽽不是返回特定
值. 因此需要注意上述循环的结束条件.
实现垃圾回收(GC)
上述删除操作, 只是把消息在⽂件上标记成了⽆效. 并没有腾出硬盘空间. 最终⽂件⼤⼩可能会越积越
多. 因此需要定期的进⾏批量清除.
此处使⽤类似于复制算法. 当总消息数超过 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC.
GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 代替旧⽂件即可.
// 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)// 这个方法, 准备在程序启动的时候, 进行调用.// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.publicLinkedList<Message>loadAllMessageFromQueue(String queueName)throwsIOException,MqException,ClassNotFoundException{LinkedList<Message> messages =newLinkedList<>();try(InputStream inputStream =newFileInputStream(getQueueDataPath(queueName))){try(DataInputStream dataInputStream =newDataInputStream(inputStream)){// 这个变量记录当前文件光标.long currentOffset =0;// 一个文件中包含了很多消息, 此处势必要循环读取.while(true){// 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)// readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.int messageSize = dataInputStream.readInt();// 2. 按照这个长度, 读取消息内容byte[] buffer =newbyte[messageSize];int actualSize = dataInputStream.read(buffer);if(messageSize != actualSize){// 如果不匹配, 说明文件有问题, 格式错乱了!!thrownewMqException("[MessageFileManager] 文件格式错误! queueName="+ queueName);}// 3. 把这个读到的二进制数据, 反序列化回 Message 对象Message message =(Message)BinaryTool.fromBytes(buffer);// 4. 判定一下看看这个消息对象, 是不是无效对象.if(message.getIsValid()!=0x1){// 无效数据, 直接跳过.// 虽然消息是无效数据, 但是 offset 不要忘记更新.
currentOffset +=(4+ messageSize);continue;}// 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd// 进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置// 因此就需要手动计算下文件光标.
message.setOffsetBeg(currentOffset +4);
message.setOffsetEnd(currentOffset +4+ messageSize);
currentOffset +=(4+ messageSize);
messages.add(message);}}catch(EOFException e){// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.// 这个 catch 语句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}}return messages;}// 检查当前是否要针对该队列的消息数据文件进行 GCpublicbooleancheckGC(String queueName){// 判定是否要 GC, 是根据总消息数和有效消息数. 这两个值都是在 消息统计文件 中的.Stat stat =readStat(queueName);if(stat.totalCount >2000&&(double)stat.validCount /(double)stat.totalCount <0.5){returntrue;}returnfalse;}privateStringgetQueueDataNewPath(String queueName){returngetQueueDir(queueName)+"/queue_data_new.txt";}// 通过这个方法, 真正执行消息数据文件的垃圾回收操作.// 使用复制算法来完成.// 创建一个新的文件, 名字就是 queue_data_new.txt// 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.// 删除旧的文件, 再把新的文件改名回 queue_data.txt// 同时要记得更新消息统计文件.publicvoidgc(MSGQueue queue)throwsMqException,IOException,ClassNotFoundException{// 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.synchronized(queue){// 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.long gcBeg =System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile =newFile(getQueueDataNewPath(queue.getName()));if(queueDataNewFile.exists()){// 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.thrownewMqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName="+ queue.getName());}boolean ok = queueDataNewFile.createNewFile();if(!ok){thrownewMqException("[MessageFileManager] 创建文件失败! queueDataNewFile="+ queueDataNewFile.getAbsolutePath());}// 2. 从旧的文件中, 读取出所有的有效消息对象了. (这个逻辑直接调用上述方法即可, 不必重新写了)LinkedList<Message> messages =loadAllMessageFromQueue(queue.getName());// 3. 把有效消息, 写入到新的文件中.try(OutputStream outputStream =newFileOutputStream(queueDataNewFile)){try(DataOutputStream dataOutputStream =newDataOutputStream(outputStream)){for(Message message : messages){byte[] buffer =BinaryTool.toBytes(message);// 先写四个字节消息的长度
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件, 并且把新的文件进行重命名File queueDataOldFile =newFile(getQueueDataPath(queue.getName()));
ok = queueDataOldFile.delete();if(!ok){thrownewMqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile="+ queueDataOldFile.getAbsolutePath());}// 把 queue_data_new.txt => queue_data.txt
ok = queueDataNewFile.renameTo(queueDataOldFile);if(!ok){thrownewMqException("[MessageFileManager] 文件重命名失败! queueDataNewFile="+ queueDataNewFile.getAbsolutePath()+", queueDataOldFile="+ queueDataOldFile.getAbsolutePath());}// 5. 更新统计文件Stat stat =readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();writeStat(queue.getName(), stat);long gcEnd =System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName="+ queue.getName()+", time="+(gcEnd - gcBeg)+"ms");}}
如果⽂件很⼤, 消息⾮常多, 可能⽐较低效, 这种就需要把⽂件做拆分和合并了.
Rabbitmq 本体是这样实现的. 但是咱们此处为了实现简单, 就不做这个了.
5. 测试 MessageFileManager ★
创建 MessageFileManagerTests 编写测试⽤例代码.
• 创建两个队列, ⽤来辅助测试.
• 使⽤ ReflectionTestUtils.invokeMethod 来调⽤私有⽅法.
@SpringBootTestpublicclassMessageFileManagerTests{privateMessageFileManager messageFileManager =newMessageFileManager();privatestaticfinalString queueName1 ="testQueue1";privatestaticfinalString queueName2 ="testQueue2";// 这个方法是每个用例执行之前的准备工作@BeforeEachpublicvoidsetUp()throwsIOException{// 准备阶段, 创建出两个队列, 以备后用
messageFileManager.createQueueFiles(queueName1);
messageFileManager.createQueueFiles(queueName2);}// 这个方法就是每个用例执行完毕之后的收尾工作@AfterEachpublicvoidtearDown()throwsIOException{// 收尾阶段, 就把刚才的队列给干掉.
messageFileManager.destroyQueueFiles(queueName1);
messageFileManager.destroyQueueFiles(queueName2);}@TestpublicvoidtestCreateFiles(){// 创建队列文件已经在上面 setUp 阶段执行过了. 此处主要是验证看看文件是否存在.File queueDataFile1 =newFile("./data/"+ queueName1 +"/queue_data.txt");Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 =newFile("./data/"+ queueName1 +"/queue_stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 =newFile("./data/"+ queueName2 +"/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 =newFile("./data/"+ queueName2 +"/queue_stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}@TestpublicvoidtestReadWriteStat(){MessageFileManager.Stat stat =newMessageFileManager.Stat();
stat.totalCount =100;
stat.validCount =50;// 此处就需要使用反射的方式, 来调用 writeStat 和 readStat 了.// Java 原生的反射 API 其实非常难用~~// 此处使用 Spring 帮我们封装好的 反射 的工具类.ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat", queueName1, stat);// 写入完毕之后, 再调用一下读取, 验证读取的结果和写入的数据是一致的.MessageFileManager.Stat newStat =ReflectionTestUtils.invokeMethod(messageFileManager,"readStat", queueName1);Assertions.assertEquals(100, newStat.totalCount);Assertions.assertEquals(50, newStat.validCount);System.out.println("测试 readStat 和 writeStat 完成!");}privateMSGQueuecreateTestQueue(String queueName){MSGQueue queue =newMSGQueue();
queue.setName(queueName);
queue.setDurable(true);
queue.setAutoDelete(false);
queue.setExclusive(false);return queue;}privateMessagecreateTestMessage(String content){Message message =Message.createMessageWithId("testRoutingKey",null, content.getBytes());return message;}@TestpublicvoidtestSendMessage()throwsIOException,MqException,ClassNotFoundException{// 构造出消息, 并且构造出队列.Message message =createTestMessage("testMessage");// 此处创建的 queue 对象的 name, 不能随便写, 只能用 queueName1 和 queueName2. 需要保证这个队列对象// 对应的目录和文件啥的都存在才行.MSGQueue queue =createTestQueue(queueName1);// 调用发送消息方法
messageFileManager.sendMessage(queue, message);// 检查 stat 文件.MessageFileManager.Stat stat =ReflectionTestUtils.invokeMethod(messageFileManager,"readStat", queueName1);Assertions.assertEquals(1, stat.totalCount);Assertions.assertEquals(1, stat.validCount);// 检查 data 文件LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(1, messages.size());Message curMessage = messages.get(0);Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());// 比较两个字节数组的内容是否相同, 不能直接使用 assertEquals 了.Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());System.out.println("message: "+ curMessage);}@TestpublicvoidtestLoadAllMessageFromQueue()throwsIOException,MqException,ClassNotFoundException{// 往队列中插入 100 条消息, 然后验证看看这 100 条消息从文件中读取之后, 是否和最初是一致的.MSGQueue queue =createTestQueue(queueName1);List<Message> expectedMessages =newLinkedList<>();for(int i =0; i <100; i++){Message message =createTestMessage("testMessage"+ i);
messageFileManager.sendMessage(queue, message);
expectedMessages.add(message);}// 读取所有消息LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectedMessages.size(), actualMessages.size());for(int i =0; i < expectedMessages.size(); i++){Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("["+ i +"] actualMessage="+ actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@TestpublicvoidtestDeleteMessage()throwsIOException,MqException,ClassNotFoundException{// 创建队列, 写入 10 个消息. 删除其中的几个消息. 再把所有消息读取出来, 判定是否符合预期.MSGQueue queue =createTestQueue(queueName1);List<Message> expectedMessages =newLinkedList<>();for(int i =0; i <10; i++){Message message =createTestMessage("testMessage"+ i);
messageFileManager.sendMessage(queue, message);
expectedMessages.add(message);}// 删除其中的三个消息
messageFileManager.deleteMessage(queue, expectedMessages.get(7));
messageFileManager.deleteMessage(queue, expectedMessages.get(8));
messageFileManager.deleteMessage(queue, expectedMessages.get(9));// 对比这里的内容是否正确.LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for(int i =0; i < actualMessages.size(); i++){Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("["+ i +"] actualMessage="+ actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@TestpublicvoidtestGC()throwsIOException,MqException,ClassNotFoundException{// 先往队列中写 100 个消息. 获取到文件大小.// 再把 100 个消息中的一半, 都给删除掉(比如把下标为偶数的消息都删除)// 再手动调用 gc 方法, 检测得到的新的文件的大小是否比之前缩小了.MSGQueue queue =createTestQueue(queueName1);List<Message> expectedMessages =newLinkedList<>();for(int i =0; i <100; i++){Message message =createTestMessage("testMessage"+ i);
messageFileManager.sendMessage(queue, message);
expectedMessages.add(message);}// 获取 gc 前的文件大小File beforeGCFile =newFile("./data/"+ queueName1 +"/queue_data.txt");long beforeGCLength = beforeGCFile.length();// 删除偶数下标的消息for(int i =0; i <100; i +=2){
messageFileManager.deleteMessage(queue, expectedMessages.get(i));}// 手动调用 gc
messageFileManager.gc(queue);// 重新读取文件, 验证新的文件的内容是不是和之前的内容匹配LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50, actualMessages.size());for(int i =0; i < actualMessages.size(); i++){// 把之前消息偶数下标的删了, 剩下的就是奇数下标的元素了.// actual 中的 0 对应 expected 的 1// actual 中的 1 对应 expected 的 3// actual 中的 2 对应 expected 的 5// actual 中的 i 对应 expected 的 2 * i + 1Message expectedMessage = expectedMessages.get(2* i +1);Message actualMessage = actualMessages.get(i);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}// 获取新的文件的大小File afterGCFile =newFile("./data/"+ queueName1 +"/queue_data.txt");long afterGCLength = afterGCFile.length();System.out.println("before: "+ beforeGCLength);System.out.println("after: "+ afterGCLength);Assertions.assertTrue(beforeGCLength > afterGCLength);}}
⼋. 整合数据库和⽂件
上述代码中, 使⽤数据库存储了 Exchange, Queue, Binding, 使⽤⽂本⽂件存储了 Message.
接下来我们把两个部分整合起来, 统⼀进⾏管理.
1. 创建 DiskDataCenter
使⽤ DiskDataCenter 来综合管理数据库和⽂本⽂件的内容.
DiskDataCenter 会持有 DataBaseManager 和 MessageFileManager 对象
publicclassDiskDataCenter{// 这个实例用来管理数据库中的数据privateDataBaseManager dataBaseManager =newDataBaseManager();// 这个实例用来管理数据文件中的数据privateMessageFileManager messageFileManager =newMessageFileManager();publicvoidinit(){// 针对上述两个实例进行初始化.
dataBaseManager.init();// 当前 messageFileManager.init 是空的方法, 只是先列在这里, 一旦后续需要扩展, 就在这里进行初始化即可.
messageFileManager.init();}
实现 initDir【如何是有虚拟主机逻辑,这样写】
封装 Exchange ⽅法
// 封装交换机操作publicvoidinsertExchange(Exchange exchange){
dataBaseManager.insertExchange(exchange);}publicvoiddeleteExchange(String exchangeName){
dataBaseManager.deleteExchange(exchangeName);}publicList<Exchange>selectAllExchanges(){return dataBaseManager.selectAllExchanges();}
封装 Queue ⽅法
// 封装队列操作publicvoidinsertQueue(MSGQueue queue)throwsIOException{
dataBaseManager.insertQueue(queue);// 创建队列的同时, 不仅仅是把队列对象写到数据库中, 还需要创建出对应的目录和文件
messageFileManager.createQueueFiles(queue.getName());}publicvoiddeleteQueue(String queueName)throwsIOException{
dataBaseManager.deleteQueue(queueName);// 删除队列的同时, 不仅仅是把队列从数据库中删除, 还需要删除对应的目录和文件
messageFileManager.destroyQueueFiles(queueName);}publicList<MSGQueue>selectAllQueues(){return dataBaseManager.selectAllQueues();}
创建/删除队列的时候同时创建/删除队列⽬录.
封装 Binding ⽅法
// 封装绑定操作publicvoidinsertBinding(Binding binding){
dataBaseManager.insertBinding(binding);}publicvoiddeleteBinding(Binding binding){
dataBaseManager.deleteBinding(binding);}publicList<Binding>selectAllBindings(){return dataBaseManager.selectAllBindings();}
封装 Message ⽅法
在 deleteMessage 的时候判定是否进⾏ GC
// 封装消息操作publicvoidsendMessage(MSGQueue queue,Message message)throwsIOException,MqException{
messageFileManager.sendMessage(queue, message);}publicvoiddeleteMessage(MSGQueue queue,Message message)throwsIOException,ClassNotFoundException,MqException{
messageFileManager.deleteMessage(queue, message);if(messageFileManager.checkGC(queue.getName())){
messageFileManager.gc(queue);}}publicLinkedList<Message>loadAllMessageFromQueue(String queueName)throwsIOException,MqException,ClassNotFoundException{return messageFileManager.loadAllMessageFromQueue(queueName);}
⼩结
通过上述封装, 把数据库和硬盘⽂件两部分合并成⼀个整体. 上层代码在调⽤的时候则不再关⼼该数据是存储在哪个部分的.
九. 内存数据结构设计
硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构.
对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发
1. 创建 MemoryDataCenter
1.交换机 2.队列 3.绑定 4.消息 5.待确认消息 6.队列所对应的消息
创建 mqserver.datacenter.MemoryDataCenter
publicclassMemoryDataCenter{// key 是 exchangeName, value 是 Exchange 对象privateConcurrentHashMap<String,Exchange> exchangeMap =newConcurrentHashMap<>();// key 是 queueName, value 是 MSGQueue 对象privateConcurrentHashMap<String,MSGQueue> queueMap =newConcurrentHashMap<>();// 第一个 key 是 exchangeName, 第二个 key 是 queueNameprivateConcurrentHashMap<String,ConcurrentHashMap<String,Binding>> bindingsMap =newConcurrentHashMap<>();// key 是 messageId, value 是 Message 对象privateConcurrentHashMap<String,Message> messageMap =newConcurrentHashMap<>();// key 是 queueName, value 是一个 Message 的链表privateConcurrentHashMap<String,LinkedList<Message>> queueMessageMap =newConcurrentHashMap<>();// 第一个 key 是 queueName, 第二个 key 是 messageIdprivateConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap =newConcurrentHashMap<>();
• 使⽤四个哈希表, 管理 Exchange, Queue, Binding, Message.
• 使⽤⼀个哈希表 + 链表管理 队列 -> 消息 之间的关系.
• 使⽤⼀个哈希表 + 哈希表管理所有的未被确认的消息.
为了保证消息被正确消费了, 会使⽤两种⽅式进⾏确认. ⾃动 ACK 和 ⼿动 ACK.
其中⾃动 ACK 是指当消息被消费之后, 就会⽴即被销毁释放.
其中⼿动 ACK 是指当消息被消费之后, 由消费者主动调⽤⼀个 basicAck ⽅法, 进⾏主动确认. 服务器
收到这个确认之后, 才能真正销毁消息.
此处的 “未确认消息” 就是指在⼿动 ACK 模式下, 该消息还没有被调⽤ basicAck. 此时消息不能删除,
但是要和其他未消费的消息区分开. 于是另搞了个结构.
当后续 basicAck 到了, 就可以删除消息了
2. 封装 Exchange ⽅法
publicvoidinsertExchange(Exchange exchange){
exchangeMap.put(exchange.getName(), exchange);System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName="+ exchange.getName());}publicExchangegetExchange(String exchangeName){return exchangeMap.get(exchangeName);}publicvoiddeleteExchange(String exchangeName){
exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName="+ exchangeName);}
3. 封装 Queue ⽅法
publicvoidinsertQueue(MSGQueue queue){
queueMap.put(queue.getName(), queue);System.out.println("[MemoryDataCenter] 新队列添加成功! queueName="+ queue.getName());}publicMSGQueuegetQueue(String queueName){return queueMap.get(queueName);}publicvoiddeleteQueue(String queueName){
queueMap.remove(queueName);System.out.println("[MemoryDataCenter] 队列删除成功! queueName="+ queueName);}
4. 封装 Binding ⽅法
publicvoidinsertBinding(Binding binding)throwsMqException{// ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());// if (bindingMap == null) {// bindingMap = new ConcurrentHashMap<>();// bindingsMap.put(binding.getExchangeName(), bindingMap);// }// 先使用 exchangeName 查一下, 对应的哈希表是否存在. 不存在就创建一个.ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k ->newConcurrentHashMap<>());synchronized(bindingMap){// 再根据 queueName 查一下. 如果已经存在, 就抛出异常. 不存在才能插入.if(bindingMap.get(binding.getQueueName())!=null){thrownewMqException("[MemoryDataCenter] 绑定已经存在! exchangeName="+ binding.getExchangeName()+", queueName="+ binding.getQueueName());}
bindingMap.put(binding.getQueueName(), binding);}System.out.println("[MemoryDataCenter] 新绑定添加成功! exchangeName="+ binding.getExchangeName()+", queueName="+ binding.getQueueName());}// 获取绑定, 写两个版本:// 1. 根据 exchangeName 和 queueName 确定唯一一个 Binding// 2. 根据 exchangeName 获取到所有的 BindingpublicBindinggetBinding(String exchangeName,String queueName){ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);if(bindingMap ==null){returnnull;}return bindingMap.get(queueName);}publicConcurrentHashMap<String,Binding>getBindings(String exchangeName){return bindingsMap.get(exchangeName);}publicvoiddeleteBinding(Binding binding)throwsMqException{ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if(bindingMap ==null){// 该交换机没有绑定任何队列. 报错.thrownewMqException("[MemoryDataCenter] 绑定不存在! exchangeName="+ binding.getExchangeName()+", queueName="+ binding.getQueueName());}
bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 绑定删除成功! exchangeName="+ binding.getExchangeName()+", queueName="+ binding.getQueueName());}
5. 封装 Message ⽅法
publicvoidaddMessage(Message message){
messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 新消息添加成功! messageId="+ message.getMessageId());}// 根据 id 查询消息publicMessagegetMessage(String messageId){return messageMap.get(messageId);}// 根据 id 删除消息publicvoidremoveMessage(String messageId){
messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息被移除! messageId="+ messageId);}// 发送消息到指定队列publicvoidsendMessage(MSGQueue queue,Message message){// 把消息放到对应的队列数据结构中.// 先根据队列的名字, 找到该队列对应的消息链表.LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k ->newLinkedList<>());// 再把数据加到 messages 里面synchronized(messages){
messages.add(message);}// 在这里把该消息也往消息中心中插入一下. 假设如果 message 已经在消息中心存在, 重复插入也没关系.// 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器代码不会对 Message 内容做修改 basicProperties 和 body)addMessage(message);System.out.println("[MemoryDataCenter] 消息被投递到队列中! messageId="+ message.getMessageId());}// 从队列中取消息publicMessagepollMessage(String queueName){// 根据队列名, 查找一下, 对应的队列的消息链表.LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages ==null){returnnull;}synchronized(messages){// 如果没找到, 说明队列中没有任何消息.if(messages.size()==0){returnnull;}// 链表中有元素, 就进行头删.Message currentMessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId="+ currentMessage.getMessageId());return currentMessage;}}// 获取指定队列中消息的个数publicintgetMessageCount(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages ==null){// 队列中没有消息return0;}synchronized(messages){return messages.size();}}// 添加未确认的消息publicvoidaddMessageWaitAck(String queueName,Message message){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,
k ->newConcurrentHashMap<>());
messageHashMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId="+ message.getMessageId());}// 删除未确认的消息(消息已经确认了)publicvoidremoveMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap ==null){return;}
messageHashMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId="+ messageId);}// 获取指定的未确认的消息publicMessagegetMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap ==null){returnnull;}return messageHashMap.get(messageId);}
6. 针对未确认的消息的处理
// 添加未确认的消息publicvoidaddMessageWaitAck(String queueName,Message message){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,
k ->newConcurrentHashMap<>());
messageHashMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId="+ message.getMessageId());}// 删除未确认的消息(消息已经确认了)publicvoidremoveMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap ==null){return;}
messageHashMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId="+ messageId);}// 获取指定的未确认的消息publicMessagegetMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap ==null){returnnull;}return messageHashMap.get(messageId);}
7. 实现重启后恢复内存
// 这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中.publicvoidrecovery(DiskDataCenter diskDataCenter)throwsIOException,MqException,ClassNotFoundException{// 0. 清空之前的所有数据
exchangeMap.clear();
queueMap.clear();
bindingsMap.clear();
messageMap.clear();
queueMessageMap.clear();// 1. 恢复所有的交换机数据List<Exchange> exchanges = diskDataCenter.selectAllExchanges();for(Exchange exchange : exchanges){
exchangeMap.put(exchange.getName(), exchange);}// 2. 恢复所有的队列数据List<MSGQueue> queues = diskDataCenter.selectAllQueues();for(MSGQueue queue : queues){
queueMap.put(queue.getName(), queue);}// 3. 恢复所有的绑定数据List<Binding> bindings = diskDataCenter.selectAllBindings();for(Binding binding : bindings){ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k ->newConcurrentHashMap<>());
bindingMap.put(binding.getQueueName(), binding);}// 4. 恢复所有的消息数据// 遍历所有的队列, 根据每个队列的名字, 获取到所有的消息.for(MSGQueue queue : queues){LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
queueMessageMap.put(queue.getName(), messages);for(Message message : messages){
messageMap.put(message.getMessageId(), message);}}// 注意!! 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复. 之前考虑硬盘存储的时候, 也没设定这一块.// 一旦在等待 ack 的过程中, 服务器重启了, 此时这些 "未被确认的消息", 就恢复成 "未被取走的消息" .// 这个消息在硬盘上存储的时候, 就是当做 "未被取走"}
8. 测试 MemoryDataCenter
创建 MemoryDataCenterTests
packagecom.example.mq;importcom.example.mq.common.MqException;importcom.example.mq.mqserver.core.*;importcom.example.mq.mqserver.datacenter.DiskDataCenter;importcom.example.mq.mqserver.datacenter.MemoryDataCenter;importorg.apache.tomcat.util.http.fileupload.FileUtils;importorg.junit.jupiter.api.AfterEach;importorg.junit.jupiter.api.Assertions;importorg.junit.jupiter.api.BeforeEach;importorg.junit.jupiter.api.Test;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.test.context.SpringBootTest;importjava.io.File;importjava.io.IOException;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.ConcurrentHashMap;@SpringBootTestpublicclassMemoryDataCenterTests{privateMemoryDataCenter memoryDataCenter =null;@BeforeEachpublicvoidsetUp(){
memoryDataCenter =newMemoryDataCenter();}@AfterEachpublicvoidtearDown(){
memoryDataCenter =null;}// 创建一个测试交换机privateExchangecreateTestExchange(String exchangeName){Exchange exchange =newExchange();
exchange.setName(exchangeName);
exchange.setType(ExchangeType.DIRECT);
exchange.setAutoDelete(false);
exchange.setDurable(true);return exchange;}// 创建一个测试队列privateMSGQueuecreateTestQueue(String queueName){MSGQueue queue =newMSGQueue();
queue.setName(queueName);
queue.setDurable(true);
queue.setExclusive(false);
queue.setAutoDelete(false);return queue;}// 针对交换机进行测试@TestpublicvoidtestExchange(){// 1. 先构造一个交换机并插入.Exchange expectedExchange =createTestExchange("testExchange");
memoryDataCenter.insertExchange(expectedExchange);// 2. 查询出这个交换机, 比较结果是否一致. 此处直接比较这俩引用指向同一个对象.Exchange actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertEquals(expectedExchange, actualExchange);// 3. 删除这个交换机
memoryDataCenter.deleteExchange("testExchange");// 4. 再查一次, 看是否就查不到了
actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertNull(actualExchange);}// 针对队列进行测试@TestpublicvoidtestQueue(){// 1. 构造一个队列, 并插入MSGQueue expectedQueue =createTestQueue("testQueue");
memoryDataCenter.insertQueue(expectedQueue);// 2. 查询这个队列, 并比较MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals(expectedQueue, actualQueue);// 3. 删除这个队列
memoryDataCenter.deleteQueue("testQueue");// 4. 再次查询队列, 看是否能查到
actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertNull(actualQueue);}// 针对绑定进行测试@TestpublicvoidtestBinding()throwsMqException{Binding expectedBinding =newBinding();
expectedBinding.setExchangeName("testExchange");
expectedBinding.setQueueName("testQueue");
expectedBinding.setBindingKey("testBindingKey");
memoryDataCenter.insertBinding(expectedBinding);Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertEquals(expectedBinding, actualBinding);ConcurrentHashMap<String,Binding> bindingMap = memoryDataCenter.getBindings("testExchange");Assertions.assertEquals(1, bindingMap.size());Assertions.assertEquals(expectedBinding, bindingMap.get("testQueue"));
memoryDataCenter.deleteBinding(expectedBinding);
actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertNull(actualBinding);}privateMessagecreateTestMessage(String content){Message message =Message.createMessageWithId("testRoutingKey",null, content.getBytes());return message;}@TestpublicvoidtestMessage(){Message expectedMessage =createTestMessage("testMessage");
memoryDataCenter.addMessage(expectedMessage);Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage, actualMessage);
memoryDataCenter.removeMessage(expectedMessage.getMessageId());
actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}@TestpublicvoidtestSendMessage(){// 1. 创建一个队列, 创建 10 条消息, 把这些消息都插入队列中.MSGQueue queue =createTestQueue("testQueue");List<Message> expectedMessages =newArrayList<>();for(int i =0; i <10; i++){Message message =createTestMessage("testMessage"+ i);
memoryDataCenter.sendMessage(queue, message);
expectedMessages.add(message);}// 2. 从队列中取出这些消息.List<Message> actualMessages =newArrayList<>();while(true){Message message = memoryDataCenter.pollMessage("testQueue");if(message ==null){break;}
actualMessages.add(message);}// 3. 比较取出的消息和之前的消息是否一致.Assertions.assertEquals(expectedMessages.size(), actualMessages.size());for(int i =0; i < expectedMessages.size(); i++){Assertions.assertEquals(expectedMessages.get(i), actualMessages.get(i));}}@TestpublicvoidtestMessageWaitAck(){Message expectedMessage =createTestMessage("expectedMessage");
memoryDataCenter.addMessageWaitAck("testQueue", expectedMessage);Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueue", expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage, actualMessage);
memoryDataCenter.removeMessageWaitAck("testQueue", expectedMessage.getMessageId());
actualMessage = memoryDataCenter.getMessageWaitAck("testQueue", expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}@TestpublicvoidtestRecovery()throwsIOException,MqException,ClassNotFoundException{// 由于后续需要进行数据库操作, 依赖 MyBatis. 就需要先启动 SpringApplication, 这样才能进行后续的数据库操作.MqApplication.context =SpringApplication.run(MqApplication.class);// 1. 在硬盘上构造好数据DiskDataCenter diskDataCenter =newDiskDataCenter();
diskDataCenter.init();// 构造交换机Exchange expectedExchange =createTestExchange("testExchange");
diskDataCenter.insertExchange(expectedExchange);// 构造队列MSGQueue expectedQueue =createTestQueue("testQueue");
diskDataCenter.insertQueue(expectedQueue);// 构造绑定Binding expectedBinding =newBinding();
expectedBinding.setExchangeName("testExchange");
expectedBinding.setQueueName("testQueue");
expectedBinding.setBindingKey("testBindingKey");
diskDataCenter.insertBinding(expectedBinding);// 构造消息Message expectedMessage =createTestMessage("testContent");
diskDataCenter.sendMessage(expectedQueue, expectedMessage);// 2. 执行恢复操作
memoryDataCenter.recovery(diskDataCenter);// 3. 对比结果Exchange actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());Assertions.assertEquals(expectedExchange.isAutoDelete(), actualExchange.isAutoDelete());MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());Assertions.assertEquals(expectedQueue.isAutoDelete(), actualQueue.isAutoDelete());Assertions.assertEquals(expectedQueue.isExclusive(), actualQueue.isExclusive());Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());Assertions.assertEquals(expectedBinding.getQueueName(), actualBinding.getQueueName());Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());Message actualMessage = memoryDataCenter.pollMessage("testQueue");Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());// 4. 清理硬盘的数据, 把整个 data 目录里的内容都删掉(包含了 meta.db 和 队列的目录).MqApplication.context.close();File dataDir =newFile("./data");FileUtils.deleteDirectory(dataDir);}}
⼗、创建 VirtualHost
创建 mqserver.VirtualHost
/*
* 通过这个类, 来表示 虚拟主机.
* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.
* 同时提供 api 供上层调用.
* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了.
*/publicclassVirtualHost{privateString virtualHostName;privateMemoryDataCenter memoryDataCenter =newMemoryDataCenter();privateDiskDataCenter diskDataCenter =newDiskDataCenter();privateRouter router =newRouter();privateConsumerManager consumerManager =newConsumerManager(this);
其中 Router ⽤来定义转发规则, ConsumerManager ⽤来实现消息消费. 这两个内容后续再介绍
1.实现构造⽅法和 getter
构造方法:自动初始化
publicStringgetVirtualHostName(){return virtualHostName;}publicMemoryDataCentergetMemoryDataCenter(){return memoryDataCenter;}publicDiskDataCentergetDiskDataCenter(){return diskDataCenter;}publicVirtualHost(String name){this.virtualHostName = name;// 对于 MemoryDataCenter 来说, 不需要额外的初始化操作的. 只要对象 new 出来就行了// 但是, 针对 DiskDataCenter 来说, 则需要进行初始化操作. 建库建表和初始数据的设定.
diskDataCenter.init();// 另外还需要针对硬盘的数据, 进行恢复到内存中.try{
memoryDataCenter.recovery(diskDataCenter);}catch(IOException|MqException|ClassNotFoundException e){
e.printStackTrace();System.out.println("[VirtualHost] 恢复内存数据失败!");}}
2. 创建交换机
• 此处的 autoDelete, arguments 其实并没有使⽤. 只是先预留出来. (RabbitMQ 是⽀持的) .
• 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀. 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了.
• exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回. 因此不叫做 “exchangeCreate”.
• 先写硬盘, 后写内存. 因为写硬盘失败概率更⼤. 如果硬盘写失败了, 也就不必写内存了.
// 创建交换机// 如果交换机不存在, 就创建. 如果存在, 直接返回.// 返回值是 boolean. 创建成功, 返回 true. 失败返回 falsepublicbooleanexchangeDeclare(String exchangeName,ExchangeType exchangeType,boolean durable,boolean autoDelete,Map<String,Object> arguments){// 把交换机的名字, 加上虚拟主机作为前缀.
exchangeName = virtualHostName + exchangeName;try{synchronized(exchangeLocker){// 1. 判定该交换机是否已经存在. 直接通过内存查询.Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if(existsExchange !=null){// 该交换机已经存在!System.out.println("[VirtualHost] 交换机已经存在! exchangeName="+ exchangeName);returntrue;}// 2. 真正创建交换机. 先构造 Exchange 对象Exchange exchange =newExchange();
exchange.setName(exchangeName);
exchange.setType(exchangeType);
exchange.setDurable(durable);
exchange.setAutoDelete(autoDelete);
exchange.setArguments(arguments);// 3. 把交换机对象写入硬盘if(durable){
diskDataCenter.insertExchange(exchange);}// 4. 把交换机对象写入内存
memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建完成! exchangeName="+ exchangeName);// 上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了.// 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了.}returntrue;}catch(Exception e){System.out.println("[VirtualHost] 交换机创建失败! exchangeName="+ exchangeName);
e.printStackTrace();returnfalse;}}
3. 删除交换机
// 删除交换机publicbooleanexchangeDelete(String exchangeName){
exchangeName = virtualHostName + exchangeName;try{synchronized(exchangeLocker){// 1. 先找到对应的交换机.Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if(toDelete ==null){thrownewMqException("[VirtualHost] 交换机不存在无法删除!");}// 2. 删除硬盘上的数据if(toDelete.isDurable()){
diskDataCenter.deleteExchange(exchangeName);}// 3. 删除内存中的交换机数据
memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功! exchangeName="+ exchangeName);}returntrue;}catch(Exception e){System.out.println("[VirtualHost] 交换机删除失败! exchangeName="+ exchangeName);
e.printStackTrace();returnfalse;}}
4. 创建队列
// 创建队列publicbooleanqueueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments){// 把队列的名字, 给拼接上虚拟主机的名字.
queueName = virtualHostName + queueName;try{synchronized(queueLocker){// 1. 判定队列是否存在MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue !=null){System.out.println("[VirtualHost] 队列已经存在! queueName="+ queueName);returntrue;}// 2. 创建队列对象MSGQueue queue =newMSGQueue();
queue.setName(queueName);
queue.setDurable(durable);
queue.setExclusive(exclusive);
queue.setAutoDelete(autoDelete);
queue.setArguments(arguments);// 3. 写硬盘if(durable){
diskDataCenter.insertQueue(queue);}// 4. 写内存
memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功! queueName="+ queueName);}returntrue;}catch(Exception e){System.out.println("[VirtualHost] 队列创建失败! queueName="+ queueName);
e.printStackTrace();returnfalse;}}
5. 删除队列
// 删除队列publicbooleanqueueDelete(String queueName){
queueName = virtualHostName + queueName;try{synchronized(queueLocker){// 1. 根据队列名字, 查询下当前的队列对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue ==null){thrownewMqException("[VirtualHost] 队列不存在! 无法删除! queueName="+ queueName);}// 2. 删除硬盘数据if(queue.isDurable()){
diskDataCenter.deleteQueue(queueName);}// 3. 删除内存数据
memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost] 删除队列成功! queueName="+ queueName);}returntrue;}catch(Exception e){System.out.println("[VirtualHost] 删除队列失败! queueName="+ queueName);
e.printStackTrace();returnfalse;}}
6. 创建绑定
publicbooleanqueueBind(String queueName,String exchangeName,String bindingKey){
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;try{synchronized(exchangeLocker){synchronized(queueLocker){// 1. 判定当前的绑定是否已经存在了.Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);if(existsBinding !=null){thrownewMqException("[VirtualHost] binding 已经存在! queueName="+ queueName
+", exchangeName="+ exchangeName);}// 2. 验证 bindingKey 是否合法.if(!router.checkBindingKey(bindingKey)){thrownewMqException("[VirtualHost] bindingKey 非法! bindingKey="+ bindingKey);}// 3. 创建 Binding 对象Binding binding =newBinding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);// 4. 获取一下对应的交换机和队列. 如果交换机或者队列不存在, 这样的绑定也是无法创建的.MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue ==null){thrownewMqException("[VirtualHost] 队列不存在! queueName="+ queueName);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange ==null){thrownewMqException("[VirtualHost] 交换机不存在! exchangeName="+ exchangeName);}// 5. 先写硬盘if(queue.isDurable()&& exchange.isDurable()){
diskDataCenter.insertBinding(binding);}// 6. 写入内存
memoryDataCenter.insertBinding(binding);}}System.out.println("[VirtualHost] 绑定创建成功! exchangeName="+ exchangeName
+", queueName="+ queueName);returntrue;}catch(Exception e){System.out.println("[VirtualHost] 绑定创建失败! exchangeName="+ exchangeName
+", queueName="+ queueName);
e.printStackTrace();returnfalse;}}
7. 删除绑定
publicbooleanqueueUnbind(String queueName,String exchangeName){
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;try{synchronized(exchangeLocker){synchronized(queueLocker){// 1. 获取 binding 看是否已经存在~Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if(binding ==null){thrownewMqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName="+ exchangeName +", queueName="+ queueName);}// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.
diskDataCenter.deleteBinding(binding);// 3. 删除内存的数据
memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 删除绑定成功!");}}returntrue;}catch(Exception e){System.out.println("[VirtualHost] 删除绑定失败!");
e.printStackTrace();returnfalse;}}
8. 发布消息 ★
• 发布消息其实是把消息发送给指定的 Exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中.
• 发送消息需要指定 routingKey, 这个值的作⽤和 ExchangeType 是相关的.
◦ Direct: routingKey 就是对应队列的名字. 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息.
◦ Fanout: routingKey 不起作⽤, bindingKey 也不起作⽤. 此时消息会转发给绑定到该交换机上的所有队列中.
◦ Topic: routingKey 是⼀个特定的字符串, 会和 bindingKey 进⾏匹配. 如果匹配成功, 则发到对应的队列中. 具体规则后续介绍.
• BasicProperties 是消息的元信息. body 是消息本体.
// 发送消息到指定的交换机/队列中.publicbooleanbasicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body){try{// 1. 转换交换机的名字
exchangeName = virtualHostName + exchangeName;// 2. 检查 routingKey 是否合法.if(!router.checkRoutingKey(routingKey)){thrownewMqException("[VirtualHost] routingKey 非法! routingKey="+ routingKey);}// 3. 查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange ==null){thrownewMqException("[VirtualHost] 交换机不存在! exchangeName="+ exchangeName);}// 4. 判定交换机的类型if(exchange.getType()==ExchangeType.DIRECT){// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.String queueName = virtualHostName + routingKey;// 5. 构造消息对象Message message =Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名对应的对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue ==null){thrownewMqException("[VirtualHost] 队列不存在! queueName="+ queueName);}// 7. 队列存在, 直接给队列中写入消息sendMessage(queue, message);}else{// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String,Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for(Map.Entry<String,Binding> entry : bindingsMap.entrySet()){// 1) 获取到绑定对象, 判定对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if(queue ==null){// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName="+ binding.getQueueName());continue;}// 2) 构造消息对象Message message =Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列.// 如果是 fanout, 所有绑定的队列都要转发的.// 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.if(!router.route(exchange.getType(), binding, message)){continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}returntrue;}catch(Exception e){System.out.println("[VirtualHost] 消息发送失败!");
e.printStackTrace();returnfalse;}}privatevoidsendMessage(MSGQueue queue,Message message)throwsIOException,MqException,InterruptedException{// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上.int deliverMode = message.getDeliverMode();// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.if(deliverMode ==2){
diskDataCenter.sendMessage(queue, message);}// 写入内存
memoryDataCenter.sendMessage(queue, message);// 此处还需要补充一个逻辑, 通知消费者可以消费消息了.
consumerManager.notifyConsume(queue.getName());}
9. 路由规则
1) 实现 route ⽅法
publicbooleanroute(ExchangeType exchangeType,Binding binding,Message message)throwsMqException{// 根据不同的 exchangeType 使用不同的判定转发规则.if(exchangeType ==ExchangeType.FANOUT){// 如果是 FANOUT 类型, 则该交换机上绑定的所有队列都需要转发returntrue;}elseif(exchangeType ==ExchangeType.TOPIC){// 如果是 TOPIC 主题交换机, 规则就要更复杂一些.returnrouteTopic(binding, message);}else{// 其他情况是不应该存在的.thrownewMqException("[Router] 交换机类型非法! exchangeType="+ exchangeType);}}
2) 实现 checkRoutingKeyValid
publicbooleancheckBindingKey(String bindingKey){if(bindingKey.length()==0){// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.returntrue;}// 检查字符串中不能存在非法字符for(int i =0; i < bindingKey.length(); i++){char ch = bindingKey.charAt(i);if(ch >='A'&& ch <='Z'){continue;}if(ch >='a'&& ch <='z'){continue;}if(ch >='0'&& ch <='9'){continue;}if(ch =='_'|| ch =='.'|| ch =='*'|| ch =='#'){continue;}returnfalse;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for(String word : words){// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if(word.length()>1&&(word.contains("*")|| word.contains("#"))){returnfalse;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb => 非法// 2. aaa.#.*.bbb => 非法// 3. aaa.*.#.bbb => 非法// 4. aaa.*.*.bbb => 合法for(int i =0; i < words.length -1; i++){// 连续两个 ##if(words[i].equals("#")&& words[i +1].equals("#")){returnfalse;}// # 连着 *if(words[i].equals("#")&& words[i +1].equals("*")){returnfalse;}// * 连着 #if(words[i].equals("*")&& words[i +1].equals("#")){returnfalse;}}returntrue;}
3) 实现 checkBindingKeyValid
// bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.publicbooleancheckBindingKey(String bindingKey){if(bindingKey.length()==0){// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.returntrue;}// 检查字符串中不能存在非法字符for(int i =0; i < bindingKey.length(); i++){char ch = bindingKey.charAt(i);if(ch >='A'&& ch <='Z'){continue;}if(ch >='a'&& ch <='z'){continue;}if(ch >='0'&& ch <='9'){continue;}if(ch =='_'|| ch =='.'|| ch =='*'|| ch =='#'){continue;}returnfalse;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for(String word : words){// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if(word.length()>1&&(word.contains("*")|| word.contains("#"))){returnfalse;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb => 非法// 2. aaa.#.*.bbb => 非法// 3. aaa.*.#.bbb => 非法// 4. aaa.*.*.bbb => 合法for(int i =0; i < words.length -1; i++){// 连续两个 ##if(words[i].equals("#")&& words[i +1].equals("#")){returnfalse;}// # 连着 *if(words[i].equals("#")&& words[i +1].equals("*")){returnfalse;}// * 连着 #if(words[i].equals("*")&& words[i +1].equals("#")){returnfalse;}}returntrue;}
4) 实现 routeTopic
// [测试用例]// binding key routing key result// aaa aaa true// aaa.bbb aaa.bbb true// aaa.bbb aaa.bbb.ccc false// aaa.bbb aaa.ccc false// aaa.bbb.ccc aaa.bbb.ccc true// aaa.* aaa.bbb true// aaa.*.bbb aaa.bbb.ccc false// *.aaa.bbb aaa.bbb false// # aaa.bbb.ccc true// aaa.# aaa.bbb true// aaa.# aaa.bbb.ccc true// aaa.#.ccc aaa.ccc true// aaa.#.ccc aaa.bbb.ccc true// aaa.#.ccc aaa.aaa.bbb.ccc true// #.ccc ccc true// #.ccc aaa.bbb.ccc trueprivatebooleanrouteTopic(Binding binding,Message message){// 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex =0;int routingIndex =0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 forwhile(bindingIndex < bindingTokens.length && routingIndex < routingTokens.length){if(bindingTokens[bindingIndex].equals("*")){// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!
bindingIndex++;
routingIndex++;continue;}elseif(bindingTokens[bindingIndex].equals("#")){// 如果遇到 #, 需要先看看有没有下一个位置.
bindingIndex++;if(bindingIndex == bindingTokens.length){// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!returntrue;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1
routingIndex =findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if(routingIndex ==-1){// 没找到匹配的结果. 匹配失败returnfalse;}// 找到的匹配的情况, 继续往后匹配.
bindingIndex++;
routingIndex++;}else{// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){returnfalse;}
bindingIndex++;
routingIndex++;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的.if(bindingIndex == bindingTokens.length && routingIndex == routingTokens.length){returntrue;}returnfalse;}privateintfindNextMatch(String[] routingTokens,int routingIndex,String bindingToken){for(int i = routingIndex; i < routingTokens.length; i++){if(routingTokens[i].equals(bindingToken)){return i;}}return-1;}
5) 匹配规则测试⽤例
// [测试用例]
// binding key routing key result
// aaa aaa true
// aaa.bbb aaa.bbb true
// aaa.bbb aaa.bbb.ccc false
// aaa.bbb aaa.ccc false
// aaa.bbb.ccc aaa.bbb.ccc true
// aaa.* aaa.bbb true
// aaa.*.bbb aaa.bbb.ccc false
// *.aaa.bbb aaa.bbb false
// # aaa.bbb.ccc true
// aaa.# aaa.bbb true
// aaa.# aaa.bbb.ccc true
// aaa.#.ccc aaa.ccc true
// aaa.#.ccc aaa.bbb.ccc true
// aaa.#.ccc aaa.aaa.bbb.ccc true
// #.ccc ccc true
// #.ccc aaa.bbb.ccc true
6) 测试 Router
packagecom.example.mq;importcom.example.mq.common.MqException;importcom.example.mq.mqserver.core.Binding;importcom.example.mq.mqserver.core.ExchangeType;importcom.example.mq.mqserver.core.Message;importcom.example.mq.mqserver.core.Router;importorg.junit.jupiter.api.AfterEach;importorg.junit.jupiter.api.Assertions;importorg.junit.jupiter.api.BeforeEach;importorg.junit.jupiter.api.Test;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublicclassRouterTests{privateRouter router =newRouter();privateBinding binding =null;privateMessage message =null;@BeforeEachpublicvoidsetUp(){
binding =newBinding();
message =newMessage();}@AfterEachpublicvoidtearDown(){
binding =null;
message =null;}// [测试用例]// binding key routing key result// aaa aaa true// aaa.bbb aaa.bbb true// aaa.bbb aaa.bbb.ccc false// aaa.bbb aaa.ccc false// aaa.bbb.ccc aaa.bbb.ccc true// aaa.* aaa.bbb true// aaa.*.bbb aaa.bbb.ccc false// *.aaa.bbb aaa.bbb false// # aaa.bbb.ccc true// aaa.# aaa.bbb true// aaa.# aaa.bbb.ccc true// aaa.#.ccc aaa.ccc true// aaa.#.ccc aaa.bbb.ccc true// aaa.#.ccc aaa.aaa.bbb.ccc true// #.ccc ccc true// #.ccc aaa.bbb.ccc true@Testpublicvoidtest1()throwsMqException{
binding.setBindingKey("aaa");
message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest2()throwsMqException{
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest3()throwsMqException{
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest4()throwsMqException{
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest5()throwsMqException{
binding.setBindingKey("aaa.bbb.ccc");
message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest6()throwsMqException{
binding.setBindingKey("aaa.*");
message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest7()throwsMqException{
binding.setBindingKey("aaa.*.bbb");
message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest8()throwsMqException{
binding.setBindingKey("*.aaa.bbb");
message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest9()throwsMqException{
binding.setBindingKey("#");
message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest10()throwsMqException{
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest11()throwsMqException{
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest12()throwsMqException{
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest13()throwsMqException{
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest14()throwsMqException{
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest15()throwsMqException{
binding.setBindingKey("#.ccc");
message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublicvoidtest16()throwsMqException{
binding.setBindingKey("#.ccc");
message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}}
10. 订阅消息
属性:
1.管理的是哪个虚拟机 2.执行具体任务的线程池 3.存放哪些队列需要消费的阻塞队列 4.扫描线程
方法:
- 初始化:给虚拟机赋值;扫描现场读取阻塞队列,拿到需要消费的队列,针对这个队列进行消费consumeMessage;设置后台线程,启动。
- 通知消费方法:就是把阻塞队列中加入需要消费的队列
- 新增一个消费者:①内存中新增消费者 ②消费消息
- 消费消息方法:①从消费队列中找到一个消费者 ②取出队列中的一个消息 ③消费消息:1.把消息放入待确认队列 2.执行消息者的消费方法 3.如果是自动应答 则把消息在内存和硬盘删除
/*
* 通过这个类, 来实现消费消息的核心逻辑.
*/publicclassConsumerManager{// 持有上层的 VirtualHost 对象的引用. 用来操作数据.privateVirtualHost parent;// 指定一个线程池, 负责去执行具体的回调任务.privateExecutorService workerPool =Executors.newFixedThreadPool(4);// 存放令牌的队列privateBlockingQueue<String> tokenQueue =newLinkedBlockingQueue<>();// 扫描线程privateThread scannerThread =null;publicConsumerManager(VirtualHost p){
parent = p;
scannerThread =newThread(()->{while(true){try{// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue ==null){thrownewMqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName="+ queueName);}// 3. 从这个队列中消费一个消息.synchronized(queue){consumeMessage(queue);}}catch(InterruptedException|MqException e){
e.printStackTrace();}}});// 把线程设为后台线程.
scannerThread.setDaemon(true);
scannerThread.start();}// 这个方法的调用时机就是发送消息的时候.publicvoidnotifyConsume(String queueName)throwsInterruptedException{
tokenQueue.put(queueName);}publicvoidaddConsumer(String consumerTag,String queueName,boolean autoAck,Consumer consumer)throwsMqException{// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue ==null){thrownewMqException("[ConsumerManager] 队列不存在! queueName="+ queueName);}ConsumerEnv consumerEnv =newConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized(queue){
queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for(int i =0; i < n; i++){// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}privatevoidconsumeMessage(MSGQueue queue){// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if(luckyDog ==null){// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message ==null){// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
workerPool.submit(()->{try{// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作
luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),
message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.// 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if(luckyDog.isAutoAck()){// 1) 删除硬盘上的消息if(message.getDeliverMode()==2){
parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName="+ queue.getName());}}catch(Exception e){
e.printStackTrace();}});}}
1) 添加⼀个订阅者
// 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.publicbooleanbasicConsume(String consumerTag,String queueName,boolean autoAck,Consumer consumer){// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.
queueName = virtualHostName + queueName;try{
consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName="+ queueName);returntrue;}catch(Exception e){System.out.println("[VirtualHost] basicConsume 失败! queueName="+ queueName);
e.printStackTrace();returnfalse;}}
Consumer 相当于⼀个回调函数. 放到 common.Consumer 中
@FunctionalInterfacepublicinterfaceConsumer{// Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.// 通过这个方法把消息推送给对应的消费者.// (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)voidhandleDelivery(String consumerTag,BasicProperties basicProperties,byte[] body)throwsMqException,IOException;}
2) 创建订阅者管理管理类
• parent ⽤来记录虚拟主机.
• 使⽤⼀个阻塞队列⽤来触发消息消费. 称为令牌队列. 每次有消息过来了, 都往队列中放⼀个令牌(也就是队列名), 然后消费者再去消费对应队列的消息.
• 使⽤⼀个线程池⽤来执⾏消息回调.
这样令牌队列的设定避免搞出来太多线程. 否则就需要给每个队列都安排⼀个单独的线程了, 如果队列很多则开销就⽐较⼤了
3) 添加令牌接⼝
// 这个方法的调用时机就是发送消息的时候.publicvoidnotifyConsume(String queueName)throwsInterruptedException{
tokenQueue.put(queueName);}
4) 实现添加订阅者
• 新来订阅者的时候, 需要先消费掉之前积压的消息.
• consumeMessage 真正的消息消费操作, ⼀会再实现.
publicvoidaddConsumer(String consumerTag,String queueName,boolean autoAck,Consumer consumer)throwsMqException{// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue ==null){thrownewMqException("[ConsumerManager] 队列不存在! queueName="+ queueName);}ConsumerEnv consumerEnv =newConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized(queue){
queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for(int i =0; i < n; i++){// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}
创建 ConsumerEnv , 这个类表⽰⼀个订阅者的执⾏环境
publicclassConsumerEnv{privateString consumerTag;privateString queueName;privateboolean autoAck;// 通过这个回调来处理收到的消息.privateConsumer consumer;publicConsumerEnv(String consumerTag,String queueName,boolean autoAck,Consumer consumer){this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
给 MsgQueue 添加⼀个订阅者列表
此处的 chooseConsumer 是实现⼀个轮询效果. 如果⼀个队列有多个订阅者, 将会按照轮询的⽅式轮
流拿到消息
5) 实现扫描线程
在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消息操作
publicConsumerManager(VirtualHost p){
parent = p;
scannerThread =newThread(()->{while(true){try{// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue ==null){thrownewMqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName="+ queueName);}// 3. 从这个队列中消费一个消息.synchronized(queue){consumeMessage(queue);}}catch(InterruptedException|MqException e){
e.printStackTrace();}}});// 把线程设为后台线程.
scannerThread.setDaemon(true);
scannerThread.start();}
6) 实现消费消息
privatevoidconsumeMessage(MSGQueue queue){// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if(luckyDog ==null){// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message ==null){// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
workerPool.submit(()->{try{// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作
luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),
message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.// 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if(luckyDog.isAutoAck()){// 1) 删除硬盘上的消息if(message.getDeliverMode()==2){
parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName="+ queue.getName());}}catch(Exception e){
e.printStackTrace();}});}
注意: ⼀个队列可能有 N 个消费者, 此处应该按照轮询的⽅式挑⼀个消费者进⾏消费.
⼩结
⼀. 消费消息的两种典型情况
- 订阅者已经存在了, 才发送消息。这种直接获取队列的订阅者, 从中按照轮询的⽅式挑⼀个消费者来调⽤回调即可.
- 消息先发送到队列了, 订阅者还没到. 此时当订阅者到达, 就快速把指定队列中的消息全都消费掉. ⼆. 关于消息不丢失的论证 每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中. • 如果 autoAck 为 true 消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作. 分别清除硬盘数据, 待确认队列, 消息中⼼. • 如果 autoAck 为 false 在回调内部, 进⾏清除⼯作. 分别清除硬盘数据, 待确认队列, 消息中⼼.
- 执⾏消息回调的时候抛出异常 此时消息仍然处在待确认队列中. 此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列. 死信队列咱们此处暂不实现.
- 执⾏消息回调的时候服务器宕机 内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费到.
11. 消息确认
publicbooleanbasicAck(String queueName,String messageId){
queueName = virtualHostName + queueName;try{// 1. 获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if(message ==null){thrownewMqException("[VirtualHost] 要确认的消息不存在! messageId="+ messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue ==null){thrownewMqException("[VirtualHost] 要确认的队列不存在! queueName="+ queueName);}// 2. 删除硬盘上的数据if(message.getDeliverMode()==2){
diskDataCenter.deleteMessage(queue, message);}// 3. 删除消息中心中的数据
memoryDataCenter.removeMessage(messageId);// 4. 删除待确认的集合中的数据
memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName="+ queueName
+", messageId="+ messageId);returntrue;}catch(Exception e){System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName="+ queueName
+", messageId="+ messageId);
e.printStackTrace();returnfalse;}}
11. 测试 VirtualHost
@SpringBootTestpublicclassVirtualHostTests{privateVirtualHost virtualHost =null;@BeforeEachpublicvoidsetUp(){MqApplication.context =SpringApplication.run(MqApplication.class);
virtualHost =newVirtualHost("default");}@AfterEachpublicvoidtearDown()throwsIOException{MqApplication.context.close();
virtualHost =null;// 把硬盘的目录删除掉File dataDir =newFile("./data");FileUtils.deleteDirectory(dataDir);}@TestpublicvoidtestExchangeDeclare(){boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);}@TestpublicvoidtestExchangeDelete(){boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);
ok = virtualHost.exchangeDelete("testExchange");Assertions.assertTrue(ok);}@TestpublicvoidtestQueueDeclare(){boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);}@TestpublicvoidtestQueueDelete(){boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.queueDelete("testQueue");Assertions.assertTrue(ok);}@TestpublicvoidtestQueueBind(){boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue","testExchange","testBindingKey");Assertions.assertTrue(ok);}@TestpublicvoidtestQueueUnbind(){boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue","testExchange","testBindingKey");Assertions.assertTrue(ok);
ok = virtualHost.queueUnbind("testQueue","testExchange");Assertions.assertTrue(ok);}@TestpublicvoidtestBasicPublish(){boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());Assertions.assertTrue(ok);}// 先订阅队列, 后发送消息@TestpublicvoidtestBasicConsume1()throwsInterruptedException{boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);// 先订阅队列
ok = virtualHost.basicConsume("testConsumerTag","testQueue",true,newConsumer(){@OverridepublicvoidhandleDelivery(String consumerTag,BasicProperties basicProperties,byte[] body){try{// 消费者自身设定的回调方法.System.out.println("messageId="+ basicProperties.getMessageId());System.out.println("body="+newString(body,0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);}catch(Error e){// 断言如果失败, 抛出的是 Error, 而不是 Exception!
e.printStackTrace();System.out.println("error");}}});Assertions.assertTrue(ok);Thread.sleep(500);// 再发送消息
ok = virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());Assertions.assertTrue(ok);}// 先发送消息, 后订阅队列.@TestpublicvoidtestBasicConsume2()throwsInterruptedException{boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);// 先发送消息
ok = virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列
ok = virtualHost.basicConsume("testConsumerTag","testQueue",true,newConsumer(){@OverridepublicvoidhandleDelivery(String consumerTag,BasicProperties basicProperties,byte[] body){// 消费者自身设定的回调方法.System.out.println("messageId="+ basicProperties.getMessageId());System.out.println("body="+newString(body,0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@TestpublicvoidtestBasicConsumeFanout()throwsInterruptedException{boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.FANOUT,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue1",false,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue1","testExchange","");Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue2",false,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue2","testExchange","");Assertions.assertTrue(ok);// 往交换机中发布一个消息
ok = virtualHost.basicPublish("testExchange","",null,"hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.
ok = virtualHost.basicConsume("testConsumer1","testQueue1",true,newConsumer(){@OverridepublicvoidhandleDelivery(String consumerTag,BasicProperties basicProperties,byte[] body){System.out.println("consumerTag="+ consumerTag);System.out.println("messageId="+ basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);
ok = virtualHost.basicConsume("testConsumer2","testQueue2",true,newConsumer(){@OverridepublicvoidhandleDelivery(String consumerTag,BasicProperties basicProperties,byte[] body){System.out.println("consumerTag="+ consumerTag);System.out.println("messageId="+ basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@TestpublicvoidtestBasicConsumeTopic()throwsInterruptedException{boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.TOPIC,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",false,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue","testExchange","aaa.*.bbb");Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange","aaa.ccc.bbb",null,"hello".getBytes());Assertions.assertTrue(ok);
ok = virtualHost.basicConsume("testConsumer","testQueue",true,newConsumer(){@OverridepublicvoidhandleDelivery(String consumerTag,BasicProperties basicProperties,byte[] body){System.out.println("consumerTag="+ consumerTag);System.out.println("messageId="+ basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@TestpublicvoidtestBasicAck()throwsInterruptedException{boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);// 先发送消息
ok = virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]
ok = virtualHost.basicConsume("testConsumerTag","testQueue",false,newConsumer(){@OverridepublicvoidhandleDelivery(String consumerTag,BasicProperties basicProperties,byte[] body){// 消费者自身设定的回调方法.System.out.println("messageId="+ basicProperties.getMessageId());System.out.println("body="+newString(body,0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}}
版权归原作者 _蓝天IT_ 所有, 如有侵权,请联系我们删除。