0


@EnableAsync的使用、进阶、源码分析

@EnableAsync使用

基础使用

使用@EnableAsync开启异步切面,然后在异步调用的方法上加上@Asyc注解即可

@SpringBootApplication@EnableAsync//开启异步切面publicclassSpringdemoApplication{publicstaticvoidmain(String[] args){SpringApplication.run(SpringdemoApplication.class, args);}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async//异步@OverridepublicvoidinvokeAsyncTest01(){System.out.println(Thread.currentThread()+"运行了invokeAsyncTest01方法!");}}

自定义异步注解

@Async注解是异步切面默认的异步注解,我们可以在@EnableAsync(annotation = AsyncCustom.class)开启异步切面时指定自定义的异步注解

@Target({ElementType.TYPE,ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic@interfaceAsyncCustom{}
@SpringBootApplication@EnableAsync(annotation =AsyncCustom.class)publicclassSpringdemoApplication{publicstaticvoidmain(String[] args){SpringApplication.run(SpringdemoApplication.class, args);}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@AsyncCustom//异步@OverridepublicvoidinvokeAsyncTest01(){System.out.println(Thread.currentThread()+"运行了invokeAsyncTest01方法!");}}

@EnableAsync进阶

线程池配置

配置默认线程池

@Slf4j@ComponentpublicclassAsyncConfigimplementsAsyncConfigurer{/**
    * 设置默认线程池
    **/@OverridepublicExecutorgetAsyncExecutor(){//此处最好使用new ThreadPoolExecutor显示创建线程池,SimpleAsyncTaskExecutor没有复用线程SimpleAsyncTaskExecutor taskExecutor =newSimpleAsyncTaskExecutor();
        taskExecutor.setThreadNamePrefix("CustomAsync-Test-");return taskExecutor;}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async@OverridepublicvoidinvokeAsyncTest01(){System.out.println(Thread.currentThread()+"运行了invokeAsyncTest01方法!");}}

指定线程池 (建议,根据业务进行线程池隔离)

当@Async注解的value有指定线程池名称时,将会使用容器中beanname=此value值的Executor线程池

@ConfigurationpublicclassTaskExecutorConfig{@BeanpublicExecutordeleteFileExecutor(){//此处最好使用new ThreadPoolExecutor显示创建,SimpleAsyncTaskExecutor没有复用线程SimpleAsyncTaskExecutor taskExecutor =newSimpleAsyncTaskExecutor();
        taskExecutor.setThreadNamePrefix("delete-file-");return taskExecutor;}@BeanpublicExecutorsendEmailExecutor(){//此处最好使用new ThreadPoolExecutor显示创建线程池,SimpleAsyncTaskExecutor没有复用线程SimpleAsyncTaskExecutor taskExecutor =newSimpleAsyncTaskExecutor();
        taskExecutor.setThreadNamePrefix("send-email-");return taskExecutor;}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async("deleteFileExecutor")@OverridepublicvoiddeleteFile(){System.out.println(Thread.currentThread()+"运行了deleteFile方法!");}@Async("sendEmailExecutor")@OverridepublicvoidsendEmail(){System.out.println(Thread.currentThread()+"运行了sendEmail方法!");}}

异步任务结果

只要是异步,一般都有可能用到需要返回结果的异步任务,当然@Async也支持异步结果返回,目前仅支持CompletableFuture、ListenableFuture、Future

CompletableFuture

@RestController@RequestMapping("/testasync")publicclassTestAsyncController{@AutowiredprivateAsyncTestService asyncTestService;@GetMapping("/test02")publicvoidtest02(){CompletableFuture<String> completableFuture = asyncTestService.invokeAsyncTest02();

        completableFuture.thenAccept(System.out::println);}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async@OverridepublicCompletableFuture<String>invokeAsyncTest02(){System.out.println(Thread.currentThread()+"运行了invokeAsyncTest02方法!");returnCompletableFuture.completedFuture("Hello world!");}}

ListenableFuture

@RestController@RequestMapping("/testasync")publicclassTestAsyncController{@AutowiredprivateAsyncTestService asyncTestService;@GetMapping("/test03")publicvoidtest03(){ListenableFuture<String> stringListenableFuture = asyncTestService.invokeAsyncTest03();

        stringListenableFuture.addCallback(System.out::println,System.out::println);}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async@OverridepublicListenableFuture<String>invokeAsyncTest03(){System.out.println(Thread.currentThread()+"运行了invokeAsyncTest03方法!");returnnewAsyncResult<String>("Hello World!");}}

Future

@RestController@RequestMapping("/testasync")publicclassTestAsyncController{@AutowiredprivateAsyncTestService asyncTestService;@GetMapping("/test04")publicvoidtest04()throwsExecutionException,InterruptedException{Future<String> future = asyncTestService.invokeAsyncTest04();String str = future.get();System.out.println(str);}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async@OverridepublicFuture<String>invokeAsyncTest04(){System.out.println(Thread.currentThread()+"运行了invokeAsyncTest04方法!");returnnewAsyncResult<>("Hello World!");}}

Future、ListenableFuture、CompletableFuture区别

  • Future为异步任务调用的结果
  • ListenableFuture继承了Future,所以也为异步任务调用的结果,但是ListenableFuture还阔以添加两个回调函数,成功回调和异常回调
  • CompletableFuture也继承了Future,所以也为异步任务调用的结果,但是CompletableFuture阔以对异步任务进行编排

异常处理器

当返回值是Future及其子类

此时,如果异步任务在执行时抛出异常时,异常先会存储在Future中并记录状态,当正真调用future.get()等获取结果函数时才会抛出异常。

@RestController@RequestMapping("/testasync")publicclassTestAsyncController{@AutowiredprivateAsyncTestService asyncTestService;@GetMapping("/test04")publicvoidtest04()throwsExecutionException,InterruptedException{Future<String> future = asyncTestService.invokeAsyncTest04();//此时当当前线程获取结果时 才会抛出异常String str = future.get();System.out.println(str);}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async@OverridepublicFuture<String>invokeAsyncTest04(){System.out.println(Thread.currentThread()+"运行了invokeAsyncTest04方法!");if(true){thrownewIllegalArgumentException("Hello sendEmailExecutor Exception!");}returnnewAsyncResult<>("Hello World!");}}

当返回值是非Future

返回类型非Future时,任务发生异常将会调用异常处理器处理异常。异常处理器阔以AsyncConfigurer 实现类的getAsyncUncaughtExceptionHandler方法手动设置,如果未设置异常处理器,系统将会给你创建一个默认的SimpleAsyncUncaughtExceptionHandler异常处理器,此默认异常处理器异常处理器只对异常进行了日志输出

@Slf4j@ComponentpublicclassAsyncConfigimplementsAsyncConfigurer{@OverridepublicExecutorgetAsyncExecutor(){SimpleAsyncTaskExecutor taskExecutor =newSimpleAsyncTaskExecutor();
        taskExecutor.setThreadNamePrefix("CustomAsync-Test-");return taskExecutor;}/**
    * 当异步任务调用出现时将会调用此异常处理器 可在此记录日志,补偿机制等
    **/@OverridepublicAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){return(ex, method, params)->{System.err.println("Unexpected exception occurred invoking async method: "+ method +":"+ ex.getMessage());};}}
@RestController@RequestMapping("/testasync")publicclassTestAsyncController{@AutowiredprivateAsyncTestService asyncTestService;@GetMapping("/test06")publicvoidtest06(){

        asyncTestService.invokeAsyncTest06();}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async@OverridepublicvoidinvokeAsyncTest06(){System.out.println(Thread.currentThread()+"运行了invokeAsyncTest06方法!");thrownewIllegalArgumentException("Hello Exception!");}}

扩展异常处理器

原因

博主通过源码发现,异常处理器只能设置一个,且后续所有@Async使用的线程池全都只有走我们设置的默认异常处理器,如果我们根据业务划分了线程池,不同线程池的异常想走不同的处理逻辑,就只有在我们手动设置的异常处理器中进行逻辑判断,非常的不优雅。

博主的解决方案
  1. 扩展@Async注解,添加exceptionHandler属性指定异常处理器AsyncUncaughtExceptionHandler 的容器名
  2. 在设置AsyncConfigurer 实现类getAsyncUncaughtExceptionHandler方法设置一个自定义异常处理器,此处理器读取异常方法@Async的exceptionHandler属性值,然后获取到容器中名为exceptionHandler属性值的异常处理器
  3. 如果能在容器找到给定容器名称的异常处理器,就走此异常处理器
  4. 如果不能找到给定容器名称的处理器,就走默认异常处理器
  5. 如果没有设置@Async的exceptionHandler属性值,也走默认异常处理器
方案实现

扩展@Async注解,添加@JokerAsync继承@Async,添加exceptionHandler属性

@Target({ElementType.TYPE,ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documented@Asyncpublic@interfaceJokerAsync{@AliasFor(annotation =Async.class)Stringvalue()default"";StringexceptionHandler()default"";}

把AsyncConfigurer 实现类getAsyncUncaughtExceptionHandler方法设置一个自定义异常处理器,此处理器读取异常方法@Async的exceptionHandler属性值,然后获取到容器中名为exceptionHandler属性值的异常处理器

@Slf4j@ComponentpublicclassAsyncConfigimplementsAsyncConfigurer{@Autowired(required =false)privateMap<String,AsyncUncaughtExceptionHandler> exceptionHandlerMap =newHashMap<>();privatefinalAsyncUncaughtExceptionHandler defaultExceptionHandler =newSimpleAsyncUncaughtExceptionHandler();@OverridepublicAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){return(ex, method, params)->{String qualifier =getExceptionHandlerQualifier(method);AsyncUncaughtExceptionHandler exceptionHandler =null;if(Objects.nonNull(qualifier)&& qualifier.length()>0){
                exceptionHandler = exceptionHandlerMap.get(qualifier);}if(Objects.isNull(exceptionHandler)){
                exceptionHandler = defaultExceptionHandler;}

            exceptionHandler.handleUncaughtException(ex, method, params);};}protectedStringgetExceptionHandlerQualifier(Method method){JokerAsync async =AnnotatedElementUtils.findMergedAnnotation(method,JokerAsync.class);if(async ==null){
            async =AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(),JokerAsync.class);}return(async !=null? async.exceptionHandler():null);}}

测试示例代码

@Slf4j@ComponentpublicclassDeleteFileAsyncUncaughtExceptionHandlerimplementsAsyncUncaughtExceptionHandler{@OverridepublicvoidhandleUncaughtException(Throwable ex,Method method,Object... params){
        log.error("DeleteFileAsyncUncaughtExceptionHandler Unexpected exception occurred invoking async method: "+ method, ex);}}
@Slf4j@ComponentpublicclassSendFileAsyncUncaughtExceptionHandlerimplementsAsyncUncaughtExceptionHandler{@OverridepublicvoidhandleUncaughtException(Throwable ex,Method method,Object... params){
        log.error("SendFileAsyncUncaughtExceptionHandler Unexpected exception occurred invoking async method: "+ method, ex);}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@JokerAsync(exceptionHandler ="deleteFileAsyncUncaughtExceptionHandler")@OverridepublicvoiddeleteFile(){System.out.println(Thread.currentThread()+"运行了deleteFile方法!");thrownewIllegalArgumentException("Hello deleteFileExecutor Exception!");}@JokerAsync(exceptionHandler ="sendFileAsyncUncaughtExceptionHandler")@OverridepublicvoidsendEmail(){System.out.println(Thread.currentThread()+"运行了sendEmail方法!");thrownewIllegalArgumentException("Hello sendEmailExecutor Exception!");}}
@RestController@RequestMapping("/testasync")publicclassTestAsyncController{@AutowiredprivateAsyncTestService asyncTestService;@GetMapping("/sendEmail")publicvoidsendEmail(){

        asyncTestService.sendEmail();}@GetMapping("/deleteFile")publicvoiddeleteFile(){

        asyncTestService.deleteFile();}}

结果如下:不同的业务走不同的异常处理器
在这里插入图片描述

源码分析

首先咱们从@EnableAsync入口开始看起

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented//使用@Import 导入AsyncConfigurationSelector类到容器中@Import(AsyncConfigurationSelector.class)public@interfaceEnableAsync{//自定义异步注解Class<?extendsAnnotation>annotation()defaultAnnotation.class;//JDK代理 还是 CGLIB代理booleanproxyTargetClass()defaultfalse;AdviceModemode()defaultAdviceMode.PROXY;intorder()defaultOrdered.LOWEST_PRECEDENCE;}

注意使用@Import注解导入的一般会实现ImportSelector 接口,则ImportSelector 中的selectImports方法返回的类的完全限定名数组中的类会被加入到容器中;如果是实现了ImportBeanDefinitionRegistrar接口,则会调用registerBeanDefinitions的方法

publicinterfaceImportSelector{String[]selectImports(AnnotationMetadata importingClassMetadata);@NullabledefaultPredicate<String>getExclusionFilter(){returnnull;}}publicinterfaceImportBeanDefinitionRegistrar{defaultvoidregisterBeanDefinitions(AnnotationMetadata importingClassMetadata,BeanDefinitionRegistry registry,BeanNameGenerator importBeanNameGenerator){registerBeanDefinitions(importingClassMetadata, registry);}defaultvoidregisterBeanDefinitions(AnnotationMetadata importingClassMetadata,BeanDefinitionRegistry registry){}}

继续看@EnableAsync使用@Import导入的AsyncConfigurationSelector类

publicclassAsyncConfigurationSelectorextendsAdviceModeImportSelector<EnableAsync>{privatestaticfinalString ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME ="org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";/**
     * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
     * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
     * respectively.
     */@Override@NullablepublicString[]selectImports(AdviceMode adviceMode){switch(adviceMode){//@EnableAsync mode属性默认为AdviceMode.PROXY case PROXY:returnnewString[]{ProxyAsyncConfiguration.class.getName()};case ASPECTJ:returnnewString[]{ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:returnnull;}}}

看哈AsyncConfigurationSelector的父类AdviceModeImportSelector

/**
* 由于该类实现ImportSelector接口 所以会调用selectImports方法
**/publicabstractclassAdviceModeImportSelector<AextendsAnnotation>implementsImportSelector{publicstaticfinalString DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME ="mode";protectedStringgetAdviceModeAttributeName(){return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;}//importingClassMetadata 是加了@Import注解的类的元信息@OverridepublicfinalString[]selectImports(AnnotationMetadata importingClassMetadata){Class<?> annType =GenericTypeResolver.resolveTypeArgument(getClass(),AdviceModeImportSelector.class);Assert.state(annType !=null,"Unresolvable type argument for AdviceModeImportSelector");AnnotationAttributes attributes =AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);if(attributes ==null){thrownewIllegalArgumentException(String.format("@%s is not present on importing class '%s' as expected",
                    annType.getSimpleName(), importingClassMetadata.getClassName()));}//得到加了@Import注解类上的mode属性值AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());//模板方法 调用子类实现的selectImports方法得到需要导入到Spring容器中的类的String[] imports =selectImports(adviceMode);if(imports ==null){thrownewIllegalArgumentException("Unknown AdviceMode: "+ adviceMode);}return imports;}@NullableprotectedabstractString[]selectImports(AdviceMode adviceMode);}

由于@EnableAsync mode属性默认为AdviceMode.PROXY ,所以ProxyAsyncConfiguration类将会导入容器继续点进去看

@Configuration(proxyBeanMethods =false)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicclassProxyAsyncConfigurationextendsAbstractAsyncConfiguration{//把异步后置处理器放入容器中@Bean(name =TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicAsyncAnnotationBeanPostProcessorasyncAdvisor(){Assert.notNull(this.enableAsync,"@EnableAsync annotation metadata was not injected");//异步后置处理器AsyncAnnotationBeanPostProcessor bpp =newAsyncAnnotationBeanPostProcessor();//把线程池和异常处理器放到后置处理器中
        bpp.configure(this.executor,this.exceptionHandler);//得到@EnableAsync中annotation的注解Class<?extendsAnnotation> customAsyncAnnotation =this.enableAsync.getClass("annotation");//自定义注解不等于默认值时 把自定义异步注解放入后置处理器中if(customAsyncAnnotation !=AnnotationUtils.getDefaultValue(EnableAsync.class,"annotation")){
            bpp.setAsyncAnnotationType(customAsyncAnnotation);}//设置动态代理方式
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}}

看哈ProxyAsyncConfiguration 的父类AbstractAsyncConfiguration

@Configuration(proxyBeanMethods =false)publicabstractclassAbstractAsyncConfigurationimplementsImportAware{@NullableprotectedAnnotationAttributes enableAsync;@NullableprotectedSupplier<Executor> executor;@NullableprotectedSupplier<AsyncUncaughtExceptionHandler> exceptionHandler;//importMetadata 是加了@Import注解的类的元信息@OverridepublicvoidsetImportMetadata(AnnotationMetadata importMetadata){//@EnableAsync的注解属性设置给enableAsync属性this.enableAsync =AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(),false));if(this.enableAsync ==null){thrownewIllegalArgumentException("@EnableAsync is not present on importing class "+ importMetadata.getClassName());}}/**
    * 配置默认线程池 默认异常处理器
    **/@Autowired(required =false)voidsetConfigurers(Collection<AsyncConfigurer> configurers){if(CollectionUtils.isEmpty(configurers)){return;}if(configurers.size()>1){thrownewIllegalStateException("Only one AsyncConfigurer may exist");}AsyncConfigurer configurer = configurers.iterator().next();this.executor = configurer::getAsyncExecutor;this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;}}publicinterfaceAsyncConfigurer{//配置异步线程池@NullabledefaultExecutorgetAsyncExecutor(){returnnull;}//配置异步异常处理器@NullabledefaultAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){returnnull;}}

上述代码表明 把@EnableAsync注解的属性解析了设置到了AsyncAnnotationBeanPostProcessor后置处理器中,还有AsyncConfigurer配置的线程池和异常处理器也设置到了后置处理中,现在我们继续看AsyncAnnotationBeanPostProcessor后置处理器的代码

publicclassAsyncAnnotationBeanPostProcessorextendsAbstractBeanFactoryAwareAdvisingPostProcessor{publicstaticfinalString DEFAULT_TASK_EXECUTOR_BEAN_NAME =AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;//默认线程池@NullableprivateSupplier<Executor> executor;//异常处理器@NullableprivateSupplier<AsyncUncaughtExceptionHandler> exceptionHandler;//异步注解@NullableprivateClass<?extendsAnnotation> asyncAnnotationType;publicAsyncAnnotationBeanPostProcessor(){setBeforeExistingAdvisors(true);}publicvoidconfigure(@NullableSupplier<Executor> executor,@NullableSupplier<AsyncUncaughtExceptionHandler> exceptionHandler){this.executor = executor;this.exceptionHandler = exceptionHandler;}publicvoidsetExecutor(Executor executor){this.executor =SingletonSupplier.of(executor);}publicvoidsetExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler){this.exceptionHandler =SingletonSupplier.of(exceptionHandler);}publicvoidsetAsyncAnnotationType(Class<?extendsAnnotation> asyncAnnotationType){Assert.notNull(asyncAnnotationType,"'asyncAnnotationType' must not be null");this.asyncAnnotationType = asyncAnnotationType;}/**
    *  由于父类实现了BeanFactoryAware接口 在实例初始化时会被调用
    **/@OverridepublicvoidsetBeanFactory(BeanFactory beanFactory){super.setBeanFactory(beanFactory);/**
        * Advice:通知,标识逻辑织入的位置(增强代码调用的地方)。
        * PointCut:切入点,标识对什么方法进入代理(判断哪个方法能被增强);
        * Advisor:通知器,是通知与切入点的集合(一般里面持有一个Advice和一个PointCut,用来标识一个切面增强)。
        **///我们阔以看到此处创建了一个通知器 把线程池和异常处理器传进去 AsyncAnnotation  advisor =newAsyncAnnotationAdvisor(this.executor,this.exceptionHandler);if(this.asyncAnnotationType !=null){
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);}//把类工厂传入通知器中
        advisor.setBeanFactory(beanFactory);//把通知器赋给本类的成员变量this.advisor = advisor;}}

上诉代码主要是把增强的advisor 类创建好并复制给了本类成员变量,
下面我们继续看此类的父类AbstractAdvisingBeanPostProcessor,应为此类实现了BeanPostProcessor 接口,所以初始化完后肯定会调用postProcessAfterInitialization方法

publicabstractclassAbstractAdvisingBeanPostProcessorextendsProxyProcessorSupportimplementsBeanPostProcessor{@NullableprotectedAdvisor advisor;protectedboolean beforeExistingAdvisors =false;privatefinalMap<Class<?>,Boolean> eligibleBeans =newConcurrentHashMap<>(256);publicvoidsetBeforeExistingAdvisors(boolean beforeExistingAdvisors){this.beforeExistingAdvisors = beforeExistingAdvisors;}@OverridepublicObjectpostProcessBeforeInitialization(Object bean,String beanName){return bean;}@OverridepublicObjectpostProcessAfterInitialization(Object bean,String beanName){if(this.advisor ==null|| bean instanceofAopInfrastructureBean){// Ignore AOP infrastructure such as scoped proxies.return bean;}//如果被代理过 直接把Advisor加入到代理里中的Advisor列表中if(bean instanceofAdvised){Advised advised =(Advised) bean;if(!advised.isFrozen()&&isEligible(AopUtils.getTargetClass(bean))){// Add our local Advisor to the existing proxy's Advisor chain...if(this.beforeExistingAdvisors){
                    advised.addAdvisor(0,this.advisor);}else{
                    advised.addAdvisor(this.advisor);}return bean;}}//如果没被代理过但是需要被代理的类 创建代理并直接加入到增强Advisor加入的Advisor列表中,并返回代理类if(isEligible(bean, beanName)){ProxyFactory proxyFactory =prepareProxyFactory(bean, beanName);if(!proxyFactory.isProxyTargetClass()){evaluateProxyInterfaces(bean.getClass(), proxyFactory);}
            proxyFactory.addAdvisor(this.advisor);customizeProxyFactory(proxyFactory);// Use original ClassLoader if bean class not locally loaded in overriding class loaderClassLoader classLoader =getProxyClassLoader();if(classLoader instanceofSmartClassLoader&& classLoader != bean.getClass().getClassLoader()){
                classLoader =((SmartClassLoader) classLoader).getOriginalClassLoader();}return proxyFactory.getProxy(classLoader);}// No proxy needed.return bean;}protectedbooleanisEligible(Object bean,String beanName){returnisEligible(bean.getClass());}//判断此类是否需要代理protectedbooleanisEligible(Class<?> targetClass){Boolean eligible =this.eligibleBeans.get(targetClass);if(eligible !=null){return eligible;}if(this.advisor ==null){returnfalse;}
        eligible =AopUtils.canApply(this.advisor, targetClass);this.eligibleBeans.put(targetClass, eligible);return eligible;}protectedProxyFactoryprepareProxyFactory(Object bean,String beanName){ProxyFactory proxyFactory =newProxyFactory();
        proxyFactory.copyFrom(this);
        proxyFactory.setTarget(bean);return proxyFactory;}protectedvoidcustomizeProxyFactory(ProxyFactory proxyFactory){}}

