0


Ribbon 负载均衡策略 —— 图解、源码级解析

🍊 Java学习:社区快速通道

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南

📆 最近更新:2023年6月4日

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

通过本文你可以学习到:

  1. 常见的7种负载均衡策略思想
  2. 自旋锁的使用方式
  3. 防御性编程

负载均衡策略

RandomRule

该策略会从当前可用的服务节点中,随机挑选一个节点访问,使用了yield+自旋的方式做重试,还采用了严格的防御性编程。

RoundRobinRule

该策略会从一个节点一步一步地向后选取节点,如下图所示:
在这里插入图片描述
在多线程环境下,两个请求同时访问这个Rule也不会读取到相同节点:这靠的是RandomRobinRule底层的自旋锁+CAS的同步操作。

CAS+自旋锁这套组合技是高并发下最廉价的线程安全手段,因为这套操作不需要锁定系统资源。但缺点是,自旋锁如果迟迟不能释放,将会带来CPU资源的浪费,因为自旋本身并不会执行任何业务逻辑,而是单纯的使CPU空转。所以通常情况下会对自旋锁的旋转次数做一个限制,比如JDK中

synchronize

底层的锁升级策略,就对自旋次数做了动态调整。

while(true){// cas操作if(cas(expected, update)){// 业务逻辑代码// break或退出return}}

Eureka为了防止服务下线被重复调用,就使用AtomicBoolean的CAS方法做同步控制;

奈飞提供的SpringCloud组件有特别多用到CAS的地方,感兴趣的小伙伴们可以发现一下

RetryRule

RetryRule是一个类似装饰器模式的规则,装饰器相当于一层套一层的套娃,每一层都会加上一层独特的功能。

经典的装饰器模式示意图:
在这里插入图片描述
借助上面的思路,

RetryRule

就是给其他负载均衡策略加上重试功能。在

RetryRule

里还藏着一个

subRule

,这才是真正被执行的负载均衡策略,

RetryRule

正是要为它添加重试功能(如果初始化时没指定

subRule

,将默认使用

RoundRibinRule

)。

WeightedResponseTimeRule

这个规则继承自

RoundRibbonRule

,他会根据服务节点的响应时间计算权重,响应时间越长权重就越低,响应越快则权重越高,权重的高低决定了机器被选中概率的高低。也就是说,响应时间越小的机器,被选中的概率越大。

服务器刚启动的时候,对各个服务节点采样不足,因此会采用轮询策略,当积累到一定的样本时候,才会切换到

WeightedResponseTimeRule

模式。

BestAvailableRule

在过滤掉故障服务以后,它会基于过去30分钟的统计结果选取当前并发量最小的服务节点作为目标地址。如果统计结果尚未生成,则采用轮询的方式选定节点。

AvailabilityFilteringRule

这个规则底层依赖

RandomRobinRule

来选取节点,但必须要满足它的最低要求的节点才会被选中。如果节点满足了要求,无论其响应时间或者当前并发量是什么,都会被选中。

每次

AvailabilityFilteringRule

都会请求

RobinRule

挑选一个节点,然后对这个节点做以下两步检查:

  1. 是否处于熔断状态
  2. 节点当前的请求连接数超过阈值,超过了则表示节点目前太忙

如果被选中的

server

挂了,那么AFR会自动重试(最多10次),让

RobinRule

重新选择一个服务节点

ZoneAvoidanceRule

这个过滤器包含了组合过滤条件,分别是Zone级别和可用性级别。
在这里插入图片描述

  • Zone Filter: Zone可以理解为机房所属的大区域,这里会对这个Zone下面所有的服务节点进行健康情况过滤。
  • 可用性过滤: 这里和AvailabilityFilteringRule的验证过程很像,会过滤掉当前并发量较大,或者处于熔断状态的服务节点。

Ribbon 负载均衡策略源码

RandomRule源码

先从

RandomRule

看起,核心的方法是:
请添加图片描述

publicServerchoose(ILoadBalancer lb,Object key){if(lb ==null){returnnull;}Server server =null;while(server ==null){if(Thread.interrupted()){returnnull;}List<Server> upList = lb.getReachableServers();List<Server> allList = lb.getAllServers();int serverCount = allList.size();if(serverCount ==0){/*
             * No servers. End regardless of pass, because subsequent passes
             * only get more restrictive.
             */returnnull;}int index =chooseRandomInt(serverCount);
        server = upList.get(index);if(server ==null){Thread.yield();continue;}if(server.isAlive()){return(server);}

        server =null;Thread.yield();}return server;}

RandomRule

里方法的入参

key

没有用到,所以可以先暂时忽略

while

循环逻辑是如果

server

为空,则找到一个可用的

server
if(Thread.interrupted()){returnnull;}

如果线程暂停了,则直接返回空(防御性编程)

List<Server> upList = lb.getReachableServers();List<Server> allList = lb.getAllServers();
allList

存储的是所有的服务,

upList

存储的是可运行状态的服务

int serverCount = allList.size();if(serverCount ==0){returnnull;}

服务中心上没有

server

注册,则返回空

int index =chooseRandomInt(serverCount);
server = upList.get(index);

随机选择一个server

其中,chooseRandomInt的逻辑如下:

protectedintchooseRandomInt(int serverCount){returnThreadLocalRandom.current().nextInt(serverCount);}

返回0到

serverCount

中间的任意一个值

java中的随机是可以预测到结果的,真随机数一般会掺杂一些不可预测的数据,比如当前cpu的温度


**回到

RandomRule

choose

方法:**

如果发现随机选择的

server

为空表示此时

serverList

正在被修正,此时让出线程资源,进行下一次循环,对应最开始的防御性编程

if(server ==null){Thread.yield();continue;}
if(server.isAlive()){return(server);}

如果

server

可用直接

return
server =null;Thread.yield();

如果不可用则

server

置为空,下一次循环会选一个新的,最后让出资源。

所以该方法每次进入下一次循环时都会让出线程。


RoundRobinRule源码

接下来看

RoundRobinRule
publicServerchoose(ILoadBalancer lb,Object key){if(lb ==null){
        log.warn("no load balancer");returnnull;}Server server =null;int count =0;while(server ==null&& count++<10){List<Server> reachableServers = lb.getReachableServers();List<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();if((upCount ==0)||(serverCount ==0)){
            log.warn("No up servers available from load balancer: "+ lb);returnnull;}int nextServerIndex =incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);if(server ==null){Thread.yield();continue;}if(server.isAlive()&&(server.isReadyToServe())){return(server);}

        server =null;}if(count >=10){
        log.warn("No available alive servers after 10 tries from load balancer: "+ lb);}return server;}
