🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2022年6月4日
🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
消息路由
在RocketMQ的系统架构里,由于服务器端(
Broker
)会根据实时压力实施弹性扩缩容等发生变动,客户端为了做负载均衡,就需要有注册中心来提供
Broker
的信息:
注册中心的作用是及时发现Broker服务器的变化,并将存活的
Broker
信息返回给客户端做负载均衡。
获取Topic
获取路由信息函数
// DefaultMQProducerImpl#tryToFindTopicPublishInfoTopicPublishInfo topicPublishInfo =this.tryToFindTopicPublishInfo(msg.getTopic());
发送消息前,必须先从注册中心里获取
Broker
服务器信息,包括
Topic
、队列、IP,然后采取负载均衡算法发送消息。
常见的负载均衡算法:
- 轮询法:将请求按照顺序轮流地分配到各个服务器上。
- 加权轮询法:在轮询算法的基础上添加了权重的条件
- 随机法
- 加权随机法
- 最小连接法:哪个服务器的连接数少,就分配给哪个服务器新的请求
- 哈希法:计算哈希值,映射到服务器上
tryToFindTopicPublishInfo
/**
* 根据topic获取路由信息
*
* @param topic
* @return
*/privateTopicPublishInfotryToFindTopicPublishInfo(finalString topic){// 1 先从本地 topicPublishInfoTable 中获取路由信息TopicPublishInfo topicPublishInfo =this.topicPublishInfoTable.get(topic);// 2 路由信息或 messageQueueList 为空if(null== topicPublishInfo ||!topicPublishInfo.ok()){// 2.1 添加空路由对象this.topicPublishInfoTable.putIfAbsent(topic,newTopicPublishInfo());// 2.2 更新路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 2.3 从更新后的路由表中获取路由信息
topicPublishInfo =this.topicPublishInfoTable.get(topic);}// 2.4 获取到了就返回if(topicPublishInfo.isHaveTopicRouterInfo()||(topicPublishInfo !=null&& topicPublishInfo.ok())){return topicPublishInfo;}else{// 3 没有获取到路由信息则从注册中心获取this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,true,this.defaultMQProducer);
topicPublishInfo =this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}
从上面的源码可以看出获取路由信息的步骤如下:
- 先从本地
topicPublishInfoTable
中获取路由信息 - 如果路由信息或
messageQueueList
为空,则尝试本地更新一下路由信息 - 本地更新
PublishInfo
路由信息,并尝试获取 - 如果此时能获取到路由信息了,则返回
TopicPublishInfo
对象 - 本地无法获取到路由信息,则从注册中心尝试获取并更新本地缓存
Topic 路由信息表
上述过程的第一步就是获取路由信息
TopicPublishInfo topicPublishInfo =this.topicPublishInfoTable.get(topic);
其中路由信息存储在
TopicPublishInfo
对象里:
各个字段含义如下:
orderTopic
:Topic
是否支持排序haveTopicRouterInfo
:是否存在路由信息messageQueueList
:消息队列ListsendWhichQueue
:生产者发送消息到哪个队列的索引topicRouteData
:路由数据,包括队列、Broker地址、Broker数据
此外,
TopicPublishInfo
类还提供了选择某个队列发送消息的默认负载均衡策略:
/**
* 默认【轮询】策略选择一个MessageQueue
*
* @param lastBrokerName
* @return
*/publicMessageQueueselectOneMessageQueue(finalString lastBrokerName){if(lastBrokerName ==null){returnselectOneMessageQueue();}else{int index =this.sendWhichQueue.getAndIncrement();for(int i =0; i <this.messageQueueList.size(); i++){int pos =Math.abs(index++)%this.messageQueueList.size();if(pos <0)
pos =0;MessageQueue mq =this.messageQueueList.get(pos);if(!mq.getBrokerName().equals(lastBrokerName)){return mq;}}returnselectOneMessageQueue();}}/**
* 选择一个消息队列
*
* @return
*/publicMessageQueueselectOneMessageQueue(){int index =this.sendWhichQueue.getAndIncrement();int pos =Math.abs(index)%this.messageQueueList.size();if(pos <0)
pos =0;returnthis.messageQueueList.get(pos);}
从上面代码可以看出,默认的选择策略是采用轮询的方法:
lastBrokerName == null
时,说明在此之前还没有进行过选择,直接返回第一个可用的消息队列lastBrokerName != null
时,且当前轮询到的消息队列不是上一次使用的,则返回当前队列,否则轮询至下一个
更新路由信息
两个子方法
根据
tryToFindTopicPublishInfo
的源码,接下来会进行更新路由信息的步骤,访问的主要是
MQClientInstance
类下的
updateTopicRouteInfoFromNameServer
方法,该方法又调用了两个关键的方法,分别是
topicRouteData2TopicPublishInfo
和
topicRouteData2TopicSubscribeInfo
**1.
topicRouteData2TopicPublishInfo
方法的作用是将
TopicRouteData
类转换成
TopicPublishInfo
,并过滤掉Master挂了的Slave的
MessageQueue
**
publicstaticTopicPublishInfotopicRouteData2TopicPublishInfo(finalString topic,finalTopicRouteData route){TopicPublishInfo info =newTopicPublishInfo();
info.setTopicRouteData(route);// 如果指定了Topic的Queue的发送顺序if(route.getOrderTopicConf()!=null&& route.getOrderTopicConf().length()>0){// 解析配置文件,创建消息队列String[] brokers = route.getOrderTopicConf().split(";");for(String broker : brokers){String[] item = broker.split(":");int nums =Integer.parseInt(item[1]);for(int i =0; i < nums; i++){MessageQueue mq =newMessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);}}// 设置Topic是有序的(消息的发送顺序按配置来)
info.setOrderTopic(true);}else{List<QueueData> qds = route.getQueueDatas();Collections.sort(qds);// 找到每个QueueData的BrokerDatafor(QueueData qd : qds){if(PermName.isWriteable(qd.getPerm())){BrokerData brokerData =null;for(BrokerData bd : route.getBrokerDatas()){if(bd.getBrokerName().equals(qd.getBrokerName())){
brokerData = bd;break;}}if(null== brokerData){continue;}// 如果BrokerData中没有Master节点id,可能Master挂了,此时不处理消息if(!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)){continue;}// 创建消息队列for(int i =0; i < qd.getWriteQueueNums(); i++){MessageQueue mq =newMessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);}}}// 设置Topic消息发送是无序的
info.setOrderTopic(false);}return info;}
topicRouteData2TopicSubscribeInfo
方法作用是提取TopicRouteData
内的QueueData
字段,生成消息队列,也就是订阅了该Topic的队列
publicstaticSet<MessageQueue>topicRouteData2TopicSubscribeInfo(finalString topic,finalTopicRouteData route){Set<MessageQueue> mqList =newHashSet<MessageQueue>();List<QueueData> qds = route.getQueueDatas();for(QueueData qd : qds){// QueueData是否可读,只有是可读的才能被订阅if(PermName.isReadable(qd.getPerm())){for(int i =0; i < qd.getReadQueueNums(); i++){MessageQueue mq =newMessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);}}}return mqList;}
介绍完了
updateTopicRouteInfoFromNameServer
方法里调用的两个子方法之后,下面就来看一下
updateTopicRouteInfoFromNameServer
的代码。
updateTopicRouteInfoFromNameServer
更新路由信息是消息投递过程中非常重要的一环,为了防止并发修改注册信息导致数据不一致,这里使用了
ReentrantLock
可重入锁。
对于路由消息,就需要注意它可能不存在这种情况
1. 路由消息不存在
第一次访问时,生产者还没有在Broker中创建Topic和消息队列时会发生,此时的解决方案是:如果满足
isDefault && defaultMQProducer != null
,则使用默认Topic来获取路由消息
TopicRouteData
由上面两张图可以清楚看到,默认Topic名称为
TBW102
但如果默认主题获取到的
TopicRouteData
实例为空呢?此时就要根据Topic名称从注册中心查询了,如果还查询不出来的话就会返回
false
2. 路由消息不存在,但是从注册中心获取到了
此时就需要判断本地的路由表和注册中心获取到的路由信息是否有差异,如果差异存在话就把本地路由信息更新为最新版本
上面所有文字部分对应的源码如下:
publicbooleanupdateTopicRouteInfoFromNameServer(finalString topic,boolean isDefault,DefaultMQProducer defaultMQProducer){try{if(this.lockNamesrv.tryLock(LockTimeoutMillis,TimeUnit.MILLISECONDS)){try{TopicRouteData topicRouteData;if(isDefault && defaultMQProducer !=null){// 使用默认的TopicKey尝试获取TopicRouteData// 当Broker开启自动创建Topic时,会自动进行创建
topicRouteData =this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000*3);if(topicRouteData !=null){for(QueueData data : topicRouteData.getQueueDatas()){int queueNums =Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);}}}else{
topicRouteData =this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,1000*3);}if(topicRouteData !=null){// 判断本地路由表存放的信息和远端注册中心存放的信息TopicRouteData old =this.topicRouteTable.get(topic);boolean changed =topicRouteDataIsChange(old, topicRouteData);if(!changed){
changed =this.isNeedUpdateTopicRouteInfo(topic);}else{
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if(changed){// 克隆的原因是topicRouteData要被设置到下面的publishInfo和subscribeInfo里TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();// 更新Broker相关信息,当某个Broker心跳超时后,会被从BrokerData的BrokerAddrs中移除// brokerAddrTable也存有Slave的BrokerAddrfor(BrokerData bd : topicRouteData.getBrokerDatas()){this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{TopicPublishInfo publishInfo =topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);Iterator<Map.Entry<String,MQProducerInner>> it =this.producerTable.entrySet().iterator();while(it.hasNext()){Map.Entry<String,MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if(impl !=null){
impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{Set<MessageQueue> subscribeInfo =topicRouteData2TopicSubscribeInfo(topic, topicRouteData);Iterator<Map.Entry<String,MQConsumerInner>> it =this.consumerTable.entrySet().iterator();while(it.hasNext()){Map.Entry<String,MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if(impl !=null){
impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}
log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);returntrue;}}else{
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}}catch(Exception e){if(!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)&&!topic.equals(MixAll.DEFAULT_TOPIC)){
log.warn("updateTopicRouteInfoFromNameServer Exception", e);}}finally{this.lockNamesrv.unlock();}}else{
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms",LockTimeoutMillis);}}catch(InterruptedException e){
log.warn("updateTopicRouteInfoFromNameServer Exception", e);}returnfalse;}
版权归原作者 小王曾是少年 所有, 如有侵权,请联系我们删除。