0


CompletableFuture实现异步编排全面分析和总结

一、🌈CompletableFuture简介

  1. CompletableFuture

结合了

  1. Future

的优点,提供了非常强大的

  1. Future

的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合

  1. CompletableFuture

的方法。

  1. CompletableFuture

被设计在

  1. Java

中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。

  1. CompletableFuture

是由

  1. Java8

引入的,在

  1. Java8

之前我们一般通过

  1. Future

实现异步。

  • Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java8之前若要设置回调一般会使用guavaListenableFuture
  • CompletableFutureFuture进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。

✔本文的名词缩写:

  • CF:代表CompletableFuture
  • CS:代表CompletionStage

二、CompletableFuture 核心接口API介绍

2.1 Future

使用Future局限性

从本质上说,Future表示一个异步计算的结果。它提供了

  1. isDone()

来检测计算是否已经完成,并且在计算结束后,可以通过

  1. get()

方法来获取计算结果。在异步计算中,

  1. Future

确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

  • 并发执行多任务Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理Future接口中没有关于异常处理的方法;
    方法说明描述
    1. boolean
  1. cancel (boolean mayInterruptIfRunning)

尝试取消执行此任务。

  1. V
  1. get()

如果需要等待计算完成,然后检索其结果。

  1. V
  1. get(long timeout, TimeUnit unit)

如果需要,最多等待计算完成的给定时间,然后检索其结果(如果可用)。

  1. boolean
  1. isCancelled()

如果此任务在正常完成之前取消,则返回

  1. true

  1. boolean
  1. isDone()

如果此任务完成,则返回

  1. true

2.2 CompletableFuture

  1. public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
  2. }
  3. 复制代码
  1. JDK1.8

才新加入的一个实现类

  1. CompletableFuture

,而

  1. CompletableFuture

实现了两个接口(如上面代码所示):***

  1. Future<T>

  1. CompletionStage<T>

***,意味着可以像以前一样通过阻塞或者轮询的方式获得结果。

  1. Future

表示异步计算的结果,

  1. CompletionStage

用于表示异步执行过程中的一个步骤

  1. Stage

,这个步骤可能是由另外一个

  1. CompletionStage

触发的,随着当前步骤的完成,也可能会触发其他一系列

  1. CompletionStage

的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,

  1. CompletionStage

接口正是定义了这样的能力,我们可以通过其提供的

  1. thenAppythenCompose

等函数式编程方法来组合编排这些步骤。

  • CompletableFuture是Future接口的扩展和增强CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,
  • CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

2.3 CompletionStage

  1. CompletionStage<T>

接口提供了更多方法来更好的实现异步编排,并且大量的使用了

  1. JDK8

引入的函数式编程概念。由

  1. stage

执行的计算可以表示为

  1. FunctionConsumerRunnable

(使用名称分别包括

  1. apply acceptrun

的方法 ),具体取决于它是否需要参数和/或产生结果。 例如:

  1. stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println());
  2. 复制代码

三、使用CompletableFuture场景

3.1 应用场景

1️⃣ 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度;
2️⃣ 使用

  1. CompletableFuture

类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行种发生的异常;
3️⃣ 如果这些异步任务之间相互独立,或者他们之间的的某一些的结果是另一些的输入,你可以讲这些异步任务构造或合并成一个。

举个常见的案例🌰,在

  1. APP

查询首页信息的时候,一般会涉及到不同的

  1. RPC

远程调用来获取很多用户相关信息数据,比如:商品banner轮播图信息、用户message消息信息、用户权益信息、用户优惠券信息 等,假设每个

  1. rpc invoke()

耗时是

  1. 250ms

,那么基于同步的方式获取到话,算下来接口的

  1. RT

至少大于1s,这响应时长对于首页来说是万万不能接受的,因此,我们这种场景就可以通过多线程异步的方式去优化。

3.2 CompletableFuture依赖链分析

根据

  1. CompletableFuture

依赖数量,可以分为以下几类:零依赖、单依赖、双重依赖和多重依赖

零依赖

下图

  1. Future1Future2

都是零依赖的体现:

单依赖:仅依赖于一个CompletableFuture

下图

  1. Future3Future5

都是单依赖的体现,分别依赖于

  1. Future1

  1. Future2

双重依赖:同时依赖于两个CompletableFuture

下图

  1. Future4

即为双重依赖的体现,同时依赖于

  1. Future1

  1. Future2