while

循环里面有一个计数器,如果重试10次依然没有结果返回就不重试了。

List<Server> reachableServers = lb.getReachableServers();List<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();
reachableServers

就是

up

状态的

server
if((upCount ==0)||(serverCount ==0)){
    log.warn("No up servers available from load balancer: "+ lb);returnnull;}

没有可用服务器则返回空

int nextServerIndex =incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);

选择哪个下标的

server

,进入

incrementAndGetModulo

方法

privateintincrementAndGetModulo(int modulo){for(;;){int current = nextServerCyclicCounter.get();int next =(current +1)% modulo;if(nextServerCyclicCounter.compareAndSet(current, next))return next;}}

使用了自旋锁,

nextServerCyclicCounter

是一个线程安全的数字。

if(server ==null){Thread.yield();continue;}

如果获取到的

server

为空则让出资源,继续下一次循环

if(server.isAlive()&&(server.isReadyToServe())){return(server);}
server

是正常的则返回

server =null;

最后没有让出线程资源,因为重试10次后就退出循环了


BestAvailableRule源码

接下来看

BestAvailableRule
@OverridepublicServerchoose(Object key){if(loadBalancerStats ==null){returnsuper.choose(key);}List<Server> serverList =getLoadBalancer().getAllServers();int minimalConcurrentConnections =Integer.MAX_VALUE;long currentTime =System.currentTimeMillis();Server chosen =null;for(Server server: serverList){ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);if(!serverStats.isCircuitBreakerTripped(currentTime)){int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);if(concurrentConnections < minimalConcurrentConnections){
                minimalConcurrentConnections = concurrentConnections;
                chosen = server;}}}if(chosen ==null){returnsuper.choose(key);}else{return chosen;}}
if(loadBalancerStats ==null){returnsuper.choose(key);}

如果

loadBalancerStats

为空则调用父类的

choose

方法,父类方法直接委托给

RoundRobinRule

来完成

choose

for

循环里先从

loadBalancerStats

中获取到当前服务的状态

ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
publicServerStatsgetSingleServerStat(Server server){returngetServerStats(server);}
protectedServerStatsgetServerStats(Server server){try{return serverStatsCache.get(server);}catch(ExecutionException e){ServerStats stats =createServerStats(server);
        serverStatsCache.asMap().putIfAbsent(server, stats);return serverStatsCache.asMap().get(server);}}

