0


Eureka 服务注册源码探秘——图解、源码级解析

🍊 Java学习:社区快速通道

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南

📆 最近更新:2023年5月2日

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

引言

服务注册是为了解决各个微服务的“你是谁”这个问题,即获取所有服务节点的身份信息和服务名称,站在注册中心的角度来看,有以下两种比较直观的解决方案:

  1. 由注册中心主动访问网络节点中所有机器
  2. 注册中心等待服务节点主动进行注册

在这里插入图片描述

目前主流的注册中心(Nacos、Eureka)都选择了第二种方案,主要原因是第一种方案有很多弊端:

  • 模型复杂: 网络结点构成了一张复杂的网,结点与结点之间的关系错综复杂,轮询每个节点的做法通常是注册中心发局域网广播,客户端响应的方式。现实中对于跨局域网的分布式系统来说,响应模型会更加复杂。
  • 网络开销大: 整个网络环境里会掺杂大量非服务节点,这些节点无需对送达的广播请求做出响应,这种广播的模式无疑增加了网络通信成本。
  • 服务端压力增加: 不仅要求注册中心向网络中所有节点主动发送广播请求,还需要对客户端的应答做出响应。考虑到注册中心的节点数远远少于服务节点,所以要尽可能地减轻服务中心承载的业务。

一一对照着看,第二种实现方案就有如下优点:

  1. 注册中心压力小: 网络中其它非服务节点不会产生任何无效请求,也就不用做额外的判断
  2. 效率高: 省去了广播环节的时间,使注册效率大大提高
  3. 节省成本: 节省了大量网络请求的开销

下面就来探索一下经典注册中心微服务

Eureka

服务注册源码。

Eureka 服务注册源码

寻找配置类

要使用Eureka,就需要在SpringBoot的启动类上添加

@EnableDiscoveryClient

注解,所以我们的源码解析,从启动类上的

@EnableDiscoveryClient

注解开始:
请添加图片描述

在Eureka已经启动的状态下,以debug模式启动

EurekaClientApplication

,会来到这里面的断点:

请添加图片描述
其中

metadata

main

函数里挂的注解:

请添加图片描述

attributes

是会获得

@EnableDiscoveryClient

EnableDiscoveryClient

注解,接下来读取注解里面的

autoRegister

属性,如果是true的话,会发现之后导入了一个配置类:

请添加图片描述

寻找服务注册的元数据

进入到该配置类:

请添加图片描述
继续进入到

AutoServiceRegistrationProperties

类里:
请添加图片描述
这些个属性一定会在某些配置项加载的流程中应用到,大家尝试找一下哪些类会引用它。

其中你会找到

AbstractAutoServiceRegistration

,发现其在初始化的流程里使用到:

protectedAbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry,AutoServiceRegistrationProperties properties){this.serviceRegistry = serviceRegistry;this.properties = properties;}

同时还发现了一个服务注册属性

serviceRegistry

privatefinalServiceRegistry<R> serviceRegistry;

进入到

ServiceRegistry

的实现类

EurekaServiceRegistry

请添加图片描述
进入到第一行的方法

maybeInitializeClient

里:

privatevoidmaybeInitializeClient(EurekaRegistration reg){
    reg.getApplicationInfoManager().getInfo();
    reg.getEurekaClient().getApplications();}

继续进入到

getInfo

请添加图片描述
请添加图片描述

发现这里面的信息其实就是我们要向服务中心注册的东西。

register方法

接下来继续执行

EurekaServiceRegistry

register

方法:

reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

首先设置了

instance

的状态,这里

reg.getInstanceConfig().getInitialStatus()

是UP

这里的

register

并没有发起服务调用请求,所以还要通过调用栈来继续寻找。来到上一层

EurekaAutoServiceRegistration

start

方法里:

请添加图片描述
停留在的这一行往上下文中发布了一个事件

InstanceRegisteredEvent

,但此时我们会发现服务其实并没有注册
请添加图片描述
说明在

event

发布前后肯定发生了什么事,让

eureka

服务提供者向注册中心发送了请求,既然

event

发布之后

running

的状态变为了

true

,那确实是运行起来了。

下一个流程

下一个流程在

DiscoveryClient

里,它封装了我们服务的

client

和注册中心之间的各种交互,里面有一个

register

方法

请添加图片描述
跟着断点继续往下走:
请添加图片描述
发现这里用的是

SessionedEurekaHttpClient

,接下来去找它的源码:

请添加图片描述

register

方法在其父类

EurekaHttpClientDecorator

请添加图片描述
通过一个子类实现的

execute

方法,参数是由父类传入的一个代理

delegate

execute

是由

SessionedEurekaHttpClient

子类实现的

protected<R>EurekaHttpResponse<R>execute(RequestExecutor<R> requestExecutor){long now =System.currentTimeMillis();long delay = now -this.lastReconnectTimeStamp;if(delay >=this.currentSessionDurationMs){
        logger.debug("Ending a session and starting anew");this.lastReconnectTimeStamp = now;this.currentSessionDurationMs =this.randomizeSessionDuration(this.sessionDurationMs);TransportUtils.shutdown((EurekaHttpClient)this.eurekaHttpClientRef.getAndSet((Object)null));}EurekaHttpClient eurekaHttpClient =(EurekaHttpClient)this.eurekaHttpClientRef.get();if(eurekaHttpClient ==null){
        eurekaHttpClient =TransportUtils.getOrSetAnotherClient(this.eurekaHttpClientRef,this.clientFactory.newClient());}return requestExecutor.execute(eurekaHttpClient);}

