SpringBoot教程(二十) | SpringBoot实现异步操作
需求:某个方法里面存在多个处理逻辑的操作,其中有一个调有短信/邮箱的逻辑操作,如何不堵塞整个方法,实现异步操作
一、直接使用线程(Thread),实现"类似异步"操作
重点:线程本身不是异步的,但是由于线程之间是并发执行的,这确实会产生类似于异步行为的效果
以下是一个简单的Spring Boot案例,展示了如何在服务层中使用
Thread
来实现类似异步的操作。
首先,我们需要一个服务类,该类将包含一个方法,该方法将使用
Thread
来异步执行某些任务:
importorg.springframework.stereotype.Service;@ServicepublicclassAsyncService{// 使用Thread来模拟异步操作publicvoidperformAsyncTask(String taskName){// 创建一个新的线程来执行异步任务newThread(()->{// 模拟耗时的异步操作try{System.out.println(Thread.currentThread().getName()+" 开始执行异步任务: "+ taskName);Thread.sleep(2000);// 假设这个任务需要2秒钟来完成System.out.println(Thread.currentThread().getName()+" 异步任务完成: "+ taskName);}catch(InterruptedException e){Thread.currentThread().interrupt();System.err.println("异步任务被中断: "+ taskName);}}).start();// 启动线程}}
然后,我们需要一个控制器来触发这个异步服务:
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassAsyncController{@AutowiredprivateAsyncService asyncService;@GetMapping("/startAsyncTask")publicStringstartAsyncTask(@RequestParamString taskName){// 调用服务层的方法来启动异步任务
asyncService.performAsyncTask(taskName);// 注意:这里不会等待异步任务完成,而是立即返回return"异步任务已启动: "+ taskName;}}
在这个例子中,当客户端向
/startAsyncTask
端点发送请求时,
AsyncController
会调用
AsyncService
中的
performAsyncTask
方法。
performAsyncTask
方法会创建一个新的线程来执行耗时的异步操作,而不会阻塞主线程(即处理HTTP请求的线程)。因此,客户端会立即收到响应,而不需要等待异步任务完成。
然而,需要注意的是,直接使用
Thread
类来管理异步任务可能会导致一些问题,比如:
- 资源管理:如果不当心地管理线程,可能会导致资源耗尽(如创建过多的线程)。
- 异常处理:在异步线程中抛出的异常可能不会被主线程捕获,除非你使用某种机制(如
Future
或CompletableFuture
)来跟踪异步任务的结果。 - 结果同步:如果你需要等待异步任务的结果,那么你需要自己实现同步机制(如使用
CountDownLatch
、Semaphore
或CyclicBarrier
等)。
二、使用@Async+@EnableAsync注解(Spring的异步支持-更为常用)
@Async
用于标记一个方法应该异步执行。
@EnableAsync
用于启用Spring的异步方法执行能力,并触发Spring去查找和配置所有被
@Async
注解的方法。
这两个注解通常一起使用,以在Spring应用中实现异步编程
在Spring Boot中,如果使用了@EnableAsync注解并且没有显式配置TaskExecutor,
Spring Boot可能会自动配置一个基于ThreadPoolTaskExecutor的TaskExecutor来执行异步方法,
但具体的配置取决于Spring Boot的版本和可用的配置属性。
在 Spring Boot 应用中,
@EnableAsync
注解确实可以加在配置类(通常是带有
@Configuration
注解的类)上,
也可以加在启动类(通常是带有
@SpringBootApplication
注解的类)上。
不过,通常情况下,你只需要在其中一个地方加上这个注解即可,
因为 Spring Boot 会扫描并应用启动类和所有配置类上的注解。
1. @EnableAsync 加在启动类上
启动类(Application.java)
@SpringBootApplication@EnableAsyncpublicclassAsyncApplication{publicstaticvoidmain(String[] args){SpringApplication.run(AsyncApplication.class, args);}}
服务层接口(AsyncService.java)
publicinterfaceAsyncService{voidexecuteAsyncTask(String taskName);}
服务层实现(AsyncServiceImpl.java)
@ServicepublicclassAsyncServiceImplimplementsAsyncService{@Async@OverridepublicvoidexecuteAsyncTask(String taskName){System.out.println("Executing "+ taskName +" in "+Thread.currentThread().getName());}}
控制层(AsyncController.java)
@RestController@RequestMapping("/async")publicclassAsyncController{@AutowiredprivateAsyncService asyncService;@GetMapping("/task")publicResponseEntity<String>startAsyncTask(@RequestParamString taskName){
asyncService.executeAsyncTask(taskName);returnResponseEntity.ok("Async task "+ taskName +" started.");}}
2. @EnableAsync 加在配置类上
在Spring Boot中,如果使用了@EnableAsync注解并且没有显式配置TaskExecutor,
Spring Boot可能
会自动配置一个基于ThreadPoolTaskExecutor的TaskExecutor来执行异步方法,
但具体的配置取决于Spring Boot的版本和可用的配置属性。
配置类(AsyncConfig.java)
@Configuration@EnableAsyncpublicclassAsyncConfig{// 这里可以 配置自定义的 TaskExecutor,但在这个例子中我们保持简单}
启动类(Application.java)、服务层接口(AsyncService.java)、服务层实现(AsyncServiceImpl.java) 和 控制层(AsyncController.java) 与上面的例子完全相同。
如果都加会有影响吗?
如果你同时在启动类和配置类上加了
@EnableAsync
,通常不会有负面影响。
Spring Boot 会智能地处理这种情况,确保
@EnableAsync
的效果只被应用一次。
然而,这种做法是不必要的,因为它没有提供额外的功能或灵活性,反而可能让其他开发者感到困惑。
因此,建议只在一个地方(通常是启动类)加上
@EnableAsync
注解,以保持代码的清晰和一致性。
扩展(配置 自定义的 线程池)
当 需要精细控制异步任务的执行过程时,可以自定义 线程池
究竟选用哪种线程池?
阿里巴巴《Java开发手册》给我们的答案:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor(Java原生的线程池类)的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
本处创建线程池使用的是ThreadPoolTaskExecutor线程池工具
还有一个细节的点:ThreadPoolExecutor是Java原生的线程池类,而ThreadPoolTaskExecutor是Spring推出的线程池工具
(一)具体关于“线程池的创建”可以看以下这篇博客
【Thread】线程池的 7 种创建方式及自定义线程池
(二)ThreadPoolTaskExecutor 和 ThreadPoolExecutor 的区别
此处我已经加过@EnableAsync 加在启动类上了
/**
* 线程池参数配置,多个线程池实现线程池隔离,
* @Async注解,默认使用系统自定义线程池,可在项目中设置多个线程池,
* 在异步调用的时候,指明需要调用的线程池名称,比如:@Async("taskName")
**/importorg.springframework.context.annotation.Configuration;importorg.springframework.scheduling.annotation.EnableAsync;importorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;importorg.springframework.context.annotation.Bean;@Configuration//已经加过@EnableAsync 加在启动类上,所以此处注释调//@EnableAsync publicclassAsyncConfig{@Bean(name ="taskExecutor")publicExecutortaskExecutor(){ThreadPoolTaskExecutor executor =newThreadPoolTaskExecutor();//设置线程池的核心线程数
executor.setCorePoolSize(5);//设置线程池的最大线程数
executor.setMaxPoolSize(10);//线程池的工作队列容量
executor.setQueueCapacity(25);//线程池中线程的名称前缀
executor.setThreadNamePrefix("Async-");//设置自定义的拒绝策略
executor.setRejectedExecutionHandler((r, e)->{try{// 记录一个警告日志,说明当前保存评价的连接池已满,触发了拒绝策略。
log.warn("保存评价连接池任务已满,触发拒绝策略");// 尝试将任务重新放入队列中,等待30秒。 // 如果在这30秒内队列有空闲空间,任务将被成功放入队列;否则,offer方法将返回false。 boolean offer = e.getQueue().offer(r,30,TimeUnit.SECONDS);// 记录日志,显示等待30秒后尝试重新放入队列的结果。
log.warn("保存评价连接池任务已满,拒绝接收任务,等待30s重新放入队列结果rs:{}", offer);}catch(InterruptedException ex){// 如果在等待过程中线程被中断,捕获InterruptedException异常。 // 记录一个错误日志,说明在尝试重新放入队列时发生了异常。
log.error("【保存评价】连接池任务已满,拒绝接收任务了,再重新放入队列后出现异常", ex);// 构建一条警告消息,其中包含线程池的各种信息(如池大小、活动线程数、核心线程数等)。 String msg =String.format("保存评价线程池拒绝接收任务! Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)", e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(),
e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(),
e.getCompletedTaskCount());// 记录包含线程池详细信息的警告日志。
log.warn(msg);}});//初始化线程池,线程池就会处于可以接收任务的状态
executor.initialize();return executor;}}
接下来,在需要异步执行的方法上使用
@Async
注解。
例如,在
SmsService
服务类中:
importorg.springframework.scheduling.annotation.Async;importorg.springframework.stereotype.Service;@ServicepublicclassSmsService{// 使用@Async注解来标识该方法为异步方法 @Async("taskExecutor")// 可以指定使用哪个TaskExecutor,这里使用上面定义的taskExecutor publicvoidsendSms(String message,String phoneNumber){// 模拟耗时操作 System.out.println("执行逻辑二进行中。。。");try{Thread.sleep(5000);// 假设发送短信需要5秒 // 调用短信接口发送短信 System.out.println("发送成功 to "+ phoneNumber +" with message: "+ message);}catch(InterruptedException e){Thread.currentThread().interrupt();}}}
在
SmsController
控制类中:
@RestController@RequestMapping("/sms")publicclassSmsController{@AutowiredprivateSmsService smsService ;@GetMapping("/task")publicvoiddoWork(@RequestParamString taskName){// 逻辑一System.out.println("执行逻辑一完毕!");// 调用异步方法发送短信,不会阻塞主线程
smsService.sendSms("Hello","17027897398");// 逻辑三 System.out.println("执行逻辑三完毕!");}}
三、使用CompletableFuture(Java 8及以上)
CompletableFuture
是Java 8中新增的一个功能强大的类,它实现了
Future
和
CompletionStage
接口,提供了函数式编程的能力来处理和组合异步计算的结果。
CompletableFuture
在Java 8及以上版本的应用中非常常用,特别是在需要处理异步操作、组合多个异步结果或提高程序响应性的场景下。它提供了一种简洁而强大的方式来处理并发编程中的复杂问题,是现代Java并发编程中的重要工具之一。
CompletableFuture
是设计为
线程安全的
,用于支持在并发环境中进行高效的异步编程。
创建方式(runAsync不带返回值):
@GetMapping("/start1")publicvoidstart1(){System.out.println("异步任务执行111");// 使用runAsync执行一个不需要返回结果的异步任务CompletableFuture<Void> future =CompletableFuture.runAsync(()->{// 模拟耗时的任务,比如发送邮件或记录日志try{Thread.sleep(5000);// 假设这是一个耗时的操作}catch(InterruptedException e){Thread.currentThread().interrupt();}System.out.println("5s后,异步任务执行完成");});System.out.println("异步任务执行222");}
创建方式(supplyAsync带返回值):
@GetMapping("/start2")publicvoidstart2(){System.out.println("异步任务执行111");// 使用supplyAsync执行一个异步计算任务 CompletableFuture<Integer> future =CompletableFuture.supplyAsync(()->{// 模拟耗时的计算任务 try{Thread.sleep(5000);// 假设这里是一个耗时的计算 }catch(InterruptedException e){Thread.currentThread().interrupt();}return123;// 假设这是计算的结果 });try{// future.get会阻塞直到异步任务完成 Integer result = future.get();//下面的业务代码运行,需要等着result有结果才能往下走。System.out.println("异步任务的结果: "+ result);}catch(Exception e){
e.printStackTrace();}//下面的业务代码运行,需要等着result有结果才能往下走。System.out.println("异步任务执行222");}
版权归原作者 Slow菜鸟 所有, 如有侵权,请联系我们删除。