0


一文搞懂EureKa原理

关注公众号 不爱总结的麦穗 将不定期推送技术好文

Eureka Server 服务注册中心,提供服务发现并实现负载均衡和故障转移。它由两个组件组成:Eureka服务器和Eureka客户端。

Eureka Server

  • 以 REST API 的形式为服务实例提供了注册、管理和查询等操作。同时,Eureka Server 也为我们提供了可视化的监控页面,可以直观地看到各个 Eureka Server 当前的运行状态和所有已注册服务的情况。

Eureka Client

  • 客户端将自身服务注册到Eureka,从而使服务消费方能够找到
  • 从Eureka服务器获取注册服务列表,从而能够消费服务

Eureka架构

在这里插入图片描述

从图上标注出来的地方,我们可以看到Eureka Server 和 Eureka Client通信过程:

  • Register :服务注册

Eureka客户端向Eureka Server注册时,它提供自身的元数据,比如IP地址、端口等

  • Renew:服务续约

Eureka客户端会每隔30秒发送一次心跳来续约。通过续约来告知Eureka Server该客户端仍然存在。

  • Get Registries:获取注册列表信息

Eureka客户端从服务器获取注册表信息,将其缓存到本地。客户端会使用该信息查找其他服务,从而进行远程调用。该注册列表信息定期(每30秒)更新一次。

  • Cancel:服务下线

Eureka客户端在程序关闭时向Eureka服务器发送取消请求。

  • Make Remote Call:

从eureka client到eureka client,远程调用

  • 服务剔除:

Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。

源码解析

接下来,从源码层面来分析上述过程。

服务注册

在这里插入图片描述

当Eureka Client 启动时

EurekaClientAutoConfiguration

配置类生效,会注入Bean CloudEurekaClient,然后调用父类DiscoveryClient的构造方法。最终通过

RestTemplateEurekaHttpClient#register

方法发出REST请求。

Eureka Server

ApplicationResource#addInstance

在收到请求后,最终通过

AbstractInstanceRegistry#register

方法完成了客户端注册的主要逻辑。

  • AbstractInstanceRegistry#register
publicvoidregister(InstanceInfo registrant,int leaseDuration,boolean isReplication){
    read.lock();try{// 根据微服务名称从注册表 `registry` 中获取注册的 `Map<String, Lease<InstanceInfo>>`Map<String,Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());REGISTER.increment(isReplication);if(gMap ==null){finalConcurrentHashMap<String,Lease<InstanceInfo>> gNewMap =newConcurrentHashMap<String,Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if(gMap ==null){
                gMap = gNewMap;}}// 通过实例id,从map中获取服务实例对应的租约Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a leaseif(existingLease !=null&&(existingLease.getHolder()!=null)){Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if(existingLastDirtyTimestamp > registrationLastDirtyTimestamp){
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"+" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();}}else{// 新注册的服务实例synchronized(lock){if(this.expectedNumberOfClientsSendingRenews >0){// 预期续约客户端数量+1this.expectedNumberOfClientsSendingRenews =this.expectedNumberOfClientsSendingRenews +1;// 自我保护阀值计算 2*(60/30)*0.85updateRenewsPerMinThreshold();}}
            logger.debug("No previous lease information found; it is new registration");}// 通过服务实例信息和默认的90s租期构建Lease 租约信息Lease<InstanceInfo> lease =newLease<>(registrant, leaseDuration);if(existingLease !=null){// 更新服务启动时间
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}// 将封装了服务实例信息的Lease对象,放到gMap里面去
        gMap.put(registrant.getId(), lease);
        recentRegisteredQueue.add(newPair<Long,String>(System.currentTimeMillis(),
                registrant.getAppName()+"("+ registrant.getId()+")"));// This is where the initial state transfer of overridden status happensif(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())){
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "+"overrides", registrant.getOverriddenStatus(), registrant.getId());if(!overriddenInstanceStatusMap.containsKey(registrant.getId())){
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());}}InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());if(overriddenStatusFromMap !=null){
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);}// Set the status based on the overridden status rulesInstanceStatus overriddenInstanceStatus =getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);// 启动状态 设置服务启动时间if(InstanceStatus.UP.equals(registrant.getStatus())){
            lease.serviceUp();}
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(newRecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();// 删除缓存invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);}finally{
        read.unlock();}}

Eureka Server会把客户端的实列信息保存在自己的注册表中(双层Map结构)。

服务续约

在这里插入图片描述

当Eureka Client 启动时,会开启一个心跳任务,每隔30s向服务端发送一次心跳请求

  • DiscoveryClient#initScheduledTasks
privatevoidinitScheduledTasks(){// 部分代码省略// 构建一个心跳续约的定时任务,每30s(默认)执行一次
        heartbeatTask =newTimedSupervisorTask("heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,TimeUnit.SECONDS,
                expBackOffBound,// 心跳续约逻辑newHeartbeatThread());
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs,TimeUnit.SECONDS);// 部分代码省略

Eureka Server

InstanceResource#renewLease

方法接收到请求进行续约

  • InstanceResource#renewLease
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // 调用renew进行续约
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

// 部分代码省略
}
  • AbstractInstanceRegistry#renew
