Preface
@FeignClient is used all the time in microservice development. It is the bridge for data exchange between services, and using it is simple:
    @FeignClient(contextId = "TestClient", value = "TestService")
    public interface TestClient {
        @GetMapping({"test/list"})
        List<String> list(@RequestParam("name") String name);
    }

    @SpringBootApplication
    @EnableFeignClients({"com.test.client"})
    public class TestApplication {
        public static void main(String[] args) {
            // Spring does not inject static fields, so fetch the client from the context instead
            ConfigurableApplicationContext context = SpringApplication.run(TestApplication.class, args);
            TestClient testClient = context.getBean(TestClient.class);
            testClient.list("czl");
        }
    }
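Above we only defined an interface, yet we can obtain an instance of it and call another service's API. What lies behind this simplicity? I will raise three questions first and then work through them one by one.
- How does an interface become an instance?
- How is the other service's address obtained, and how is the request sent?
- As the bridge between microservices, how do third-party frameworks such as Sleuth and Seata hook into Feign?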
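How an interface becomes an instance
After a Spring Boot application starts, while the container is being initialized, the postProcessBeanDefinitionRegistry method of every BeanDefinitionRegistryPostProcessor is executed: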
    final class PostProcessorRegistrationDelegate {
        private static void invokeBeanDefinitionRegistryPostProcessors(
                Collection<? extends BeanDefinitionRegistryPostProcessor> postProcessors,
                BeanDefinitionRegistry registry) {
            for (BeanDefinitionRegistryPostProcessor postProcessor : postProcessors) {
                postProcessor.postProcessBeanDefinitionRegistry(registry);
            }
        }
    }
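ConfigurationClassPostProcessor is a class that ships with Spring. When its processConfigBeanDefinitions method runs, it parses all configuration classes carrying @Component (this includes @Configuration classes, since @Configuration is itself meta-annotated with @Component) and wraps them as BeanDefinitions: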
    public class ConfigurationClassPostProcessor implements BeanDefinitionRegistryPostProcessor,
            PriorityOrdered, ResourceLoaderAware, BeanClassLoaderAware, EnvironmentAware {

        public void processConfigBeanDefinitions(BeanDefinitionRegistry registry) {
            // Parse each @Configuration class
            ConfigurationClassParser parser = new ConfigurationClassParser(
                    this.metadataReaderFactory, this.problemReporter, this.environment,
                    this.resourceLoader, this.componentScanBeanNameGenerator, registry);

            Set<BeanDefinitionHolder> candidates = new LinkedHashSet<>(configCandidates);
            Set<ConfigurationClass> alreadyParsed = new HashSet<>(configCandidates.size());
            do {
                parser.parse(candidates);
                parser.validate();

                Set<ConfigurationClass> configClasses = new LinkedHashSet<>(parser.getConfigurationClasses());
                configClasses.removeAll(alreadyParsed);

                // Read the model and create bean definitions based on its content
                if (this.reader == null) {
                    this.reader = new ConfigurationClassBeanDefinitionReader(
                            registry, this.sourceExtractor, this.resourceLoader, this.environment,
                            this.importBeanNameGenerator, parser.getImportRegistry());
                }
                this.reader.loadBeanDefinitions(configClasses);
                alreadyParsed.addAll(configClasses);
            }
            while (!candidates.isEmpty());
        }
    }
Inside, ConfigurationClassBeanDefinitionReader loads the BeanDefinitions and calls registerBeanDefinitions on every ImportBeanDefinitionRegistrar attached to a configuration class:
    class ConfigurationClassBeanDefinitionReader {

        private void loadBeanDefinitionsForConfigurationClass(
                ConfigurationClass configClass, TrackedConditionEvaluator trackedConditionEvaluator) {
            if (trackedConditionEvaluator.shouldSkip(configClass)) {
                String beanName = configClass.getBeanName();
                if (StringUtils.hasLength(beanName) && this.registry.containsBeanDefinition(beanName)) {
                    this.registry.removeBeanDefinition(beanName);
                }
                this.importRegistry.removeImportingClass(configClass.getMetadata().getClassName());
                return;
            }

            if (configClass.isImported()) {
                registerBeanDefinitionForImportedConfigurationClass(configClass);
            }
            for (BeanMethod beanMethod : configClass.getBeanMethods()) {
                loadBeanDefinitionsForBeanMethod(beanMethod);
            }

            loadBeanDefinitionsFromImportedResources(configClass.getImportedResources());
            loadBeanDefinitionsFromRegistrars(configClass.getImportBeanDefinitionRegistrars());
        }

        private void loadBeanDefinitionsFromRegistrars(
                Map<ImportBeanDefinitionRegistrar, AnnotationMetadata> registrars) {
            registrars.forEach((registrar, metadata) ->
                    registrar.registerBeanDefinitions(metadata, this.registry, this.importBeanNameGenerator));
        }
    }
The @EnableFeignClients annotation imports FeignClientsRegistrar, which implements ImportBeanDefinitionRegistrar:
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    @Import(FeignClientsRegistrar.class)
    public @interface EnableFeignClients {
    }
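To make the @Import hand-off more concrete, here is a minimal JDK-only sketch of the idea. Everything in it (`Registrar`, `Import`, `EnableClients`, `ClientsRegistrar`, `ImportDemoApp`, `ImportDemo`) is a hypothetical stand-in rather than a Spring type. It only shows how an annotation on the application class can name a registrar class, which is then instantiated reflectively and asked to register definitions.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ImportBeanDefinitionRegistrar
interface Registrar {
    void register(List<String> registry);
}

// Hypothetical stand-in for @Import
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.ANNOTATION_TYPE)
@interface Import {
    Class<? extends Registrar> value();
}

// Hypothetical stand-in for @EnableFeignClients
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(ClientsRegistrar.class)
@interface EnableClients {
}

class ClientsRegistrar implements Registrar {
    @Override
    public void register(List<String> registry) {
        registry.add("clientBeanDefinition");
    }
}

@EnableClients
class ImportDemoApp {
}

class ImportDemo {
    // For each annotation on the config class, follow its @Import and run the registrar
    static List<String> process(Class<?> configClass) {
        List<String> registry = new ArrayList<>();
        for (Annotation annotation : configClass.getAnnotations()) {
            Import imported = annotation.annotationType().getAnnotation(Import.class);
            if (imported == null) {
                continue;
            }
            try {
                Registrar registrar = imported.value().getDeclaredConstructor().newInstance();
                registrar.register(registry);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate registrar", e);
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        System.out.println(process(ImportDemoApp.class));
    }
}
```

Spring's real processing is far richer (conditions, ordering, nested imports); the sketch keeps only the "annotation names a registrar" step.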
FeignClientsRegistrar is mainly responsible for scanning every class annotated with @FeignClient and registering each one as a BeanDefinition:
    class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {

        private void registerFeignClient(BeanDefinitionRegistry registry,
                AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
            String className = annotationMetadata.getClassName();
            Class clazz = ClassUtils.resolveClassName(className, null);
            ConfigurableBeanFactory beanFactory = registry instanceof ConfigurableBeanFactory
                    ? (ConfigurableBeanFactory) registry : null;
            String contextId = getContextId(beanFactory, attributes);
            String name = getName(attributes);
            FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
            factoryBean.setBeanFactory(beanFactory);
            factoryBean.setName(name);
            factoryBean.setContextId(contextId);
            factoryBean.setType(clazz);
            BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
                factoryBean.setUrl(getUrl(beanFactory, attributes));
                factoryBean.setPath(getPath(beanFactory, attributes));
                factoryBean.setDecode404(Boolean.parseBoolean(String.valueOf(attributes.get("decode404"))));
                Object fallback = attributes.get("fallback");
                if (fallback != null) {
                    factoryBean.setFallback(fallback instanceof Class ? (Class<?>) fallback
                            : ClassUtils.resolveClassName(fallback.toString(), null));
                }
                Object fallbackFactory = attributes.get("fallbackFactory");
                if (fallbackFactory != null) {
                    factoryBean.setFallbackFactory(fallbackFactory instanceof Class ? (Class<?>) fallbackFactory
                            : ClassUtils.resolveClassName(fallbackFactory.toString(), null));
                }
                return factoryBean.getObject();
            });
            definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
            definition.setLazyInit(true);
            validate(attributes);

            String alias = contextId + "FeignClient";
            AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
            beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);
            beanDefinition.setAttribute("feignClientsRegistrarFactoryBean", factoryBean);

            // has a default, won't be null
            boolean primary = (Boolean) attributes.get("primary");
            beanDefinition.setPrimary(primary);

            String qualifier = getQualifier(attributes);
            if (StringUtils.hasText(qualifier)) {
                alias = qualifier;
            }

            BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[] { alias });
            BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
        }
    }
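The key point in registerFeignClient is that the registry stores a recipe (an instance supplier, marked lazy-init) rather than an instance. A rough JDK-only sketch of that idea follows; `LazyRegistry`, `LazyRegistryDemo`, and `GreetingClient` are hypothetical names made up for illustration and are not how Spring implements its bean factory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical mini-container: keeps a supplier per type and builds the instance on first lookup
class LazyRegistry {
    private final Map<String, Supplier<?>> definitions = new HashMap<>();
    private final Map<String, Object> singletons = new HashMap<>();

    <T> void register(Class<T> type, Supplier<T> instanceSupplier) {
        definitions.put(type.getName(), instanceSupplier);
    }

    <T> T getBean(Class<T> type) {
        Object bean = singletons.computeIfAbsent(type.getName(), name -> {
            Supplier<?> supplier = definitions.get(name);
            if (supplier == null) {
                throw new IllegalArgumentException("No definition for " + name);
            }
            return supplier.get(); // the supplier runs only here, on first lookup
        });
        return type.cast(bean);
    }
}

class LazyRegistryDemo {
    static int created = 0;

    interface GreetingClient {
        String greet(String name);
    }

    public static void main(String[] args) {
        LazyRegistry registry = new LazyRegistry();
        registry.register(GreetingClient.class, () -> {
            created++;
            return (GreetingClient) name -> "hello " + name;
        });
        System.out.println("created before lookup: " + created);
        GreetingClient client = registry.getBean(GreetingClient.class);
        System.out.println(client.greet("czl") + ", created after lookup: " + created);
    }
}
```

Registering alone builds nothing; the first getBean call triggers the supplier, and later calls reuse the cached instance.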
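When these BeanDefinitions are instantiated, ReflectiveFeign.newInstance is called. So no concrete class is instantiated here; a dynamic proxy is created instead, and the real logic runs in the FeignInvocationHandler class: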
    public class ReflectiveFeign extends Feign {

        public <T> T newInstance(Target<T> target) {
            Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
            Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
            List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

            for (Method method : target.type().getMethods()) {
                if (method.getDeclaringClass() == Object.class) {
                    continue;
                } else if (Util.isDefault(method)) {
                    DefaultMethodHandler handler = new DefaultMethodHandler(method);
                    defaultMethodHandlers.add(handler);
                    methodToHandler.put(method, handler);
                } else {
                    methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
                }
            }
            InvocationHandler handler = factory.create(target, methodToHandler);
            T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
                    new Class<?>[] {target.type()}, handler);

            for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
                defaultMethodHandler.bindTo(proxy);
            }
            return proxy;
        }
    }
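The pattern in newInstance (one handler per interface method, all sitting behind a single JDK proxy) can be reproduced in a few lines. The sketch below uses only java.lang.reflect; `ProxyDemo`, `Handler`, and `EchoClient` are hypothetical names, and the handlers merely describe the call instead of sending an HTTP request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;

class ProxyDemo {
    // Hypothetical stand-in for Feign's MethodHandler
    interface Handler {
        Object invoke(Object[] args) throws Throwable;
    }

    interface EchoClient {
        String echo(String text);

        String shout(String text);
    }

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type) {
        // Build the method -> handler table once, up front
        Map<Method, Handler> dispatch = new LinkedHashMap<>();
        for (Method method : type.getMethods()) {
            String name = method.getName();
            // A real handler would build and send a request; this one only describes the call
            dispatch.put(method, callArgs -> name + "(" + callArgs[0] + ")");
        }
        // Every call on the proxy is routed to the handler registered for that method
        InvocationHandler handler = (proxy, method, args) -> dispatch.get(method).invoke(args);
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        EchoClient client = newInstance(EchoClient.class);
        System.out.println(client.echo("czl"));
        System.out.println(client.shout("czl"));
    }
}
```

Note that this simplified handler does not cover equals, hashCode, or toString, so calling those on the proxy would fail; the real FeignInvocationHandler treats them specially.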
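How the other service's address is obtained and the request is sent
While a bean instance is being initialized, ReflectiveFeign.newInstance calls ParseHandlersByName.apply, which generates a SynchronousMethodHandler for each method of the interface. During this processing, the value of @RequestMapping is assembled into the request URL: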
    public class SpringMvcContract extends Contract.BaseContract implements ResourceLoaderAware {

        @Override
        public MethodMetadata parseAndValidateMetadata(Class<?> targetType, Method method) {
            processedMethods.put(Feign.configKey(targetType, method), method);
            MethodMetadata md = super.parseAndValidateMetadata(targetType, method);

            RequestMapping classAnnotation = findMergedAnnotation(targetType, RequestMapping.class);
            if (classAnnotation != null) {
                // produces - use from class annotation only if method has not specified this
                if (!md.template().headers().containsKey(ACCEPT)) {
                    parseProduces(md, method, classAnnotation);
                }

                // consumes -- use from class annotation only if method has not specified this
                if (!md.template().headers().containsKey(CONTENT_TYPE)) {
                    parseConsumes(md, method, classAnnotation);
                }

                // headers -- class annotation is inherited to methods, always write these if
                // present
                parseHeaders(md, method, classAnnotation);
            }
            return md;
        }

        @Override
        protected void processAnnotationOnMethod(MethodMetadata data, Annotation methodAnnotation, Method method) {
            if (CollectionFormat.class.isInstance(methodAnnotation)) {
                CollectionFormat collectionFormat = findMergedAnnotation(method, CollectionFormat.class);
                data.template().collectionFormat(collectionFormat.value());
            }

            if (!RequestMapping.class.isInstance(methodAnnotation) && !methodAnnotation
                    .annotationType().isAnnotationPresent(RequestMapping.class)) {
                return;
            }

            RequestMapping methodMapping = findMergedAnnotation(method, RequestMapping.class);
            // HTTP Method
            RequestMethod[] methods = methodMapping.method();
            if (methods.length == 0) {
                methods = new RequestMethod[] { RequestMethod.GET };
            }
            checkOne(method, methods, "method");
            data.template().method(Request.HttpMethod.valueOf(methods[0].name()));

            // path
            checkAtMostOne(method, methodMapping.value(), "value");
            if (methodMapping.value().length > 0) {
                String pathValue = emptyToNull(methodMapping.value()[0]);
                if (pathValue != null) {
                    pathValue = resolve(pathValue);
                    // Append path from @RequestMapping if value is present on method
                    if (!pathValue.startsWith("/") && !data.template().path().endsWith("/")) {
                        pathValue = "/" + pathValue;
                    }
                    data.template().uri(pathValue, true);
                    if (data.template().decodeSlash() != decodeSlash) {
                        data.template().decodeSlash(decodeSlash);
                    }
                }
            }

            // produces
            parseProduces(data, method, methodMapping);

            // consumes
            parseConsumes(data, method, methodMapping);

            // headers
            parseHeaders(data, method, methodMapping);

            data.indexToExpander(new LinkedHashMap<>());
        }
    }
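As a rough illustration of the path handling above, the sketch below joins a base URL with a mapping value and appends query parameters. `UrlTemplateDemo`, `appendPath`, and `withQuery` are hypothetical helpers, not Feign's RequestTemplate API; only the leading-slash check mirrors the condition in processAnnotationOnMethod, and the rest is a simplification.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

class UrlTemplateDemo {
    // Prefix "/" only when neither side already supplies the separator
    static String appendPath(String base, String pathValue) {
        if (!pathValue.startsWith("/") && !base.endsWith("/")) {
            pathValue = "/" + pathValue;
        }
        return base + pathValue;
    }

    // Append URL-encoded query parameters, the role @RequestParam plays on the client method
    static String withQuery(String url, Map<String, String> params) {
        if (params.isEmpty()) {
            return url;
        }
        StringJoiner query = new StringJoiner("&", "?", "");
        params.forEach((key, value) -> query.add(
                URLEncoder.encode(key, StandardCharsets.UTF_8)
                        + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8)));
        return url + query;
    }
}
```

For the client in the preface, `withQuery(appendPath("http://TestService", "test/list"), Map.of("name", "czl"))` yields a URL whose host part is still the logical service name; resolving it to a real address happens later in the load balancer.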
When we call a method on the interface to issue a request, what actually runs is FeignInvocationHandler.invoke, which looks up the matching SynchronousMethodHandler and delegates to it:
    static class FeignInvocationHandler implements InvocationHandler {

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if ("equals".equals(method.getName())) {
                try {
                    Object otherHandler =
                            args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                    return equals(otherHandler);
                } catch (IllegalArgumentException e) {
                    return false;
                }
            } else if ("hashCode".equals(method.getName())) {
                return hashCode();
            } else if ("toString".equals(method.getName())) {
                return toString();
            }

            return dispatch.get(method).invoke(args);
        }
    }
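The special cases for equals, hashCode, and toString exist because a JDK proxy also routes those three Object methods to the invocation handler. Below is a small sketch of the same idea with hypothetical names (`ObjectMethodsDemo`, `PingClient`, `Handler`); it compares handlers by a target string, which is an assumption made for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class ObjectMethodsDemo {
    interface PingClient {
        String ping();
    }

    static final class Handler implements InvocationHandler {
        private final String target;

        Handler(String target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
                case "equals":
                    // Two proxies count as equal when their handlers are equal
                    Object other = args[0];
                    return other != null && Proxy.isProxyClass(other.getClass())
                            && equals(Proxy.getInvocationHandler(other));
                case "hashCode":
                    return hashCode();
                case "toString":
                    return toString();
                default:
                    // Every other method is a "remote" call in this sketch
                    return "pong from " + target;
            }
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Handler && ((Handler) o).target.equals(target);
        }

        @Override
        public int hashCode() {
            return target.hashCode();
        }

        @Override
        public String toString() {
            return "PingClient(" + target + ")";
        }
    }

    static PingClient create(String target) {
        return (PingClient) Proxy.newProxyInstance(
                PingClient.class.getClassLoader(), new Class<?>[] {PingClient.class}, new Handler(target));
    }
}
```

Without these branches, putting a client proxy into a HashSet or logging it would try to dispatch to a handler that does not exist.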
The main job of SynchronousMethodHandler.invoke is to call the execute method of the Client interface:
    final class SynchronousMethodHandler implements MethodHandler {

        @Override
        public Object invoke(Object[] argv) throws Throwable {
            RequestTemplate template = buildTemplateFromArgs.create(argv);
            Options options = findOptions(argv);
            Retryer retryer = this.retryer.clone();
            while (true) {
                try {
                    return executeAndDecode(template, options);
                } catch (RetryableException e) {
                    try {
                        retryer.continueOrPropagate(e);
                    } catch (RetryableException th) {
                        Throwable cause = th.getCause();
                        if (propagationPolicy == UNWRAP && cause != null) {
                            throw cause;
                        } else {
                            throw th;
                        }
                    }
                    if (logLevel != Logger.Level.NONE) {
                        logger.logRetry(metadata.configKey(), logLevel);
                    }
                    continue;
                }
            }
        }

        Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
            Request request = targetRequest(template);

            if (logLevel != Logger.Level.NONE) {
                logger.logRequest(metadata.configKey(), logLevel, request);
            }

            Response response;
            long start = System.nanoTime();
            try {
                response = client.execute(request, options);
                // ensure the request is set. TODO: remove in Feign 12
                response = response.toBuilder()
                        .request(request)
                        .requestTemplate(template)
                        .build();
            } catch (IOException e) {
                if (logLevel != Logger.Level.NONE) {
                    logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
                }
                throw errorExecuting(request, e);
            }
            long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

            if (decoder != null)
                return decoder.decode(response, metadata.returnType());

            CompletableFuture<Object> resultFuture = new CompletableFuture<>();
            asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
                    metadata.returnType(),
                    elapsedTime);

            try {
                if (!resultFuture.isDone())
                    throw new IllegalStateException("Response handling not done");

                return resultFuture.join();
            } catch (CompletionException e) {
                Throwable cause = e.getCause();
                if (cause != null)
                    throw cause;
                throw e;
            }
        }
    }
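The while(true) loop in invoke is a plain retry loop: try the call, and on a retryable failure ask the retryer whether to go around again. Here is a stripped-down sketch of that control flow. `RetryDemo`, `RetryableFailure`, and `SimpleRetryer` are hypothetical; the attempt-counting policy is an assumption, and backoff sleeps and logging are left out.

```java
import java.util.function.Supplier;

class RetryDemo {
    // Hypothetical unchecked exception marking a failure that may be retried
    static final class RetryableFailure extends RuntimeException {
        RetryableFailure(String message) {
            super(message);
        }
    }

    // Hypothetical retryer: allows up to maxAttempts calls, then rethrows the failure
    static final class SimpleRetryer {
        private final int maxAttempts;
        private int attempt = 1;

        SimpleRetryer(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        void continueOrPropagate(RetryableFailure e) {
            if (attempt++ >= maxAttempts) {
                throw e;
            }
        }
    }

    static <T> T invoke(Supplier<T> call, int maxAttempts) {
        // Fresh retry state per invocation, similar in spirit to cloning the retryer
        SimpleRetryer retryer = new SimpleRetryer(maxAttempts);
        while (true) {
            try {
                return call.get();
            } catch (RetryableFailure e) {
                // Either returns (loop again) or rethrows (give up)
                retryer.continueOrPropagate(e);
            }
        }
    }
}
```

Keeping the retry state out of the shared handler matters because one handler instance serves concurrent calls; each call needs its own attempt counter.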
Spring Cloud OpenFeign provides LoadBalancerFeignClient, an implementation of the Client interface. Its execute method hands off to Ribbon by calling AbstractLoadBalancerAwareClient.executeWithLoadBalancer:
    public class LoadBalancerFeignClient implements Client {

        @Override
        public Response execute(Request request, Request.Options options) throws IOException {
            try {
                URI asUri = URI.create(request.url());
                String clientName = asUri.getHost();
                URI uriWithoutHost = cleanUrl(request.url(), clientName);
                FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                        this.delegate, request, uriWithoutHost);

                IClientConfig requestConfig = getClientConfig(options, clientName);
                return lbClient(clientName)
                        .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
            } catch (ClientException e) {
                IOException io = findIOException(e);
                if (io != null) {
                    throw io;
                }
                throw new RuntimeException(e);
            }
        }
    }
Inside executeWithLoadBalancer, LoadBalancerCommand.submit resolves the service's actual IP and port, builds the final URL from them, and then calls FeignLoadBalancer.execute:
    public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse>
            extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {

        public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
            LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

            try {
                return command.submit(
                    new ServerOperation<T>() {
                        @Override
                        public Observable<T> call(Server server) {
                            URI finalUri = reconstructURIWithServer(server, request.getUri());
                            S requestForServer = (S) request.replaceUri(finalUri);
                            try {
                                return Observable.just(
                                        AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                            } catch (Exception e) {
                                return Observable.error(e);
                            }
                        }
                    })
                    .toBlocking()
                    .single();
            } catch (Exception e) {
                Throwable t = e.getCause();
                if (t instanceof ClientException) {
                    throw (ClientException) t;
                } else {
                    throw new ClientException(e);
                }
            }
        }
    }
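Two things happen in this step: a server is picked for the logical service name, and the URI is rebuilt with that server's host and port. The sketch below shows both with JDK classes only. `LoadBalancerDemo`, `Server`, `choose`, and `reconstruct` are hypothetical names, and round-robin is just one possible rule assumed for the example; it is not a reproduction of Ribbon's implementation.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class LoadBalancerDemo {
    // Hypothetical server entry; in practice the list comes from a service registry
    static final class Server {
        final String host;
        final int port;

        Server(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    private final List<Server> servers;
    private final AtomicInteger counter = new AtomicInteger();

    LoadBalancerDemo(List<Server> servers) {
        this.servers = servers;
    }

    // Round-robin choice (an assumption for this sketch)
    Server choose() {
        return servers.get(Math.floorMod(counter.getAndIncrement(), servers.size()));
    }

    // Replace the logical service name with a concrete host and port, keeping path and query
    static URI reconstruct(Server server, URI original) {
        try {
            return new URI(original.getScheme(), null, server.host, server.port,
                    original.getPath(), original.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild URI from " + original, e);
        }
    }
}
```

With servers 10.0.0.1:8080 and 10.0.0.2:8081 (made-up addresses), successive calls for `http://TestService/test/list?name=czl` alternate between the two hosts while the path and query stay unchanged.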
FeignLoadBalancer.execute in turn calls the delegate client's execute method, which here is ApacheHttpClient.execute:
    public class FeignLoadBalancer extends
            AbstractLoadBalancerAwareClient<FeignLoadBalancer.RibbonRequest, FeignLoadBalancer.RibbonResponse> {

        @Override
        public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride) throws IOException {
            Request.Options options;
            if (configOverride != null) {
                RibbonProperties override = RibbonProperties.from(configOverride);
                options = new Request.Options(override.connectTimeout(this.connectTimeout),
                        override.readTimeout(this.readTimeout));
            } else {
                options = new Request.Options(this.connectTimeout, this.readTimeout);
            }
            Response response = request.client().execute(request.toRequest(), options);
            return new RibbonResponse(request.getUri(), response);
        }
    }
Finally, ApacheHttpClient sends the request by calling HttpClient directly:
    public final class ApacheHttpClient implements Client {
        private final HttpClient client;

        @Override
        public Response execute(Request request, Request.Options options) throws IOException {
            HttpUriRequest httpUriRequest;
            try {
                httpUriRequest = toHttpUriRequest(request, options);
            } catch (URISyntaxException e) {
                throw new IOException("URL '" + request.url() + "' couldn't be parsed into a URI", e);
            }
            HttpResponse httpResponse = client.execute(httpUriRequest);
            return toFeignResponse(httpResponse, request);
        }
    }
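How Sleuth integrates with Feign
Feign does not offer a very direct, convenient entry point, so frameworks such as Sleuth and Seata hook in by a rather roundabout route. Their approaches are much the same, so covering Sleuth alone is enough here.
First, Sleuth provides the FeignContextBeanPostProcessor class, which implements the BeanPostProcessor interface. That lets it inspect every bean while Spring Boot starts up; if a bean is of type FeignContext, it is wrapped in a TraceFeignContext: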
    final class FeignContextBeanPostProcessor implements BeanPostProcessor {
        private final BeanFactory beanFactory;

        FeignContextBeanPostProcessor(BeanFactory beanFactory) {
            this.beanFactory = beanFactory;
        }

        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
            return bean;
        }

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean instanceof FeignContext && !(bean instanceof TraceFeignContext)) {
                return new TraceFeignContext(traceFeignObjectWrapper(), (FeignContext) bean);
            }
            return bean;
        }

        private TraceFeignObjectWrapper traceFeignObjectWrapper() {
            return new TraceFeignObjectWrapper(this.beanFactory);
        }
    }
The main role of TraceFeignContext is to use TraceFeignObjectWrapper to wrap the Client in a TracingFeignClient:
    final class TraceFeignObjectWrapper {

        Object wrap(Object bean) {
            if (bean instanceof Client && !(bean instanceof TracingFeignClient)
                    && !(bean instanceof LazyTracingFeignClient)) {
                if (ribbonPresent && bean instanceof LoadBalancerFeignClient
                        && !(bean instanceof TraceLoadBalancerFeignClient)) {
                    return instrumentedFeignRibbonClient(bean);
                }
                if (ribbonPresent && bean instanceof TraceLoadBalancerFeignClient) {
                    return bean;
                }
                if (loadBalancerPresent && bean instanceof FeignBlockingLoadBalancerClient
                        && !(bean instanceof TraceFeignBlockingLoadBalancerClient)) {
                    return instrumentedFeignLoadBalancerClient(bean);
                }
                if (loadBalancerPresent
                        && bean instanceof RetryableFeignBlockingLoadBalancerClient
                        && !(bean instanceof TraceRetryableFeignBlockingLoadBalancerClient)) {
                    return instrumentedRetryableFeignLoadBalancerClient(bean);
                }
                if (ribbonPresent && bean instanceof TraceFeignBlockingLoadBalancerClient) {
                    return bean;
                }
                return new LazyTracingFeignClient(this.beanFactory, (Client) bean);
            }
            return bean;
        }
    }
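Stripped of the Ribbon and load-balancer branches, the wrapping logic comes down to "wrap a Client once, leave everything else alone". The JDK-only sketch below illustrates that, including the guard that keeps repeated passes from double-wrapping. `PostProcessDemo`, its nested `Client` and `TracingClient`, and `postProcessAll` are hypothetical stand-ins, not Sleuth or Spring classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PostProcessDemo {
    interface Client {
        String execute(String request);
    }

    static final class TracingClient implements Client {
        final Client delegate;

        TracingClient(Client delegate) {
            this.delegate = delegate;
        }

        @Override
        public String execute(String request) {
            return "[traced] " + delegate.execute(request);
        }
    }

    // Wrap only Client beans that are not wrapped yet, so a second pass changes nothing
    static Object wrap(Object bean) {
        if (bean instanceof Client && !(bean instanceof TracingClient)) {
            return new TracingClient((Client) bean);
        }
        return bean;
    }

    // Hypothetical stand-in for the container running a post-processor over every bean
    static Map<String, Object> postProcessAll(Map<String, Object> beans) {
        Map<String, Object> result = new LinkedHashMap<>();
        beans.forEach((name, bean) -> result.put(name, wrap(bean)));
        return result;
    }
}
```

The "already wrapped" check is what allows both a bean post-processor and an aspect to apply the same wrapper without stacking two tracing layers.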
In addition, Sleuth provides an aspect, TraceFeignAspect, that does something similar: it performs the wrapping when Client.execute is invoked:
    @Aspect
    class TraceFeignAspect {
        private static final Log log = LogFactory.getLog(TraceFeignAspect.class);

        private final BeanFactory beanFactory;

        TraceFeignAspect(BeanFactory beanFactory) {
            this.beanFactory = beanFactory;
        }

        @Around("execution (* feign.Client.*(..)) && !within(is(FinalType))")
        public Object feignClientWasCalled(final ProceedingJoinPoint pjp) throws Throwable {
            Object bean = pjp.getTarget();
            Object wrappedBean = new TraceFeignObjectWrapper(this.beanFactory).wrap(bean);
            if (log.isDebugEnabled()) {
                log.debug("Executing feign client via TraceFeignAspect");
            }
            if (bean != wrappedBean) {
                return executeTraceFeignClient(wrappedBean, pjp);
            }
            return pjp.proceed();
        }

        Object executeTraceFeignClient(Object bean, ProceedingJoinPoint pjp) throws IOException {
            Object[] args = pjp.getArgs();
            Request request = (Request) args[0];
            Request.Options options = (Request.Options) args[1];
            return ((Client) bean).execute(request, options);
        }
    }
TracingFeignClient is effectively a proxy around ApacheHttpClient, and every Feign call has to pass through ApacheHttpClient, so Sleuth can easily add its own logic there, such as adding request headers:
    final class TracingFeignClient implements Client {

        @Override
        public Response execute(Request req, Request.Options options) throws IOException {
            RequestWrapper request = new RequestWrapper(req);
            Span span = this.handler.handleSend(request);
            if (log.isDebugEnabled()) {
                log.debug("Handled send of " + span);
            }
            Response res = null;
            Throwable error = null;
            try (Scope ws = this.currentTraceContext.newScope(span.context())) {
                res = this.delegate.execute(request.build(), options);
                if (res == null) { // possibly null on bad implementation or mocks
                    res = Response.builder().request(req).build();
                }
                return res;
            } catch (Throwable e) {
                error = e;
                throw e;
            } finally {
                ResponseWrapper response = res != null ? new ResponseWrapper(request, res, error) : null;
                this.handler.handleReceive(response, error, span);
                if (log.isDebugEnabled()) {
                    log.debug("Handled receive of " + span);
                }
            }
        }
    }
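The shape of TracingFeignClient is a classic decorator: do something before the call, delegate, and do something afterwards in a finally block so it also runs on failure. The sketch below imitates that shape with hypothetical types (`TracingDecoratorDemo`, `Request`, `Client`, `TracingClient`). The `X-B3-TraceId` header name is used purely as an illustration of trace propagation, and the id supplier is an assumption made so the example is deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class TracingDecoratorDemo {
    // Hypothetical, minimal request: a URL plus headers
    static final class Request {
        final String url;
        final Map<String, String> headers;

        Request(String url, Map<String, String> headers) {
            this.url = url;
            this.headers = headers;
        }
    }

    interface Client {
        String execute(Request request);
    }

    static final class TracingClient implements Client {
        final List<String> events = new ArrayList<>();
        private final Client delegate;
        private final Supplier<String> traceIds;

        TracingClient(Client delegate, Supplier<String> traceIds) {
            this.delegate = delegate;
            this.traceIds = traceIds;
        }

        @Override
        public String execute(Request request) {
            // Copy the headers so the caller's request object is left untouched
            Map<String, String> headers = new LinkedHashMap<>(request.headers);
            headers.putIfAbsent("X-B3-TraceId", traceIds.get());
            events.add("send " + request.url);
            try {
                return delegate.execute(new Request(request.url, headers));
            } finally {
                // Recorded on success and on failure alike
                events.add("receive " + request.url);
            }
        }
    }
}
```

Because the decorator implements the same Client interface as its delegate, the calling code does not need to know that tracing was added.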
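Summary
- While Spring Boot starts up, FeignClientsRegistrar scans every interface annotated with @FeignClient, wraps each as a BeanDefinition, and registers it in the Spring container. When one needs to be injected, a dynamic proxy is generated; the component that actually does the work is FeignInvocationHandler.
- When an interface method is called, FeignInvocationHandler finds the matching SynchronousMethodHandler, which assembles the URL from the method's annotations and arguments, uses Ribbon's AbstractLoadBalancerAwareClient to resolve the service's IP and port, and finally sends the request through HttpClient.
- Although Feign provides RequestInterceptor for processing requests, it offers no interface for handling the whole execution flow. So Sleuth has to wrap FeignContext via FeignContextBeanPostProcessor and then wrap ApacheHttpClient in a TracingFeignClient to implement its own logic.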
Copyright belongs to the original author, Caizil. In case of infringement, please contact us for removal.