多重依赖:同时依赖于多个CompletableFuture

下图

  1. Future6

即为多重依赖的体现,同时依赖于

  1. Future3

  1. Future4

  1. Future5

类似这种多重依赖的流程来说,结果依赖于三个步骤:***

  1. Future3Future4Future5

***,这种多元依赖可以通过

  1. allOf()

  1. anyOf()

方法来实现,区别是当需要多个依赖全部完成时使用

  1. allOf()

,当多个依赖中的任意一个完成即可时使用

  1. anyOf()

,如下代码所示:

  1. CompletableFuture<Void> Future6 = CompletableFuture.allOf(Future3, Future4, Future5);
  2. CompletableFuture<String> result = Future6.thenApply(v -> {
  3. //这里的join并不会阻塞,因为传给thenApply的函数是在Future3、Future4、Future5全部完成时,才会执行 。
  4. result3 = Future3.join();
  5. result4 = Future4.join();
  6. result5 = Future5.join();
  7. // 返回result3、result4、result5组装后结果
  8. return assamble(result3, result4, result5);
  9. });
  10. 复制代码

四、CompletableFuture异步编排

在分析

  1. CompletableFuture

异步编排之前,我跟大家理清一下

  1. CompletionStage

接口下 (thenRun、thenApply、thenAccept、thenCombine、thenCompose)、(handle、whenComplete、exceptionally) 相关方法的实际用法和它们之间的区别是什么? 带着你的想法💭往下看吧!!!

4.1 《异步编排API》

  • **thenRun**:【执行】直接开启一个异步线程执行任务,不接收任何参数,回调方法没有返回值;
  • **thenApply**:【提供】可以提供返回值,接收上一个任务的执行结果,作为下一个Future的入参,回调方法是有返回值的;
  • **thenAccept**:【接收】可以接收上一个任务的执行结果,作为下一个Future的入参,回调方法是没有返回值的;
  • **thenCombine**:【结合】可以结合不同的Future的返回值,做为下一个Future的入参,回调方法是有返回值的;
  • **thenCompose**:【组成】将上一个Future实例结果传递给下一个实例中。

✔异步回调建议使用自定义线程池

  1. /**
  2. * 线程池配置
  3. *
  4. * @author: austin
  5. * @since: 2023/3/12 1:32
  6. */
  7. @Configuration
  8. public class ThreadPoolConfig {
  9. /**
  10. * @Bean中声明的value不能跟定义的实例同名
  11. *
  12. */
  13. @Bean(value = "customAsyncTaskExecutor")
  14. public ThreadPoolTaskExecutor asyncThreadPoolExecutor() {
  15. ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  16. threadPoolTaskExecutor.setCorePoolSize(5);
  17. threadPoolTaskExecutor.setMaxPoolSize(10);
  18. threadPoolTaskExecutor.setKeepAliveSeconds(60);
  19. threadPoolTaskExecutor.setQueueCapacity(2048);
  20. threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  21. threadPoolTaskExecutor.setThreadNamePrefix("customAsyncTaskExecutor-");
  22. threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  23. return threadPoolTaskExecutor;
  24. }
  25. @Bean(value = "threadPoolExecutor")
  26. public ThreadPoolExecutor threadPoolExecutor() {
  27. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,
  28. new ArrayBlockingQueue<>(10000), new ThreadPoolExecutor.CallerRunsPolicy());
  29. return threadPoolExecutor;
  30. }
  31. }
  32. 复制代码

如果所有异步回调都会共用该

  1. CommonPool

,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。通过自定义线程池

  1. customAsyncTaskExecutor

,后面不同的异步编排方法,我们可以通过指定对应的线程池。

1️⃣ runAsync()、thenRun()

  1. @RestController
  2. public class CompletableFutureCompose {
  3. @Resource
  4. private ThreadPoolTaskExecutor customAsyncTaskExecutor;
  5. @RequestMapping(value = "/thenRun")
  6. public void thenRun() {
  7. CompletableFuture.runAsync(() -> {
  8. System.out.println("thread name:" + Thread.currentThread().getName() + " first step...");
  9. }, customAsyncTaskExecutor).thenRun(() -> {
  10. System.out.println("thread name:" + Thread.currentThread().getName() + " second step...");
  11. }).thenRunAsync(() -> {
  12. System.out.println("thread name:" + Thread.currentThread().getName() + " third step...");
  13. });
  14. }
  15. }
  16. 复制代码

