/** Marker to indicate a node is waiting in shared mode */ staticfinalNodeSHARED=newNode(); /** Marker to indicate a node is waiting in exclusive mode */ staticfinalNodeEXCLUSIVE=null;
/** waitStatus value to indicate thread has cancelled */ staticfinalintCANCELLED=1; /** waitStatus value to indicate successor's thread needs unparking */ staticfinalintSIGNAL= -1; /** waitStatus value to indicate thread is waiting on condition */ staticfinalintCONDITION= -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ staticfinalintPROPAGATE= -3;
/**
 * Status field, taking on only the values:
 *   SIGNAL:     The successor of this node is (or will soon be)
 *               blocked (via park), so the current node must
 *               unpark its successor when it releases or
 *               cancels. To avoid races, acquire methods must
 *               first indicate they need a signal,
 *               then retry the atomic acquire, and then,
 *               on failure, block.
 *   CANCELLED:  This node is cancelled due to timeout or interrupt.
 *               Nodes never leave this state. In particular,
 *               a thread with cancelled node never again blocks.
 *   CONDITION:  This node is currently on a condition queue.
 *               It will not be used as a sync queue node
 *               until transferred, at which time the status
 *               will be set to 0. (Use of this value here has
 *               nothing to do with the other uses of the
 *               field, but simplifies mechanics.)
 *   PROPAGATE:  A releaseShared should be propagated to other
 *               nodes. This is set (for head node only) in
 *               doReleaseShared to ensure propagation
 *               continues, even if other operations have
 *               since intervened.
 *   0:          None of the above
 *
 * The values are arranged numerically to simplify use.
 * Non-negative values mean that a node doesn't need to
 * signal. So, most code doesn't need to check for particular
 * values, just for sign.
 *
 * The field is initialized to 0 for normal sync nodes, and
 * CONDITION for condition nodes. It is modified using CAS
 * (or when possible, unconditional volatile writes).
 */
volatile int waitStatus;
/**
 * Link to predecessor node that current node/thread relies on
 * for checking waitStatus. Assigned during enqueuing, and nulled
 * out (for sake of GC) only upon dequeuing. Also, upon
 * cancellation of a predecessor, we short-circuit while
 * finding a non-cancelled one, which will always exist
 * because the head node is never cancelled: A node becomes
 * head only as a result of successful acquire. A
 * cancelled thread never succeeds in acquiring, and a thread only
 * cancels itself, not any other node.
 */
volatile Node prev;
/**
 * Link to the successor node that the current node/thread
 * unparks upon release. Assigned during enqueuing, adjusted
 * when bypassing cancelled predecessors, and nulled out (for
 * sake of GC) when dequeued. The enq operation does not
 * assign next field of a predecessor until after attachment,
 * so seeing a null next field does not necessarily mean that
 * node is at end of queue. However, if a next field appears
 * to be null, we can scan prev's from the tail to
 * double-check. The next field of cancelled nodes is set to
 * point to the node itself instead of null, to make life
 * easier for isOnSyncQueue.
 */
volatile Node next;
/**
 * The thread that enqueued this node. Initialized on
 * construction and nulled out after use.
 */
volatile Thread thread;
/**
 * Link to next node waiting on condition, or the special
 * value SHARED. Because condition queues are accessed only
 * when holding in exclusive mode, we just need a simple
 * linked queue to hold nodes while they are waiting on
 * conditions. They are then transferred to the queue to
 * re-acquire. And because conditions can only be exclusive,
 * we save a field by using special value to indicate shared
 * mode.
 */
Node nextWaiter;
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ privatetransientvolatile Node head;
/** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ privatetransientvolatile Node tail;
/**
 * The synchronization state.
 */