这里是从缓存中获取

server的stats

,如果获取失败则默认创建一个

stats

并添加到缓存中,然后从

cache

中再获取一次。

随后判断是否处于熔断状态

if(!serverStats.isCircuitBreakerTripped(currentTime)){...}
publicbooleanisCircuitBreakerTripped(long currentTime){long circuitBreakerTimeout =getCircuitBreakerTimeout();if(circuitBreakerTimeout <=0){returnfalse;}return circuitBreakerTimeout > currentTime;}

首先获得熔断的

TimeOut

(表示截止到未来某个时间熔断终止),如果大于当前时间说明处于熔断状态。

熔断的

TimeOut

由下面方法计算得到:

privatelonggetCircuitBreakerTimeout(){long blackOutPeriod =getCircuitBreakerBlackoutPeriod();if(blackOutPeriod <=0){return0;}return lastConnectionFailedTimestamp + blackOutPeriod;}

返回上一次连接失败的时间戳 + blackOutPeriod

其中又调用了

privatelonggetCircuitBreakerBlackoutPeriod(){int failureCount = successiveConnectionFailureCount.get();int threshold = connectionFailureThreshold.get();if(failureCount < threshold){return0;}int diff =(failureCount - threshold)>16?16:(failureCount - threshold);int blackOutSeconds =(1<< diff)* circuitTrippedTimeoutFactor.get();if(blackOutSeconds > maxCircuitTrippedTimeout.get()){
        blackOutSeconds = maxCircuitTrippedTimeout.get();}return blackOutSeconds *1000L;}
failureCount

是失败的个数,从一个计数器里获得,阈值从一个缓存的属性中获得,之后计算两个的差值,再根据缓存中的一些属性计算最终的秒数,最后乘以1000返回。


回到

BestAvailableRule

choose

方法,只有不处于熔断状态才能继续走后面的流程

if(concurrentConnections < minimalConcurrentConnections){
    minimalConcurrentConnections = concurrentConnections;
    chosen = server;}

选出连接数最小的服务器

if(chosen ==null){returnsuper.choose(key);}else{return chosen;}

最后返回

核心是找到一个最轻松的服务器。


RetryRule源码

查看

RetryRule

源码:

publicServerchoose(ILoadBalancer lb,Object key){long requestTime =System.currentTimeMillis();long deadline = requestTime + maxRetryMillis;Server answer =null;

   answer = subRule.choose(key);if(((answer ==null)||(!answer.isAlive()))&&(System.currentTimeMillis()< deadline)){InterruptTask task =newInterruptTask(deadline
            -System.currentTimeMillis());while(!Thread.interrupted()){
         answer = subRule.choose(key);if(((answer ==null)||(!answer.isAlive()))&&(System.currentTimeMillis()< deadline)){/* pause and retry hoping it's transient */Thread.yield();}else{break;}}

      task.cancel();}if((answer ==null)||(!answer.isAlive())){returnnull;}else{return answer;}}
long requestTime =System.currentTimeMillis();long deadline = requestTime + maxRetryMillis;

先记录当前时间和

deadline

,在截止时间之前可以一直重试。

answer = subRule.choose(key);

方法里面是由

subRule

来实现具体的负载均衡逻辑,这里默认类型是

RoundRobinRule

如果选到的是空或者选到的不是up的,且时间在ddl之前则进入重试逻辑:

while(!Thread.interrupted()){
   answer = subRule.choose(key);if(((answer ==null)||(!answer.isAlive()))&&(System.currentTimeMillis()< deadline)){/* pause and retry hoping it's transient */Thread.yield();}else{break;}}

如果线程中断了就中断重试。之后重新选择服务器,如果又没选到则把资源让出去,下一次

while

循环再选,在

while

循环之前会起一个任务

InterruptTask task =newInterruptTask(deadline -System.currentTimeMillis());

到了截止时间之后,程序会中断重试的流程

task.cancel();

最后返回

if((answer ==null)||(!answer.isAlive())){returnnull;}else{return answer;}

本文转载自: https://blog.csdn.net/HNU_Csee_wjw/article/details/124339999
版权归原作者 小王曾是少年 所有, 如有侵权,请联系我们删除。

“Ribbon 负载均衡策略 —— 图解、源码级解析”的评论:

还没有评论