0


Dubbo学习记录(十二)--服务导出之注册到zookeeper

服务导出之注册到zookeeper

  • ServiceConfig#export导出, 层层调用到最后, 就是调用RegistryProtocol#export来进行服务导出,上一篇解释了,也解释了容器的启动;
  • 由于我们使用的zookeeper注册中心, 因此dubbo会将服务信息注册到注册中心中;
@Overridepublic<T> Exporter<T>export(final Invoker<T> originInvoker)throws RpcException {
      
        URL registryUrl =getRegistryUrl(originInvoker);//
        URL providerUrl =getProviderUrl(originInvoker);// dubbo://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello&pid=27656&release=2.7.0&side=provider&timeout=3000&timestamp=1590735956489final URL overrideSubscribeUrl =getSubscribedOverrideUrl(providerUrl);final OverrideListener overrideSubscribeListener =newOverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl =overrideUrlWithConfig(providerUrl, overrideSubscribeListener);// export invoker// 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了final ExporterChangeableWrapper<T> exporter =doLocalExport(originInvoker, providerUrl);// url to registry// 得到注册中心-ZookeeperRegistryfinal Registry registry =getRegistry(originInvoker);// 得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化final URL registeredProviderUrl =getRegisteredProviderUrl(providerUrl, registryUrl);// 将当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);//to judge if we need to delay publish//是否需要注册到注册中心boolean register = providerUrl.getParameter(REGISTER_KEY,true);if(register){// 注册服务,把简化后的服务提供者url注册到registryUrl中去register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);}//监听内容
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);//Ensure that a new exporter instance is returned every time exportreturnnewDestroyableExporter<>(exporter);}

Registry#getRegistry(originInvoker)

目的:获取一个Registry实例对象;
工作:

  1. 获取registryUrl;
  2. 通过工厂模式,创建一个Registry对象;
private Registry getRegistry(final Invoker<?> originInvoker){
        URL registryUrl =getRegistryUrl(originInvoker);return registryFactory.getRegistry(registryUrl);}private URL getRegistryUrl(Invoker<?> originInvoker){// 将registry://xxx?xx=xx&registry=zookeeper 转为 zookeeper://xxx?xx=xx

        URL registryUrl = originInvoker.getUrl();if(REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())){
            String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);}return registryUrl;}

RegistryFactory#getRegistry(registryUrl)

是一个接口;

@SPI("dubbo")publicinterfaceRegistryFactory{@Adaptive({"protocol"})
    Registry getRegistry(URL url);}
  • ZookeeperRegistryFactory类实现了AbstractRegistryFactory抽象类, 而AbstractRegistryFactory实现了RegistryFactory接口;
  • 调用getRegistry方法调用的是ZookeeperRegistryFactory父类AbstractRegistryFactory的getRegistry方法
  • AbstractRegistryFactory#getRegistry定义了创建注册中心的一些公共逻辑,而由于注册中心有好几种,因此将注册中心的创建createRegistry逻辑交由子类去实现;这是工厂模式的通常做法;、
  • 使用ReentrantLock可重入锁, 保证线程安全, 避免重复创建Registry实例;
@Overridepublic Registry getRegistry(URL url){
        url = URLBuilder.from(url).setPath(RegistryService.class.getName()).addParameter(INTERFACE_KEY, RegistryService.class.getName()).removeParameters(EXPORT_KEY, REFER_KEY).build();
        String key = url.toServiceStringWithoutResolving();// Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();try{
            Registry registry = REGISTRIES.get(key);if(registry != null){return registry;}//create registry by spi/ioc
            registry =createRegistry(url);if(registry == null){thrownewIllegalStateException("Can not create registry "+ url);}
            REGISTRIES.put(key, registry);return registry;}finally{// Release the lock
            LOCK.unlock();}}protectedabstract Registry createRegistry(URL url);

ZookeeperRegistryFactory#createRegistry

创建了一个ZookeeperRegistry实例;

publicclassZookeeperRegistryFactoryextendsAbstractRegistryFactory{private ZookeeperTransporter zookeeperTransporter;/**
     * Invisible injection of zookeeper client via IOC/SPI
     * @param zookeeperTransporter
     */publicvoidsetZookeeperTransporter(ZookeeperTransporter zookeeperTransporter){this.zookeeperTransporter = zookeeperTransporter;}@Overridepublic Registry createRegistry(URL url){returnnewZookeeperRegistry(url, zookeeperTransporter);}}

创建ZookeeperRegistry实例;

  1. 调用super(url)去加载一些公共的配置;
  2. 获取group组名 ,默认值为“dubbo”
  3. 判断组名是否以“/”开头, 没有则拼接"/"
  4. Zookeeper注册中心的根节点设置为组名 group;
  5. 连接注册中心;
  6. 监听是否连接成功,连接失败,则进行重连;
publicZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter){super(url);if(url.isAnyHost()){thrownewIllegalStateException("registry address == null");}
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);if(!group.startsWith(PATH_SEPARATOR)){
            group = PATH_SEPARATOR + group;}this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(state ->{if(state == StateListener.RECONNECTED){try{recover();}catch(Exception e){
                    logger.error(e.getMessage(), e);}}});}

创建完ZookeeperRegistry实例后,层层返回, 返回到获取Registry的地方;

RegistryProtocol#getRegisteredProviderUrl

目的: 简化删除URL中的一些无用数据;

// 得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化final URL registeredProviderUrl =getRegisteredProviderUrl(providerUrl, registryUrl);

工作:

  1. 判断RegistryProtocol是否配置简化简述Simplified, 默认时false;
  2. 过滤URL中以“.”开头的key, 以及monitor, bind.ip, bind.port, qos.enable, qos_host, qos_port, qos.accept.foreign.ip, validation,interfaces的key ; 过滤后,剩下的key就是待删除的key;
  3. 删除URL中 待删除的key;
  4. 返回
/**
     * Return the url that is registered to the registry and filter the url parameter once
     * 得到能存入到注册中心去的providerUrl
     * @param providerUrl
     * @return url to registry.
     */private URL getRegisteredProviderUrl(final URL providerUrl,final URL registryUrl){//The address you see at the registry// 默认都是走这里if(!registryUrl.getParameter(SIMPLIFIED_KEY,false)){// 移除key以"."开始的参数,以及其他跟服务本身没有关系的参数,比如监控,绑定ip,qosd等等return providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameters(
                    MONITOR_KEY, BIND_IP_KEY, BIND_PORT_KEY, QOS_ENABLE, QOS_HOST, QOS_PORT, ACCEPT_FOREIGN_IP, VALIDATION_KEY,
                    INTERFACES);}else{
            String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY,"");// if path is not the same as interface name then we should keep INTERFACE_KEY,// otherwise, the registry structure of zookeeper would be '/dubbo/path/providers',// but what we expect is '/dubbo/interface/providers'// 如果path不等于interface的值,那么则把INTERFACE_KEY添加到extraKeys中if(!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))){if(StringUtils.isNotEmpty(extraKeys)){
                    extraKeys +=",";}
                extraKeys += INTERFACE_KEY;}// paramsToRegistry包括了DEFAULT_REGISTER_PROVIDER_KEYS和extraKeys
            String[] paramsToRegistry =getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS
                    , COMMA_SPLIT_PATTERN.split(extraKeys));// 生成只含有paramsToRegistry的对应的参数,并且该参数不能为空return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY,(String[]) null));}}//过滤掉以"."开头的key, 返回一个数组;privatestatic String[]getFilteredKeys(URL url){
        Map<String, String> params = url.getParameters();// 过滤url的参数,找到startsWith(HIDE_KEY_PREFIX)的keyif(CollectionUtils.isNotEmptyMap(params)){return params.keySet().stream().filter(k -> k.startsWith(HIDE_KEY_PREFIX)).toArray(String[]::new);}else{returnnewString[0];}}

ProviderConsumerRegTable.registerProvider(originInvoker,registryUrl, registeredProviderUrl)

目的: 当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
工作:

  1. 创建一个ProviderInvokerWrapper实例,包装了注册中URL,服务执行器, 以及简化后的URL;
  2. 获取providerUrl中的serviceKey;
  3. 从缓存providerInvokers根据key获取map;
  4. 如果为空,则放入一个空的ConcurrentHashMap, 再根据key将这个ConcurrentHashMap拿出来;
  5. 以invoker为key, wrapperInvoker包装实例为value, 放入invokers,即刚创建的ConcurrenthashMap;
publicstatic<T> ProviderInvokerWrapper<T>registerProvider(Invoker<T> invoker, URL registryUrl, URL providerUrl){
        ProviderInvokerWrapper<T> wrapperInvoker =newProviderInvokerWrapper<>(invoker, registryUrl, providerUrl);
        String serviceUniqueName = providerUrl.getServiceKey();// 根据
        ConcurrentMap<Invoker, ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);if(invokers == null){
            providerInvokers.putIfAbsent(serviceUniqueName,newConcurrentHashMap<>());
            invokers = providerInvokers.get(serviceUniqueName);}
        invokers.put(invoker, wrapperInvoker);return wrapperInvoker;}

register(registryUrl, registeredProviderUrl)

