ReentrantLock是一个标准的互斥锁的实现,它实现了Lock接口,因此可以提供多种获取锁的方式。与synchronized不同的是,它支持公平锁以及可以绑定多个条件变量。

ReentrantLock有一个抽象静态内部类Sync,它是AQS的实现类。Sync有两个实现类,NonfairSync实现非公平锁,FairSync实现公平锁。ReentrantLock只有一个属性,那就是syncReentrantLock的所有方法都是委托给sync执行。sync可能是NonfairSync对象或者FairSync对象,由构造方法决定,默认是非公平锁实现。

公平性

在公平的锁上,线程将按照它们发出请求的顺序来获取锁,但在非公平锁上,允许“插队”:当一个县城请求非公平锁时,如果在发出请求的同时该锁的状态变为可用,那么这个线程将跳过同步队列中所有的等待线程并获得这个锁。在大多数情况下,尤其是竞争激烈时,非公平锁的性能要好于公平锁。原因在于:在恢复一个被挂起的线程与该线程真正开始执行之间存在着严重的延迟。

ReentrantLock对公平性的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 实现Lock接口方法
public void lock() {
sync.lock();
}
// NonfairSync实现
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// AQS的方法
acquire(1);
}
// FairSync实现
final void lock() {
// AQS的方法
acquire(1);
}

NonfairSync实现没有直接执行AQS的加锁逻辑,而是先尝试通过CAS加锁。
除此之外,我们知道AQS的acquire方法还会调用实现类的tryAcquire方法,而我们在AQS实现原理的总结中也说过,tryAcquire方法也会实现公平性的逻辑。在NonfairSyncFairSynctryAcquire实现中,逻辑基本一致,只有一行代码不一样:

NonfairSync
1
2
3
4
5
6
7
8
9
10
11
12
final boolean nonfairTryAcquire(int acquires) {
...

if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}

...
}
FairSync
1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryAcquire(int acquires) {
...

if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}

...
}

hasQueuedPredecessors()方法返回true的情况代表有其他线程排在当前线程前面。因此非公平锁的实现就是直接尝试抢占锁,而公平锁会判断是否前面有排队的其他线程。

可重入性

ReentrantLock中,无论是公平锁还是非公平锁,都是支持重入的,因此关于重入逻辑的加锁解锁操作都是定义在Sync类中。AQS定义的tryAcquire模板方法要表达的含义是:在这个方法中通过对state值的判断决定当前线程是否抢到了锁,或者说可以进入管程。对于ReentrantLock而言,state的值为0就代表当前锁处于空闲状态,尝试获取锁自然就通过acquire(1)方法将state的值从0置为1。这一点在上面各处代码中都有体现。可重入性就是指,在当前线程已经占有锁的前提下,当前线程还可以进入管程,换句话说就是tryAcquire返回true。

FairSync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected final boolean tryAcquire(int acquires) {
...

int c = getState();
if (c == 0) {
// 锁状态为空闲
...
return true;
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

代码第10行先判断当前持有锁的线程就是自己,然后通过setState方法将state的值+1,这里直接使用setState方法而不是cas的方式是因为当前持有锁的线程就是自己,不存在数据竞争的可能。
再来看看锁的释放:

Sync
1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

释放锁的时候需要确保当前线程是持有锁的线程,这又印证了AQS实现原理的总结。然后将state-1,如果减完是0,那么就是完全释放返回true,否则false。

对显式锁的拓展性支持

ReentrantLock实现了Lock接口。因此它是支持显式锁的那些拓展方法

lockInterruptibly

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 下面方法定义在AQS中

public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

可以看到,显式锁对响应中断等拓展特性的实现都是在AQS中,所以其实这部分内容是对AQS实现原理的补充。
上面代码的流程与AQS中acquireQueued方法基本是一致的,都是当前线程在尝试获取锁失败后,自己包装成Node然后去同步队列排队等待被唤醒再获取锁的逻辑。再来看看acquireQueued方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

两者的区别主要在于方法返回值和抛出的受检查异常。在acquireQueued方法中,如果线程收到了中断信号,也是可以正常返回true的,相当于将中断标记复原,然后将对中断的处理交给了上层方法,也就是acquire的调用者。而在doAcquireInterruptibly方法中,没有返回值,如果线程是由于中断被唤醒的,直接抛出InterruptedException异常(代码第29行),然后进入到cancelAcquire方法中将当前节点从同步队列中移除了。

tryLock()

1
2
3
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

由代码可以知道,tryLock()方法不存在公平锁的可能,tryLock()都是非公平性的,并且没有入队操作,无论是否获取到锁,都是直接返回。

tryLock(long timeout, TimeUnit unit)

lockInterruptibly可中断地阻塞式获取锁一样,对超时的支持也是由AQS实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 下面方法定义在AQS中

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)// 超时
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())// 中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

只需要关注高亮部分的代码即可,其余的跟doAcquireInterruptibly的逻辑基本一样,代码还是很好读的,唯一需要注意的地方是第35行,nanosTimeout > spinForTimeoutThreshold,只有当剩余的超时时间大于spinForTimeoutThreshold1000纳秒时,才会调用parkNanos,否则时间太短,直接自旋了。

范例代码

最后,上一个ReentrantLock的典型使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class X {
private final ReentrantLock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

public void m() {
lock.lock(); // block until condition holds
try {
while (!notFull)
notFull.await();
// ... method body
notEmpty.signal();
} finally {
lock.unlock()
}
}
}