private volatile int state;
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ publicfinalvoidacquire(int arg) { //首先执行tryAcquire(arg)尝试获取资源,如果成功则直接返回 //如果tryAcquire(arg)获取资源失败,则将当前线程封装成Node节点加入到sync queue队列中,并通过acquireQueued进行获取资源直至成功(如果尚未有资源可获取,那么acquireQueued会阻塞当前线程) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { //生成指定模式的Node节点 Nodenode=newNode(Thread.currentThread(), mode); //Try the fast path of enq; backup to full enq on failure Nodepred= tail; //以下几行进行入队操作,如果失败,交给enq进行入队处理。其实,我认为可以直接调用enq,不知道作者设置如下几行的意图 if (pred != null) { node.prev = pred; //通过CAS操作串行化并发入队操作,仅有一个线程会成功,由于node节点的prev字段是在CAS操作之前进行的,一旦CAS操作成功,node节点的prev字段就是指向了其前继节点,因此说prev字段是安全的 if (compareAndSetTail(pred, node)) { //这里直接通过赋值操作赋值next字段,注意,可能有别的线程会在next字段赋值之前访问到next字段,因此next字段是非可靠的(一个节点的next字段为null并不代表该节点没有后继) pred.next = node; //一旦next字段赋值成功,那么next字段又变为可靠的了 return node; } } //通过enq入队 enq(node); return node; }
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) { intws= pred.waitStatus; //一旦发现前继节点是SIGNAL状态,就返回true,在acquireQueued方法中会阻塞当前线程 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ //这里给出两个问题: //1.如果在当前线程阻塞之前,前继节点就唤醒了当前线程,那么当前线程不就永远阻塞下去了吗? //2.万一有别的线程更改了前继节点的状态,导致前继节点不唤醒当前线程,那么当前线程不就永远阻塞下去了吗? returntrue;
//如果前继节点处于CANCELL状态(仅有CANCELL状态大于0) if (ws > 0) { //那么跳过那些被CANCELL的节点,先前找到第一个有效节点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //前继节点状态要么是0,要么是PROPAGATE,将其通过CAS操作设为SIGNAL,不用管是否成功,退回到上层函数acquireQueued进行再次判断 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ privatefinalbooleanparkAndCheckInterrupt() { LockSupport.park(this); //返回是否被中断过 return Thread.interrupted(); }
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ publicfinalbooleanrelease(int arg) { //调用tryRelease尝试释放资源 if (tryRelease(arg)) { Nodeh= head; //只要头节点不为空且状态不为0,就唤醒后继节点,对于独占模式也就只有SIGNAL状态一种,头结点在任何情况下都不可能为CANCELL状态 if (h != null && h.waitStatus != 0) unparkSuccessor(h); returntrue; } returnfalse; }
/** * Wakes up node's successor, if one exists. * * @param node the node */ privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; //若节点状态小于0,将其通过CAS操作改为0,表明本次SIGNAL的任务已经完成,至于CAS是否成功,或者是否再次被其他线程修改,都与本次无关unparkSuccessor无关,只是该节点被赋予了新的任务而已 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //这里通过非可靠的next字段直接获取后继,如果非空,那么说明该字段可靠,如果为空,那么利用可靠的prev字段从tail向前找到当前node节点的后继节点 Nodes= node.next; if (s == null || s.waitStatus > 0) { s = null; for (Nodet= tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒后继节点 if (s != null) LockSupport.unpark(s.thread); }
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ publicfinalvoidacquireShared(int arg) { //尝试获取锁,如果返回值不小于9,则说明获取成功,直接返回 //如果获取失败,则进入doAcquireShared方法,执行后续操作 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ privatevoidsetHeadAndPropagate(Node node, int propagate) { Nodeh= head; //Record old head for check below //将当前节点设置为头结点 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //这是一堆极其诡异的条件,我暂时分析不清楚,但是感觉大概率是true,也就是,大概率会触发doReleaseShared方法 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Nodes= node.next; if (s == null || s.isShared()) doReleaseShared(); } }
doReleaseShared方法将放在下一小节中进行分析
4.4 releaseShared
releaseShared方法是共享模式下实现解锁语义的入口方法
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ publicfinalbooleanreleaseShared(int arg) { //通过tryReleaseShared方法尝试释放资源 if (tryReleaseShared(arg)) { doReleaseShared(); returntrue; } returnfalse; }
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ privatevoiddoReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Nodeh= head; if (h != null && h != tail) { intws= h.waitStatus; //若头结点为SIGNAL状态,则将其通过CAS操作改为0 if (ws == Node.SIGNAL) { //如果失败,说明存在竞争,可能有其他线程也在执行该方法,那么由竞争成功的线程执行unparkSuccessor方法即可 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //loop to recheck cases //运行到这,说明当前线程竞争成功,执行unparkSuccessor唤醒头结点的后继节点,即sync queue中第二个节点 unparkSuccessor(h); } //如果头结点状态是0,意味着后面没有节点了,这里失败的可能原因是新节点加入,将头结点重新设置为SIGNAL elseif (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果CAS失败了,此时可能有新节点将头结点重新标记为SIGNAL,如果此时不执行continue,将会导致该方法结束,这样便没有达到propagate的目的,因此必须区分CAS结果进行不同处理 continue; //loop on failed CAS } //如果头结点没有发生变化,则退出死循环 if (h == head) //loop if head changed break; } }
/** * Acquires in exclusive mode, aborting if interrupted. * Implemented by first checking interrupt status, then invoking * at least once {@link #tryAcquire}, returning on * success. Otherwise the thread is queued, possibly repeatedly * blocking and unblocking, invoking {@link #tryAcquire} * until success or the thread is interrupted. This method can be * used to implement method {@link Lock#lockInterruptibly}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * @throws InterruptedException if the current thread is interrupted */ publicfinalvoidacquireInterruptibly(int arg) throws InterruptedException { //先检查一次是否被中断,如果是,则直接抛出异常 if (Thread.interrupted()) thrownewInterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
/** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ publicfinalvoidacquireSharedInterruptibly(int arg) throws InterruptedException { //首先检查一下是否被中断,如果是,则直接抛出InterruptedException异常 if (Thread.interrupted()) thrownewInterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
/** * Attempts to acquire in exclusive mode, aborting if interrupted, * and failing if the given timeout elapses. Implemented by first * checking interrupt status, then invoking at least once {@link * #tryAcquire}, returning on success. Otherwise, the thread is * queued, possibly repeatedly blocking and unblocking, invoking * {@link #tryAcquire} until success or the thread is interrupted * or the timeout elapses. This method can be used to implement * method {@link Lock#tryLock(long, TimeUnit)}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * @param nanosTimeout the maximum number of nanoseconds to wait * @return {@code true} if acquired; {@code false} if timed out * @throws InterruptedException if the current thread is interrupted */ publicfinalbooleantryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
/** * Attempts to acquire in shared mode, aborting if interrupted, and * failing if the given timeout elapses. Implemented by first * checking interrupt status, then invoking at least once {@link * #tryAcquireShared}, returning on success. Otherwise, the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted or the timeout elapses. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. * @param nanosTimeout the maximum number of nanoseconds to wait * @return {@code true} if acquired; {@code false} if timed out * @throws InterruptedException if the current thread is interrupted */ publicfinalbooleantryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { //如果已被中断,则直接抛出InterruptedException异常 if (Thread.interrupted()) thrownewInterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }