0


springboot自帶线程池ThreadPoolTaskExecutor使用

  1. 不管是阿里,还是华为java开发手册,都会有一条建议,就是让开发者不要使用Executors去创建线程池,而是使用构造函数ThreadPoolExecutor的方式来创建,并设置合理的参数。原因如下:
  1. 说明:Executors 返回的线程池对象的弊端如下:

1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

  1. spring框架中,spring提供了ThreadPoolTaskExecutor来创建线程池,该类在spring-context包下。其实ThreadPoolTaskExecutor是对ThreadPoolExecutor的封装。

到了springboot这里,因为引入了spring-boot-autoconfigurer,自动装配机制,在task包下,直接把ThreadPoolTaskExecutor注入到bean容器里面。所以在springboot项目中,如果要使用线程池,可以直接使用,都不用额外任何配置。

  1. springboot自动装配的线程池使用的配置如下:

  1. 默认核心线程数是8个。最大线程数和等待队列都是Integer.MAX_VALUE。综合上面的介绍,默认配置的线程池其实也有OOM的风险。
  2. 这里使用的springboot版本是2.7.8

pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>springexample</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <maven.compiler.source>8</maven.compiler.source>
  11. <maven.compiler.target>8</maven.compiler.target>
  12. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  13. </properties>
  14. <parent>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-parent</artifactId>
  17. <version>2.7.8</version>
  18. </parent>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-websocket</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>com.alibaba</groupId>
  30. <artifactId>fastjson</artifactId>
  31. <version>2.0.20</version>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.projectlombok</groupId>
  35. <artifactId>lombok</artifactId>
  36. </dependency>
  37. <dependency>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-starter-test</artifactId>
  40. </dependency>
  41. </dependencies>
  42. </project>
  1. 异步任务类:在方法上添加@Async注解,可以让他启用线程池处理异步任务。
  1. /*
  2. * xxx co.ltd Copyright @ 2023-2023 All Rights Reserved
  3. */
  4. package com.example.task;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.scheduling.annotation.Async;
  7. import org.springframework.stereotype.Component;
  8. import java.util.Random;
  9. import java.util.concurrent.CompletableFuture;
  10. /**
  11. * 描述信息
  12. *
  13. * @author Administrator
  14. * @since 2023/4/2 7:30
  15. */
  16. @Slf4j
  17. @Component
  18. public class AsyncTask {
  19. private Random random = new Random();
  20. @Async
  21. public CompletableFuture<String> doTaskOne() throws Exception {
  22. log.info("task one start.");
  23. long start = System.currentTimeMillis();
  24. Thread.sleep(random.nextInt(10000));
  25. long end = System.currentTimeMillis();
  26. log.info("task one done,cost " + (end - start) + "ms.");
  27. return CompletableFuture.completedFuture("task one done");
  28. }
  29. @Async
  30. public CompletableFuture<String> doTaskTwo() throws Exception {
  31. log.info("task two start.");
  32. long start = System.currentTimeMillis();
  33. Thread.sleep(random.nextInt(10000));
  34. long end = System.currentTimeMillis();
  35. log.info("task two done,cost " + (end - start) + "ms.");
  36. return CompletableFuture.completedFuture("task two done");
  37. }
  38. @Async
  39. public CompletableFuture<String> doTaskThree() throws Exception {
  40. log.info("task three start.");
  41. long start = System.currentTimeMillis();
  42. Thread.sleep(random.nextInt(10000));
  43. long end = System.currentTimeMillis();
  44. log.info("task three done,cost " + (end - start) + "ms.");
  45. return CompletableFuture.completedFuture("task three done");
  46. }
  47. }
  1. 启动类:启动类上添加注解@EnableAsync开启异步
  1. /*
  2. * xxx co.ltd Copyright @ 2023-2023 All Rights Reserved
  3. */
  4. package com.example;
  5. import org.springframework.boot.SpringApplication;
  6. import org.springframework.boot.autoconfigure.SpringBootApplication;
  7. import org.springframework.scheduling.annotation.EnableAsync;
  8. /**
  9. * 描述信息
  10. *
  11. * @author Administrator
  12. * @since 2023/4/2 7:29
  13. */
  14. @SpringBootApplication
  15. @EnableAsync
  16. public class App {
  17. public static void main(String[] args) {
  18. SpringApplication.run(App.class, args);
  19. }
  20. }
  1. 测试类:
  1. /*
  2. * xxx co.ltd Copyright @ 2023-2023 All Rights Reserved
  3. */
  4. package com.example.task;
  5. import com.example.App;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.junit.jupiter.api.Test;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.test.context.SpringBootTest;
  10. import java.util.concurrent.CompletableFuture;
  11. /**
  12. * 描述信息
  13. *
  14. * @author Administrator
  15. * @since 2023/4/2 7:34
  16. */
  17. @SpringBootTest(classes = {App.class})
  18. @Slf4j
  19. public class AsyncTaskTest {
  20. @Autowired
  21. private AsyncTask asyncTask;
  22. @Test
  23. public void testTask() throws Exception {
  24. long start = System.currentTimeMillis();
  25. CompletableFuture<String> taskOne = asyncTask.doTaskOne();
  26. CompletableFuture<String> taskTwo = asyncTask.doTaskTwo();
  27. CompletableFuture<String> taskThree = asyncTask.doTaskThree();
  28. CompletableFuture.allOf(taskOne, taskTwo, taskThree).join();
  29. long end = System.currentTimeMillis();
  30. log.info("all task done,cost " + (end - start) + " ms");
  31. }
  32. }
  1. 这里设置了3个任务,默认线程池核心线程数是8个, 所以这3个任务在线程池环境中,基本都是同时运行,所以总体运行时间肯定会大于他们3各种最耗时的一个任务,小于三个任务耗时之和。
  2. 运行这个测试用例,打印结果:

  1. 从打印结果来看,3个任务几乎同时执行,运行结束,分别耗时:2094ms 653ms 1505ms,最后总耗时2129ms,符合预期。
  2. 这里打印的线程池前缀是task-,也是默认线程池配置。
  3. springboot配置中,提供了可以配置线程池的参数:
  1. spring:
  2. task:
  3. execution:
  4. pool:
  5. core-size: 2
  6. max-size: 5
  7. queue-capacity: 10
  8. thread-name-prefix: test-task-

