Skip to main content
CompletableFuture 是 JDK 8 引入的异步编程工具,实现了 FutureCompletionStage 接口,支持函数式编程和链式调用。

Future 的局限性

传统 Future 的问题:
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(() -> {
    Thread.sleep(1000);
    return "结果";
});

// 问题1:get() 会阻塞
String result = future.get();

// 问题2:无法手动完成
// 问题3:无法链式调用
// 问题4:无法组合多个 Future

创建 CompletableFuture

1. 直接创建

// 创建已完成的 Future
CompletableFuture<String> cf1 = CompletableFuture.completedFuture("result");

// 创建并手动完成
CompletableFuture<String> cf2 = new CompletableFuture<>();
cf2.complete("手动完成");

2. 异步执行

// 无返回值
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
    System.out.println("异步执行任务");
});

// 有返回值
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
    return "异步执行结果";
});

// 指定线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
    return "使用自定义线程池";
}, executor);

获取结果

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "结果");

// 阻塞获取
String result1 = cf.get();  // 可能抛出异常
String result2 = cf.get(5, TimeUnit.SECONDS);  // 超时获取

// 不抛受检异常
String result3 = cf.join();

// 获取已完成的值,否则返回默认值
String result4 = cf.getNow("默认值");

结果转换

thenApply - 转换结果

CompletableFuture<Integer> cf = CompletableFuture
    .supplyAsync(() -> "123")
    .thenApply(s -> Integer.parseInt(s))   // 转换为 Integer
    .thenApply(i -> i * 2);                // 乘以 2

System.out.println(cf.join());  // 246

thenAccept - 消费结果

CompletableFuture.supplyAsync(() -> "结果")
    .thenAccept(result -> {
        System.out.println("接收到:" + result);
    });

thenRun - 执行下一个任务

CompletableFuture.supplyAsync(() -> "结果")
    .thenRun(() -> {
        System.out.println("上一步完成,执行下一步");
    });

同步与异步

// 同步执行(使用上一步的线程)
.thenApply(x -> x * 2)

// 异步执行(使用默认线程池)
.thenApplyAsync(x -> x * 2)

// 异步执行(使用自定义线程池)
.thenApplyAsync(x -> x * 2, executor)

结果组合

thenCompose - 扁平化

用于两个有依赖关系的异步任务。
CompletableFuture<String> cf = CompletableFuture
    .supplyAsync(() -> 1)
    .thenCompose(i -> CompletableFuture.supplyAsync(() -> "结果:" + i));

System.out.println(cf.join());  // 结果:1

thenCombine - 合并两个结果

用于两个独立的异步任务。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combined = cf1.thenCombine(cf2, (s1, s2) -> s1 + " " + s2);
System.out.println(combined.join());  // Hello World

allOf - 等待所有完成

CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "任务1");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "任务2");
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "任务3");

CompletableFuture<Void> allOf = CompletableFuture.allOf(cf1, cf2, cf3);

// 等待所有完成
allOf.join();

// 获取所有结果
List<String> results = Stream.of(cf1, cf2, cf3)
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

anyOf - 任一完成

CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "慢任务";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
    sleep(100);
    return "快任务";
});

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(cf1, cf2);
System.out.println(anyOf.join());  // 快任务

异常处理

exceptionally - 异常恢复

CompletableFuture<String> cf = CompletableFuture
    .supplyAsync(() -> {
        if (true) throw new RuntimeException("出错了");
        return "成功";
    })
    .exceptionally(ex -> {
        System.out.println("异常:" + ex.getMessage());
        return "默认值";  // 返回默认值
    });

System.out.println(cf.join());  // 默认值

handle - 统一处理

无论成功或失败都会执行。
CompletableFuture<String> cf = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("出错了");
        }
        return "成功";
    })
    .handle((result, ex) -> {
        if (ex != null) {
            return "发生异常:" + ex.getMessage();
        }
        return "结果:" + result;
    });

whenComplete - 回调通知

不改变结果,仅做通知。
CompletableFuture<String> cf = CompletableFuture
    .supplyAsync(() -> "结果")
    .whenComplete((result, ex) -> {
        if (ex != null) {
            System.out.println("异常:" + ex);
        } else {
            System.out.println("成功:" + result);
        }
    });

