Java多线程 - CompletableFuture
笔记
JDK1.8+,一个链式操作的异步工具类
# 方法
# 开启异步任务
// 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
# 处理上一个异步任务
// 接收上一个任务的返回作为入参,有返回值
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn)
// 接收上一个任务的返回作为入参,没有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
// 没有入参,没有返回值
public CompletableFuture<Void> thenRun(Runnable action)
# 再执行一个任务,后处理两个任务
参数 1:第二个任务,参数 2 在上一个任务和参数 1 的任务完成后执行。
// 接收两个任务的返回作为入参,有返回值
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
// 接收上两个任务的返回作为入参,没有返回值
public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
// 没有入参,没有返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
# 再执行一个任务,任意一个完成后处理
参数 1:第二个任务,参数 2 在上一个任务和参数 1 的任务完成后执行。
// 接收最快完成任务的返回作为入参,有返回值
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)
// 接收最快完成任务的返回作为入参,没有返回值
public CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)
// 没有入参,没有返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
# 异常处理
// 捕获上面任务的异常,有返回值
public CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn)
// 接收上一个任务的返回作为入参1 或 将上面任务出现的异常作为入参2;换言之入参1和入参2只有一个能用,有返回值
public <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn)
// 接收上一个任务的返回作为入参1 或 将上面任务出现的异常作为入参2;换言之入参1和入参2只有一个能用,没有返回值
public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action)
# 示例
# 单个异步 supplyAsync
开启一个异步线程。
@Test
void supplyAsyncTest() {
// 主线程干活
// 异步执行
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
// 异步一些耗时的操作
// 异步操作完成后,返回结果
return "结果";
});
// 主线程继续干活
// 阻塞等待异步线程返回结果
String result = cf.join();
System.out.println(result);
}
# 异步完又异步 thenCompose
使用 thenApplyAsync 更优雅
开启一个异步线程 A,并等它返回结果 res 后,再开启另一个异步线程 B,并传递 A 的 res 给 B。
@Test
public void thenComposeTest() {
// 主线程干活
// 异步执行
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
// 异步一些耗时的操作
// 异步操作完成后,返回结果
return "结果1";
}).thenCompose(res -> CompletableFuture.supplyAsync(() -> { // 上一个异步操作完成后,执行下一个异步操作
// res:上一个异步操作的结果
return res + "结果2";
}));
// 主线程继续干活
// 阻塞等待异步线程返回结果
String result = cf.join();
System.out.println(result);
}
# 异步完再异步 thenApply
@Test
public void thenApplyTest() {
// 主线程干活
// 异步执行
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
// 异步一些耗时的操作
// 异步操作完成后,返回结果
return "结果1";
}).thenApply(res -> { // 上一个异步操作完成后,执行下一个异步操作
// res:上一个异步操作的结果
return res + "结果2";
});
// 主线程继续干活
// 阻塞等待异步线程返回结果
String result = cf.join();
System.out.println(result);
}
# 两个同时异步 thenCombine
同时开启两个异步线程,等待他们完成,执行合并。
@Test
public void thenCombineTest() {
// 主线程干活
// 异步执行
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
// 异步一些耗时的操作
// 异步操作完成后,返回结果
return "结果1";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
// 与上一个异步操作并行执行
return "结果2";
}), (res1, res2) -> {
// 结合两个异步操作的结果
System.out.println(res1);
System.out.println(res2);
return res1 + res2;
}
);
// 主线程继续干活
// 阻塞等待异步线程返回结果
String result = cf.join();
System.out.println(result);
}
# 两个异步谁快返回谁 applyToEither
@Test
public void applyToEitherTest() {
// 主线程干活
// 异步执行
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
// 异步一些耗时的操作
sleep(100);
// 异步操作完成后,返回结果
return "结果1";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
// 与上一个异步操作并行执行
sleep(200);
return "结果2";
}), res -> res);
// 主线程继续干活
// 阻塞等待异步线程返回结果,谁快返回谁
String result = cf.join();
System.out.println(result);
}
# 处理异步异常 exceptionally
@Test
public void exceptionallyTest() {
// 主线程干活
// 异步执行
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("异常");
}
return "结果";
}).exceptionally(e -> {
// 异常处理
System.out.println(e.getMessage());
return "异常结果";
});
// 主线程继续干活
// 阻塞等待异步线程返回结果,谁快返回谁
String result = cf.join();
System.out.println(result);
}
# 总结
CompletableFuture 的方法主要有:
supplyAsync 开启异步任务
thenCompose 连接两个异步任务
thenApply 任务的后置处理
thenCombine 合并两个异步任务
applyToEither 获取最先完成的任务
exceptionally 处理异常
由以上方法可以延伸出带 Async
后缀的方法,这类方法使用另外的线程处理,反正则是与 supplyAsync
的线程相同。
关于运行的线程
CompletableFuture 默认使用公共线程池 ForkJoinPool.commonPool()
,此线程池适合计算密集型任务,不适合 IO 操作,涉及 IO 操作需自行传入线程池。
# 参考资料
上次更新: 2022/12/31, 03:04:26