接口输出结果:

  1. thread name:customAsyncTaskExecutor-1 first step...
  2. thread name:customAsyncTaskExecutor-1 second step...
  3. thread name:ForkJoinPool.commonPool-worker-3 third step...
  4. 复制代码

2️⃣ thenApply()

  1. @RequestMapping(value = "/thenApply")
  2. public void thenApply() {
  3. CompletableFuture.supplyAsync(() -> {
  4. System.out.println("thread name:" + Thread.currentThread().getName() + " first step...");
  5. return "hello";
  6. }, customAsyncTaskExecutor).thenApply((result1) -> {
  7. String targetResult = result1 + " austin";
  8. System.out.println("first step result: " + result1);
  9. System.out.println("thread name:" + Thread.currentThread().getName() + " second step..., targetResult: " + targetResult);
  10. return targetResult;
  11. });
  12. }
  13. 复制代码

接口输出结果:

  1. thread name:customAsyncTaskExecutor-2 first step...
  2. first step result: hello
  3. // thenApply虽然没有指定线程池,但是默认是复用它上一个任务的线程池的
  4. thread name:customAsyncTaskExecutor-2 second step..., targetResult: hello austin
  5. 复制代码

3️⃣ thenAccept()

  1. @RequestMapping(value = "/thenAccept")
  2. public void thenAccept() {
  3. CompletableFuture.supplyAsync(() -> {
  4. System.out.println("thread name:" + Thread.currentThread().getName() + " first step...");
  5. return "hello";
  6. }, customAsyncTaskExecutor).thenAccept((result1) -> {
  7. String targetResult = result1 + " austin";
  8. System.out.println("first step result: " + result1);
  9. System.out.println("thread name:" + Thread.currentThread().getName() + " second step..., targetResult: " + targetResult);
  10. });
  11. }
  12. 复制代码

接口输出结果:

  1. thread name:customAsyncTaskExecutor-3 first step...
  2. first step result: hello
  3. // thenAccept在没有指定线程池的情况下,并未复用它上一个任务的线程池
  4. thread name:http-nio-10032-exec-9 second step..., targetResult: hello austin
  5. 复制代码

  1. thenAccept()

  1. thenApply()

的用法实际上基本上一致,区别在于

  1. thenAccept()

回调方法是没有返回值的,而

  1. thenApply()

回调的带返回值的。***

**🌈细心的朋友可能会发现,上面

  1. thenApply()

  1. thenAccept()

请求线程池在不指定的情况下,两者的不同表现,

  1. thenApply()

在不指定线程池的情况下,会沿用上一个

  1. Future

指定的线程池

  1. customAsyncTaskExecutor

,而

  1. thenAccept()

在不指定线程池的情况,并没有复用上一个

  1. Future

设置的线程池,而是重新创建了新的线程来实现异步调用。**

4️⃣ thenCombine()

  1. @RequestMapping(value = "/thenCombine")
  2. public void thenCombine() {
  3. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
  4. System.out.println("执行future1开始...");
  5. return "Hello";
  6. }, asyncThreadPoolExecutor);
  7. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
  8. System.out.println("执行future2开始...");
  9. return "World";
  10. }, asyncThreadPoolExecutor);
  11. future1.thenCombine(future2, (result1, result2) -> {
  12. String result = result1 + " " + result2;
  13. System.out.println("获取到future1、future2聚合结果:" + result);
  14. return result;
  15. }).thenAccept(result -> System.out.println(result));
  16. }
  17. 复制代码

接口访问,打印结果:

  1. thread name:customAsyncTaskExecutor-4 执行future1开始...
  2. thread name:customAsyncTaskExecutor-5 执行future2开始...
  3. thread name:http-nio-10032-exec-8 获取到future1future2聚合结果:Hello World
  4. Hello World
  5. 复制代码

5️⃣ thenCompose()

*我们先有

  1. future1

,然后和

  1. future2

组成一个链:

  1. future1 -> future2

,然后又组合了

  1. future3

,形成链:

  1. future1 -> future2 -> future3

。这里有个隐藏的点:

  1. future1future2future3

