并发编程背景知识这篇文章中,我们了解到了管程的概念,并且知道Java内置锁synchronized也是通过管程实现的。但是正因为它是Java内置的,所以加解锁操作以及条件变量的个数我们都是无法控制的。所以在JDK1.5之后,Java为我们提供了显式锁ReentrantLock。本文先不讨论ReentrantLock的优缺点及其实现方式,而是探究一下比ReentrantLock更底层的一个用于构建锁和同步器的框架AbstractQueuedSynchronizer。几乎任一同步器都可以用来实现其他形式的同步器,我们常见的ReentrantLockSemaphoreCountDownLatch等J.U.C中的同步器都是通过AbstractQueuedSynchronizer实现的。

AbstractQueuedSynchronizer实现原理

AQS的理论依据就是管程,所以我们再次看一看管程:

管程由四部分组成:

  • 管程内部的共享变量。
  • 管程内部的条件变量。
  • 管程内部并行执行的进程/线程。
  • 对于局部与管程内部的共享数据设置初始值的语句。

AQS维护了一个共享资源state和一个 FIFO 的同步队列(即管程的入口等待队列),底层利用了 CAS 机制来保证操作的原子性。
AQS的UML如下,Node内部类代表同步队列中的线程,ConditionObject内部类代表一个条件变量,AbstractOwnableSynchronizer类很简单,就是独占模式下,记录占用线程。

再来看看AQS有哪些重要的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

// 同步队列的头尾指针
private transient volatile Node head;
private transient volatile Node tail;

// 代表需要同步的状态及其getter/setter
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// cas的方式更新state的方法
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

...
}

将上述代码转化为图,就是:

以实现独占锁为例(即当前资源只能被一个线程占有),其实现原理如下:state 初始化 0,在多线程条件下,线程要执行临界区的代码,必须首先获取锁,也就是线程需要成功将 state 从 0 加到 1,其他线程再获取的话由于共享资源已被占用,所以会到 FIFO 同步队列去等待,等占有 state 的线程执行完临界区的代码释放资源( state 减 1)后,会唤醒 FIFO 中的下一个等待线程(head 中的下一个结点)去获取锁。
head 结点代表当前占用的线程,其他节点由于暂时获取不到锁所以依次排队等待锁释放。
因为state变量是volatile的,而使用AQS进行加解锁操作时都会修改state变量的值,根据volatile关键字的Happens-Before规则,以及Happens-Before的传递性,就可以保证使用AQS进行同步时的可见性。

子类实现方式

我们看到AQS是一个抽象类,需要子类去实现它。在官方注释中有说明它的实现类应该是一个非公开的内部类。AQS提供了几个模板方法供子类去实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 独占模式
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 独占模式才有用
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

tryAcquire方法为例,它返回boolean类型,表示独占模式下,调用它的进程是否获取到了锁,或者说是否可以进入管程。但是它并不是直接被调用,持有AQS实现类的同步器会调用AQS中的pulic方法acquire进行尝试加锁操作。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

我们以官方注释中给的一个不可重入的互斥锁为例,它使用值 0 表示解锁状态,使用值 1 表示锁定状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class Mutex implements Lock, java.io.Serializable {

// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}

// AQS中的acquire方法会调用此方法
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// Provides a Condition
Condition newCondition() { return new ConditionObject(); }

// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();

// 我们在使用Mutex时,使用这个方法尝试加锁
public void lock() {
// 这个方法定义在AQS中
sync.acquire(1);
}
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

AQS为什么这么设计呢?因为AQS是支持公平锁和非公平锁实现的,通过在acquire方法中先调用子类实现的tryAcquire方法,然后再执行可能的入队逻辑。这样就可以将公平锁或者非公平锁的逻辑交给具体的子类的tryAcquire方法去实现。

加锁操作

知道了大致的流程以及子类怎么去实现它之后,我们再来看具体的加锁操作流程。我们再看回AQS中的acquire方法:

1
2
3
4
5
6
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}

如果线程尝试获取锁没有成功,那么就会先执行addWaiter方法,然后将其返回值作为入参再执行acquireQueued方法。

addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 将当前线程封装成Node,然后入队
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

先将当前线程封装成Node对象,然后先尝试将生成的Node尾插入到AQS的同步队列中,如果没有成功(同步队列未初始化或者cas没有成功),再执行完整的入队逻辑enq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

如果确实是同步队列还未初始化,那么就new一个新的Node对象,将头尾指针都指向这个Node对象。注意,这里new Node对象的方式是调用无参构造器,与上面封装线程的不一样。head 结点为虚结点(dummy header node),它只代表目前有线程获得了锁,至于获得锁的是哪个线程,其实是调用了上文的setExclusiveOwnerThread(current),即记录在exclusiveOwnerThread属性里。
初始化完同步队列,再疯狂地尝试将封装了当前线程的Node节点尾插入到AQS的同步队列中。

acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

经过前面的addWaiter方法,当前线程由于没有获得锁,已经插入到了AQS的同步队列中了。
但是光插入到同步队列中不行呀,它还需要将自己置为等待状态(Java线程状态置为Waiting),我们知道处于这种状态的线程不会被分配处理器执行时间,需要被其他线程显式唤醒。因此在进入等待状态之前,需要告诉其他线程,到了合适的时候记得叫醒我,然后才能安心的进入等待状态。
好,我们看回代码第6~12行,当前线程没有一上来就把自己置为等待状态,因为将线程从Runnable置为Waiting状态需要经历用户态向内核态的切换,而且唤醒后也要从内核态转为用户态,开销相对比较大。所以当发现当前线程的前一个节点就是head节点,也就是说极有可能下一次就轮到当前线程获得锁的时候,它又尝试了一个获取锁的操作,如果成功了,那么就通过setHead方法将封装了当前线程的Node对象设置为head节点(相当于出队,因为head节点是虚的),同时将原来的head节点从同步队列中剔除,然后直接返回了false。这个false涉及中断,一会一起说。
如果上面的努力没有成功,那么调用shouldParkAfterFailedAcquire方法,在看shouldParkAfterFailedAcquire方法之前,先看一看Node的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static final class Node {

...

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

volatile int waitStatus;

// 主要用来处理取消的情况
volatile Node prev;
// 主要用来实现阻塞机制
volatile Node next;
volatile Thread thread;

...
}

