0%

SourceAnalysis-Semaphore

阅读更多

1 前言

一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个acquire(),然后再获取该许可。每个release()添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采取相应的行动。拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可

2 内部类

2.1 内部类Sync

Sync继承了AbstractQueuedSynchronizer(AQS),并使用的是AQS中的共享模式。要想深入理解Semaphore还是要先了解AQS的机制以及源码 SourceAnalysis-AQS

为什么是共享模式呢?很好理解,因为是许可嘛,假设有多个许可可用,那么肯定同时可能有多个线程来取用这个许可,因此需要的是共享模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

//设置初始许可数量
Sync(int permits) {
setState(permits);
}

//获取现有的许可数量
final int getPermits() {
return getState();
}

//共享模式下,非公平的tryAcquiresShared,所谓非公平是指,获取资源的线程不管阻塞队列中是否有已经处于等待状态的线程,而直接尝试获取资源,获取失败时才进入sync queue(AQS维护的同步阻塞队列),这对于那些已经在sync queue中等待的线程来说是不公平的
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

//共享模式下的释放许可的方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//释放后的许可数量
int next = current + releases;
if (next < current) //overflow
throw new Error("Maximum permit count exceeded");
//CAS操作成功才返回
if (compareAndSetState(current, next))
return true;
}
}

//减少许可数量
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
//下溢出
if (next > current) //underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}

//释放所有的许可
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

2.2 内部类NonfairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

//转调用Sync的nonfairTryAcquireShared方法即可
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

2.3 内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

//共享模式下,公平的tryAcquireShared方法,公平是指,当执行tryAcquireShared
protected int tryAcquireShared(int acquires) {
for (;;) {
//sync queue不为空,且当前线程不是第二个节点时hasQueuedPredecessors()返回true
if (hasQueuedPredecessors())
//返回-1,那么当前线程将会被AQS添加到sync queue中去
return -1;
int available = getState();
//剩余许可的数量
int remaining = available - acquires;
//当许可不足时,或者CAS操作成功时返回剩余许可数量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

3 字段

Semaphore仅有一个字段,即上面的内部类Sync的实例。信号量的语义是靠sync来实现的

1
2
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;

4 重要方法

4.1 构造方法

该构造方法创建指定数量的许可

1
2
3
4
5
6
7
8
9
10
11
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

该构造方法创建指定数量的许可,并且指定公平模式还是非公平模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

4.2 acquire

以可中断的方式获取一个许可,当没有许可时将会阻塞,直到获取许可或者被中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

该方法获取指定数量的许可,本质上与上一个acquire没有区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Acquires the given number of permits from this semaphore,
* blocking until all are available,
* or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires the given number of permits, if they are available,
* and returns immediately, reducing the number of available permits
* by the given amount.
*
* <p>If insufficient permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes one of the {@link #release() release}
* methods for this semaphore, the current thread is next to be assigned
* permits and the number of available permits satisfies this request; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
* Any permits that were to be assigned to this thread are instead
* assigned to other threads trying to acquire permits, as if
* permits had been made available by a call to {@link #release()}.
*
* @param permits the number of permits to acquire
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if {@code permits} is negative
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

4.3 release

释放一个许可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

/**
* Releases a permit, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads are trying to acquire a permit, then one is
* selected and given the permit that was just released. That thread
* is (re)enabled for thread scheduling purposes.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*/
public void release() {
sync.releaseShared(1);
}

释放指定数量的许可,与上个方法没有本质上的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Releases the given number of permits, returning them to the semaphore.
*
* <p>Releases the given number of permits, increasing the number of
* available permits by that amount.
* If any threads are trying to acquire permits, then one
* is selected and given the permits that were just released.
* If the number of available permits satisfies that thread's request
* then that thread is (re)enabled for thread scheduling purposes;
* otherwise the thread will wait until sufficient permits are available.
* If there are still permits available
* after this thread's request has been satisfied, then those permits
* are assigned in turn to other threads trying to acquire permits.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link Semaphore#acquire acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permits the number of permits to release
* @throws IllegalArgumentException if {@code permits} is negative
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}