这一段代码尝试从

HttpClient

里拿实例,如果实例为空则会调用一个工具类的方法

getOrSetAnotherClient

去获取一个新的实例,但这里我们会发现其实并不为空:

请添加图片描述
这里调用了另一个

httpclient

。其中

SessionedEurekaHttpClient

用到了装饰器模式,主要装饰的功能是

delay

时间过长时重新启动一个

session

进入到下一层

RetryableEurekaHttpClient

,这一层装饰的功能是可以重试,默认最大重试次数为3:

protected<R>EurekaHttpResponse<R>execute(RequestExecutor<R> requestExecutor){List<EurekaEndpoint> candidateHosts =null;int endpointIdx =0;for(int retry =0; retry <this.numberOfRetries;++retry){EurekaHttpClient currentHttpClient =(EurekaHttpClient)this.delegate.get();EurekaEndpoint currentEndpoint =null;if(currentHttpClient ==null){if(candidateHosts ==null){
                candidateHosts =this.getHostCandidates();if(candidateHosts.isEmpty()){thrownewTransportException("There is no known eureka server; cluster server list is empty");}}if(endpointIdx >= candidateHosts.size()){thrownewTransportException("Cannot execute request on any known server");}

            currentEndpoint =(EurekaEndpoint)candidateHosts.get(endpointIdx++);
            currentHttpClient =this.clientFactory.newClient(currentEndpoint);}try{EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);if(this.serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())){this.delegate.set(currentHttpClient);if(retry >0){
                    logger.info("Request execution succeeded on retry #{}", retry);}return response;}

            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());}catch(Exception var8){
            logger.warn("Request execution failed with message: {}", var8.getMessage());}this.delegate.compareAndSet(currentHttpClient,(Object)null);if(currentEndpoint !=null){this.quarantineSet.add(currentEndpoint);}}thrownewTransportException("Retry limit reached; giving up on completing the request");}

其中

this.getHostCandidates();

获取的是注册中心:

privateList<EurekaEndpoint>getHostCandidates(){List<EurekaEndpoint> candidateHosts =this.clusterResolver.getClusterEndpoints();this.quarantineSet.retainAll((Collection)candidateHosts);int threshold =(int)((double)((List)candidateHosts).size()*this.transportConfig.getRetryableClientQuarantineRefreshPercentage());if(threshold >((List)candidateHosts).size()){
        threshold =((List)candidateHosts).size();}if(!this.quarantineSet.isEmpty()){if(this.quarantineSet.size()>= threshold){
            logger.debug("Clearing quarantined list of size {}",this.quarantineSet.size());this.quarantineSet.clear();}else{List<EurekaEndpoint> remainingHosts =newArrayList(((List)candidateHosts).size());Iterator var4 =((List)candidateHosts).iterator();while(var4.hasNext()){EurekaEndpoint endpoint =(EurekaEndpoint)var4.next();if(!this.quarantineSet.contains(endpoint)){
                    remainingHosts.add(endpoint);}}

            candidateHosts = remainingHosts;}}return(List)candidateHosts;}

如果坏注册中心节点的数量超过了阈值(66%),则要重启。

quarantineSet

存储的是失败的注册中心,

remainingHosts

存储的是成功的注册中心。

继续execute

回到上面的

execute

方法里,如果重试的索引大于候选注册中心的size时,就表示已知的所有注册中心都不能处理注册请求,此时会抛一个异常出来:

if(endpointIdx >= candidateHosts.size()){thrownewTransportException("Cannot execute request on any known server");}

currentEndpoint =(EurekaEndpoint)candidateHosts.get(endpointIdx++);
currentHttpClient =this.clientFactory.newClient(currentEndpoint);

如果在某一层

execute

成功了,则会将

deligate

设置为当前的

client

,如果不成功则会通过CAS操作将

currentHttpClient

设置为空,然后放置到失效的

EurekaEndpoint

加入到

quarantineSet

,下次不用了
请添加图片描述
此时还有好多层装饰器,这里直接快进跳到最后一层

AbstractJerseyEurekaHttpClient

中的

register

方法:

publicEurekaHttpResponse<Void>register(InstanceInfo info){String urlPath ="apps/"+ info.getAppName();ClientResponse response =null;EurekaHttpResponse var5;try{Builder resourceBuilder =this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();this.addExtraHeaders(resourceBuilder);
        response =(ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding","gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(newString[]{"application/json"})).post(ClientResponse.class, info);
        var5 =EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();}finally{if(logger.isDebugEnabled()){
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}",newObject[]{this.serviceUrl, urlPath, info.getId(), response ==null?"N/A": response.getStatus()});}if(response !=null){
            response.close();}}return var5;}

在这里发送了http请求,info里面存的是当前服务的所有信息

请添加图片描述
这一步结束之后就注册成功了

请添加图片描述


本文转载自: https://blog.csdn.net/HNU_Csee_wjw/article/details/124244398
版权归原作者 小王曾是少年 所有, 如有侵权,请联系我们删除。

“Eureka 服务注册源码探秘——图解、源码级解析”的评论:

还没有评论