一.前言
在整理老的业务逻辑代码时候发现好多接口实现上面都标记了 @Async注解。我本身对这个注解使用的比较少,异步逻辑我都习惯自定义ThreadPoolExecutor工具类。正好借着这次梳理代码结构,来看看 @Async这个注解到底在玩什么?
本文将会给大家从 @Async注解使用层面入手逐步解读源码,分析各种踩坑实践,并且扩展sleuth链路追踪与线程变量如何花式应用。
二.尝鲜使用
Spring中,被 @Async注解标注的方法,称之为异步方法。这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作,是spring默认提供的异步调用方式。
2.1.使用方式
使用 @Async进行异步变成的方式特别简单。
- 在
启动类
或者能被启动类扫描到的配置类
上标注@EnableAsync
- 在
被spring管理的bean
的方法上标注@Async()
调用方法
与被调用方法
不在同一个bean中。
仔细品味一下上面三个限制条件,任意一个不满足,均会导致 @Async无法生效。
2.2.最简单的demo演示
启动类定义
@SpringBootApplication
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
复制代码
controller层方法定义
@RestController
@RequestMapping
public class TestController {
@Autowired
TestService testService;
@GetMapping()
public void test(){
for (int i = 0; i <5 ; i++) {
testService.testAsync();
}
}
}
复制代码
service方法定义
@Service
@Slf4j
public class TestServiceImpl implements TestService {
@Override
@Async
public void testAsync(){
log.info("嘻嘻");
}
}
复制代码
日志输出
- 2021-09-15 19:39:54.300,[http-nio-8088-exec-5], com.examp.controller.TestController - 嘿嘿
- 2021-09-15 19:36:54.302,[task-5], com.examp.service.impl.TestServiceImpl - 嘻嘻
- 2021-09-15 19:36:54.302,[task-4], com.examp.service.impl.TestServiceImpl - 嘻嘻
- 2021-09-15 19:36:54.302,[task-1], com.examp.service.impl.TestServiceImpl - 嘻嘻
- 2021-09-15 19:36:54.302,[task-2], com.examp.service.impl.TestServiceImpl - 嘻嘻
- 2021-09-15 19:36:54.302,[task-3], com.examp.service.impl.TestServiceImpl - 嘻嘻
复制代码
从日志打印可以发现,
controller方法打印
与
service层方法
打印日志使用的是
不同的线程
。
使用是真滴简单!
2.3.踩坑提问
前面两点可以看到我们使用
@Async
进行异步变成是真的简单,但是里面也埋伏了各种各样的坑点。
先抛出问题,大家可以先思考:
- 为什么阿里不推荐直接使用@Async
- @Async标注的方法是否事务一致
- 同一个类里面A->B,B方法上标注了@Async,为了调用成功,在类中注入当前类方式能否异步调用成功
- @Async标注的方法能否读取到ThreadLocal的变量
- @Async标注的方法能否获取返回值
- slueth链路追踪的traceId能否追踪到线程池内、
三.源码分析
废话不多说,看看源码。
3.1.@Async
/**
* 该注解可以标记一个异步执行的方法,也可以用来标注类,表示类中的所有方法都是异步执行的。
* 入参随意,但返回值只能是void或者Future.(ListenableFuture接口/CompletableFuture类)
* Future是代理返回的切实的异步返回,用以追踪异步方法的返回值。当然也可以使用AsyncResult类(实现
* ListenableFuture接口)(Spring或者EJB都有)或者CompletableFuture类
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
//用以限定执行方法的执行器名称(自定义):Executor或者TaskExecutor
//加在类上表示整个类都使用,加在方法上会覆盖类上的设置
String value() default "";
}
复制代码
3.2.@EnableAsync
/**
* 开启spring异步执行器,类似xml中的task标签配置,需要联合@Configuration注解一起使用,对应文章开头,注解需* 要标注在启动类或者能被启动类扫描到的配置类上。
*
* 默认情况下spring会先搜索TaskExecutor类型的bean或者名字为taskExecutor的Executor类型的bean,都不存在使* 用SimpleAsyncTaskExecutor执行器
*
* 可实现AsyncConfigurer接口复写getAsyncExecutor获取异步执行器,getAsyncUncaughtExceptionHandler获* 取异步未捕获异常处理器
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
// 该属性用来支持用户自定义异步注解,默认扫描spring的@Async和EJB3.1的@code @javax.ejb.Asynchronous
Class<? extends Annotation> annotation() default Annotation.class;
//标明是否需要创建CGLIB子类代理,AdviceMode=PROXY时才适用。注意设置为true时,其它spring管理的bean也会升级到CGLIB子类代理
boolean proxyTargetClass() default false;
//标明异步通知将会如何实现,默认PROXY,如需支持同一个类中非异步方法调用另一个异步方法,需要设置为ASPECTJ
AdviceMode mode() default AdviceMode.PROXY;
//标明异步注解bean处理器应该遵循的执行顺序,默认最低的优先级(Integer.MAX_VALUE,值越小优先级越高)
int order() default Ordered.LOWEST_PRECEDENCE;
}
复制代码
3.3.AsyncConfigurationSelector
@EnableAsync注解是非常明显的一个启动注解,几乎所有spring的开启配置类的注解都是以
@Enable
开头的。方法里面一看也是熟悉的老朋友 @Import
//查询器:基于@EanableAsync中定义的模式AdviceMode加在@Configuration标记的类上,确定抽象异步配置类的实现
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
//根据当前注解标注的方法坐在类的代理方式决定代理模式
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY://jdk接口代理
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ://cglib代理
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
复制代码
类还是比较简单的,就是决定使用什么代理模式
3.4.ProxyAsyncConfiguration
以jdk接口代理为例,瞅一瞅看看
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
//判断注解元数据信息是否拿到
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
//新建一个异步注解bean后处理器
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
//配置执行器与异常处理器
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
//设置是否升级到CGLIB子类代理,默认不开启
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
//设置执行优先级,默认最后执行
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
复制代码
配置类操作也比较清晰,就是集成AbstractAsyncConfiguration抽象类,新建AsyncAnnotationBeanPostProcessor后置处理器并进行参数初始化。
3.5.AbstractAsyncConfiguration
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
//enableAsync注解属性
@Nullable
protected AnnotationAttributes enableAsync;
//线程执行器
@Nullable
protected Supplier<Executor> executor;
//异常执行器
@Nullable
protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
//设置注解元数据信息
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}
//根据配置设置异步任务执行器和异常处理器
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}
}
复制代码
3.6.AsyncAnnotationBeanPostProcessor
由依赖图可知,这是一个bean的后置处理器
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
//省略部分代码
//初始化异步处理切面
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//初始化切面
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
复制代码
3.7.AsyncAnnotationAdvisor
芜湖,发现了,这是一个异步注解的切面,定义了@Async解析的解析的切面,具体处理在AbstractAdvisingBeanPostProcessor.postProcessAfterInitialization方法
3.8.Interceptor切面执行
Interceptor接口的invoke方法,
断点打在接口方法处,逐步下去,找到AsyncExecutionInterceptor类
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
//定义任务
Callable<Object> task = () -> {
try {
//切点执行
Object result = invocation.proceed();
//如果代理方法的返回值是Future,则阻塞等待执行完毕得到结果
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//提交有任务给执行器
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
复制代码
再看最后的核心方法doSubmit
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
//返回值是CompletableFuture处理
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
//返回值是ListenableFuture处理
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
//返回值是ListenableFuture处理
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
//其他情况(无参或者非以上返回参数,例如方法返回参数是string)
else {
//断点处
executor.submit(task);
return null;
}
}
复制代码
再次执行demo的方法,打断点在上面的代码行
发现任务被提交给了一个叫做applicationTaskExecutor的任务执行器
线程池参数:
核心线程数:8
队列大小与最大线程池都是Interger.Max。
3.9.小结
初始化解析@Async注解的切面类
解析@Async,切点织入执行
四.问题回顾
4.1.为什么阿里不推荐直接使用@Async
大家在2.8结尾能发现,默认情况下,
@Async
对应的线程池配置
最大线程数
与
最大队列数
为
2147483647
。
也就是说,如果被标记的
@Async
的方法,某个事件并发量突然变高,系统的负载会瞬间变高,夸张一点直接down机。
那@Async这个注解就不能使用了吗?
答案肯定是
否定的
。
在
@EnableAsync
注解的类注释上已经说了,允许自定义线程池的bean来替换掉系统默认线程池。
搜索一下applicationTaskExecutor,找到TaskExecutionAutoConfiguration
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {
/**
* Bean name of the application {@link TaskExecutor}.
*/
public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
ObjectProvider<TaskDecorator> taskDecorator) {
TaskExecutionProperties.Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
Shutdown shutdown = properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
builder = builder.taskDecorator(taskDecorator.getIfUnique());
return builder;
}
//系统中不存在Executor的bean时,默认加载名为:applicationTaskExecutor的bean。这里的bean即为实际切面执行时,从beanfactory里面获取的beanName叫做taskExecutor的线程池进行执行异步任务
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}
}
复制代码
**因此,如果你想要使用@Async,请务必增加
自定义线程池
返回的bean**
例如
@Configuration
public class ThreadPoolTaskConfig {
/** 核心线程数(默认线程数) */
private static final int CORE_POOL_SIZE = 5;
/** 最大线程数 */
private static final int MAX_POOL_SIZE = 10;
/** 允许线程空闲时间(单位:默认为秒) */
private static final int KEEP_ALIVE_TIME = 10;
/** 缓冲队列大小 */
private static final int QUEUE_CAPACITY = 200;
/** 线程池名前缀 */
private static final String THREAD_NAME_PREFIX = "Async-Service-";
@Bean
public ThreadPoolTaskExecutor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CORE_POOL_SIZE);
executor.setMaxPoolSize(MAX_POOL_SIZE);
executor.setQueueCapacity(KEEP_ALIVE_TIME);
executor.setKeepAliveSeconds(QUEUE_CAPACITY);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
复制代码
这里还要注意的是,当你的spring版本是2.1之前的版本时,这里是没有applicationTaskExecutor这个默认配置的线程池,而是SimpleAsyncTaskExecutor,他的submit方法就是在不断的新建线程去执行任务,也是及其消耗资源的,因此这里也是不推荐直接使用的。
4.2.@Async标注的方法是否事务一致
从分析可知,
@Async
标注的方法所在的类是被代理的,并且是使用线程池线程执行的。
例如现在调用链式:
UserService.test1()-> UserService.test2()-> UserService.test3()与UserRoleService.test1()
UserService.test()2方法上标注了 @Async
@Service
@Slf4j
public class UserServiceImpl implements UserService {
@Autowired
UserRoleService userRoleService;
@Override
@Transactional(rollbackFor = Exception.class)
public void test3(){
//省略save逻辑
save();
}
/**
* 事务一致
*/
@Override
@Async
@Transactional(rollbackFor = Exception.class)
public void test2(){
test3();
userRoleService.test1();
}
/**
* 事务不一致
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void test1(){
//省略save逻辑
save();
test2();
}
}
@Service
public class UserRoleServiceImpl implements UserRoleService {
@Override
@Transactional(rollbackFor = Exception.class)
public void test1(){
//省略save逻辑
save();
throw new RuntimeException("1");
}
}
复制代码
哈哈哈,这里如果你保证代码不报错,你爱怎么控制事务就怎么控制事务,反正一定会落库成功,也就不会回滚,有没有事务也无所谓。当然这个是废话,代码不报错,事务不控制对于生产来说那是不可能的。
其实就是
事务管理跨了线程,那么不同线程的事务就不能保证一致了。
4.3.同一个类里面A->B,B方法上标注了@Async,为了调用成功,在类中注入当前类方式能否异步调用成功
先来看一下测试代码
@Service
@Slf4j
public class UserServiceImpl implements UserService {
@Autowired
UserService userService;
@Override
public void testAsync(){
userService.testAsync1();
}
@Override
@Async()
public void testAsync1(){
log.info("嘻嘻1");
}
}
复制代码
我们知道spring默认帮我们解决了循环依赖
循环依赖:A类中注入属性B类,B类属性中注入A类;A类注入自身 都被称为循环依赖。默认情况下spring使用的三级缓存给我们解决了这个情况。关于三级缓存感兴趣的可以百度看看网上的解析,后续我也会抽时间出一篇解析的文章。
我们来启动一下,发现启动失败,报错如下
Caused by: org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'userServiceImpl': Bean with name 'userServiceImpl' has been injected into other beans [userServiceImpl] in its raw version as part of a circular reference, but has eventually been wrapped. This means that said other beans do not use the final version of the bean. This is often the result of over-eager type matching - consider using 'getBeanNamesForType' with the 'allowEagerInit' flag turned off, for example.
复制代码
意思就是发生了循环依赖,陷入了死循环,启动失败了!
诶,上面不是说spring给我们解决了循环依赖吗?为什么加了@Async注解后启动失败了?
考虑到部分读者不太了解三级缓存,这里直接给出文字版结论,感兴趣的可以去结合上面的源码进行debug分析。
结论:
将UserService放入二级缓存时会校验代理,但是生成代理是判断对应的后置处理器类型是否为:SmartInstantiationAwareBeanPostProcessor,上文2.6可以看到,@Async解析的后置处理器AsyncAnnotationBeanPostProcessor仅仅是BeanPostProcessor类型,因此早期构造方法暴露出来的是原始对象。因此UserService关联到自己本身是原始对象。但是后面到AsyncAnnotationBeanPostProcessor后置处理器处理时将UserService包装成了一个代理对象。因此这时在最后的校验逻辑里面发现类
UserService
内部属性依赖的是
userService
实际bean,与代理对象不相等。进入到下层自检程序,最终报错。
这里连启动都无法启动,答案自然是不能调用成功。
因此,
异步调用需要成功,则必须限制调用方法需要是两个不同bean的方法,即为@Async标注的方法必须能够被切面类代理到
。
4.4.@Async标注的方法能否读取到ThreadLocal的变量
本质意义上@Async还是使用的是线程池,因此直接使用
ThreadLocal
,是无法将线程变量从主线程同步至线程池内的线程,需要使用阿里开源线程变量:TransmittableThreadLocal。
Threadlocal知识可以参考:一文吃透ThreadLocal的前世与今生
4.5.@Async标注的方法能否获取返回值
从2.8的invoke方法得知,方法直接返回值无法获取,将得到null,可以通过Future类进行返回值获取。
4.6.slueth链路追踪的traceId能否追踪到线程池内
可以
pom引入
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
复制代码
再次执行
- 2021-09-15 23:13:13.502, INFO, [7f6570e5ad87cebe,7f6570e5ad87cebe,], [http-nio-8088-exec-2], com.examp.controller.TestController - 嘿嘿
- 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,6c4ef07be0f53434,7f6570e5ad87cebe], [Async-Service-2], com.examp.service.impl.TestServiceImpl - 嘻嘻
- 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,8ff5d5ce00c93add,7f6570e5ad87cebe], [Async-Service-1], com.examp.service.impl.TestServiceImpl - 嘻嘻
- 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,99b553b90241b79e,7f6570e5ad87cebe], [Async-Service-3], com.examp.service.impl.TestServiceImpl - 嘻嘻
- 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,42c54ecd36df454c,7f6570e5ad87cebe], [Async-Service-4], com.examp.service.impl.TestServiceImpl - 嘻嘻
- 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,176b3679aa72038e,7f6570e5ad87cebe], [Async-Service-5], com.examp.service.impl.TestServiceImpl - 嘻嘻
复制代码
依旧回到2.8方法断点处查看
类型变成了LazyTraceThreadPoolTaskExecutor,是ThreadPoolTaskExecutor的子类
查看LazyTraceThreadPoolTaskExecutor的调用链得知,在ExecutorBeanPostProcessor后置处理器中代理了系统内ThreadPoolTaskExecutor类型的bean。也就是说,本质意义上执行任务的还是你自定义的ThreadPoolTaskExecutor,只是ExecutorBeanPostProcessor做了链路追踪的增强。
五.总结
本文从
@Async
的简单使用入手,进行了
@Async
源码的分析,最后对
@Async
使用过程中可能出现的问题做了总结与解析。当然@Async虽香,还是不要一股脑把整个系统的异步任务都放在一个线程池中。不同业务的大量的异步任务尽量分离线程池处理。
版权归原作者 Java技术攻略 所有, 如有侵权,请联系我们删除。