当前位置: 首页 > news >正文

Java多线程:ThreadPoolTaskExecutor线程池 与CompletableFuture如何打出组合拳

这里写目录标题

  • ThreadPoolTaskExecutor线程池
    • 核心配置参数
    • 任务提交方式
      • 1. 执行无返回值任务 (Runnable)
      • 2. 执行有返回值任务 (Callable)
    • 完整使用示例
    • 关键配置说明
    • 拒绝策略详解
    • 监控线程池状态
  • CompletableFuture 异步操作组合
    • 一、核心方法解析
      • 1. 创建方法
      • 2. 结果处理方法
      • 3. 组合方法
      • 4. 异常处理方法
      • 5. 控制方法
    • 二、完整使用示例
      • 1. 基础使用
      • 2. 任务组合
      • 3. 批量处理
      • 4. 异常处理
  • CompletableFuture 与线程池的关系
    • 关系
    • 协作方式
    • 通用设计原则
    • 生活化比喻

ThreadPoolTaskExecutor线程池

ThreadPoolTaskExecutor 是 Spring 框架提供的线程池实现,它封装了 Java 原生的 ThreadPoolExecutor,提供了更便捷的配置方式以及与 Spring 生态的深度集成。以下是详细用法指南:


核心配置参数

@Bean
public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心配置参数executor.setCorePoolSize(5);         // 核心线程数 (长期保留的线程)executor.setMaxPoolSize(10);          // 最大线程数 (突发流量时创建的最大线程数)executor.setQueueCapacity(100);       // 任务队列容量executor.setKeepAliveSeconds(60);     // 空闲线程存活时间(秒)executor.setThreadNamePrefix("task-");// 线程名前缀// 其他重要配置executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//拒绝策略executor.setWaitForTasksToCompleteOnShutdown(true); // 关闭时等待任务完成executor.setAwaitTerminationSeconds(30);            // 等待任务完成的最大时间executor.initialize(); // 必须初始化!return executor;
}

任务提交方式

1. 执行无返回值任务 (Runnable)

@Autowired
private ThreadPoolTaskExecutor taskExecutor;public void executeTask() {taskExecutor.execute(() -> {// 执行业务逻辑System.out.println(Thread.currentThread().getName() + " is running");});
}

2. 执行有返回值任务 (Callable)

public Future<String> submitTask() {return taskExecutor.submit(() -> {Thread.sleep(1000);return "Task result";});
}// 获取结果
Future<String> future = submitTask();
String result = future.get(); // 阻塞获取结果

完整使用示例

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Future;@Configuration
public class ThreadPoolConfig {@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(50);executor.setKeepAliveSeconds(120);executor.setThreadNamePrefix("app-thread-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}@Service
public class TaskService {@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;// 执行批量任务public void executeBatchTasks() {for (int i = 0; i < 20; i++) {final int taskId = i;taskExecutor.execute(() -> processTask(taskId));}}private void processTask(int id) {System.out.printf("[%s] Processing task #%d%n", Thread.currentThread().getName(), id);// 模拟业务处理try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }}// 执行带返回值的任务public Future<Integer> calculateResult(int input) {return taskExecutor.submit(() -> {// 模拟复杂计算Thread.sleep(1000);return input * 2;});}
}

关键配置说明

参数说明
corePoolSize核心线程数,线程池长期保持的线程数量
maxPoolSize最大线程数,当队列满时创建新线程的最大数量
queueCapacity任务队列容量 (建议使用有界队列)
keepAliveSeconds非核心线程空闲存活时间
threadNamePrefix线程名前缀 (方便日志追踪)
rejectedExecutionHandler拒绝策略 (重要!)
waitForTasksToCompleteOnShutdown关闭时是否等待任务完成 (建议true)
awaitTerminationSeconds等待任务完成的超时时间

拒绝策略详解

当队列和线程池都满时,如何处理新任务:

策略行为
AbortPolicy (默认)抛出 RejectedExecutionException 异常
CallerRunsPolicy由调用线程直接执行该任务 (推荐)
DiscardPolicy直接丢弃任务,不抛异常
DiscardOldestPolicy丢弃队列最前面的任务,然后重试执行
// 推荐设置(避免任务丢失)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

监控线程池状态

// 获取运行时指标
int activeCount = taskExecutor.getActiveCount();    // 活动线程数
int poolSize = taskExecutor.getPoolSize();          // 当前线程数
long completedCount = taskExecutor.getThreadPoolExecutor().getCompletedTaskCount(); // 已完成任务数// 打印状态
System.out.printf("Active: %d, Pool: %d, Completed: %d%n",activeCount, poolSize, completedCount);

CompletableFuture 异步操作组合

CompletableFuture 是 Java 8 引入的异步编程核心类,用于简化多线程任务编排、非阻塞操作和复杂异步流程管理。它实现了 FutureCompletionStage 接口,不仅具备传统 Future 的异步结果获取能力,还支持链式调用、任务组合、异常处理等高级功能

一、核心方法解析

1. 创建方法

方法描述示例
supplyAsync(Supplier)执行有返回值的异步任务CompletableFuture.supplyAsync(() -> "Hello")
runAsync(Runnable)执行无返回值的异步任务CompletableFuture.runAsync(() -> System.out.println("Done"))
completedFuture(value)创建已完成的FutureCompletableFuture.completedFuture("Result")

2. 结果处理方法

所有前置的异步任务(CompletableFuture)都完成后,执行

方法描述特点
thenApply(Function)同步转换结果在主线程执行
thenApplyAsync(Function)异步转换结果在ForkJoinPool执行
thenApplyAsync(Function, Executor)在指定线程池转换自定义执行上下文
thenAccept(Consumer)消费结果无返回值
thenRun(Runnable)在所有任务完成后执行后续操作无输入无输出,后续清理、通知、汇总等操作

在这里插入图片描述

// 举个对比示例:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);future1.thenCombine(future2, (x, y) -> x + y).thenAccept(result -> System.out.println("Sum: " + result));

3. 组合方法

方法描述适用场景
thenCompose(Function)链式组合(flatMap)任务B依赖任务A结果
thenCombine(CompletionStage, BiFunction)合并两个独立任务结果并行执行两个任务
allOf(CompletableFuture...)等待所有任务完成批量处理
anyOf(CompletableFuture...)等待任一任务完成快速响应

4. 异常处理方法

方法描述特点
exceptionally(Function)异常处理返回替代值
handle(BiFunction)结果和异常统一处理无论成功失败都执行
whenComplete(BiConsumer)观察结果或异常不修改结果

5. 控制方法

方法描述使用场景
complete(T value)手动完成Future超时处理
completeExceptionally(Throwable)以异常完成Future错误注入
cancel(boolean)取消任务资源释放
orTimeout(long, TimeUnit)设置超时(Java 9+)防止阻塞
completeOnTimeout(T, long, TimeUnit)超时返回默认值(Java 9+)优雅降级

二、完整使用示例

1. 基础使用

CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try { Thread.sleep(1000); } catch (InterruptedException e) {}return "Hello";
})
.thenApplyAsync(result -> result + " World") // 异步转换
.thenAccept(System.out::println)             // 消费结果
.exceptionally(ex -> {System.err.println("Error: " + ex.getMessage());return null;
});

2. 任务组合

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");// 组合两个独立任务
future1.thenCombine(future2, (res1, res2) -> res1 + " & " + res2).thenAccept(System.out::println); // 输出: Task1 & Task2// 链式依赖
future1.thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " completed")).thenAccept(System.out::println); // 输出: Task1 completed

