0


基于 RabbitMQ 实现 Eureka 服务平滑灰度发布

前⾔

前段时间⼀位⼤龄程序来公司⾯试,已经是做到技术 leader 的级别,⾯试开始他⽐较⾃豪地向

我们介绍的设计技术架构、缓存设计、业务设计等等,他说该项⽬是他⼀⼿打造起来的。在服

务⾼可⽤⽅⾯,声称可以做到 4 个 9。因为也是 Spring Boot、Spring Cloud 这套⽐较流⾏的东⻄。因为我们也是⽤ Spring Cloud 全家桶,但是有些问题我们还没有解决的,⼀听到对⽅说 4 个 9。顿时来了兴趣,准备膜拜⼀下⼤神怎么解决⾼可⽤问题的。

他说⽤是 Eureka、Ribbon 这套机制实现⾼可⽤,我们问题是 Eureka、Ribbon 怎么实现⾼可

⽤的,有什么弊端。对⽅语塞,开始东拉⻄扯,⼤失所望。可能有些同学不知道,在使⽤ Eureka、Ribbon 时候,由于 Eureka、Ribbon 内部的缓存机 制。会导致服务上线或者下线的时候会出现服务不可⽤的情况,极端情况下会出现 90 秒服务不可能。在没有解决这些问题的前提下,使⽤ Eureka、Ribbon 搭建微服务应⽤是不可能会有 4 个 9 的。

对于这种情况,官⽅也没有给具体的解决⽅案。在没有解决这个问题之前,很多公司也许会跟

我们采取同样的做法,就是在晚上流量⽐较少情况停服发布。但是试下⼀想,每次发布都只能

在 23 点以后发布,关闭 SLB,停机,执⾏ SQL,起服务。⼗⼏个服务,验证功能,⼀顿操作

⼀下,⾄少要搞到两三点。这对于整个开发团队来说都是⼀种负担。

本⽂将分享是如何解决 Eureka、Ribbon 组件使⽤上的弊端。

Eureka 注册,服务发现机制原理

相信熟悉微服务架构的⼈,都知道服务注册与发现的作⽤。在成百上千个微服务中,我们必须

需要⼀个中⼼来通知⽣产者与消费者的服务状态变化,以便我们在微服务架构中理清我们的服

务调⽤关系。

经过微服务架构多年的发展,出现很多种注册中⼼,如 Naco、Eureka、etc、ZooKeeper 等

等,其实⼤体思想都⼀样。在服务启动时候向注册中⼼发送消息告诉注册中⼼我已经 ready,

可以被调⽤了,然后再持续运⾏过程通过⼼跳的⽅式向注册中⼼汇报⾃身的状态。

注册中⼼收到服务注册信息后,向调⽤⽅推送或者调⽤⽅主动拉取需要调⽤的服务的状态信

息。在这个过程中其实并没有很⾼深的理论与思想。注册中⼼与各个服务的交互⽅式⽆⾮就是

⻓连或者短连。

服务注册与续约机制,缓存刷新

在服务环境完全启动完成之后,集成了 eureka-client 的服务会实例化com.netflix.discovery.DiscoveryClient 实例,DiscoveryClient 实例对 Eureka 客户端来说⾄关重要,各种线程池 初始化、服务注册、续约、刷新缓存都在这个对象完成。DiscoveryClient 有个强⼤的构造⽅法,在初始化的构造三个⾄关的重要的线程池

// 线程调度器
private final ScheduledExecutorService scheduler;
// ⼼跳线程池 ,主要⽤于注册与续约
private final ThreadPoolExecutor heartbeatExecutor;
// 本地缓存刷新线程池
private final ThreadPoolExecutor cacheRefreshExecutor;

🖐️****在构造⽅法⾥⾯同时调⽤⼀个重要⽅法 initScheduledTasks

