/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException(); //将节点添加到当前Condition对象的condition queue中 Nodenode= addConditionWaiter(); //释放当前线程持有的资源,因此await方法时,必须处于持有锁的状态,与wait类似(wait/notify必须位于synchronize块内部) intsavedState= fullyRelease(node); //用于记录中断模式 intinterruptMode=0; //循环,直至位于sync queue中或者被中断 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //下面进行一些后续处理 //可以看出,当前线程通过acquireQueued来获取锁状态直至成功,该方法返回值指示该过程是否被中断 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //这里为什么有可能不是null? signal方法会将节点的nextWaiter字段赋值为null,但是如果因为中断而退出wihle循环,则此字段没有置空 if (node.nextWaiter != null) //clean up if cancelled unlinkCancelledWaiters(); //根据中断模式,进行相应的中断处理,抛出异常或者回复中断现场等 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ finalbooleanisOnSyncQueue(Node node) { //如果节点的状态是CONDITION,那么意味着节点位于condition queue中 //如果节点的prev为null,那么节点位于condition queue中 //sync queue头结点的prev字段也是null(setHead方法会将该字段设为null),但是这种特殊情况在这里是不会出现的,在调用acquireQueued方法之前,节点是不可能获取锁状态的,因此不可能成为头结点 if (node.waitStatus == Node.CONDITION || node.prev == null) returnfalse; //如果节点的next字段不为null,说明节点一定位于sync queue中 //节点的next字段为null,则并不一定说明该节点没有后继,因为Node#next字段是非可靠的。该节点可能通过addWaiter加入到sync queue中(添加操作是通过执行signal方法的线程来完成的),当前线程在节点的next字段正确赋值之前对其进行了访问 if (node.next != null) //If has successor, it must be on queue returntrue;
//节点prev字段不为空,并不一定代表节点位于sync queue中,因为prev字段的赋值操作在CAS操作之前,只有CAS成功之后,节点才会成功入队 /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ //从后往前遍历sync queue,查找node节点是否位于sync queue,这种方式是比较慢的,因此在上述判断均不能确定的情况下才会使用 return findNodeFromTail(node); }
在while循环中等待时,需要检查当前线程是被unpark正常唤醒还是被interrupt唤醒
1 2 3 4 5 6 7 8 9 10
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ privateintcheckInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
/** * Transfers node, if necessary, to sync queue after a cancelled wait. * Returns true if thread was cancelled before being signalled. * * @param node the node * @return true if cancelled before the node was signalled */ finalbooleantransferAfterCancelledWait(Node node) { //将节点状态改为0后入队,必须由一个线程来完成 //为什么可能存在竞争?当前线程被中断,于是进入该方法,此时另一个线程恰好执行signal,会进入另一个方法transferForSignal,这两个方法均会执行CAS操作将节点状态从CONDITION改为0 //或者节点的状态是CANCELLED if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); returntrue; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ //可能出现上述竞争场景,于是等待另一个线程入队操作完毕,这个等待不会很久 while (!isOnSyncQueue(node)) Thread.yield(); returnfalse; }
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignal() { //同理,执行signal方法的线程必须持有独占锁,否则抛出IllegalMonitorStateException异常 if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); //唤醒condition queue中第一个节点 Nodefirst= firstWaiter; if (first != null) doSignal(first); }
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first) { do { //这里更新头结点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } //直至transferForSignal成功一次,或者队列为空 while (!transferForSignal(first) && (first = firstWaiter) != null); }
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ finalbooleantransferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //可能存在竞争:另一个线程在执行await时,被中断,然后执行transferAfterCancelledWait方法;当前线程执行signal,然后执行次方法。这两个方法都会通过CAS操作将节的状态从CONDITION改为0 //或者节点的状态已经是CANCELLED if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //通过enq入队(sync queue),该方法返回node节点的前继节点 Nodep= enq(node); intws= p.waitStatus; //若前继节点状态为CANCELLED,或者将前继节点置为SIGNAL失败,那么需要找到前继节点,并将其设为SIGNAL状态后才能安心阻塞,因此唤醒一下,让节点关联的线程去自行处理 //这里唤醒节点,醒来的地方可能是await方法中的while循环内,紧接着会执行acquireQueued方法 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); returntrue; }
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignalAll() { //必须处于独占模式下,并且必须持有锁,否则抛出IllegalMonitorStateException异常 if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); Nodefirst= firstWaiter; if (first != null) doSignalAll(first); }
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ publicfinalbooleanawait(long time, TimeUnit unit) throws InterruptedException { //将时间信息转为纳秒 longnanosTimeout= unit.toNanos(time); if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); finallongdeadline= System.nanoTime() + nanosTimeout; booleantimedout=false; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
/** * Implements absolute timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ publicfinalbooleanawaitUntil(Date deadline) throws InterruptedException { //获取具体时刻,对应前面几种方法的deadline longabstime= deadline.getTime(); if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); booleantimedout=false; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }