1、服务端
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency>
spring-cloud-netflix-eureka-server中spring.factories中EurekaServerAutoConfiguration
/**
* Register the Jersey filter.
* @param eurekaJerseyApp an {@link Application} for the filter to be registered
* @return a jersey {@link FilterRegistrationBean}
*/@BeanpublicFilterRegistrationBean<?>jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp){FilterRegistrationBean<Filter> bean =newFilterRegistrationBean<Filter>();
bean.setFilter(newServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX+"/*"));return bean;}
1.1、服务注册
ApplicationResource类的addInstance方法接收请求,在对实例的信息进行验证后,向服务注册中心添加实例。
/注意是POST方法
@POST@Consumes({"application/json","application/xml"})publicResponseaddInstance(InstanceInfo info,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION)String isReplication){
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);// validate that the instanceinfo contains all the necessary required fieldsif(isBlank(info.getId())){returnResponse.status(400).entity("Missing instanceId").build();}elseif(isBlank(info.getHostName())){returnResponse.status(400).entity("Missing hostname").build();}elseif(isBlank(info.getIPAddr())){returnResponse.status(400).entity("Missing ip address").build();}elseif(isBlank(info.getAppName())){returnResponse.status(400).entity("Missing appName").build();}elseif(!appName.equals(info.getAppName())){returnResponse.status(400).entity("Mismatched appName, expecting "+ appName +" but was "+ info.getAppName()).build();}elseif(info.getDataCenterInfo()==null){returnResponse.status(400).entity("Missing dataCenterInfo").build();}elseif(info.getDataCenterInfo().getName()==null){returnResponse.status(400).entity("Missing dataCenterInfo Name").build();}// handle cases where clients may be registering with bad DataCenterInfo with missing dataDataCenterInfo dataCenterInfo = info.getDataCenterInfo();if(dataCenterInfo instanceofUniqueIdentifier){String dataCenterInfoId =((UniqueIdentifier) dataCenterInfo).getId();if(isBlank(dataCenterInfoId)){boolean experimental ="true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if(experimental){String entity ="DataCenterInfo of type "+ dataCenterInfo.getClass()+" must contain a valid id";returnResponse.status(400).entity(entity).build();}elseif(dataCenterInfo instanceofAmazonInfo){AmazonInfo amazonInfo =(AmazonInfo) dataCenterInfo;String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);if(effectiveId ==null){
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());}}else{
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}//进入InstanceRegistry的register
registry.register(info,"true".equals(isReplication));returnResponse.status(204).build();// 204 to be backwards compatible}
进入InstanceRegistry的register在这里做了两个功能
@Overridepublicvoidregister(finalInstanceInfo info,finalboolean isReplication){//在方法中使用publishEvent发布了监听事件 。Spring支持事件驱动,可以监听者模式进行事件的监听,这里广播给所有监听者,收到一个服务注册的请求。至于监听器,可以由我们自己手写实现,参数中的事件类型spring会帮我们直接注入handleRegistration(info,resolveInstanceLeaseDuration(info), isReplication);//调用父类PeerAwareInstanceRegistryImpl的register方法super.register(info, isReplication);}@ComponentpublicclassEurekaRegisterListener{@EventListenerpublicvoidregiste(EurekaInstanceRegisteredEvent event){System.out.println(event.getInstanceInfo().getAppName());}}
父类PeerAwareInstanceRegistryImpl的register方法
publicvoidregister(finalInstanceInfo info,finalboolean isReplication){//拿到微服务的过期时间,并进行更新int leaseDuration =Lease.DEFAULT_DURATION_IN_SECS;if(info.getLeaseInfo()!=null&& info.getLeaseInfo().getDurationInSecs()>0){
leaseDuration = info.getLeaseInfo().getDurationInSecs();}//将服务注册交给父类完成 父类AbstractInstanceRegistry的register方法,在这开始真正开始做服务注册super.register(info, leaseDuration, isReplication);//完成集群信息同步replicateToPeers(Action.Register, info.getAppName(), info.getId(), info,null, isReplication);}
AbstractInstanceRegistry的register方法
AbstractInstanceRegistry这个类中定义的Eureka-server的服务注册列表的结构
ConcurrentHashMap中外层的String表示服务名称;
Map中的String表示服务节点的id (也就是实例的instanceid);
Lease是一个心跳续约的对象,InstanceInfo表示实例信息。
privatefinalConcurrentHashMap<String,Map<String,Lease<InstanceInfo>>> registry
=newConcurrentHashMap<String,Map<String,Lease<InstanceInfo>>>();
1、将新的实例写入注册表的数据map中
2、清理缓存
3、主动失效读写缓存里的数据
publicvoidregister(InstanceInfo registrant,int leaseDuration,boolean isReplication){try{
read.lock();Map<String,Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());REGISTER.increment(isReplication);//首先,注册表根据微服务的名称或取Map,如果不存在就新建,使用putIfAbsent。if(gMap ==null){//1、将新的实例写入注册表的数据map中finalConcurrentHashMap<String,Lease<InstanceInfo>> gNewMap =newConcurrentHashMap<String,Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if(gMap ==null){
gMap = gNewMap;}}Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a lease//然后,从gMap(gMap就是该服务的实例列表)获取一次服务实例,判断这个微服务的节点是否存在,第一次注册的情况下一般是不存在的。当然,也有可能会发生注册信息冲突时,这时Eureka会根据最后活跃时间来判断到底覆盖哪一个:if(existingLease !=null&&(existingLease.getHolder()!=null)){Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if(existingLastDirtyTimestamp > registrationLastDirtyTimestamp){
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"+" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();}}else{// The lease does not exist and hence it is a new registrationsynchronized(lock){if(this.expectedNumberOfClientsSendingRenews >0){// Since the client wants to register it, increase the number of clients sending renewsthis.expectedNumberOfClientsSendingRenews =this.expectedNumberOfClientsSendingRenews +1;updateRenewsPerMinThreshold();}}
logger.debug("No previous lease information found; it is new registration");}//这段代码中,Eureka拿到存在节点的最后活跃时间,和当前注册节点的发起注册时间,进行对比。当存在的节点的最后活跃时间大于当前注册节点的时间,就说明之前存在的节点更活跃,就替换当前节点。
这里有一个思想,就是如果Eureka缓存的老节点更活跃,就说明它能够使用,而新来的服务我并不知道是否能用,那么Eureka就保守的使用了可用的老节点,从这一点也保证了可用性。
在拿到服务实例后对其进行封装:
Lease<InstanceInfo> lease =newLease<InstanceInfo>(registrant, leaseDuration);if(existingLease !=null){
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}//Lease是一个心跳续约的包装类,里面存放了注册信息,最后操作时间,注册时间,过期时间,剔除时间等信息。在这里把注册实例及过期时间放到这个心跳续约对象中,再把心跳续约对象放到gmap注册表中去。之后进行改变服务状态,系统数据统计,至此一个服务注册的流程就完成了。注册完成后,查看一下registry中的服务实例,发现我们启动的Eureka-client都已经放在里面了:
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(newPair<Long,String>(System.currentTimeMillis(),
registrant.getAppName()+"("+ registrant.getId()+")"));// This is where the initial state transfer of overridden status happensif(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())){
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "+"overrides", registrant.getOverriddenStatus(), registrant.getId());if(!overriddenInstanceStatusMap.containsKey(registrant.getId())){
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());}}InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());if(overriddenStatusFromMap !=null){
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);}// Set the status based on the overridden status rulesInstanceStatus overriddenInstanceStatus =getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);// If the lease is registered with UP status, set lease service up timestampif(InstanceStatus.UP.equals(registrant.getStatus())){
lease.serviceUp();}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(newRecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();//2、清理缓存invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);}finally{
read.unlock();}}
com.netflix.eureka.registry.AbstractInstanceRegistry#invalidateCache
privatevoidinvalidateCache(String appName,@NullableString vipAddress,@NullableString secureVipAddress){// invalidate cache
responseCache.invalidate(appName, vipAddress, secureVipAddress);}
com.netflix.eureka.registry.ResponseCacheImpl#invalidate
@Overridepublicvoidinvalidate(String appName,@NullableString vipAddress,@NullableString secureVipAddress){for(Key.KeyType type :Key.KeyType.values()){for(Version v :Version.values()){invalidate(newKey(Key.EntityType.Application, appName, type, v,EurekaAccept.full),newKey(Key.EntityType.Application, appName, type, v,EurekaAccept.compact),newKey(Key.EntityType.Application,ALL_APPS, type, v,EurekaAccept.full),newKey(Key.EntityType.Application,ALL_APPS, type, v,EurekaAccept.compact),newKey(Key.EntityType.Application,ALL_APPS_DELTA, type, v,EurekaAccept.full),newKey(Key.EntityType.Application,ALL_APPS_DELTA, type, v,EurekaAccept.compact));if(null!= vipAddress){invalidate(newKey(Key.EntityType.VIP, vipAddress, type, v,EurekaAccept.full));}if(null!= secureVipAddress){invalidate(newKey(Key.EntityType.SVIP, secureVipAddress, type, v,EurekaAccept.full));}}}}//主动失效读写缓存里的数据publicvoidinvalidate(Key... keys){for(Key key : keys){
logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(key);Collection<Key> keysWithRegions = regionSpecificKeys.get(key);if(null!= keysWithRegions &&!keysWithRegions.isEmpty()){for(Key keysWithRegion : keysWithRegions){
logger.debug("Invalidating the response cache key : {} {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(keysWithRegion);}}}}
1.2、服务续约
InstanceRegistry # renew() ->
PeerAwareInstanceRegistry # renew()->
AbstractInstanceRegistry # renew()
先从注册表获取该服务的实例列表(gMap),再从gMap中通过实例的id 获取具体的 要续约的实例。之后根据服务实例的InstanceStatus判断是否处于宕机状态,以及是否和之前状态相同。如果一切状态正常,最终调用Lease中的renew方法:
可以看出,其实服务续约的操作非常简单,它的本质就是修改服务的最后的更新时间。将最后更新时间改为系统当前时间加上服务的过期时间。
值得提一下的是,lastUpdateTimestamp这个变量是被volatile关键字修饰的。
publicbooleanrenew(String appName,String id,boolean isReplication){RENEW.increment(isReplication);Map<String,Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToRenew =null;if(gMap !=null){
leaseToRenew = gMap.get(id);}if(leaseToRenew ==null){RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);returnfalse;}else{InstanceInfo instanceInfo = leaseToRenew.getHolder();if(instanceInfo !=null){// touchASGCache(instanceInfo.getASGName());InstanceStatus overriddenInstanceStatus =this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);if(overriddenInstanceStatus ==InstanceStatus.UNKNOWN){
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+"; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);returnfalse;}if(!instanceInfo.getStatus().equals(overriddenInstanceStatus)){
logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+"Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}
renewsLastMin.increment();
leaseToRenew.renew();returntrue;}}
1.3、服务剔除
当Eureka-server发现有的实例没有续约超过一定时间,则将该服务从注册列表剔除,该项工作由一个定时任务完成的。该任务的定义过程比较复杂,仅列出其调用过程:
EurekaServerInitializerConfiguration # start() ->
EurekaServerBootstrap # contextInitialized() ->initEurekaServerContext() ->
PeerAwareInstanceRegistryImpl # openForTraffic() ->
AbstractInstanceRegistry # postInit()
在AbstractInstanceRegistry的postInit方法中,定义EvictionTask定时任务,构建定时器启动该任务,执行任务中剔除方法 evict()。
private long evictionIntervalTimerInMs = 60 * 1000;
任务的时间被定义为60秒,即默认每分钟执行一次。
具体查看evit()剔除方法:
protectedvoidpostInit(){
renewsLastMin.start();if(evictionTaskRef.get()!=null){
evictionTaskRef.get().cancel();}
evictionTaskRef.set(newEvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());}
publicvoidevict(long additionalLeaseMs){
logger.debug("Running the evict task");if(!isLeaseExpirationEnabled()){
logger.debug("DS: lease expiration is currently disabled.");return;}// We collect first all expired items, to evict them in random order. For large eviction sets,// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,// the impact should be evenly distributed across all applications.//新建实例列表expiredLeases,用来存放过期的实例。List<Lease<InstanceInfo>> expiredLeases =newArrayList<>();//遍历registry注册表,对实例进行检测工作,使用isExpired方法判断实例是否过期//evictionTimestamp:剔除时间,当剔除节点的时候,将系统当前时间赋值给这个 evictionTimestamp//additionalLeaseMs:集群同步产生的预留时间,这个时间是程序中传过来的for(Entry<String,Map<String,Lease<InstanceInfo>>> groupEntry : registry.entrySet()){Map<String,Lease<InstanceInfo>> leaseMap = groupEntry.getValue();if(leaseMap !=null){for(Entry<String,Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()){Lease<InstanceInfo> lease = leaseEntry.getValue();if(lease.isExpired(additionalLeaseMs)&& lease.getHolder()!=null){
expiredLeases.add(lease);}}}}// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for// triggering self-preservation. Without that we would wipe out full registry.int registrySize =(int)getLocalRegistrySize();//根据阈值计算可以被剔除的服务数量最大值int registrySizeThreshold =(int)(registrySize * serverConfig.getRenewalPercentThreshold());//剔除后剩余最小数量int evictionLimit = registrySize - registrySizeThreshold;//expiredLeases.size() 剔除列表的数量int toEvict =Math.min(expiredLeases.size(), evictionLimit);if(toEvict >0){
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);//上面的代码中根据自我保护机制进行了判断,使用Min函数计算两者的最小值,剔除较小数量的服务实例。举个例子,假如当前共有100个服务,那么剔除阈值为85,如果list中有60个服务,那么就会剔除该60个服务。但是如果list中有95个服务,那么只会剔除其中的85个服务,在这种情况下,又会产生一个问题,eureka-server该如何判断去剔除哪些服务,保留哪些服务呢?Random random =newRandom(System.currentTimeMillis());for(int i =0; i < toEvict; i++){// Pick a random item (Knuth shuffle algorithm)int next = i + random.nextInt(expiredLeases.size()- i);Collections.swap(expiredLeases, i, next);Lease<InstanceInfo> lease = expiredLeases.get(i);String appName = lease.getHolder().getAppName();String id = lease.getHolder().getId();EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);//这里使用了随机算法进行剔除,保证不会连续剔除某个微服务的全部实例。最终调用internalCancel方法,实际执行剔除。internalCancel(appName, id,false);}}}
系统当前时间 >最后更新时间 + 过期时间 + 预留时间
这里进行判断:
系统当前时间 >最后更新时间 + 过期时间 + 预留时间
当该条件成立时,认为服务过期。在Eureka中过期时间默认定义为3个心跳的时间,一个心跳是30秒,因此过期时间是90秒。
当以上两个条件之一成立时,判断该实例过期,将该过期实例放入上面创建的列表中。注意这里仅仅是将实例放入List中,并没有实际剔除。
在实际剔除任务前,需要提一下eureka的自我保护机制:
当15分钟内,心跳失败的服务大于一定比例时,会触发自我保护机制。
这个值在Eureka中被定义为85%,一旦触发自我保护机制,Eureka会尝试保护其服务注册表中的信息,不再删除服务注册表中的数据。
1.4、服务下线
客户端发送http请求告诉eureka-server自己下线,调用 AbstractInstanceRegistry 中 cancel方法:
@Overridepublicbooleancancel(String appName,String id,boolean isReplication){returninternalCancel(appName, id, isReplication);}
最终还是调用了和服务剔除中一样的方法,remove掉了gMap中的实例。
1.5、集群信息同步
集群信息同步发生在Eureka-server之间,之前提到在PeerAwareInstanceRegistryImpl类中,在执行register方法注册微服务实例完成后,执行了集群信息同步方法replicateToPeers,具体分析一下该方法:
privatevoidreplicateToPeers(Action action,String appName,String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */,boolean isReplication){Stopwatch tracer = action.getTimer().start();try{if(isReplication){
numberOfReplicationsLastMin.increment();}// If it is a replication already, do not replicate again as this will create a poison replicationif(peerEurekaNodes ==Collections.EMPTY_LIST|| isReplication){return;}for(finalPeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()){// If the url represents this host, do not replicate to yourself.if(peerEurekaNodes.isThisMyUrl(node.getServiceUrl())){continue;}//这replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);}}finally{
tracer.stop();}}privatevoidreplicateInstanceActionsToPeers(Action action,String appName,String id,InstanceInfo info,InstanceStatus newStatus,PeerEurekaNode node){try{InstanceInfo infoFromRegistry;CurrentRequestVersion.set(Version.V2);switch(action){caseCancel:
node.cancel(appName, id);break;caseHeartbeat:InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry =getInstanceByAppAndId(appName, id,false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus,false);break;caseRegister://这
node.register(info);break;caseStatusUpdate:
infoFromRegistry =getInstanceByAppAndId(appName, id,false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);break;caseDeleteStatusOverride:
infoFromRegistry =getInstanceByAppAndId(appName, id,false);
node.deleteStatusOverride(appName, id, infoFromRegistry);break;}}catch(Throwable t){
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);}finally{CurrentRequestVersion.remove();}}
首先,遍历集群节点,用以给各个集群信息节点进行信息同步。
然后,调用replicateInstanceActionsToPeers方法,在该方法中根据具体的操作类型Action,选择分支,最终调用PeerEurekaNode的register方法:
publicvoidregister(finalInstanceInfo info)throwsException{long expiryTime =System.currentTimeMillis()+getLeaseRenewalOf(info);
batchingDispatcher.process(taskId("register", info),newInstanceReplicationTask(targetHost,Action.Register, info,null,true){publicEurekaHttpResponse<Void>execute(){//z这return replicationClient.register(info);}},
expiryTime
);}
最终发送http请求,但是与普通注册操作不同的时,这时将集群同步的标识置为true,说明注册信息是来自集群同步。
在注册过程中运行到addInstance方法时,单独注册时isReplication的值为false,集群同步时为true。通过该值,能够避免集群间出现死循环,进行循环同步的问题。
1.6、获取注册信息接口
com.netflix.eureka.resources.ApplicationsResource#getContainers
1、从缓存中获取responseCache.get(cacheKey)
2、首先从只读缓存里获取
@GETpublicResponsegetContainers(@PathParam("version")String version,@HeaderParam(HEADER_ACCEPT)String acceptHeader,@HeaderParam(HEADER_ACCEPT_ENCODING)String acceptEncoding,@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT)String eurekaAccept,@ContextUriInfo uriInfo,@Nullable@QueryParam("regions")String regionsStr){boolean isRemoteRegionRequested =null!= regionsStr &&!regionsStr.isEmpty();String[] regions =null;if(!isRemoteRegionRequested){EurekaMonitors.GET_ALL.increment();}else{
regions = regionsStr.toLowerCase().split(",");Arrays.sort(regions);// So we don't have different caches for same regions queried in different order.EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();}// Check if the server allows the access to the registry. The server can// restrict access if it is not// ready to serve traffic depending on various reasons.if(!registry.shouldAllowAccess(isRemoteRegionRequested)){returnResponse.status(Status.FORBIDDEN).build();}CurrentRequestVersion.set(Version.toEnum(version));KeyType keyType =Key.KeyType.JSON;String returnMediaType =MediaType.APPLICATION_JSON;if(acceptHeader ==null||!acceptHeader.contains(HEADER_JSON_VALUE)){
keyType =Key.KeyType.XML;
returnMediaType =MediaType.APPLICATION_XML;}Key cacheKey =newKey(Key.EntityType.Application,ResponseCacheImpl.ALL_APPS,
keyType,CurrentRequestVersion.get(),EurekaAccept.fromString(eurekaAccept), regions
);Response response;if(acceptEncoding !=null&& acceptEncoding.contains(HEADER_GZIP_VALUE)){
response =Response.ok(responseCache.getGZIP(cacheKey)).header(HEADER_CONTENT_ENCODING,HEADER_GZIP_VALUE).header(HEADER_CONTENT_TYPE, returnMediaType).build();}else{//1、从缓存中获取responseCache.get(cacheKey)
response =Response.ok(responseCache.get(cacheKey)).build();}CurrentRequestVersion.remove();return response;}
com.netflix.eureka.registry.ResponseCacheImpl#get(com.netflix.eureka.registry.Key)
publicStringget(finalKey key){//shouldUseReadOnlyResponseCache = truereturnget(key, shouldUseReadOnlyResponseCache);}@VisibleForTestingStringget(finalKey key,boolean useReadOnlyCache){Value payload =getValue(key, useReadOnlyCache);if(payload ==null|| payload.getPayload().equals(EMPTY_PAYLOAD)){returnnull;}else{return payload.getPayload();}}@VisibleForTestingValuegetValue(finalKey key,boolean useReadOnlyCache){Value payload =null;try{if(useReadOnlyCache){//2、首先从只读缓存里获取finalValue currentPayload = readOnlyCacheMap.get(key);if(currentPayload !=null){
payload = currentPayload;}else{//3、只读缓存中没有就从读写缓存中获取 //读写缓存默认180秒会自动过期
payload = readWriteCacheMap.get(key);//回写只读缓存 //timeSchedule每隔30秒执行getCacheUpdate定时任务将读写缓存中的数据更新到只读缓存中
readOnlyCacheMap.put(key, payload);}}else{
payload = readWriteCacheMap.get(key);}}catch(Throwable t){
logger.error("Cannot get value for key : {}", key, t);}return payload;}
ResponseCacheImpl
ResponseCacheImpl(EurekaServerConfig serverConfig,ServerCodecs serverCodecs,AbstractInstanceRegistry registry){this.serverConfig = serverConfig;this.serverCodecs = serverCodecs;this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();this.registry = registry;long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();//读写缓存this.readWriteCacheMap =CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())//180秒自动过期.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(),TimeUnit.SECONDS).removalListener(newRemovalListener<Key,Value>(){@OverridepublicvoidonRemoval(RemovalNotification<Key,Value> notification){Key removedKey = notification.getKey();if(removedKey.hasRegions()){Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);}}}).build(newCacheLoader<Key,Value>(){@OverridepublicValueload(Key key)throwsException{if(key.hasRegions()){Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);}Value value =generatePayload(key);return value;}});//每隔30秒执行getCacheUpdateTask定时任务将读写缓存中数据更新到只读缓存中if(shouldUseReadOnlyResponseCache){
timer.schedule(getCacheUpdateTask(),newDate(((System.currentTimeMillis()/ responseCacheUpdateIntervalMs)* responseCacheUpdateIntervalMs)+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);}try{Monitors.registerObject(this);}catch(Throwable e){
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);}}
版权归原作者 室内篮球 所有, 如有侵权,请联系我们删除。