private void initScheduledTasks() {
......
if (this.clientConfig.shouldFetchRegistry()) {
    ......
    // 这⾥⽤ cacheRefreshExecutor 线程池启动定时任务来定时刷新缓存 的操作
    this.scheduler.schedule(new TimedSupervisorTask("cacheRe‐
                                                    fresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalIn‐
                                                    Secs, TimeUnit.SECONDS, expBackOffBound, new
                                                    DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, 
                            TimeUnit.SECONDS);
}
if (this.clientConfig.shouldRegisterWithEureka()) {
    .......
    // 这⾥使⽤了 heartbeatExecutor 线程池来来启动任务进⾏注册与续约的 操作
    this.scheduler.schedule(new
                            TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecu‐
                                                tor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new
                                                DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, 
                            TimeUnit.SECONDS);
    ..... 
} else {
    logger.info("Not registering with Eureka server per con‐
    figuration");
} }

注册与续约的时候的线程是 HeartbeatThread:

private class HeartbeatThread implements Runnable {
    private HeartbeatThread() { }
    public void run() {
        if (DiscoveryClient.this.renew()) {
            DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp 
            = System.currentTimeMillis();
        } }
}

到这⾥我们基本明⽩注册与续约都是⽤了 DiscoveryClient 对象的 renew ⽅法。

同理我们看到缓存刷新使⽤的是 CacheRefreshThread 线程:

class CacheRefreshThread implements Runnable {
    CacheRefreshThread() {
    }
    public void run() {
        DiscoveryClient.this.refreshRegistry();
    } 
}

由代码可以看出,刷新 Eureka 客户端的缓存是通过 DiscoveryClient 对象的 refreshRegistry

⽅法实现的。

Ribbon 的负载负载均衡策略

在使⽤ Spring Cloud 全家桶的 Ribbon 做负载均衡时候,Ribbon 不同于 F5、Nginx 等等通过

软件或者硬件做负载均衡,Ribbon 直接就在应⽤端做负载均衡。Ribbon ⾃⼰实现负载均衡器,根据⼀定规则,从 Ribbon 的本地缓存的服务列表⾥⾯选择服务进⾏调⽤。值得注意的是,Ribbon给每个服务都初始化了⼀个 Spring 容器。Eureka 的缓存列表是通过定 时任务去注册中⼼拉取,Ribbon 的本地服务缓存列表则是通过定时任务通过 Eureka 的缓存列表同步过来。

打开 org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration 在这个配置类⾥

⾯,可以看到定义了各种 Ribbon 负载均衡需要的各种对象。其中有定义了默认的负载均衡规

则,默认的拉取服务列表的策略。

@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
    if (this.propertiesFactory.isSet(IRule.class, this.name)) {
        return (IRule)this.propertiesFactory.get(IRule.class, con‐
                                                 fig, this.name);
    } else {
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    } 
}

上⾯的代码给我们定义了 Ribbon 默认的负载均衡策略,在我们没有初始化其他实现 IRule 类

的时候执⾏。

**Ribbon 主要实现了⼀下⼏种负载均衡策略: **

  • RoundRobinRule:轮询策略
  • RandomRule:随机策略
  • AvailabilityFilteringRule:可⽤过滤策略
  • WeightedResponseTimeRule:响应时间权重策略
  • RetryRule:轮询失败重试策略
  • BestAvailableRule:并发量最⼩可⽤策略
  • ZoneAvoidanceRule:根据 server 所在区域的性能和 server 的可⽤性

当然,我们也可以根据业务要求⾃定义负载均衡规则。

如下代码,Ribbon 定义了默认的拉取服务列表的⽅式:

@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig 
                                                 config) {
    return new PollingServerListUpdater(config);
}

**Ribbon 实现了两种拉取服务的⽅式: **

  • PollingServerListUpdater:通过线程池定时任务的⽅式每隔 30s 从 Eureka 缓存拉取
  • EurekaNotificationServerListUpdater:通过监听 Eureka 的缓存更新事件,当 Eureka 客 户端从注册中⼼拉取服务列表的时候,同时同步到 Ribbon 的缓存列表中。上⾯两个类都是 ServerListUpdater 的实现类,都实现了 ServerListUpdate 的 start ⽅法。⽽ start ⽅法会在 DynamicServerListLoadBalancer(Ribbon 负载均衡器)初始化的时候的被调⽤。下⾯可以看下 start ⽅法的代码实现:
