文章目录
虚拟主机设计
虚拟主机分析
类似于 MySQL 的 database,把交换机,队列,绑定,消息…进⾏逻辑上的隔离,⼀个服务器可以有多
个虚拟主机~,此处我们项⽬就设计了⼀个虚拟主机(VirtualHost)来提供 API 供上层调用
咱们这里采取的方案是,在客户提供的交换机等的身份标识(交换机名字),前加上虚拟机的名字. 即 客户要在虚拟机
VirtualHostA中创建交换机 exchangeC,咱们服务器存储的交换机名字是 VirtualHostAexchangeC.
交换机和虚拟主机之间的从属关系
- ⽅案⼀:参考数据库设计,“⼀对多”⽅案,⽐如给交换机表,添加个属性,虚拟主机 id/name
- ⽅案⼆:交换机的名字 = 虚拟主机名字 + 交换机的真实名字(按照⽅案⼆,也可以去区分不同的队列,进⼀步由于,绑定和队列和交换机都相关,直接就隔离开了,再进⼀步,消息和队列是强相关的,队列名区分开,消息⾃然区分开。)
核心 API
发布消息
发布消息API:
- 其实就是生产者将消息发送给对应的交换机,交换机再根据不同的转发规则,转发给与之相绑定且符合规则的消息队列.
- 绑定关系 Binding 中有一个 bindingKey 属性
- 消息 Message 中 有一个 routingKey 属性
下面就来讲解一下三种交换机的转发规则已经这两个 Key 的不同含义.
- 直接交换机 DIRECT 转发规则- 在直接交换机中,bindingKye是无意义的,routingKey是要转发到的队列的队列名.- 直接交换机的转发规则, 是无视 bindingKey的,即 直接交换机是否与这个队列绑定都没有关系,而直接将消息转发到 routingKey指定的队列名的队列中.
- 扇出交换机 FANOUT 转发规则- 在扇出交换机中,bindingKye是绑定的要转发的队列,routingKey是无意义的.- 扇出交换机的转发规则,是将收到的消息转发到与之绑定的所有队列中.与bindingKye和routingKey是没有任何关系的.
- 主题交换机 TOPIC 转发规则- 在主题交换机中,- bindingKey是创建绑定时,给绑定指定的特殊字符串(相当于一把锁),- routingKey是转发消息时,给消息指定的特殊字符串(相当于一把钥匙).- 主题交换机的转发规则,是将收到的消息的routingKey与绑定的所有队列中的 bindingKey 进行匹配,当且仅当匹配成功时,才将消息转发给该队列.
匹配规则 - AMQP 协议
- routingKey规则 由数字,字母,下划线组成,使用 . 将routingKey分成多个部分.
- bindingKey规则 由数字,字母,下划线组成,使用 . 将routingKey分成多个部分(支持两种特殊的符号作为通配符 * 与 # (和#必须是作为被 . 分割出来的单独部分如 aaa.bb就是非法的,* 可以匹配任何一个独立的部分,# 可以匹配0个或多个的独立部分)
相关代码实现
importcom.example.demo.common.MqException;// 使用这个类来实现交换机的转发规则// 同时通过这个类来验证 bindingKey 是否合法publicclassRouter{publicbooleancheckBindingKey(String bindingKey){if(bindingKey.length()==0){returntrue;}// 检查字符串中不能存在非法字符for(int i =0; i < bindingKey.length(); i++){char ch = bindingKey.charAt(i);if(ch >='A'&& ch <='Z'){continue;}if(ch >='a'&& ch <='z'){continue;}if(ch >='0'&& ch <='9'){continue;}if(ch =='_'|| ch =='.'|| ch =='*'|| ch =='#'){continue;}returnfalse;}// 检查 * 或者 # 是否是独立的部分.String[] words = bindingKey.split("\\.");for(String word : words){// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if(word.length()>1&&(word.contains("*")|| word.contains("#"))){returnfalse;}}// 约定一下, 通配符之间的相邻关系(人为约定)// 只有 aaa.*.*.bbb => 合法for(int i =0; i < words.length -1; i++){// 连续两个 ##if(words[i].equals("#")&& words[i +1].equals("#")){returnfalse;}// # 连着 *if(words[i].equals("#")&& words[i +1].equals("*")){returnfalse;}// * 连着 #if(words[i].equals("*")&& words[i +1].equals("#")){returnfalse;}}returntrue;}// 数字 + 字母 + 下划线// 使用.分割若干部分publicbooleancheckRoutingKey(String routingKey){if(routingKey.length()==0){returntrue;}for(int i =0; i < routingKey.length(); i++){char ch = routingKey.charAt(i);// 判断if(ch >='A'&& ch <='Z'){continue;}if(ch >='a'&& ch <='z'){continue;}if(ch >='0'&& ch <='9'){continue;}if(ch =='_'|| ch =='.'){continue;}// 该字符, 不是上述任何一种合法情况, 就直接返回 falsereturnfalse;}returntrue;}// 判定该消息是否可以转发给这个绑定对应的队列publicbooleanroute(ExchangeType exchangeType,Binding binding,Message message)throwsMqException{if(exchangeType ==ExchangeType.FANOUT){returntrue;}elseif(exchangeType ==ExchangeType.TOPIC){returnrouteTopic(binding,message);}else{thrownewMqException("[Router] 交换机类型非法! exchange= "+ exchangeType);}}// 约定匹配规则privatebooleanrouteTopic(Binding binding,Message message){String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标int bindingIndex =0;int routingIndex =0;while(bindingIndex < bindingTokens.length && routingIndex < routingTokens.length){if(bindingTokens[bindingIndex].equals("*")){// * 可以匹配到任意部分
bindingIndex++;
routingIndex++;continue;}elseif(bindingTokens[bindingIndex].equals("#")){// 如果遇到 #, 需要先看看有没有下一个位置.
bindingIndex++;if(bindingIndex == bindingTokens.length){// # 匹配成功returntrue;}// # 拿着后面的内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 查找 返回该下标. 没找到, 就返回 -1
routingIndex =findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if(routingIndex ==-1){// 没找到匹配的结果. 匹配失败returnfalse;}// 找到的匹配的情况, 继续往后匹配.
bindingIndex++;
routingIndex++;}else{// 普通字符串, 要求两边的内容一致.if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){returnfalse;}
bindingIndex++;
routingIndex++;}}// 判定是否是双方同时到达末尾if(bindingIndex == bindingTokens.length && routingIndex == routingTokens.length){returntrue;}returnfalse;}// # 查找privateintfindNextMatch(String[] routingTokens,int routingIndex,String bindingToken){for(int i = routingIndex; i < routingTokens.length; i++){if(routingTokens[i].equals(bindingToken)){return i;}}return-1;}}
订阅消息
- 新来的消息要转发给哪个消费者呢? 咱们在这里采取轮询策略,即让消费者排队,依次将消息发送给消费者,当消费者收到消息后,则移动到队伍的最后等待下个消息. 因此咱们要给核心类 Message类再增加几个属性和方法,来管理消费者
- 自动发送消息至订阅者 那么消费者要如何拿到消息呢?即如何将消息发送给消费者,咱们这里采取的是自动发送,即队列中来了新消息,就自动将新消息发送给订阅了这个队列的消费者.
咱们实现的方法是,使用一个阻塞队列,当生产者发布消息到交换机时,交换机转发消息到对应的队列后,就把队列名当作令牌添加到这个阻塞队列中,再配置一个扫描线程,去时刻扫描这个阻塞队列中是否有新的令牌了,有了新令牌,则根据令牌去对应的队列中,去把新消息安装轮询策略转发给消费者.
应答消息
应答消息共有两种模式.
自动应答:将消息发送给消费者就算应答了(不关心消费者收没收到,相当于没应答)
手动应答:需要消费者手动调用应答方法(确保消费者收到消息了)
消费者管理类
关于消费者,咱们并不打算持久化存储消费者的信息,即只在内存中存储消费者信息,如果服务器重启后,那么内存中的消费者信息也会清空,此时消费者就需要重新订阅消息.
版权归原作者 我不是大叔丶 所有, 如有侵权,请联系我们删除。