注册到Zookeeper注册中心;

  1. 获取REGISTER_KEY的值,判断是否要注册;
  2. 需要注册则将简化后的服务URL放入注册中心;
boolean register = providerUrl.getParameter(REGISTER_KEY,true);if(register){// 注册服务,把简化后的服务提供者url注册到registryUrl中去register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);}
  1. 首先调用registryFactory.getRegistry获取注册中心实例, 由于前面已经创建一个注册中心实例,并且放入了缓存,因此,这个步骤中,直接从map中获取就可以了;
  2. 调用的ZookeeperRegistry#register方法;
publicvoidregister(URL registryUrl, URL registeredProviderUrl){
        Registry registry = registryFactory.getRegistry(registryUrl);// 调用ZookeeperRegistry的register方法
        registry.register(registeredProviderUrl);}

ZookeeperRegistry#register(registeredProviderUrl)

  • ZookeeperRegistry本身并没有register方法,但继承了FailbackRegistry父类, 父类中已经实现了register方法
  • 因此,调用ZookeeperRegistry实际上调用了父类FailbackRegistry的register方法;
  • 调用了doRegister(url)方法, 由于注册中心有好几种,不可能是通用的操作,因此实现逻辑由子类去实现;即调用了ZookeeperRegistry#doRegister(url)方法
  • 个人看法:普通工厂模式也是这种设计理念;
//FailbackRegistry#register@Overridepublicvoidregister(URL url){super.register(url);removeFailedRegistered(url);removeFailedUnregistered(url);try{// Sending a registration request to the server sidedoRegister(url);}catch(Exception e){
            Throwable t = e;//省略部分代码;// Record a failed registration request to a failed list, retry regularlyaddFailedRegistered(url);}}

ZookeeperRegistry#doRegister(url)

目的:给Zookeeper发送请求,创建一个值toUrlPath(url)为动态的配置结点;

@OverridepublicvoiddoRegister(URL url){try{
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY,true));}catch(Throwable e){thrownewRpcException("Failed to register "+ url +" to zookeeper "+getUrl()+", cause: "+ e.getMessage(), e);}}

toUrlPath(url)

获取结点的值;

//"/dubbo/com.demo.DemoService/providers/#{url}"// toCategoryPath(url) ⇒ "/dubbo/com.demo.DemoService/providers"// PATH_SEPARATOR  ==> "/"//URL.encode(url.toFullString())==>对url进行中文编码private String toUrlPath(URL url){returntoCategoryPath(url)+ PATH_SEPARATOR + URL.encode(url.toFullString());}//返回的结果:"/dubbo/com.demo.DemoService/providers"// toServicePath(url) ⇒  "/dubbo/com.demo.DemoService"// PATH_SEPARATOR  ==> "/"// 获取URL的catogory的值, 默认为providers;private String toCategoryPath(URL url){returntoServicePath(url)+ PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);}//结果: "/dubbo/com.demo.DemoService"//toRootDir() --->  "/dubbo/"//URL.encode(name) --> 对URL的name属性进行URL编码; name的值为 com.demo.DemoServiceprivate String toServicePath(URL url){
        String name = url.getServiceInterface();if(ANY_VALUE.equals(name)){returntoRootPath();}returntoRootDir()+ URL.encode(name);}//结果为:根节点 的值 + “/”, 这里为 "/dubbo/"private String toRootDir(){if(root.equals(PATH_SEPARATOR)){return root;}return root + PATH_SEPARATOR;}

服务导出源码流程

  1. ServiceBean.export()方法是导出的入口方法,会执行ServiceConfig.export()方法完成服务导出,导出完了之后会发布一个Spring事件ServiceBeanExportedEvent
  2. 在ServiceConfig.export()方法中会先调用checkAndUpdateSubConfigs(),这个方法主要完成AbstractConfig的参数刷新(从配置中心获取参数等等),AbstractConfig是指ApplicationConfig、ProtocolConfig、ServiceConfig等等,刷新完后会检查stub、local、mock等参数是否配置正确
  3. 参数刷新和检查完成了之后,就会开始导出服务,如果配置了延迟导出,那么则按指定的时间利用ScheduledExecutorService来进行延迟导出
  4. 否则调用doExport()进行服务导出
  5. 继续调用doExportUrls()进行服务导出
  6. 首先通过loadRegistries()方法获得所配置的注册中心的URL,可能配了多个配置中心,那么当前所导出的服务需要注册到每个配置中心去,这里,注册中心的是以URL的方式来表示的,使用的是什么注册中心、注册中心的地址和端口,给注册中心所配置的参数等等,都会存在在URL上,此URL以registry://开始
  7. 获得到注册中心的registryURLs之后,就会遍历当前服务所有的ProtocolConfig,调用doExportUrlsFor1Protocol(protocolConfig, registryURLs);方法把当前服务按每个协议每个注册中心分别进行导出
  8. 在doExportUrlsFor1Protocol()方法中,会先构造一个服务URL,包括 a. 服务的协议dubbo://, b. 服务的IP和PORT,如果指定了就取指定的,没有指定IP就获取服务器上网卡的IP, c. 以及服务的PATH,如果没有指定PATH参数,则取接口名 d. 以及服务的参数,参数包括服务的参数,服务中某个方法的参数 e. 最终得到的URL类似: dubbo://192.168.1.110:20880/com.tuling.DemoService?timeout=3000&&sayHello.loadbalance=random
  9. 得到服务的URL之后,会把服务URL作为一个参数添加到registryURL中去,然后把registryURL、服务的接口、当前服务实现类ref生成一个Invoker代理对象,再把这个代理对象和当前ServiceConfig对象包装成一个DelegateProviderMetaDataInvoker对象,DelegateProviderMetaDataInvoker就表示了完整的一个服务
  10. 接下来就会使用Protocol去export导出服务了,导出之后将得到一个Exporter对象(该Exporter对象,可以理解为主要可以用来卸载(unexport)服务,什么时候会卸载服务?在优雅关闭Dubbo应用的时候)
  11. 接下来我们来详细看看Protocol是怎么导出服务的?
  12. 但调用protocol.export(wrapperInvoker)方法时,因为protocol是Protocol接口的一个Adaptive对象,所以此时会根据wrapperInvoker的genUrl方法得到一个url,根据此url的协议找到对应的扩展点,此时扩展点就是RegistryProtocol,但是,因为Protocol接口有两个包装类,一个是ProtocolFilterWrapper、ProtocolListenerWrapper,所以实际上在调用export方法时,会经过这两个包装类的export方法,但是在这两个包装类的export方法中都会Registry协议进行了判断,不会做过多处理,所以最终会直接调用到RegistryProtocol的export(Invoker originInvoker)方法
  13. 在RegistryProtocol的export(Invoker originInvoker)方法中,主要完成了以下几件事情: a. 生成监听器,监听动态配置中心此服务的参数数据的变化,一旦监听到变化,则重写服务URL,并且在服务导出时先重写一次服务URL b. 拿到重写之后的URL之后,调用doLocalExport()进行服务导出,在这个方法中就会调用DubboProtocol的export方法去导出服务了,导出成功后将得到一个ExporterChangeableWrapper ⅰ. 在DubboProtocol的export方法中主要要做的事情就是启动NettyServer,并且设置一系列的RequestHandler,以便在接收到请求时能依次被这些RequestHandler所处理 ⅱ. 这些RequestHandler在上文已经整理过了 c. 从originInvoker中获取注册中心的实现类,比如ZookeeperRegistry d. 将重写后的服务URL进行简化,把不用存到注册中心去的参数去除 e. 把简化后的服务URL调用ZookeeperRegistry.registry()方法注册到注册中心去 f. 最后将ExporterChangeableWrapper封装为DestroyableExporter对象返回,完成服务导出

Exporter架构

一个服务导出成功后,会生成对应的Exporter:

  1. DestroyableExporter:Exporter的最外层包装类,这个类的主要作用是可以用来unexporter对应的服务
  2. ExporterChangeableWrapper:这个类主要负责在unexport对应服务之前,把服务URL从注册中心中移除,把该服务对应的动态配置监听器移除
  3. ListenerExporterWrapper:这个类主要负责在unexport对应服务之后,把服务导出监听器移除
  4. DubboExporter:这个类中保存了对应服务的Invoker对象,和当前服务的唯一标志,当NettyServer接收到请求后,会根据请求中的服务信息,找到服务对应的DubboExporter对象,然后从对象中得到Invoker对象在这里插入图片描述

总结

  • 服务导出中使用构造器模式,反射, 工厂模式,代码通用性也很强;
  • 大概了解服务导出的过程, 至于Dubbo配置的动态变化,这涉及到监听机制,Zookeeper的一大特性吧。 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener) 订阅注册中心配置, 同时有一个监听器,如果配置发生变动,则触发overrideSubscribeListener监听器,重新导入配置;
标签: dubbo

本文转载自: https://blog.csdn.net/yaoyaochengxian/article/details/123600130
版权归原作者 ~玄霄- 所有, 如有侵权,请联系我们删除。

“Dubbo学习记录(十二)--服务导出之注册到zookeeper”的评论:

还没有评论