目录
一、@Async注解
@Async的作用就是异步处理任务。
- 在方法上添加@Async,表示此方法是异步方法;
- 在类上添加@Async,表示类中的所有方法都是异步方法;
- 使用此注解的类,必须是Spring管理的类;
- 需要在启动类或配置类中加入@EnableAsync注解,@Async才会生效;
在使用@Async时,如果不指定线程池的名称,也就是不自定义线程池,@Async是有默认线程池的,使用的是Spring默认的线程池SimpleAsyncTaskExecutor。
默认线程池的默认配置如下:
- 默认核心线程数:8;
- 最大线程数:Integet.MAX_VALUE;
- 队列使用LinkedBlockingQueue;
- 容量是:Integet.MAX_VALUE;
- 空闲线程保留时间:60s;
- 线程池拒绝策略:AbortPolicy;
从最大线程数可以看出,在并发情况下,会无限制的创建线程,我勒个吗啊。
也可以通过yml重新配置:
spring:
task:
execution:
pool:
max-size: 10
core-size: 5
keep-alive: 3s
queue-capacity: 1000
thread-name-prefix: my-executor
也可以自定义线程池,下面通过简单的代码来实现以下@Async自定义线程池。
二、代码实例
Spring为任务调度与异步方法执行提供了注解@Async支持,通过在方法上标注@Async注解,可使得方法被异步调用。在需要异步执行的方法上加入@Async注解,并指定使用的线程池,当然可以不指定,直接写@Async。
1、导入POM
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.0.1-jre</version></dependency>
2、配置类
packagecom.nezhac.config;importcom.google.common.util.concurrent.ThreadFactoryBuilder;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.scheduling.annotation.EnableAsync;importorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;importjava.util.concurrent.*;@EnableAsync// 支持异步操作@ConfigurationpublicclassAsyncTaskConfig{/**
* com.google.guava中的线程池
* @return
*/@Bean("my-executor")publicExecutorfirstExecutor(){ThreadFactory threadFactory =newThreadFactoryBuilder().setNameFormat("my-executor").build();// 获取CPU的处理器数量int curSystemThreads =Runtime.getRuntime().availableProcessors()*2;ThreadPoolExecutor threadPool =newThreadPoolExecutor(curSystemThreads,100,200,TimeUnit.SECONDS,newLinkedBlockingQueue<>(), threadFactory);
threadPool.allowsCoreThreadTimeOut();return threadPool;}/**
* Spring线程池
* @return
*/@Bean("async-executor")publicExecutorasyncExecutor(){ThreadPoolTaskExecutor taskExecutor =newThreadPoolTaskExecutor();// 核心线程数
taskExecutor.setCorePoolSize(10);// 线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
taskExecutor.setMaxPoolSize(100);// 缓存队列
taskExecutor.setQueueCapacity(50);// 空闲时间,当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
taskExecutor.setKeepAliveSeconds(200);// 异步方法内部线程名称
taskExecutor.setThreadNamePrefix("async-executor-");/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();return taskExecutor;}}
3、controller
packagecom.nezha.controller;importcom.nezha.service.UserService;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.scheduling.annotation.Async;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/test")publicclassUserController{privatestaticfinalLogger logger =LoggerFactory.getLogger(UserController.class);@AutowiredprivateUserService userService;@GetMapping("asyncTest")publicvoidasyncTest(){
logger.info("哪吒真帅");
userService.asyncTest();asyncTest2();
logger.info("哪吒编程,每日更新Java干货");}@Async("my-executor")publicvoidasyncTest2(){
logger.info("同文件内执行执行异步任务");}}
4、service
packagecom.nezha.service;publicinterfaceUserService{// 普通方法voidtest();// 异步方法voidasyncTest();}
service实现类
packagecom.nezha.service;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.scheduling.annotation.Async;importorg.springframework.stereotype.Service;@ServicepublicclassUserServiceImplimplementsUserService{privatestaticfinalLogger logger =LoggerFactory.getLogger(UserServiceImpl.class);@Overridepublicvoidtest(){
logger.info("执行普通任务");}@Async("my-executor")@OverridepublicvoidasyncTest(){
logger.info("执行异步任务");}}
三、发现同文件内执行异步任务,还是一个线程,没有实现@Async效果,why?
众里寻他千百度,查到了@Async失效的几个原因:
- 注解@Async的方法不是public方法;
- 注解@Async的返回值只能为void或Future;
- 注解@Async方法使用static修饰也会失效;
- 没加@EnableAsync注解;
- 调用方和@Async不能在一个类中;
- 在Async方法上标注@Transactional是没用的,但在Async方法调用的方法上标注@Transcational是有效的;
这里就不一一演示了,有兴趣的小伙伴可以研究一下。
四、配置中分别使用了ThreadPoolTaskExecutor和ThreadPoolExecutor,这两个有啥区别?
ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装。
1、initialize()
查看一下ThreadPoolTaskExecutor 的 initialize()方法
publicabstractclassExecutorConfigurationSupportextendsCustomizableThreadFactoryimplementsBeanNameAware,InitializingBean,DisposableBean{.../**
* Set up the ExecutorService.
*/publicvoidinitialize(){if(logger.isInfoEnabled()){
logger.info("Initializing ExecutorService"+(this.beanName !=null?" '"+this.beanName +"'":""));}if(!this.threadNamePrefixSet &&this.beanName !=null){setThreadNamePrefix(this.beanName +"-");}this.executor =initializeExecutor(this.threadFactory,this.rejectedExecutionHandler);}/**
* Create the target {@link java.util.concurrent.ExecutorService} instance.
* Called by {@code afterPropertiesSet}.
* @param threadFactory the ThreadFactory to use
* @param rejectedExecutionHandler the RejectedExecutionHandler to use
* @return a new ExecutorService instance
* @see #afterPropertiesSet()
*/protectedabstractExecutorServiceinitializeExecutor(ThreadFactory threadFactory,RejectedExecutionHandler rejectedExecutionHandler);...}
2、initializeExecutor抽象方法
再查看一下initializeExecutor抽象方法的具体实现类,其中有一个就是ThreadPoolTaskExecutor类,查看它的initializeExecutor方法,使用的就是ThreadPoolExecutor。
publicclassThreadPoolTaskExecutorextendsExecutorConfigurationSupportimplementsAsyncListenableTaskExecutor,SchedulingTaskExecutor{...@OverrideprotectedExecutorServiceinitializeExecutor(ThreadFactory threadFactory,RejectedExecutionHandler rejectedExecutionHandler){BlockingQueue<Runnable> queue =createQueue(this.queueCapacity);ThreadPoolExecutor executor;if(this.taskDecorator !=null){
executor =newThreadPoolExecutor(this.corePoolSize,this.maxPoolSize,this.keepAliveSeconds,TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler){@Overridepublicvoidexecute(Runnable command){Runnable decorated = taskDecorator.decorate(command);if(decorated != command){
decoratedTaskMap.put(decorated, command);}super.execute(decorated);}};}else{
executor =newThreadPoolExecutor(this.corePoolSize,this.maxPoolSize,this.keepAliveSeconds,TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);}if(this.allowCoreThreadTimeOut){
executor.allowCoreThreadTimeOut(true);}this.threadPoolExecutor = executor;return executor;}...}
因此可以了解到ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装。
五、核心线程数
配置文件中的线程池核心线程数为何配置为
// 获取CPU的处理器数量int curSystemThreads =Runtime.getRuntime().availableProcessors()*2;
Runtime.getRuntime().availableProcessors()获取的是CPU核心线程数,也就是计算资源。
- CPU密集型,线程池大小设置为N,也就是和cpu的线程数相同,可以尽可能地避免线程间上下文切换,但在实际开发中,一般会设置为N+1,为了防止意外情况出现线程阻塞,如果出现阻塞,多出来的线程会继续执行任务,保证CPU的利用效率。
- IO密集型,线程池大小设置为2N,这个数是根据业务压测出来的,如果不涉及业务就使用推荐。
在实际中,需要对具体的线程池大小进行调整,可以通过压测及机器设备现状,进行调整大小。
如果线程池太大,则会造成CPU不断的切换,对整个系统性能也不会有太大的提升,反而会导致系统缓慢。
六、线程池执行过程
Java高并发编程实战系列文章
Java高并发编程实战1,那些年学过的锁
Java高并发编程实战2,原子性、可见性、有序性,傻傻分不清
Java高并发编程实战3,Java内存模型与Java对象结构
Java高并发编程实战4,synchronized与Lock底层原理
哪吒精品系列文章
Java学习路线总结,搬砖工逆袭Java架构师
10万字208道Java经典面试题总结(附答案)
SQL性能优化的21个小技巧
Java基础教程系列
Spring Boot 进阶实战
版权归原作者 哪 吒 所有, 如有侵权,请联系我们删除。