上述代码可以知道,只是把增强的advisor 放入代理类中,所以我们只需要看advisor 中的增强方法就知道增强的代码逻辑。我们来看advisor 成员的实现类AsyncAnnotationAdvisor,而AsyncAnnotationAdvisor是Advisor的实现类。而Advisor实现类一般会包含一般里面持有一个Advice和一个PointCut类,而Advice的子类MethodInterceptor的invoke方法就是代理的主要增强代码实现的地方

    * Advice:通知,标识逻辑织入的位置(增强代码调用的地方)。
    * PointCut:切入点,标识对什么方法进入代理(判断哪个方法能被增强);
    * Advisor:通知器,是通知与切入点的集合(一般里面持有一个Advice和一个PointCut,用来标识一个切面增强)。
publicclassAsyncAnnotationAdvisorextendsAbstractPointcutAdvisorimplementsBeanFactoryAware{privateAdvice advice;privatePointcut pointcut;publicAsyncAnnotationAdvisor(){this((Supplier<Executor>)null,(Supplier<AsyncUncaughtExceptionHandler>)null);}publicAsyncAnnotationAdvisor(@NullableExecutor executor,@NullableAsyncUncaughtExceptionHandler exceptionHandler){this(SingletonSupplier.ofNullable(executor),SingletonSupplier.ofNullable(exceptionHandler));}@SuppressWarnings("unchecked")publicAsyncAnnotationAdvisor(@NullableSupplier<Executor> executor,@NullableSupplier<AsyncUncaughtExceptionHandler> exceptionHandler){Set<Class<?extendsAnnotation>> asyncAnnotationTypes =newLinkedHashSet<>(2);
        asyncAnnotationTypes.add(Async.class);try{
            asyncAnnotationTypes.add((Class<?extendsAnnotation>)ClassUtils.forName("javax.ejb.Asynchronous",AsyncAnnotationAdvisor.class.getClassLoader()));}catch(ClassNotFoundException ex){// If EJB 3.1 API not present, simply ignore.}//通知实现this.advice =buildAdvice(executor, exceptionHandler);//切入点实现this.pointcut =buildPointcut(asyncAnnotationTypes);}publicvoidsetAsyncAnnotationType(Class<?extendsAnnotation> asyncAnnotationType){Assert.notNull(asyncAnnotationType,"'asyncAnnotationType' must not be null");Set<Class<?extendsAnnotation>> asyncAnnotationTypes =newHashSet<>();
        asyncAnnotationTypes.add(asyncAnnotationType);this.pointcut =buildPointcut(asyncAnnotationTypes);}@OverridepublicvoidsetBeanFactory(BeanFactory beanFactory){if(this.advice instanceofBeanFactoryAware){((BeanFactoryAware)this.advice).setBeanFactory(beanFactory);}}@OverridepublicAdvicegetAdvice(){returnthis.advice;}@OverridepublicPointcutgetPointcut(){returnthis.pointcut;}/**
    * 通知的实现类
    **/protectedAdvicebuildAdvice(@NullableSupplier<Executor> executor,@NullableSupplier<AsyncUncaughtExceptionHandler> exceptionHandler){//核心通知类AnnotationAsyncExecutionInterceptor interceptor =newAnnotationAsyncExecutionInterceptor(null);
        interceptor.configure(executor, exceptionHandler);return interceptor;}protectedPointcutbuildPointcut(Set<Class<?extendsAnnotation>> asyncAnnotationTypes){ComposablePointcut result =null;for(Class<?extendsAnnotation> asyncAnnotationType : asyncAnnotationTypes){Pointcut cpc =newAnnotationMatchingPointcut(asyncAnnotationType,true);Pointcut mpc =newAnnotationMatchingPointcut(null, asyncAnnotationType,true);if(result ==null){
                result =newComposablePointcut(cpc);}else{
                result.union(cpc);}
            result = result.union(mpc);}return(result !=null? result :Pointcut.TRUE);}}

上面代码可以知道核心通知的实现类是AnnotationAsyncExecutionInterceptor,那就继续AnnotationAsyncExecutionInterceptor代码

publicclassAnnotationAsyncExecutionInterceptorextendsAsyncExecutionInterceptor{publicAnnotationAsyncExecutionInterceptor(@NullableExecutor defaultExecutor){super(defaultExecutor);}publicAnnotationAsyncExecutionInterceptor(@NullableExecutor defaultExecutor,AsyncUncaughtExceptionHandler exceptionHandler){super(defaultExecutor, exceptionHandler);}@Override@NullableprotectedStringgetExecutorQualifier(Method method){// Maintainer's note: changes made here should also be made in// AnnotationAsyncExecutionAspect#getExecutorQualifierAsync async =AnnotatedElementUtils.findMergedAnnotation(method,Async.class);if(async ==null){
            async =AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(),Async.class);}return(async !=null? async.value():null);}}

