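Integrating Nacos Config with Spring Boot

References

Nacos with Spring Boot as a registry and configuration center | Nacos official site

A step-by-step look at Nacos configuration fetching, dynamic refresh, and listening from the source code (CSDN blog)

Versions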

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.1.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-config-spring-boot-starter</artifactId>
    <version>0.2.8</version>
</dependency>

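Note: the nacos-client version should match the nacos-server version. My server is 1.4.2, so I use nacos-config-spring-boot-starter 0.2.8. If the server is 2.x, use nacos-config-spring-boot-starter 0.2.9 or later. What matters is that the nacos-client version the starter depends on matches the nacos-server version.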

Configuration file

Settings in application.yml

nacos:
  config:
    bootstrap:
      # must be enabled
      enable: true
      log-enable: true
    server-addr: xxx
    auto-refresh: true
    data-id: xxx
    type: yaml

Usage

Option 1: similar to @ConfigurationProperties

@Getter
@Setter
@Component
@NacosConfigurationProperties(dataId = "${nacos.config.data-id}", autoRefreshed = true)
public class MyNacosConfig {

    private String useLocalCache;

}

Option 2: similar to @Value

    @NacosValue(value = "${useLocalCache}", autoRefreshed = true)
    private String useLocalCache;

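How it works

Core components and mechanisms:

  1. Consistency protocol (for example Raft): Nacos uses a consistency protocol to keep configuration consistent across a cluster, so that every node holds the same configuration data.
  2. Client long polling and listening: when a client asks the Nacos server whether its configuration has changed, the server can hold the request open and respond as soon as the configuration is updated, so the client learns of the change almost immediately.
  3. Version tracking: Nacos tracks a version of each configuration, and every update changes it. In the 1.x client analysed below, this "version" is the MD5 digest of the configuration content, which the client compares to decide whether there is a new update.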
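The change check in item 3 can be illustrated with a content digest. In the client code analysed later in this article (CacheData#checkListenerMd5), the comparison is made on MD5 values. The following is a minimal sketch of that idea only: the class `Md5ChangeDetector` and its methods are hypothetical names for illustration, not part of Nacos.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not a Nacos class: remembers the last digest it saw.
class Md5ChangeDetector {

    private String lastMd5 = "";

    // Hex-encoded MD5 of the content, zero-padded to 32 characters.
    static String md5Hex(String content) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(content.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Returns true only when the content differs from the previous call.
    boolean update(String content) {
        String md5 = md5Hex(content);
        if (md5.equals(lastMd5)) {
            return false;
        }
        lastMd5 = md5;
        return true;
    }

    public static void main(String[] args) {
        Md5ChangeDetector detector = new Md5ChangeDetector();
        System.out.println(detector.update("useLocalCache: true"));  // first content counts as a change
        System.out.println(detector.update("useLocalCache: true"));  // identical content, no change
        System.out.println(detector.update("useLocalCache: false")); // different content, change
    }
}
```

Comparing digests rather than full content keeps the check cheap and lets the client send only a short fingerprint per configuration when it asks the server what has changed.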

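Basic workflow:

  1. The client subscribes to a configuration item on the Nacos server, for example a specific data ID.
  2. On receiving the subscription, the Nacos server adds it to its subscription management and watches that configuration item for changes.
  3. When an update is triggered, the Nacos server updates the content of the configuration item and notifies every client subscribed to it. In the 1.x client analysed below, the notification carries the keys that changed, and the client then fetches the latest content.
  4. After receiving the notification, the client updates and reloads its configuration as needed, keeping it current and consistent.

With this mechanism Nacos can update configuration dynamically. Because it is based on listening and notification, clients obtain and apply configuration changes promptly.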
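The four workflow steps can be sketched as a small in-memory model. This is a simplified illustration, not how the Nacos server is implemented: `InMemoryConfigServer`, `subscribe`, and `publish` are hypothetical names, and the real system works over the network with long polling rather than direct callbacks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a config server: stores content per data ID
// and calls back every subscriber when the content actually changes.
class InMemoryConfigServer {

    private final Map<String, String> contents = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Steps 1 and 2: a client subscribes to a data ID and the server records it.
    void subscribe(String dataId, Consumer<String> listener) {
        subscribers.computeIfAbsent(dataId, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Step 3: an update stores the new content and notifies the subscribers.
    void publish(String dataId, String content) {
        String previous = contents.put(dataId, content);
        if (content.equals(previous)) {
            return; // unchanged content triggers no notification
        }
        List<Consumer<String>> listeners =
                subscribers.getOrDefault(dataId, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(content);
        }
    }

    public static void main(String[] args) {
        InMemoryConfigServer server = new InMemoryConfigServer();
        List<String> received = new ArrayList<>();
        // Step 4: the client applies whatever content it is notified about.
        server.subscribe("example.yaml", received::add);
        server.publish("example.yaml", "useLocalCache: true");
        server.publish("example.yaml", "useLocalCache: true");
        server.publish("example.yaml", "useLocalCache: false");
        System.out.println(received);
    }
}
```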

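Overall, Nacos updates configuration dynamically through subscription, listening, and notification, backed by version tracking and a consistency protocol, so that configuration stays up to date and in sync across a distributed environment.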

Source code analysis: client

In the spring.factories file inside the jar

EnableAutoConfiguration auto-configuration

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.boot.nacos.config.autoconfigure.NacosConfigAutoConfiguration

Importing the configuration class

@Import

@ConditionalOnProperty(name = NacosConfigConstants.ENABLED, matchIfMissing = true)
@ConditionalOnMissingBean(name = CONFIG_GLOBAL_NACOS_PROPERTIES_BEAN_NAME)
@EnableConfigurationProperties(value = NacosConfigProperties.class)
@ConditionalOnClass(name = "org.springframework.boot.context.properties.bind.Binder")
// import the registrar class
@Import(value = { NacosConfigBootBeanDefinitionRegistrar.class })
@EnableNacosConfig
public class NacosConfigAutoConfiguration {

}

Registering NacosBootConfigurationPropertiesBinder

@Configuration
public class NacosConfigBootBeanDefinitionRegistrar
        implements ImportBeanDefinitionRegistrar, BeanFactoryAware {

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
        // create the BeanDefinition builder
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder
                .rootBeanDefinition(NacosBootConfigurationPropertiesBinder.class);
        // register the bean definition under the name nacosConfigurationPropertiesBinder
        defaultListableBeanFactory.registerBeanDefinition(
                NacosBootConfigurationPropertiesBinder.BEAN_NAME,
                beanDefinitionBuilder.getBeanDefinition());
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
            BeanDefinitionRegistry registry) {

    }
}

Enabling Nacos configuration

@EnableNacosConfig

This enables Nacos configuration: through @Import it imports NacosConfigBeanDefinitionRegistrar.class


    @Override
    public void registerBeanDefinitions(AnnotationMetadata metadata,
            BeanDefinitionRegistry registry) {
        AnnotationAttributes attributes = fromMap(
                metadata.getAnnotationAttributes(EnableNacosConfig.class.getName()));

        // register the global Nacos properties bean
        registerGlobalNacosProperties(attributes, registry, environment,
                CONFIG_GLOBAL_NACOS_PROPERTIES_BEAN_NAME);
        // register the common Nacos beans
        registerNacosCommonBeans(registry);
        // register the Nacos config beans
        registerNacosConfigBeans(registry, environment, beanFactory);
        // invoke NacosPropertySourcePostProcessor immediately
        // to raise the priority of @NacosPropertySource processing
        invokeNacosPropertySourcePostProcessor(beanFactory);
    }

    public static void registerNacosConfigBeans(BeanDefinitionRegistry registry,
            Environment environment, BeanFactory beanFactory) {
        // Register PropertySourcesPlaceholderConfigurer Bean
        registerPropertySourcesPlaceholderConfigurer(registry, beanFactory);

        registerNacosConfigPropertiesBindingPostProcessor(registry);

        registerNacosConfigListenerMethodProcessor(registry);

        registerNacosPropertySourcePostProcessor(registry);

        // handles the @NacosPropertySources annotation
        registerAnnotationNacosPropertySourceBuilder(registry);

        registerNacosConfigListenerExecutor(registry, environment);

        // handles @NacosValue
        registerNacosValueAnnotationBeanPostProcessor(registry);

        registerConfigServiceBeanBuilder(registry);

        registerLoggingNacosConfigMetadataEventListener(registry);
    }

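Handling @NacosValue

NacosValueAnnotationBeanPostProcessor

The bean post-processor for the @NacosValue annotation

On a configuration change, CacheData#checkListenerMd5 triggers the listeners, a NacosConfigReceivedEvent is published, and NacosValueAnnotationBeanPostProcessor receives and handles the event.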

@Override
    public void onApplicationEvent(NacosConfigReceivedEvent event) {
        for (Map.Entry<String, List<NacosValueTarget>> entry : placeholderNacosValueTargetMap
                .entrySet()) {
            String key = environment.resolvePlaceholders(entry.getKey());
            // at this point the environment already holds the latest data
            String newValue = environment.getProperty(key);

            if (newValue == null) {
                continue;
            }
            List<NacosValueTarget> beanPropertyList = entry.getValue();
            for (NacosValueTarget target : beanPropertyList) {
                String md5String = MD5Utils.md5Hex(newValue, "UTF-8");
                boolean isUpdate = !target.lastMD5.equals(md5String);
                if (isUpdate) {
                    target.updateLastMD5(md5String);
                    Object evaluatedValue = resolveNotifyValue(target.nacosValueExpr, key, newValue);
                    if (target.method == null) {
                        // set the value on the bean via reflection
                        setField(target, evaluatedValue);
                    }
                    else {
                        setMethod(target, evaluatedValue);
                    }
                }
            }
        }
    }
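The "set the value on the bean via reflection" step in the handler above can be sketched on its own. This is a minimal illustration under stated assumptions: `FieldRefreshSketch`, `DemoBean`, and this `setField` signature are hypothetical and differ from the real `setField(target, evaluatedValue)` in NacosValueAnnotationBeanPostProcessor, which works on a NacosValueTarget.

```java
import java.lang.reflect.Field;

// Hypothetical sketch of refreshing a private field through reflection.
class FieldRefreshSketch {

    // Stand-in for a Spring bean whose field would carry @NacosValue.
    static class DemoBean {
        private String useLocalCache = "false";

        String getUseLocalCache() {
            return useLocalCache;
        }
    }

    // Looks up the declared field by name, makes it accessible, and writes the new value.
    static void setField(Object bean, String fieldName, Object newValue) {
        try {
            Field field = bean.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(bean, newValue);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot update field " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        DemoBean bean = new DemoBean();
        setField(bean, "useLocalCache", "true");
        System.out.println(bean.getUseLocalCache()); // prints "true"
    }
}
```

Because the bean instance itself is mutated, code that already holds a reference to the bean sees the new value without the bean being recreated.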

EnvironmentPostProcessor (environment post-processor)

org.springframework.boot.env.EnvironmentPostProcessor=\
  com.alibaba.boot.nacos.config.autoconfigure.NacosConfigEnvironmentProcessor

Environment post-processing

    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment,
            SpringApplication application) {
        // add the initializer
        application.addInitializers(new NacosConfigApplicationContextInitializer(this));
        nacosConfigProperties = NacosConfigPropertiesUtils
                .buildNacosConfigProperties(environment);
        if (enable()) {
            System.out.println(
                    "[Nacos Config Boot] : The preload log configuration is enabled");  
            // load the config and store it in the environment
            loadConfig(environment);
        }
    }

Loading the configuration

    private void loadConfig(ConfigurableEnvironment environment) {
        NacosConfigLoader configLoader = new NacosConfigLoader(nacosConfigProperties,
                environment, builder);
        // load the config
        configLoader.loadConfig();
        // set defer NacosPropertySource
        deferPropertySources.addAll(configLoader.getNacosPropertySources());
    }

Fetching the configuration from the Nacos server

private NacosPropertySource[] reqNacosConfig(Properties configProperties,
            String[] dataIds, String groupId, ConfigType type, boolean isAutoRefresh) {
        final NacosPropertySource[] propertySources = new NacosPropertySource[dataIds.length];
        for (int i = 0; i < dataIds.length; i++) {
            if (StringUtils.isEmpty(dataIds[i])) {
                continue;
            }
            // resolve placeholders
            final String dataId = environment.resolvePlaceholders(dataIds[i].trim());
            // builder.apply returns the NacosConfigService object;
            // the config content is then fetched with an HTTP GET call
            final String config = NacosUtils.getContent(builder.apply(configProperties),
                    dataId, groupId);
            // build the NacosPropertySource and DeferNacosPropertySource
            final NacosPropertySource nacosPropertySource = new NacosPropertySource(
                    dataId, groupId,
                    buildDefaultPropertySourceName(dataId, groupId, configProperties),
                    config, type.getType());
            nacosPropertySource.setDataId(dataId);
            nacosPropertySource.setType(type.getType());
            nacosPropertySource.setGroupId(groupId);
            nacosPropertySource.setAutoRefreshed(isAutoRefresh);
            logger.info("load config from nacos, data-id is : {}, group is : {}",
                    nacosPropertySource.getDataId(), nacosPropertySource.getGroupId());
            propertySources[i] = nacosPropertySource;
            DeferNacosPropertySource defer = new DeferNacosPropertySource(
                    nacosPropertySource, configProperties, environment);
            nacosPropertySources.add(defer);
        }
        return propertySources;
    }
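The HTTP GET mentioned in the comments above corresponds to the Nacos 1.x Open API endpoint `/nacos/v1/cs/configs`. The sketch below builds that URL and reads the response with the JDK only. It is an illustration rather than what NacosConfigService does internally: the class and method names are hypothetical, and the server address, the default `/nacos` context path, and the absence of a namespace and authentication are all assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: fetches one config through the Nacos 1.x Open API.
class NacosConfigUrlSketch {

    // Builds the GET URL; assumes plain HTTP and the default /nacos context path.
    static String buildConfigUrl(String serverAddr, String dataId, String group) {
        return "http://" + serverAddr + "/nacos/v1/cs/configs"
                + "?dataId=" + encode(dataId) + "&group=" + encode(group);
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    // Performs the GET and returns the body; needs a reachable Nacos server.
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        // Only prints the URL, so the example runs without a server.
        System.out.println(buildConfigUrl("127.0.0.1:8848", "example.yaml", "DEFAULT_GROUP"));
    }
}
```

Calling `fetch(buildConfigUrl(...))` against a running server should return the raw configuration text, which is the `config` string that the listing above wraps in a NacosPropertySource.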

Initializing the NacosConfigService object

    public NacosConfigService(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            this.encode = Constants.ENCODE;
        } else {
            this.encode = encodeTmp.trim();
        }
        initNamespace(properties);
        this.configFilterChainManager = new ConfigFilterChainManager(properties);
        
        this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        this.agent.start();
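        // Initialise the Nacos client worker. ClientWorker starts background threads that long-poll the server over HTTP to receive change notifications, including configuration changes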
        this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
    }

Nacos client

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
            final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;
        
        init(properties);
        
        // scheduled thread pool with a single thread
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        
        // scheduled thread pool sized to the number of CPU cores; runs the long-polling tasks
        this.executorService = Executors
                .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                        t.setDaemon(true);
                        return t;
                    }
                });
        
        // initial delay of 1 ms, then 10 ms between the end of one run and the start of the next
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

NacosConfigApplicationContextInitializer runs before the context is refreshed

Initialization

@Override
    public void initialize(ConfigurableApplicationContext context) {
        singleton.setApplicationContext(context);
        environment = context.getEnvironment();
        nacosConfigProperties = NacosConfigPropertiesUtils
                .buildNacosConfigProperties(environment);
        final NacosConfigLoader configLoader = new NacosConfigLoader(
                nacosConfigProperties, environment, builder);
        if (!enable()) {
            // preloading is disabled; the body of this branch is omitted in this excerpt
        }
        else {
            if (processor.enable()) {
                processor.publishDeferService(context);
                // add CacheData to the local cache and bind the listeners
                configLoader
                .addListenerIfAutoRefreshed(processor.getDeferPropertySources());
            }
            else {
                configLoader.loadConfig();
                configLoader.addListenerIfAutoRefreshed();
            }
        }

        final ConfigurableListableBeanFactory factory = context.getBeanFactory();
        if (!factory
                .containsSingleton(NacosBeanUtils.GLOBAL_NACOS_PROPERTIES_BEAN_NAME)) {
            
            // register the globalNacosProperties singleton
            factory.registerSingleton(NacosBeanUtils.GLOBAL_NACOS_PROPERTIES_BEAN_NAME,
                    configLoader.buildGlobalNacosProperties());
        }
    }

Adding the DelegatingEventPublishingListener listener

DelegatingEventPublishingListener is registered through EventPublishingConfigService;

the event NacosConfigListenerRegisteredEvent is published;

Adding a task to cacheMap

 public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
        String key = GroupKey.getKeyTenant(dataId, group, tenant);
        CacheData cacheData = cacheMap.get(key);
        if (cacheData != null) {
            return cacheData;
        }
        
        cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
        // put cacheData into cacheMap
        CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
        if (lastCacheData == null) {
            if (enableRemoteSyncConfig) {
                ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
                cacheData.setContent(response.getContent());
            }
            int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
            cacheData.setTaskId(taskId);
            lastCacheData = cacheData;
        }
        
        lastCacheData.setInitializing(true);
        
        LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
        // update the listener count metric
        MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());
        
        return lastCacheData;
    }

Long polling

    public void checkConfigInfo() {
        // Dispatch tasks.
        int listenerSize = cacheMap.size();
        // Round up the longingTaskCount.
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // start a long-polling task
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
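The task count in `checkConfigInfo` is a ceiling division of the number of cached configs by the per-task batch size. The sketch below works that arithmetic through a few inputs. The value 3000 for `ParamUtil.getPerTaskConfigSize()` is an assumption about the 1.x client default, and `LongPollingTaskMath` is a hypothetical name.

```java
// Hypothetical sketch of the task-count arithmetic used by checkConfigInfo.
class LongPollingTaskMath {

    // Assumed default of ParamUtil.getPerTaskConfigSize(); it is a double in the
    // listing above, which is why the division is not truncated before Math.ceil.
    static final double PER_TASK_CONFIG_SIZE = 3000;

    // Number of long-polling tasks needed for the given number of cached configs.
    static int taskCount(int listenerSize) {
        return (int) Math.ceil(listenerSize / PER_TASK_CONFIG_SIZE);
    }

    public static void main(String[] args) {
        int[] sizes = {0, 1, 3000, 3001, 6001};
        for (int size : sizes) {
            System.out.println(size + " configs -> " + taskCount(size) + " task(s)");
        }
    }
}
```

Under that assumption, an application listening to a handful of data IDs runs a single long-polling task, and a new task is started only when the count crosses a multiple of the batch size.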

LongPollingRunnable

class LongPollingRunnable implements Runnable {
        
        private final int taskId;
        
        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }
        
        @Override
        public void run() {
            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
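                // check each cached config
                for (CacheData cacheData : cacheMap.values()) {
                     // omitted
                }
                
                // POST to the server, which returns the keys whose data changed; when nothing changed the server typically holds the request for about 30s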
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
                }
                // iterate over every changed key
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
                        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                        // update the config in the local cache
                        cache.setContent(response.getContent());
                        cache.setEncryptedDataKey(response.getEncryptedDataKey());
                        if (null != response.getConfigType()) {
                            cache.setType(response.getConfigType());
                        }
                        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                                agent.getName(), dataId, group, tenant, cache.getMd5(),
                                ContentUtils.truncateContent(response.getContent()), response.getConfigType());
                    } catch (NacosException ioe) {
                        String message = String
                                .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                        agent.getName(), dataId, group, tenant);
                        LOGGER.error(message, ioe);
                    }
                }
                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                            .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();
                
                // resubmit this task to the pool so polling continues (a re-queue, not a recursive call)
                executorService.execute(this);
                
            } catch (Throwable e) {
                
                // If the rotation training task is abnormal, the next execution time of the task will be punished
                LOGGER.error("longPolling error : ", e);
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }
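The control flow of `LongPollingRunnable` is a task that re-queues itself after each round and backs off with a delay after a failure. The sketch below reproduces only that pattern with the JDK. `SelfReschedulingTask`, the simulated failure on the second round, the 50 ms penalty, and the stop-after-N-rounds limit are all assumptions made so the example terminates; the real task polls the server and never stops on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a self-rescheduling task with a penalty delay on error.
class SelfReschedulingTask implements Runnable {

    private final ScheduledExecutorService executor;
    private final long penaltyMillis;
    private final int maxRuns;
    final AtomicInteger runs = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(1);

    SelfReschedulingTask(ScheduledExecutorService executor, long penaltyMillis, int maxRuns) {
        this.executor = executor;
        this.penaltyMillis = penaltyMillis;
        this.maxRuns = maxRuns;
    }

    @Override
    public void run() {
        int round = runs.incrementAndGet();
        try {
            poll(round);
            if (round >= maxRuns) {
                done.countDown(); // demo only: stop after maxRuns rounds
                return;
            }
            executor.execute(this); // success: queue the next round immediately
        } catch (Throwable e) {
            failures.incrementAndGet();
            // failure: retry only after the penalty delay
            executor.schedule(this, penaltyMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Stand-in for one long-polling round; fails on the second round.
    private void poll(int round) {
        if (round == 2) {
            throw new IllegalStateException("simulated poll failure");
        }
    }

    // Runs three rounds on a single-thread pool and returns the finished task.
    static SelfReschedulingTask runDemo() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        SelfReschedulingTask task = new SelfReschedulingTask(executor, 50L, 3);
        executor.execute(task);
        try {
            task.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
        return task;
    }

    public static void main(String[] args) {
        SelfReschedulingTask task = runDemo();
        System.out.println("runs=" + task.runs.get() + ", failures=" + task.failures.get());
    }
}
```

Re-queuing through the executor instead of looping inside `run` returns the thread to the pool between rounds, and the delayed retry keeps a failing server from being hammered with immediate requests.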

ApplicationListener (application listener)

Logging

org.springframework.context.ApplicationListener=\
  com.alibaba.boot.nacos.config.logging.NacosLoggingListener

Source code analysis: server

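This article is reposted from: https://blog.csdn.net/GeAhardaharvest/article/details/140631591
Copyright belongs to the original author, 虔诚才会前程. If there is any infringement, please contact us for removal.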