0


Eureka 心跳和服务续约源码探秘——图解、源码级解析

🍊 Java学习:社区快速通道

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南

📆 最近更新:2023年5月25日

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

分布式系统的心跳机制

分布式系统是由多个计算机节点构成的系统,这些节点之间通过网络进行通信和协作。由于节点之间的网络连接不可靠,因此在分布式系统中,一个节点可能会因为网络故障或其他原因而失去与其他节点的联系。为了解决这个问题,分布式系统引入了心跳机制。

在这里插入图片描述

心跳机制是指每个节点定期向其他节点发送“心跳”消息,以表明自己的存在和正常运行。如果一个节点在一段时间内没有收到来自其他节点的心跳消息,那么它就会认为这些节点已经失去了联系,并采取相应的措施,例如重新选举领导节点或者启动备份节点。

心跳机制的实现方式

心跳机制的实现方式可以分为基于UDP协议、基于TCP协议和基于HTTP协议三种。

  • 基于UDP协议的心跳机制具有实现简单、网络开销小等优点。但由于UDP协议本身不可靠,因此可能会出现一些误判情况。
  • 基于TCP协议的心跳机制具有可靠性高、误判率低等优点。但由于TCP协议本身的特性,可能会出现一些网络延迟等问题。
  • 基于HTTP协议的心跳机制具有易于实现、可扩展性好等优点。但由于HTTP协议本身的特性,可能会出现一些网络延迟等问题。

在实际应用中,需要根据具体情况选择合适的心跳机制实现方式。

对于实时性要求较高的应用场景,可以选择基于UDP协议的心跳机制;对于对可靠性要求较高的应用场景,可以选择基于TCP协议的心跳机制。

SpringCloud中的心跳

SpringCloud也借助“心跳”来知晓服务的可用性,心跳检测有以下四种特点:

  1. 客户端发起: 心跳服务是由每个服务节点根据配置的时间主动发起的。
  2. 同步状态: 要告诉注册中心自己的状态,快不行了(OUT_OF_SERVICE)或是一切正常(UP)。
  3. 服务剔除: 对一段时间无响应的服务,要主动将其从注册列表中剔除,以防服务调用方请求失败。
  4. 服务续约: 服务续约底层也是靠着心跳来实现的,其中包含了一套处理流程。

关于服务续约的一些细节

服务续约分为两步:

  1. 将服务节点的状态同步到注册中心,这一步需要借助客户端的心跳功能来主动发送。
  2. 当心跳包到达注册中心的时候,注册中心有一套判别机制,来判定当前的续约心跳是否合理。并根据判断结果修改当前instance在注册中心记录的同步时间。

服务节点向注册中心发送续约请求:

  1. 服务续约请求: 客户端有一个DiscoverClient类,它是所有操作的入口。所以续约服务就从这个类的renew方法开始
  2. 发送心跳: 服务续约借助心跳来实现,因此发给注册中心的两个重要参数分别是服务的状态(UP)和lastDirtyTimeStamp
  • 如果续约成功,注册中心则会返回200的HTTP code
  • 如果续约不成功,注册中心返回404,这里的404并不是说没有找到注册中心的地址,而是注册中心认为当前服务节点并不存在。这个时候再怎么续约也不行了,客户端需要触发一次重新注册操作。
  1. 在重新注册之前,客户端会做下面两个操作,然后再主动调用服务注册流程:
  • 设置lastDirtyTimeStamp :由于重新注册意味着服务节点和注册中心的信息不同步,因此需要将当前系统时间更新到lastDirtyTimeStamp
  • 标记为脏节点
  1. 当注册成功的时候,清除脏节点标记,但是lastDirtyTimeStamp不会清除,因为这个属性将会在后面的服务续约中作为参数发给注册中心,以便服务中心判断节点的同步状态。

Eureka 心跳和服务续约源码

通过本章节,可以了解到:

  • 客户端心跳发送内容是什么?
  • 客户端续约流程
  • 服务端租约更新流程

在真正阅读之前,不妨先尝试回答一下上面的3个问题,带着疑问看文章,收获会更大 ~ 下面开始进入正题

客户端源码

打开

DiscoveryClient

,入口便是构造函数:

请添加图片描述
这里只关注服务的心跳是怎么发送的
请添加图片描述
通过方法名就可以看出这是一个在后台定时触发的任务

