-
Notifications
You must be signed in to change notification settings - Fork 32
/
Semaphore 源码解析.md
154 lines (114 loc) · 3.84 KB
/
Semaphore 源码解析.md
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#### Semaphore
限制可以访问某些资源(物理或逻辑的)的线程数目
* 实际上是就是普通共享锁,内部静态类 Sync 继承了 AQS,实现了共享锁的 tryAcquireShared、tryReleaseShared。
* 逻辑就是正常共享锁逻辑,同时还可以实现公平和非公平
* 可以说是 ReentrantLock 的共享版,所以注意**要在 finally 里面调用 semaphore.release() 释放锁**
##### Sync 抽象类
```java
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
// 父类提供了非公平的 tryAcquireShared 方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 和 tryReleaseShared 一个效果,但是不会唤醒 CLH 的线程,不过下一次 tryReleaseShared 唤醒后就可以用这个释放的。
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 重置为 0,但是就算重置为 0,后面通过 aquirceShared 也就能进 permits 个
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
```
##### Sync 实现类
* NonfairSync
```java
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
```
* FairSync
```java
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) // 多了个判断 CLH 队列还有没有等待的节点
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
```
##### 方法
###### 构造方法
```java
public Semaphore(int permits) {
// 默认选择非公平锁
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
```
###### acquire
```java
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
```
###### release
```java
public void release() {
sync.releaseShared(1);
}
```
###### 应用场景
就是限制访问同步块的线程数