publicbooleanrenew(String appName,String id,boolean isReplication){RENEW.increment(isReplication);// 根据服务名字获取实例信息Map<String,Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToRenew =null;if(gMap !=null){
        leaseToRenew = gMap.get(id);}// 服务实例不存在,直接返回续约失败if(leaseToRenew ==null){RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);returnfalse;}else{// 获取服务实例的基本信息InstanceInfo instanceInfo = leaseToRenew.getHolder();if(instanceInfo !=null){// touchASGCache(instanceInfo.getASGName());// 获取服务实例的运行状态InstanceStatus overriddenInstanceStatus =this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);// 如果运行状态未知,也返回续约失败if(overriddenInstanceStatus ==InstanceStatus.UNKNOWN){
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+"; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);returnfalse;}if(!instanceInfo.getStatus().equals(overriddenInstanceStatus)){
                logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+"Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                overriddenInstanceStatus.name(),
                                instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}
        renewsLastMin.increment();// 更新服务端最后一次收到心跳请求的时间
        leaseToRenew.renew();returntrue;}}

服务续约,就是更新服务端最后一次收到心跳请求的时间。

获取注册列表信息

当Eureka Client 启动时,会开启一个缓存刷新任务,每隔30s向服务端发送一次请求

  • DiscoveryClient#initScheduledTasks
privatevoidinitScheduledTasks(){if(clientConfig.shouldFetchRegistry()){// 构建一个缓存刷新的定时任务,每30s(默认)执行一次int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask =newTimedSupervisorTask("cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,TimeUnit.SECONDS,
                expBackOffBound,// 缓存刷新逻辑newCacheRefreshThread());
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds,TimeUnit.SECONDS);}// 部分代码省略}
  • DiscoveryClient#fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            // 全量获取服务列表
            getAndStoreFullRegistry();
        } else {
             // 增量获取服务列表
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

Eureka Server ApplicationsResource接收到请求后,从缓存中返回服务列表

  • ApplicationsResource#getContainers 全量获取服务列表
Key cacheKey =newKey(Key.EntityType.Application,// 全量ResponseCacheImpl.ALL_APPS,
        keyType,CurrentRequestVersion.get(),EurekaAccept.fromString(eurekaAccept), regions
);
  • ApplicationsResource#getContainerDifferential 增量获取服务列表
Key cacheKey =newKey(Key.EntityType.Application,// 增量ResponseCacheImpl.ALL_APPS_DELTA,
        keyType,CurrentRequestVersion.get(),EurekaAccept.fromString(eurekaAccept), regions
);

通过源码可以看到,这两个方法都是从缓存中获取服务列表。区别在于获取缓存的key

那Eureka Server是怎样把客户端的注册信息存到缓存的呢?

Eureka缓存机制

Eureka Server 为了提供响应效率,提供了三层的缓存结构,将 Eureka Client 所需要的注册信息,直接存储在缓存结构中。
在这里插入图片描述

  • 第一层缓存:readOnlyCacheMap,定时从 readWriteCacheMap 同步数据(默认时间为 30 秒)。定时器通过和 readWriteCacheMap 的值做对比,如果数据不一致,则以 readWriteCacheMap 的数据为准。
  • 第二层缓存:readWriteCacheMap,主要同步于存储层。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 方法去加载,加载成功之后将数据放入缓存,同时返回数据。缓存过期时间,默认为 180 秒。
  • 第三层缓存:registry注册信息表,当服务下线、过期、注册、状态变更,都会来清除readWriteCacheMap缓存中的数据。
服务下线

当Eureka Client 服务关闭的时候会取消本机的各种定时任务,给服务端发送请求告知自己下线

  • DiscoveryClient#shutdown
@PreDestroy@Overridepublicsynchronizedvoidshutdown(){// 部分代码省略// 取消定时任务cancelScheduledTasks();// If APPINFO was registeredif(applicationInfoManager !=null&& clientConfig.shouldRegisterWithEureka()&& clientConfig.shouldUnregisterOnShutdown()){
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);// 向服务端发送请求告知自己下线unregister();}// 部分代码省略}}

Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去。

服务剔除

Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。

  • AbstractInstanceRegistry#postInit
protectedvoidpostInit(){
    renewsLastMin.start();if(evictionTaskRef.get()!=null){
        evictionTaskRef.get().cancel();}//剔除逻辑
    evictionTaskRef.set(newEvictionTask());
    创建服务剔除定时任务,60s(默认)
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());}

本文转载自: https://blog.csdn.net/Tan_xiong/article/details/142139187
版权归原作者 不爱总结的麦穗 所有, 如有侵权,请联系我们删除。

“一文搞懂EureKa原理”的评论:

还没有评论