0


源码解析-Spring Eureka(更新ing)

源码解析-Spring Eureka

文章目录


前言


一、从Spring.factory和注解开始

我们可以看到,Eureka通过spring boot的自动配置机制引入了一个类

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

通过这个配置我们找到对应的配置类,可以看到,这个配置类使用了Marker作为条件注入

@Configuration(
    proxyBeanMethods =false)@Import({EurekaServerInitializerConfiguration.class})@ConditionalOnBean({EurekaServerMarkerConfiguration.Marker.class})@EnableConfigurationProperties({EurekaDashboardProperties.class,InstanceRegistryProperties.class})@PropertySource({"classpath:/eureka/server.properties"})publicclassEurekaServerAutoConfigurationimplementsWebMvcConfigurer

这个时候我们返回查看我们配置一个eureka所需要的基本注解可以看到,我们正在这个这个@EnableEurekaServer 注解里面初始化了这个类

//// Source code recreated from a .class file by IntelliJ IDEA// (powered by FernFlower decompiler)//packageorg.springframework.cloud.netflix.eureka.server;importjava.lang.annotation.Documented;importjava.lang.annotation.ElementType;importjava.lang.annotation.Retention;importjava.lang.annotation.RetentionPolicy;importjava.lang.annotation.Target;importorg.springframework.context.annotation.Import;@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@Import({EurekaServerMarkerConfiguration.class})public@interfaceEnableEurekaServer{}

通过spring.factory的自动配置以及@EnableEurekaServer 就可以实现eureka服务端的手动注入(通过加入注解)

二、重要的一步EurekaServerInitializerConfiguration

在上面的EurekaServerAutoConfiguration里面我们可以看到它import了一个初始化类
注意在这个初始化类实现了SmartLifeCycle接口,实现了其Start方法

@Configuration(
    proxyBeanMethods =false)publicclassEurekaServerInitializerConfigurationimplementsServletContextAware,SmartLifecycle,Ordered

实现的start方法,会在bean在启动的时候调用,该方法会new一个线程并发布订阅

publicvoidstart(){(newThread(()->{try{this.eurekaServerBootstrap.contextInitialized(this.servletContext);
                log.info("Started Eureka Server");this.publish(newEurekaRegistryAvailableEvent(this.getEurekaServerConfig()));this.running =true;this.publish(newEurekaServerStartedEvent(this.getEurekaServerConfig()));}catch(Exception var2){
                log.error("Could not initialize Eureka servlet context", var2);}})).start();}

可以看到,通过这个start方法,eureka初始化了它自己的context上下文并发布了一些事件。

三、初始化了什么?

进入到contextInitialized方法,我们可以看到

publicvoidcontextInitialized(ServletContext context){try{this.initEurekaEnvironment();this.initEurekaServerContext();
            context.setAttribute(EurekaServerContext.class.getName(),this.serverContext);}catch(Throwable var3){
            log.error("Cannot bootstrap eureka server :", var3);thrownewRuntimeException("Cannot bootstrap eureka server :", var3);}}

eureka首先初始化了配置信息,然后进行上下文的初始化

protectedvoidinitEurekaServerContext()throwsException{JsonXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(),10000);XmlXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(),10000);if(this.isAws(this.applicationInfoManager.getInfo())){this.awsBinder =newAwsBinderDelegate(this.eurekaServerConfig,this.eurekaClientConfig,this.registry,this.applicationInfoManager);this.awsBinder.start();}EurekaServerContextHolder.initialize(this.serverContext);
        log.info("Initialized server context");int registryCount =this.registry.syncUp();this.registry.openForTraffic(this.applicationInfoManager, registryCount);EurekaMonitors.registerAllStats();}

进入到initEurekaServerContext方法,我们可以看到几个重要的方法
在openForTraffic方法里面

