NipGeihou's blog NipGeihou's blog
  • Java

    • 开发规范
    • 进阶笔记
    • 微服务
    • 快速开始
    • 设计模式
  • 其他

    • Golang
    • Python
    • Drat
  • Redis
  • MongoDB
  • 数据结构与算法
  • 计算机网络
  • 应用

    • Grafana
    • Prometheus
  • 容器与编排

    • KubeSphere
    • Kubernetes
    • Docker Compose
    • Docker
  • 组网

    • TailScale
    • WireGuard
  • 密码生成器
  • 英文单词生成器
🍳烹饪
🧑‍💻关于
  • 分类
  • 标签
  • 归档

NipGeihou

我见青山多妩媚,料青山见我应如是
  • Java

    • 开发规范
    • 进阶笔记
    • 微服务
    • 快速开始
    • 设计模式
  • 其他

    • Golang
    • Python
    • Drat
  • Redis
  • MongoDB
  • 数据结构与算法
  • 计算机网络
  • 应用

    • Grafana
    • Prometheus
  • 容器与编排

    • KubeSphere
    • Kubernetes
    • Docker Compose
    • Docker
  • 组网

    • TailScale
    • WireGuard
  • 密码生成器
  • 英文单词生成器
🍳烹饪
🧑‍💻关于
  • 分类
  • 标签
  • 归档
  • 设计模式

  • 开发规范

  • 经验分享

  • 记录

  • 快速开始

  • 笔记

    • 多线程与并发

      • 前言
      • 理论基础
      • Java中各种锁的概念
      • 关键字:synchronized
      • 关键字:volatile
      • 关键字:final
      • Java多线程 - 线程池
      • Java多线程 - Block Queue阻塞队列
      • Java多线程 - 辅助类
      • Java多线程 - CompletableFuture
        • 方法
          • 开启异步任务
          • 处理上一个异步任务
          • 再执行一个任务,后处理两个任务
          • 再执行一个任务,任意一个完成后处理
          • 异常处理
        • 示例
          • 单个异步 supplyAsync
          • 异步完又异步 thenCompose
          • 异步完再异步 thenApply
          • 两个同时异步 thenCombine
          • 两个异步谁快返回谁 applyToEither
          • 处理异步异常 exceptionally
        • 总结
        • 参考资料
      • Java多线程 - 线程变量传递ThreadLocal
    • JDK

    • Java集合

    • Spring

    • JVM

    • Other

  • 面试题

  • 微服务

  • 踩过的坑

  • Java
  • 笔记
  • 多线程与并发
NipGeihou
2022-08-11
目录

Java多线程 - CompletableFuture

笔记

JDK1.8+,一个链式操作的异步工具类

# 方法

# 开启异步任务

// 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    
// 无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)

# 处理上一个异步任务

// 接收上一个任务的返回作为入参,有返回值
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn)

// 接收上一个任务的返回作为入参,没有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

// 没有入参,没有返回值
public CompletableFuture<Void> thenRun(Runnable action)

# 再执行一个任务,后处理两个任务

参数 1:第二个任务,参数 2 在上一个任务和参数 1 的任务完成后执行。

// 接收两个任务的返回作为入参,有返回值
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

// 接收上两个任务的返回作为入参,没有返回值
public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
    
// 没有入参,没有返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

# 再执行一个任务,任意一个完成后处理

参数 1:第二个任务,参数 2 在上一个任务和参数 1 的任务完成后执行。

// 接收最快完成任务的返回作为入参,有返回值
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)
    
// 接收最快完成任务的返回作为入参,没有返回值
public CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)
    
// 没有入参,没有返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)    

# 异常处理

// 捕获上面任务的异常,有返回值
public CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn)
    
// 接收上一个任务的返回作为入参1 或 将上面任务出现的异常作为入参2;换言之入参1和入参2只有一个能用,有返回值
public <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn)
    
// 接收上一个任务的返回作为入参1 或 将上面任务出现的异常作为入参2;换言之入参1和入参2只有一个能用,没有返回值
public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action)

# 示例

# 单个异步 supplyAsync

开启一个异步线程。

