Notes on the "Spring Master Series" (Async)

Reference link 1

EnableAsync

Source

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    Class<? extends Annotation> annotation() default Annotation.class;

    boolean proxyTargetClass() default false;

    AdviceMode mode() default AdviceMode.PROXY;

    int order() default Ordered.LOWEST_PRECEDENCE;
}

Applies to: types.

AdviceMode

public enum AdviceMode {

    /**
     * JDK proxy-based advice.
     */
    PROXY,

    /**
     * AspectJ weaving-based advice.
     */
    ASPECTJ
}

AsyncConfigurationSelector

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }
}

Async

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

    String value() default "";
}

Applies to: types and methods.
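Because the annotation can sit on a type or on a method, whoever reads it has to decide which one wins. The sketch below uses plain reflection and a hypothetical stand-in annotation, MyAsync (not Spring's @Async), together with made-up classes ReportService and PlainService. It assumes a method-level annotation takes precedence over a class-level one, which is how we understand the lookup to behave; treat it as an illustration rather than Spring's actual code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @Async: same targets, same value() attribute.
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface MyAsync {
    String value() default "";
}

// Class-level annotation: applies to every method unless a method overrides it.
@MyAsync("classPool")
class ReportService {
    public void daily() { }

    @MyAsync("methodPool")
    public void monthly() { }
}

// No annotation anywhere: nothing here is async.
class PlainService {
    public void run() { }
}

class AsyncLookup {

    /** Returns the executor qualifier for a no-arg method, or null if it is not async. */
    static String qualifierFor(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            // Assumption: the method-level annotation is checked first ...
            MyAsync annotation = method.getAnnotation(MyAsync.class);
            if (annotation == null) {
                // ... and the declaring class is the fallback.
                annotation = method.getDeclaringClass().getAnnotation(MyAsync.class);
            }
            return annotation == null ? null : annotation.value();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualifierFor(ReportService.class, "daily"));
        System.out.println(qualifierFor(ReportService.class, "monthly"));
        System.out.println(qualifierFor(PlainService.class, "run"));
    }
}
```

This is also why annotating a whole class, as Case 2 does with GoodsService, makes every public method of that class asynchronous.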

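Case 1: no return value

// LogService.java
import java.util.concurrent.TimeUnit;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class LogService {

    @Async
    public void log(String msg) throws InterruptedException {
        // The literals are kept as in the captured output: "开始记录日志" = "start logging"
        System.out.println(Thread.currentThread() + "开始记录日志," + System.currentTimeMillis());
        // Simulate a task that takes 2 seconds
        TimeUnit.SECONDS.sleep(2);
        // "日志记录完毕" = "logging finished"
        System.out.println(Thread.currentThread() + "日志记录完毕," + System.currentTimeMillis());
    }
}

// Client.java
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;

@ComponentScan
@EnableAsync
public class Client {

    public static void main(String[] args) throws InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        System.out.println(Thread.currentThread() + " logService.log start," + System.currentTimeMillis());
        logService.log("Run the method asynchronously!");
        System.out.println(Thread.currentThread() + " logService.log end," + System.currentTimeMillis());
        // Wait for the async task to finish
        TimeUnit.SECONDS.sleep(3);
    }
}
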
Thread[main,5,main] logService.log start,1667887578200
14:06:18.209 [main] DEBUG org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - Could not find default TaskExecutor bean
org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.core.task.TaskExecutor' available
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:351)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:342)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.getDefaultExecutor(AsyncExecutionAspectSupport.java:233)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.getDefaultExecutor(AsyncExecutionInterceptor.java:157)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$configure$2(AsyncExecutionAspectSupport.java:119)
    at org.springframework.util.function.SingletonSupplier.get(SingletonSupplier.java:100)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.determineAsyncExecutor(AsyncExecutionAspectSupport.java:172)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:107)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.example.lurenjia.spring.c37.d1.LogService$$EnhancerBySpringCGLIB$$ecabcff7.log(<generated>)
    at com.example.lurenjia.spring.c37.d1.Client.main(Client.java:23)
14:06:18.210 [main] INFO org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
Thread[main,5,main] logService.log end,1667887578212
Thread[SimpleAsyncTaskExecutor-1,5,main]开始记录日志,1667887578220
Thread[SimpleAsyncTaskExecutor-1,5,main]日志记录完毕,1667887580234

Process finished with exit code 0

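The reason the program does not fail is as follows.

The AsyncExecutionInterceptor class overrides getDefaultExecutor:

@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

The parent implementation first looks in the container for a bean of type TaskExecutor and, failing that, for a bean named taskExecutor. If it finds one, that executor is used. If it finds neither, it returns null and this override creates a SimpleAsyncTaskExecutor instead. That is why the NoSuchBeanDefinitionException above is only logged at DEBUG level and the method still runs asynchronously.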
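The lookup order can be modelled without Spring. The sketch below is a toy: a Map plays the role of the container, and NamedExecutor, ExecutorLookup and pick are hypothetical names invented for illustration. It assumes the order suggested by the log messages above: a unique bean by type first, then a bean named taskExecutor, then the fallback.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical executor that only carries a name, so we can see which one was chosen.
class NamedExecutor implements Executor {
    final String name;

    NamedExecutor(String name) {
        this.name = name;
    }

    @Override
    public void execute(Runnable task) {
        task.run();
    }
}

class ExecutorLookup {
    static final String DEFAULT_NAME = "taskExecutor";

    /** Toy resolution: 'beans' maps bean name to executor, standing in for the container. */
    static NamedExecutor resolve(Map<String, NamedExecutor> beans, NamedExecutor fallback) {
        if (beans.size() == 1) {
            // Exactly one candidate by type: its bean name does not matter.
            return beans.values().iterator().next();
        }
        // Zero or several candidates: try the conventional bean name.
        NamedExecutor byName = beans.get(DEFAULT_NAME);
        // Nothing usable: fall back, as the interceptor does with SimpleAsyncTaskExecutor.
        return byName != null ? byName : fallback;
    }

    /** Convenience wrapper: registers one executor per name and reports the chosen one. */
    static String pick(String... beanNames) {
        Map<String, NamedExecutor> beans = new LinkedHashMap<>();
        for (String beanName : beanNames) {
            beans.put(beanName, new NamedExecutor(beanName));
        }
        return resolve(beans, new NamedExecutor("fallback")).name;
    }

    public static void main(String[] args) {
        System.out.println(pick());                    // no beans at all
        System.out.println(pick("anyName"));           // single bean, arbitrary name
        System.out.println(pick("a", "taskExecutor")); // several beans, one with the default name
        System.out.println(pick("a", "b"));            // several beans, none with the default name
    }
}
```

Under this model a single executor bean is picked up whatever it is called, and the name taskExecutor only becomes relevant once there is more than one candidate.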

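To get rid of this logged exception, define your own thread pool:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("my-thread-");
        return executor;
    }
}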

Output after the fix

14:27:03.298 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
Thread[main,5,main] logService.log start,1667888823308
Thread[main,5,main] logService.log end,1667888823310
Thread[my-thread-1,5,main]开始记录日志,1667888823316
Thread[my-thread-1,5,main]日志记录完毕,1667888825316

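Case 2: with a return value

// GoodsService.java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

// @Async on the class makes every public method below asynchronous.
// The string literals are kept as in the captured output.
@Async
@Component
public class GoodsService {

    // "商品%s基本信息" = "basic info of goods %s"
    public Future<String> getGoodsInfo(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return AsyncResult.forValue(String.format("商品%s基本信息!", goodsId));
    }

    // "商品%s描述信息" = "description of goods %s"
    public Future<String> getGoodsDesc(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return AsyncResult.forValue(String.format("商品%s描述信息!", goodsId));
    }

    // "评论1", "评论2" = "comment 1", "comment 2"
    public Future<List<String>> getGoodsComments(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        List<String> comments = Arrays.asList("评论1", "评论2");
        return AsyncResult.forValue(comments);
    }
}

// TaskExecutorConfig.java
@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("d2-thread-");
        return executor;
    }
}

// Client.java
@ComponentScan
@EnableAsync
public class Client {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        GoodsService goodsService = context.getBean(GoodsService.class);
        long startTime = System.currentTimeMillis();
        // "Start fetching all the goods information"
        System.out.println("开始获取商品的各种信息");
        long goodsId = 1L;
        // All three calls return immediately; the work runs on the pool in parallel.
        Future<String> goodsInfoFuture = goodsService.getGoodsInfo(goodsId);
        Future<String> goodsDescFuture = goodsService.getGoodsDesc(goodsId);
        Future<List<String>> goodsCommentsFuture = goodsService.getGoodsComments(goodsId);
        System.out.println(goodsInfoFuture.get());
        System.out.println(goodsDescFuture.get());
        System.out.println(goodsCommentsFuture.get());
        // "Goods information fetched, total time (ms):"
        System.out.println("商品信息获取完毕,总耗时(ms):" + (System.currentTimeMillis() - startTime));
        // Sleep briefly so that a @Test run would not exit early
        TimeUnit.SECONDS.sleep(3);
    }
}
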
15:00:41.668 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
开始获取商品的各种信息
商品1基本信息!
商品1描述信息!
[评论1, 评论2]
商品信息获取完毕,总耗时(ms):521
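The total of 521 ms for three calls of 500 ms each is the point of this case: the calls overlap instead of running one after another. The same effect can be reproduced with the plain JDK. The sketch below is an analogy using CompletableFuture and a three-thread pool, not the Spring code above; ParallelFetch, slow, fetchAll and the returned strings are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelFetch {

    /** Simulates a slow lookup that takes 'millis' milliseconds. */
    static String slow(String value, long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return value;
    }

    /** Starts three lookups at once and waits for all of them. */
    static List<String> fetchAll(long goodsId, long millisPerCall) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<String> info =
                    CompletableFuture.supplyAsync(() -> slow("info-" + goodsId, millisPerCall), pool);
            CompletableFuture<String> desc =
                    CompletableFuture.supplyAsync(() -> slow("desc-" + goodsId, millisPerCall), pool);
            CompletableFuture<String> comments =
                    CompletableFuture.supplyAsync(() -> slow("comments-" + goodsId, millisPerCall), pool);
            // join() blocks, but all three tasks are already running by now.
            return Arrays.asList(info.join(), desc.join(), comments.join());
        } finally {
            pool.shutdown();
        }
    }

    /** Measures how long fetchAll takes, in milliseconds. */
    static long elapsedMillis(long goodsId, long millisPerCall) {
        long start = System.nanoTime();
        fetchAll(goodsId, millisPerCall);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(1L, 200L));
        System.out.println("elapsed ms: " + elapsedMillis(1L, 200L));
    }
}
```

With three threads the elapsed time should stay close to one call's duration rather than the sum of all three. If the pool had a single thread, the calls would queue up and the benefit would disappear.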

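Custom thread pool

Approach 1

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("d2-thread-");
        return executor;
    }
}

The referenced article says the bean must be named taskExecutor. That is not accurate: when this is the only TaskExecutor bean, it is found by type, so you can rename it to anything and the test still works. The name taskExecutor only matters as a fallback when the lookup by type does not yield exactly one bean.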

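Exception handling

From the earlier JUC chapters we know that an exception thrown from a Thread's Runnable can only be handled through an UncaughtExceptionHandler, whereas with a Future the exception can be caught by wrapping the get() call in try-catch. The same reasoning applies here.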
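As a reminder of those two JDK behaviours, here is a small plain-Java sketch. FailureDemo and its two methods are hypothetical names; the exception messages are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class FailureDemo {

    /** Future case: the failure surfaces from get(), wrapped in an ExecutionException. */
    static String failureViaFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> {
                throw new IllegalArgumentException("bad argument");
            };
            Future<String> future = pool.submit(task);
            try {
                return future.get();
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                Throwable cause = e.getCause();
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdown();
        }
    }

    /** Thread case: nobody can catch the exception; only the handler gets to see it. */
    static String failureViaHandler() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("no return value");
        });
        worker.setUncaughtExceptionHandler((thread, ex) -> seen.set(ex.getMessage()));
        worker.start();
        try {
            // The handler runs on the dying thread, so it has finished once join() returns.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(failureViaFuture());
        System.out.println(failureViaHandler());
    }
}
```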

With a return value

@Service
public class LogService {

    @Async
    public Future<String> mockException() {
        // Simulate an exception; "参数有误" = "invalid argument"
        throw new IllegalArgumentException("参数有误!");
    }

    @Async
    public void mockNoReturnException() {
        // Simulate an exception; "无返回值的异常" = "exception from a method with no return value"
        throw new IllegalArgumentException("无返回值的异常!");
    }
}

@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        try {
            Future<String> future = logService.mockException();
            // get() rethrows the failure, so it can be caught here
            System.out.println(future.get());
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
}

Without a return value

@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        logService.mockNoReturnException();
    }
}

ERROR org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler - Unexpected exception occurred invoking async method: public void com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException()
java.lang.IllegalArgumentException: 无返回值的异常!
    at com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException(LogService.java:25)
    at com.example.lurenjia.spring.c37.service.LogService$$FastClassBySpringCGLIB$$e4f93305.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The output above is what gets printed before we add any handler of our own. Note that it mentions a class named SimpleAsyncUncaughtExceptionHandler, which is the default handler and simply logs the exception:

public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

    private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        if (logger.isErrorEnabled()) {
            logger.error("Unexpected exception occurred invoking async method: " + method, ex);
        }
    }
}
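The two cases differ only in the return type: a Future gives the caller a place to receive the exception, a void method does not, so the exception has to go to a handler. The sketch below models that routing in plain Java. It reflects our understanding of the behaviour rather than Spring's source; UncaughtHandler, Tasks, ErrorRouting, route and describe are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

// Hypothetical callback with the same shape as handleUncaughtException(ex, method, params).
interface UncaughtHandler {
    void handle(Throwable ex, Method method, Object... params);
}

// Two method signatures to route on; the bodies are irrelevant here.
class Tasks {
    public Future<String> withResult() {
        return null;
    }

    public void fireAndForget() {
    }
}

class ErrorRouting {

    /** Rethrows for Future-returning methods, otherwise hands the error to the handler. */
    static void route(RuntimeException ex, Method method, UncaughtHandler handler, Object... params) {
        if (Future.class.isAssignableFrom(method.getReturnType())) {
            // The caller will receive this through Future.get().
            throw ex;
        }
        // No Future to carry the failure: the handler is the only place it can go.
        handler.handle(ex, method, params);
    }

    /** Routes a sample failure for the named Tasks method and reports what happened. */
    static String describe(String methodName) {
        List<String> handled = new ArrayList<>();
        UncaughtHandler handler = (ex, method, params) -> handled.add("handled:" + method.getName());
        try {
            Method method = Tasks.class.getMethod(methodName);
            route(new IllegalArgumentException("boom"), method, handler);
            return handled.isEmpty() ? "nothing" : handled.get(0);
        } catch (IllegalArgumentException e) {
            return "rethrown:" + e.getMessage();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("withResult"));
        System.out.println(describe("fireAndForget"));
    }
}
```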

Adding a custom handler

@Bean
public AsyncConfigurer asyncConfigurer() {
    return new AsyncConfigurer() {
        @Nullable
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return (ex, method, params) -> {
                // Message: "method [%s], params [%s] threw an exception; details:"
                String msg = String.format("方法[%s],参数[%s],发送异常了,异常详细信息:", method, Arrays.asList(params));
                System.out.println(msg);
                ex.printStackTrace();
            };
        }
    };
}

Output

方法[public void com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException()],参数[[]],发送异常了,异常详细信息:
java.lang.IllegalArgumentException: 无返回值的异常!
    at com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException(LogService.java:25)
    at com.example.lurenjia.spring.c37.service.LogService$$FastClassBySpringCGLIB$$e4f93305.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

How it works: AbstractAsyncConfiguration has an @Autowired setter that picks up the AsyncConfigurer bean, if there is one, and takes the executor and the exception handler from it. At most one AsyncConfigurer may exist.

@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
    if (CollectionUtils.isEmpty(configurers)) {
        return;
    }
    if (configurers.size() > 1) {
        throw new IllegalStateException("Only one AsyncConfigurer may exist");
    }
    AsyncConfigurer configurer = configurers.iterator().next();
    this.executor = configurer::getAsyncExecutor;
    this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}

Thread pool isolation

Use @Async("executor") to specify the name of the thread pool bean that should run the method.

@Service
public class InsulateService {

    @Async("executor")
    public void a() {
        System.out.println(Thread.currentThread().getName());
    }

    @Async("executor2")
    public void b() {
        System.out.println(Thread.currentThread().getName());
    }

    @Async("executor3")
    public void c() {
        System.out.println(Thread.currentThread().getName());
    }
}

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("executor-thread-");
        return executor;
    }

    @Bean("executor2")
    public ThreadPoolTaskExecutor executor2() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("executor2-thread-");
        return executor;
    }

    @Bean("executor3")
    public ThreadPoolTaskExecutor executor3() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("executor3-thread-");
        return executor;
    }
}

@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, com.example.lurenjia.spring.c37.d4.Client.class, LogService.class})
@EnableAsync
public class Client {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        InsulateService service = context.getBean(InsulateService.class);
        service.a();
        service.b();
        service.c();
    }
}

executor2-thread-1
executor3-thread-1
executor-thread-1
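The thread names in the output show that each method ran on its own pool. The idea itself does not depend on Spring: keep several pools under different names and route each piece of work by name. A plain-JDK sketch follows, with hypothetical names (IsolatedPools, newPool, threadNameOn) and a Map standing in for the bean container.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class IsolatedPools {

    /** Creates a small pool whose threads are named prefix + running number. */
    static ExecutorService newPool(String threadPrefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = task -> new Thread(task, threadPrefix + counter.incrementAndGet());
        return Executors.newFixedThreadPool(2, factory);
    }

    /** Runs a task on the pool registered under poolName and returns the worker thread's name. */
    static String threadNameOn(Map<String, ExecutorService> pools, String poolName) {
        ExecutorService pool = pools.get(poolName);
        if (pool == null) {
            throw new IllegalArgumentException("No pool named " + poolName);
        }
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Registers two pools, runs one task on each, and reports which thread ran it. */
    static Map<String, String> demo() {
        Map<String, ExecutorService> pools = new LinkedHashMap<>();
        pools.put("executor", newPool("executor-thread-"));
        pools.put("executor2", newPool("executor2-thread-"));
        try {
            Map<String, String> seen = new LinkedHashMap<>();
            for (String poolName : pools.keySet()) {
                seen.put(poolName, threadNameOn(pools, poolName));
            }
            return seen;
        } finally {
            pools.values().forEach(ExecutorService::shutdown);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Separate pools mean that a slow or saturated group of tasks cannot starve the others, which is the usual reason for isolating them.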

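How it works

Internally this is implemented with AOP. @EnableAsync brings in a bean post-processor, AsyncAnnotationBeanPostProcessor, and registers it in the Spring container. While each bean is being created, this post-processor checks whether the bean's class is annotated with @Async or contains a method annotated with @Async. If so, it creates a proxy for the bean through AOP and adds an advisor to the proxy: org.springframework.scheduling.annotation.AsyncAnnotationAdvisor. The advisor contributes an interceptor, AnnotationAsyncExecutionInterceptor, and the key code for asynchronous invocation lives in that interceptor's invoke method, which is worth reading.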
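The core idea, a proxy whose interceptor submits the real call to an executor, fits in a few lines of plain Java. The sketch below uses a JDK dynamic proxy instead of Spring AOP, so it is an analogy and not the framework's implementation; Greeter, DirectGreeter, AsyncProxySketch and the thread name sketch-async are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

interface Greeter {
    Future<String> greet(String name);
}

// The target does its work on whatever thread calls it and wraps the result in a Future.
class DirectGreeter implements Greeter {
    @Override
    public Future<String> greet(String name) {
        String text = "hello " + name + " from " + Thread.currentThread().getName();
        return CompletableFuture.completedFuture(text);
    }
}

class AsyncProxySketch {

    /** Wraps target in a proxy that runs every interface call on the given executor. */
    @SuppressWarnings("unchecked")
    static <T> T asyncProxy(Class<T> iface, T target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            Callable<Object> task = () -> {
                // Runs on the executor thread: invoke the real method there.
                Object result = method.invoke(target, args);
                // Unwrap the target's Future so the caller's Future yields the plain value.
                return result instanceof Future ? ((Future<?>) result).get() : null;
            };
            Future<Object> future = executor.submit(task);
            // Future-returning methods get a handle; void methods are fire-and-forget.
            return Future.class.isAssignableFrom(method.getReturnType()) ? future : null;
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    /** Calls greet through the proxy and returns the text produced on the worker thread. */
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor(task -> new Thread(task, "sketch-async"));
        try {
            Greeter greeter = asyncProxy(Greeter.class, new DirectGreeter(), executor);
            return greeter.greet("spring").get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the asynchronous behaviour lives in the proxy, it only applies to calls that go through the proxy. A method calling another method on the same object bypasses it, which is a common pitfall with proxy-based @Async.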

AsyncAnnotationAdvisor


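This article is reposted from: https://blog.csdn.net/qq_37151886/article/details/127749503
Copyright belongs to the original author, newProxyInstance. If there is any infringement, please contact us for removal.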
