0


【SpringCloud】这一次终于使用MQ解决了Eureka服务下线延迟感知问题

前言

其实,“通过Redis手动更新Ribbon缓存来解决Eureka微服务架构中服务下线感知的问题”是一种解,但不是最优解

1.痛点

上一篇文章的标题是:
通过Redis手动更新Ribbon缓存来解决Eureka微服务架构中服务下线感知的问题
当时在文章的末尾就指出,使用Redis+AOP的方式有很多漏洞,只有在服务调用方发送调用请求的情况下才会触发切面中更新Ribbon缓存的逻辑。如果每次在发布Eureka新服务的场景下,告警的接口都能准确定位到,那将这些接口方法通过切面去针对性的加上更新Ribbon缓存的前置操作完全是没问题的。但是如果告警接口数量众多,并且无法定位,上述方法就有些不够看了。

2.解决方案

于是,基于此种困境,我想到了用mq的事件驱动模式来推进Ribbon缓存更新(“下线”这一事件驱动,而不是“发送跨服务调用请求”这一事件),具体如下:
在这里插入图片描述
即,当服务被调用方中调用了下线接口下线了指定服务,会生产消息到MQ里,服务被调用方会监听这个队列去消费消息,并通过消费消息这一事件(消费下线服务端口信息)去驱动更新Ribbon缓存。
说明:
在以前我觉得用MQ不能做下线,压测了很多次也没成功,这本质还是没搞懂Eureka-Server,Eureka-Client,Ribbon三者的关系和之间的动作,其实这个体系里有两个非常关键的点(在配置文件中设置),可以直接影响无感知下线的结果,需要动态调整:那就是要关闭Eureka-server的三级缓存

useReadOnlyResponseCache: false

,并且缩短Eureka-Client端向Eureka-server端拉取服务列表的时间

registry-fetch-interval-seconds: 3

。可能这里大家看到去改配置有点鸡肋并且在实际场景中也不太现实,但别急,暂时先往下看,后面我会专门写一篇文章来解决这一问题

3.具体实现

3.1配置RabbitMQ

1.配置RabbitMQ(安装这些大家可以去看看平台比较成熟的文章)这里就不写了,我是直接在服务器上用docker容器化运行的:
在这里插入图片描述
在调用方与被调用方都配好MQ

3.2生产下线消息

首先声明一个队列:

@Configuration@EnableRabbitpublicclassRabbitMqConfig{@BeanpublicQueuetheQueue(){returnnewQueue("SERVER_LIST");}}

服务下线接口处,生产下线消息到MQ,向这接口

/service-down-list

发送GET请求,传递指定的下线服务实例信息即可下线服务,即

http://localhost:8081/control/service-down-list?portParams=8083

就下线了8083服务实例

@Value("${eureka-server.ipAddress}")privateString ipAddress;@Value("${eureka-server.appName}")privateString appName;@Value("${DIY_QUEUE.VALUE}")privateString queueName;@GetMapping(value ="/service-down-list")publicStringoffLine(@RequestParamList<Integer> portParams){List<Integer> successList =newArrayList<>();//得到服务信息List<InstanceInfo> instances = eurekaClient.getInstancesByVipAddress(appName,false);List<Integer> servicePorts = instances.stream().map(InstanceInfo::getPort).collect(Collectors.toList());//去服务列表里挨个下线OkHttpClient client =newOkHttpClient();
        log.error("开始时间:{}",System.currentTimeMillis());
        portParams.parallelStream().forEach(temp ->{if(servicePorts.contains(temp)){String url ="http://"+ ipAddress +":"+ temp +"/control/service-down";try{Response response = client.newCall(newRequest.Builder().url(url).build()).execute();if(response.code()==200){
                        log.debug(temp +"服务下线成功");
                        successList.add(temp);}else{
                        log.debug(temp +"服务下线失败");}}catch(IOException e){
                    log.error(e.toString());}}});//todo MQ通知HashMap<String,List<Integer>> portInfo =newHashMap<>();
        portInfo.put(appName,successList);
        rabbitTemplate.convertAndSend(queueName,portInfo);return successList +"优雅下线成功";}

这里向MQ的队列里传递了下线的服务实例端口信息

3.3更新Ribbon缓存

服务调用方通过“下线“这一事件驱动Ribbon缓存更新