可以看到,Node内部定义了一个变量waitStatus,初始值是0,其他几个可取的值直接看注释就能明白个大概。我们刚才说到,一个线程在进入waiting状态之前需要通知其他节点合适的时候唤醒自己。那么什么是其他节点,什么又是合适的时候呢?这里因为AQS的同步队列的实现是CLH队列的变体,所以AQS是通过next,也就是后继节点的链接去实现阻塞机制的。直接看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 当一个节点获取锁失败时,检查并更新waitStatus。此方法返回true代表线程应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驱节点的ws已经被置为SIGNAL了,当前节点可以安心的park了
* 返回true
*/
return true;
if (ws > 0) {
/*
* 前去节点已经cancle了,那么向前遍历直至一个不是cancle的节点设置为前驱节点
* 返回false.
* 这一步将当前节点前面的所有连续的cancle节点都从同步队列中剔除了.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 进入此分支的waitStatus值不是0就是PROPAGATE. 需要在park之前先将前驱节点的ws置为SIGNAL
* 返回false.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

这个方法,将前驱节点的ws设置为SIGNAL,并且如果可能的话,还清理了一段已经cancle的node链。设置完SIGNAL,当前线程可以安心地进入Waiting状态了。

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

可以看到,就是调用LockSupport.park(this)park方法会将线程置为Waiting状态,所以AQS的代码注释里所说的阻塞(blocked)其实都是指Java线程的Waiting状态,而不是Blocked状态。park方法会一直阻塞线程,除非有其他线程unpark了这个线程或者interrupt了这个线程,这个方法才会返回。parkAndCheckInterrupt方法最后返回true代表park方法是由于中断才返回的,返回false代表是被其他线程唤醒了。
再看回acquireQueued方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

正常流程就是线程要么在阻塞,要么就是已经是同步队列的头一个了,尝试获取锁,循环往复。整个方法只在第11行返回,返回值代表在这个循环往复获取锁的过程中是否线程是否被要求中断过。
当时也有不正常的流程,在第17行有finally代码块,我们知道,finally中的代码是先于try中的return执行的。那么整个方法在try什么呢?try的就是执行acquireQueued的线程自己崩了的情况,要么是刚刚被unpark就崩了,要么就是竞争锁的过程中崩了。不管怎样最后就会进入cancelAcquire方法,取消正在进行的获取锁的尝试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0) {
node.prev = pred = pred.prev;
}
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}

总结起来就是,先通过prev指针向前遍历跳过所有连续的已经cancle的前驱节点,将当前节点的ws设置为CANCELLED。接下来分为3种情况:

  • 当前节点是尾节点:直接把当前节点移除就好了。
  • 当前节点不是头尾节点:将前驱节点和后继节点接上,把自己移除。
  • 当前节点是同步队列中的第一个真实节点(前驱节点是head节点):唤醒当前节点的后继节点,如果后继节点也是cancle的,那么从尾指针开始在当前节点到尾指针的区间里向前遍历直至最靠近当前节点的非cancle的节点,唤醒之。具体的unparkSuccessor逻辑可以看下面的解锁流程。

至此,整个加锁流程分析完毕了。

解锁操作

与加锁操作acquire对应,解锁操作在AQS中的public方法release实现。

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease方法释放锁操作,当完全释放锁(重入锁的情况,全部释放才算成功)成功后,自然是要看看有没有唤醒其他线程的需要,也就是第4行代码的判断逻辑。如果head节点为空,证明没有同步队列,自然不需要唤醒,如果存在同步队列但是head节点的ws是0,也就是刚初始化状态,证明同步队列中的第一个真实节点正在竞争锁的过程中,线程状态是Runnable的,自然无需唤醒。除了这两种情况,那么就需要唤醒其他线程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

unparkSuccessor方法是和上面加锁流程中处理失败中的unparkSuccessor方法是同一个。不同的是传递的入参不一样,处理失败的时候是将cancle节点自己传入,而这里是将同步队列的head节点传入。
处理流程简单来说就是从尾指针开始在整个同步队列区间内向前遍历,直至找到最靠近head节点的非cancle的节点,唤醒之。那么问题来了,为什么从尾指针开始向前遍历,直接从头指针向后遍历不是更快么?
这么做的原因是因为,prev指针是可靠的,而next指针是不可靠的。所以使用prev指针向前遍历实现唤醒流程。为什么会这样呢?其实AQS的作者在注释里有说明:

Determination of successor must avoid races with newly queued nodes to set the “next” fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated “tail” when a node’s successor appears to be null. (Or, said differently, the next-links are an optimization so that we don’t usually need a backward scan.)

意思是当一个节点通过next指针找它的后继节点时,需要避免与新入队的节点在为其前驱节点设置next属性时发生竞争。解决办法当一个节点发现它的后继节点是null的时候,后继节点不一定真的是null,而是应该通过尾指针向前遍历的方式,再做一遍确认。
之所以会发生竞争是因为双向链表的插入操作不具有原子性。在enq方法中,入队的逻辑在8~10行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

通过第8行和第9行,我们能够保证将当前节点正确的插入到队尾并且prev指针是正确的。但是在第9行和第10行中间可能有其他的线程正在执行unparkSuccessor逻辑(因为同步队列是无锁的嘛),也就是说,next指针还没有设置完,其他线程已经开始读,发现是null。所以需要通过尾指针向前遍历的方式解决这个问题。

条件队列

AQS内部通过ConditionObject内部类实现线程的协作,一个ConditionObject对象就是管程模型中的一个条件变量,自然就会对应一个条件队列。条件同步Condition的典型用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

// 等待
public void conditionWait() throws InterruptedException {
lock.lock();
try {
while(条件不满足)
condition.await();
} finally {
lock.unlock();
}
}

// 通知
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}

看一下ConditionObject的简要实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConditionObject implements Condition, java.io.Serializable {

...

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

public final void signal() {...}
public final void signalAll() {...}
public final void awaitUninterruptibly() {...}

public final void await() throws InterruptedException {...}
public final void awaitNanos(long nanosTimeout) throws InterruptedException {...}
public final void awaitUntil(Date deadline) throws InterruptedException {...}
public final void await(long time, TimeUnit unit) throws InterruptedException {...}

...

}

可以看到,总共有两组公共方法,与内置锁synchronized类比,signal相当于notifyawait相当于wait。对条件等待队列的操作是不需要上锁的,因为操作条件等待队列的线程就是持有锁的线程。

signalAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

final boolean transferForSignal(Node node) {
// 如果该节点在调用signal方法前已经被取消了,则直接跳过这个节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 如果该节点在条件队列中正常等待,则利用enq方法将该节点添加至同步队列队列的尾部
// 返回前驱节点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
  1. 将条件队列清空(只是令lastWaiter = firstWaiter = null,队列中的节点和连接关系仍然还存在)
  2. 将条件队列中的头节点取出,使之成为孤立节点(nextWaiter,prev,next属性都为null)
  3. 如果该节点处于被Cancelled了的状态,则直接跳过该节点(由于是孤立节点,则会被GC回收)
  4. 如果该节点处于正常状态,则通过enq方法将它添加到同步队列的末尾
  5. 判断是否需要将该节点唤醒(包括设置该节点的前驱节点的状态为SIGNAL),如有必要,直接唤醒该节点
  6. 重复2-5,直到整个条件队列中的节点都被处理完

signal

signalAll方法不同,signal方法只会唤醒一个节点。调用signal方法会从当前条件队列中取出第一个没有被cancel的节点添加到同步队列的末尾。而signalAll方法会将条件队列中所有非cancel的节点添加到同步队列的末尾。

await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final void await() throws InterruptedException {
// 如果当前线程在调动await()方法前已经被中断了,则直接抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程封装成Node添加到条件队列
Node node = addConditionWaiter();
// 释放当前线程所占用的锁,保存当前的锁状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果当前节点不在同步队列中,说明刚刚被await, 还没有人调用signal方法,则直接将当前线程挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);// 线程将在这里被挂起,停止运行

// 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了
// 所以检查下线程被唤醒的原因,如果是因为中断被唤醒,则跳出while循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 无论是被中断唤醒还是被signal唤醒,被唤醒的线程最后都将离开条件等待队列,进入到同步队列中
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

需要注意的几点:

关于fullyRelease

fullyRelease方法一次性释放了所有的锁,即对于可重入锁而言,无论重入了几次,这里是一次性释放完的,这也就是为什么该方法的名字叫fullyRelease。但是要注意,有可能抛出IllegalMonitorStateExceptionawait调用fullyRelease,按理说持有锁的线程才能调用await,但是这种保证是由AQS实现类的try*方法实现的,因此如果程序员没有先lock,就直接调用await这种骚操作,就会到fullyRelease流程中的finally中,节点的ws就会被设置为CANCELLED。这也就是为什么上面的addConditionWaiter在添加新节点前每次都会检查尾节点是否已经被取消了。

如果从线程被唤醒,到线程获取到锁这段过程中发生过中断,该怎么处理?

中断对于当前线程只是个建议,由当前线程决定怎么对其做出处理。在acquireQueued方法中,我们对中断是不响应的,只是简单的记录抢锁过程中的中断状态,并在抢到锁后将这个中断状态返回,交于上层调用的函数处理,而这里“上层调用的函数”就是我们的await方法。那么await()方法是怎么对待这个中断的呢?这取决于:中断发生时,线程是否已经被signal过?
这部分内容过于复杂,目前我还没有消化好,有兴趣的可以看看这篇文章,分析的非常精彩!
最后上一张处理中断的逻辑图:

总结

通过这么长的分析,我们可以看出,我们使用AQS一定是通过实现try*的方式,那么在这些方法中我们需要做什么呢?

  • 通过state,定义怎样可以进入和离开管程,也就是获取锁和释放锁
  • 公平锁、非公平锁,也就是当一个线程不在同步队列中时,想要获取锁是可以直接抢占,还是要入队
  • 保证执行awaitrelease等方法的线程已经持有了锁

参考