前⾔
前段时间⼀位⼤龄程序来公司⾯试,已经是做到技术 leader 的级别,⾯试开始他⽐较⾃豪地向
我们介绍的设计技术架构、缓存设计、业务设计等等,他说该项⽬是他⼀⼿打造起来的。在服
务⾼可⽤⽅⾯,声称可以做到 4 个 9。因为也是 Spring Boot、Spring Cloud 这套⽐较流⾏的东⻄。因为我们也是⽤ Spring Cloud 全家桶,但是有些问题我们还没有解决的,⼀听到对⽅说 4 个 9。顿时来了兴趣,准备膜拜⼀下⼤神怎么解决⾼可⽤问题的。
他说⽤是 Eureka、Ribbon 这套机制实现⾼可⽤,我们问题是 Eureka、Ribbon 怎么实现⾼可
⽤的,有什么弊端。对⽅语塞,开始东拉⻄扯,⼤失所望。可能有些同学不知道,在使⽤ Eureka、Ribbon 时候,由于 Eureka、Ribbon 内部的缓存机 制。会导致服务上线或者下线的时候会出现服务不可⽤的情况,极端情况下会出现 90 秒服务不可能。在没有解决这些问题的前提下,使⽤ Eureka、Ribbon 搭建微服务应⽤是不可能会有 4 个 9 的。
对于这种情况,官⽅也没有给具体的解决⽅案。在没有解决这个问题之前,很多公司也许会跟
我们采取同样的做法,就是在晚上流量⽐较少情况停服发布。但是试下⼀想,每次发布都只能
在 23 点以后发布,关闭 SLB,停机,执⾏ SQL,起服务。⼗⼏个服务,验证功能,⼀顿操作
⼀下,⾄少要搞到两三点。这对于整个开发团队来说都是⼀种负担。
本⽂将分享是如何解决 Eureka、Ribbon 组件使⽤上的弊端。
Eureka 注册,服务发现机制原理
相信熟悉微服务架构的⼈,都知道服务注册与发现的作⽤。在成百上千个微服务中,我们必须
需要⼀个中⼼来通知⽣产者与消费者的服务状态变化,以便我们在微服务架构中理清我们的服
务调⽤关系。
经过微服务架构多年的发展,出现很多种注册中⼼,如 Naco、Eureka、etc、ZooKeeper 等
等,其实⼤体思想都⼀样。在服务启动时候向注册中⼼发送消息告诉注册中⼼我已经 ready,
可以被调⽤了,然后再持续运⾏过程通过⼼跳的⽅式向注册中⼼汇报⾃身的状态。
注册中⼼收到服务注册信息后,向调⽤⽅推送或者调⽤⽅主动拉取需要调⽤的服务的状态信
息。在这个过程中其实并没有很⾼深的理论与思想。注册中⼼与各个服务的交互⽅式⽆⾮就是
⻓连或者短连。
服务注册与续约机制,缓存刷新
在服务环境完全启动完成之后,集成了 eureka-client 的服务会实例化com.netflix.discovery.DiscoveryClient 实例,DiscoveryClient 实例对 Eureka 客户端来说⾄关重要,各种线程池 初始化、服务注册、续约、刷新缓存都在这个对象完成。DiscoveryClient 有个强⼤的构造⽅法,在初始化的构造三个⾄关的重要的线程池
// 线程调度器
private final ScheduledExecutorService scheduler;
// ⼼跳线程池 ,主要⽤于注册与续约
private final ThreadPoolExecutor heartbeatExecutor;
// 本地缓存刷新线程池
private final ThreadPoolExecutor cacheRefreshExecutor;
🖐️****在构造⽅法⾥⾯同时调⽤⼀个重要⽅法 initScheduledTasks
private void initScheduledTasks() {
......
if (this.clientConfig.shouldFetchRegistry()) {
......
// 这⾥⽤ cacheRefreshExecutor 线程池启动定时任务来定时刷新缓存 的操作
this.scheduler.schedule(new TimedSupervisorTask("cacheRe‐
fresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalIn‐
Secs, TimeUnit.SECONDS, expBackOffBound, new
DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs,
TimeUnit.SECONDS);
}
if (this.clientConfig.shouldRegisterWithEureka()) {
.......
// 这⾥使⽤了 heartbeatExecutor 线程池来来启动任务进⾏注册与续约的 操作
this.scheduler.schedule(new
TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecu‐
tor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new
DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs,
TimeUnit.SECONDS);
.....
} else {
logger.info("Not registering with Eureka server per con‐
figuration");
} }
注册与续约的时候的线程是 HeartbeatThread:
private class HeartbeatThread implements Runnable {
private HeartbeatThread() { }
public void run() {
if (DiscoveryClient.this.renew()) {
DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp
= System.currentTimeMillis();
} }
}
到这⾥我们基本明⽩注册与续约都是⽤了 DiscoveryClient 对象的 renew ⽅法。
同理我们看到缓存刷新使⽤的是 CacheRefreshThread 线程:
class CacheRefreshThread implements Runnable {
CacheRefreshThread() {
}
public void run() {
DiscoveryClient.this.refreshRegistry();
}
}
由代码可以看出,刷新 Eureka 客户端的缓存是通过 DiscoveryClient 对象的 refreshRegistry
⽅法实现的。
Ribbon 的负载负载均衡策略
在使⽤ Spring Cloud 全家桶的 Ribbon 做负载均衡时候,Ribbon 不同于 F5、Nginx 等等通过
软件或者硬件做负载均衡,Ribbon 直接就在应⽤端做负载均衡。Ribbon ⾃⼰实现负载均衡器,根据⼀定规则,从 Ribbon 的本地缓存的服务列表⾥⾯选择服务进⾏调⽤。值得注意的是,Ribbon给每个服务都初始化了⼀个 Spring 容器。Eureka 的缓存列表是通过定 时任务去注册中⼼拉取,Ribbon 的本地服务缓存列表则是通过定时任务通过 Eureka 的缓存列表同步过来。
打开 org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration 在这个配置类⾥
⾯,可以看到定义了各种 Ribbon 负载均衡需要的各种对象。其中有定义了默认的负载均衡规
则,默认的拉取服务列表的策略。
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, this.name)) {
return (IRule)this.propertiesFactory.get(IRule.class, con‐
fig, this.name);
} else {
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
}
上⾯的代码给我们定义了 Ribbon 默认的负载均衡策略,在我们没有初始化其他实现 IRule 类
的时候执⾏。
**Ribbon 主要实现了⼀下⼏种负载均衡策略: **
- RoundRobinRule:轮询策略
- RandomRule:随机策略
- AvailabilityFilteringRule:可⽤过滤策略
- WeightedResponseTimeRule:响应时间权重策略
- RetryRule:轮询失败重试策略
- BestAvailableRule:并发量最⼩可⽤策略
- ZoneAvoidanceRule:根据 server 所在区域的性能和 server 的可⽤性
当然,我们也可以根据业务要求⾃定义负载均衡规则。
如下代码,Ribbon 定义了默认的拉取服务列表的⽅式:
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig
config) {
return new PollingServerListUpdater(config);
}
**Ribbon 实现了两种拉取服务的⽅式: **
- PollingServerListUpdater:通过线程池定时任务的⽅式每隔 30s 从 Eureka 缓存拉取
- EurekaNotificationServerListUpdater:通过监听 Eureka 的缓存更新事件,当 Eureka 客 户端从注册中⼼拉取服务列表的时候,同时同步到 Ribbon 的缓存列表中。上⾯两个类都是 ServerListUpdater 的实现类,都实现了 ServerListUpdate 的 start ⽅法。⽽ start ⽅法会在 DynamicServerListLoadBalancer(Ribbon 负载均衡器)初始化的时候的被调⽤。下⾯可以看下 start ⽅法的代码实现:
public class EurekaNotificationServerListUpdater implements ServerListUpdater {
//......省略代码
public synchronized void start(final UpdateAction updateAction) {
if (this.isActive.compareAndSet(false, true)) {
// 初始化 Eureka 时间监听器
this.updateListener = new EurekaEventListener() {
public void onEvent(EurekaEvent event) {
// ......省略代码 }
};
//......省略代码
// 把监听器注册到 Eureka 的缓存更新事件中
this.eurekaClient.registerEventListener(this.updateListen‐
er);
} else {
logger.info("Update listener already registered, no-op");
}
......省略代码 }
public class PollingServerListUpdater implements ServerListUpdater {
//...... 省略代码
public synchronized void start(final UpdateAction updateAction) {
if (this.isActive.compareAndSet(false, true)) {
// 初始化线程
Runnable wrapperRunnable = new Runnable() {
public void run() {
if (!PollingServerListUpdater.this.isActive.get())
{
if (PollingServerListUpdater.this.scheduledFu‐
ture != null) {
PollingServerListUpdater.this.scheduledFu‐
ture.cancel(true);
}
} else {
try {
updateAction.doUpdate();
PollingServerListUpdater.this.lastUpdated
= System.currentTimeMillis();
} catch (Exception var2) {
PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
} } }
};
// 启动定时任务更新 Ribbon 缓存
this.scheduledFuture = getRefreshExecutor().scheduleWith‐
FixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshInter‐
valMs, TimeUnit.MILLISECONDS);
} else {
logger.info("Already active, no-op");
}
}
Eureka、Ribbon 的缓存机制
通过上⾯的分析,我们所了解到有两个地⽅的缓存,⼀是 Eureka 客户端缓存,⼆是 Ribbon 负
载均衡器的缓存。这些缓存机制是否都是必须存在的呢?很明显,这些缓存机制都是有存在的必要的,⽽且是⾮
常合理的。因为你的服务状态不可能是时时变化的。在 Eureka、Ribbon 这两个组件中还有
eureka-server 也是有缓存中的。加起来三个地⽅有缓存。eureka-server 中⽤ guava 定义了 readWriteCacheMap,readOnlyCacheMap 两个缓存。当我们的服务注册到 eureka-server,服务的各类元数据信息会先存储在 readWriteCacheMap,然后定时任务每隔 30s 同步到 readOnlyCacheMap 中,所以服务提供者刚注册到注册中⼼,服务调⽤者是拉取不到的。
**综上各级缓存,我们可以看下图⽐较直观: **
由于服务启动的时候,服务会⾃动注册到注册中⼼,所以只是在续约的时候,服务提供者到注
册中⼼才有 30s。
Eureka、Ribbon 使⽤的弊端
通过上⽂的各种分析,相信⼤家已经知道使⽤ Eureka、Ribbon 的问题所在了。正式因为各级
缓存,各类定时任务的存在,所以我们在正常使⽤ Eureka、Ribbon 的时候是没有办法做到 4
个 9 ⾼可⽤的。试想⼀下,当我们⼀个服务启动提供,服务注册到 eureka-server。readWriteCacheMap 到 readOnlyCacheMap 需要 30s,readOnlyCacheMap 到 eureka-client 需要 30s,eureka-client 到 Ribbon 需要 30s,所以极端情况下,调⽤⽅ 90s 之后才能知道服务提供者的信息。 同理,当我们下线⼀个服务的时候,由于 readOnlyCacheMap、eureka-client、Ribbon 三个 端都存在缓存,所以在极端情况下,90s 内会出现服务提供者的接⼝调⽤异常,因为它已经下 线了。如果像天猫淘宝这种流量级别⽹站,这种情况当然是不可接受的。
如何使⽤ RabbitMQ 解决 Eureka、Ribbon 的弊端
**主要实现两个⽬标: **
- 服务平滑下线,先让客户端感知即将要下线,不负载均衡到即将要下线的服务中,等下线服务已经请求进来的流量跑完,最后平滑下线
- 服务快速感知,当服务上线后,服务调⽤⽅可以快速感知已上线的服务。
我们要实现上⾯两个⽬标,⾸先是要想办法⼲掉各种缓存机制。⼲掉缓存操作办法也很简单。
第⼀,使⽤在 eureka-server 使⽤下⾯两个配置:
# 禁⽤保护模式
eureka.server.enable-self-preservation=false
# 禁⽤ readOnlyCacheMap
eureka.server.use-read-only-response-cache=false
第⼆,eureka-client 端使⽤ EurekaNotificationServerListUpdater ⽅式同步可⽤服务列表更新
到 Ribbon,即 eureka-client 更新缓存的同时⻢上 Ribbon 缓存。
当我们分析服务注册推送到注册中⼼的 com.netflix.appinfo.InstanceInfo 对象发现:
public class InstanceInfo {
// eureka 允许我们⾃定义业务的元数据
@XStreamAlias("metadata")
private volatile Map<String, String> metadata;
@Auto
private volatile Long lastUpdatedTimestamp;
@Auto
private volatile Long lastDirtyTimestamp;
@Auto
private volatile InstanceInfo.ActionType actionType;
@Auto
private volatile String asgName;
private String version;
}
InstanceInfo 提供了 metadata 属性,允许我们⾃定义业务元数据推到注册中⼼。基于这个,当服务下线时,先保持服务状态不变,设置服务下线的标志元数据推送到注册中⼼,通知所有的服务消费者这个服务要下线了,重新拉取服务列表的状态信息。 这个通知的过程我们需要借助⼀些消息中间件,如 RabbitMQ、RocketMQ 等等,⾃定义Ribbon 负载均衡规则,服务列表⾥⾯剔除带有下线标志位的服务后再做负载均衡。最后 30s 后再杀掉服务,⾄此,这个服务实现平滑下线。当服务要上线的时候,监听 Spring 容器的初始化完成事件,在监听事件⾥⾯只做⼀件事,发送服务上线消息到 MQ,服务消费者收到消息后执⾏重新拉取服务列表的操作,实现服务快速感知。
平滑发布实现的设计实现原理
总体思路,利⽤ MQ 的原⼦消息⼴播,通知每个服务消费者,在此前提下,我们必须⽐较了解
Eureka、Ribbon 的⼀些代码实现,上⽂已跟⼤家分析了个⼤概。现在来看下总体的设计思路图
平滑发布代码实现
有了上⾯的⽅法论,接下来就是代码实现了。
1. 监听服务启动事件:
@Component
@Slf4j
public class SpringContextGcdListener implements
ApplicationListener<ContextRefreshedEvent> {
private volatile AtomicBoolean isSend =new AtomicBoolean(false);
@Autowired
private ApplicationInfoManager applicationInfoManager;
@Autowired
private SksEurekaManager sksEurekaManager;
@SneakyThrows
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
try{
if(event.getApplicationContext().getParent() instanceof
AnnotationConfigApplicationContext) {
//防⽌重复触发
if (isSend.compareAndSet(false, true)) {
for (int i = 0; i < 10; i++) {
DiscoveryClient discoveryClient =
SpringContextHolder.getBean(DiscoveryClient.class);
if (Objects.nonNull(discoveryClient)) {
InstanceInfo instanceInfo = applicationIn‐
foManager.getInfo();
EurekaConstant.setQueue(instanceInfo.getAppName(), instanceInfo.getH‐
ostName(), instanceInfo.getPort());
ServerConsumer consumer =
SpringContextHolder.getBean(ServerConsumer.class);
consumer.setQueueName(EurekaConstant.EU‐
REKA_RIBBON_QUEUE);
consumer.start();
LogUtil.info(log, "SpringContextListener",
"MQ 消费启动监听");
ServerChangeEvent changeEvent = change‐
Event(instanceInfo, ServerStatus.UP);
// 推送 MQ 消息
EurekaManager.sendMqMessage(changeEvent);
//LogUtil.info(log,
"SpringContextListener", "queue :{}",
EurekaConstant.EUREKA_RIBBON_QUEUE);
LogUtil.info(log, "SpringContextListener",
"app start up send mq message :{}", JsonUtil.toJson(changeEvent));
break; }
Thread.sleep(1000);
} } } }catch (Exception e){
LogUtil.error(log,"SpringContextListener","onApplication‐
Event:{}",e);
} }
}
2. MQ 消费处理:
@Slf4j
@Component
public class ConsumerTask {
public boolean consumer( byte[] bytes) {
String json = new String(bytes);
ServerChangeEvent event = JsonUtil.fromStr(json,
ServerChangeEvent.class);
try{
DiscoveryClient discoveryClient =
SpringContextHolder.getBean(DiscoveryClient.class);
// 重新拉取服务列表
Method refresh =
DiscoveryClient.class.getDeclaredMethod(EurekaConstant.REFRESH_METHOD)
;
refresh.setAccessible(true);
refresh.invoke(discoveryClient);
}catch (Exception e){
LogUtil.error(log,"EurekaServerConsumerTask","eureka rib‐
bon refresh :{}",e);
}
return true; }
}
3. ⾃定义负载均衡规则:
@Slf4j
public class EurekaRibbonRule extends ClientConfigEnabledRoundRobinRule {
private LoadBalancerStats loadBalancerStats;
@Override
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
List<Server> serverList = getLoadBalancer().getAllServers();
// 踢掉要下线的服务
List<Server> upServerList = Lists.newArrayList();
for(Server server: serverList){
if (server instanceof DiscoveryEnabledServer) {
final DiscoveryEnabledServer discoveryEnabledServer =
(DiscoveryEnabledServer) server;
final Map<String, String> metadata = discoveryEnabled‐
Server.getInstanceInfo().getMetadata();
final String down = metadata.get(SERVER_STATUS_KEY);
if (!ServerStatus.DOWN.name().equals(down)) {
upServerList.add(server);
} } }
if(upServerList.isEmpty()){
return null; }
//... 根据需要做负载负载均衡规则
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}
4. 服务上线发送原⼦消息⼴播:
private void changeServer(ServerStatus serverStatus) {
Map<String,String> engMetaMap = Maps.newHashMap();
engMetaMap.put(EurekaConstant.SERVER_STATUS_KEY, serverSta‐
tus.name());
applicationInfoManager.registerAppMetadata(engMetaMap);
DiscoveryClient discoveryClient =
SpringContextHolder.getBean(DiscoveryClient.class);
try {
Method renew =
DiscoveryClient.class.getDeclaredMethod(EurekaConstant.RENEW_METHOD);
renew.setAccessible(true);
renew.invoke(discoveryClient);
ServerChangeEvent event = changeEvent(applicationInfoMan‐
ager.getInfo(),serverStatus);
sendMqMessage(event);
} catch (Exception e) {
}
}
5. 初始化负载均衡规则和 Ribbon 缓存更新⽅式:
由 于 Ribbon 会 给 每 个 服 务 提 供 初 始 化 容 器 , 为 了 使 Ribbon 容 器 都 加 载 到RibbonAutoGcdConfiguration 配置,我们需要在服务启动类上加上
@RibbonClients(defaultConfiguration = RibbonAutoGcdConfiguration.class)
⾄此,所有核⼼代码完毕,上⾯只是⼀些核⼼代码,根据思路,相信⼤家也能搞出来,如有不详尽,欢迎⼤家留⾔讨论,我将知⽆不⾔。
基于 Eureka、Ribbon 的灰度发布
在使⽤ Eureka、Ribbon 要做灰度的话,要利⽤⼀下⽹关的组件,灰度的实现⽅式有很多,我发⼀下我的思路,希望跟⼤家讨论交流⼀下。
免费IT资源小站,最后送给大家一套学习RabbitMQ的课程:中间件MQ-RabbitMQ入门到实战课程 - IT资源小站
版权归原作者 程序员理查德 所有, 如有侵权,请联系我们删除。