@Test
void supplyAsyncTest() {
    // 主线程干活

    // 异步执行
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        // 异步一些耗时的操作

        // 异步操作完成后,返回结果
        return "结果";
    });

    // 主线程继续干活


    // 阻塞等待异步线程返回结果
    String result = cf.join();
    System.out.println(result);
}

# 异步完又异步 thenCompose

使用 thenApplyAsync 更优雅

开启一个异步线程 A,并等它返回结果 res 后,再开启另一个异步线程 B,并传递 A 的 res 给 B。

@Test
public void thenComposeTest() {
    // 主线程干活

    // 异步执行
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        // 异步一些耗时的操作

        // 异步操作完成后,返回结果
        return "结果1";
    }).thenCompose(res -> CompletableFuture.supplyAsync(() -> { // 上一个异步操作完成后,执行下一个异步操作
        // res:上一个异步操作的结果

        return res + "结果2";
    }));


    // 主线程继续干活

    // 阻塞等待异步线程返回结果
    String result = cf.join();
    System.out.println(result);
}

# 异步完再异步 thenApply

@Test
public void thenApplyTest() {
    // 主线程干活

    // 异步执行
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        // 异步一些耗时的操作

        // 异步操作完成后,返回结果
        return "结果1";
    }).thenApply(res -> { // 上一个异步操作完成后,执行下一个异步操作
        // res:上一个异步操作的结果
        return res + "结果2";
    });


    // 主线程继续干活

    // 阻塞等待异步线程返回结果
    String result = cf.join();
    System.out.println(result);
}

# 两个同时异步 thenCombine

同时开启两个异步线程,等待他们完成,执行合并。

@Test
public void thenCombineTest() {
    // 主线程干活

    // 异步执行
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        // 异步一些耗时的操作

        // 异步操作完成后,返回结果
        return "结果1";
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
        // 与上一个异步操作并行执行
        return "结果2";
    }), (res1, res2) -> {
        // 结合两个异步操作的结果
        System.out.println(res1);
        System.out.println(res2);
        return res1 + res2;
    }
                  );


    // 主线程继续干活

    // 阻塞等待异步线程返回结果
    String result = cf.join();
    System.out.println(result);
}

# 两个异步谁快返回谁 applyToEither

@Test
public void applyToEitherTest() {
    // 主线程干活

    // 异步执行
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        // 异步一些耗时的操作
        sleep(100);
        // 异步操作完成后,返回结果
        return "结果1";
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
        // 与上一个异步操作并行执行
        sleep(200);
        return "结果2";
    }), res -> res);


    // 主线程继续干活

    // 阻塞等待异步线程返回结果,谁快返回谁
    String result = cf.join();
    System.out.println(result);
}

# 处理异步异常 exceptionally

@Test
public void exceptionallyTest() {
    // 主线程干活

    // 异步执行
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("异常");
        }
        return "结果";
    }).exceptionally(e -> {
        // 异常处理
        System.out.println(e.getMessage());
        return "异常结果";
    });


    // 主线程继续干活

    // 阻塞等待异步线程返回结果,谁快返回谁
    String result = cf.join();
    System.out.println(result);
}

# 总结

CompletableFuture 的方法主要有:

supplyAsync	开启异步任务
thenCompose	连接两个异步任务
thenApply	任务的后置处理
thenCombine 合并两个异步任务
applyToEither 获取最先完成的任务
exceptionally 处理异常

由以上方法可以延伸出带 Async 后缀的方法,这类方法使用另外的线程处理,反正则是与 supplyAsync 的线程相同。

关于运行的线程

CompletableFuture 默认使用公共线程池 ForkJoinPool.commonPool() ,此线程池适合计算密集型任务,不适合 IO 操作,涉及 IO 操作需自行传入线程池。

# 参考资料

  1. 【Java 并发・03】CompletableFuture 入门_哔哩哔哩_bilibili (opens new window)
上次更新: 2022/12/31, 03:04:26
Java多线程 - 辅助类
Java多线程 - 线程变量传递ThreadLocal

← Java多线程 - 辅助类 Java多线程 - 线程变量传递ThreadLocal→

最近更新
01
Docker Swarm
04-18
02
安全隧道 - gost
04-17
03
Solana最佳实践
04-16
更多文章>
Theme by Vdoing | Copyright © 2018-2025 NipGeihou | 友情链接
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式