/** The queued items */ //数组,用于存放元素,注意到该字段是final修饰的,因此ArrayBlockingQueue是不能扩容的,其容量在初始化时就已经确定 final Object[] items;
/** items index for next take, poll, peek or remove */ //take/poll/peek/remove方法操作的下标 int takeIndex;
/** items index for next put, offer, or add */ //put/offer/add方法操作的下标 int putIndex;
/** Number of elements in the queue */ //队列中的所有元素 int count;
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */
/** Main lock guarding all access */ //重入锁,所有非线程安全字段的访问都需要配合该重入锁 final ReentrantLock lock;
/** Condition for waiting takes */ //条件变量,用于阻塞和唤醒线程 privatefinal Condition notEmpty;
/** Condition for waiting puts */ //条件变量,用于阻塞和唤醒线程 privatefinal Condition notFull;
/** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ //迭代器 transientItrsitrs=null;
/** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, * returning {@code true} upon success and {@code false} if this queue * is full. This method is generally preferable to method {@link #add}, * which can fail to insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ publicbooleanoffer(E e) { checkNotNull(e); finalReentrantLocklock=this.lock; lock.lock(); try { //队列已满时,直接返回false if (count == items.length) returnfalse; else { //enqueue需要配合重入锁lock才能确保线程安全 enqueue(e); returntrue; } } finally { lock.unlock(); } }
3.1.1 enqueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ privatevoidenqueue(E x) { //assert lock.getHoldCount() == 1; //assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; //更新putIndex if (++putIndex == items.length) putIndex = 0; count++; //此时队列至少有一个元素,因此通过条件对象notEmpty唤醒那些阻塞在notEmpty上的其中一个线程 notEmpty.signal(); }
/** * Inserts the specified element at the tail of this queue, waiting * up to the specified wait time for space to become available if * the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicbooleanoffer(E e, long timeout, TimeUnit unit) throws InterruptedException {
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicvoidput(E e)throws InterruptedException { checkNotNull(e); finalReentrantLocklock=this.lock; //允许中断 lock.lockInterruptibly(); try { //若队列已满,则阻塞当前线程,直至队列非满 while (count == items.length) //阻塞后,会释放持有的锁。唤醒后又会重新持有锁 notFull.await(); //入队 enqueue(e); } finally { lock.unlock(); } }