/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ privatevolatileint state;
/** The underlying callable; nulled out after running */ private Callable<V> callable;
/** The result to return or exception to throw from get() */ //用于保存callable正常执行的结果,或者是保存抛出的异常 private Object outcome; //non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */ //该Runnable关联的线程,该字段只有在run方法内才会赋值(执行线程和创建FutureTask的线程并非同一个) privatevolatile Thread runner;
/** * Simple linked list nodes to record waiting threads in a Treiber * stack. See other classes such as Phaser and SynchronousQueue * for more detailed explanation. */ staticfinalclassWaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */ publicFutureTask(Callable<V> callable) { if (callable == null) thrownewNullPointerException(); this.callable = callable; this.state = NEW; //ensure visibility of callable }
/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * * @param runnable the runnable task * @param result the result to return on successful completion. If * you don't need a particular result, consider using * constructions of the form: * {@code Future<?> f = new FutureTask<Void>(runnable, null)} * @throws NullPointerException if the runnable is null */ publicFutureTask(Runnable runnable, V result) { //将Runnable适配成Callable this.callable = Executors.callable(runnable, result); this.state = NEW; //ensure visibility of callable }
7.1.1 Executors.callable
该静态方法负责将Runnable对象适配成一个Callable对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * Returns a {@link Callable} object that, when * called, runs the given task and returns the given result. This * can be useful when applying methods requiring a * {@code Callable} to an otherwise resultless action. * @param task the task to run * @param result the result to return * @param <T> the type of the result * @return a callable object * @throws NullPointerException if task null */ publicstatic <T> Callable<T> callable(Runnable task, T result) { if (task == null) thrownewNullPointerException(); returnnewRunnableAdapter<T>(task, result); }
其中适配器RunnableAdapter如下
该类仅仅是将一个Runnable对象适配成一个Callable对象,并无他用
注意到result通过构造方法进行赋值,然后在call方法中直接返回,与Runnable无任何关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/** * A callable that runs given task and returns given result */ staticfinalclassRunnableAdapter<T> implementsCallable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { //转调用Runnable的run方法 task.run(); //直接返回构造方法中传入的result return result; } }
publicvoidrun() { //当且仅当state==NEW 并且 CAS执行成功的线程才能继续执行 //为什么需要CAS?FutureTask可能并不一定需要单独开一个线程来执行,总之保证有且仅有一个线程执行这个方法 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //调用Callable#call方法,这里才是业务逻辑执行的入口 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //异常调用时,设置FutureTask的状态 setException(ex); } if (ran) //正常调用时,设置FutureTask的状态 set(result); } } finally { //runner must be non-null until state is settled to //prevent concurrent calls to run() runner = null; //state must be re-read after nulling runner to prevent //leaked interrupts ints= state; //确保该方法退出时,FutureTask处于最终状态,即NORMAL/EXCEPTIONAL/CANCELLED/INTERRUPTED状态中的一种 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
7.2.1 setException
该方法逻辑如下
该方法将FutureTask的状态通过CAS改为COMPLETING
然后将outcome赋值为异常对象
最后唤醒阻塞线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/** * Causes this future to report an {@link ExecutionException} * with the given throwable as its cause, unless this future has * already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon failure of the computation. * * @param t the cause of failure */ protectedvoidsetException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); //final state finishCompletion(); } }
7.2.2 set
该方法逻辑如下
该方法将FutureTask的状态通过CAS改为COMPLETING
然后将outcome赋值Callable#run的返回结果
最后唤醒阻塞线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * Sets the result of this future to the given value unless * this future has already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon successful completion of the computation. * * @param v the value */ protectedvoidset(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //final state finishCompletion(); } }
/** * Ensures that any interrupt from a possible cancel(true) is only * delivered to a task while in run or runAndReset. */ privatevoidhandlePossibleCancellationInterrupt(int s) { //It is possible for our interrupter to stall before getting a //chance to interrupt us. Let's spin-wait patiently. //如果处于INTERRUPTING,则说明有一个线程正在执行cancel方法,此处只需等待执行完毕即可 if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); //wait out pending interrupt
//assert state == INTERRUPTED;
//We want to clear any interrupt we may have received from //cancel(true). However, it is permissible to use interrupts //as an independent mechanism for a task to communicate with //its caller, and there is no way to clear only the //cancellation interrupt. // //Thread.interrupted(); }
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ privateintawaitDone(boolean timed, long nanos) throws InterruptedException { //超时时间 finallongdeadline= timed ? System.nanoTime() + nanos : 0L; WaitNodeq=null; booleanqueued=false; for (;;) { //检查是否已被中断,如果已被中断,那么将当前线程所关联的WaiterNode移出等待链表 if (Thread.interrupted()) { removeWaiter(q); thrownewInterruptedException(); }
/** * Tries to unlink a timed-out or interrupted wait node to avoid * accumulating garbage. Internal nodes are simply unspliced * without CAS since it is harmless if they are traversed anyway * by releasers. To avoid effects of unsplicing from already * removed nodes, the list is retraversed in case of an apparent * race. This is slow when there are a lot of nodes, but we don't * expect lists to be long enough to outweigh higher-overhead * schemes. */ privatevoidremoveWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { //restart on removeWaiter race for (WaitNodepred=null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; //q.thread==null,且pred!=null,那么需要将q节点从链表中除去 elseif (pred != null) { pred.next = s; //如果发现pred.thread==null,说明被其他线程改过了,重新遍历一遍节点链表 if (pred.thread == null) //check for race continue retry; } //q.thread==null 且 pred==null,说明当前q是头节点,且q是无效节点,因此更改链表的头结点 elseif (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) //若CAS失败,说明存在竞争,重新遍历链表 continue retry; } break; } } }
/** * Returns result or throws exception for completed task. * * @param s completed state value */ @SuppressWarnings("unchecked") private V report(int s)throws ExecutionException { Objectx= outcome; //说明Callable#call正常执行 if (s == NORMAL) return (V)x; //说明FutureTask被cancel了 if (s >= CANCELLED) thrownewCancellationException(); //说明Callable#call执行过程中抛出异常 thrownewExecutionException((Throwable)x); }