CompletableFuture结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。
CompletableFuture是由Java8引入的,在Java8之前我们一般通过Future实现异步。Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java8之前若要设置回调一般会使用guava的ListenableFuture。 CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。
从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
方法 | 说明 | 描述 |
boolean | cancel (boolean mayInterruptIfRunning) | 尝试取消执行此任务。 |
V | get() | 如果需要等待计算完成,然后检索其结果。 |
V | get(long timeout, TimeUnit unit) | 如果需要,最多等待计算完成的给定时间,然后检索其结果(如果可用)。 |
boolean | isCancelled() | 如果此任务在正常完成之前取消,则返回 true 。 |
boolean | isDone() | 如果此任务完成,则返回 true 。 |
public class CompletableFuture implements Future, CompletionStage {
}
JDK1.8 才新加入的一个实现类CompletableFuture,而CompletableFuture实现了两个接口(如上面代码所示):Future
Future表示异步计算的结果,CompletionStage用于表示异步执行过程中的一个步骤Stage,这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。
CompletionStage
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println());
1️⃣ 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度;
2️⃣ 使用CompletableFuture类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行种发生的异常;
3️⃣ 如果这些异步任务之间相互独立,或者他们之间的的某一些的结果是另一些的输入,你可以讲这些异步任务构造或合并成一个。
举个常见的案例,在APP查询首页信息的时候,一般会涉及到不同的RPC远程调用来获取很多用户相关信息数据,比如:商品banner轮播图信息、用户message消息信息、用户权益信息、用户优惠券信息 等,假设每个rpc invoke()耗时是250ms,那么基于同步的方式获取到话,算下来接口的RT至少大于1s,这响应时长对于首页来说是万万不能接受的,因此,我们这种场景就可以通过多线程异步的方式去优化。
根据CompletableFuture依赖数量,可以分为以下几类:零依赖、单依赖、双重依赖和多重依赖 。
下图Future1、Future2都是零依赖的体现:
下图Future3、Future5都是单依赖的体现,分别依赖于Future1和Future2:
下图Future4即为双重依赖的体现,同时依赖于Future1和Future2:
下图Future6即为多重依赖的体现,同时依赖于Future3、Future4和Future5:
类似这种多重依赖的流程来说,结果依赖于三个步骤:Future3、Future4、Future5,这种多元依赖可以通过allOf()或anyOf()方法来实现,区别是当需要多个依赖全部完成时使用allOf(),当多个依赖中的任意一个完成即可时使用anyOf(),如下代码所示:
CompletableFuture Future6 = CompletableFuture.allOf(Future3, Future4, Future5);
CompletableFuture result = Future6.thenApply(v -> {
//这里的join并不会阻塞,因为传给thenApply的函数是在Future3、Future4、Future5全部完成时,才会执行 。
result3 = Future3.join();
result4 = Future4.join();
result5 = Future5.join();
// 返回result3、result4、result5组装后结果
return assamble(result3, result4, result5);
});
在分析CompletableFuture异步编排之前,我跟大家理清一下CompletionStage接口下 (thenRun、thenApply、thenAccept、thenCombine、thenCompose)、(handle、whenComplete、exceptionally) 相关方法的实际用法和它们之间的区别是什么? 带着你的想法往下看吧!!!
/**
* 线程池配置
*
* @author: austin
* @since: 2023/3/12 1:32
*/
@Configuration
public class ThreadPoolConfig {
/**
* @Bean中声明的value不能跟定义的实例同名
*
*/
@Bean(value = "customAsyncTaskExecutor")
public ThreadPoolTaskExecutor asyncThreadPoolExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setQueueCapacity(2048);
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setThreadNamePrefix("customAsyncTaskExecutor-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolTaskExecutor;
}
@Bean(value = "threadPoolExecutor")
public ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10000), new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolExecutor;
}
}
如果所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。通过自定义线程池customAsyncTaskExecutor,后面不同的异步编排方法,我们可以通过指定对应的线程池。
@RestController
public class CompletableFutureCompose {
@Resource
private ThreadPoolTaskExecutor customAsyncTaskExecutor;
@RequestMapping(value = "/thenRun")
public void thenRun() {
CompletableFuture.runAsync(() -> {
System.out.println("thread name:" + Thread.currentThread().getName() + " first step...");
}, customAsyncTaskExecutor).thenRun(() -> {
System.out.println("thread name:" + Thread.currentThread().getName() + " second step...");
}).thenRunAsync(() -> {
System.out.println("thread name:" + Thread.currentThread().getName() + " third step...");
});
}
}
接口输出结果:
thread name:customAsyncTaskExecutor-1 first step...
thread name:customAsyncTaskExecutor-1 second step...
thread name:ForkJoinPool.commonPool-worker-3 third step...
@RequestMapping(value = "/thenApply")
public void thenApply() {
CompletableFuture.supplyAsync(() -> {
System.out.println("thread name:" + Thread.currentThread().getName() + " first step...");
return "hello";
}, customAsyncTaskExecutor).thenApply((result1) -> {
String targetResult = result1 + " austin";
System.out.println("first step result: " + result1);
System.out.println("thread name:" + Thread.currentThread().getName() + " second step..., targetResult: " + targetResult);
return targetResult;
});
}
接口输出结果:
thread name:customAsyncTaskExecutor-2 first step...
first step result: hello
// thenApply虽然没有指定线程池,但是默认是复用它上一个任务的线程池的
thread name:customAsyncTaskExecutor-2 second step..., targetResult: hello austin
@RequestMapping(value = "/thenAccept")
public void thenAccept() {
CompletableFuture.supplyAsync(() -> {
System.out.println("thread name:" + Thread.currentThread().getName() + " first step...");
return "hello";
}, customAsyncTaskExecutor).thenAccept((result1) -> {
String targetResult = result1 + " austin";
System.out.println("first step result: " + result1);
System.out.println("thread name:" + Thread.currentThread().getName() + " second step..., targetResult: " + targetResult);
});
}
接口输出结果:
thread name:customAsyncTaskExecutor-3 first step...
first step result: hello
// thenAccept在没有指定线程池的情况下,并未复用它上一个任务的线程池
thread name:http-nio-10032-exec-9 second step..., targetResult: hello austin
thenAccept()和thenApply()的用法实际上基本上一致,区别在于thenAccept()回调方法是没有返回值的,而thenApply()回调的带返回值的。
细心的朋友可能会发现,上面thenApply()和thenAccept()请求线程池在不指定的情况下,两者的不同表现,thenApply()在不指定线程池的情况下,会沿用上一个Future指定的线程池customAsyncTaskExecutor,而thenAccept()在不指定线程池的情况,并没有复用上一个Future设置的线程池,而是重新创建了新的线程来实现异步调用。
@RequestMapping(value = "/thenCombine")
public void thenCombine() {
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行future1开始...");
return "Hello";
}, asyncThreadPoolExecutor);
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行future2开始...");
return "World";
}, asyncThreadPoolExecutor);
future1.thenCombine(future2, (result1, result2) -> {
String result = result1 + " " + result2;
System.out.println("获取到future1、future2聚合结果:" + result);
return result;
}).thenAccept(result -> System.out.println(result));
}
接口访问,打印结果:
thread name:customAsyncTaskExecutor-4 执行future1开始...
thread name:customAsyncTaskExecutor-5 执行future2开始...
thread name:http-nio-10032-exec-8 获取到future1、future2聚合结果:Hello World
Hello World
我们先有future1,然后和future2组成一个链:future1 -> future2,然后又组合了future3,形成链:future1 -> future2 -> future3。这里有个隐藏的点:future1、future2、future3它们完全没有数据依赖关系,我们只不过是聚合了它们的结果。
@RequestMapping(value = "/thenCompose")
public void thenCompose() {
CompletableFuture.supplyAsync(() -> {
// 第一个Future实例结果
System.out.println("thread name:" + Thread.currentThread().getName() + " 执行future1开始...");
return "Hello";
}, customAsyncTaskExecutor).thenCompose(result1 -> CompletableFuture.supplyAsync(() -> {
// 将上一个Future实例结果传到这里
System.out.println("thread name:" + Thread.currentThread().getName() + " 执行future2开始..., 第一个实例结果:" + result1);
return result1 + " World";
})).thenCompose(result12 -> CompletableFuture.supplyAsync(() -> {
// 将第一个和第二个实例结果传到这里
System.out.println("thread name:" + Thread.currentThread().getName() + " 执行future3开始..., 第一第二个实现聚合结果:" + result12);
String targetResult = result12 + ", I am austin!";
System.out.println("最终输出结果:" + targetResult);
return targetResult;
}));
}
接口访问,打印结果:
thread name:customAsyncTaskExecutor-1 执行future1开始...
thread name:ForkJoinPool.commonPool-worker-3 执行future2开始..., 第一个实例结果:Hello
thread name:ForkJoinPool.commonPool-worker-3 执行future3开始..., 第一第二个实现聚合结果:Hello World
最终输出结果:Hello World, I am austin!
Note:thenCombine() VS thenCompose(),两者之间的区别
thenCombine结合的两个CompletableFuture没有依赖关系,且第二个CompletableFuture不需要等第一个CompletableFuture执行完成才开始。 thenCompose() 可以两个 CompletableFuture 对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。 thenCombine() 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。
// 返回一个新的CompletableFuture,由线程池ForkJoinPool.commonPool()中运行的任务异步完成,不会返回结果。
public static CompletableFuture runAsync(Runnable runnable);
// 返回一个新的CompletableFuture,运行任务时可以指定自定义线程池来实现异步,不会返回结果。
public static CompletableFuture runAsync(Runnable runnable, Executor executor);
// 返回由线程池ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,可以返回异步线程执行之后的结果。
public static CompletableFuture supplyAsync(Supplier supplier);
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);
CompletableFuture有两种方式实现异步,一种是supply开头的方法,一种是run开头的方法:
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
public CompletableFuture
使用方式,演示:
CompletableFuture future = new CompletableFuture<>();
Integer integer = future.get();
Note:
join()和get()方法都是 阻塞式 调用它们的线程(通常为主线程)来获取CompletableFuture异步之后的返回值。 两者的区别在于join()返回计算的结果或者抛出一个unchecked异常CompletionException,而get()返回一个具体的异常。
当使用CompletableFuture异步调用计算结果完成、或者是抛出异常的时候,我们可以执行特定的Action做进一步处理,比如:
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
使用CompletableFuture编写代码时,异常处理很重要,CompletableFuture提供了三种方法来处理它们:handle()、whenComplete() 和 exceptionly()。
@Autowired
private RemoteDictService remoteDictService;
public CompletableFuture getDictDataAsync(long dictId) {
CompletableFuture resultFuture = remoteDictService.findDictById(dictId);
// 业务方法,内部会发起异步rpc调用
return resultFuture
.exceptionally(error -> {
//通过exceptionally捕获异常,打印日志并返回默认值
log.error("RemoteDictService.getDictDataAsync Exception dictId = {}", dictId, error);
return null;
});
}
Two method forms support processing whether the triggering stage completed normally or exceptionally:
Method {whenComplete} allows injection of an action regardless of outcome, otherwise preserving the outcome in its completion.
Method {handle} additionally allows the stage to compute a replacement result that may enable further processing by other dependent stages.
翻译过来就是:
两种方法形式支持处理触发阶段是否 正常完成 或 异常完成:
这里我通过代码演示一下:
public class CompletableFutureExceptionHandler {
public static CompletableFuture handle(int a, int b) {
return CompletableFuture.supplyAsync(() -> a / b)
.handle((result, ex) -> {
if (null != ex) {
System.out.println("handle error: " + ex.getMessage());
return 0;
} else {
return result;
}
});
}
public static CompletableFuture whenComplete(int a, int b) {
return CompletableFuture.supplyAsync(() -> a / b)
.whenComplete((result, ex) -> {
if (null != ex) {
System.out.println("whenComplete error: " + ex.getMessage());
}
});
}
public static void main(String[] args) {
try {
System.out.println("success: " + handle(10, 5).get());
System.out.println("fail: " + handle(10, 0).get());
} catch (Exception e) {
System.out.println("catch exception= " + e.getMessage());
}
System.out.println("------------------------------------------------------------------");
try {
System.out.println("success: " + whenComplete(10, 5).get());
System.out.println("fail: " + whenComplete(10, 0).get());
} catch (Exception e) {
System.out.println("catch exception=" + e.getMessage());
}
}
}
运行结果如下显示:
success: 2
handle error: java.lang.ArithmeticException: / by zero
fail: 0
------------------------------------------------------------------
success: 2
whenComplete error: java.lang.ArithmeticException: / by zero
catch exception=java.lang.ArithmeticException: / by zero
✔可以看到,handle处理,当程序发生异常的时候,即便是catch获取异常期望输出,但是并未跟实际预想那样,原因是handle不会把内部异常外抛出来,而whenComplete会将内部异常抛出。
Note:关于异步线程池(十分重要)
异步回调方法可以选择是否传递线程池参数Executor,这里为了实现线程池隔离,当不传递线程池时,默认会使用ForkJoinPool中的公共线程池CommonPool,这个线程池默认创建的线程数是CPU的核数,如果所有的异步回调共享一个线程池,核心与非核心业务都竞争同一个池中的线程,那么一旦有任务执行一些很慢的I/O 操作,就会导致线程池中所有线程都阻塞在I/O操作上,很容易成为系统瓶颈,影响整个系统的性能。因此, 建议强制传线程池,且根据实际情况做线程池隔离,减少不同业务之间的相互干扰。
@RestController
@RequestMapping("/index")
public class IndexWebController {
@Resource
private ThreadPoolExecutor asyncThreadPoolExecutor;
@RequestMapping(value = "/homeIndex", method = {RequestMethod.POST, RequestMethod.GET})
public String homeIndex(@RequestParam(required = false) String userId, @RequestParam(value = "lang") String lang) {
ResultData result = new ResultData<>();
// 获取Banner轮播图信息
CompletableFuture> future1 = CompletableFuture.supplyAsync(() -> this.buildBanners(userId, lang), asyncThreadPoolExecutor);
// 获取用户message通知信息
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> this.buildNotifications(userId, lang), asyncThreadPoolExecutor);
// 获取用户权益信息
CompletableFuture> future3 = CompletableFuture.supplyAsync(() -> this.buildBenefits(userId, lang), asyncThreadPoolExecutor);
// 获取优惠券信息
CompletableFuture> future4 = CompletableFuture.supplyAsync(() -> this.buildCoupons(userId), asyncThreadPoolExecutor);
CompletableFuture allOfFuture = CompletableFuture.allOf(futrue1, futrue2, futrue3, future4);
HomeVo finalHomeVO = homeVO;
CompletableFuture resultFuture = allOfFuture.thenApply(v -> {
try {
finalHomeVo.setBanners(future1.get());
finalHomeVo.setNotifications(future2.get());
finalHomeVo.setBenefits(future3.get());
finalHomeVo.setCoupons(future4.get());
return finalHomeVO;
} catch (Exception e) {
logger.error("[Error] assemble homeVO data error: {}", e);
throw new RuntimeException(e);
}
});
homeVO = resultFuture.join();
result.setData(homeVO);
return writeJson(result);
}
}
@SneakyThrows
public List buildBanners(String userId, String lang) {
// 模拟请求耗时0.5秒
Thread.sleep(500);
return new List();
}
@SneakyThrows
public List buildNotifications(String userId, String lang) {
// 模拟请求耗时0.5秒
Thread.sleep(500);
return new List();
}
@SneakyThrows
public List buildBenefits(String userId, String lang) {
// 模拟请求耗时0.5秒
Thread.sleep(500);
return new List();
}
@SneakyThrows
public List buildCoupons(String userId) {
// 模拟请求耗时0.5秒
Thread.sleep(500);
return new List();
}
页面更新:2024-05-22
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号