3. 批量处理

示例一:

List<CompletableFuture<String>> futures = IntStream.range(1, 6).mapToObj(i -> CompletableFuture.supplyAsync(() -> "Result" + i)).collect(Collectors.toList());CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])
);CompletableFuture<List<String>> allResults = allFutures.thenApply(v ->futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);allResults.thenAccept(results -> System.out.println("All results: " + results));
// 输出: All results: [Result1, Result2, Result3, Result4, Result5]

示例二:

  • 如果仅需在所有任务完成后触发后续操作(如通知、清理),用 thenRun() 即可,无需 join()
  • 如果需阻塞当前线程等待任务完成(如主线程必须等待结果),则必须调用 join()
// 多线程分别查询任务详情
List<CompletableFuture<TaskPo>> futures = new ArrayList<>();
for (int id : allTaskIds) {try {// 给pnpTagExecutor 安排执行有返回值的异步任务CompletableFuture<TaskPo> future = CompletableFuture.supplyAsync(() -> taskService.getTask(id), pnpTagExecutor);// 将异步任务的未来结果(Future 对象)添加到一个 futures 集合中,以便后续可以跟踪或处理这些异步任务的结果futures.add(future);} catch (Exception e) {log.error("query task err:{}", id, e.getMessage());}
}// 等待所有任务完成
// toArray(...):这是 Java 中 List 接口提供的方法,用于将列表转换为数组
// new CompletableFuture[0]:指定目标数组的类型和长度。虽然传入的是 0,但 toArray() 方法会根据列表的实际大小自动调整数组长度
// 使用 CompletableFuture.allOf() 方法时,需要传入一个 CompletableFuture[] 数组
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));// 等待所有 CompletableFuture 任务完成的关键语句,它会 阻塞当前线程,直到所有的异步任务都执行完毕。
// 如果不加这一句,主线程可能会提前结束,导致部分异步任务没有被执行完或被中断
allFutures.join();HashMap<String, TaskPo> taskMap = new HashMap<>();// 所有前置的异步任务(CompletableFuture)都完成后,执行一个指定的无返回值的后续操作
allFutures.thenRun(() -> {for (CompletableFuture<TaskPo> future : futures) {try {if (future == null) {continue;}// 获取每个 CompletableFuture 的结果taskMap.put(future.get().getId(), future.get());} catch (Exception e) {log.error("Error getting future result: {}", e.getMessage());}}
});

示例三:

批量下载文件后统一处理

List<CompletableFuture<Void>> downloadTasks = urls.stream().map(url -> CompletableFuture.runAsync(() -> downloadFile(url), executor)).collect(Collectors.toList());CompletableFuture<Void> allDownloads = CompletableFuture.allOf(downloadTasks.toArray(new CompletableFuture[0])
);allDownloads.thenRun(() -> {System.out.println("All files downloaded successfully.");processDownloadedFiles();
});

示例四:

多个 API 调用完成后汇总日志

CompletableFuture<User> userFuture = getUserInfoAsync();
CompletableFuture<Order> orderFuture = getOrderInfoAsync();CompletableFuture<Void> allFutures = CompletableFuture.allOf(userFuture, orderFuture);allFutures.thenRun(() -> {User user = userFuture.join(); // 获取用户信息Order order = orderFuture.join(); // 获取订单信息log.info("User: {}, Order: {}", user.getName(), order.getId());
});

4. 异常处理

CompletableFuture.supplyAsync(() -> {if (Math.random() > 0.5) {throw new RuntimeException("Random error");}return "Success";
})
.handle((result, ex) -> {if (ex != null) {return "Fallback value";}return result;
})
.thenAccept(System.out::println);

CompletableFuture 与线程池的关系

关系

CompletableFuture 与线程池的关系,本质上是 任务编排者与执行者的协作关系。用更通用的视角解释如下:

  • CompletableFuture
    是​​任务编排框架​​,负责定义异步任务的依赖关系、组合逻辑和结果处理(如“任务A完成后触发任务B,合并任务C和D的结果”)。
    它通过回调链(如 thenApplythenCombine)实现非阻塞的任务调度,开发者无需手动管理线程状态。
  • 线程池
    是​​任务执行引擎​​,提供线程资源来实际运行任务(如计算、网络请求等)。
    它管理线程的生命周期、复用和调度,避免频繁创建/销毁线程的开销。

协作方式

  • 任务提交
    当调用 supplyAsync()thenApplyAsync() 时,CompletableFuture 将任务提交给线程池,由线程池分配线程执行。

    // 任务提交给自定义线程池
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletableFuture.supplyAsync(() -> "任务结果", pool);
    
  • 结果回调
    任务完成后,线程池中的线程将结果返回给 CompletableFuture,由其触发后续回调(如 thenAccept)。

  • 资源隔离
    不同业务可使用独立线程池(如核心业务池 vs 日志池),防止资源竞争。


通用设计原则

场景策略
CPU密集型任务固定大小线程池(线程数≈CPU核数)
IO密集型任务弹性线程池(线程数=2~4倍CPU核数)
任务编排复杂度高thenCompose 处理嵌套任务,allOf 合并并行任务
超时熔断JDK9+ 的 orTimeout() + exceptionally() 实现降级

生活化比喻

想象一家餐厅:

  • 线程池 = 厨师团队(4人固定编制,高效炒菜)。

CompletableFuture

= 调度系统:

  • 接单后分配厨师(supplyAsync)。

  • 炒菜完成后自动通知服务员上菜(thenAccept)。

  • 多份订单合并出餐(allOf)。

  • 风险:若所有订单挤占厨师(共用线程池),新订单将饿死(死锁)。

http://www.lqws.cn/news/122275.html

相关文章:

  • Redisson - 实现延迟队列
  • [特殊字符] 深度剖析 n8n 与 Dify:使用场景、优劣势及技术选型建议
  • DA14531_beacon_大小信标设备开发
  • mysql 悲观锁和乐观锁(—悲观锁)
  • 电路设计基础-2
  • Redis-旁路缓存策略详解
  • JSON基础知识
  • Java 线程池原理详解
  • 分布式互斥算法
  • 使用ArcPy进行栅格数据分析
  • Axios 取消请求的演进:CancelToken vs. AbortController
  • rknn优化教程(一)
  • 海信IP810N-海思MV320芯片-安卓9-2+16G-免拆优盘卡刷固件包
  • 瀚文机械键盘固件开发详解:HWKeyboard.cpp文件解析与应用
  • Async-profiler 内存采样机制解析:从原理到实现
  • Docker慢慢学
  • Java 中 ArrayList、Vector、LinkedList 的核心区别与应用场景
  • 【Docker 从入门到实战全攻略(二):核心概念 + 命令详解 + 部署案例】
  • Spring Boot 从Socket 到Netty网络编程(下):Netty基本开发与改进【心跳、粘包与拆包、闲置连接】
  • java从azure中读取用户信息
  • Docker 常用命令详解
  • docker生命周期
  • Elasticsearch的搜索流程描述
  • 微软的新系统Windows12未来有哪些新特性
  • Python 隐藏法宝:双下划线 _ _Dunder_ _
  • stripe支付测试,ngrok无法使用?免费vscode端口转发,轻松简单!
  • Java Lombok @Data 注解用法详解
  • 打卡Day44
  • 吴恩达机器学习讲义概述
  • 泛型编程技巧——使用std::enable_if实现按类型进行条件编译​