它们完全没有数据依赖关系,我们只不过是聚合了它们的结果。*

  1. @RequestMapping(value = "/thenCompose")
  2. public void thenCompose() {
  3. CompletableFuture.supplyAsync(() -> {
  4. // 第一个Future实例结果
  5. System.out.println("thread name:" + Thread.currentThread().getName() + " 执行future1开始...");
  6. return "Hello";
  7. }, customAsyncTaskExecutor).thenCompose(result1 -> CompletableFuture.supplyAsync(() -> {
  8. // 将上一个Future实例结果传到这里
  9. System.out.println("thread name:" + Thread.currentThread().getName() + " 执行future2开始..., 第一个实例结果:" + result1);
  10. return result1 + " World";
  11. })).thenCompose(result12 -> CompletableFuture.supplyAsync(() -> {
  12. // 将第一个和第二个实例结果传到这里
  13. System.out.println("thread name:" + Thread.currentThread().getName() + " 执行future3开始..., 第一第二个实现聚合结果:" + result12);
  14. String targetResult = result12 + ", I am austin!";
  15. System.out.println("最终输出结果:" + targetResult);
  16. return targetResult;
  17. }));
  18. }
  19. 复制代码

接口访问,打印结果:

  1. thread name:customAsyncTaskExecutor-1 执行future1开始...
  2. thread name:ForkJoinPool.commonPool-worker-3 执行future2开始..., 第一个实例结果:Hello
  3. thread name:ForkJoinPool.commonPool-worker-3 执行future3开始..., 第一第二个实现聚合结果:Hello World
  4. 最终输出结果:Hello World, I am austin!
  5. 复制代码

💥Note:thenCombine() VS thenCompose(),两者之间的区别

  • thenCombine结合的两个CompletableFuture没有依赖关系,且第二个CompletableFuture不需要等第一个CompletableFuture执行完成才开始。
  • thenCompose() 可以两个 CompletableFuture 对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。
  • thenCombine() 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。

4.2 《CompletableFuture实例化创建》

  1. // 返回一个新的CompletableFuture,由线程池ForkJoinPool.commonPool()中运行的任务异步完成,不会返回结果。
  2. public static CompletableFuture<Void> runAsync(Runnable runnable);
  3. // 返回一个新的CompletableFuture,运行任务时可以指定自定义线程池来实现异步,不会返回结果。
  4. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
  5. // 返回由线程池ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,可以返回异步线程执行之后的结果。
  6. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
  7. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
  8. 复制代码

  1. CompletableFuture

有两种方式实现异步,一种是

  1. supply

开头的方法,一种是

  1. run

开头的方法:***

  • supply 开头:该方法可以返回异步线程执行之后的结果;
  • run 开头:该方法不会返回结果,就只是执行线程任务。

4.3 《获取CompletableFuture结果》

  1. public T get()
  2. public T get(long timeout, TimeUnit unit)
  3. public T getNow(T valueIfAbsent)
  4. public T join()
  5. public CompletableFuture<Object> allOf()
  6. public CompletableFuture<Object> anyOf()
  7. 复制代码

使用方式,演示🌰:

  1. CompletableFuture<Integer> future = new CompletableFuture<>();
  2. Integer integer = future.get();
  3. 复制代码
  • get()阻塞式获取执行结果,如果任务还没有完成则会阻塞等待知直到任务执行完成
  • **get(long timeout, TimeUnit unit)**:带超时的阻塞式获取执行结果
  • getNow():如果已完成,立刻返回执行结果,否则返回给定的valueIfAbsent
  • **join()**:该方法和get()方法作用一样, 不抛异常的阻塞式获取异步执行结果
  • **allOf()**:当给定的所有CompletableFuture都完成时,返回一个新的CompletableFuture
  • **anyOf()**:当给定的其中一个CompletableFuture完成时,返回一个新的CompletableFuture

💥Note:

  • join()get()方法都是 阻塞式 调用它们的线程(通常为主线程)来获取CompletableFuture异步之后的返回值。
  • 两者的区别在于join()返回计算的结果或者抛出一个unchecked异常CompletionException,而get()返回一个具体的异常。

4.4 《结果处理》

当使用

  1. CompletableFuture

异步调用计算结果完成、或者是抛出异常的时候,我们可以执行特定的

  1. Action

做进一步处理,比如:

  1. public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
  2. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  3. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
  4. 复制代码

4.5 《异常处理》

使用

  1. CompletableFuture

编写代码时,异常处理很重要,

  1. CompletableFuture