这些参数都不是我们自定义的,而是springboot配置文件中指定的参数名。所以我们可以通过yml自动提示类进行配置:

  1. 这里也可以看出默认线程池配置核心数量是8个, 这里我们设置为2,来验证线程池工作原理。
  2. 这里有3个任务,核心线程数是2,所以只能先执行2个任务,剩下的进入队列等待,当前面一个任务执行完成,最后一个任务才会从等待队列中进入核心线程进行执行,重新运行单元测试,打印信息如下:

  1. 这个打印结果,刚开始任务12都运行,任务2完成之后,任务3开始执行。
  2. 因为修改了线程前缀,这里打印的线程前缀是test-task-,从线程前缀 + 线程数上来看,这里最大线程数是2,因为前面设置的核心线程数就是2
  3. =========================================
  4. 相信做过springboot线程池相关的测试,可能有的人得出的结论和我这里不太一样,springboot默认线程池是SimpleAsyncTaskExecutor。这个原因呢,有两个,一个是springboot版本的原因,默认不做任何配置,一样的代码,上面运行打印的线程池前缀就是SimpleAsyncTaskExecutor。另外一个原因就是上面提到的ThreadPoolTaskExecutor在配置的时候,其实使用了一个特别的注解:@ConditionalOnMissingBean({Executor.class}),如下所示:

  1. 这个注解的意思是,当bean容器中没有Executor.class这个实例的时候,进行配置。也即是说其他地方配置了线程器Executor,那么这个ThreadPoolTaskExecutorbean就不会被配置。这也就是大家的结论里面spring线程池默认不是ThreadPoolTaskExecutor的原因。
  2. 我这里通过引入spring-boot-starter-websocket依赖,然后配置websocket
  1. /*
  2. * xxx co.ltd Copyright @ 2023-2023 All Rights Reserved
  3. */
  4. package com.example.config;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.messaging.simp.config.MessageBrokerRegistry;
  7. import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
  8. import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
  9. import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
  10. /**
  11. * 描述信息
  12. *
  13. * @author Administrator
  14. * @since 2023/4/2 20:14
  15. */
  16. @Configuration
  17. @EnableWebSocketMessageBroker
  18. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  19. @Override
  20. public void registerStompEndpoints(StompEndpointRegistry registry) {
  21. registry.addEndpoint("/tmax/ws").setAllowedOriginPatterns("*").withSockJS();
  22. }
  23. @Override
  24. public void configureMessageBroker(MessageBrokerRegistry registry) {
  25. registry.enableSimpleBroker("/user","/topic");
  26. registry.setApplicationDestinationPrefixes("/app");
  27. registry.setUserDestinationPrefix("/user");
  28. }
  29. }
  1. 同样的,我们再次测试,发现结果如下所示:

  1. 这里也验证了上面@ConditionalOnMissingBean({Executor.class})注解的作用,因为有了别的线程池,所以这里ThreadPoolTaskExecutor线程池就没有被加载。这里的线程池就是SimpleAsyncTaskExecutor。这个线程池其实不是一个真正的线程池,因为它每次都会创建新线程,这个线程池创建的目的其实就是为了执行少量短时间的任务,并不适合在高并发场景下。
  2. ================================
  3. 通过上面的实验,我们知道,在springboot 2.7.8版本里面,如果没有其他配置,默认线程池就是ThreadPoolTaskExecutor,而且可以不用任何配置就可以使用。但是它还是有OOM的风险,因为它的max-sizequeue-capacity都是Integer.MAX_VALUE,所以我们需要修改它的默认线程池配置信息。但是默认线程池有个德性,就是如果配置了其他线程池,它又不会被加载。
  4. 所以一般的项目里面,我们都是进行如下所示的手动配置:
  1. /*
  2. * xxx co.ltd Copyright @ 2023-2023 All Rights Reserved
  3. */
  4. package com.example.config;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.scheduling.annotation.EnableAsync;
  9. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  10. import java.util.concurrent.ThreadPoolExecutor;
  11. /**
  12. * 描述信息
  13. *
  14. * @author Administrator
  15. * @since 2023/4/2 20:36
  16. */
  17. @Configuration
  18. @EnableAsync
  19. public class ThreadPoolConfig {
  20. @Value("${spring.task.execution.pool.core-size}")
  21. private int corePoolSize;
  22. @Value("${spring.task.execution.pool.max-size}")
  23. private int maxPoolSize;
  24. @Value("${spring.task.execution.pool.queue-capacity}")
  25. private int queueCapacity;
  26. @Value("${spring.task.execution.thread-name-prefix}")
  27. private String threadNamePrefix;
  28. @Bean
  29. public ThreadPoolTaskExecutor taskExecutor() {
  30. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  31. executor.setCorePoolSize(corePoolSize);
  32. executor.setMaxPoolSize(maxPoolSize);
  33. executor.setQueueCapacity(queueCapacity);
  34. executor.setThreadNamePrefix(threadNamePrefix);
  35. // 拒绝策略
  36. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
  37. executor.initialize();
  38. return executor;
  39. }
  40. }
  1. 另外,一定要结合application.yml中设置的线程池配置信息使用,这样才符合文章开头所说的大厂java开发手册中建议使用自定义参数配置线程池,避免OOM风险。
标签: java ThreadPool Async

本文转载自: https://blog.csdn.net/feinifi/article/details/129912200
版权归原作者 luffy5459 所有, 如有侵权,请联系我们删除。

“springboot自帶线程池ThreadPoolTaskExecutor使用”的评论:

还没有评论