关注公众号 不爱总结的麦穗 将不定期推送技术好文
Eureka Server 服务注册中心,提供服务发现并实现负载均衡和故障转移。它由两个组件组成:Eureka服务器和Eureka客户端。
Eureka Server
- 以 REST API 的形式为服务实例提供了注册、管理和查询等操作。同时,Eureka Server 也为我们提供了可视化的监控页面,可以直观地看到各个 Eureka Server 当前的运行状态和所有已注册服务的情况。
Eureka Client
- 客户端将自身服务注册到Eureka,从而使服务消费方能够找到
- 从Eureka服务器获取注册服务列表,从而能够消费服务
Eureka架构
从图上标注出来的地方,我们可以看到Eureka Server 和 Eureka Client通信过程:
- Register :服务注册
Eureka客户端向Eureka Server注册时,它提供自身的元数据,比如IP地址、端口等
- Renew:服务续约
Eureka客户端会每隔30秒发送一次心跳来续约。通过续约来告知Eureka Server该客户端仍然存在。
- Get Registries:获取注册列表信息
Eureka客户端从服务器获取注册表信息,将其缓存到本地。客户端会使用该信息查找其他服务,从而进行远程调用。该注册列表信息定期(每30秒)更新一次。
- Cancel:服务下线
Eureka客户端在程序关闭时向Eureka服务器发送取消请求。
- Make Remote Call:
从eureka client到eureka client,远程调用
- 服务剔除:
Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。
源码解析
接下来,从源码层面来分析上述过程。
服务注册
当Eureka Client 启动时
EurekaClientAutoConfiguration
配置类生效,会注入Bean CloudEurekaClient,然后调用父类DiscoveryClient的构造方法。最终通过
RestTemplateEurekaHttpClient#register
方法发出REST请求。
Eureka Server
ApplicationResource#addInstance
在收到请求后,最终通过
AbstractInstanceRegistry#register
方法完成了客户端注册的主要逻辑。
- AbstractInstanceRegistry#register
publicvoidregister(InstanceInfo registrant,int leaseDuration,boolean isReplication){
read.lock();try{// 根据微服务名称从注册表 `registry` 中获取注册的 `Map<String, Lease<InstanceInfo>>`Map<String,Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());REGISTER.increment(isReplication);if(gMap ==null){finalConcurrentHashMap<String,Lease<InstanceInfo>> gNewMap =newConcurrentHashMap<String,Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if(gMap ==null){
gMap = gNewMap;}}// 通过实例id,从map中获取服务实例对应的租约Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a leaseif(existingLease !=null&&(existingLease.getHolder()!=null)){Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if(existingLastDirtyTimestamp > registrationLastDirtyTimestamp){
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"+" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();}}else{// 新注册的服务实例synchronized(lock){if(this.expectedNumberOfClientsSendingRenews >0){// 预期续约客户端数量+1this.expectedNumberOfClientsSendingRenews =this.expectedNumberOfClientsSendingRenews +1;// 自我保护阀值计算 2*(60/30)*0.85updateRenewsPerMinThreshold();}}
logger.debug("No previous lease information found; it is new registration");}// 通过服务实例信息和默认的90s租期构建Lease 租约信息Lease<InstanceInfo> lease =newLease<>(registrant, leaseDuration);if(existingLease !=null){// 更新服务启动时间
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}// 将封装了服务实例信息的Lease对象,放到gMap里面去
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(newPair<Long,String>(System.currentTimeMillis(),
registrant.getAppName()+"("+ registrant.getId()+")"));// This is where the initial state transfer of overridden status happensif(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())){
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "+"overrides", registrant.getOverriddenStatus(), registrant.getId());if(!overriddenInstanceStatusMap.containsKey(registrant.getId())){
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());}}InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());if(overriddenStatusFromMap !=null){
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);}// Set the status based on the overridden status rulesInstanceStatus overriddenInstanceStatus =getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);// 启动状态 设置服务启动时间if(InstanceStatus.UP.equals(registrant.getStatus())){
lease.serviceUp();}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(newRecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();// 删除缓存invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);}finally{
read.unlock();}}
Eureka Server会把客户端的实列信息保存在自己的注册表中(双层Map结构)。
服务续约
当Eureka Client 启动时,会开启一个心跳任务,每隔30s向服务端发送一次心跳请求。
- DiscoveryClient#initScheduledTasks
privatevoidinitScheduledTasks(){// 部分代码省略// 构建一个心跳续约的定时任务,每30s(默认)执行一次
heartbeatTask =newTimedSupervisorTask("heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,TimeUnit.SECONDS,
expBackOffBound,// 心跳续约逻辑newHeartbeatThread());
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs,TimeUnit.SECONDS);// 部分代码省略
Eureka Server
InstanceResource#renewLease
方法接收到请求进行续约
- InstanceResource#renewLease
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 调用renew进行续约
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// 部分代码省略
}
- AbstractInstanceRegistry#renew
publicbooleanrenew(String appName,String id,boolean isReplication){RENEW.increment(isReplication);// 根据服务名字获取实例信息Map<String,Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToRenew =null;if(gMap !=null){
leaseToRenew = gMap.get(id);}// 服务实例不存在,直接返回续约失败if(leaseToRenew ==null){RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);returnfalse;}else{// 获取服务实例的基本信息InstanceInfo instanceInfo = leaseToRenew.getHolder();if(instanceInfo !=null){// touchASGCache(instanceInfo.getASGName());// 获取服务实例的运行状态InstanceStatus overriddenInstanceStatus =this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);// 如果运行状态未知,也返回续约失败if(overriddenInstanceStatus ==InstanceStatus.UNKNOWN){
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+"; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);returnfalse;}if(!instanceInfo.getStatus().equals(overriddenInstanceStatus)){
logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+"Hence setting the status to overridden status", instanceInfo.getStatus().name(),
overriddenInstanceStatus.name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}
renewsLastMin.increment();// 更新服务端最后一次收到心跳请求的时间
leaseToRenew.renew();returntrue;}}
服务续约,就是更新服务端最后一次收到心跳请求的时间。
获取注册列表信息
当Eureka Client 启动时,会开启一个缓存刷新任务,每隔30s向服务端发送一次请求
- DiscoveryClient#initScheduledTasks
privatevoidinitScheduledTasks(){if(clientConfig.shouldFetchRegistry()){// 构建一个缓存刷新的定时任务,每30s(默认)执行一次int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask =newTimedSupervisorTask("cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,TimeUnit.SECONDS,
expBackOffBound,// 缓存刷新逻辑newCacheRefreshThread());
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds,TimeUnit.SECONDS);}// 部分代码省略}
- DiscoveryClient#fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// 全量获取服务列表
getAndStoreFullRegistry();
} else {
// 增量获取服务列表
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
Eureka Server ApplicationsResource接收到请求后,从缓存中返回服务列表。
- ApplicationsResource#getContainers 全量获取服务列表
Key cacheKey =newKey(Key.EntityType.Application,// 全量ResponseCacheImpl.ALL_APPS,
keyType,CurrentRequestVersion.get(),EurekaAccept.fromString(eurekaAccept), regions
);
- ApplicationsResource#getContainerDifferential 增量获取服务列表
Key cacheKey =newKey(Key.EntityType.Application,// 增量ResponseCacheImpl.ALL_APPS_DELTA,
keyType,CurrentRequestVersion.get(),EurekaAccept.fromString(eurekaAccept), regions
);
通过源码可以看到,这两个方法都是从缓存中获取服务列表。区别在于获取缓存的key。
那Eureka Server是怎样把客户端的注册信息存到缓存的呢?
Eureka缓存机制
Eureka Server 为了提供响应效率,提供了三层的缓存结构,将 Eureka Client 所需要的注册信息,直接存储在缓存结构中。
- 第一层缓存:readOnlyCacheMap,定时从 readWriteCacheMap 同步数据(默认时间为 30 秒)。定时器通过和 readWriteCacheMap 的值做对比,如果数据不一致,则以 readWriteCacheMap 的数据为准。
- 第二层缓存:readWriteCacheMap,主要同步于存储层。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 方法去加载,加载成功之后将数据放入缓存,同时返回数据。缓存过期时间,默认为 180 秒。
- 第三层缓存:registry注册信息表,当服务下线、过期、注册、状态变更,都会来清除readWriteCacheMap缓存中的数据。
服务下线
当Eureka Client 服务关闭的时候会取消本机的各种定时任务,给服务端发送请求告知自己下线。
- DiscoveryClient#shutdown
@PreDestroy@Overridepublicsynchronizedvoidshutdown(){// 部分代码省略// 取消定时任务cancelScheduledTasks();// If APPINFO was registeredif(applicationInfoManager !=null&& clientConfig.shouldRegisterWithEureka()&& clientConfig.shouldUnregisterOnShutdown()){
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);// 向服务端发送请求告知自己下线unregister();}// 部分代码省略}}
Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去。
服务剔除
Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。
- AbstractInstanceRegistry#postInit
protectedvoidpostInit(){
renewsLastMin.start();if(evictionTaskRef.get()!=null){
evictionTaskRef.get().cancel();}//剔除逻辑
evictionTaskRef.set(newEvictionTask());
创建服务剔除定时任务,60s(默认)
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());}
版权归原作者 不爱总结的麦穗 所有, 如有侵权,请联系我们删除。