0


RocketMQ 容错策略 解析——图解、源码级解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南

📆 最近更新:2022年8月12日

🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

容错方案

容错通常也是主链路规划里很重要的一环,消息队列RocketMQ作为集团中间件成员里的主要分子,每年也会参与到双十一中,经受各种考验。

容错方案:

  • 通常是使用自动的降级熔断策略,当性能达到阈值时就会自动开启。此外还会设置一个手动的降级开关,来人工开启降级流程
  • 还可以采用分段限流,根据系统承压能力、集群规模等设置不同的限流方案,同时还能根据应用水位来动态调整限流策略,例如接口响应时间是100ms时,可以设置每秒访问10万次,接口响应时间是200ms时,可以设置每秒访问5万次

容错策略类继承关系图

在这里插入图片描述

  • LatencyFaultTolerance:延迟故障容错接口
  • LatencyFaultToleranceImpl:延迟故障容错实现类,具体容错功能的实现
  • MQFaultStrategy:RocketMQ提供的容错策略

源码分析

MQFaultStrategy

**

MQFaultStrategy

主要维护的属性:**

  1. 每个Broker发送消息的延迟
  2. 发送消息延迟容错开关
  3. 不可用时长与延迟级别的映射关系
publicclassMQFaultStrategy{privatefinalstaticInternalLogger log =ClientLogger.getLog();/**
    * 维护每个Broker发送消息的延迟
    * key:brokerName
    */privatefinalLatencyFaultTolerance<String> latencyFaultTolerance =newLatencyFaultToleranceImpl();/**
    * 发送消息延迟容错开关
    */privateboolean sendLatencyFaultEnable =false;/**
    * 延迟级别数组
    */privatelong[] latencyMax ={50L,100L,550L,1000L,2000L,3000L,15000L};/**
    * 不可用时长
    */privatelong[] notAvailableDuration ={0L,0L,30000L,60000L,120000L,180000L,600000L};publiclong[]getNotAvailableDuration(){return notAvailableDuration;}publicvoidsetNotAvailableDuration(finallong[] notAvailableDuration){this.notAvailableDuration = notAvailableDuration;}publiclong[]getLatencyMax(){return latencyMax;}publicvoidsetLatencyMax(finallong[] latencyMax){this.latencyMax = latencyMax;}publicbooleanisSendLatencyFaultEnable(){return sendLatencyFaultEnable;}publicvoidsetSendLatencyFaultEnable(finalboolean sendLatencyFaultEnable){this.sendLatencyFaultEnable = sendLatencyFaultEnable;}/**
    * 根据`TopicPublishInfo`,选择一个消息队列
    */publicMessageQueueselectOneMessageQueue(finalTopicPublishInfo tpInfo,finalString lastBrokerName){// 判断容错开关是否打开,默认是falseif(this.sendLatencyFaultEnable){try{// 根据负载均衡策略选择一个MQ,brokerName == lastBrokerName && 可用的MQint index = tpInfo.getSendWhichQueue().incrementAndGet();for(int i =0; i < tpInfo.getMessageQueueList().size(); i++){int pos =Math.abs(index++)% tpInfo.getMessageQueueList().size();if(pos <0)
                        pos =0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);if(latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}// 上一步没选出来时,选一个相对较好的BrokerfinalString notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if(writeQueueNums >0){finalMessageQueue mq = tpInfo.selectOneMessageQueue();if(notBestBroker !=null){
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet()% writeQueueNums);}return mq;}else{
                    latencyFaultTolerance.remove(notBestBroker);}}catch(Exception e){
                log.error("Error occurred when selecting message queue", e);}// 上面两步都没选出来时,默认负载均衡策略选一个MQreturn tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);}
/**
    * 更新延迟容错信息
    * 
    * currentLatency:延迟
    * isolation:是否隔离
    */publicvoidupdateFaultItem(finalString brokerName,finallong currentLatency,boolean isolation){if(this.sendLatencyFaultEnable){// 当开启隔离时,延迟取默认30000long duration =computeNotAvailableDuration(isolation ?30000: currentLatency);// 更新broker的延迟this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}}/**
    * 计算延迟对应的不可用时间,采用的查表法
    */privatelongcomputeNotAvailableDuration(finallong currentLatency){for(int i = latencyMax.length -1; i >=0; i--){if(currentLatency >= latencyMax[i])returnthis.notAvailableDuration[i];}return0;}}

从源码中不难看出,**

selectOneMessageQueue

在容错策略下选择MQ的步骤:**

  1. 优先获取上一次用过的Broker(上一次用的很大程度上是可用的)
  2. 选择一个次优的Broker
  3. 默认负载均衡策略返回一个Broker
updateFaultItem

更新Broker对应的延迟,如果

Producer

发送消息时间过长,则认为一段时间N内不可用,N的取值与

Producer

发送消息持续时长的关系如下表:(其实就是上面源码中的

latencyMax

notAvailableDuration

数组)

Producer

发送消息消耗时长

Broker

不可用时长≥15000ms600×1000ms≥3000ms180×1000ms≥2000ms120×1000ms≥1000ms60×1000ms≥550ms30×1000ms≥100ms0ms≥50ms0ms

标签: 云原生 java

本文转载自: https://blog.csdn.net/HNU_Csee_wjw/article/details/123106724
版权归原作者 小王曾是少年 所有, 如有侵权,请联系我们删除。

“RocketMQ 容错策略 解析&mdash;&mdash;图解、源码级解析”的评论:

还没有评论