privatevoidinitScheduledTasks(){int renewalIntervalInSecs;int expBackOffBound;if(this.clientConfig.shouldFetchRegistry()){
        renewalIntervalInSecs =this.clientConfig.getRegistryFetchIntervalSeconds();
        expBackOffBound =this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();this.scheduler.schedule(newTimedSupervisorTask("cacheRefresh",this.scheduler,this.cacheRefreshExecutor, renewalIntervalInSecs,TimeUnit.SECONDS, expBackOffBound,newDiscoveryClient.CacheRefreshThread()),(long)renewalIntervalInSecs,TimeUnit.SECONDS);}// 从这里开始看if(this.clientConfig.shouldRegisterWithEureka()){
        renewalIntervalInSecs =this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        expBackOffBound =this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);this.scheduler.schedule(newTimedSupervisorTask("heartbeat",this.scheduler,this.heartbeatExecutor, renewalIntervalInSecs,TimeUnit.SECONDS, expBackOffBound,newDiscoveryClient.HeartbeatThread()),(long)renewalIntervalInSecs,TimeUnit.SECONDS);this.instanceInfoReplicator =newInstanceInfoReplicator(this,this.instanceInfo,this.clientConfig.getInstanceInfoReplicationIntervalSeconds(),2);this.statusChangeListener =newStatusChangeListener(){publicStringgetId(){return"statusChangeListener";}publicvoidnotify(StatusChangeEvent statusChangeEvent){if(InstanceStatus.DOWN != statusChangeEvent.getStatus()&&InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()){DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);}else{DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);}DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();}};if(this.clientConfig.shouldOnDemandUpdateStatusChange()){this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);}this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());}else{
        logger.info("Not registering with Eureka server per configuration");}}

可以直接从上面的第10行开始关注

this.scheduler.schedule(newTimedSupervisorTask("heartbeat",this.scheduler,this.heartbeatExecutor, 
        renewalIntervalInSecs,TimeUnit.SECONDS, 
        expBackOffBound,newDiscoveryClient.HeartbeatThread()),(long)renewalIntervalInSecs,TimeUnit.SECONDS);

是定时启动后台任务的方法

  • renewalIntervalInSecs表示每多少秒启动一次定时任务
  • expBackOffBound是用来计算最大delay时间的
this.maxDelay =this.timeoutMillis *(long)expBackOffBound;
new DiscoveryClient.HeartbeatThread()

是发送心跳的具体逻辑

privateclassHeartbeatThreadimplementsRunnable{privateHeartbeatThread(){}publicvoidrun(){if(DiscoveryClient.this.renew()){DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp =System.currentTimeMillis();}}}

其中

renew

相当于续约的逻辑,心跳和续约是一套相互作用的机制,

renew

在客户端是发送了一个心跳,服务端接收了心跳之后会进行服务的续约

booleanrenew(){try{EurekaHttpResponse<InstanceInfo> httpResponse =this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(),this.instanceInfo.getId(),this.instanceInfo,(InstanceStatus)null);
        logger.debug("DiscoveryClient_{} - Heartbeat status: {}",this.appPathIdentifier, httpResponse.getStatusCode());if(httpResponse.getStatusCode()==Status.NOT_FOUND.getStatusCode()){this.REREGISTER_COUNTER.increment();
            logger.info("DiscoveryClient_{} - Re-registering apps/{}",this.appPathIdentifier,this.instanceInfo.getAppName());long timestamp =this.instanceInfo.setIsDirtyWithTime();boolean success =this.register();if(success){this.instanceInfo.unsetIsDirty(timestamp);}return success;}else{return httpResponse.getStatusCode()==Status.OK.getStatusCode();}}catch(Throwable var5){
        logger.error("DiscoveryClient_{} - was unable to send heartbeat!",this.appPathIdentifier, var5);returnfalse;}}

心跳包的发送逻辑

EurekaHttpResponse<InstanceInfo> httpResponse =this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(),this.instanceInfo.getId(),this.instanceInfo,(InstanceStatus)null);

和前面的服务注册一样一层层嵌套,第一层嵌套先是

SessionEurekaClient


请添加图片描述
下一层是

retry

,再下一层是

redirective

,再下一层是

matrix

…和服务注册一模一样


直接进到最后一层

AbstractJerseyEurekaHttpClient
publicEurekaHttpResponse<InstanceInfo>sendHeartBeat(String appName,String id,InstanceInfo info,InstanceStatus overriddenStatus){String urlPath ="apps/"+ appName +'/'+ id;ClientResponse response =null;EurekaHttpResponse var10;try{WebResource webResource =this.jerseyClient.resource(this.serviceUrl).path(urlPath).queryParam("status", info.getStatus().toString()).queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());if(overriddenStatus !=null){
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());}Builder requestBuilder = webResource.getRequestBuilder();this.addExtraHeaders(requestBuilder);
        response =(ClientResponse)requestBuilder.put(ClientResponse.class);EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder =EurekaHttpResponse.anEurekaHttpResponse(response.getStatus(),InstanceInfo.class).headers(headersOf(response));if(response.hasEntity()){
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));}

        var10 = eurekaResponseBuilder.build();}finally{if(logger.isDebugEnabled()){
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}",newObject[]{this.serviceUrl, urlPath, response ==null?"N/A": response.getStatus()});}if(response !=null){
            response.close();}}return var10;}