publicvoidopenForTraffic(ApplicationInfoManager applicationInfoManager,int count){this.expectedNumberOfClientsSendingRenews = count;this.updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}",this.numberOfRenewsPerMinThreshold);this.startupTime =System.currentTimeMillis();if(count >0){this.peerInstancesTransferEmptyOnStartup =false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();boolean isAws =Name.Amazon== selfName;if(isAws &&this.serverConfig.shouldPrimeAwsReplicaConnections()){
            logger.info("Priming AWS connections for all replicas..");this.primeAwsReplicas(applicationInfoManager);}

        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);super.postInit();}

我们重点关注这里的super.postInit()

protectedvoidpostInit(){this.renewsLastMin.start();if(this.evictionTaskRef.get()!=null){((EvictionTask)this.evictionTaskRef.get()).cancel();}this.evictionTaskRef.set(newEvictionTask());this.evictionTimer.schedule((TimerTask)this.evictionTaskRef.get(),this.serverConfig.getEvictionIntervalTimerInMs(),this.serverConfig.getEvictionIntervalTimerInMs());}

可以看到this.evictionTaskRef.set(new EvictionTask());,这里注册了一个剔除任务

int registrySize =(int)this.getLocalRegistrySize();int registrySizeThreshold =(int)((double)registrySize *this.serverConfig.getRenewalPercentThreshold());int evictionLimit = registrySize - registrySizeThreshold;int toEvict =Math.min(expiredLeases.size(), evictionLimit);

这里的剔除与eureka配置里面的自我保护配置有关

自动保护

在eureka中,如果打开了自我保护配置并设置了剔除阈值,eureka集群就会在计算正常超过阈值的时候执行上面的代码把的节点给剔除

  1. 如果现在有10个节点,7个节点是正常,3个节点是由有问题的,阈值设置了80%,这个时候7个节点中的一个节点出现了问题,但是没有超过阈值(变成了60%),这个时候就会访问到失败的节点
  2. 如果现在有100个节点,3个节点有问题,阈值也是80%,现在的值是(97%)超过了阈值,如果这个时候有节点出现问题则会立即剔除,
  3. 但是不能把自我保护关闭,如果3个节点是因为波动导致的暂时访问不到则会立即被剔除
eureka:
  server:
    enable-self-preservation:true
eureka:
  server:
    renewal-percent-threshold:0.85

我们再进到syncUp方法里面

