
SourceAnalysis-ForkJoin


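1 What Is the Fork/Join Framework

The Fork/Join framework, introduced in Java 7, executes tasks in parallel. It splits a large task into several smaller tasks and then combines the results of the small tasks to obtain the result of the large one.

The two words Fork and Join describe the framework well. Fork splits a large task into subtasks that run in parallel; Join merges the results of those subtasks into the result of the large task. For example, computing 1+2+…+10000 can be split into 10 subtasks, each summing 1000 numbers, whose results are then added together. The flow of Fork/Join is shown below: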
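
The summation example above can be sketched with a RecursiveTask. This is a minimal illustration, not code from the JDK: the class name RangeSumTask, the helper sum and the threshold of 1000 are assumptions made for this example, and the range is halved recursively rather than cut into exactly 10 pieces.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: sums the integers in [from, to] by splitting the range.
class RangeSumTask extends RecursiveTask<Long> {
    // Assumed threshold: ranges of at most 1000 numbers are summed directly.
    static final int THRESHOLD = 1000;
    private final int from;
    private final int to;

    RangeSumTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from + 1 <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i <= to; i++)
                sum += i;
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, to);
        left.fork();                      // fork: schedule the left half asynchronously
        long rightSum = right.compute();  // compute the right half in the current thread
        return left.join() + rightSum;    // join: wait for the left half and combine
    }

    // Runs the task in the common pool and returns the total.
    static long sum(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 10000)); // prints 50005000
    }
}
```

Forking one half and computing the other half directly keeps the current thread busy instead of leaving it idle while both halves are queued.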

fig1

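1.1 The Work-Stealing Algorithm

In the work-stealing algorithm, a thread takes (steals) tasks from another thread's queue and executes them. The flow of work stealing is shown below: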

fig2

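Why is work stealing needed? Suppose a large task is split into several independent subtasks. To reduce contention between threads, the subtasks are placed in separate queues, and one thread is created per queue to execute its tasks, so threads and queues correspond one to one; thread A, for example, processes the tasks in queue A. Some threads will finish the tasks in their own queue while other queues still hold pending tasks. Rather than sit idle, a finished thread helps out by stealing a task from another thread's queue. At that point two threads access the same queue, so to reduce contention between the thief and the victim a double-ended queue (deque) is normally used: the victim always takes tasks from the head of the deque, and the thief always takes tasks from the tail. (In the WorkQueue source in section 4.3, the owner's end is top, via push and pop, and the other end is base, via poll.)

The advantage of work stealing is that it makes full use of the threads for parallel computation and reduces contention between them. Its drawbacks are that contention still occurs in some cases, for example when the deque holds only one task, and that it consumes more system resources, since multiple threads and multiple deques have to be created.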
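
The head/tail convention described above can be illustrated with a plain deque. This is a single-threaded simulation with hypothetical names (StealingDemo, run); it only shows which end each party takes from and does not model the lock-free queue the JDK actually uses.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded simulation of one owner and one thief sharing queue A.
class StealingDemo {
    // Returns two lists: the tasks taken by the owner and the tasks taken by the thief.
    static List<List<Integer>> run() {
        Deque<Integer> queueA = new ArrayDeque<>();
        for (int i = 1; i <= 4; i++)
            queueA.addLast(i);                  // tasks 1..4, with 1 at the head

        List<Integer> owner = new ArrayList<>();
        List<Integer> thief = new ArrayList<>();
        while (!queueA.isEmpty()) {
            owner.add(queueA.pollFirst());      // the owner takes from the head
            Integer stolen = queueA.pollLast(); // the thief takes from the tail
            if (stolen != null)
                thief.add(stolen);
        }
        return Arrays.asList(owner, thief);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[1, 2], [4, 3]]
    }
}
```

The two parties only meet when a single task is left, which is the contention case mentioned above.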

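2 The ForkJoinTask Class Hierarchy

To use the Fork/Join framework, you typically extend one of two ForkJoinTask subclasses: RecursiveTask or RecursiveAction.

The inheritance structure of ForkJoinTask is shown in the figure below.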

fig3

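2.1 RecursiveAction

The source of RecursiveAction, shown below, is short. Its compute method must be implemented by subclasses; compute defines the computation itself as well as the fork and join logic, that is, when to fork and when to join. The fork and join methods are covered in the ForkJoinTask source analysis.

RecursiveAction and RecursiveTask differ as follows:

  1. A task run as a RecursiveAction produces no result
  2. A task run as a RecursiveTask produces a result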
public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

    /**
     * The main computation performed by this task.
     */
    protected abstract void compute();

    /**
     * Always returns {@code null}.
     *
     * @return {@code null} always
     */
    public final Void getRawResult() { return null; }

    /**
     * Requires null completion value.
     */
    protected final void setRawResult(Void mustBeNull) { }

    /**
     * Implements execution conventions for RecursiveActions.
     */
    protected final boolean exec() {
        compute();
        return true;
    }

}
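
As a usage sketch for RecursiveAction, the task below doubles every element of an array in place, so there is nothing to return. The class name DoubleAction, the helper doubleAll and the threshold of 4 are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example class: doubles data[lo, hi) in place.
class DoubleAction extends RecursiveAction {
    static final int THRESHOLD = 4; // assumed cut-off for direct processing
    private final int[] data;
    private final int lo;
    private final int hi;

    DoubleAction(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++)
                data[i] *= 2;
            return;
        }
        int mid = (lo + hi) >>> 1;
        // invokeAll forks the subtasks and returns once both have completed.
        invokeAll(new DoubleAction(data, lo, mid), new DoubleAction(data, mid, hi));
    }

    static int[] doubleAll(int[] data) {
        ForkJoinPool.commonPool().invoke(new DoubleAction(data, 0, data.length));
        return data;
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(Arrays.toString(doubleAll(data)));
    }
}
```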

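2.2 RecursiveTask

The source of RecursiveTask, shown below, is equally short. Its compute method must be implemented by subclasses; compute defines the computation itself as well as the fork and join logic, that is, when to fork and when to join. The fork and join methods are covered in the ForkJoinTask source analysis.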

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    /**
     * The result of the computation.
     */
    V result;

    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    /**
     * Implements execution conventions for RecursiveTask.
     */
    protected final boolean exec() {
        result = compute();
        return true;
    }

}
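
A small RecursiveTask sketch follows. It computes Fibonacci numbers recursively, which is a poor algorithm but a compact way to show a compute method that returns a value. The class name FibTask and the helper fib are assumptions made for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: naive recursive Fibonacci as a RecursiveTask.
class FibTask extends RecursiveTask<Integer> {
    private final int n;

    FibTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1)
            return n;
        FibTask f1 = new FibTask(n - 1);
        f1.fork();                        // run fib(n - 1) asynchronously
        FibTask f2 = new FibTask(n - 2);
        return f2.compute() + f1.join();  // compute fib(n - 2) here, then join fib(n - 1)
    }

    static int fib(int n) {
        return ForkJoinPool.commonPool().invoke(new FibTask(n));
    }

    public static void main(String[] args) {
        System.out.println(fib(10)); // prints 55
    }
}
```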

3 ForkJoinTask

3.1 Constants

static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
static final int NORMAL      = 0xf0000000;  // must be negative
static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
static final int SMASK       = 0x0000ffff;  // short bits for tags
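  • DONE_MASK: mask for the completion bits, i.e. the high four bits of status
  • NORMAL: completed normally; a negative value
  • CANCELLED: cancelled; negative and less than NORMAL
  • EXCEPTIONAL: completed exceptionally; negative and less than CANCELLED
  • SIGNAL: set on status by a thread that is about to wait for the task (see externalAwaitDone in 3.3.2.3), marking that waiters need to be woken once the task completes
  • SMASK: mask for the tag bits, i.e. the low 16 bits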
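
How these constants are read can be sketched as follows. The constant values are copied from the listing above; the class StatusBits and the method describe are hypothetical helpers written for this example and do not exist in the JDK.

```java
// Hypothetical helper that decodes a ForkJoinTask-style status word.
class StatusBits {
    static final int DONE_MASK   = 0xf0000000;
    static final int NORMAL      = 0xf0000000;
    static final int CANCELLED   = 0xc0000000;
    static final int EXCEPTIONAL = 0x80000000;
    static final int SIGNAL      = 0x00010000;
    static final int SMASK       = 0x0000ffff;

    static String describe(int status) {
        if (status >= 0)
            return "not done";           // no completion bit set; SIGNAL or tag bits may be set
        switch (status & DONE_MASK) {    // keep only the high four bits
            case NORMAL:      return "normal";
            case CANCELLED:   return "cancelled";
            case EXCEPTIONAL: return "exceptional";
            default:          return "unknown";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(SIGNAL));             // not done
        System.out.println(describe(NORMAL | 7));         // normal (tag bits are ignored)
        System.out.println(describe(CANCELLED | SIGNAL)); // cancelled
        // As signed ints the three completion values are ordered as the comments require.
        System.out.println(NORMAL > CANCELLED && CANCELLED > EXCEPTIONAL); // true
    }
}
```

Because every completion value has the sign bit set, a plain `status < 0` test is enough to detect that a task is done, which is the check used throughout the methods below.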

3.2 Fields

volatile int status; //accessed directly by pool and workers
  • status: records the state of the task

3.3 Key Methods

3.3.1 fork

/**
* Arranges to asynchronously execute this task in the pool the
* current task is running in, if applicable, or using the {@link
* ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
* it is not necessarily enforced, it is a usage error to fork a
* task more than once unless it has completed and been
* reinitialized. Subsequent modifications to the state of this
* task or any data it operates on are not necessarily
* consistently observable by any thread other than the one
* executing it unless preceded by a call to {@link #join} or
* related methods, or a call to {@link #isDone} returning {@code
* true}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
    Thread t;
    // If the current thread is a ForkJoinWorkerThread, the current task is already being processed inside a ForkJoinPool
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        // push this task onto that worker's workQueue
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        // otherwise hand the task to the shared pool; common is a static field of type ForkJoinPool
        ForkJoinPool.common.externalPush(this);
    return this;
}

3.3.2 join

/**
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.
*
* @return the computed result
*/
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
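
The difference between join() and get() described in the Javadoc above can be observed with a task that always fails. This is a minimal sketch; the class JoinVsGet and its methods are hypothetical names chosen for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveAction;

// Hypothetical demo: the same failing task observed through join() and through get().
class JoinVsGet {
    static RecursiveAction failing() {
        return new RecursiveAction() {
            @Override
            protected void compute() {
                throw new IllegalStateException("boom");
            }
        };
    }

    static String viaJoin() {
        RecursiveAction task = failing();
        task.fork();
        try {
            task.join();
            return "no exception";
        } catch (IllegalStateException e) {
            return "IllegalStateException";   // join rethrows the unchecked exception itself
        }
    }

    static String viaGet() {
        RecursiveAction task = failing();
        task.fork();
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {      // get wraps the failure
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin());
        System.out.println(viaGet());
    }
}
```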

3.3.2.1 doJoin

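The doJoin method carries out the actual join logic: it makes sure the task has completed, by running it in the current thread or by waiting for it, and returns the task's final status. The result itself is fetched afterwards by join() through getRawResult().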

/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
*/
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // The compound expression below is fairly dense, so here is a breakdown:
    // 1. status is negative (the high four bits hold NORMAL, CANCELLED or EXCEPTIONAL): return status
    // 2. status is non-negative, the current thread is a ForkJoinWorkerThread, tryUnpush(this) returns true and doExec() returns a negative value: return the value returned by doExec()
    // 3. status is non-negative, the current thread is a ForkJoinWorkerThread, and tryUnpush(this) returns false or doExec() returns a non-negative value: return the result of wt.pool.awaitJoin(w, this, 0L)
    // 4. status is non-negative and the current thread is an ordinary thread: return the result of externalAwaitDone()
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}
  • For the tryUnpush method, see the ForkJoinPool source analysis
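
The external-thread branches of fork() and doJoin() can be observed from outside the pool. In the sketch below, a non-worker thread forks a task and joins it; depending on timing, the task is either run by a common-pool worker or pulled back and run by the joining thread itself, so the answer is deliberately not fixed. The class WhereDidItRun is a hypothetical name for this example.

```java
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo: reports which kind of thread ended up executing the task.
class WhereDidItRun extends RecursiveTask<String> {
    @Override
    protected String compute() {
        return Thread.currentThread() instanceof ForkJoinWorkerThread
                ? "worker thread"
                : "calling thread";
    }

    // Intended to be called from a thread that is not a ForkJoinWorkerThread.
    static String forkAndJoinFromExternalThread() {
        WhereDidItRun task = new WhereDidItRun();
        task.fork();        // external thread: the task goes to the common pool
        return task.join(); // external thread: doJoin ends up in externalAwaitDone
    }

    public static void main(String[] args) {
        // The output varies between runs: "worker thread" or "calling thread".
        System.out.println(forkAndJoinFromExternalThread());
    }
}
```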

3.3.2.2 doExec

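The main purpose of doExec is to call exec(). exec() is the abstract method that ForkJoinTask exposes to its subclasses; RecursiveAction and RecursiveTask wrap it once more and expose compute() instead. For RecursiveAction and RecursiveTask, doExec therefore ends up running the logic of compute(), which is the user-defined computation.

Why do RecursiveAction and RecursiveTask wrap exec() in another layer? exec() has its own return value (a boolean reporting whether the task completed), while RecursiveAction and RecursiveTask need to offer different semantics through compute methods with different return types. They therefore cannot expose exec() to the user directly.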

/**
* Primary execution method for stolen tasks. Unless done, calls
* exec and records status if completed, but doesn't wait for
* completion otherwise.
*
* @return status on exit from this method
*/
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            // exec() is abstract; see RecursiveAction and RecursiveTask for its implementations
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            // mark the task as completed normally
            s = setCompletion(NORMAL);
    }
    return s;
}
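
To make the layering concrete, a task can also implement exec(), getRawResult() and setRawResult() directly, without going through RecursiveTask. This is a minimal sketch; the class ConstantTask is a hypothetical name, and extending ForkJoinTask directly is rarely needed in practice.

```java
import java.util.concurrent.ForkJoinTask;

// Hypothetical example: a task that implements the three abstract methods itself.
class ConstantTask extends ForkJoinTask<Integer> {
    private Integer result;

    @Override
    public Integer getRawResult() {
        return result;
    }

    @Override
    protected void setRawResult(Integer value) {
        result = value;
    }

    @Override
    protected boolean exec() {
        result = 6 * 7;  // the "computation"
        return true;     // true tells doExec that the task completed normally
    }

    public static void main(String[] args) {
        System.out.println(new ConstantTask().invoke()); // prints 42
    }
}
```

RecursiveTask does the same thing with compute() in place of the hard-coded expression.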

3.3.2.3 externalAwaitDone

/**
* Blocks a non-worker-thread until completion.
* @return status upon completion
*/
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? //try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
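
The waiting protocol in externalAwaitDone can be modelled in isolation. The sketch below is a simplified stand-in, not the JDK code: it uses an AtomicInteger instead of Unsafe, the class name SignalledStatus is hypothetical, and the complete() method is an assumption about what the completing side has to do (set the completion bits and call notifyAll when SIGNAL was set), since that side is not shown above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the SIGNAL + wait/notifyAll handshake.
class SignalledStatus {
    static final int NORMAL = 0xf0000000;
    static final int SIGNAL = 0x00010000;
    private final AtomicInteger status = new AtomicInteger();

    // Blocks the caller until status becomes negative, then returns it.
    int awaitDone() {
        int s;
        boolean interrupted = false;
        while ((s = status.get()) >= 0) {
            // Announce that a waiter exists before going to sleep.
            if (status.compareAndSet(s, s | SIGNAL)) {
                synchronized (this) {
                    if (status.get() >= 0) {
                        try {
                            wait();
                        } catch (InterruptedException ie) {
                            interrupted = true;   // remember, keep waiting
                        }
                    } else {
                        notifyAll();              // already done: wake other waiters
                    }
                }
            }
        }
        if (interrupted)
            Thread.currentThread().interrupt();   // restore the interrupt flag
        return s;
    }

    // Assumed completion side: set NORMAL and wake waiters if SIGNAL was set.
    void complete() {
        for (;;) {
            int s = status.get();
            if (s < 0)
                return;
            if (status.compareAndSet(s, s | NORMAL)) {
                if ((s >>> 16) != 0) {
                    synchronized (this) {
                        notifyAll();
                    }
                }
                return;
            }
        }
    }

    public static void main(String[] args) {
        SignalledStatus st = new SignalledStatus();
        new Thread(st::complete).start();
        System.out.println(st.awaitDone() < 0); // prints true
    }
}
```

Re-checking status inside the synchronized block is what closes the window between setting SIGNAL and calling wait(): if the task completes in that window, the waiter sees a negative status and never sleeps.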

3.3.3 invoke

/**
* Commences performing this task, awaits its completion if
* necessary, and returns its result, or throws an (unchecked)
* {@code RuntimeException} or {@code Error} if the underlying
* computation did so.
*
* @return the computed result
*/
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}

4 ForkJoinPool

4.1 Constants

private static final int RSLOCK     = 1;
private static final int RSIGNAL    = 1 << 1;
private static final int STARTED    = 1 << 2;
private static final int STOP       = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN   = 1 << 31;
  • RSLOCK
  • RSIGNAL
  • STARTED
  • STOP
  • TERMINATED
  • SHUTDOWN

4.2 Fields

volatile long ctl;                          // main pool control
volatile int runState;                      // lockable status
final int config;                           // parallelism, mode
int indexSeed;                              // to generate worker index
volatile WorkQueue[] workQueues;            // main registry
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh;         // per-worker UEH
final String workerNamePrefix;              // to create worker name string
volatile AtomicLong stealCounter;           // also used as sync monitor

4.3 WorkQueue

WorkQueue, a static nested class of ForkJoinPool, supports both work stealing and task submission. Its source is given below, with comments.

static final class WorkQueue {

/**
* Capacity of work-stealing queue array upon initialization.
* Must be a power of two; at least 4, but should be larger to
* reduce or eliminate cacheline sharing among queues.
* Currently, it is much larger, as a partial workaround for
* the fact that JVMs often place arrays in locations that
* share GC bookkeeping (especially cardmarks) such that
* per-write accesses encounter serious memory contention.
*/
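//Initial queue capacity. It must be a power of two so that an index can be mapped to an array slot with a bit mask (index & (length - 1)) instead of a more expensive modulo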
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

/**
* Maximum size for queue arrays. Must be a power of two less
* than or equal to 1 << (31 - width of array entry) to ensure
* lack of wraparound of index calculations, but defined to a
* value a bit less than this to help users trap runaway
* programs before saturating systems.
*/
//Maximum queue capacity
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; //64M

//Instance fields

//
volatile int scanState; //versioned, <0: inactive; odd:scanning

//
int stackPred; //pool stack (ctl) predecessor

//
int nsteals; //number of steals

//
int hint; //randomization and stealer index hint

//
int config; //pool index and mode

//
volatile int qlock; //1: locked, < 0: terminate; else 0

//Index of the next element to poll. In general base <= top; base may exceed array.length
volatile int base; //index of next slot for poll

//Index of the next slot to push into. In general base <= top; top may exceed array.length
int top; //index of next slot for push

//
ForkJoinTask<?>[] array; //the elements (initially unallocated)

//The ForkJoinPool this WorkQueue belongs to
final ForkJoinPool pool; //the containing pool (may be null)

//The ForkJoinWorkerThread that owns this WorkQueue
final ForkJoinWorkerThread owner; //owning thread or null if shared

//
volatile Thread parker; //== owner during call to park; else null

//
volatile ForkJoinTask<?> currentJoin; //task being joined in awaitJoin

//
volatile ForkJoinTask<?> currentSteal; //mainly used by helpStealer

//A WorkQueue belongs to one ForkJoinPool and one ForkJoinWorkerThread (owner is null for a shared queue)
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
//Place indices in the center of array (that is not yet allocated)
//Start base and top in the middle
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}

/**
* Returns an exportable index (used by ForkJoinWorkerThread).
*/
final int getPoolIndex() {
return (config & 0xffff) >>> 1; //ignore odd/even tag bit
}

/**
* Returns the approximate number of tasks in the queue.
*/
//Returns the number of elements in the queue, i.e. the number of elements between base and top
final int queueSize() {
int n = base - top; //non-owner callers must read base first
return (n >= 0) ? 0 : -n; //ignore transient negative
}

/**
* Provides a more accurate estimate of whether this queue has
* any tasks than does queueSize, by checking whether a
* near-empty queue has at least one unclaimed task.
*/
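//Gives a more accurate answer than queueSize()
//1. If there are no elements between base and top, return true
//2. If there appears to be exactly one element between base and top, but the array is missing or the slot at top - 1 holds null, return true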
final boolean isEmpty() {
ForkJoinTask<?>[] a; int n, m, s;
return ((n = base - (s = top)) >= 0 ||
(n == -1 && //possibly one task
((a = array) == null || (m = a.length - 1) < 0 ||
U.getObject
(a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
}

/**
* Pushes a task. Call only by owner in unshared queues. (The
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { //ignore if queue removed
int m = a.length - 1; //fenced write for task visibility
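//m & s yields the array index because the length is a power of two, so m has the bit pattern 000..111. putOrderedObject inserts a StoreStore barrier, which prevents store-store reordering
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
//putOrderedInt likewise inserts a StoreStore barrier, which prevents store-store reordering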
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}

/**
* Initializes or doubles the capacity of array. Call either
* by owner or with lock held -- it is OK for base, but not
* top, to move while resizings are in progress.
*/
final ForkJoinTask<?>[] growArray() {
ForkJoinTask<?>[] oldA = array;
int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
int oldMask, t, b;
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
(t = top) - (b = base) > 0) {
int mask = size - 1;
do { //emulate poll from old array, push to new array
ForkJoinTask<?> x;
int oldj = ((b & oldMask) << ASHIFT) + ABASE;
int j = ((b & mask) << ASHIFT) + ABASE;
x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
if (x != null &&
U.compareAndSwapObject(oldA, oldj, x, null))
U.putObjectVolatile(a, j, x);
} while (++b != t);
}
return a;
}

/**
* Takes next task, if one exists, in LIFO order. Call only
* by owner in unshared queues.
*/
final ForkJoinTask<?> pop() {
ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
if ((a = array) != null && (m = a.length - 1) >= 0) {
for (int s; (s = top - 1) - base >= 0;) {
long j = ((m & s) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
break;
if (U.compareAndSwapObject(a, j, t, null)) {
U.putOrderedInt(this, QTOP, s);
return t;
}
}
}
return null;
}

/**
* Takes a task in FIFO order if b is base of queue and a task
* can be claimed without contention. Specialized versions
* appear in ForkJoinPool methods scan and helpStealer.
*/
final ForkJoinTask<?> pollAt(int b) {
ForkJoinTask<?> t; ForkJoinTask<?>[] a;
if ((a = array) != null) {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
base == b && U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
return t;
}
}
return null;
}

/**
* Takes next task, if one exists, in FIFO order.
*/
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
while ((b = base) - top < 0 && (a = array) != null) {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
if (base == b) {
if (t != null) {
if (U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
return t;
}
}
else if (b + 1 == top) //now empty
break;
}
}
return null;
}

/**
* Takes next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> nextLocalTask() {
return (config & FIFO_QUEUE) == 0 ? pop() : poll();
}

/**
* Returns next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> peek() {
ForkJoinTask<?>[] a = array; int m;
if (a == null || (m = a.length - 1) < 0)
return null;
int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
int j = ((i & m) << ASHIFT) + ABASE;
return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
}

/**
* Pops the given task only if it is at the current top.
* (A shared version is available only via FJP.tryExternalUnpush)
*/
final boolean tryUnpush(ForkJoinTask<?> t) {
ForkJoinTask<?>[] a; int s;
if ((a = array) != null && (s = top) != base &&
U.compareAndSwapObject
(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
U.putOrderedInt(this, QTOP, s);
return true;
}
return false;
}

/**
* Removes and cancels all known tasks, ignoring any exceptions.
*/
final void cancelAll() {
ForkJoinTask<?> t;
if ((t = currentJoin) != null) {
currentJoin = null;
ForkJoinTask.cancelIgnoringExceptions(t);
}
if ((t = currentSteal) != null) {
currentSteal = null;
ForkJoinTask.cancelIgnoringExceptions(t);
}
while ((t = poll()) != null)
ForkJoinTask.cancelIgnoringExceptions(t);
}

//Specialized execution methods

/**
* Polls and runs tasks until empty.
*/
final void pollAndExecAll() {
for (ForkJoinTask<?> t; (t = poll()) != null;)
t.doExec();
}

/**
* Removes and executes all local tasks. If LIFO, invokes
* pollAndExecAll. Otherwise implements a specialized pop loop
* to exec until empty.
*/
final void execLocalTasks() {
int b = base, m, s;
ForkJoinTask<?>[] a = array;
if (b - (s = top - 1) <= 0 && a != null &&
(m = a.length - 1) >= 0) {
if ((config & FIFO_QUEUE) == 0) {
for (ForkJoinTask<?> t;;) {
if ((t = (ForkJoinTask<?>)U.getAndSetObject
(a, ((m & s) << ASHIFT) + ABASE, null)) == null)
break;
U.putOrderedInt(this, QTOP, s);
t.doExec();
if (base - (s = top - 1) > 0)
break;
}
}
else
pollAndExecAll();
}
}

/**
* Executes the given task and any remaining local tasks.
*/
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; //mark as busy
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); //release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) //collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}

/**
* Adds steal count to pool stealCounter if it exists, and resets.
*/
final void transferStealCount(ForkJoinPool p) {
AtomicLong sc;
if (p != null && (sc = p.stealCounter) != null) {
int s = nsteals;
nsteals = 0; //if negative, correct for overflow
sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s));
}
}

/**
* If present, removes from queue and executes the given task,
* or any other cancelled task. Used only by awaitJoin.
*
* @return true if queue empty and task not known to be done
*/
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 &&
task != null) {
while ((n = (s = top) - (b = base)) > 0) {
for (ForkJoinTask<?> t;;) { //traverse from s to b
long j = ((--s & m) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
return s + 1 == top; //shorter than expected
else if (t == task) {
boolean removed = false;
if (s + 1 == top) { //pop
if (U.compareAndSwapObject(a, j, task, null)) {
U.putOrderedInt(this, QTOP, s);
removed = true;
}
}
else if (base == b) //replace with proxy
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask());
if (removed)
task.doExec();
break;
}
else if (t.status < 0 && s + 1 == top) {
if (U.compareAndSwapObject(a, j, t, null))
U.putOrderedInt(this, QTOP, s);
break; //was cancelled
}
if (--n == 0)
return false;
}
if (task.status < 0)
return false;
}
}
return true;
}

/**
* Pops task if in the same CC computation as the given task,
* in either shared or owned mode. Used only by helpComplete.
*/
final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
int s; ForkJoinTask<?>[] a; Object o;
if (base - (s = top) < 0 && (a = array) != null) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
if ((o = U.getObjectVolatile(a, j)) != null &&
(o instanceof CountedCompleter)) {
CountedCompleter<?> t = (CountedCompleter<?>)o;
for (CountedCompleter<?> r = t;;) {
if (r == task) {
if (mode < 0) { //must lock
if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
if (top == s && array == a &&
U.compareAndSwapObject(a, j, t, null)) {
U.putOrderedInt(this, QTOP, s - 1);
U.putOrderedInt(this, QLOCK, 0);
return t;
}
U.compareAndSwapInt(this, QLOCK, 1, 0);
}
}
else if (U.compareAndSwapObject(a, j, t, null)) {
U.putOrderedInt(this, QTOP, s - 1);
return t;
}
break;
}
else if ((r = r.completer) == null) //try parent
break;
}
}
}
return null;
}

/**
* Steals and runs a task in the same CC computation as the
* given task if one exists and can be taken without
* contention. Otherwise returns a checksum/control value for
* use by method helpComplete.
*
* @return 1 if successful, 2 if retryable (lost to another
* stealer), -1 if non-empty but no matching task found, else
* the base index, forced negative.
*/
final int pollAndExecCC(CountedCompleter<?> task) {
int b, h; ForkJoinTask<?>[] a; Object o;
if ((b = base) - top >= 0 || (a = array) == null)
h = b | Integer.MIN_VALUE; //to sense movement on re-poll
else {
long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((o = U.getObjectVolatile(a, j)) == null)
h = 2; //retryable
else if (!(o instanceof CountedCompleter))
h = -1; //unmatchable
else {
CountedCompleter<?> t = (CountedCompleter<?>)o;
for (CountedCompleter<?> r = t;;) {
if (r == task) {
if (base == b &&
U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
t.doExec();
h = 1; //success
}
else
h = 2; //lost CAS
break;
}
else if ((r = r.completer) == null) {
h = -1; //unmatched
break;
}
}
}
}
return h;
}

/**
* Returns true if owned and not known to be blocked.
*/
final boolean isApparentlyUnblocked() {
Thread wt; Thread.State s;
return (scanState >= 0 &&
(wt = owner) != null &&
(s = wt.getState()) != Thread.State.BLOCKED &&
s != Thread.State.WAITING &&
s != Thread.State.TIMED_WAITING);
}

//Unsafe mechanics. Note that some are (and must be) the same as in FJP
private static final sun.misc.Unsafe U;
private static final int ABASE;
private static final int ASHIFT;
private static final long QTOP;
private static final long QLOCK;
private static final long QCURRENTSTEAL;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> wk = WorkQueue.class;
Class<?> ak = ForkJoinTask[].class;
QTOP = U.objectFieldOffset
(wk.getDeclaredField("top"));
QLOCK = U.objectFieldOffset
(wk.getDeclaredField("qlock"));
QCURRENTSTEAL = U.objectFieldOffset
(wk.getDeclaredField("currentSteal"));
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
}
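
The index arithmetic used throughout WorkQueue (ever-growing base and top, reduced to a slot with a mask) can be tried out in a stripped-down, single-threaded form. The class MaskedDeque below is a hypothetical teaching model: it has no CAS, no memory fences and no resizing, and its capacity of 8 is an arbitrary assumption.

```java
// Hypothetical single-threaded model of WorkQueue's base/top/mask indexing.
class MaskedDeque {
    private final Object[] array = new Object[8];   // capacity must be a power of two
    private final int mask = array.length - 1;      // bit pattern 000..0111
    private int base = array.length >>> 1;          // start in the middle, like WorkQueue
    private int top = base;

    int size() {
        return top - base;
    }

    // Adds at top (the owner's end).
    void push(Object task) {
        if (top - base > mask)
            throw new IllegalStateException("full");
        array[top & mask] = task;                   // top may exceed the length; the mask wraps it
        top++;
    }

    // Takes from top: LIFO, like WorkQueue.pop().
    Object pop() {
        if (top - base <= 0)
            return null;
        int i = --top & mask;
        Object t = array[i];
        array[i] = null;
        return t;
    }

    // Takes from base: FIFO, like WorkQueue.poll().
    Object poll() {
        if (top - base <= 0)
            return null;
        int i = base++ & mask;
        Object t = array[i];
        array[i] = null;
        return t;
    }

    public static void main(String[] args) {
        MaskedDeque q = new MaskedDeque();
        q.push("a"); q.push("b"); q.push("c");
        System.out.println(q.poll()); // a (oldest, from base)
        System.out.println(q.pop());  // c (newest, from top)
        System.out.println(q.pop());  // b
        System.out.println(q.pop());  // null (empty)
    }
}
```

Because base and top only ever increase on the poll and push sides, `top - base` is always the element count, and the mask takes care of wrapping around the end of the array.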

4.4 Key Methods

4.4.1 submit

/**
* Submits a ForkJoinTask for execution.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    // null check
    if (task == null)
        throw new NullPointerException();
    // queue the task for execution
    externalPush(task);
    return task;
}
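
From the caller's side, submit is asynchronous and hands back the very task that was passed in, which can then be joined. A minimal usage sketch, with the hypothetical class name SubmitDemo and an arbitrary parallelism of 2:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo of ForkJoinPool.submit(ForkJoinTask).
class SubmitDemo {
    static int run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            RecursiveTask<Integer> task = new RecursiveTask<Integer>() {
                @Override
                protected Integer compute() {
                    return 1 + 2;
                }
            };
            ForkJoinTask<Integer> handle = pool.submit(task); // returns immediately
            if (handle != task)
                throw new IllegalStateException("submit should return the submitted task");
            return handle.join();                             // wait for the result
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 3
    }
}
```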

4.4.1.1 externalPush

/**
* Tries to add the given task to a submission queue at
* submitter's current queue. Only the (vastly) most common path
* is directly handled in this method, while screening for need
* for externalSubmit.
*
* @param task the task. Caller must ensure non-null.
*/
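final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    // The conditions below first check a few boundary cases and then try to take the queue's lock with a CAS on qlock (0 -> 1); on success the current thread has exclusive access and can operate on the queue safely
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        // n is the number of occupied slots. If the array still has free slots, the task is stored at the position top points to
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            // top may exceed the array length, so the slot is computed with &. This is why the array length must be a power of two: with any other length a modulo would be needed, which is more expensive. The expression computes the memory offset of the slot top points to; Unsafe then writes the task there
            int j = ((am & s) << ASHIFT) + ABASE;
            // putOrderedObject inserts a StoreStore barrier, which prevents store-store reordering
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            // qlock is declared volatile, but a write made through Unsafe does not use the field's declared semantics, so volatile-write semantics are requested explicitly with putIntVolatile
            U.putIntVolatile(q, QLOCK, 0);
            // n <= 1: the queue was empty or nearly empty before this push, so a worker may need to be created or woken up
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        // release the lock
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}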
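
The slot selection `ws[m & r & SQMASK]` in externalPush can be examined on its own. The sketch below assumes SQMASK = 0x007e, which is the value in the JDK 8 ForkJoinPool source but is not shown in the listing above, and it assumes the workQueues length is a power of two. The class SubmissionSlot and the method slotFor are hypothetical helpers.

```java
// Hypothetical helper reproducing the index expression used by externalPush.
class SubmissionSlot {
    // Assumption: value of ForkJoinPool.SQMASK in JDK 8 (bit 0 is clear, so results are even).
    static final int SQMASK = 0x007e;

    // probe stands for ThreadLocalRandom.getProbe(); queuesLength must be a power of two.
    static int slotFor(int probe, int queuesLength) {
        int m = queuesLength - 1;
        return m & probe & SQMASK;
    }

    public static void main(String[] args) {
        System.out.println(slotFor(7, 16));       // 6
        System.out.println(slotFor(0x12345, 16)); // 4
    }
}
```

Under these assumptions, external submitters are always mapped to even slots within the bounds of the array.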

4.4.1.2 signalWork

When too few workers are active, signalWork tries to create a new worker or activate an idle one.

/**
* Tries to create or activate a worker if too few are active.
*
* @param ws the worker array to use to find signallees
* @param q a WorkQueue --if non-null, don't retry if now empty
*/
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { //too few active
if ((sp = (int)c) == 0) { //no idle workers
if ((c & ADD_WORKER) != 0L) //too few workers
tryAddWorker(c);
break;
}
if (ws == null) //unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) //terminated
break;
if ((v = ws[i]) == null) //terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; //next scanState
int d = sp - v.scanState; //screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; //activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) //no more work
break;
}
}

5 References