这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
背景
由于一些老项目,使用的注册中心还是
Zookeeper
,众所周知在
Spring Cloud
组件中是有客户端的负载均衡组件
Spring Cloud LoadBalancer
会存在客户端缓存。
那么就会出现一个问题:
由于服务提供者已经在
Zookeeper
下线了,而客户端缓存了旧的
ServiceInstance
数据,导致调用失败。
之前也在spring-cloud-zookeeper提过这个issues,不过没人理我,所以需要自己改造
改造思路
知道了问题所在改造起来就非常容易了,思路很简单,就是服务提供者在
Zookeeper
下线后需要客户端去删除客户端的本地缓存
所以我们需要知道
Zookeeper
本地缓存在哪。接下来就是我们源码分析找找看
客户端获取消费者(
ServiceInstance
)源码分析
我们知道
Spring Cloud
统一了服务变成模型,有一个
DiscoveryClient
接口,所以我们直接看
DiscoveryClient
接口的实现类
然后我们简单看看
ZookeeperDiscoveryClient
获取服务的方法实现
这一段方法比较简单,就是去
zookeeper
获取注册数据,没有缓存,那么客户端缓存是再哪里缓存的呢。我们必须找到调用的缓存的地方
可以看到这里是响应式获取数据,也没有缓存,我还需要向上寻找
功夫不负有心人,我们总算找到了这个缓存类
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
.getBeanProvider(LoadBalancerCacheManager.class);
如果看过我之前的这篇Spring Cloud落地之Spring Cloud LoadBalancer 线上优化方案
就知道他的缓存用的什么缓存,这里我们就不再介绍使用的什么缓存了。只需要知道我们拿到了这个缓存,就可以做我们想做的事情了。
改造缓存分布式删除
首先这里的客户端缓存是本地缓存,我们的机器一般是部署了多个节点,我们需要删除所有节点的缓存。
所以我们可以这么设计
- 客户端(网关)直接使用
Zookeeper
的事件监听然后去删除缓存 - 由下线的服务提供者去调用客户端(网关的接口),然后客户端通知其他节点一起删除缓存
现在又两种方案,最简单的方案肯定是第一种
Zookeeper事件监听
实现代码大致如下
@Component@Slf4jpublicclassZookeeperListenerimplementsApplicationContextAware{privateApplicationContext applicationContext;@ResourceprivateCuratorFramework curatorClient;@Value("${spring.cloud.zookeeper.discovery.root}")privateString path;@PostConstructpublicvoidinit(){//当前节点CuratorCache curatorCache =CuratorCache.builder(curatorClient, path).build();//监听子节点,不监听当前节点CuratorCacheListener pathCacheListener =CuratorCacheListener.builder().forPathChildrenCache(path, curatorClient,(client, event)->{String type = event.getType().name();
log.info("PathChildrenCacheListener {}", type);if(Objects.equals(event.getType(),PathChildrenCacheEvent.Type.CHILD_REMOVED)){ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = applicationContext
.getBeanProvider(LoadBalancerCacheManager.class);LoadBalancerCacheManager ifAvailable = cacheManagerProvider.getIfAvailable();assert ifAvailable !=null;Cache cache = ifAvailable.getCache(SERVICE_INSTANCE_CACHE_NAME);if(Objects.nonNull(cache)){// todo 这里需要删除指定key 而不是全量清除缓存
cache.clear();}
log.info("本地缓存清除完成");}}).build();
curatorCache.listenable().addListener(pathCacheListener);
curatorCache.start();}@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{this.applicationContext = applicationContext;}}
在写完代码上线测试发现比较多的问题,大致如下
- Zookeeper 不同版本导致事件监听失效
由于我们zk线上版本是3.5,测试是3.7.导致这段代码测试环境有效线上报错
- Zookeeper 事件延迟
- Zookeeper 事件存在丢失的情况
http删除缓存
Zookeeper
事件监听不靠谱我们就使用第二种方案
多节点的缓存删除我们使用redis作通知
- RedissonConfig
@ConfigurationpublicclassRedissonConfig{@Value("${redis..host}")privateString redisLoginHost;@Value("${redis..port}")privateInteger redisLoginPort;@Value("${redis..password}")privateString redisLoginPassword;@BeanpublicRedissonClientredissonClient(){returncreateRedis(redisLoginHost, redisLoginPort, redisLoginPassword);}privateRedissonClientcreateRedis(String redisHost,Integer redisPort,String redisPassword){Config config =newConfig();SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://"+ redisHost +":"+ redisPort +"");if(DataUtils.isNotEmpty(redisPassword)){
singleServerConfig.setPassword(redisPassword);}returnRedisson.create(config);}}
- RedisSubscriber
@Component@Slf4jpublicclassRedisSubscriberimplementsApplicationRunner,ApplicationContextAware{publicstaticfinalString GRACEFUL_SHUTDOWN ="graceful-shutdown";privateApplicationContext applicationContext;@AutowiredprivateRedissonClient redisson;@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{this.applicationContext = applicationContext;}@Overridepublicvoidrun(ApplicationArguments args){RTopic topic = redisson.getTopic(GRACEFUL_SHUTDOWN);
topic.addListener(ClientDTO.class,(channel, clientDTO)->{String applicationName = clientDTO.getApplicationName();ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = applicationContext
.getBeanProvider(LoadBalancerCacheManager.class);LoadBalancerCacheManager ifAvailable = cacheManagerProvider.getIfAvailable();assert ifAvailable !=null;Cache cache = ifAvailable.getCache(SERVICE_INSTANCE_CACHE_NAME);if(Objects.nonNull(cache)){List<ZookeeperServiceInstance> serviceInstances = cache.get(applicationName,List.class);if(DataUtils.isNotEmpty(serviceInstances)){List<ZookeeperServiceInstance> collect = serviceInstances.stream().filter(s ->{ServiceInstance<ZookeeperInstance> serviceInstance = s.getServiceInstance();String id = serviceInstance.getId();return!Objects.equals(id, clientDTO.getId());}).collect(Collectors.toList());
cache.put(applicationName, collect);
log.info("本地缓存清除完成 id {} ", clientDTO.getId());}else{
log.info("本地缓存null");}}});}}
- controller
@GetMapping("/flushCache")publicMap<String,Object>flushCache(ClientDTO clientDTO){
log.info("flushCache, applicationName : {}", clientDTO.getApplicationName());if(DataUtils.isNotEmpty(clientDTO)){RTopic topic = redissonClient.getTopic(GRACEFUL_SHUTDOWN);
topic.publish(clientDTO);
log.info("flushCache 发送缓存topic, applicationName : {}", clientDTO.getApplicationName());}Map<String,Object> result =newHashMap<>();
result.put("code",100);
result.put("message","ok");return result;}
这样我们服务提供者在销毁的时候注销zk,然后调用该接口去删除客户端缓存,就可以解决如下问题。实现
Spring Cloud Zookeeper
的优雅下线
客户端优雅下线sdk
我们可以给接入的服务消费者提供一个简单的sdk,在接受到Spring ContextClosedEvent事件后进行调用上面的接口清除缓存
核心代码如下
publicvoidgracefulShutdown(){this.serviceRegistry.deregister(this.serviceInstanceRegistration);
log.info("shutdown 注销Zookeeper服务");this.serviceRegistry.close();
log.info("shutdown 关闭Zookeeper连接");try{ServiceInstance<ZookeeperInstance> instance =this.serviceInstanceRegistration.getServiceInstance();String serviceName =this.serviceInstanceRegistration.getServiceInstance().getName();String host =this.serviceInstanceRegistration.getServiceInstance().getAddress();String id =this.serviceInstanceRegistration.getServiceInstance().getId();String url =String.format("%s?applicationName=%s&host=%s&id=%s",this.flushCacheUrl, serviceName, host, id);String ret =OkHttpUtils.get(url);
log.info("ret: {}", ret);}catch(Exception var7){
log.error("flush cache error : {}",this.flushCacheUrl);}}
总结
基于该方案改造后,线上服务发版下线就再也没有报错了,非常优雅
版权归原作者 weihubeats 所有, 如有侵权,请联系我们删除。