提供了三种方法来处理它们:***handle()、whenComplete() 和 exceptionly()***。

  • handle:返回一个新的CompletionStage,当该阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数执行,不会将内部异常抛出
  • whenComplete:返回与此阶段具有相同结果或异常的新CompletionStage,该阶段在此阶段完成时执行给定操作。与方法handle不同,会将内部异常往外抛出
  • **exceptionally**:返回一个新的CompletableFutureCompletableFuture提供了异常捕获回调exceptionally,相当于同步调用中的try/catch
  1. @Autowired
  2. private RemoteDictService remoteDictService;
  3. public CompletableFuture<Dict> getDictDataAsync(long dictId) {
  4. CompletableFuture<DictResult> resultFuture = remoteDictService.findDictById(dictId);
  5. // 业务方法,内部会发起异步rpc调用
  6. return resultFuture
  7. .exceptionally(error -> {
  8. //通过exceptionally捕获异常,打印日志并返回默认值
  9. log.error("RemoteDictService.getDictDataAsync Exception dictId = {}", dictId, error);
  10. return null;
  11. });
  12. }
  13. 复制代码

handle() VS whenComplete(), 两者之间的区别

  • 💥💥💥核心区别在于whenComplete不消费异常,而handle消费异常

Two method forms support processing whether the triggering stage completed normally or exceptionally:

Method {whenComplete} allows injection of an action regardless of outcome, otherwise preserving the outcome in its completion.

Method {handle} additionally allows the stage to compute a replacement result that may enable further processing by other dependent stages.

翻译过来就是:

两种方法形式支持处理触发阶段是否 正常完成 或 异常完成:

  • whenComplete:可以访问当前CompletableFuture结果异常 作为参数,使用它们并执行您想要的操作。此方法并不能转换完成的结果,会内部抛出异常**。
  • handle:当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数来执行。当此阶段完成时,以 该阶段的结果该阶段的异常 作为参数调用给定函数,并且函数的结果用于完成返回的阶段,不会把异常外抛出来**。

这里我通过代码演示一下:

  1. public class CompletableFutureExceptionHandler {
  2. public static CompletableFuture handle(int a, int b) {
  3. return CompletableFuture.supplyAsync(() -> a / b)
  4. .handle((result, ex) -> {
  5. if (null != ex) {
  6. System.out.println("handle error: " + ex.getMessage());
  7. return 0;
  8. } else {
  9. return result;
  10. }
  11. });
  12. }
  13. public static CompletableFuture whenComplete(int a, int b) {
  14. return CompletableFuture.supplyAsync(() -> a / b)
  15. .whenComplete((result, ex) -> {
  16. if (null != ex) {
  17. System.out.println("whenComplete error: " + ex.getMessage());
  18. }
  19. });
  20. }
  21. public static void main(String[] args) {
  22. try {
  23. System.out.println("success: " + handle(10, 5).get());
  24. System.out.println("fail: " + handle(10, 0).get());
  25. } catch (Exception e) {
  26. System.out.println("catch exception= " + e.getMessage());
  27. }
  28. System.out.println("------------------------------------------------------------------");
  29. try {
  30. System.out.println("success: " + whenComplete(10, 5).get());
  31. System.out.println("fail: " + whenComplete(10, 0).get());
  32. } catch (Exception e) {
  33. System.out.println("catch exception=" + e.getMessage());
  34. }
  35. }
  36. }
  37. 复制代码

运行结果如下显示

  1. success: 2
  2. handle error: java.lang.ArithmeticException: / by zero
  3. fail: 0
  4. ------------------------------------------------------------------
  5. success: 2
  6. whenComplete error: java.lang.ArithmeticException: / by zero
  7. catch exception=java.lang.ArithmeticException: / by zero
  8. 复制代码

*✔可以看到,

  1. handle

处理,当程序发生异常的时候,即便是

  1. catch

获取异常期望输出,但是并未跟实际预想那样,原因是

  1. handle

不会把内部异常外抛出来,而

  1. whenComplete

会将内部异常抛出。*

五、CompletableFuture线程池须知

🔥Note:关于异步线程池(十分重要)

*异步回调方法可以选择是否传递线程池参数

  1. Executor

,这里为了实现线程池隔离,当不传递线程池时,默认会使用

  1. ForkJoinPool

中的公共线程池

  1. CommonPool

,这个线程池默认创建的线程数是

  1. CPU