/**
 * 消费者
 */@Slf4j@ComponentpublicclassConsumer{@ResourceSpringClientFactory springClientFactory;@ResourceClearRibbonCacheBean clearRibbonCacheBean;@RabbitListener(queues ="SERVER_LIST")publicvoidlistenWorkQueue1(HashMap<String,List<Integer>> message){
        log.debug("消费者1接收到消息——"+ message +"时间为:"+LocalTime.now());for(String key : message.keySet()){List<Integer> value = message.get(key);
            log.debug("Key: "+ key);
            log.debug("Value: "+ value);if(ObjectUtils.isNotEmpty(value)){
                clearRibbonCacheBean.clearRibbonCache(springClientFactory, value.toString(), key);}
            log.debug("现在的所有服务列表:{}", springClientFactory.getLoadBalancer(key).getAllServers());}}}

清理Ribbon缓存的Bean:

/**
 * 手动清除Ribbon缓存
 */@Configuration@Slf4jpublicclassClearRibbonCacheBean{/**
     * 削减
     */publicstaticbooleancutDown(List<Integer> ports,Server index){return ports.contains(index.getPort());}publicvoidclearRibbonCache(SpringClientFactory clientFactory,String portParams,String appName){// 获取指定服务的负载均衡器ILoadBalancer loadBalancer = clientFactory.getLoadBalancer(appName);//在主动拉取可用列表,而不是走拦截器被动的方式——这里为什么获取可用的之后还要过滤,就是因为所谓的可用不是实时的可用而是缓存中的可用List<Server> reachableServers = loadBalancer.getReachableServers();//这里从客户端获取,会等待客户端同步三级缓存//过滤掉已经下线的端口,符合条件端口的服务过滤出来List<Integer> portList =StringChange.stringToList(portParams);List<Server> ableServers = reachableServers.stream().filter(temp ->!cutDown(portList, temp)).collect(Collectors.toList());
        log.debug("可用服务列表:{}", ableServers);// 在某个时机需要清除Ribbon缓存((BaseLoadBalancer) loadBalancer).setServersList(ableServers);// 清除Ribbon负载均衡器的缓存}

3.4压测

运行项目,调用下线接口并压测来模拟一下线上场景:
此时我们调用下线接口,下线8083服务实例:
在这里插入图片描述
压测结果,均无异常:在这里插入图片描述
观察服务实例的日志输出:
未下线的8081,8084
在这里插入图片描述在这里插入图片描述
下线的8083
在这里插入图片描述
这说明,Eureka服务下线感知的延迟已经完全被消除

4.优化

以上的MQ还是采用简单队列的模式,即生产者生产一条消息到队列中,该消息也只能被一个消费者消费到。在微服务架构中,用户微服务肯定不只是被单方面调用,而是会被多方调用。那这就要求我们不能单纯只将消息生产到队列里,应该通过广播的模式进行消息的分发。为了更方便交换机与队列的灵活绑定,以及方便扩展,采用Topic话题的模型进行消息的广播:
在这里插入图片描述
声明一个新队列:

@BeanpublicQueuetheQueue(){returnnewQueue("USER-QUEUE");}

将生产消息的地方改为携带一个routingkey并发送到交换机中:

//todo MQ通知HashMap<String,List<Integer>> portInfo =newHashMap<>();
  portInfo.put(appName,successList);
  rabbitTemplate.convertAndSend(exchangeName,"USER.SERVICE-DOWN",portInfo);// 这个队列以后可能会发USER话题下的很多信息

将消费者端的消息监听器进行改造,变为监听指定话题的消息:

@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="USER-QUEUE"),
            exchange =@Exchange(name ="USER-TOPIC", type =ExchangeTypes.TOPIC),
            key ="USER.SERVICE-DOWN"))publicvoidlistenWorkQueue1(HashMap<String,List<Integer>> message){
        log.debug("消费者1接收到消息——"+ message +"时间为:"+LocalTime.now());for(String key : message.keySet()){List<Integer> value = message.get(key);
            log.debug("Key: "+ key);
            log.debug("Value: "+ value);if(ObjectUtils.isNotEmpty(value)){
                clearRibbonCacheBean.clearRibbonCache(springClientFactory, value.toString(), key);}
            log.debug("现在的所有服务列表:{}", springClientFactory.getLoadBalancer(key).getAllServers());}}
标签: eureka 云原生

本文转载自: https://blog.csdn.net/weixin_57535055/article/details/135560050
版权归原作者 懒羊羊.java 所有, 如有侵权,请联系我们删除。

“【SpringCloud】这一次终于使用MQ解决了Eureka服务下线延迟感知问题”的评论:

还没有评论