没有看到我们需要的invoke方法,继续看父类AsyncExecutionInterceptor

publicclassAsyncExecutionInterceptorextendsAsyncExecutionAspectSupportimplementsMethodInterceptor,Ordered{publicAsyncExecutionInterceptor(@NullableExecutor defaultExecutor){super(defaultExecutor);}publicAsyncExecutionInterceptor(@NullableExecutor defaultExecutor,AsyncUncaughtExceptionHandler exceptionHandler){super(defaultExecutor, exceptionHandler);}@Override@NullablepublicObjectinvoke(finalMethodInvocation invocation)throwsThrowable{Class<?> targetClass =(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null);Method specificMethod =ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);finalMethod userDeclaredMethod =BridgeMethodResolver.findBridgedMethod(specificMethod);//通过方法上的@Async注解里的value参数 value参数就是线程池Executor放入Spring容器的名称 ********AsyncTaskExecutor executor =determineAsyncExecutor(userDeclaredMethod);if(executor ==null){thrownewIllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");}//把任务调用封装成callable方法  ****************Callable<Object> task =()->{try{Object result = invocation.proceed();if(result instanceofFuture){return((Future<?>) result).get();}}//如果出现了异常 走异常处理器catch(ExecutionException ex){handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch(Throwable ex){handleError(ex, userDeclaredMethod, invocation.getArguments());}returnnull;};//把callable 线程池 和 方法返回类型一同传到doSubmit方法 *************returndoSubmit(task, executor, invocation.getMethod().getReturnType());}@NullableprotectedStringgetExecutorQualifier(Method method){returnnull;}/**
    * 如果从父类方法获取不到线程池 就返回一个默认线程池new SimpleAsyncTaskExecutor()
    **/@Override@NullableprotectedExecutorgetDefaultExecutor(@NullableBeanFactory beanFactory){Executor defaultExecutor =super.getDefaultExecutor(beanFactory);return(defaultExecutor !=null? defaultExecutor :newSimpleAsyncTaskExecutor());}@OverridepublicintgetOrder(){returnOrdered.HIGHEST_PRECEDENCE;}}

