ForkJoinTask深度解析:Java并行计算利器
本文来深入解析 java.util.concurrent.ForkJoinTask
。它是在 Java 7 中引入的,是 Fork/Join 并行计算框架的核心组件。
ForkJoinTask<V>
是一个抽象类,代表在 ForkJoinPool
中运行的任务。它与传统的 Thread
或 Runnable
不同,ForkJoinTask
设计得非常轻量级,旨在支持细粒度的并行计算。其核心思想是“分而治之”(Divide and Conquer):一个大任务可以被分解(fork)成若干个小任务,这些小任务可以并行执行,最终它们的结果可以被合并(join)起来得到大任务的结果。
ForkJoinTask
实现了 Future<V>
接口,因此它也具备了异步计算任务的特性,比如获取任务结果、取消任务、判断任务是否完成等。
ForkJoinTask
的用法
通常,我们不直接使用 ForkJoinTask
,而是使用它的两个标准子类:
-
RecursiveAction
: 用于没有返回结果的任务。你需要重写其compute()
方法。 -
RecursiveTask<V>
: 用于有返回结果的任务。你需要重写其compute()
方法,并返回类型为V
的结果。
核心方法:
-
fork()
: 异步执行任务。调用此方法后,当前线程不会等待任务完成,而是继续执行。任务会被提交到ForkJoinPool
的工作队列中,等待空闲线程执行。 -
join()
: 等待任务执行完成并获取其结果。如果任务尚未完成,调用join()
的线程会被阻塞。如果任务是RecursiveAction
,join()
返回void
(实际上是null
,但类型上表现为void
的效果)。如果任务是RecursiveTask<V>
,join()
返回计算结果V
。如果任务在执行过程中抛出未检查异常,join()
会将该异常(或其包装)抛出。如果任务被取消,join()
会抛出CancellationException
。 -
invoke()
: 同步执行任务。它会启动任务,等待其完成,并返回结果(如果是RecursiveTask
)。invoke()
相当于fork()
后立即join()
,但invoke()
可能会进行一些优化,比如直接在当前线程执行任务,而不是总是提交到池中。 -
invokeAll(ForkJoinTask<?>... tasks)
: 提交一组任务并等待它们全部完成。
示例:使用 RecursiveTask
计算数组元素之和
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;class SumArrayTask extends RecursiveTask<Long> {private static final int THRESHOLD = 1000; // 任务分解的阈值private final long[] array;private final int start;private final int end;public SumArrayTask(long[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Long compute() {int length = end - start;if (length <= THRESHOLD) {// 如果任务足够小,直接计算long sum = 0;for (int i = start; i < end; i++) {sum += array[i];}// System.out.println(Thread.currentThread().getName() + " computes: " + start + "-" + end + " = " + sum);return sum;} else {// 任务过大,分解成两个子任务int middle = start + (length / 2);SumArrayTask leftTask = new SumArrayTask(array, start, middle);SumArrayTask rightTask = new SumArrayTask(array, middle, end);// System.out.println(Thread.currentThread().getName() + " forks: " + start + "-" + middle + " and " + middle + "-" + end);// 异步执行左边的子任务leftTask.fork();// 同步执行右边的子任务(或者也可以 fork + join)Long rightResult = rightTask.compute();// 等待左边子任务完成并获取结果Long leftResult = leftTask.join();// 合并结果return leftResult + rightResult;}}public static void main(String[] args) {ForkJoinPool pool = ForkJoinPool.commonPool(); // 使用公共的 ForkJoinPoollong[] array = new long[10000];for (int i = 0; i < array.length; i++) {array[i] = i + 1;}SumArrayTask task = new SumArrayTask(array, 0, array.length);long startTime = System.currentTimeMillis();Long result = pool.invoke(task); // 提交任务并等待结果long endTime = System.currentTimeMillis();System.out.println("Result: " + result);System.out.println("Time taken: " + (endTime - startTime) + " ms");pool.shutdown();}
}
ForkJoinTask
底层实现与设计
1. 状态字段 (status
)
ForkJoinTask
内部使用一个 volatile int status
字段来原子性地管理任务的各种状态。这个字段通过位运算打包了多个信息,以确保原子更新。
// ... existing code .../** The status field holds bits packed into a single int to ensure* atomicity. Status is initially zero, and takes on nonnegative* values until completed, upon which it holds (sign bit) DONE,* possibly with ABNORMAL (cancelled or exceptional) and THROWN* (in which case an exception has been stored). A value of* ABNORMAL without DONE signifies an interrupted wait. These* control bits occupy only (some of) the upper half (16 bits) of* status field. The lower bits are used for user-defined tags.*/static final int DONE = 1 << 31; // must be negativestatic final int ABNORMAL = 1 << 16;static final int THROWN = 1 << 17;static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN;static final int NUH_BIT = 24; // no external caller helpingstatic final int NO_USER_HELP = 1 << NUH_BIT;static final int MARKER = 1 << 30; // utility markerstatic final int SMASK = 0xffff; // short bits for tagsstatic final int UNCOMPENSATE = 1 << 16; // helpJoin sentinel// Fields/** @serial */volatile int status; // accessed directly by pool and workers
// ... existing code ...
-
DONE
(最高位,符号位): 标记任务已完成。这是一个负数值,所以可以通过status < 0
快速判断任务是否完成。 -
ABNORMAL
: 标记任务异常完成,比如被取消或遇到未捕获的异常。 -
THROWN
: 标记任务完成时存储了一个异常(通过recordExceptionalCompletion
)。 -
HAVE_EXCEPTION
:DONE | ABNORMAL | THROWN
的组合,表示任务完成且有异常。 -
NO_USER_HELP
: 一个标记位,指示外部调用者(非ForkJoinWorkerThread
)不应该帮助执行此任务。 -
SMASK
: 用于用户定义的标签的掩码(低16位)。 -
UNCOMPENSATE
: 用于helpJoin
的哨兵值。
任务的初始状态是 0
。状态会经历非负值,直到完成时变为负值(DONE
位被设置)。这些状态的转换通过 CAS (Compare-And-Swap) 操作原子地进行,确保线程安全。
2. 核心抽象方法
-
protected abstract V exec()
: 这是任务执行的主要逻辑。ForkJoinWorkerThread
在执行任务时会调用此方法。子类(如RecursiveAction
和RecursiveTask
)的compute()
方法通常是在exec()
内部被调用的。如果exec()
返回true
,表示任务正常完成;返回false
通常意味着任务被取消或出现异常。 -
protected abstract V getRawResult()
: 获取任务的原始结果,不进行任何等待或异常检查。 -
protected abstract void setRawResult(V value)
: 设置任务的原始结果。
3. 关键内部方法
-
doExec()
: 这是一个内部的包装方法,它调用exec()
,处理可能的异常,并相应地设置任务状态(如DONE
,ABNORMAL
,THROWN
)。 -
recordExceptionalCompletion(Throwable ex)
: 当任务执行过程中发生异常时调用,它会将异常存储起来,并将任务状态标记为ABNORMAL
和THROWN
。 -
getException()
: 获取任务执行过程中记录的异常。 -
doJoin()
和awaitDone(ForkJoinWorkerThread w, boolean timed, long nanos)
:-
doJoin()
是join()
方法的内部实现。它首先检查任务是否已完成。如果未完成,它会尝试帮助执行其他任务(work-stealing 的一部分),或者如果当前线程是ForkJoinWorkerThread
,它可能会尝试执行当前任务或帮助执行其他任务。如果当前线程不是ForkJoinWorkerThread
或者无法立即完成,它会调用awaitDone
。 -
awaitDone
会使当前线程等待任务完成。它可能会使用LockSupport.park()
来阻塞线程,直到任务完成或被中断/超时。在等待期间,如果当前线程是ForkJoinWorkerThread
,它可能会参与工作窃取。
-
-
doInvoke()
:invoke()
方法的内部实现。它通常会尝试直接执行任务(如果条件允许),否则行为类似fork()
后join()
。 -
externalAwaitDone()
: 当一个非ForkJoinWorkerThread
的外部线程调用join()
时,会进入此方法。它会阻塞等待任务完成,期间可能会尝试帮助ForkJoinPool
完成一些任务(如果NO_USER_HELP
未设置)。 -
helpStealer(ForkJoinPool.WorkQueue q, ForkJoinTask<?> task)
: 尝试帮助执行给定工作队列中的任务。这是工作窃取机制的一部分。
4. 工作窃取 (Work-Stealing)
ForkJoinTask
的设计与 ForkJoinPool
的工作窃取算法紧密相关。每个 ForkJoinWorkerThread
都有一个双端队列(Deque)来存放它 fork()
出来的任务。
-
当一个工作线程
fork()
一个新任务时,它通常会将新任务压入自己队列的底部(LIFO顺序)。 -
当工作线程自己的队列为空时,它会尝试从其他工作线程队列的顶部(FIFO顺序)“窃取”一个任务来执行。
-
join()
操作在等待子任务完成时,如果当前线程是ForkJoinWorkerThread
,它不会简单地阻塞,而是会尝试执行自己队列中的其他任务,或者去窃取其他线程的任务,以充分利用CPU资源。
5. 异常处理
-
如果一个任务在
compute()
(或exec()
) 中抛出未检查异常,该异常会被捕获,并通过recordExceptionalCompletion()
记录在任务内部。 -
当另一个任务调用
join()
来获取这个异常任务的结果时,join()
方法会重新抛出原始异常(或包装后的RuntimeException
,如CompletionException
)。 -
isCompletedAbnormally()
可以检查任务是否因为异常或取消而完成。 -
getException()
可以获取导致任务异常的Throwable
。 -
rethrow()
方法会检查任务是否异常完成,如果是,则重新抛出异常。
6. 取消
-
cancel(boolean mayInterruptIfRunning)
: 尝试取消任务的执行。-
如果任务尚未开始,它通常会被标记为已取消(
ABNORMAL
状态),并且不会执行。 -
如果任务已经在运行,
mayInterruptIfRunning
参数决定是否应该尝试中断执行该任务的线程。 -
取消是一个协作过程,正在运行的任务需要定期检查其中断状态或取消状态。
-
-
isCancelled()
: 判断任务是否被取消。
7. 接口设计
-
Future<V>
: 提供了标准的异步任务接口,允许获取结果、取消、检查完成状态等。 -
Serializable
: 允许ForkJoinTask
对象被序列化,尽管在实际应用中,序列化和反序列化正在运行的或已提交到池中的任务可能比较复杂。 -
抽象方法:
exec()
,getRawResult()
,setRawResult(V value)
是核心的抽象方法,强制子类实现具体的计算逻辑和结果处理。 -
受保护的方法: 许多内部控制方法(如
doExec
,doJoin
)是protected
的,允许ForkJoinPool
和ForkJoinWorkerThread
访问和控制任务的执行流程。 -
静态工具方法:
-
invokeAll(...)
: 方便地执行一组任务并等待它们全部完成。 -
helpQuiesce()
: 当前线程(必须是ForkJoinWorkerThread
)会持续执行任务,直到池中所有已提交的任务都完成(达到静默状态)。 -
getPool()
: 获取执行当前任务的ForkJoinPool
。 -
getQueuedTaskCount()
: 获取当前线程队列中待处理的任务数量。
-
ForkJoinTask
与 ForkJoinPool
的关系
-
ForkJoinTask
实例必须在ForkJoinPool
中执行。 -
ForkJoinPool
负责管理一组ForkJoinWorkerThread
。 -
当任务通过
fork()
提交或通过invoke()
(或submit()
) 提交到ForkJoinPool
时,池会将任务分配给某个工作线程的队列。 -
工作线程从自己的队列中获取任务执行,或者在空闲时从其他线程的队列中窃取任务。
-
ForkJoinPool
提供了任务提交(execute
,invoke
,submit
)、池管理(shutdown
,awaitTermination
)和监控方法。
RecursiveTask 和 RecursiveAction
两个类代码量不大。这其实是它们设计上的一大特点,它们作为 ForkJoinTask
的标准子类,旨在简化开发,让开发者能更专注于任务的计算逻辑。
下面我们来全面分析一下这两个子类具体做了什么:
首先,需要了解 RecursiveTask
和 RecursiveAction 都继承自 ForkJoinTask
。 ForkJoinTask 是 Java Fork/Join 框架的核心,它是一个轻量级的任务抽象,设计用于可以被分解成更小独立子任务的计算。 ForkJoinTask 本身处理了任务的生命周期管理(如等待、完成、取消)、与其他任务的同步(forking 和 joining)、在 ForkJoinPool 中的调度和执行,以及异常处理等复杂机制。
-
RecursiveTask
-
目的 : RecursiveTask<V> 用于那些执行后会 返回一个结果 的递归任务。泛型参数 V 代表任务返回结果的类型。
-
核心内容 :
-
V result; : 一个成员变量,用于存储任务的计算结果。
// ... existing code ... public abstract class RecursiveTask<V> extends ForkJoinTask<V> {// ... existing code .../*** The result of the computation.*/V result;/*** The main computation performed by this task.* @return the result of the computation*/protected abstract V compute();public final V getRawResult() { // ... existing code ...
-
protected abstract V compute(); : 这是一个抽象方法,子类必须实现这个方法来定义具体的计算逻辑。这个方法的返回值就是任务的结果。
-
protected final boolean exec() : 这个方法是 ForkJoinTask 框架实际调用的方法。它的实现很简单:
// ... existing code .../*** Implements execution conventions for RecursiveTask.*/protected final boolean exec() {try {result = compute();return true;} catch (Throwable rex) {return setExceptionalCompletion(rex);}}} //它调用子类实现的 compute() 方法,并将返回的结果存储在 result 字段中。 //如果 compute() 抛出异常,它会通过 setExceptionalCompletion(rex) 来标记任务异常完成。
-
public final V getRawResult() : 返回存储的 result 。
-
protected final void setRawResult(V value) : 设置 result 的值。
-
-
RecursiveTask 的代码之所以简洁,是因为它把所有通用的、复杂的 Fork/Join 任务执行逻辑(如任务拆分、合并、线程管理、同步等)都委托给了父类 ForkJoinTask 。 RecursiveTask 只专注于与其特定目的(即返回一个结果)相关的部分:定义一个地方存储结果 ( result 字段),并确保 compute() 方法的结果被正确存入。
-
RecursiveAction
-
目的 :与 RecursiveTask<V> 对应, RecursiveAction 用于那些执行一个动作但 不返回任何结果 的递归任务(即其逻辑上的返回类型是 void )。
-
核心内容:
-
它会继承 ForkJoinTask<Void>
-
protected abstract void compute(); : 它会有一个抽象的 compute() 方法,但这个方法没有返回值 ( void )。子类需要实现此方法来定义具体的操作。
-
protected final boolean exec() : 类似于 RecursiveTask 中的 exec() ,此方法会调用子类实现的 void compute() 方法。由于没有返回值,它不需要存储结果。如果 compute() 抛出异常,同样会通过 setExceptionalCompletion(rex) 处理。
-
public final Void getRawResult() : 会返回 null ,因为没有实际结果。
-
protected final void setRawResult(Void value) : 是一个空操作,或者用于设置内部完成状态。
-
public final ForkJoinTask<V> fork()
调用时机 :当用户希望将一个子任务分解出去,让它独立、并行执行时调用。通常在父任务中对子任务调用 fork() 。
fork()
方法是 ForkJoinTask
的核心方法之一,它用于异步执行任务。当一个任务调用 fork()
方法时,它会被提交到 ForkJoinPool
中等待执行。如果当前线程是 ForkJoinWorkerThread
并且有空闲资源,它可能会直接执行该任务,否则任务会被放入该工作线程的任务队列中,或者如果当前线程不是 ForkJoinWorkerThread
,则会通过 ForkJoinPool
的外部提交机制来处理。
下面是 fork()
方法的源码:
// ... 其他代码 .../*** Arranges to asynchronously execute this task in the pool the* current task is running in, if applicable, or using the {@link* ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While* it is not necessarily enforced, it is a usage error to fork a* task more than once unless it has completed and been* reinitialized. Subsequent modifications to the state of this* task or any data it operates on are not necessarily* consistently observable by any thread other than the one* executing it unless preceded by a call to {@link #join} or* related methods, or a call to {@link #isDone} returning {@code* true}.** @return {@code this}, to simplify usage*/public final ForkJoinTask<V> fork() {Thread t; ForkJoinWorkerThread wt;ForkJoinPool p; ForkJoinPool.WorkQueue q; boolean internal;if (internal =(t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {q = (wt = (ForkJoinWorkerThread)t).workQueue;p = wt.pool;}elseq = (p = ForkJoinPool.common).externalSubmissionQueue(false);q.push(this, p, internal);return this;}// ... 其他代码 ...
方法签名
public final ForkJoinTask<V> fork()
-
final方法:禁止子类覆盖,确保核心调度逻辑一致性。
-
链式调用:返回
this
,支持task.fork().join()
模式(但更推荐invoke()
)。
执行流程
1. 环境检测
Thread t = Thread.currentThread();
boolean internal = (t instanceof ForkJoinWorkerThread);
- 变量声明:
t
:当前线程wt
:工作线程(若当前是ForkJoinWorkerThread
)p
:线程池实例q
:工作队列internal
:区分内部/外部调用
2. 分支处理
情况A:内部调用(internal=true
)
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
q = wt.workQueue; // 获取线程专属工作队列(LIFO)
p = wt.pool; // 获取所属线程池
-
行为:任务压入当前线程的工作队列头部(优先处理子任务,保持缓存局部性)
情况B:外部调用(internal=false
)
q = (p = ForkJoinPool.common).externalSubmissionQueue(false);
- 行为:
- 使用公共线程池
ForkJoinPool.common
- 任务提交到外部共享队列(FIFO),等待工作线程窃取
- 使用公共线程池
3. 任务入队
q.push(this, p, internal);
-
核心操作:
-
将当前任务(
this
)压入队列 -
可能触发线程唤醒/创建(依赖
ForkJoinPool
机制)
-
4. 返回结果
return this; // 支持链式调用
public final V join()
这个方法用于等待任务执行完成并获取其结果。如果任务在执行过程中抛出异常,join() 方法会抛出相应的运行时异常。join() 方法的调用链相对复杂,因为它涉及到任务状态的检查、可能的阻塞等待、以及异常处理。
ForkJoinTask.join() 方法调用流程
1. 检查任务状态
首先检查任务是否已经完成。如果任务已完成,直接进入后续的结果处理步骤;如果未完成,则进入下一步。
2. 调用 awaitDone(false, 0L)
-
尝试帮助执行任务 :该方法会首先调用 ForkJoinPool.helpJoin() (或 helpComplete ),让当前线程帮助执行任务,以此避免盲目等待。
-
加入等待队列并阻塞 :如果帮助执行后任务仍未完成,会调用另一个 awaitDone(...) 版本。此版本会将当前线程加入任务的等待队列,并使用 LockSupport.park() 进行阻塞。在 join 方法中,这里是无限等待,直到任务完成、被中断或超时。
3. 等待结束处理
当等待结束(任务完成)后,会检查任务是否处于异常状态( ABNORMAL 状态位)。
-
任务异常处理 :如果任务异常,会调用 reportException(false) 方法:
-
调用 getException(false) 获取底层的 Throwable ,可能是 CancellationException 或者任务执行过程中抛出的其他异常,并且可能会为了获取更好的堆栈信息而重新包装异常。
-
通过 uncheckedThrow() 方法将异常抛出。
-
-
任务正常处理 :如果任务正常完成,调用 getRawResult() 方法返回任务结果。
下面是 join() 方法的源码:
// ... 其他代码 ...public final V join() {int s;if ((((s = status) < 0 ? s : awaitDone(false, 0L)) & ABNORMAL) != 0)reportException(false);return getRawResult();}
// ... 其他代码 ...
join()
方法的执行流程分析
1. int s;
声明一个变量 s
用于存储任务的状态。
2. 核心逻辑判断
if ((((s = status) < 0 ? s : awaitDone(false, 0L)) & ABNORMAL) != 0)
这是 join()
方法的核心逻辑,具体步骤如下:
2.1 读取任务状态
s = status;
读取当前任务的 status
字段。status
是一个 volatile int
,用于表示任务的执行状态(例如,是否完成、是否异常、是否被取消等)。
2.2 判断任务是否完成
(s = status) < 0 ? s : awaitDone(false, 0L);
-
如果
status < 0
,表示任务已经完成(DONE
状态位被设置,这是一个负数值)。 -
如果
status >= 0
,表示任务尚未完成。此时,会调用awaitDone(false, 0L)
方法来等待任务完成。awaitDone
会返回任务完成后的状态。-
第一个参数
false
表示等待过程不可中断 (throws InterruptedException
)。 -
第二个参数
0L
表示没有超时限制,会一直等待直到任务完成。
-
2.3 判断任务是否异常完成
& ABNORMAL:
将获取到的任务最终状态 s
与 ABNORMAL
标志进行按位与操作。ABNORMAL
标志位表示任务是否以非正常方式完成(例如,被取消或抛出异常)。
3. 异常处理
reportException(false);
如果任务异常完成,则调用 reportException(false)
方法。这个方法会获取任务执行过程中抛出的异常,并将其重新抛出。参数 false
表示不将原始异常包装为 ExecutionException
(除非特定情况)。
4. 获取结果
return getRawResult();
如果任务正常完成(或者即使异常完成,reportException
抛出异常后这里不会执行),则调用 getRawResult()
方法获取任务的原始计算结果。这是一个 protected abstract
方法,需要由具体的子类(如 RecursiveTask
)来实现。对于 RecursiveAction
这种没有返回值的任务,它通常返回 null
。
下面我们来分析 join()
中调用的关键方法:
awaitDone
A. awaitDone(boolean interruptible, long deadline)
(第一个重载版本)
这个版本的 awaitDone
是 join()
首先调用的。它尝试通过 "帮助执行" (help join) 来避免当前线程完全阻塞。
// ... 其他代码 ...private int awaitDone(boolean interruptible, long deadline) {ForkJoinWorkerThread wt; ForkJoinPool p; ForkJoinPool.WorkQueue q;Thread t; boolean internal; int s, ss;if (internal =(t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {p = (wt = (ForkJoinWorkerThread)t).pool;q = wt.workQueue;}elseq = ForkJoinPool.externalQueue(p = ForkJoinPool.common);return (((s = (p == null) ? 0 :((this instanceof CountedCompleter) ?p.helpComplete(this, q, internal) : // 如果是 CountedCompleter,尝试帮助完成!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss :p.helpJoin(this, q, internal))) < 0)) ? s : // 尝试帮助当前任务执行awaitDone(internal ? p : null, s, interruptible, deadline); // 如果帮助后仍未完成,调用另一个 awaitDone 进行阻塞等待}
// ... 其他代码 ...
判断当前线程类型
-
如果当前线程是 ForkJoinWorkerThread (即 internal 为 true ),说明当前 join 是在 ForkJoinPool 的一个工作线程中调用的。它会获取当前工作线程的线程池 p 和工作队列 q 。
-
否则,认为是外部线程调用,使用 ForkJoinPool.commonPool() 作为线程池 p ,并获取其外部提交队列 q 。
尝试帮助 (Help Join/Complete)
-
p.helpComplete(this, q, internal) : 如果当前任务是 CountedCompleter 的实例,会调用线程池的 helpComplete 方法尝试帮助完成相关的任务。
-
p.helpJoin(this, q, internal) : 对于其他类型的任务,会调用线程池的 helpJoin 方法。这个方法的核心思想是,与其让当前线程阻塞等待,不如让它尝试执行池中其他待处理的任务,或者特别是当前正在 join 的这个任务(如果它在队列中且尚未被执行)。这是一种提高并行度和吞吐量的策略。
-
!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss : ... : 这是一个优化,如果是非内部线程调用,并且任务状态标记了 NO_USER_HELP (通常在构造时设置,表示不期望外部帮助),则可能直接跳过帮助步骤。
递归调用或阻塞等待
-
如果帮助执行后任务完成了 ( s < 0 ),则直接返回状态 s 。
-
否则,调用另一个重载版本的 awaitDone(ForkJoinPool pool, int compensation, boolean interruptible, long deadline) 方法,进入真正的阻塞等待逻辑。 compensation 参数在这里是上一步 helpJoin 或 helpComplete 的结果。
awaitDone (第二个重载版本)
awaitDone(ForkJoinPool pool, int compensation, boolean interruptible, long deadline)
这个版本的 awaitDone 负责实际的线程阻塞和等待逻辑。
// ... 其他代码 ...private int awaitDone(ForkJoinPool pool, int compensation,boolean interruptible, long deadline) {int s;if ((s = status) >= 0) { // 再次检查状态,如果已完成则直接返回Aux node = null;try { // spinwait if out of memorynode = new Aux(Thread.currentThread(), null); // 创建一个 Aux 节点代表当前等待线程} catch (OutOfMemoryError ex) {}boolean queued = false;for (Aux a;;) { // 尝试将当前等待节点加入到任务的等待者列表 (aux 字段)if ((s = status) < 0)break; // 任务已完成else if (node == null) // OOM 时可能 node 为 null,进行自旋等待Thread.onSpinWait();else if (((a = aux) == null || a.ex == null) && // aux 字段为空或不包含异常(queued = casAux(node.next = a, node))) // CAS 尝试将 node 插入 aux 链表头部break; // 成功入队}if (queued) { // 如果成功入队,则开始等待LockSupport.setCurrentBlocker(this); // 设置当前线程的阻塞对象,方便监控和诊断int interrupts = 0; // 中断状态标记for (;;) { // 循环等待if ((s = status) < 0) // 任务完成break;else if (interrupts < 0) { // 如果之前已标记为需要抛出中断异常s = ABNORMAL; // 将状态标记为 ABNORMALbreak;}else if (Thread.interrupted()) { // 检测到中断if (!ForkJoinPool.poolIsStopping(pool)) // 如果线程池没有停止interrupts = interruptible ? -1 : 1; // 根据 interruptible 参数决定是抛出异常还是仅记录中断else { // 线程池正在停止interrupts = 1; // 记录中断,后续会重新设置中断标志try {cancel(true); // 尝试取消任务} catch (Throwable ignore) {}}}else if (deadline != 0L) { // 如果设置了超时时间long ns;if ((ns = deadline - System.nanoTime()) <= 0) { // 超时s = 0; // 标记为超时 (返回0)break;}LockSupport.parkNanos(ns); // 带超时的阻塞}elseLockSupport.park(); // 无限期阻塞,直到被 unpark}node.thread = null; // 清理 Aux 节点clean: for (Aux a;;) { // 从等待者列表中移除当前节点if ((a = aux) == null || a.ex != null)break;for (Aux prev = null;;) {Aux next = a.next;if (a == node) {if (prev != null)prev.casNext(prev, next); // 从链表中移除else if (casAux(a, next)) // 如果是头节点,直接CAS更新break clean;break;}prev = a;if ((a = next) == null)break clean;}}LockSupport.setCurrentBlocker(null); // 清除阻塞对象if (interrupts > 0) // 如果之前仅记录了中断,现在重新设置线程的中断状态Thread.currentThread().interrupt();}}if (compensation == UNCOMPENSATE && pool != null) // 处理补偿逻辑pool.uncompensate();return s; // 返回最终状态}
// ... 其他代码 ...
1. 创建 Aux 节点
-
Aux 是一个内部静态类,用于构建一个等待线程的链表(Treiber stack),或者用于存储任务抛出的异常。
-
new Aux(Thread.currentThread(), null) 创建一个表示当前等待线程的节点。
2. 入队等待者列表
-
通过 CAS 操作 ( casAux ) 尝试将当前线程的 Aux 节点加入到任务的 aux 字段所维护的等待者链表的头部。
-
aux 字段要么指向等待者链表,要么指向一个包含异常的 Aux 节点。
3. 阻塞等待
-
使用 LockSupport.park() 或 LockSupport.parkNanos() 来阻塞当前线程。
-
在阻塞前会调用 LockSupport.setCurrentBlocker(this) ,这在线程 dump 或者使用 JMX 监控时,可以看到线程是因何阻塞的。
-
循环检查任务状态 ( status < 0 表示完成)。
-
处理线程中断 (Thread.interrupted())
-
如果 interruptible 为 true (例如从 get() 方法调用),则中断会导致抛出 InterruptedException (通过设置 interrupts = -1 并在外层处理)。
-
如果 interruptible 为 false (例如从 join() 方法调用),则仅记录中断状态,并在方法返回前重新设置线程的中断标志。
-
如果线程池正在停止 ( ForkJoinPool.poolIsStopping(pool) ), 则尝试取消当前任务。
-
-
处理超时 (deadline != 0L)
唤醒与清理
-
当任务完成(其他线程调用了 setDone() 或 trySetThrown() 等,这些方法内部会调用 signalWaiters() 来 unpark 等待的线程)或中断或超时后,线程被唤醒。
-
从 aux 等待者链表中移除当前线程的 Aux 节点。
-
LockSupport.setCurrentBlocker(null) 清除阻塞对象。
-
如果需要,重新中断当前线程。
补偿
-
pool.uncompensate() 与 ForkJoinPool 的线程补偿机制相关,用于在某些情况下调整池中的活跃线程数。
Aux
Aux 类是 ForkJoinTask 的一个静态内部类,它扮演着双重角色:
-
等待节点 (Waiter Node): 当一个线程调用 join() (或其他等待方法) 并且任务尚未完成时,会创建一个 Aux 对象来代表这个等待的线程。这些 Aux 对象会形成一个简单的单向链表(更准确地说,是一个 Treiber 堆栈,后进先出),存储在 ForkJoinTask 实例的 aux 字段中。
-
异常持有者 (Exception Holder): 当任务执行过程中抛出异常时,也会创建一个 Aux 对象,但这次它的 ex 字段会持有这个异常,并且 thread 字段通常是抛出异常的线程。这个 Aux 对象同样会存储在任务实例的 aux 字段中,但它会覆盖掉任何等待者链表。
// ... 其他代码 .../*** Nodes for threads waiting for completion, or holding a thrown* exception (never both). Waiting threads prepend nodes* Treiber-stack-style. Signallers detach and unpark* waiters. Cancelled waiters try to unsplice.*/static final class Aux {Thread thread; // thrower or waiterfinal Throwable ex;Aux next; // accessed only via memory-acquire chainsAux(Thread thread, Throwable ex) {this.thread = thread;this.ex = ex;}final boolean casNext(Aux c, Aux v) { // used only in cancellationreturn U.compareAndSetReference(this, NEXT, c, v);}private static final Unsafe U;private static final long NEXT;static {U = Unsafe.getUnsafe();NEXT = U.objectFieldOffset(Aux.class, "next"); // 获取 next 字段的内存偏移量,用于 CAS 操作}}/** The status field holds bits packed into a single int to ensure* atomicity. Status is initially zero, and takes on nonnegative* values until completed, upon which it holds (sign bit) DONE,* possibly with ABNORMAL (cancelled or exceptional) and THROWN* (in which case an exception has been stored). A value of* ABNORMAL without DONE signifies an interrupted wait. These* control bits occupy only (some of) the upper half (16 bits) of* status field. The lower bits are used for user-defined tags.*/static final int DONE = 1 << 31; // must be negativestatic final int ABNORMAL = 1 << 16;static final int THROWN = 1 << 17;static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN;
// ... 其他代码 ...// Fields/** @serial */volatile int status; // accessed directly by pool and workersprivate transient volatile Aux aux; // either waiters or thrown Exception// Support for atomic operationsprivate static final Unsafe U;private static final long STATUS;private static final long AUX; // 获取 aux 字段的内存偏移量,用于 CAS 操作
// ... 其他代码 ...private boolean casAux(Aux c, Aux v) {return U.compareAndSetReference(this, AUX, c, v);}private Aux compareAndExchangeAux(Aux c, Aux v) {return (Aux)U.compareAndExchangeReference(this, AUX, c, v);}
// ... 其他代码 ...
Aux 类关键点:
-
Thread thread: 如果这是一个等待节点,thread 就是等待的线程。如果这是一个异常持有者,thread 通常是抛出异常的线程。
-
final Throwable ex: 如果这是一个异常持有者,ex 就是实际的异常对象。对于等待节点,ex 为 null。
-
Aux next: 指向链表中的下一个 Aux 节点。
-
casNext(): 使用 Unsafe.compareAndSetReference 原子地更新 next 字段,主要用于取消等待时从链表中移除节点。
-
ForkJoinTask 中的 aux 字段是 volatile 的,并且通过 casAux 和 compareAndExchangeAux (基于 Unsafe 类) 进行原子更新,以确保线程安全。
awaitDone(ForkJoinPool pool, int compensation, boolean interruptible, long deadline) 方法中与 Aux 相关的部分:
// ... 其他代码 ...private int awaitDone(ForkJoinPool pool, int compensation,boolean interruptible, long deadline) {int s;if ((s = status) >= 0) { // 任务尚未完成 (status < 0 表示已完成)Aux node = null;try { // spinwait if out of memory// 1. 创建 Aux 节点代表当前等待线程// 如果任务未完成,当前线程需要等待。// 创建一个 Aux 实例,其中 thread 是当前线程,ex 是 null。node = new Aux(Thread.currentThread(), null);} catch (OutOfMemoryError ex) {// OutOfMemoryError 时,node 可能为 null,后续会进行自旋等待}boolean queued = false;// 2. 尝试将当前等待节点加入任务的等待者列表 (aux 字段)// 这是一个循环,因为可能有其他线程同时在修改 aux 字段。for (Aux a;;) { // try to install nodeif ((s = status) < 0) // 再次检查任务状态,可能在创建 node 后任务已完成break;else if (node == null) // 如果之前创建 node 失败 (OOM)Thread.onSpinWait(); // 进行短暂的自旋等待,避免CPU空转但仍保持尝试else if (((a = aux) == null || a.ex == null) &&// 读取当前任务的 aux 字段 (可能是 null,也可能是其他等待者链表头,或者是一个异常 Aux)// 只有当 aux 为 null 或者 aux.ex 为 null (表示 aux 指向的是等待者链表而不是异常) 时,才尝试入队。// 将新创建的 node 的 next 指向当前的 aux 链表头 a。// 然后通过 CAS 原子地尝试将任务的 aux 字段从 a 更新为新的 node。// 如果成功,queued 设置为 true,表示当前线程的等待节点已成功加入等待队列。(queued = casAux(node.next = a, node)))break; // 成功入队,退出循环}if (queued) { // 如果成功将当前线程的 Aux 节点加入等待队列LockSupport.setCurrentBlocker(this); // 标记当前线程的阻塞对象,用于调试和监控int interrupts = 0; // 中断状态标记: <0 表示需要抛出中断异常, >0 表示需要重新中断线程for (;;) { // 循环等待,直到任务完成或被中断/超时if ((s = status) < 0) // 任务完成break;// ... (处理中断和超时的逻辑,前面已分析过) ...else if (Thread.interrupted()) {// ...}else if (deadline != 0L) {// ...LockSupport.parkNanos(ns); // 带超时的阻塞}elseLockSupport.park(); // 无限期阻塞,直到被 unpark}// 3. 任务完成或等待结束,清理 Aux 节点node.thread = null; // 帮助GC,将 Aux 节点中的 thread 引用置空clean: for (Aux a;;) { // 从等待者列表中移除当前节点 node// 这是一个尽力而为的清理过程,因为其他线程可能也在操作这个链表。if ((a = aux) == null || a.ex != null) // 如果 aux 为空或已变为异常持有者,则无需清理break;for (Aux prev = null;;) {Aux next = a.next;if (a == node) { // 找到了当前线程的节点if (prev != null)prev.casNext(prev, next); // 如果不是头节点,让前一个节点的 next 指向当前节点的 nextelse if (casAux(a, next)) // 如果是头节点,直接CAS更新 aux 指向 nextbreak clean; // 成功移除,跳出外层循环break; // CAS失败或节点已陈旧,跳出内层循环重试或放弃}prev = a;if ((a = next) == null) // 遍历完链表未找到 (可能已被其他操作移除)break clean; // not found}}LockSupport.setCurrentBlocker(null); // 清除阻塞对象if (interrupts > 0) // 如果之前仅记录了中断,现在重新设置线程的中断状态Thread.currentThread().interrupt();}}if (compensation == UNCOMPENSATE && pool != null)pool.uncompensate();return s; // 返回最终状态}
// ... 其他代码 .../** Removes and unparks waiters */private void signalWaiters() {for (Aux a = aux;;) { // 读取当前 aux 链表头if (a == null || a.ex != null) // 如果没有等待者,或者 aux 字段现在是异常持有者,则无需操作break;// 4. 唤醒等待者// 通过 CAS 将 aux 字段设置为 null,表示取走了整个等待者链表。// 如果成功,则遍历取下的链表,并 unpark 每一个等待的线程。if (a == (a = compareAndExchangeAux(a, null))) { // CAS 成功,a 现在是原始的等待者链表头do { // detach entire listLockSupport.unpark(a.thread); // 唤醒等待线程} while ((a = a.next) != null); // 继续处理链表中的下一个等待者break;}// 如果 CAS 失败,说明有其他线程修改了 aux,循环重试。}}/*** Sets DONE status and wakes up threads waiting to join this task.*/private void setDone() {getAndBitwiseOrStatus(DONE); // 设置任务状态为 DONEsignalWaiters(); // 唤醒所有等待者}/*** Sets ABNORMAL DONE status unless already done, and wakes up threads* waiting to join this task.* @return previous status*/final int trySetCancelled() {int s;if ((s = status) >= 0 && // 如果任务尚未完成(s = getAndBitwiseOrStatus(DONE | ABNORMAL)) >= 0) // 原子地设置 DONE 和 ABNORMAL 状态signalWaiters(); // 唤醒所有等待者return s;}/*** Records exception and sets ABNORMAL THROWN DONE status unless* already done, and wakes up threads waiting to join this task.* If losing a race with setDone or trySetCancelled, the exception* may be recorded but not reported.** @return true if set*/final boolean trySetThrown(Throwable ex) {int s;boolean set = false, installed = false;if ((s = status) >= 0) { // 如果任务尚未完成Aux a, p = null, h = new Aux(Thread.currentThread(), ex); // 创建一个持有异常的 Aux 节点 hdo {// 尝试将异常节点 h 安装到 aux 字段。// 只有当 aux 为 null 或者 aux.ex 为 null (表示 aux 当前不是异常持有者) 时才尝试。if (!installed && ((a = aux) == null || a.ex == null) &&(installed = casAux(a, h))) // CAS 将 aux 从 a (可能是等待者链表) 更新为 h (异常节点)p = a; // 如果成功,p 保存了原始的等待者链表头 (如果存在的话)// 如果异常节点已安装,并且成功通过 CAS 设置任务状态为 HAVE_EXCEPTION (DONE | ABNORMAL | THROWN)if (installed && (set = casStatus(s, s | HAVE_EXCEPTION)))break; // 状态设置成功,退出循环} while ((s = status) >= 0); // 如果状态设置失败 (例如其他线程先完成了任务),则重试或放弃// 如果之前 aux 字段是等待者链表 (p != null),则唤醒这些等待者。// 它们醒来后会发现任务状态已变为异常。for (; p != null; p = p.next)LockSupport.unpark(p.thread);}return set;}
// ... 其他代码 ...
awaitDone
中 Aux
的使用流程分解
1. 创建等待节点
当一个线程调用 join()
并且发现任务 (status >= 0
) 还没有完成时,它会创建一个 Aux
实例:
node = new Aux(Thread.currentThread(), null);
这个 node
代表了当前正在等待的线程。
2. 入队操作 (Treiber Stack)
线程会尝试将这个 node
原子地添加到任务的 aux
字段所指向的链表的头部,具体步骤如下:
node.next = a; // 其中 a 是当前 aux 的值
queued = casAux(a, node); // 尝试将 aux 从 a 更新为 node
这是一个典型的无锁堆栈(Treiber Stack)的压入操作。
关键条件:
((a = aux) == null || a.ex == null)
该条件确保只有在 aux
为空或者 aux
指向的是一个等待者链表(而不是一个异常持有者)时,才会尝试将新的等待节点加入。如果 aux.ex != null
,说明任务已经因为异常而结束,新的等待者不应该再入队等待,而是应该直接获取异常状态。
3. 阻塞等待
如果 node
成功入队 (queued == true
),当前线程就会调用 LockSupport.park()
(或 parkNanos
) 进入阻塞状态,等待被唤醒。
4. 唤醒操作 (signalWaiters
)
当任务完成时(无论是正常完成 setDone()
,取消 trySetCancelled()
,还是异常完成 trySetThrown()
),都会调用 signalWaiters()
。
-
signalWaiters()
会尝试原子地将任务的aux
字段设置为null
(使用compareAndExchangeAux(a, null)
),从而“取走”整个等待者链表。 -
如果成功取走链表,它会遍历这个链表中的所有
Aux
节点,并调用LockSupport.unpark(a.thread)
来唤醒每一个等待的线程。 -
在
trySetThrown
中,如果它用一个异常Aux
节点替换了一个等待者链表,它也会负责唤醒原等待者链表中的线程。
5. 出队与清理
当等待的线程被唤醒后(通常是因为任务状态已变为完成),它会尝试从 aux
链表中移除它自己的 Aux
节点 (node
)。这是一个“尽力而为”的清理操作,因为 signalWaiters
可能已经将整个 aux
链表清空了。清理逻辑会遍历当前的 aux
链表,找到 node
并尝试通过 casNext
(如果不是头节点) 或 casAux
(如果是头节点) 将其移除。
整体调用流程
doExec()
是 ForkJoinTask
内部一个关键的 final
方法,它负责实际执行任务的业务逻辑(通过调用子类实现的抽象方法 exec()
)。理解谁以及在什么情况下调用 doExec()
有助于我们理解 Fork/Join 框架的执行模型。
通常,一个 ForkJoinTask
的执行可以通过以下几种主要方式发起:
-
直接调用
task.invoke()
: 任务在当前线程同步执行。 -
通过
ForkJoinPool
提交:-
pool.invoke(task)
: 提交任务到池中,并等待其完成(同步)。 -
pool.submit(task)
: 提交任务到池中,立即返回一个Future
(异步)。 -
pool.execute(task)
: 提交任务到池中,不返回结果 (异步)。
-
-
调用
task.fork()
: 将任务推入工作队列,由池中的某个工作线程异步执行。 -
调用静态方法
ForkJoinTask.invokeAll(...)
: 批量执行一组任务。
下面我们针对这些场景分析 doExec()
的调用路径:
场景 1: 用户直接调用 task.invoke()
当用户代码直接在一个 ForkJoinTask
实例上调用 invoke()
方法时:
// ... 其他代码 ...public final V invoke() {doExec(); // 关键调用return join();}
// ... 其他代码 ...
-
调用者: 用户线程 (即调用
task.invoke()
的那个线程)。 -
流程:
-
用户线程调用
task.invoke()
。 -
invoke()
方法内部首先直接调用this.doExec()
。
-
// ... 其他代码 ...final void doExec() {if (status >= 0) { // 检查任务是否尚未完成boolean completed = false;try {completed = exec(); // 调用子类实现的 exec() 方法} catch (Throwable rex) {trySetException(rex); // 如果 exec() 抛出异常,记录异常}if (completed) // 如果 exec() 返回 true (表示任务已完成)setDone(); // 设置任务状态为完成}}
// ... 其他代码 ...
-
在
doExec()
中,如果任务尚未完成 (status >= 0
),则会调用exec()
方法。exec()
是一个受保护的抽象方法,需要由ForkJoinTask
的具体子类(如RecursiveTask
或RecursiveAction
)实现,它包含了任务的实际计算逻辑。 -
exec()
方法执行完毕后,doExec()
会根据exec()
的返回值或是否抛出异常来更新任务的状态。 -
doExec()
返回后,invoke()
方法接着调用join()
来等待任务(实际上此时任务很可能已经由当前线程执行完毕)并获取结果。
-
结论: 在这种情况下,
doExec()
(以及其中的exec()
) 由调用task.invoke()
的用户线程直接执行。任务是同步执行的。
场景 2: 通过 ForkJoinPool
提交任务 (例如 pool.invoke(task)
)
-
调用者: 初始时是用户线程,但
doExec()
最终由ForkJoinPool
中的一个ForkJoinWorkerThread
执行。 -
流程:
-
用户线程调用
pool.invoke(task)
。
-
// ... (ForkJoinPool.java)
public <T> T invoke(ForkJoinTask<T> task) {if (task == null)throw new NullPointerException();externalSubmit(task); // 将任务提交给池处理return task.join(); // 当前用户线程等待任务完成
}
// ...
-
pool.invoke()
内部会调用如externalSubmit(task)
(或externalPush
等) 方法,将任务放入ForkJoinPool
的某个工作队列 (WorkQueue
) 中。 -
ForkJoinPool
维护了一组ForkJoinWorkerThread
工作线程。这些工作线程会不断地从自己的工作队列或其他线程的工作队列中窃取任务来执行。 -
某个
ForkJoinWorkerThread
在其运行循环中(通常在ForkJoinPool.runWorker(WorkQueue w)
方法内)获取到该任务。
// ... (ForkJoinPool.java)
final void runWorker(WorkQueue w) {// ... 扫描或等待任务 ...// 假设获取到的任务是 'taskToRun'if ((taskToRun = scan(w, r)) != null || (taskToRun = awaitWork(w, r)) != null) {w.runTask(taskToRun); // 工作队列执行任务// ...}// ...
}
// ...
-
ForkJoinWorkerThread
通过其关联的WorkQueue
的runTask(ForkJoinTask<?> task)
方法来执行任务。
// ... (ForkJoinPool.java - WorkQueue 内部类)
final void runTask(ForkJoinTask<?> task) {currentSteal = task; // 标记当前正在执行的任务 (用于帮助窃取等)task.doExec(); // 关键调用:工作线程执行 doExec()currentSteal = null;// ... 其他逻辑 ...localTasksDone.increment();Thread.interrupted(); // 清除中断状态
}
// ...
-
在
WorkQueue.runTask()
中,task.doExec()
被调用。 -
与此同时,最初调用
pool.invoke(task)
的用户线程正在执行task.join()
,阻塞等待任务完成。当ForkJoinWorkerThread
完成task.doExec()
并设置任务状态后,用户线程的join()
调用会返回。
-
结论: 在这种情况下,
doExec()
(以及其中的exec()
) 由ForkJoinPool
中的一个ForkJoinWorkerThread
执行。任务是异步于提交者执行的(但pool.invoke
本身是阻塞的)。对于pool.submit()
或pool.execute()
,提交者不会阻塞。
场景 3: 用户调用 task.fork()
当用户在一个任务上调用 fork()
时,该任务被提交到当前线程所在的 ForkJoinPool
(如果是 ForkJoinWorkerThread
) 或者 ForkJoinPool.commonPool()
(如果是外部线程)。
// ... 其他代码 ...public final ForkJoinTask<V> fork() {Thread t; ForkJoinWorkerThread wt;ForkJoinPool p; ForkJoinPool.WorkQueue q; boolean internal;if (internal =(t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { // 如果当前是工作线程q = (wt = (ForkJoinWorkerThread)t).workQueue; // 获取当前工作线程的队列p = wt.pool;}else // 如果是外部线程q = (p = ForkJoinPool.common).externalSubmissionQueue(false); // 获取公共池的外部提交队列q.push(this, p, internal); // 将任务推入队列return this;}
// ... 其他代码 ...
调用者
-
fork()
本身由用户线程或另一个ForkJoinTask
调用,但doExec()
最终由ForkJoinPool
中的一个ForkJoinWorkerThread
执行。
流程
-
调用
task.fork()
的线程(称之为“分叉线程”)将任务this
推入一个工作队列。 -
fork()
方法本身并不执行doExec()
。它仅仅是安排任务后续被执行。 -
之后,流程与场景 2 类似:
ForkJoinPool
中的某个ForkJoinWorkerThread
会从队列中获取这个被fork
的任务,并通过其WorkQueue
的runTask()
方法调用task.doExec()
。
结论
-
task.fork()
安排任务异步执行。doExec()
(以及其中的exec()
)由ForkJoinPool
中的一个ForkJoinWorkerThread
执行。
场景 4: 调用静态方法 ForkJoinTask.invokeAll(...)
-
invokeAll
方法有多种重载形式,它们通常会直接执行一部分任务,并fork
另一部分任务。 -
例如,
invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)
:
// ... 其他代码 ...public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {int s1, s2;if (t1 == null || t2 == null)throw new NullPointerException();t2.fork(); // t2 被 fork,将由工作线程执行其 doExec()t1.doExec(); // t1 的 doExec() 由当前调用 invokeAll 的线程直接执行// ... 后续是 join t1 和 t2 的逻辑 ...}
// ... 其他代码 ...
对于可变参数的 invokeAll(ForkJoinTask<?>... tasks):
// ... 其他代码 ...public static void invokeAll(ForkJoinTask<?>... tasks) {Throwable ex = null;int last = tasks.length - 1;for (int i = last; i >= 0; --i) { // 从后往前遍历ForkJoinTask<?> t; int s;// ... null check ...if (i == 0) { // 对于数组中的第一个任务 (在循环中是最后一个被处理的)t.doExec(); // 由当前调用 invokeAll 的线程直接执行// ... join 逻辑 ...break;}t.fork(); // 其他任务被 fork}// ... 后续是 join 其他被 fork 的任务的逻辑 ...}
// ... 其他代码 ...
调用者
-
invokeAll
由用户线程调用。
流程
-
invokeAll
方法会选择至少一个任务(通常是第一个或最后一个,取决于实现)在当前调用invokeAll
的线程中通过直接调用task.doExec()
来执行。 -
其余的任务则通过调用
task.fork()
被提交到工作队列中。 -
这些被
fork
的任务的doExec()
将由ForkJoinPool
中的ForkJoinWorkerThread
执行,流程与 场景 3 类似。
结论
invokeAll
混合了同步执行和异步安排。一部分任务的 doExec()
由调用 invokeAll
的线程执行,另一部分任务的 doExec()
由池中的 ForkJoinWorkerThread
执行。
总结 Task / doExec()
的调用者
当前线程(调用者线程)
-
当直接调用
task.invoke()
时。 -
当调用
ForkJoinTask.invokeAll(...)
时,其中一个任务会由调用者线程直接执行。
ForkJoinWorkerThread
(池中的工作线程)
-
当任务通过
pool.invoke(task)
、pool.submit(task)
或pool.execute(task)
提交给ForkJoinPool
后。 -
当任务通过
task.fork()
被推入工作队列后。 -
当调用
ForkJoinTask.invokeAll(...)
时,被fork
的那些任务。
整体调用流程概要 (Thread -> Pool -> Task)
Initiating Thread (用户线程或另一个Task)
-
创建
ForkJoinTask
实例。 -
决定执行方式:
-
task.invoke()
: 当前线程 ->task.doExec()
->task.exec()
。 -
pool.externalSubmit(task)
(由pool.invoke/submit/execute
调用): 当前线程将任务交给ForkJoinPool
。 -
task.fork()
: 当前线程将任务推入ForkJoinPool
的队列。
-
ForkJoinPool
-
接收到任务后,将其放入某个
WorkQueue
。 -
ForkJoinWorkerThread(s)
是池的核心执行单元。
ForkJoinWorkerThread
-
在其
run()
方法驱动的循环 (pool.runWorker()
) 中,从WorkQueue
获取任务。 -
调用
workQueue.runTask(task)
。
ForkJoinPool.WorkQueue
-
runTask(task)
方法内调用task.doExec()
。
ForkJoinTask
-
doExec()
方法被调用 (无论是被初始线程还是工作线程)。 -
doExec()
内部调用抽象的exec()
方法,执行任务的具体逻辑。 -
doExec()
更新任务状态 (setDone()
或trySetException()
)。 -
如果初始调用是阻塞的 (如
join()
或pool.invoke()
),等待的线程在任务状态更新后被唤醒。
这个流程清晰地展示了 ForkJoinTask
如何根据调用方式,选择在当前线程执行或委托给 ForkJoinPool
中的工作线程来执行其核心逻辑 doExec()
。