publicintsyncUp(){int count =0;for(int i =0; i <this.serverConfig.getRegistrySyncRetries()&& count ==0;++i){if(i >0){try{Thread.sleep(this.serverConfig.getRegistrySyncRetryWaitMs());}catch(InterruptedException var10){
                    logger.warn("Interrupted during registry transfer..");break;}}Applications apps =this.eurekaClient.getApplications();Iterator var4 = apps.getRegisteredApplications().iterator();

可以看到当一个eureka服务启动的时候,会作为一个eureka客户端去peer节点拉取配置(这也是eureka为什么不是强一致性的)

四, 重新回到EurekaServerAutoConfiguration

首先就是eureka的控制器类,eureka的dashboard上面的数据通过这个控制器(springWeb)来获取

@Bean@ConditionalOnProperty(
        prefix ="eureka.dashboard",
        name ={"enabled"},
        matchIfMissing =true)publicEurekaControllereurekaController(){returnnewEurekaController(this.applicationInfoManager);}

接着再eurekaServerContext里面实例化了属于eureka的上下文

@Bean@ConditionalOnMissingBeanpublicEurekaServerContexteurekaServerContext(ServerCodecs serverCodecs,PeerAwareInstanceRegistry registry,PeerEurekaNodes peerEurekaNodes){returnnewDefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes,this.applicationInfoManager);}

进入context的初始化方法,可以看在context初始化里面有重要的一环就是设置三级缓存initializedResponseCache

publicvoidinit(PeerEurekaNodes peerEurekaNodes)throwsException{this.numberOfReplicationsLastMin.start();this.peerEurekaNodes = peerEurekaNodes;this.initializedResponseCache();this.scheduleRenewalThresholdUpdateTask();this.initRemoteRegionRegistry();try{Monitors.registerObject(this);}catch(Throwable var3){
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", var3);}}

里面有一个定时任务,就是定期刷新缓存

if(this.shouldUseReadOnlyResponseCache){this.timer.schedule(this.getCacheUpdateTask(),newDate(System.currentTimeMillis()/ responseCacheUpdateIntervalMs * responseCacheUpdateIntervalMs + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs);}

继续查看自动配置类,可以看到,eureka通过jersey框架包装了注册服务

@BeanpublicFilterRegistrationBean<?>jerseyFilterRegistration(Application eurekaJerseyApp){FilterRegistrationBean<Filter> bean =newFilterRegistrationBean();
        bean.setFilter(newServletContainer(eurekaJerseyApp));
        bean.setOrder(Integer.MAX_VALUE);
        bean.setUrlPatterns(Collections.singletonList("/eureka/*"));return bean;}

再看jerseyApplication,这里面会对eureka的包路径进行扫描, 并将其中的候选类进行注入,其中非常重要的就是resource目录下的applicationsresoure,该方法会返回一个Application的Bean(SPring的config加Bean)

@BeanpublicApplicationjerseyApplication(Environment environment,ResourceLoader resourceLoader){ClassPathScanningCandidateComponentProvider provider =newClassPathScanningCandidateComponentProvider(false, environment);
        provider.addIncludeFilter(newAnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(newAnnotationTypeFilter(Provider.class));Set<Class<?>> classes =newHashSet();String[] var5 = EUREKA_PACKAGES;int var6 = var5.length;for(int var7 =0; var7 < var6;++var7){String basePackage = var5[var7];Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);Iterator var10 = beans.iterator();while(var10.hasNext()){BeanDefinition bd =(BeanDefinition)var10.next();Class<?> cls =ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
                classes.add(cls);}}Map<String,Object> propsAndFeatures =newHashMap();
        propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex","/eureka/(fonts|images|css|js)/.*");DefaultResourceConfig rc =newDefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);return rc;}

在applicationsResource里面,通过jersey编写一系列关于注册中心的“注册”,“取消”,“续约”等的HTTP方法
我们先来看获取ALLKey的方法(拉取所有配置)

@GETpublicResponsegetContainers(@PathParam("version")String version,@HeaderParam("Accept")String acceptHeader,@HeaderParam("Accept-Encoding")String acceptEncoding,@HeaderParam("X-Eureka-Accept")String eurekaAccept,@ContextUriInfo uriInfo,@Nullable@QueryParam("regions")String regionsStr){boolean isRemoteRegionRequested =null!= regionsStr &&!regionsStr.isEmpty();String[] regions =null;if(!isRemoteRegionRequested){EurekaMonitors.GET_ALL.increment();}else{
            regions = regionsStr.toLowerCase().split(",");Arrays.sort(regions);EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();}if(!this.registry.shouldAllowAccess(isRemoteRegionRequested)){returnResponse.status(Status.FORBIDDEN).build();}else{CurrentRequestVersion.set(Version.toEnum(version));Key.KeyType keyType =KeyType.JSON;String returnMediaType ="application/json";if(acceptHeader ==null||!acceptHeader.contains("json")){
                keyType =KeyType.XML;
                returnMediaType ="application/xml";}Key cacheKey =newKey(EntityType.Application,"ALL_APPS", keyType,CurrentRequestVersion.get(),EurekaAccept.fromString(eurekaAccept), regions);Response response;if(acceptEncoding !=null&& acceptEncoding.contains("gzip")){
                response =Response.ok(this.responseCache.getGZIP(cacheKey)).header("Content-Encoding","gzip").header("Content-Type", returnMediaType).build();}else{
                response =Response.ok(this.responseCache.get(cacheKey)).build();}CurrentRequestVersion.remove();return response;}}

其中ResponseCache注入了了eureka自己实现的三级缓存的getValue方法

@VisibleForTestingValuegetValue(Key key,boolean useReadOnlyCache){Value payload =null;try{if(useReadOnlyCache){Value currentPayload =(Value)this.readOnlyCacheMap.get(key);if(currentPayload !=null){
                    payload = currentPayload;}else{
                    payload =(Value)this.readWriteCacheMap.get(key);this.readOnlyCacheMap.put(key, payload);}}else{
                payload =(Value)this.readWriteCacheMap.get(key);}}catch(Throwable var5){
            logger.error("Cannot get value for key : {}", key, var5);}return payload;}

我们再来看在eureka中一个服务是如何被维护的
在applicationResoure中,有添加服务的方法

@POST@Consumes({"application/json","application/xml"})publicResponseaddInstance(InstanceInfo info,@HeaderParam("x-netflix-discovery-replication")String isReplication){
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);if(this.isBlank(info.getId())){returnResponse.status(400).entity("Missing instanceId").build();}elseif(this.isBlank(info.getHostName())){returnResponse.status(400).entity("Missing hostname").build();}elseif(this.isBlank(info.getIPAddr())){returnResponse.status(400).entity("Missing ip address").build();}elseif(this.isBlank(info.getAppName())){returnResponse.status(400).entity("Missing appName").build();}elseif(!this.appName.equals(info.getAppName())){returnResponse.status(400).entity("Mismatched appName, expecting "+this.appName +" but was "+ info.getAppName()).build();}elseif(info.getDataCenterInfo()==null){returnResponse.status(400).entity("Missing dataCenterInfo").build();}elseif(info.getDataCenterInfo().getName()==null){returnResponse.status(400).entity("Missing dataCenterInfo Name").build();}else{DataCenterInfo dataCenterInfo = info.getDataCenterInfo();if(dataCenterInfo instanceofUniqueIdentifier){String dataCenterInfoId =((UniqueIdentifier)dataCenterInfo).getId();if(this.isBlank(dataCenterInfoId)){boolean experimental ="true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if(experimental){String entity ="DataCenterInfo of type "+ dataCenterInfo.getClass()+" must contain a valid id";returnResponse.status(400).entity(entity).build();}if(dataCenterInfo instanceofAmazonInfo){AmazonInfo amazonInfo =(AmazonInfo)dataCenterInfo;String effectiveId = amazonInfo.get(MetaDataKey.instanceId);if(effectiveId ==null){
                            amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());}}else{
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}this.registry.register(info,"true".equals(isReplication));returnResponse.status(204).build();}}

进去到register方法里面,我们可以看到,eureka首先向自己注册了当前服务,然后同步到了peer节点上面

publicvoidregister(InstanceInfo info,boolean isReplication){int leaseDuration =90;if(info.getLeaseInfo()!=null&& info.getLeaseInfo().getDurationInSecs()>0){
            leaseDuration = info.getLeaseInfo().getDurationInSecs();}super.register(info, leaseDuration, isReplication);this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info,(InstanceInfo.InstanceStatus)null, isReplication);}

可以看到,register里面,我们的InstanceInfo 被一个Map<String, Lease> gMap维护着

publicvoidregister(InstanceInfo registrant,int leaseDuration,boolean isReplication){this.read.lock();try{Map<String,Lease<InstanceInfo>> gMap =(Map)this.registry.get(registrant.getAppName());EurekaMonitors.REGISTER.increment(isReplication);if(gMap ==null){ConcurrentHashMap<String,Lease<InstanceInfo>> gNewMap =newConcurrentHashMap();
                gMap =(Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);if(gMap ==null){
                    gMap = gNewMap;}}

我们看下Lease的结构,通过下面的这种结构,我们只需要修改Lease的long信息就可以对当前节点的生命状态进行修改而不需要修改节点本身

publicclassLease<T>{publicstaticfinalint DEFAULT_DURATION_IN_SECS =90;privateT holder;// 具体的实例信息// 一些用于维护节点状态的时间信息privatelong evictionTimestamp;privatelong registrationTimestamp;privatelong serviceUpTimestamp;privatevolatilelong lastUpdateTimestamp;privatelong duration;

关于unavailable-replicas

我们来看EurekaController

@RequestMapping(
        method ={RequestMethod.GET})publicStringstatus(HttpServletRequest request,Map<String,Object> model){this.populateBase(request, model);this.populateApps(model);StatusInfo statusInfo;try{
            statusInfo =(newStatusResource()).getStatusInfo();// 服务端上面的基本信息}catch(Exception var5){
            statusInfo =Builder.newBuilder().isHealthy(false).build();}

        model.put("statusInfo", statusInfo);this.populateInstanceInfo(model, statusInfo);this.filterReplicas(model, statusInfo);return"eureka/status";}

进去基本信息

publicStatusInfogetStatusInfo(){StatusInfo.Builder builder =Builder.newBuilder();int upReplicasCount =0;StringBuilder upReplicas =newStringBuilder();StringBuilder downReplicas =newStringBuilder();StringBuilder replicaHostNames =newStringBuilder();Iterator var6 =this.peerEurekaNodes.getPeerEurekaNodes().iterator();while(var6.hasNext()){PeerEurekaNode node =(PeerEurekaNode)var6.next();if(replicaHostNames.length()>0){
                replicaHostNames.append(", ");}

            replicaHostNames.append(node.getServiceUrl());// 关键逻辑,判断什么属于upReplicasif(this.isReplicaAvailable(node.getServiceUrl())){
                upReplicas.append(node.getServiceUrl()).append(',');++upReplicasCount;}else{
                downReplicas.append(node.getServiceUrl()).append(',');}}

        builder.add("registered-replicas", replicaHostNames.toString());
        builder.add("available-replicas", upReplicas.toString());
        builder.add("unavailable-replicas", downReplicas.toString());if(this.peerEurekaNodes.getMinNumberOfAvailablePeers()>-1){
            builder.isHealthy(upReplicasCount >=this.peerEurekaNodes.getMinNumberOfAvailablePeers());}

        builder.withInstanceInfo(this.instanceInfo);return builder.build();}

进去判断的逻辑

privatebooleanisReplicaAvailable(String url){try{// 获取自己的注册信息,所以为什么要把自己注册// 配置register-with-eureka: trueApplication app =this.registry.getApplication(this.myAppName,false);if(app ==null){returnfalse;}Iterator var3 = app.getInstances().iterator();while(var3.hasNext()){InstanceInfo info =(InstanceInfo)var3.next();// 对比自己的defalutZone和真正从peer拉取到的地址if(this.peerEurekaNodes.isInstanceURL(url, info)){returntrue;}}}catch(Throwable var5){
            logger.error("Could not determine if the replica is available ", var5);}returnfalse;}

所以为什么要配置主机名和自我注册
一台机器是可以有多个ip的,
AB都是注册中心
A主机配置B主机的IP(defaultZone)是1,
但是B主机注册到A主机的IP是2,
此时如果B主机的IP1和2都正常则没问题
但是如果B主机的IP2对应的网卡坏了,实际上B已经不可访问到了,
但似乎由于A配置的defaultZone配置的是IP1,所以还是会被访问到

为什么客户端不配注解也可以用

@Configuration(
    proxyBeanMethods =false)@EnableConfigurationProperties@ConditionalOnClass({EurekaClientConfig.class})@ConditionalOnProperty(
    value ={"eureka.client.enabled"},
    matchIfMissing =true)// 关键,默认为true , 禁用注册则配置eureka.client.enabled = false@ConditionalOnDiscoveryEnabled@AutoConfigureBefore({CommonsClientAutoConfiguration.class,ServiceRegistryAutoConfiguration.class})@AutoConfigureAfter(
    name ={"org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration","org.springframework.cloud.autoconfigure.RefreshAutoConfiguration","org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration","org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})publicclassEurekaClientAutoConfiguration

为什么要这样做?因为正常开发的时候不需要将本地服务注册到注册中心

标签: spring eureka java

本文转载自: https://blog.csdn.net/weixin_46637202/article/details/143721780
版权归原作者 噜啦啦噜啦啦噜啦噜啦嘞噜啦噜啦 所有, 如有侵权,请联系我们删除。

“源码解析-Spring Eureka(更新ing)”的评论:

还没有评论