我们会发现获取线程池方法和正真调用方法的doSubmit方法都是在父类AsyncExecutionAspectSupport中,继续看AsyncExecutionAspectSupport代码

publicabstractclassAsyncExecutionAspectSupportimplementsBeanFactoryAware{publicstaticfinalString DEFAULT_TASK_EXECUTOR_BEAN_NAME ="taskExecutor";protectedfinalLog logger =LogFactory.getLog(getClass());privatefinalMap<Method,AsyncTaskExecutor> executors =newConcurrentHashMap<>(16);privateSingletonSupplier<Executor> defaultExecutor;privateSingletonSupplier<AsyncUncaughtExceptionHandler> exceptionHandler;@NullableprivateBeanFactory beanFactory;publicAsyncExecutionAspectSupport(@NullableExecutor defaultExecutor){//默认线程池 this.defaultExecutor =newSingletonSupplier<>(defaultExecutor,()->getDefaultExecutor(this.beanFactory));//默认异常处理器this.exceptionHandler =SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);}publicAsyncExecutionAspectSupport(@NullableExecutor defaultExecutor,AsyncUncaughtExceptionHandler exceptionHandler){//默认线程池 this.defaultExecutor =newSingletonSupplier<>(defaultExecutor,()->getDefaultExecutor(this.beanFactory));//默认异常处理器this.exceptionHandler =SingletonSupplier.of(exceptionHandler);}publicvoidconfigure(@NullableSupplier<Executor> defaultExecutor,@NullableSupplier<AsyncUncaughtExceptionHandler> exceptionHandler){//默认线程池this.defaultExecutor =newSingletonSupplier<>(defaultExecutor,()->getDefaultExecutor(this.beanFactory));//默认异常处理器this.exceptionHandler =newSingletonSupplier<>(exceptionHandler,SimpleAsyncUncaughtExceptionHandler::new);}publicvoidsetExecutor(Executor defaultExecutor){this.defaultExecutor =SingletonSupplier.of(defaultExecutor);}publicvoidsetExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler){this.exceptionHandler =SingletonSupplier.of(exceptionHandler);}@OverridepublicvoidsetBeanFactory(BeanFactory beanFactory){this.beanFactory = beanFactory;}/**
    * 获取@Async注释方法使用的线程池
    **/@NullableprotectedAsyncTaskExecutordetermineAsyncExecutor(Method method){//先从缓存中取AsyncTaskExecutor executor =this.executors.get(method);//没有在从容器中找if(executor ==null){Executor targetExecutor;//得到此方法中@Async属性value的值 即  容器中线程池的Bean名称String qualifier =getExecutorQualifier(method);//如果设置了value值 就从容器中获取if(StringUtils.hasLength(qualifier)){
                targetExecutor =findQualifiedExecutor(this.beanFactory, qualifier);}//如果没有设置value值 就获取AsyncConfigurer配置的默认线程池 如果没有就从容器中获取TaskExecutor的实现类,如果有多个TaskExecutor实现类,就取容器bean名称为“taskExecutor”的容Bean类else{
                targetExecutor =this.defaultExecutor.get();}if(targetExecutor ==null){returnnull;}
            executor =(targetExecutor instanceofAsyncListenableTaskExecutor?(AsyncListenableTaskExecutor) targetExecutor :newTaskExecutorAdapter(targetExecutor));//放入缓存中this.executors.put(method, executor);}return executor;}@NullableprotectedExecutorgetDefaultExecutor(@NullableBeanFactory beanFactory){if(beanFactory !=null){try{//先获取容器中TaskExecutor的实现类return beanFactory.getBean(TaskExecutor.class);}catch(NoUniqueBeanDefinitionException ex){
                logger.debug("Could not find unique TaskExecutor bean. "+"Continuing search for an Executor bean named 'taskExecutor'", ex);try{//如果有多个就取名称DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor容器类return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME,Executor.class);}catch(NoSuchBeanDefinitionException ex2){if(logger.isInfoEnabled()){
                        logger.info("More than one TaskExecutor bean found within the context, and none is named "+"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly "+"as an alias) in order to use it for async processing: "+ ex.getBeanNamesFound());}}}//如果容器中没有TaskExecutor的实现类 取名称DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor容器类catch(NoSuchBeanDefinitionException ex){
                logger.debug("Could not find default TaskExecutor bean. "+"Continuing search for an Executor bean named 'taskExecutor'", ex);try{return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME,Executor.class);}catch(NoSuchBeanDefinitionException ex2){
                    logger.info("No task executor bean found for async processing: "+"no bean of type TaskExecutor and no bean named 'taskExecutor' either");}// Giving up -> either using local default executor or none at all...}}//走完所有都没取到 线程池  那么就返回null 子类中会判断如果返回null 将new出一个默认线程池returnnull;}@NullableprotectedObjectdoSubmit(Callable<Object> task,AsyncTaskExecutor executor,Class<?> returnType){//如果返回类型是CompletableFuture及其子类 if(CompletableFuture.class.isAssignableFrom(returnType)){returnCompletableFuture.supplyAsync(()->{try{return task.call();}catch(Throwable ex){thrownewCompletionException(ex);}}, executor);}//如果返回类型是ListenableFuture及其子类 elseif(ListenableFuture.class.isAssignableFrom(returnType)){return((AsyncListenableTaskExecutor) executor).submitListenable(task);}//如果返回类型是Future及其子类 elseif(Future.class.isAssignableFrom(returnType)){return executor.submit(task);}//如果返回类型是其他else{
            executor.submit(task);returnnull;}}/**
    * 异常处理器
    **/protectedvoidhandleError(Throwable ex,Method method,Object... params)throwsException{//如果返回类型是Future及其子类 发生异常 则直接丢出异常if(Future.class.isAssignableFrom(method.getReturnType())){ReflectionUtils.rethrowException(ex);}//否则 则走异常处理器else{// Could not transmit the exception to the caller with default executortry{this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);}catch(Throwable ex2){
                logger.warn("Exception handler for async method '"+ method.toGenericString()+"' threw unexpected exception itself", ex2);}}}}

到此为止,源码已经分析的差不多了,我们阔以得出几个重点:

  • AsyncConfigurer实现类可以设置默认线程池和默认异常处理器
  • @Async的value是支持指定线程池
  • @Async是支持全局异常处理器
  • @Async注解的方法是可以支持返回类型为CompletableFuture、ListenableFuture、Future

总结

线程池获取优先级

当@Async中value值没有指定线程池

  • 首先使用 AsyncConfigurer类中配置的默认线程池
  • 如果没有配置AsyncConfigurer类,那么将使用容器TaskExecutor的实现类
  • 如果容器中有多个TaskExecutor个实现类,将会使用beanname="taskExecutor"的Executor实现类
  • 如果容器中没有有TaskExecutor实现类,将会使用beanname="taskExecutor"的Executor实现类
  • 如果容器中没有beanname="taskExecutor"的Executor实现类,将会new出一个SimpleAsyncTaskExecutor线程池

/最好使用new ThreadPoolExecutor显示创建线程池,SimpleAsyncTaskExecutor没有复用线程

当@Async中value值指定了线程池beanname,可以根据业务进行线程池级别隔离

  • 取出容器中beanname=(@Async注解value值)的Executor实现类

如果没有取到相应的线程池,比如beanname写错导致取不到相应线程池将会抛出异常