public class EurekaNotificationServerListUpdater implements ServerListUpdater {
    //......省略代码
    public synchronized void start(final UpdateAction updateAction) {
        if (this.isActive.compareAndSet(false, true)) {
            // 初始化 Eureka 时间监听器
            this.updateListener = new EurekaEventListener() {
                public void onEvent(EurekaEvent event) {
                    // ......省略代码 }
                };
                //......省略代码
                // 把监听器注册到 Eureka 的缓存更新事件中
                this.eurekaClient.registerEventListener(this.updateListen‐
                                                        er);
        } else {
            logger.info("Update listener already registered, no-op");
        }
        ......省略代码 }
    public class PollingServerListUpdater implements ServerListUpdater {
        //...... 省略代码
        public synchronized void start(final UpdateAction updateAction) {
            if (this.isActive.compareAndSet(false, true)) {
                // 初始化线程
                Runnable wrapperRunnable = new Runnable() {
                    public void run() {
                        if (!PollingServerListUpdater.this.isActive.get())
                        {
                            if (PollingServerListUpdater.this.scheduledFu‐
                                ture != null) {
                                PollingServerListUpdater.this.scheduledFu‐
                                ture.cancel(true);
                            }
                        } else {
                            try {
                                updateAction.doUpdate();
                                PollingServerListUpdater.this.lastUpdated 
                                = System.currentTimeMillis();
                            } catch (Exception var2) {
                                PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
                            } } }
                };
                // 启动定时任务更新 Ribbon 缓存
                this.scheduledFuture = getRefreshExecutor().scheduleWith‐
                FixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshInter‐
                           valMs, TimeUnit.MILLISECONDS);
            } else {
                logger.info("Already active, no-op");
            } 
        }

Eureka、Ribbon 的缓存机制

通过上⾯的分析,我们所了解到有两个地⽅的缓存,⼀是 Eureka 客户端缓存,⼆是 Ribbon 负

载均衡器的缓存。这些缓存机制是否都是必须存在的呢?很明显,这些缓存机制都是有存在的必要的,⽽且是⾮

常合理的。因为你的服务状态不可能是时时变化的。在 Eureka、Ribbon 这两个组件中还有

eureka-server 也是有缓存中的。加起来三个地⽅有缓存。eureka-server 中⽤ guava 定义了 readWriteCacheMap,readOnlyCacheMap 两个缓存。当我们的服务注册到 eureka-server,服务的各类元数据信息会先存储在 readWriteCacheMap,然后定时任务每隔 30s 同步到 readOnlyCacheMap 中,所以服务提供者刚注册到注册中⼼,服务调⽤者是拉取不到的。

**综上各级缓存,我们可以看下图⽐较直观: **

由于服务启动的时候,服务会⾃动注册到注册中⼼,所以只是在续约的时候,服务提供者到注

册中⼼才有 30s。

Eureka、Ribbon 使⽤的弊端

通过上⽂的各种分析,相信⼤家已经知道使⽤ Eureka、Ribbon 的问题所在了。正式因为各级

缓存,各类定时任务的存在,所以我们在正常使⽤ Eureka、Ribbon 的时候是没有办法做到 4

个 9 ⾼可⽤的。试想⼀下,当我们⼀个服务启动提供,服务注册到 eureka-server。readWriteCacheMap 到 readOnlyCacheMap 需要 30s,readOnlyCacheMap 到 eureka-client 需要 30s,eureka-client 到 Ribbon 需要 30s,所以极端情况下,调⽤⽅ 90s 之后才能知道服务提供者的信息。 同理,当我们下线⼀个服务的时候,由于 readOnlyCacheMap、eureka-client、Ribbon 三个 端都存在缓存,所以在极端情况下,90s 内会出现服务提供者的接⼝调⽤异常,因为它已经下 线了。如果像天猫淘宝这种流量级别⽹站,这种情况当然是不可接受的。

如何使⽤ RabbitMQ 解决 Eureka、Ribbon 的弊端

**主要实现两个⽬标: **

  • 服务平滑下线,先让客户端感知即将要下线,不负载均衡到即将要下线的服务中,等下线服务已经请求进来的流量跑完,最后平滑下线
  • 服务快速感知,当服务上线后,服务调⽤⽅可以快速感知已上线的服务。

我们要实现上⾯两个⽬标,⾸先是要想办法⼲掉各种缓存机制。⼲掉缓存操作办法也很简单。

第⼀,使⽤在 eureka-server 使⽤下⾯两个配置:

# 禁⽤保护模式
eureka.server.enable-self-preservation=false
# 禁⽤ readOnlyCacheMap
eureka.server.use-read-only-response-cache=false

第⼆,eureka-client 端使⽤ EurekaNotificationServerListUpdater ⽅式同步可⽤服务列表更新

到 Ribbon,即 eureka-client 更新缓存的同时⻢上 Ribbon 缓存。

当我们分析服务注册推送到注册中⼼的 com.netflix.appinfo.InstanceInfo 对象发现:

public class InstanceInfo {
    // eureka 允许我们⾃定义业务的元数据
    @XStreamAlias("metadata")
    private volatile Map<String, String> metadata;
    @Auto
    private volatile Long lastUpdatedTimestamp;
    @Auto
    private volatile Long lastDirtyTimestamp;
    @Auto
    private volatile InstanceInfo.ActionType actionType;
    @Auto
    private volatile String asgName;
    private String version;
}

InstanceInfo 提供了 metadata 属性,允许我们⾃定义业务元数据推到注册中⼼。基于这个,当服务下线时,先保持服务状态不变,设置服务下线的标志元数据推送到注册中⼼,通知所有的服务消费者这个服务要下线了,重新拉取服务列表的状态信息。 这个通知的过程我们需要借助⼀些消息中间件,如 RabbitMQ、RocketMQ 等等,⾃定义Ribbon 负载均衡规则,服务列表⾥⾯剔除带有下线标志位的服务后再做负载均衡。最后 30s 后再杀掉服务,⾄此,这个服务实现平滑下线。当服务要上线的时候,监听 Spring 容器的初始化完成事件,在监听事件⾥⾯只做⼀件事,发送服务上线消息到 MQ,服务消费者收到消息后执⾏重新拉取服务列表的操作,实现服务快速感知。

平滑发布实现的设计实现原理

总体思路,利⽤ MQ 的原⼦消息⼴播,通知每个服务消费者,在此前提下,我们必须⽐较了解

Eureka、Ribbon 的⼀些代码实现,上⽂已跟⼤家分析了个⼤概。现在来看下总体的设计思路图

平滑发布代码实现

有了上⾯的⽅法论,接下来就是代码实现了。

1. 监听服务启动事件:

@Component
@Slf4j
public class SpringContextGcdListener implements
ApplicationListener<ContextRefreshedEvent> {
    private volatile AtomicBoolean isSend =new AtomicBoolean(false);
    @Autowired
    private ApplicationInfoManager applicationInfoManager;
    @Autowired
    private SksEurekaManager sksEurekaManager;
    @SneakyThrows
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try{
            if(event.getApplicationContext().getParent() instanceof
               AnnotationConfigApplicationContext) {
                //防⽌重复触发
                if (isSend.compareAndSet(false, true)) {
                    for (int i = 0; i < 10; i++) {
                        DiscoveryClient discoveryClient = 
                        SpringContextHolder.getBean(DiscoveryClient.class);
                        if (Objects.nonNull(discoveryClient)) {
                            InstanceInfo instanceInfo = applicationIn‐
                            foManager.getInfo();
                            EurekaConstant.setQueue(instanceInfo.getAppName(), instanceInfo.getH‐
                                                    ostName(), instanceInfo.getPort());
                            ServerConsumer consumer = 
                            SpringContextHolder.getBean(ServerConsumer.class);
                            consumer.setQueueName(EurekaConstant.EU‐
                                                  REKA_RIBBON_QUEUE);
                            consumer.start();
                            LogUtil.info(log, "SpringContextListener",
                                         "MQ 消费启动监听");
                            ServerChangeEvent changeEvent = change‐
                            Event(instanceInfo, ServerStatus.UP);
                            // 推送 MQ 消息
                            EurekaManager.sendMqMessage(changeEvent);
                            //LogUtil.info(log, 
                            "SpringContextListener", "queue :{}", 
                            EurekaConstant.EUREKA_RIBBON_QUEUE);
                            LogUtil.info(log, "SpringContextListener",
                                         "app start up send mq message :{}", JsonUtil.toJson(changeEvent));
                            break; }
                        Thread.sleep(1000);
                    } } } }catch (Exception e){
            LogUtil.error(log,"SpringContextListener","onApplication‐
                          Event:{}",e);
        } } 
}

2. MQ 消费处理:

@Slf4j
@Component
public class ConsumerTask {
    public boolean consumer( byte[] bytes) {
        String json = new String(bytes);
        ServerChangeEvent event = JsonUtil.fromStr(json, 
                                                   ServerChangeEvent.class);
        try{
            DiscoveryClient discoveryClient = 
            SpringContextHolder.getBean(DiscoveryClient.class);
            // 重新拉取服务列表
            Method refresh = 
            DiscoveryClient.class.getDeclaredMethod(EurekaConstant.REFRESH_METHOD)
            ;
            refresh.setAccessible(true);
            refresh.invoke(discoveryClient);
        }catch (Exception e){
            LogUtil.error(log,"EurekaServerConsumerTask","eureka rib‐
                          bon refresh :{}",e);
        }
        return true; } 
}

3. ⾃定义负载均衡规则:

@Slf4j
public class EurekaRibbonRule extends ClientConfigEnabledRoundRobinRule {
    private LoadBalancerStats loadBalancerStats;
    @Override
    public Server choose(Object key) {
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        List<Server> serverList = getLoadBalancer().getAllServers();
        // 踢掉要下线的服务
        List<Server> upServerList = Lists.newArrayList();
        for(Server server: serverList){
            if (server instanceof DiscoveryEnabledServer) {
                final DiscoveryEnabledServer discoveryEnabledServer = 
                (DiscoveryEnabledServer) server;
                final Map<String, String> metadata = discoveryEnabled‐
                Server.getInstanceInfo().getMetadata();
                final String down = metadata.get(SERVER_STATUS_KEY);
                if (!ServerStatus.DOWN.name().equals(down)) {
                    upServerList.add(server);
                } } }
        if(upServerList.isEmpty()){
            return null; }
        //... 根据需要做负载负载均衡规则
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        } 
}

4. 服务上线发送原⼦消息⼴播:

private void changeServer(ServerStatus serverStatus) {
    Map<String,String> engMetaMap = Maps.newHashMap();
    engMetaMap.put(EurekaConstant.SERVER_STATUS_KEY, serverSta‐
                   tus.name());
    applicationInfoManager.registerAppMetadata(engMetaMap);
    DiscoveryClient discoveryClient = 
    SpringContextHolder.getBean(DiscoveryClient.class);
    try {
        Method renew = 
        DiscoveryClient.class.getDeclaredMethod(EurekaConstant.RENEW_METHOD);
        renew.setAccessible(true);
        renew.invoke(discoveryClient);
        ServerChangeEvent event = changeEvent(applicationInfoMan‐
                                              ager.getInfo(),serverStatus);
        sendMqMessage(event);
    } catch (Exception e) {
    } 
}

5. 初始化负载均衡规则和 Ribbon 缓存更新⽅式:

由 于 Ribbon 会 给 每 个 服 务 提 供 初 始 化 容 器 , 为 了 使 Ribbon 容 器 都 加 载 到RibbonAutoGcdConfiguration 配置,我们需要在服务启动类上加上

@RibbonClients(defaultConfiguration = RibbonAutoGcdConfiguration.class)

⾄此,所有核⼼代码完毕,上⾯只是⼀些核⼼代码,根据思路,相信⼤家也能搞出来,如有不详尽,欢迎⼤家留⾔讨论,我将知⽆不⾔。

基于 Eureka、Ribbon 的灰度发布

在使⽤ Eureka、Ribbon 要做灰度的话,要利⽤⼀下⽹关的组件,灰度的实现⽅式有很多,我发⼀下我的思路,希望跟⼤家讨论交流⼀下。

免费IT资源小站,最后送给大家一套学习RabbitMQ的课程:中间件MQ-RabbitMQ入门到实战课程 - IT资源小站


本文转载自: https://blog.csdn.net/weixin_46786684/article/details/138254971
版权归原作者 程序员理查德 所有, 如有侵权,请联系我们删除。

“基于 RabbitMQ 实现 Eureka 服务平滑灰度发布”的评论:

还没有评论