Java多线程:ThreadPoolTaskExecutor线程池 与CompletableFuture如何打出组合拳
这里写目录标题
- ThreadPoolTaskExecutor线程池
- 核心配置参数
- 任务提交方式
- 1. 执行无返回值任务 (Runnable)
- 2. 执行有返回值任务 (Callable)
- 完整使用示例
- 关键配置说明
- 拒绝策略详解
- 监控线程池状态
- CompletableFuture 异步操作组合
- 一、核心方法解析
- 1. 创建方法
- 2. 结果处理方法
- 3. 组合方法
- 4. 异常处理方法
- 5. 控制方法
- 二、完整使用示例
- 1. 基础使用
- 2. 任务组合
- 3. 批量处理
- 4. 异常处理
- CompletableFuture 与线程池的关系
- 关系
- 协作方式
- 通用设计原则
- 生活化比喻
ThreadPoolTaskExecutor线程池
ThreadPoolTaskExecutor
是 Spring 框架提供的线程池实现,它封装了 Java 原生的 ThreadPoolExecutor
,提供了更便捷的配置方式以及与 Spring 生态的深度集成。以下是详细用法指南:
核心配置参数
@Bean
public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心配置参数executor.setCorePoolSize(5); // 核心线程数 (长期保留的线程)executor.setMaxPoolSize(10); // 最大线程数 (突发流量时创建的最大线程数)executor.setQueueCapacity(100); // 任务队列容量executor.setKeepAliveSeconds(60); // 空闲线程存活时间(秒)executor.setThreadNamePrefix("task-");// 线程名前缀// 其他重要配置executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//拒绝策略executor.setWaitForTasksToCompleteOnShutdown(true); // 关闭时等待任务完成executor.setAwaitTerminationSeconds(30); // 等待任务完成的最大时间executor.initialize(); // 必须初始化!return executor;
}
任务提交方式
1. 执行无返回值任务 (Runnable)
@Autowired
private ThreadPoolTaskExecutor taskExecutor;public void executeTask() {taskExecutor.execute(() -> {// 执行业务逻辑System.out.println(Thread.currentThread().getName() + " is running");});
}
2. 执行有返回值任务 (Callable)
public Future<String> submitTask() {return taskExecutor.submit(() -> {Thread.sleep(1000);return "Task result";});
}// 获取结果
Future<String> future = submitTask();
String result = future.get(); // 阻塞获取结果
完整使用示例
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Future;@Configuration
public class ThreadPoolConfig {@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(50);executor.setKeepAliveSeconds(120);executor.setThreadNamePrefix("app-thread-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}@Service
public class TaskService {@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;// 执行批量任务public void executeBatchTasks() {for (int i = 0; i < 20; i++) {final int taskId = i;taskExecutor.execute(() -> processTask(taskId));}}private void processTask(int id) {System.out.printf("[%s] Processing task #%d%n", Thread.currentThread().getName(), id);// 模拟业务处理try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }}// 执行带返回值的任务public Future<Integer> calculateResult(int input) {return taskExecutor.submit(() -> {// 模拟复杂计算Thread.sleep(1000);return input * 2;});}
}
关键配置说明
参数 | 说明 |
---|---|
corePoolSize | 核心线程数,线程池长期保持的线程数量 |
maxPoolSize | 最大线程数,当队列满时创建新线程的最大数量 |
queueCapacity | 任务队列容量 (建议使用有界队列) |
keepAliveSeconds | 非核心线程空闲存活时间 |
threadNamePrefix | 线程名前缀 (方便日志追踪) |
rejectedExecutionHandler | 拒绝策略 (重要!) |
waitForTasksToCompleteOnShutdown | 关闭时是否等待任务完成 (建议true) |
awaitTerminationSeconds | 等待任务完成的超时时间 |
拒绝策略详解
当队列和线程池都满时,如何处理新任务:
策略 | 行为 |
---|---|
AbortPolicy (默认) | 抛出 RejectedExecutionException 异常 |
CallerRunsPolicy | 由调用线程直接执行该任务 (推荐) |
DiscardPolicy | 直接丢弃任务,不抛异常 |
DiscardOldestPolicy | 丢弃队列最前面的任务,然后重试执行 |
// 推荐设置(避免任务丢失)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
监控线程池状态
// 获取运行时指标
int activeCount = taskExecutor.getActiveCount(); // 活动线程数
int poolSize = taskExecutor.getPoolSize(); // 当前线程数
long completedCount = taskExecutor.getThreadPoolExecutor().getCompletedTaskCount(); // 已完成任务数// 打印状态
System.out.printf("Active: %d, Pool: %d, Completed: %d%n",activeCount, poolSize, completedCount);
CompletableFuture 异步操作组合
CompletableFuture 是 Java 8 引入的异步编程核心类,用于简化多线程任务编排、非阻塞操作和复杂异步流程管理。它实现了 Future
和 CompletionStage
接口,不仅具备传统 Future
的异步结果获取能力,还支持链式调用、任务组合、异常处理等高级功能
一、核心方法解析
1. 创建方法
方法 | 描述 | 示例 |
---|---|---|
supplyAsync(Supplier) | 执行有返回值的异步任务 | CompletableFuture.supplyAsync(() -> "Hello") |
runAsync(Runnable) | 执行无返回值的异步任务 | CompletableFuture.runAsync(() -> System.out.println("Done")) |
completedFuture(value) | 创建已完成的Future | CompletableFuture.completedFuture("Result") |
2. 结果处理方法
所有前置的异步任务(CompletableFuture)都完成后,执行
方法 | 描述 | 特点 |
---|---|---|
thenApply(Function) | 同步转换结果 | 在主线程执行 |
thenApplyAsync(Function) | 异步转换结果 | 在ForkJoinPool执行 |
thenApplyAsync(Function, Executor) | 在指定线程池转换 | 自定义执行上下文 |
thenAccept(Consumer) | 消费结果 | 无返回值 |
thenRun(Runnable) | 在所有任务完成后执行后续操作 | 无输入无输出,后续清理、通知、汇总等操作 |
// 举个对比示例:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);future1.thenCombine(future2, (x, y) -> x + y).thenAccept(result -> System.out.println("Sum: " + result));
3. 组合方法
方法 | 描述 | 适用场景 |
---|---|---|
thenCompose(Function) | 链式组合(flatMap) | 任务B依赖任务A结果 |
thenCombine(CompletionStage, BiFunction) | 合并两个独立任务结果 | 并行执行两个任务 |
allOf(CompletableFuture...) | 等待所有任务完成 | 批量处理 |
anyOf(CompletableFuture...) | 等待任一任务完成 | 快速响应 |
4. 异常处理方法
方法 | 描述 | 特点 |
---|---|---|
exceptionally(Function) | 异常处理 | 返回替代值 |
handle(BiFunction) | 结果和异常统一处理 | 无论成功失败都执行 |
whenComplete(BiConsumer) | 观察结果或异常 | 不修改结果 |
5. 控制方法
方法 | 描述 | 使用场景 |
---|---|---|
complete(T value) | 手动完成Future | 超时处理 |
completeExceptionally(Throwable) | 以异常完成Future | 错误注入 |
cancel(boolean) | 取消任务 | 资源释放 |
orTimeout(long, TimeUnit) | 设置超时(Java 9+) | 防止阻塞 |
completeOnTimeout(T, long, TimeUnit) | 超时返回默认值(Java 9+) | 优雅降级 |
二、完整使用示例
1. 基础使用
CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try { Thread.sleep(1000); } catch (InterruptedException e) {}return "Hello";
})
.thenApplyAsync(result -> result + " World") // 异步转换
.thenAccept(System.out::println) // 消费结果
.exceptionally(ex -> {System.err.println("Error: " + ex.getMessage());return null;
});
2. 任务组合
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");// 组合两个独立任务
future1.thenCombine(future2, (res1, res2) -> res1 + " & " + res2).thenAccept(System.out::println); // 输出: Task1 & Task2// 链式依赖
future1.thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " completed")).thenAccept(System.out::println); // 输出: Task1 completed
3. 批量处理
示例一:
List<CompletableFuture<String>> futures = IntStream.range(1, 6).mapToObj(i -> CompletableFuture.supplyAsync(() -> "Result" + i)).collect(Collectors.toList());CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])
);CompletableFuture<List<String>> allResults = allFutures.thenApply(v ->futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);allResults.thenAccept(results -> System.out.println("All results: " + results));
// 输出: All results: [Result1, Result2, Result3, Result4, Result5]
示例二:
- 如果仅需在所有任务完成后触发后续操作(如通知、清理),用
thenRun()
即可,无需join()
; - 如果需阻塞当前线程等待任务完成(如主线程必须等待结果),则必须调用
join()
。
// 多线程分别查询任务详情
List<CompletableFuture<TaskPo>> futures = new ArrayList<>();
for (int id : allTaskIds) {try {// 给pnpTagExecutor 安排执行有返回值的异步任务CompletableFuture<TaskPo> future = CompletableFuture.supplyAsync(() -> taskService.getTask(id), pnpTagExecutor);// 将异步任务的未来结果(Future 对象)添加到一个 futures 集合中,以便后续可以跟踪或处理这些异步任务的结果futures.add(future);} catch (Exception e) {log.error("query task err:{}", id, e.getMessage());}
}// 等待所有任务完成
// toArray(...):这是 Java 中 List 接口提供的方法,用于将列表转换为数组
// new CompletableFuture[0]:指定目标数组的类型和长度。虽然传入的是 0,但 toArray() 方法会根据列表的实际大小自动调整数组长度
// 使用 CompletableFuture.allOf() 方法时,需要传入一个 CompletableFuture[] 数组
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));// 等待所有 CompletableFuture 任务完成的关键语句,它会 阻塞当前线程,直到所有的异步任务都执行完毕。
// 如果不加这一句,主线程可能会提前结束,导致部分异步任务没有被执行完或被中断
allFutures.join();HashMap<String, TaskPo> taskMap = new HashMap<>();// 所有前置的异步任务(CompletableFuture)都完成后,执行一个指定的无返回值的后续操作
allFutures.thenRun(() -> {for (CompletableFuture<TaskPo> future : futures) {try {if (future == null) {continue;}// 获取每个 CompletableFuture 的结果taskMap.put(future.get().getId(), future.get());} catch (Exception e) {log.error("Error getting future result: {}", e.getMessage());}}
});
示例三:
批量下载文件后统一处理
List<CompletableFuture<Void>> downloadTasks = urls.stream().map(url -> CompletableFuture.runAsync(() -> downloadFile(url), executor)).collect(Collectors.toList());CompletableFuture<Void> allDownloads = CompletableFuture.allOf(downloadTasks.toArray(new CompletableFuture[0])
);allDownloads.thenRun(() -> {System.out.println("All files downloaded successfully.");processDownloadedFiles();
});
示例四:
多个 API 调用完成后汇总日志
CompletableFuture<User> userFuture = getUserInfoAsync();
CompletableFuture<Order> orderFuture = getOrderInfoAsync();CompletableFuture<Void> allFutures = CompletableFuture.allOf(userFuture, orderFuture);allFutures.thenRun(() -> {User user = userFuture.join(); // 获取用户信息Order order = orderFuture.join(); // 获取订单信息log.info("User: {}, Order: {}", user.getName(), order.getId());
});
4. 异常处理
CompletableFuture.supplyAsync(() -> {if (Math.random() > 0.5) {throw new RuntimeException("Random error");}return "Success";
})
.handle((result, ex) -> {if (ex != null) {return "Fallback value";}return result;
})
.thenAccept(System.out::println);
CompletableFuture 与线程池的关系
关系
CompletableFuture 与线程池的关系,本质上是 任务编排者与执行者的协作关系。用更通用的视角解释如下:
- CompletableFuture:
是任务编排框架,负责定义异步任务的依赖关系、组合逻辑和结果处理(如“任务A完成后触发任务B,合并任务C和D的结果”)。
它通过回调链(如thenApply
、thenCombine
)实现非阻塞的任务调度,开发者无需手动管理线程状态。 - 线程池:
是任务执行引擎,提供线程资源来实际运行任务(如计算、网络请求等)。
它管理线程的生命周期、复用和调度,避免频繁创建/销毁线程的开销。
协作方式
-
任务提交:
当调用supplyAsync()
或thenApplyAsync()
时,CompletableFuture 将任务提交给线程池,由线程池分配线程执行。// 任务提交给自定义线程池 ExecutorService pool = Executors.newFixedThreadPool(4); CompletableFuture.supplyAsync(() -> "任务结果", pool);
-
结果回调:
任务完成后,线程池中的线程将结果返回给 CompletableFuture,由其触发后续回调(如thenAccept
)。 -
资源隔离:
不同业务可使用独立线程池(如核心业务池 vs 日志池),防止资源竞争。
通用设计原则
场景 | 策略 |
---|---|
CPU密集型任务 | 固定大小线程池(线程数≈CPU核数) |
IO密集型任务 | 弹性线程池(线程数=2~4倍CPU核数) |
任务编排复杂度高 | 用 thenCompose 处理嵌套任务,allOf 合并并行任务 |
超时熔断 | JDK9+ 的 orTimeout() + exceptionally() 实现降级 |
生活化比喻
想象一家餐厅:
-
线程池 = 厨师团队(4人固定编制,高效炒菜)。
CompletableFuture
= 调度系统:
-
接单后分配厨师(
supplyAsync
)。 -
炒菜完成后自动通知服务员上菜(
thenAccept
)。 -
多份订单合并出餐(
allOf
)。 -
风险:若所有订单挤占厨师(共用线程池),新订单将饿死(死锁)。