异常处理器

返回类型为Future及其子类时

  • 直接抛出异常

返回类型不是Future及其子类

  • 当AsyncConfigurer设置了默认异常处理器,则走此异常处理器
  • 如果没有设置AsyncConfigurer异常处理器,则走SimpleAsyncUncaughtExceptionHandler异常处理器,此处理器是直接打印日志

方法返回类型

CompletableFuture及其子类

//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invokeCallable<Object> task =()->{try{//@Async注释的方法调用Object result = invocation.proceed();//如果是Future类型 调用get获取结果值if(result instanceofFuture){return((Future<?>) result).get();}}catch(ExecutionException ex){handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch(Throwable ex){handleError(ex, userDeclaredMethod, invocation.getArguments());}returnnull;};//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmitif(CompletableFuture.class.isAssignableFrom(returnType)){//@Async注释的方法返回类型如果为CompletableFuture及其子类//就使用线程池执行并封装成CompletableFuture返回returnCompletableFuture.supplyAsync(()->{try{return task.call();}catch(Throwable ex){thrownewCompletionException(ex);}}, executor);}

ListenableFuture及其子类

//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invokeCallable<Object> task =()->{try{//@Async注释的方法调用Object result = invocation.proceed();//如果是Future类型 调用get获取结果值if(result instanceofFuture){return((Future<?>) result).get();}}catch(ExecutionException ex){handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch(Throwable ex){handleError(ex, userDeclaredMethod, invocation.getArguments());}returnnull;};//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmitif(ListenableFuture.class.isAssignableFrom(returnType)){//@Async注释的方法返回类型如果为ListenableFuture及其子类//就使用线程池执行并返回ListenableFuturereturn((AsyncListenableTaskExecutor) executor).submitListenable(task);}

注意ListenableFuture.addCallback()添加回调函数时,如果异步任务还未执行完成,则回调函数由异步任务线程执行,如果异步任务已经执行完成,则是当前掉addCallback函数的线程调用回调函数

Future及其子类

//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invokeCallable<Object> task =()->{try{//@Async注释的方法调用Object result = invocation.proceed();//如果是Future类型 调用get获取结果值if(result instanceofFuture){return((Future<?>) result).get();}}catch(ExecutionException ex){handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch(Throwable ex){handleError(ex, userDeclaredMethod, invocation.getArguments());}returnnull;};//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmitif(Future.class.isAssignableFrom(returnType)){//@Async注释的方法返回类型如果为Future及其子类//就使用线程池执行并返回Futurereturn executor.submit(task);}

其他

源码分析
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invokeCallable<Object> task =()->{try{//@Async注释的方法调用Object result = invocation.proceed();//如果是Future类型 调用get获取结果值if(result instanceofFuture){return((Future<?>) result).get();}}catch(ExecutionException ex){handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch(Throwable ex){handleError(ex, userDeclaredMethod, invocation.getArguments());}returnnull;};//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit//@Async注释的方法返回类型如果是非Future//使用线程池执行后 直接返回null
        executor.submit(task);returnnull;
当返回值为void时无返回值示例
@RestController@RequestMapping("/testasync")publicclassTestAsyncController{@AutowiredprivateAsyncTestService asyncTestService;@GetMapping("/test05")publicvoidtest05(){

        asyncTestService.invokeAsyncTest05();}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async@OverridepublicvoidinvokeAsyncTest05(){System.out.println(Thread.currentThread()+"运行了invokeAsyncTest05方法!");}}
当返回值为非Futute类型示例
  • 返回的结果为空,如果要异步结果,请用Future封装返回结果
@RestController@RequestMapping("/testasync")publicclassTestAsyncController{@AutowiredprivateAsyncTestService asyncTestService;@GetMapping("/test07")publicvoidtest07(){//永远为null 如果要异步结果 请用Future封装返回结果List<String> result = asyncTestService.invokeAsyncTest07();System.out.println(result);}}
@ServicepublicclassAsyncTestServiceImplimplementsAsyncTestService{@Async@OverridepublicList<String>invokeAsyncTest07(){System.out.println(Thread.currentThread()+"invokeAsyncTest07!");List<String> result =Arrays.asList("Hello World1","Hello World2");return result;}}

思考

  • 容器中beanname="taskExecutor"的默认线程池是何时注入容器中的
  • ListenableFuture的子类ListenableFutureTask的addCallback()添加的回调函数是被哪个线程调用的
标签: java spring

本文转载自: https://blog.csdn.net/yubaojin/article/details/116355735
版权归原作者 Yux_o 所有, 如有侵权,请联系我们删除。

“@EnableAsync的使用、进阶、源码分析”的评论:

还没有评论