构造服务请求路径

请添加图片描述
之后构造

WebResource

对象

WebResource webResource =this.jerseyClient.resource(this.serviceUrl).path(urlPath).queryParam("status", info.getStatus().toString()).queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());

这里的

serviceUrl

是注册中心的url,前面的是当前机器的url,“lastDirtyTimestamp”是一个核心的属性

之后就是组装参数的流程,最后将请求发送出去,至此客户端发送心跳的逻辑就结束了。


服务端流程

服务端使用

InstanceResource

中的

renewLease

方法来接收心跳包:

publicResponserenewLease(@HeaderParam("x-netflix-discovery-replication")String isReplication,@QueryParam("overriddenstatus")String overriddenStatus,@QueryParam("status")String status,@QueryParam("lastDirtyTimestamp")String lastDirtyTimestamp){boolean isFromReplicaNode ="true".equals(isReplication);boolean isSuccess =this.registry.renew(this.app.getName(),this.id, isFromReplicaNode);if(!isSuccess){
        logger.warn("Not Found (Renew): {} - {}",this.app.getName(),this.id);returnResponse.status(Status.NOT_FOUND).build();}else{Response response;if(lastDirtyTimestamp !=null&&this.serverConfig.shouldSyncWhenTimestampDiffers()){
            response =this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);if(response.getStatus()==Status.NOT_FOUND.getStatusCode()&& overriddenStatus !=null&&!InstanceStatus.UNKNOWN.name().equals(overriddenStatus)&& isFromReplicaNode){this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(),this.id,InstanceStatus.valueOf(overriddenStatus));}}else{
            response =Response.ok().build();}

        logger.debug("Found (Renew): {} - {}; reply status={}",newObject[]{this.app.getName(),this.id, response.getStatus()});return response;}}

当前心跳包是来自服务的提供者,并不是冗余备份,所以

isFromReplicaNode

是false。下面代码

boolean isSuccess =this.registry.renew(this.app.getName(),this.id, isFromReplicaNode);

是续约的方法

publicbooleanrenew(finalString appName,finalString serverId,boolean isReplication){this.log("renew "+ appName +" serverId "+ serverId +", isReplication {}"+ isReplication);List<Application> applications =this.getSortedApplications();Iterator var5 = applications.iterator();while(var5.hasNext()){Application input =(Application)var5.next();if(input.getName().equals(appName)){InstanceInfo instance =null;Iterator var8 = input.getInstances().iterator();while(var8.hasNext()){InstanceInfo info =(InstanceInfo)var8.next();if(info.getId().equals(serverId)){
                    instance = info;break;}}this.publishEvent(newEurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));break;}}returnsuper.renew(appName, serverId, isReplication);}

方法的入参

serverId

一定是唯一的

其中

List<Application> applications =this.getSortedApplications();

获取所有的

application

,判断哪一个服务需要续约的时候是通过遍历的方式,当

list

里的

ApplicationName

和传入的name相同时再把

appication

下的所有

instance

全部拿到,找出

instance

id

serverId

相同的就知道该为哪一个

instance

进行续约了

this.publishEvent(newEurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));

发布一个续约成功的

event

最后进入到return后面调用的

renew

函数里

publicbooleanrenew(String appName,String id,boolean isReplication){if(super.renew(appName, id, isReplication)){this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Heartbeat, appName, id,(InstanceInfo)null,(InstanceStatus)null, isReplication);returntrue;}else{returnfalse;}}
replicateToPeers

表示高可用注册中心有多个中心节点,需要向

peer

同步,继续进到父类的

renew

方法:

publicbooleanrenew(String appName,String id,boolean isReplication){EurekaMonitors.RENEW.increment(isReplication);Map<String,Lease<InstanceInfo>> gMap =(Map)this.registry.get(appName);Lease<InstanceInfo> leaseToRenew =null;if(gMap !=null){
        leaseToRenew =(Lease)gMap.get(id);}if(leaseToRenew ==null){EurekaMonitors.RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);returnfalse;}else{InstanceInfo instanceInfo =(InstanceInfo)leaseToRenew.getHolder();if(instanceInfo !=null){InstanceStatus overriddenInstanceStatus =this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);if(overriddenInstanceStatus ==InstanceStatus.UNKNOWN){
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}; re-register required", instanceInfo.getId());EurekaMonitors.RENEW_NOT_FOUND.increment(isReplication);returnfalse;}if(!instanceInfo.getStatus().equals(overriddenInstanceStatus)){
                logger.info("The instance status {} is different from overridden instance status {} for instance {}. Hence setting the status to overridden status",newObject[]{instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()});
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}this.renewsLastMin.increment();
        leaseToRenew.renew();returntrue;}}
Map<String,Lease<InstanceInfo>> gMap =(Map)this.registry.get(appName);

通过

appName

得到所有的租约,因为现在只有一个节点,所以租约只是1,如果租约不为空则通过

serverId

拿到租约

leaseToRenew =(Lease)gMap.get(id);

租约不为空,先获得到instance的信息:

InstanceInfo instanceInfo =(InstanceInfo)leaseToRenew.getHolder();
  • 如果instance的状态是UNKNOWN,则EurekaMonitors.RENEW_NOT_FOUND增加isReplication
  • 如果instance和当前的instance不相同(之前是down,现在发来心跳包是up),需要执行
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
publicsynchronizedvoidsetStatusWithoutDirty(InstanceInfo.InstanceStatus status){if(this.status != status){this.status = status;}}

这里是将

status

设置到

instanceInfo

里。

下面的方法是记录过去一分钟有多少租约被更新了:

this.renewsLastMin.increment();

更新租约:

leaseToRenew.renew();
publicvoidrenew(){this.lastUpdateTimestamp =System.currentTimeMillis()+this.duration;}

这里仅仅是将lastUpdateTimestamp进行更新


回到

InstanceResource

renewLease

方法里:
此时如果

renew

的逻辑不成功,那么返回给客户端

NOT_FOUND

renew

成功则继续流程

Response response;if(lastDirtyTimestamp !=null&&this.serverConfig.shouldSyncWhenTimestampDiffers()){
    response =this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);if(response.getStatus()==Status.NOT_FOUND.getStatusCode()&& overriddenStatus !=null&&!InstanceStatus.UNKNOWN.name().equals(overriddenStatus)&& isFromReplicaNode){this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(),this.id,InstanceStatus.valueOf(overriddenStatus));}}else{
    response =Response.ok().build();}

logger.debug("Found (Renew): {} - {}; reply status={}",newObject[]{this.app.getName(),this.id, response.getStatus()});return response;
lastDirtyTimestamp

表示最近一次和服务端出现脏数据的时间戳,是从客户端发来的

如果

lastDirtyTimestamp

不为空且设置了需要做数据同步,则进入

if

逻辑,先验证一下

response =this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
privateResponsevalidateDirtyTimestamp(Long lastDirtyTimestamp,boolean isReplication){InstanceInfo appInfo =this.registry.getInstanceByAppAndId(this.app.getName(),this.id,false);if(appInfo !=null&& lastDirtyTimestamp !=null&&!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp())){Object[] args =newObject[]{this.id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};if(lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()){
            logger.debug("Time to sync, since the last dirty timestamp differs - ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);returnResponse.status(Status.NOT_FOUND).build();}if(appInfo.getLastDirtyTimestamp()> lastDirtyTimestamp){if(isReplication){
                logger.debug("Time to sync, since the last dirty timestamp differs - ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);returnResponse.status(Status.CONFLICT).entity(appInfo).build();}returnResponse.ok().build();}}returnResponse.ok().build();}

先根据

appName

serverId

获取

InstanceInfo

,如果产生了一段时间不同步的情况

  • 客户端发来的脏数据时间晚于服务端保存的脏数据时间,则客户端发生了事情没告诉服务端,返回NOT_FOUND
  • 服务端保存的脏数据时间比客户端发来的脏数据时间更新,则说明服务端保存的是新数据,如果是其他注册中心同步过来的则会返回CONFLICT;如果是客户端发过来的则直接返回OK

回到

InstanceResource

renewLease

方法里:

if(response.getStatus()==Status.NOT_FOUND.getStatusCode()&& overriddenStatus !=null&&!InstanceStatus.UNKNOWN.name().equals(overriddenStatus)&& isFromReplicaNode){this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(),this.id,InstanceStatus.valueOf(overriddenStatus));}

这个

if

进不去,此后服务续约的流程就完成了

标签: eureka 网络 java

本文转载自: https://blog.csdn.net/HNU_Csee_wjw/article/details/124268150
版权归原作者 小王曾是少年 所有, 如有侵权,请联系我们删除。

“Eureka 心跳和服务续约源码探秘——图解、源码级解析”的评论:

还没有评论