参考文档
Nacos 融合 Spring Boot,成为注册配置中心 | Nacos 官网
从源码角度一步步窥探Nacos配置拉取与动态刷新、监听原理_监听nacos配置中心刷新回调-CSDN博客
版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.8</version>
</dependency>
注意:nacos-client的版本要与nacos-server的版本一致,我的服务端版本是1.4.2,所以
nacos-config-spring-boot-starter使用的0.2.8;如果服务端版本是2.X,则nacos-
config-spring-boot-starter需要使用0.2.9(包含)以上版本,主要看其中依赖的nacos-client的版本要与nacos-server的版本一致;
配置文件
application.yml文件中的配置
nacos:
config:
bootstrap:
// 一定要开启
enable: true
log-enable: true
server-addr: xxx
auto-refresh: true
data-id: xxx
type: yaml
使用方式
方式1:类似@ConfigurationProperties
@Getter
@Setter
@Component
@NacosConfigurationProperties(dataId = "${nacos.config.data-id}", autoRefreshed = true)
public class MyNacosConfig {
private String useLocalCache;
}
方式2:类似@Value
@NacosValue(value = "${useLocalCache}", autoRefreshed = true)
private String useLocalCache;
原理
核心组件和机制:
- 一致性协议(比如Raft):Nacos使用一致性协议来保证集群环境下配置的一致性,确保各个节点的配置信息保持同步。
- 客户端长轮询和监听:当客户端向Nacos服务器请求配置时,Nacos服务器可以持续保持连接,并在配置更新时立刻推送最新的配置信息给客户端。
- 版本控制:Nacos会对配置信息进行版本管理,每次配置更新都会导致版本号的变化,客户端可以通过版本号来判断是否有新的配置更新。
基本的工作流程:
- 客户端向Nacos服务器订阅某个配置项,比如一个特定的数据ID。
- Nacos服务器收到订阅请求后,会将该订阅信息加入到订阅管理中,同时监听该配置项的变化。
- 当有更新操作触发时,Nacos服务器会更新配置项的内容,并通知所有订阅了该配置项的客户端。Nacos会将最新的配置信息推送给所有订阅的客户端。
- 客户端收到配置更新通知后,根据实际需要进行配置的更新和加载,保证配置信息的最新性和一致性。
通过这种机制,Nacos能够实现配置的动态更新。这种基于监听和推送的方式,确保了配置信息可以及时地被客户端获取和应用,从而实现了动态配置的管理和更新。
总体来说,Nacos动态更新配置的原理是基于订阅、监听和推送机制,加上版本控制和一致性协议的支持,来保证配置信息在分布式环境下的动态更新和同步。
源码分析-客户端
jar包中的spring.factorise文件中
EnableAutoConfiguration自动装配
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.boot.nacos.config.autoconfigure.NacosConfigAutoConfiguration
导入配置类
@Import
@ConditionalOnProperty(name = NacosConfigConstants.ENABLED, matchIfMissing = true)
@ConditionalOnMissingBean(name = CONFIG_GLOBAL_NACOS_PROPERTIES_BEAN_NAME)
@EnableConfigurationProperties(value = NacosConfigProperties.class)
@ConditionalOnClass(name = "org.springframework.boot.context.properties.bind.Binder")
// 导入配置类
@Import(value = { NacosConfigBootBeanDefinitionRegistrar.class })
@EnableNacosConfig
public class NacosConfigAutoConfiguration {
}
注册NacosBootConfigurationPropertiesBinder
@Configuration
public class NacosConfigBootBeanDefinitionRegistrar
implements ImportBeanDefinitionRegistrar, BeanFactoryAware {
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
// 声明bd构造器
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder
.rootBeanDefinition(NacosBootConfigurationPropertiesBinder.class);
// 注册db,名字:nacosConfigurationPropertiesBinder
defaultListableBeanFactory.registerBeanDefinition(
NacosBootConfigurationPropertiesBinder.BEAN_NAME,
beanDefinitionBuilder.getBeanDefinition());
}
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {
}
}
启用Nacos配置
@EnableNacosConfig
启用nacos配置,@Import(),导入NacosConfigBeanDefinitionRegistrar.class
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
AnnotationAttributes attributes = fromMap(
metadata.getAnnotationAttributes(EnableNacosConfig.class.getName()));
// 注册全局Nacos属性Bean
registerGlobalNacosProperties(attributes, registry, environment,
CONFIG_GLOBAL_NACOS_PROPERTIES_BEAN_NAME);
// 注册nacos通用bean
registerNacosCommonBeans(registry);
// 注册nacos配置bean
registerNacosConfigBeans(registry, environment, beanFactory);
// 立即执行NacosPropertySourcePostProcessor
// 为了提高@NacosPropertySource进程的优先级
invokeNacosPropertySourcePostProcessor(beanFactory);
}
public static void registerNacosConfigBeans(BeanDefinitionRegistry registry,
Environment environment, BeanFactory beanFactory) {
// Register PropertySourcesPlaceholderConfigurer Bean
registerPropertySourcesPlaceholderConfigurer(registry, beanFactory);
registerNacosConfigPropertiesBindingPostProcessor(registry);
registerNacosConfigListenerMethodProcessor(registry);
registerNacosPropertySourcePostProcessor(registry);
// 处理注解@NacosPropertySources
registerAnnotationNacosPropertySourceBuilder(registry);
registerNacosConfigListenerExecutor(registry, environment);
// 处理@NacosValue
registerNacosValueAnnotationBeanPostProcessor(registry);
registerConfigServiceBeanBuilder(registry);
registerLoggingNacosConfigMetadataEventListener(registry);
}
@NacosValue处理
NacosValueAnnotationBeanPostProcessor
@NacosValue注解bean后置处理器
配置变更:CacheData#checkListenerMd5,发送事件NacosConfigReceivedEvent,NacosValueAnnotationBeanPostProcessor接收处理事件
@Override
public void onApplicationEvent(NacosConfigReceivedEvent event) {
for (Map.Entry<String, List<NacosValueTarget>> entry : placeholderNacosValueTargetMap
.entrySet()) {
String key = environment.resolvePlaceholders(entry.getKey());
// 此时env中已经是最新的数据了
String newValue = environment.getProperty(key);
if (newValue == null) {
continue;
}
List<NacosValueTarget> beanPropertyList = entry.getValue();
for (NacosValueTarget target : beanPropertyList) {
String md5String = MD5Utils.md5Hex(newValue, "UTF-8");
boolean isUpdate = !target.lastMD5.equals(md5String);
if (isUpdate) {
target.updateLastMD5(md5String);
Object evaluatedValue = resolveNotifyValue(target.nacosValueExpr, key, newValue);
if (target.method == null) {
// 通过反射设置到bean中
setField(target, evaluatedValue);
}
else {
setMethod(target, evaluatedValue);
}
}
}
}
}
EnvironmentPostProcessor环境处理器
org.springframework.boot.env.EnvironmentPostProcessor=\
com.alibaba.boot.nacos.config.autoconfigure.NacosConfigEnvironmentProcessor
环境后置处理
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment,
SpringApplication application) {
// 添加初始化器
application.addInitializers(new NacosConfigApplicationContextInitializer(this));
nacosConfigProperties = NacosConfigPropertiesUtils
.buildNacosConfigProperties(environment);
if (enable()) {
System.out.println(
"[Nacos Config Boot] : The preload log configuration is enabled");
// 加载配置,保存到env
loadConfig(environment);
}
}
加载配置
private void loadConfig(ConfigurableEnvironment environment) {
NacosConfigLoader configLoader = new NacosConfigLoader(nacosConfigProperties,
environment, builder);
// 加载配置
configLoader.loadConfig();
// set defer NacosPropertySource
deferPropertySources.addAll(configLoader.getNacosPropertySources());
}
获取nacos服务端配置
private NacosPropertySource[] reqNacosConfig(Properties configProperties,
String[] dataIds, String groupId, ConfigType type, boolean isAutoRefresh) {
final NacosPropertySource[] propertySources = new NacosPropertySource[dataIds.length];
for (int i = 0; i < dataIds.length; i++) {
if (StringUtils.isEmpty(dataIds[i])) {
continue;
}
// 解析占位符
final String dataId = environment.resolvePlaceholders(dataIds[i].trim());
// builder.apply获取NacosConfigService对象
// 通过get调接口方式获取配置内容
final String config = NacosUtils.getContent(builder.apply(configProperties),
dataId, groupId);
// 构建NacosPropertySource和DeferNacosPropertySource
final NacosPropertySource nacosPropertySource = new NacosPropertySource(
dataId, groupId,
buildDefaultPropertySourceName(dataId, groupId, configProperties),
config, type.getType());
nacosPropertySource.setDataId(dataId);
nacosPropertySource.setType(type.getType());
nacosPropertySource.setGroupId(groupId);
nacosPropertySource.setAutoRefreshed(isAutoRefresh);
logger.info("load config from nacos, data-id is : {}, group is : {}",
nacosPropertySource.getDataId(), nacosPropertySource.getGroupId());
propertySources[i] = nacosPropertySource;
DeferNacosPropertySource defer = new DeferNacosPropertySource(
nacosPropertySource, configProperties, environment);
nacosPropertySources.add(defer);
}
return propertySources;
}
初始化NacosConfigService对象
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
this.encode = Constants.ENCODE;
} else {
this.encode = encodeTmp.trim();
}
initNamespace(properties);
this.configFilterChainManager = new ConfigFilterChainManager(properties);
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
// 初始化nacos客户端,ClientWorker里面会启动一个线程,创建与服务端的长连接,接受服务端的各种事件通知,其中就包括配置变更事件
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
nacos客户端
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
init(properties);
// 1个线程的周期线程池
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
// cpu核心数周期线程池,执行长轮询
this.executorService = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
// 延迟1ms,每10ms执行一次
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
在刷新上下文之前执行NacosConfigApplicationContextInitializer
初始化
@Override
public void initialize(ConfigurableApplicationContext context) {
singleton.setApplicationContext(context);
environment = context.getEnvironment();
nacosConfigProperties = NacosConfigPropertiesUtils
.buildNacosConfigProperties(environment);
final NacosConfigLoader configLoader = new NacosConfigLoader(
nacosConfigProperties, environment, builder);
else {
if (processor.enable()) {
processor.publishDeferService(context);
// 添加CacheData到本地缓存,并绑定监听器
configLoader
.addListenerIfAutoRefreshed(processor.getDeferPropertySources());
}
else {
configLoader.loadConfig();
configLoader.addListenerIfAutoRefreshed();
}
}
final ConfigurableListableBeanFactory factory = context.getBeanFactory();
if (!factory
.containsSingleton(NacosBeanUtils.GLOBAL_NACOS_PROPERTIES_BEAN_NAME)) {
// 注册单例globalNacosProperties
factory.registerSingleton(NacosBeanUtils.GLOBAL_NACOS_PROPERTIES_BEAN_NAME,
configLoader.buildGlobalNacosProperties());
}
}
添加监听器DelegatingEventPublishingListener
通过EventPublishingConfigService 注册 DelegatingEventPublishingListener;
发布事件NacosConfigListenerRegisteredEvent;
向cacheMap中增加任务
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
String key = GroupKey.getKeyTenant(dataId, group, tenant);
CacheData cacheData = cacheMap.get(key);
if (cacheData != null) {
return cacheData;
}
cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
// 将cacheData放入cacheMap中
CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
if (lastCacheData == null) {
if (enableRemoteSyncConfig) {
ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
cacheData.setContent(response.getContent());
}
int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
cacheData.setTaskId(taskId);
lastCacheData = cacheData;
}
lastCacheData.setInitializing(true);
LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
// 监听器计数
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());
return lastCacheData;
}
长轮询
public void checkConfigInfo() {
// Dispatch tasks.
int listenerSize = cacheMap.size();
// Round up the longingTaskCount.
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 执行长轮询任务
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
LongPollingRunnable
class LongPollingRunnable implements Runnable {
private final int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// 检查配置是否正确
for (CacheData cacheData : cacheMap.values()) {
// 省略
}
// 调服务端post接口,返回哪些数据变化,一般要30s
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
// 遍历所有变化的数据
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
// 更新本地缓存中的配置
cache.setContent(response.getContent());
cache.setEncryptedDataKey(response.getEncryptedDataKey());
if (null != response.getConfigType()) {
cache.setType(response.getConfigType());
}
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(response.getContent()), response.getConfigType());
} catch (NacosException ioe) {
String message = String
.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
// 递归
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
ApplicationListener应用监听器
日志
org.springframework.context.ApplicationListener=\
com.alibaba.boot.nacos.config.logging.NacosLoggingListener
源码分析-服务端
版权归原作者 虔诚才会前程 所有, 如有侵权,请联系我们删除。