ReentrantLock 源码解析

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

public class App {
public static void main(String[] args) {

test1();
}

public static void test1() {
Lock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
lock.lock();
System.out.println("线程 " + Thread.currentThread().getId() + " 获得锁");
try {
Thread.sleep(50000000);
} catch (InterruptedException e) {
System.out.println("线程 " + Thread.currentThread().getId() + " 释放锁");
} finally {
lock.unlock();
}
});
t1.start();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}


Thread t2 = new Thread(()->{
lock.lock();
System.out.println("线程 " + Thread.currentThread().getId() + " 获得锁");
lock.unlock();
});
t2.start();
Scanner scanner=new Scanner(System.in);
scanner.nextLine();
}

public static void test2(){

}
}

代码执行顺序

1
2
//1. 入口
sync.lock();
1
2
3
4
5
6
7
//2. cas 更新 state 值,从 0 变成 1
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
1
2
3
4
5
//3. 使用 cas 方式修改 state 的值,如果是第一个线程执行,返回 true .代表已经获得了锁 
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
1
2
3
4
5
6
7
8
9
//4. 设置锁的持有者为当前线程
//这里是返回 true .
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());

//设置当前线程为锁的持有者的 set 方法
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

此处线程 1 进入执行状态,线程 2 开始尝试获得锁:

1
2
3
4
5
6
7
//5. 第二个线程进入 lock 方法
//这里返回 false,所以会走到 acquire 方法
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);

1
2
3
4
5
6
//6. 第二个线程尝试获取锁,执行 tryAcquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//7. 最终执行 nonfairTryAcquire  
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//8. 分析锁获取的方式
//8.1 获得当前线程
//8.2 当前的线程的状态 赋值给 c,此时 c=1
//8.3 if(c==0) //这个是false,所以走了 else
//8.4 判断当前线程是否锁的持有者,这里是 false
//8.5 直接返回的false
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//9. tryAcquire 获得锁为 false , 在 6 的方法出就进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//解析 addWaiter(Node.EXCLUSIVE) 调用过程
//9.1 创建 Node ,参数是当前线程和 mode
//9.2 获得 tail (队列的尾结点)
//9.3 如果尾结点不为空,会进入 if 逻辑,此处先不讨论
//9.4 此时 tail 为空,直接进入 enq 方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//10. 
//10.1 开启一个循环
//10.2 定义 t=tail,此时 tail 为 null ,所以 t 只也为空,进入 if 逻辑
//10.3 compareAndSetHead(new Node()) 使用 cas 方式把 head 开辟一个新空间
//10.4 tail 指针也指向 head
//10.5 执行下一次循环
//10.6 此时 tail 有了新开辟的空间,所以会进入 else 逻辑
//10.7 node.pre 指向 t,也是尾结点
//10.8 把 node 的引用,通过 cas 的方式赋值给 tail
//10.9 t.next 指向 node
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

具体运行的流程图如下:

image-20201027235109269

Node 的数据结构

02

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//11. 
//11.1 开始一个死循环
//11.2 获得当前的节点之前的节点
//11.3 如果当前节点是 head 节点 且已经获取到了锁,执行if 逻辑
//11.4 此处并没有获得到锁,执行 shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 12. 
// 第一次循环
// 12.1 定义 ws waitStatus 此处 waitStastus=0
// 12.2 执行 else 的逻辑,把 pre.waitStatus 的值修改为 Node.SIGNAL(-1)
// 12.3 返回 false
// 第二次循环
// ws = pred.waitStatus 等于 Node.SIGNAL 返回 true
// 执行 parkAndCheckInterrupt 方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
1
2
3
4
5
//13. park 当前执行的线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
1
2
3
4
5
//14. 执行 unlock 方法
public void unlock() {
sync.release(1);
}

1
2
3
4
5
6
7
8
9
10
//15. 执行 tryRelease 方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 16. 
// 16.1 定义 c = state - release ,当前的状态 1 减去 release(也是 1) ,此时 c=0
// 16.2 判断当前线程是否等于当前持有锁的线程
// 16.2 如果不是 ,则直接抛出异常,说明加锁和解锁的操作只能是同一个线程
// 16.3 定义一个 free 变量
// 16.4 如果 c==0 ,则执行 if 逻辑
// 16.5 setExclusiveOwnerThread(null) 代表释放当前锁
// 16.6 setState(c),把 state 值设置为 0, 下次线程可以获得锁。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
1
2
3
4
5
6
7
8
9
10
11
//17. 回到 15,tryRelease()返回的值是 true
//17.1 定义 h 变量指向 head
//17.2 如果 h!=null 且 h.waitStatus!=0 执行 unparkSuccessor(h)
//17.3 从前面的分析,可以知道 此时的 head.waitStatus=-1
//17.4 执行 unparkSuccessor(h);
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//18. 释放锁
//18.1 判断 node.waitStatus 的值,此时值是 -1
//18.2 获得当前节点的下一个节点
//18.3 判断当前的节点的值是否为 null 和 s.waitStatus>0 此处条件不满足,s.waitStatus=0
//18.4 如果 s !=null 唤醒节点的线程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

手写 ReentrantLock伪代码

通过上面分析,我们可以得到 RReentrantLock 的整个流程,具体简化流程如下:

02

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
volatile int state=0;
Queue parkQueue;

void lock(){
while(!compareAndSet(0,1)){
//
park();
}
//do something
unclock();
}
void unlock(){
lock_notify();
}

void park(){
parkQueue.add(currentThread);
releaseCPU();
}

void lock_notify(){
Thread t=parkQueue.head();
unpark(t);
}