的核数,如果所有的异步回调共享一个线程池,核心与非核心业务都竞争同一个池中的线程,那么一旦有任务执行一些很慢的

  1. I/O

操作,就会导致线程池中所有线程都阻塞在

  1. I/O

操作上,很容易成为系统瓶颈,影响整个系统的性能。因此,💥💥 建议强制传线程池,且根据实际情况做线程池隔离,减少不同业务之间的相互干扰。*

六、基于CompletableFuture实现接口异步revoke

案例实现Controller层

  1. @RestController
  2. @RequestMapping("/index")
  3. public class IndexWebController {
  4. @Resource
  5. private ThreadPoolExecutor asyncThreadPoolExecutor;
  6. @RequestMapping(value = "/homeIndex", method = {RequestMethod.POST, RequestMethod.GET})
  7. public String homeIndex(@RequestParam(required = false) String userId, @RequestParam(value = "lang") String lang) {
  8. ResultData<HomeVO> result = new ResultData<>();
  9. // 获取Banner轮播图信息
  10. CompletableFuture<List<BannerVO>> future1 = CompletableFuture.supplyAsync(() -> this.buildBanners(userId, lang), asyncThreadPoolExecutor);
  11. // 获取用户message通知信息
  12. CompletableFuture<NotificationVO> future2 = CompletableFuture.supplyAsync(() -> this.buildNotifications(userId, lang), asyncThreadPoolExecutor);
  13. // 获取用户权益信息
  14. CompletableFuture<List<BenefitVO>> future3 = CompletableFuture.supplyAsync(() -> this.buildBenefits(userId, lang), asyncThreadPoolExecutor);
  15. // 获取优惠券信息
  16. CompletableFuture<List<CouponVO>> future4 = CompletableFuture.supplyAsync(() -> this.buildCoupons(userId), asyncThreadPoolExecutor);
  17. CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(futrue1, futrue2, futrue3, future4);
  18. HomeVo finalHomeVO = homeVO;
  19. CompletableFuture<HomeVO> resultFuture = allOfFuture.thenApply(v -> {
  20. try {
  21. finalHomeVo.setBanners(future1.get());
  22. finalHomeVo.setNotifications(future2.get());
  23. finalHomeVo.setBenefits(future3.get());
  24. finalHomeVo.setCoupons(future4.get());
  25. return finalHomeVO;
  26. } catch (Exception e) {
  27. logger.error("[Error] assemble homeVO data error: {}", e);
  28. throw new RuntimeException(e);
  29. }
  30. });
  31. homeVO = resultFuture.join();
  32. result.setData(homeVO);
  33. return writeJson(result);
  34. }
  35. }
  36. 复制代码

Service层

  1. @SneakyThrows
  2. public List<BannerVO> buildBanners(String userId, String lang) {
  3. // 模拟请求耗时0.5秒
  4. Thread.sleep(500);
  5. return new List<BannerVO>();
  6. }
  7. @SneakyThrows
  8. public List<NotificationVO> buildNotifications(String userId, String lang) {
  9. // 模拟请求耗时0.5秒
  10. Thread.sleep(500);
  11. return new List<NotificationVO>();
  12. }
  13. @SneakyThrows
  14. public List<BenefitVO> buildBenefits(String userId, String lang) {
  15. // 模拟请求耗时0.5秒
  16. Thread.sleep(500);
  17. return new List<BenefitVO>();
  18. }
  19. @SneakyThrows
  20. public List<CouponVO> buildCoupons(String userId) {
  21. // 模拟请求耗时0.5秒
  22. Thread.sleep(500);
  23. return new List<CouponVO>();
  24. }
  25. 复制代码

六、异步化带来的性能提升

  • 通过异步化改造,原本同步获取数据的API性能得到明显提升,大大减少了接口的响应时长(RT)。
  • 接口的吞吐量大幅度提升。

七、总结

本篇文章主要是介绍了

  1. CompletableFuture

使用原理和相关不同方法的场景化使用,以及通过不同的实例演示了异步回调的过程,好了,今天的分享就到此结束了,如果文章对你有所帮助,欢迎 点赞👍+评论💬+收藏❤

标签: java jvm 前端

本文转载自: https://blog.csdn.net/ww2651071028/article/details/129485158
版权归原作者 Java技术攻略 所有, 如有侵权,请联系我们删除。

“CompletableFuture实现异步编排全面分析和总结”的评论:

还没有评论