文章目录
前置知识 pull模式
sentinel持久化push推模式
pull拉模式的缺点,以保存本地文件举例:
- 定时任务是每隔3s执行一次,去判断规则持久化文件的最后修改时间。这里有一定时间的延迟,但如果时间设置的太短,有影响服务器的性能
- 我们的微服务是集群部署的,其他服务实例可读取不到我这台服务器的本地文件
所以还有一种push推送模式。我们一般会引入第三方中间件来实现,以Nacos为例。我们修改了nacos中的配置,它就会将更新后的数据推送给微服务。
push模式有两种实现方式:
- 在微服务端添加读数据源,为dataId添加监听器,当规则配置文件更改之后我就获取到更改后的规则内存并更新内存中的数据;再添加一个写数据源,每当dashboard中更新了规则,我除了更新内存中的数据之外,我通过
ConfigService.publishConfig()
方法还往Nacos端进行写入 - 在dashboard源码中进行更改,在获取规则内容、更新规则内容的接口中,不要和微服务端进行交互,直接去和Nacos通信,通过
ConfigService.publishConfig()
和ConfigService.getConfig()
来实现。这种方式主要注意dashboard端的规则实体对象和微服务端的规则实体对象不一致问题,需要经过转换相关的操作。sentinel默认情况下就直接把规则实体转换为json字符串推送给Nacos,Nacos配置文件更改了,又推送给微服务,微服务这边再把json字符串转换为规则实体对象这一步就会发现,转换失败了,某些属性对应不上。进而就导致了dashboard端设置的规则在微服务这边未生效。
微服务端的实现
具体实现
引入读数据源的依赖
<dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId></dependency>
配置文件中添加规则持久化的dataId
server:port:8806spring:application:name: mall-user-sentinel-rule-push #微服务名称#配置nacos注册中心地址cloud:nacos:discovery:server-addr: 127.0.0.1:8848sentinel:transport:# 添加sentinel的控制台地址dashboard: 127.0.0.1:8080datasource:# 名称自定义,可以随便定义字符串flow-rules:nacos:server-addr: 127.0.0.1:8848# dataId取了微服务名字,后面再拼接字符串dataId: ${spring.application.name}-flow-rules
# 我这里在Nacos配置中心,单独使用了一个组groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: flow
degrade-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-degrade-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: degrade
param-flow-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-param-flow-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: param-flow
authority-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-authority-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: authority
system-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-system-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: system
在Nacos配置中心中创建对应的配置文件
编写java类,定义写数据源
importcom.alibaba.cloud.sentinel.SentinelProperties;importcom.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration;importorg.springframework.boot.autoconfigure.AutoConfigureAfter;importorg.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* 添加往Nacos的写数据源,只不过未使用InitFunc
* 如果要使用就需要放开注解
*/@Configuration(proxyBeanMethods =false)@AutoConfigureAfter(SentinelAutoConfiguration.class)publicclassSentinelNacosDataSourceConfiguration{@Bean@ConditionalOnMissingBeanpublicSentinelNacosDataSourceHandlersentinelNacosDataSourceHandler(SentinelProperties sentinelProperties){returnnewSentinelNacosDataSourceHandler(sentinelProperties);}}
importcom.alibaba.cloud.sentinel.SentinelProperties;importcom.alibaba.cloud.sentinel.datasource.RuleType;importcom.alibaba.cloud.sentinel.datasource.config.DataSourcePropertiesConfiguration;importcom.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties;importcom.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler;importcom.alibaba.csp.sentinel.datasource.WritableDataSource;importcom.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;importcom.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule;importcom.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;importcom.alibaba.csp.sentinel.slots.system.SystemRule;importcom.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;importcom.alibaba.fastjson.JSON;importorg.springframework.beans.factory.SmartInitializingSingleton;importjava.util.List;/**
* sentinel 规则持久化到 nacos配置中心
*/publicclassSentinelNacosDataSourceHandlerimplementsSmartInitializingSingleton{privatefinalSentinelProperties sentinelProperties;publicSentinelNacosDataSourceHandler(SentinelProperties sentinelProperties){this.sentinelProperties = sentinelProperties;}@OverridepublicvoidafterSingletonsInstantiated(){// 遍历我们配置文件中指定的多个spring.cloud.sentinel.datasource的多个配置
sentinelProperties.getDatasource().values().forEach(this::registryWriter);}privatevoidregistryWriter(DataSourcePropertiesConfiguration dataSourceProperties){// 只获取application.yml文件中 nacos配置的数据源finalNacosDataSourceProperties nacosDataSourceProperties = dataSourceProperties.getNacos();if(nacosDataSourceProperties ==null){return;}// 获取规则类型,然后根据各个类型创建相应的写数据源finalRuleType ruleType = nacosDataSourceProperties.getRuleType();switch(ruleType){caseFLOW:WritableDataSource<List<FlowRule>> flowRuleWriter =newNacosWritableDataSource<>(nacosDataSourceProperties,JSON::toJSONString);WritableDataSourceRegistry.registerFlowDataSource(flowRuleWriter);break;caseDEGRADE:WritableDataSource<List<DegradeRule>> degradeRuleWriter =newNacosWritableDataSource<>(nacosDataSourceProperties,JSON::toJSONString);WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWriter);break;casePARAM_FLOW:WritableDataSource<List<ParamFlowRule>> paramFlowRuleWriter =newNacosWritableDataSource<>(nacosDataSourceProperties,JSON::toJSONString);ModifyParamFlowRulesCommandHandler.setWritableDataSource(paramFlowRuleWriter);break;caseSYSTEM:WritableDataSource<List<SystemRule>> systemRuleWriter =newNacosWritableDataSource<>(nacosDataSourceProperties,JSON::toJSONString);WritableDataSourceRegistry.registerSystemDataSource(systemRuleWriter);break;caseAUTHORITY:WritableDataSource<List<AuthorityRule>> authRuleWriter =newNacosWritableDataSource<>(nacosDataSourceProperties,JSON::toJSONString);WritableDataSourceRegistry.registerAuthorityDataSource(authRuleWriter);break;default:break;}}}
importcom.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties;importcom.alibaba.csp.sentinel.datasource.Converter;importcom.alibaba.csp.sentinel.datasource.WritableDataSource;importcom.alibaba.nacos.api.NacosFactory;importcom.alibaba.nacos.api.PropertyKeyConst;importcom.alibaba.nacos.api.config.ConfigService;importcom.alibaba.nacos.api.exception.NacosException;importlombok.extern.slf4j.Slf4j;importorg.springframework.util.StringUtils;importjava.util.Properties;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;/**
* 将sentinel规则写入到nacos配置中心
* @param <T>
*/@Slf4jpublicclassNacosWritableDataSource<T>implementsWritableDataSource<T>{privatefinalConverter<T,String> configEncoder;privatefinalNacosDataSourceProperties nacosDataSourceProperties;privatefinalLock lock =newReentrantLock(true);privateConfigService configService =null;publicNacosWritableDataSource(NacosDataSourceProperties nacosDataSourceProperties,Converter<T,String> configEncoder){if(configEncoder ==null){thrownewIllegalArgumentException("Config encoder cannot be null");}if(nacosDataSourceProperties ==null){thrownewIllegalArgumentException("Config nacosDataSourceProperties cannot be null");}this.configEncoder = configEncoder;this.nacosDataSourceProperties = nacosDataSourceProperties;finalProperties properties =buildProperties(nacosDataSourceProperties);try{// 也可以直接注入NacosDataSource,然后反射获取其configService属性this.configService =NacosFactory.createConfigService(properties);}catch(NacosException e){
log.error("create configService failed.", e);}}privatePropertiesbuildProperties(NacosDataSourceProperties nacosDataSourceProperties){Properties properties =newProperties();if(!StringUtils.isEmpty(nacosDataSourceProperties.getServerAddr())){
properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDataSourceProperties.getServerAddr());}else{
properties.setProperty(PropertyKeyConst.ACCESS_KEY, nacosDataSourceProperties.getAccessKey());
properties.setProperty(PropertyKeyConst.SECRET_KEY, nacosDataSourceProperties.getSecretKey());
properties.setProperty(PropertyKeyConst.ENDPOINT, nacosDataSourceProperties.getEndpoint());}if(!StringUtils.isEmpty(nacosDataSourceProperties.getNamespace())){
properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDataSourceProperties.getNamespace());}if(!StringUtils.isEmpty(nacosDataSourceProperties.getUsername())){
properties.setProperty(PropertyKeyConst.USERNAME, nacosDataSourceProperties.getUsername());}if(!StringUtils.isEmpty(nacosDataSourceProperties.getPassword())){
properties.setProperty(PropertyKeyConst.PASSWORD, nacosDataSourceProperties.getPassword());}return properties;}@Overridepublicvoidwrite(T value)throwsException{
lock.lock();// todo handle cluster concurrent problemtry{String convertResult = configEncoder.convert(value);if(configService ==null){
log.error("configServer is null, can not continue.");return;}// 规则配置数据推送到nacos配置中心finalboolean published = configService.publishConfig(nacosDataSourceProperties.getDataId(), nacosDataSourceProperties.getGroupId(), convertResult);if(!published){
log.error("sentinel {} publish to nacos failed.", nacosDataSourceProperties.getRuleType());}}finally{
lock.unlock();}}@Overridepublicvoidclose()throwsException{}}
启动微服务进行测试。
dashboard中为某个接口定义一个流控规则
调用接口测试,发送三次请求
查看Nacos中的配置文件,就会发现也成功写入了
源码分析
读数据源
引入读数据源的依赖,我们来看看具体是怎么实现的
<dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId></dependency>
实现思路:
- 和文件的读数据源一样,继承了
AbstractDataSource
类,这样就不需要我们再去写一遍加载配置、更新内存中的配置
在源码中的这个扩展包下面,就有nacos读数据源的实现
我们先看看
NacosDataSource
类的父类的代码
- 创建一个DynamicSentinelProperty对象,主要作用是更新内存中的规则配置
- 加载配置、解析配置
publicabstractclassAbstractDataSource<S,T>implementsReadableDataSource<S,T>{protectedfinalConverter<S,T> parser;protectedfinalSentinelProperty<T> property;publicAbstractDataSource(Converter<S,T> parser){if(parser ==null){thrownewIllegalArgumentException("parser can't be null");}// 子类传过来的解析器this.parser = parser;// 更新内存中的配置// 我们会经常看见 getProperty().updateValue(newValue); 这样的代码this.property =newDynamicSentinelProperty<T>();}@OverridepublicTloadConfig()throwsException{// 调用子类的readSource()方法,一般会得到一个String,// 在通过解析器Converter 并解析配置转换成对应的对象returnloadConfig(readSource());}publicTloadConfig(S conf)throwsException{// 解析配置T value = parser.convert(conf);return value;}@OverridepublicSentinelProperty<T>getProperty(){return property;}}
读配置源的具体实现:
- 通过Nacos的serverAddr构建一个Properties对象,该对象会用于初始化ConfigService接口的对象
- 利用线程池中唯一一个线程,创建一个监听器,监听dataId,当配置中心的配置更改后就会调用微服务客户端,微服务客户端这边有一个while+阻塞队列实现的轮询机制,它调用监听器的方法,监听器里面会更新内存中的规则配置
- 初始化configService对象,并通过configService.addListener(…)为指定的dataId添加监听器
- 微服务刚启动会调用父类的loadConfig()方法,父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析;再更新内存中的规则配置
publicclassNacosDataSource<T>extendsAbstractDataSource<String,T>{privatestaticfinalintDEFAULT_TIMEOUT=3000;// 创建一个只有一个线程的线程池,用来执行dataId的监听器privatefinalExecutorService pool =newThreadPoolExecutor(...);privatefinalListener configListener;privatefinalString groupId;privatefinalString dataId;privatefinalProperties properties;privateConfigService configService =null;publicNacosDataSource(finalString serverAddr,finalString groupId,finalString dataId,Converter<String,T> parser){this(NacosDataSource.buildProperties(serverAddr), groupId, dataId, parser);}publicNacosDataSource(finalProperties properties,finalString groupId,finalString dataId,Converter<String,T> parser){super(parser);if(StringUtil.isBlank(groupId)||StringUtil.isBlank(dataId)){thrownewIllegalArgumentException(...);}AssertUtil.notNull(properties,"Nacos properties must not be null, you could put some keys from PropertyKeyConst");this.groupId = groupId;this.dataId = dataId;this.properties = properties;// 创建一个监听器this.configListener =newListener(){@OverridepublicExecutorgetExecutor(){return pool;}@OverridepublicvoidreceiveConfigInfo(finalString configInfo){RecordLog.info(...);// 通过转换器进行转换T newValue =NacosDataSource.this.parser.convert(configInfo);// 调用父类的SentinelProperty对象,更新内存中的规则配置getProperty().updateValue(newValue);}};// 初始化configService对象,并通过configService.addListener(..)为指定的dataId添加监听器initNacosListener();// 微服务刚启动,会从Nacos配置中心加载一次配置loadInitialConfig();}privatevoidloadInitialConfig(){try{// 调用父类的loadConfig() 父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析T newValue =loadConfig();if(newValue ==null){RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source");}// 调用父类的SentinelProperty对象,更新内存中的规则配置getProperty().updateValue(newValue);}catch(Exception ex){RecordLog.warn("[NacosDataSource] Error when loading initial config", ex);}}privatevoidinitNacosListener(){try{// 初始化configService对象this.configService =NacosFactory.createConfigService(this.properties);// Add config listener.// 通过configService.addListener(..)为指定的dataId添加监听器
configService.addListener(dataId, groupId, configListener);}catch(Exception e){RecordLog.warn("[NacosDataSource] Error occurred when initializing Nacos data source", e);
e.printStackTrace();}}@OverridepublicStringreadSource()throwsException{if(configService ==null){thrownewIllegalStateException("Nacos config service has not been initialized or error occurred");}// 通过ConfigService接口中的getConfig()方法,从Nacos配置中心获取配置return configService.getConfig(dataId, groupId,DEFAULT_TIMEOUT);}@Overridepublicvoidclose(){if(configService !=null){
configService.removeListener(dataId, groupId, configListener);}
pool.shutdownNow();}privatestaticPropertiesbuildProperties(String serverAddr){// 构建一个Properties对象,该对象会在初始化ConfigService时会用上Properties properties =newProperties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr);return properties;}}
写数据源的实现
写数据源源码实现流程相对简单。我们知道dashboard更新配置后调用微服务端,微服务这边的
ModifyRulesCommandHandler
类会处理规则更改的请求。这里会有一个写数据源相关的操作
// 注意name = "setRules",这就是控制台请求服务端的url路径@CommandMapping(name ="setRules", desc ="modify the rules, accept param: type={ruleType}&data={ruleJson}")publicclassModifyRulesCommandHandlerimplementsCommandHandler<String>{publicCommandResponse<String>handle(CommandRequest request){//......// 处理流控规则if(FLOW_RULE_TYPE.equalsIgnoreCase(type)){List<FlowRule> flowRules =JSONArray.parseArray(data,FlowRule.class);FlowRuleManager.loadRules(flowRules);// 关键一步,这里会有一个写数据源的操作。默认情况下是没有WritableDataSource,我们可以在这里进行扩展if(!writeToDataSource(getFlowDataSource(), flowRules)){
result =WRITE_DS_FAILURE_MSG;}returnCommandResponse.ofSuccess(result);// 处理权限规则}elseif(AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)){...// 处理熔断规则}elseif(DEGRADE_RULE_TYPE.equalsIgnoreCase(type)){...// 处理系统规则}elseif(SYSTEM_RULE_TYPE.equalsIgnoreCase(type)){...}returnCommandResponse.ofFailure(newIllegalArgumentException("invalid type"));}}
所以我们要做的事情就是创建一个写数据源,并进行注册写数据源
WritableDataSourceRegistry
。我们先来看看源码中的Demo,通过读写文件的方式实现的读写数据源。
publicvoidinit()throwsException{// 文件保存路径String flowRuleDir =System.getProperty("user.home")+File.separator +"sentinel"+File.separator +"rules";String flowRuleFile ="flowRule.json";String flowRulePath = flowRuleDir +File.separator + flowRuleFile;// 添加读数据源ReadableDataSource<String,List<FlowRule>> ds =newFileRefreshableDataSource<>(
flowRulePath, source ->JSON.parseObject(source,newTypeReference<List<FlowRule>>(){}));FlowRuleManager.register2Property(ds.getProperty());// 添加写数据源WritableDataSource<List<FlowRule>> wds =newFileWritableDataSource<>(flowRulePath,this::encodeJson);WritableDataSourceRegistry.registerFlowDataSource(wds);}
我在定义一个往Nacos的写数据源,一个简单的实现,具体项目中能用的请参考上面 <具体实现>一章 。这里只是用更少的代码来理解nacos的写数据源
importcom.alibaba.csp.sentinel.datasource.WritableDataSource;importcom.alibaba.csp.sentinel.init.InitFunc;importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule;importcom.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;importcom.alibaba.fastjson.JSON;importjava.util.List;publicclassNacosDataSourceInitFuncimplementsInitFunc{@Overridepublicvoidinit()throwsException{//流控规则WritableDataSource<List<FlowRule>> writableDataSource =newNacosWritableDataSource<>("127.0.0.1:8848","DEFAULT_GROUP","mall-user-sentinel-rule-push-demo-flow",JSON::toJSONString);WritableDataSourceRegistry.registerFlowDataSource(writableDataSource);}}
importcom.alibaba.csp.sentinel.datasource.Converter;importcom.alibaba.csp.sentinel.datasource.WritableDataSource;importcom.alibaba.nacos.api.NacosFactory;importcom.alibaba.nacos.api.PropertyKeyConst;importcom.alibaba.nacos.api.config.ConfigService;importcom.alibaba.nacos.api.config.ConfigType;importcom.alibaba.nacos.api.exception.NacosException;importjava.util.Properties;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;publicclassNacosWritableDataSource<T>implementsWritableDataSource<T>{privatefinalString serverAddr;privatefinalString groupId;privatefinalString dataId;privatefinalProperties properties;privateConfigService configService;privatefinalConverter<T,String> configEncoder;privatefinalLock lock =newReentrantLock(true);publicNacosWritableDataSource(String serverAddr,String groupId,String dataId,Converter<T,String> configEncoder){this.serverAddr = serverAddr;this.groupId = groupId;this.dataId = dataId;// 通过serverAddr构建一个properties对象this.properties =NacosWritableDataSource.buildProperties(serverAddr);this.configEncoder = configEncoder;initConfigService();}privatevoidinitConfigService(){try{// 通过properties对象初始化ConfigServicethis.configService =NacosFactory.createConfigService(properties);}catch(NacosException e){
e.printStackTrace();}}privatestaticPropertiesbuildProperties(String serverAddr){Properties properties =newProperties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr);return properties;}@Overridepublicvoidwrite(T t)throwsException{
lock.lock();try{// 通过ConfigService往Nacos配置中心写入数据
configService.publishConfig(dataId, groupId,this.configEncoder.convert(t),ConfigType.JSON.getType());}finally{
lock.unlock();}}@Overridepublicvoidclose()throwsException{}}
微服务端解析读数据源流程
我们引入了下面的依赖
<dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId></dependency>
并在配置文件中指定了多个读数据源。这些数据源是如何创建的嘞?
server:port:8806spring:application:name: mall-user-sentinel-rule-push #微服务名称#配置nacos注册中心地址cloud:nacos:discovery:server-addr: 127.0.0.1:8848sentinel:transport:# 添加sentinel的控制台地址dashboard: 127.0.0.1:8080datasource:# 名称自定义,可以随便定义字符串# 每一个都是一个读数据源flow-rules:nacos:server-addr: 127.0.0.1:8848# dataId取了微服务名字,后面再拼接字符串dataId: ${spring.application.name}-flow-rules
# 我这里在Nacos配置中心,单独使用了一个组groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: flow
# 读数据源degrade-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-degrade-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: degrade
param-flow-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-param-flow-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: param-flow
authority-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-authority-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: authority
system-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-system-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: system
源码的入口是
SentinelDataSourceHandler
类,它实现了
SmartInitializingSingleton
接口,这是Spring中的接口,所有非懒加载单例bean创建完成之后会调用这个接口的实现类:
- 在构造函数中依赖注入SentinelProperties对象,该对象中保存了我们配置文件中所有读数据源的配置
- 遍历SentinelProperties对象中的读数据源,并为每一个读数据源生成一个beanName
- 为每一个读数据源对象 + beanName 创建一个BeanDefinition
- 将BeanDefinition添加进BeanFactory中
- BeanFactory.getBean(beanName) 创建读数据源对象。该对象其实是FactoryBean类型的
- 上方的getBean()方法最终会调用至NacosDataSourceFactoryBean.getObject()方法,在这里创建NacosDataSource对象。该对象就是上方引入maven依赖中的读数据源对象。
publicclassSentinelDataSourceHandlerimplementsSmartInitializingSingleton{//......// SentinelProperties中保存着Map<String, DataSourcePropertiesConfiguration> datasource// 也就是我们上方yml文件中定义的多个数据源,我们自定义的名字就是StringprivatefinalSentinelProperties sentinelProperties;// 构造方法中进行依赖注入 sentinelProperties对象publicSentinelDataSourceHandler(DefaultListableBeanFactory beanFactory,SentinelProperties sentinelProperties,...){//...this.sentinelProperties = sentinelProperties;}// 遍历Map<String, DataSourcePropertiesConfiguration>集合,最终取出我们的每一个配置的数据源@OverridepublicvoidafterSingletonsInstantiated(){
sentinelProperties.getDatasource().forEach((dataSourceName, dataSourceProperties)->{try{List<String> validFields = dataSourceProperties.getValidField();// ...// AbstractDataSourceProperties就是我们在配置文件中具体的每一个配置对象的公共父类AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties
.getValidDataSourceProperties();
abstractDataSourceProperties.setEnv(env);
abstractDataSourceProperties.preCheck(dataSourceName);// 把我们配置的每一个数据源,还有这里字符串凭借的一个beanName。调用下面的registerBean()方法// beanName为 flow-rules + "-sentinel-" + nacos + "-datasource"// flow-rules是我们在yml文件中自定义的名字,nacos就是下面的validFields.get(0)值registerBean(abstractDataSourceProperties, dataSourceName+"-sentinel-"+ validFields.get(0)+"-datasource");}catch(Exception e){
log.error(...);}});}privatevoidregisterBean(finalAbstractDataSourceProperties dataSourceProperties,String dataSourceName){// 对我们的数据源生成一个BeanDefinitionBeanDefinitionBuilder builder =parseBeanDefinition(dataSourceProperties,dataSourceName);// 将BeanDefinition添加进BeanFactory中this.beanFactory.registerBeanDefinition(dataSourceName,builder.getBeanDefinition());// 通过beanFactory.getBean(dataSourceName)方法,创建bean对象// 我们配置文件中定义的每一个读数据源就变为了一个一个的bean// 注意,我们的读数据源它是一个FactoryBean,这里的getBean()方法最终会去到NacosDataSourceFactoryBean.getObject()AbstractDataSource newDataSource =(AbstractDataSource)this.beanFactory.getBean(dataSourceName);// 将读数据源添加进对应的规则管理器中
dataSourceProperties.postRegister(newDataSource);}
publicclassNacosDataSourceFactoryBeanimplementsFactoryBean<NacosDataSource>{//......@OverridepublicNacosDataSourcegetObject()throwsException{// 为properties对象赋值Properties properties =newProperties();if(!StringUtils.isEmpty(this.serverAddr)){
properties.setProperty(PropertyKeyConst.SERVER_ADDR,this.serverAddr);}else{
properties.setProperty(PropertyKeyConst.ENDPOINT,this.endpoint);}if(!StringUtils.isEmpty(this.contextPath)){
properties.setProperty(PropertyKeyConst.CONTEXT_PATH,this.contextPath);}if(!StringUtils.isEmpty(this.accessKey)){
properties.setProperty(PropertyKeyConst.ACCESS_KEY,this.accessKey);}if(!StringUtils.isEmpty(this.secretKey)){
properties.setProperty(PropertyKeyConst.SECRET_KEY,this.secretKey);}if(!StringUtils.isEmpty(this.namespace)){
properties.setProperty(PropertyKeyConst.NAMESPACE,this.namespace);}if(!StringUtils.isEmpty(this.username)){
properties.setProperty(PropertyKeyConst.USERNAME,this.username);}if(!StringUtils.isEmpty(this.password)){
properties.setProperty(PropertyKeyConst.PASSWORD,this.password);}// 创建一个Nacos读数据源对象,这里也就是上方:<源码分析> —— <读数据源> 的那一个对象returnnewNacosDataSource(properties, groupId, dataId, converter);}// ......}
修改源码的实现
我们需要在Sentinel源码中进行修改,将dashboard和微服务之间的通信,改为dashboard和nacos的通信。在通过Nacos配置中心的推送机制去更新微服务内存中的规则配置。
从 Sentinel 1.4.0 开始,Sentinel 控制台提供
DynamicRulePublisher
和
DynamicRuleProvider
接口用于实现应用维度的规则推送和拉取:
- DynamicRuleProvider: 拉取规则
- DynamicRulePublisher: 推送规则
在dashboard工程下的com.alibaba.csp.sentinel.dashboard.rule包下创建nacos包,然后把各种场景的配置规则拉取和推送的实现类写到此包下
可以参考Sentinel Dashboard test包下的流控规则拉取和推送的实现逻辑
官方demo
我们看看官方的demo是如何实现的
首先创建一个
NacosConfigUtil
类,用来定义常量
publicfinalclassNacosConfigUtil{// 其实demo中也就用到了上面两个常量// 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置publicstaticfinalStringGROUP_ID="SENTINEL_GROUP";// 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样// 避免你写一个dataId,微服务从另一个dataId去读publicstaticfinalStringFLOW_DATA_ID_POSTFIX="-flow-rules";publicstaticfinalStringPARAM_FLOW_DATA_ID_POSTFIX="-param-rules";publicstaticfinalStringCLUSTER_MAP_DATA_ID_POSTFIX="-cluster-map";publicstaticfinalStringCLIENT_CONFIG_DATA_ID_POSTFIX="-cc-config";publicstaticfinalStringSERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX="-cs-transport-config";publicstaticfinalStringSERVER_FLOW_CONFIG_DATA_ID_POSTFIX="-cs-flow-config";publicstaticfinalStringSERVER_NAMESPACE_SET_DATA_ID_POSTFIX="-cs-namespace-set";privateNacosConfigUtil(){}}
创建一个
NacosConfig
配置类,这里就定义了流控规则相关的转换器
@ConfigurationpublicclassNacosConfig{// 流控规则相关 定义 List<FlowRuleEntity> 到 String的转换器@BeanpublicConverter<List<FlowRuleEntity>,String>flowRuleEntityEncoder(){returnJSON::toJSONString;}// 流控规则相关 定义 String 到 List<FlowRuleEntity>的转换器@BeanpublicConverter<String,List<FlowRuleEntity>>flowRuleEntityDecoder(){return s ->JSON.parseArray(s,FlowRuleEntity.class);}// 根据一个Nacos的serverAddr,创建ConfigService对象。推送配置/拉取配置都是通过该对象来完成的@BeanpublicConfigServicenacosConfigService()throwsException{returnConfigFactory.createConfigService("localhost");}}
接下来我们来看看dashboard推送规则配置的实现代码,它实现了
DynamicRulePublisher
接口
@Component("flowRuleNacosPublisher")publicclassFlowRuleNacosPublisherimplementsDynamicRulePublisher<List<FlowRuleEntity>>{// 注入上面配置类的中定义的ConfigService和Converter转换器@AutowiredprivateConfigService configService;@AutowiredprivateConverter<List<FlowRuleEntity>,String> converter;@Overridepublicvoidpublish(String app,List<FlowRuleEntity> rules)throwsException{AssertUtil.notEmpty(app,"app name cannot be empty");if(rules ==null){return;}// 调用Nacos的configService.publishConfig(..)方法 推送配置// dataId为 appName + 最上方的常量文件后缀-flow-rules , 分组为最上方定义的常量SENTINEL_GROUP , 并对规则配置集合转换为json字符串
configService.publishConfig(app +NacosConfigUtil.FLOW_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID, converter.convert(rules));}}
接下来我们来看看dashboard拉取规则配置的实现代码,它实现了
DynamicRuleProvider
接口
@Component("flowRuleNacosProvider")publicclassFlowRuleNacosProviderimplementsDynamicRuleProvider<List<FlowRuleEntity>>{// 注入上面配置类的中定义的ConfigService和Converter转换器@AutowiredprivateConfigService configService;@AutowiredprivateConverter<String,List<FlowRuleEntity>> converter;@OverridepublicList<FlowRuleEntity>getRules(String appName)throwsException{// 调用Nacos的configService.getConfig(dataId, group, timeoutMs)方法 拉取配置String rules = configService.getConfig(appName +NacosConfigUtil.FLOW_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,3000);if(StringUtil.isEmpty(rules)){returnnewArrayList<>();}// 将json字符串转换为 List<FlowRuleEntity> 规则实体对象集合return converter.convert(rules);}}
官方Demo这种方式功能上的确是实现了与Nacos通信,对Nacos配置中心进行读写。但存在一个小问题。那就是dashboard这边规则实体对象是
FlowRuleEntity
,但是微服务端规则实体对象是
FlowRule
。Nacos把配置推送给微服务端时,微服务端把json字符串转换为实体对象时可能就会出现不匹配的情况 —> 微服务规则实体对象没有相应的值 ----> 内存中的规则也就不完善 ----> 出现了dashboard端更新的规则微服务端未生效情况。
当然,流控规则都还好,如下图所示,这两个之间的实体对象成员属性基本上都能对应上
但热点规则这边的实体就不行了,他们之间的层级关系就不同了
publicclassParamFlowRuleEntityextendsAbstractRuleEntity<ParamFlowRule>{publicParamFlowRuleEntity(){}// ParamFlowRule为客户端的规则实体,但是这里将一整个实体对象变为了ParamFlowRuleEntity的其中一个属性// 所以这里转json之后的层级关系就发生了改变publicParamFlowRuleEntity(ParamFlowRule rule){AssertUtil.notNull(rule,"Authority rule should not be null");// 父类中的属性this.rule = rule;}...}// 父类publicabstractclassAbstractRuleEntity<TextendsAbstractRule>implementsRuleEntity{protectedLong id;protectedString app;protectedString ip;protectedInteger port;// ParamFlowRule为客户端的规则实体,成为了ParamFlowRuleEntity实体的一个成员属性protectedT rule;privateDate gmtCreate;privateDate gmtModified;...}
为了解决这种情况,那么就需要定义一个规范,存入Nacos配置中心的数据只能是微服务那边的规则实体对象,不能是dashboard这边的规则实体对象
修改源码实现
naocs配置中心保存的是微服务端的规则实体对象
各个规则都先在dashboard端将规则实体转换为微服务能用的规则实体在推送至Nacos配置中心
从Nacos配置中心获取配置后,都先将json字符串转换为dashboard端的规则实体对象
项目结构如下
配置类
importcom.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.ApiDefinitionEntity;importcom.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.GatewayFlowRuleEntity;importcom.alibaba.csp.sentinel.dashboard.datasource.entity.rule.RuleEntity;importcom.alibaba.csp.sentinel.util.StringUtil;importcom.alibaba.fastjson.JSON;importcom.alibaba.nacos.api.config.ConfigService;importcom.alibaba.nacos.api.exception.NacosException;importjava.util.ArrayList;importjava.util.List;importjava.util.stream.Collectors;publicfinalclassNacosConfigUtil{// 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置publicstaticfinalStringGROUP_ID="SENTINEL_GROUP";// 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样// 避免你写一个dataId,微服务从另一个dataId去读publicstaticfinalStringFLOW_DATA_ID_POSTFIX="-flow-rules";publicstaticfinalStringPARAM_FLOW_DATA_ID_POSTFIX="-param-flow-rules";publicstaticfinalStringDEGRADE_DATA_ID_POSTFIX="-degrade-rules";publicstaticfinalStringSYSTEM_DATA_ID_POSTFIX="-system-rules";publicstaticfinalStringAUTHORITY_DATA_ID_POSTFIX="-authority-rules";publicstaticfinalStringGATEWAY_FLOW_DATA_ID_POSTFIX="-gateway-flow-rules";publicstaticfinalStringGATEWAY_API_DATA_ID_POSTFIX="-gateway-api-rules";publicstaticfinalStringCLUSTER_MAP_DATA_ID_POSTFIX="-cluster-map";/**
* cc for `cluster-client`
*/publicstaticfinalStringCLIENT_CONFIG_DATA_ID_POSTFIX="-cc-config";/**
* cs for `cluster-server`
*/publicstaticfinalStringSERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX="-cs-transport-config";publicstaticfinalStringSERVER_FLOW_CONFIG_DATA_ID_POSTFIX="-cs-flow-config";publicstaticfinalStringSERVER_NAMESPACE_SET_DATA_ID_POSTFIX="-cs-namespace-set";//超时时间publicstaticfinalintREAD_TIMEOUT=3000;privateNacosConfigUtil(){}/**
* RuleEntity----->Rule
* 控制台这边的规则实体都是RuleEntity类型的,这里就调用各个规则实体对象中的toRule()方法,转换为微服务端的规则实体对象
* 例如 FlowRuleEntity#toRule ----> FlowRule ParamFlowRuleEntity#toRule ----> ParamFlowRule
* @param entities
* @return
*/publicstaticStringconvertToRule(List<?extendsRuleEntity> entities){returnJSON.toJSONString(
entities.stream().map(r -> r.toRule()).collect(Collectors.toList()));}/**
* ApiDefinitionEntity----->ApiDefinition
* @param entities
* @return
*/publicstaticStringconvertToApiDefinition(List<?extendsApiDefinitionEntity> entities){returnJSON.toJSONString(
entities.stream().map(r -> r.toApiDefinition()).collect(Collectors.toList()));}/**
* GatewayFlowRuleEntity----->GatewayFlowRule
* @param entities
* @return
*/publicstaticStringconvertToGatewayFlowRule(List<?extendsGatewayFlowRuleEntity> entities){returnJSON.toJSONString(
entities.stream().map(r -> r.toGatewayFlowRule()).collect(Collectors.toList()));}}
通过Nacos配置中心的地址,创建对应的ConfigService对象,并存入Spring容器中
@ConfigurationpublicclassNacosConfig{@Value("${sentinel.nacos.config.serverAddr}")privateString serverAddr="localhost:8848";@BeanpublicConfigServicenacosConfigService()throwsException{returnConfigFactory.createConfigService(serverAddr);}/*
对于Nacos开启了认证,那么就需要添加Naocs的用户名和密码了
@Bean
public ConfigService nacosConfigService() throws Exception {
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
properties.put(PropertyKeyConst.USERNAME, "nacos");
properties.put(PropertyKeyConst.PASSWORD, "nacos");
return ConfigFactory.createConfigService(properties);
}*/}
flow
拉取配置,实现
DynamicRuleProvider
接口
@Component("flowRuleNacosProvider")publicclassFlowRuleNacosProviderimplementsDynamicRuleProvider<List<FlowRuleEntity>>{// 注入我们上面创建的ConfigService对象@AutowiredprivateConfigService configService;@OverridepublicList<FlowRuleEntity>getRules(String appName,String ip,Integer port)throwsNacosException{// 从Nacos配置中心拉取配置String rules = configService.getConfig(appName +NacosConfigUtil.FLOW_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.READ_TIMEOUT);if(StringUtil.isEmpty(rules)){returnnewArrayList<>();}// 解析json获取到 List<FlowRule>List<FlowRule> list =JSON.parseArray(rules,FlowRule.class);// 通过FlowRuleEntity.fromFlowRule(..) 方法实现 FlowRule------->FlowRuleEntityreturn list.stream().map(rule ->FlowRuleEntity.fromFlowRule(appName,ip,port,rule)).collect(Collectors.toList());}}/*
FlowRuleEntity.fromFlowRule(..) 方法如下所示,Sentinel的dashboard端的规则实体对象内其实都自己写了对应的fromFlowRule()方法
public static FlowRuleEntity fromFlowRule(String app, String ip, Integer port, FlowRule rule) {
FlowRuleEntity entity = new FlowRuleEntity();
entity.setApp(app);
entity.setIp(ip);
entity.setPort(port);
entity.setLimitApp(rule.getLimitApp());
entity.setResource(rule.getResource());
entity.setGrade(rule.getGrade());
entity.setCount(rule.getCount());
entity.setStrategy(rule.getStrategy());
entity.setRefResource(rule.getRefResource());
entity.setControlBehavior(rule.getControlBehavior());
entity.setWarmUpPeriodSec(rule.getWarmUpPeriodSec());
entity.setMaxQueueingTimeMs(rule.getMaxQueueingTimeMs());
entity.setClusterMode(rule.isClusterMode());
entity.setClusterConfig(rule.getClusterConfig());
return entity;
}
*/
推送配置,实现
DynamicRulePublisher
接口
@Component("flowRuleNacosPublisher")publicclassFlowRuleNacosPublisherimplementsDynamicRulePublisher<List<FlowRuleEntity>>{// 注入我们上面创建的ConfigService对象@AutowiredprivateConfigService configService;@Overridepublicvoidpublish(String app,List<FlowRuleEntity> rules)throwsException{AssertUtil.notEmpty(app,"app name cannot be empty");if(rules ==null){return;}//发布配置到Nacos配置中心,这里会调用我们工具类中编写的方法NacosConfigUtil.convertToRule(rules)
configService.publishConfig(app +NacosConfigUtil.FLOW_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.convertToRule(rules));}}
authority
@Component("authorityRuleNacosProvider")publicclassAuthorityRuleNacosProviderimplementsDynamicRuleProvider<List<AuthorityRuleEntity>>{@AutowiredprivateConfigService configService;@OverridepublicList<AuthorityRuleEntity>getRules(String appName,String ip,Integer port)throwsException{String rules = configService.getConfig(appName +NacosConfigUtil.AUTHORITY_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.READ_TIMEOUT);if(StringUtil.isEmpty(rules)){returnnewArrayList<>();}List<AuthorityRule> list =JSON.parseArray(rules,AuthorityRule.class);return list.stream().map(rule ->AuthorityRuleEntity.fromAuthorityRule(appName, ip, port, rule)).collect(Collectors.toList());}}
@Component("authorityRuleNacosPublisher")publicclassAuthorityRuleNacosPublisherimplementsDynamicRulePublisher<List<AuthorityRuleEntity>>{@AutowiredprivateConfigService configService;@Overridepublicvoidpublish(String app,List<AuthorityRuleEntity> rules)throwsException{AssertUtil.notEmpty(app,"app name cannot be empty");if(rules ==null){return;}
configService.publishConfig(app +NacosConfigUtil.AUTHORITY_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.convertToRule(rules));}}
degread
@Component("degradeRuleNacosProvider")publicclassDegradeRuleNacosProviderimplementsDynamicRuleProvider<List<DegradeRuleEntity>>{@AutowiredprivateConfigService configService;@OverridepublicList<DegradeRuleEntity>getRules(String appName,String ip,Integer port)throwsException{String rules = configService.getConfig(appName +NacosConfigUtil.DEGRADE_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.READ_TIMEOUT);if(StringUtil.isEmpty(rules)){returnnewArrayList<>();}List<DegradeRule> list =JSON.parseArray(rules,DegradeRule.class);return list.stream().map(rule ->DegradeRuleEntity.fromDegradeRule(appName, ip, port, rule)).collect(Collectors.toList());}}
@Component("degradeRuleNacosPublisher")publicclassDegradeRuleNacosPublisherimplementsDynamicRulePublisher<List<DegradeRuleEntity>>{@AutowiredprivateConfigService configService;@Overridepublicvoidpublish(String app,List<DegradeRuleEntity> rules)throwsException{AssertUtil.notEmpty(app,"app name cannot be empty");if(rules ==null){return;}
configService.publishConfig(app +NacosConfigUtil.DEGRADE_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.convertToRule(rules));}}
param
@Component("paramFlowRuleNacosProvider")publicclassParamFlowRuleNacosProviderimplementsDynamicRuleProvider<List<ParamFlowRuleEntity>>{@AutowiredprivateConfigService configService;@OverridepublicList<ParamFlowRuleEntity>getRules(String appName,String ip,Integer port)throwsException{String rules = configService.getConfig(appName +NacosConfigUtil.PARAM_FLOW_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.READ_TIMEOUT);if(StringUtil.isEmpty(rules)){returnnewArrayList<>();}List<ParamFlowRule> list =JSON.parseArray(rules,ParamFlowRule.class);return list.stream().map(rule ->ParamFlowRuleEntity.fromParamFlowRule(appName, ip, port, rule)).collect(Collectors.toList());}}
@Component("paramFlowRuleNacosPublisher")publicclassParamFlowRuleNacosPublisherimplementsDynamicRulePublisher<List<ParamFlowRuleEntity>>{@AutowiredprivateConfigService configService;@Overridepublicvoidpublish(String app,List<ParamFlowRuleEntity> rules)throwsException{AssertUtil.notEmpty(app,"app name cannot be empty");if(rules ==null){return;}
configService.publishConfig(app +NacosConfigUtil.PARAM_FLOW_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.convertToRule(rules));}}
system
@Component("systemRuleNacosProvider")publicclassSystemRuleNacosProviderimplementsDynamicRuleProvider<List<SystemRuleEntity>>{@AutowiredprivateConfigService configService;@OverridepublicList<SystemRuleEntity>getRules(String appName,String ip,Integer port)throwsException{String rules = configService.getConfig(appName +NacosConfigUtil.SYSTEM_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.READ_TIMEOUT);if(StringUtil.isEmpty(rules)){returnnewArrayList<>();}List<SystemRule> list =JSON.parseArray(rules,SystemRule.class);return list.stream().map(rule ->SystemRuleEntity.fromSystemRule(appName, ip, port, rule)).collect(Collectors.toList());}}
@Component("systemRuleNacosPublisher")publicclassSystemRuleNacosPublisherimplementsDynamicRulePublisher<List<SystemRuleEntity>>{@AutowiredprivateConfigService configService;@Overridepublicvoidpublish(String app,List<SystemRuleEntity> rules)throwsException{AssertUtil.notEmpty(app,"app name cannot be empty");if(rules ==null){return;}
configService.publishConfig(app +NacosConfigUtil.SYSTEM_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.convertToRule(rules));}}
gateway
publicclassApiDefinition2{privateString apiName;privateSet<ApiPathPredicateItem> predicateItems;publicApiDefinition2(){}publicStringgetApiName(){return apiName;}publicvoidsetApiName(String apiName){this.apiName = apiName;}publicSet<ApiPathPredicateItem>getPredicateItems(){return predicateItems;}publicvoidsetPredicateItems(Set<ApiPathPredicateItem> predicateItems){this.predicateItems = predicateItems;}@OverridepublicStringtoString(){return"ApiDefinition2{"+"apiName='"+ apiName +'\''+", predicateItems="+ predicateItems +'}';}publicApiDefinitiontoApiDefinition(){ApiDefinition apiDefinition =newApiDefinition();
apiDefinition.setApiName(apiName);Set<ApiPredicateItem> apiPredicateItems =newLinkedHashSet<>();
apiDefinition.setPredicateItems(apiPredicateItems);if(predicateItems !=null){for(ApiPathPredicateItem predicateItem : predicateItems){
apiPredicateItems.add(predicateItem);}}return apiDefinition;}}
@Component("gatewayApiRuleNacosProvider")publicclassGatewayApiRuleNacosProviderimplementsDynamicRuleProvider<List<ApiDefinitionEntity>>{@AutowiredprivateConfigService configService;@OverridepublicList<ApiDefinitionEntity>getRules(String appName,String ip,Integer port)throwsException{String rules = configService.getConfig(appName +NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.READ_TIMEOUT);if(StringUtil.isEmpty(rules)){returnnewArrayList<>();}// 注意 ApiDefinition的属性Set<ApiPredicateItem> predicateItems中元素 是接口类型,JSON解析丢失数据// 重写实体类ApiDefinition2,再转换为ApiDefinitionList<ApiDefinition2> list =JSON.parseArray(rules,ApiDefinition2.class);return list.stream().map(rule ->ApiDefinitionEntity.fromApiDefinition(appName, ip, port, rule.toApiDefinition())).collect(Collectors.toList());}publicstaticvoidmain(String[] args){String rules ="[{\"apiName\":\"/pms/productInfo/${id}\",\"predicateItems\":[{\"matchStrategy\":1,\"pattern\":\"/pms/productInfo/\"}]}]";List<ApiDefinition> list =JSON.parseArray(rules,ApiDefinition.class);System.out.println(list);List<ApiDefinition2> list2 =JSON.parseArray(rules,ApiDefinition2.class);System.out.println(list2);System.out.println(list2.get(0).toApiDefinition());}}
@Component("gatewayApiRuleNacosPublisher")publicclassGatewayApiRuleNacosPublisherimplementsDynamicRulePublisher<List<ApiDefinitionEntity>>{@AutowiredprivateConfigService configService;@Overridepublicvoidpublish(String app,List<ApiDefinitionEntity> rules)throwsException{AssertUtil.notEmpty(app,"app name cannot be empty");if(rules ==null){return;}
configService.publishConfig(app +NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.convertToApiDefinition(rules));}}
@Component("gatewayFlowRuleNacosProvider")publicclassGatewayFlowRuleNacosProviderimplementsDynamicRuleProvider<List<GatewayFlowRuleEntity>>{@AutowiredprivateConfigService configService;@OverridepublicList<GatewayFlowRuleEntity>getRules(String appName,String ip,Integer port)throwsException{String rules = configService.getConfig(appName +NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.READ_TIMEOUT);if(StringUtil.isEmpty(rules)){returnnewArrayList<>();}List<GatewayFlowRule> list =JSON.parseArray(rules,GatewayFlowRule.class);return list.stream().map(rule ->GatewayFlowRuleEntity.fromGatewayFlowRule(appName, ip, port, rule)).collect(Collectors.toList());}}
@Component("gatewayFlowRuleNacosPublisher")publicclassGatewayFlowRuleNacosPublisherimplementsDynamicRulePublisher<List<GatewayFlowRuleEntity>>{@AutowiredprivateConfigService configService;@Overridepublicvoidpublish(String app,List<GatewayFlowRuleEntity> rules)throwsException{AssertUtil.notEmpty(app,"app name cannot be empty");if(rules ==null){return;}
configService.publishConfig(app +NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,NacosConfigUtil.GROUP_ID,NacosConfigUtil.convertToGatewayFlowRule(rules));}}
修改源码
我们现在需要在controller层,将原本dashboard从微服务获取规则配置、dashboard更新规则后调用微服务,这一过程改为Nacos。
以流控规则举例,在
FlowControllerV1
层中注入我们写的类
/** 从远程配置中心拉取规则*/@Autowired@Qualifier("flowRuleNacosProvider")privateDynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;/** 推送规则到远程配置中心*/@Autowired@Qualifier("flowRuleNacosPublisher")privateDynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;
原本dashboard从微服务获取规则配置改为通过
flowRuleNacosProvider
从Nacos拉取配置
@GetMapping("/rules")@AuthAction(PrivilegeType.READ_RULE)publicResult<List<FlowRuleEntity>>apiQueryMachineRules(@RequestParamString app,@RequestParamString ip,@RequestParamInteger port){if(StringUtil.isEmpty(app)){returnResult.ofFail(-1,"app can't be null or empty");}if(StringUtil.isEmpty(ip)){returnResult.ofFail(-1,"ip can't be null or empty");}if(port ==null){returnResult.ofFail(-1,"port can't be null");}try{//从客户端内存获取规则配置//List<FlowRuleEntity> rules = sentinelApiClient.fetchFlowRuleOfMachine(app, ip, port);//从远程配置中心获取规则配置List<FlowRuleEntity> rules = ruleProvider.getRules(app,ip,port);if(rules !=null&&!rules.isEmpty()){for(FlowRuleEntity entity : rules){
entity.setApp(app);if(entity.getClusterConfig()!=null&& entity.getClusterConfig().getFlowId()!=null){
entity.setId(entity.getClusterConfig().getFlowId());}}}
rules = repository.saveAll(rules);returnResult.ofSuccess(rules);}catch(Throwable throwable){
logger.error("Error when querying flow rules", throwable);returnResult.ofThrowable(-1, throwable);}}
规则更改后推送至Nacos
@PostMapping("/rule")@AuthAction(PrivilegeType.WRITE_RULE)publicResult<FlowRuleEntity>apiAddFlowRule(@RequestBodyFlowRuleEntity entity){Result<FlowRuleEntity> checkResult =checkEntityInternal(entity);if(checkResult !=null){return checkResult;}
entity.setId(null);Date date =newDate();
entity.setGmtCreate(date);
entity.setGmtModified(date);
entity.setLimitApp(entity.getLimitApp().trim());
entity.setResource(entity.getResource().trim());try{// 规则写入dashboard的内存中,会写入三个map中
entity = repository.save(entity);//发布规则到客户端内存中//publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);//发布规则到远程配置中心publishRules(entity.getApp());returnResult.ofSuccess(entity);}catch(Throwable t){Throwable e = t instanceofExecutionException? t.getCause(): t;
logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e);returnResult.ofFail(-1, e.getMessage());}}/**
* 发布规则到远程配置中心
* @param app
* @throws Exception
*/privatevoidpublishRules(/*@NonNull*/String app)throwsException{// 从三个Map中的其中一个获取规则实体集合List<FlowRuleEntity> rules = repository.findAllByApp(app);// 推送Nacos
rulePublisher.publish(app, rules);}
在配置文件中指定NacosConfig的地址,因为在最上方的配置类中使用到了该配置项
#接入nacos配置中心用于规则数据持久化
sentinel.nacos.config.serverAddr=localhost:8848
测试
微服务端引入Nacos的读数据源
还是需要它监听dataId的更改,并更新内存中的规则数据
<dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId></dependency>
配置文件中添加相应的配置
server:port:8806spring:application:name: mall-user-sentinel-rule-push #微服务名称#配置nacos注册中心地址cloud:nacos:discovery:server-addr: 127.0.0.1:8848sentinel:transport:# 添加sentinel的控制台地址dashboard: 127.0.0.1:8080datasource:# 名称自定义,可以随便定义字符串flow-rules:nacos:server-addr: 127.0.0.1:8848# dataId取了微服务名字,后面再拼接字符串# 注意需要和配置类中常量定义的一致dataId: ${spring.application.name}-flow-rules
# 这里的组名需要和配置类中常量定义的一致groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: flow
degrade-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-degrade-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: degrade
param-flow-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-param-flow-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: param-flow
authority-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-authority-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: authority
system-rules:nacos:server-addr: 127.0.0.1:8848dataId: ${spring.application.name}-system-rules
groupId: SENTINEL_GROUP
username: nacos
password: nacos
data-type: json
rule-type: system
在Sentinel中进行了两个限流规则的配置
Naocs的配置中心也有相应的更改
微服务中也会生效
补充
如果在工作中sentinel的持久化这一块已经被其他项目组的人完成了,但他们是直接把dashboard端的规则实体转json,存入了Nacos配置中心。进而导致了热点参数规则不生效,并且不允许我们修改源码。
当出现了上面这种情况,那我们应该怎么处理嘞?
解决方案:
自定义一个解析热点规则配置的解析器FlowParamJsonConverter,继承JsonConverter,重写convert方法。
利用BeanPostProcessor机制替换beanName为
param-flow-rules-sentinel-nacos-datasource
的
converter
属性,注入
FlowParamJsonConverter
。
@ConfigurationpublicclassConverterConfig{@Bean("sentinel-json-param-flow-converter2")@PrimarypublicJsonConverterjsonParamFlowConverter(){returnnewFlowParamJsonConverter(newObjectMapper(),ParamFlowRule.class);}}@ComponentpublicclassFlowParamConverterBeanPostProcessorimplementsBeanPostProcessor{@AutowiredprivateJsonConverter jsonParamFlowConverter;@OverridepublicObjectpostProcessBeforeInitialization(Object bean,String beanName)throwsBeansException{if(beanName.equals("param-flow-rules-sentinel-nacos-datasource")){NacosDataSourceFactoryBean nacosDataSourceFactoryBean =(NacosDataSourceFactoryBean) bean;
nacosDataSourceFactoryBean.setConverter(jsonParamFlowConverter);return bean;}return bean;}}publicclassFlowParamJsonConverterextendsJsonConverter{Class ruleClass;publicFlowParamJsonConverter(ObjectMapper objectMapper,Class ruleClass){super(objectMapper, ruleClass);this.ruleClass = ruleClass;}@OverridepublicCollection<Object>convert(String source){List<Object> list =newArrayList<>();JSONArray jsonArray =JSON.parseArray(source);for(int i =0; i < jsonArray.size(); i++){//解析rule属性JSONObject jsonObject =(JSONObject) jsonArray.getJSONObject(i).get("rule");Object object =JSON.toJavaObject(jsonObject, ruleClass);
list.add(object);}return list;}}
版权归原作者 胡尚 所有, 如有侵权,请联系我们删除。