实战示例

示例1:并行调用多个服务

public class ParallelServiceCall {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        
        // 并行调用三个服务
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "用户信息";
        });
        
        CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> {
            sleep(1500);
            return "订单信息";
        });
        
        CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> {
            sleep(800);
            return "商品信息";
        });
        
        // 等待所有完成并汇总
        CompletableFuture<String> result = CompletableFuture
            .allOf(userFuture, orderFuture, productFuture)
            .thenApply(v -> {
                return String.format("汇总:%s, %s, %s",
                    userFuture.join(),
                    orderFuture.join(),
                    productFuture.join());
            });
        
        System.out.println(result.join());
        System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
        // 耗时约 1500ms(而非 3300ms)
    }
    
    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {}
    }
}

示例2:异步任务编排

public class AsyncTaskChain {
    public static void main(String[] args) {
        CompletableFuture<String> result = CompletableFuture
            // 1. 获取用户ID
            .supplyAsync(() -> {
                System.out.println("步骤1:获取用户ID");
                return 12345L;
            })
            // 2. 根据用户ID查询用户信息
            .thenCompose(userId -> CompletableFuture.supplyAsync(() -> {
                System.out.println("步骤2:查询用户信息,ID=" + userId);
                return new User(userId, "张三");
            }))
            // 3. 根据用户信息查询订单
            .thenCompose(user -> CompletableFuture.supplyAsync(() -> {
                System.out.println("步骤3:查询订单,用户=" + user.name);
                return "订单-001";
            }))
            // 4. 异常处理
            .exceptionally(ex -> {
                System.out.println("发生异常:" + ex.getMessage());
                return "默认订单";
            });
        
        System.out.println("最终结果:" + result.join());
    }
}

class User {
    Long id;
    String name;
    User(Long id, String name) {
        this.id = id;
        this.name = name;
    }
}

示例3:带超时的异步调用

public class AsyncWithTimeout {
    public static void main(String[] args) {
        CompletableFuture<String> cf = CompletableFuture
            .supplyAsync(() -> {
                sleep(5000);  // 模拟慢操作
                return "结果";
            })
            // JDK 9+ 支持
            // .orTimeout(2, TimeUnit.SECONDS)
            // .completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS)
            ;
        
        try {
            String result = cf.get(2, TimeUnit.SECONDS);
            System.out.println("结果:" + result);
        } catch (TimeoutException e) {
            System.out.println("操作超时");
            cf.cancel(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {}
    }
}

API 速查表

创建

方法说明
supplyAsync(Supplier)异步执行,有返回值
runAsync(Runnable)异步执行,无返回值
completedFuture(value)创建已完成的 Future

转换

方法说明
thenApply转换结果
thenAccept消费结果
thenRun执行下一步
thenCompose扁平化(依赖任务)
thenCombine合并两个结果

组合

方法说明
allOf等待所有完成
anyOf任一完成

异常

方法说明
exceptionally异常恢复
handle统一处理
whenComplete回调通知

最佳实践

1. 使用自定义线程池

// 不推荐:使用默认的 ForkJoinPool
CompletableFuture.supplyAsync(() -> "result");

// 推荐:使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> "result", executor);

2. 避免阻塞

// 不推荐:在异步回调中阻塞
cf.thenApply(result -> {
    return anotherCf.get();  // 阻塞!
});

// 推荐:使用 thenCompose
cf.thenCompose(result -> anotherCf);

3. 正确处理异常

CompletableFuture.supplyAsync(() -> riskyOperation())
    .exceptionally(ex -> {
        log.error("操作失败", ex);
        return defaultValue;
    })
    .thenAccept(result -> process(result));

4. 合理设置超时

// JDK 9+
cf.orTimeout(5, TimeUnit.SECONDS);

// JDK 8
cf.get(5, TimeUnit.SECONDS);

小结

  • CompletableFuture 解决了传统 Future 的局限性
  • 支持链式调用、函数式编程风格
  • 创建:supplyAsync、runAsync
  • 转换:thenApply、thenCompose
  • 组合:allOf、anyOf、thenCombine
  • 异常:exceptionally、handle、whenComplete
  • 实际使用中建议配合自定义线程池,并合理处理异常和超时