# 锁原理
作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
# 一、LockSupport
该工具类主要作用是挂起和唤醒线程, 是创建锁和其它同步类的基础。LockSupport类与每个使用它的线程都会关联一个许可证, 默认情况下调用LockSupport类的方法的线程是不持有许可证的。
# 基础API
void park()如果调用该方法的线程已经拿到了关联的许可证, 则会立马返回, 否则调用线程会被禁止参与线程的调度, 即阻塞挂起。
@Test public void testPark() { System.out.println("begin park!"); // 让当前线程先获取和LockSupport之间的凭证 LockSupport.park(); System.out.println("end park!"); }1
2
3
4
5
6
7案例解析:
只会打印begin park!, 因为默认情况下调用线程是不持有许可证的
void unpark(Thread thread)一个线程调用unpark, 如果参数thread线程没有持有thread与LockSuport类关联的许可证, 则让thread线程持有, 如果thread因之前调用park被挂起, 则调用完unpark后会被唤醒
@Test public void testUnPark() { System.out.println("begin park!"); // 让当前线程先获取和LockSupport之间的凭证 LockSupport.unpark(Thread.currentThread()); LockSupport.park(); System.out.println("end park!"); }1
2
3
4
5
6
7
8案例解析:
都会打印, 在调用park前先调用unpark获取和LockSupport之间的关联
void parkNanos(long nanos)与park方法类型, 如果调用线程没有获取关联的许可证, 会被挂起nanos时间后修改为自动返回; 如果拿到了许可证会马上返回。
# 二、抽象同步队列 AQS
AQS = 用一个 state + 一个 FIFO 同步队列 来管理“拿不到资源就排队、阻塞,释放资源就按序唤醒”的框架。锁/信号量/闭锁只是对 state 的不同语义封装。
# 核心数据结构
# 核心字段
volatile int state:同步状态(含义由子类定义)。ReentrantLock:state表示可重入锁的次数。CountDownLatch:state表示倒计时的计数值。Semaphore:state表示可用的许可数量。ReentrantReadWriteLock:高 16 位是读锁数量,低 16 位是写锁可重入次数。
Node head, tail:同步队列(CLH 变体)的哑元头/尾指针。Node(队列节点)包含:volatile int waitStatus:节点状态CANCELLED = 1:节点取消排队SIGNAL = -1:前驱释放时需要唤醒我(最关键的状态)CONDITION = -2:在条件队列里(await后)PROPAGATE = -3:共享模式下传播用0:无特殊状态
volatile Node prev, next:队列前驱/后继volatile Thread thread:该节点对应的线程Node nextWaiter:条件队列的单向链指针(只用于 Condition)
同步队列是双向链,条件队列是单向链。
# 两种模式
AQS 支持两种模式来管理同步状态的访问:
- 独占模式(Exclusive):
- 同一时刻只能有一个线程持有资源。
- 典型方法:
void acquire(int arg)void acquireInterruptibly(int arg)boolean release(int arg)
- 共享模式(Shared):
- 多个线程可以同时访问资源。
- 典型方法:
void acquireShared(int arg)void acquireSharedInterruptibly(int arg)boolean releaseShared(int arg)
# 获取与释放流程
# 获取锁
acquire(int arg)public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }1
2
3
4
5tryAcquire(arg):尝试直接获取锁,子类去实现(比如ReentrantLock会实现公平 / 非公平获取)。- 成功:直接返回,不需要排队。
- 失败:进入 CLH队列(FIFO等待队列)。
addWaiter(Node.EXCLUSIVE):构造一个独占模式的节点,插入到等待队列尾部。acquireQueued(node, arg):阻塞地排队等待获取锁,直到成功。- 如果等待过程中被中断过,返回
true。
- 如果等待过程中被中断过,返回
selfInterrupt():如果获取锁时被中断过,就在当前线程上重新设置中断标志位。
tryAcquire(int arg)protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }1
2
3- 默认实现是抛异常,必须由子类重写(比如
ReentrantLock.Sync.tryAcquire)。 - 这是 模板方法模式:AQS 提供框架,子类实现锁的具体获取逻辑。
- 默认实现是抛异常,必须由子类重写(比如
acquireQueued(Node node, int arg)final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20这是整个 排队等待获取锁的核心逻辑:
- 自旋循环:
- 不断检查当前节点的前驱
p是否是head。 - 如果
p == head,说明轮到你了 → 再次尝试获取锁tryAcquire(arg)。 - 成功:设置自己为新的
head,出队,返回。
- 不断检查当前节点的前驱
- 未轮到你时:
shouldParkAfterFailedAcquire(p, node):检查前驱是否在等待中,如果需要就把自己标记为要阻塞。parkAndCheckInterrupt():真正调用LockSupport.park()挂起线程,直到被唤醒。
- 异常处理:
- 如果在等待过程中抛出异常,执行
cancelAcquire(node)把节点状态设为取消,清理队列。 - 如果期间被中断过,最后补一次
selfInterrupt(),让上层感知到中断。
- 如果在等待过程中抛出异常,执行
- 自旋循环:
# 释放锁
release(int arg)public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }1
2
3
4
5
6
7
8
9tryRelease(arg):尝试释放锁,子类去实现(比如ReentrantLock.Sync.tryRelease)。- 成功:返回
true。 - 失败:返回
false,表示当前线程还没有完全释放(可重入锁会减计数)。
- 成功:返回
h = head:取出队列头节点(它是一个哨兵节点,不代表实际线程)。unparkSuccessor(h):唤醒队列中等待的下一个有效节点(即队列里的第一个等待线程)。- 返回值:
true:释放成功。false:释放失败。
tryRelease(int arg)protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }1
2
3- 默认实现是抛异常,必须由子类重写。
- 典型场景:
ReentrantLock中会在state减到0时返回true,表示锁完全释放。
unparkSuccessor(Node node)private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15- 清理状态:
- 如果当前节点状态是
SIGNAL,CAS 改成0,表示不再需要等待。
- 如果当前节点状态是
- 找到下一个需要唤醒的节点:
- 优先尝试
node.next(正常情况队列有序)。 - 如果
next为null或者取消(waitStatus > 0),就从队列尾往前找第一个有效节点。
- 优先尝试
LockSupport.unpark(s.thread):- 精确唤醒找到的线程,让它从
park中恢复。
- 精确唤醒找到的线程,让它从
- 清理状态:
# 整体流程
- 尝试获取锁
- 调用
tryAcquire(arg)(子类实现,比如ReentrantLock),如果成功,当前线程直接进入临界区执行任务。
- 调用
- 获取失败 → 入队排队
- 用当前线程构造一个 独占模式节点,通过 CAS 方式放入 CLH 队列的尾部。
- 如果 CAS 竞争失败,会在
enq方法里短暂自旋,直到成功入队。
- 队列自旋等待
- 当前节点不断检查前驱是否是
head:- 如果是,就再次尝试获取锁(成功后自己成为新
head,进入临界区)。 - 如果不是,就调用
LockSupport.park()挂起自己,等待被唤醒。
- 如果是,就再次尝试获取锁(成功后自己成为新
- 当前节点不断检查前驱是否是
- 持有锁的线程释放
- 执行完任务后,调用
release(arg),内部通过tryRelease(arg)(子类实现)减少锁的占用计数。 - 当计数归零,说明锁完全释放。
- 执行完任务后,调用
- 唤醒下一个节点
- AQS 会调用
unparkSuccessor(head),找到队列中下一个有效节点,对应线程会被LockSupport.unpark()唤醒。
- AQS 会调用
- 被唤醒的线程继续竞争
- 被唤醒的线程从
park返回,重新进入acquireQueued的循环。 - 如果前驱是
head,再次调用tryAcquire(arg)获取锁。 - 成功后将自己设置为新
head,正式进入临界区执行任务。
- 被唤醒的线程从
线程 T1 调用 acquire()
└─ tryAcquire 成功 → T1 直接获得锁,进入临界区
线程 T2 调用 acquire()
└─ tryAcquire 失败
└─ addWaiter:T2 被包装成 Node,CAS 入队到 CLH 队列尾部
└─ acquireQueued:T2 自旋检查前驱是否是 head
└─ 否 → LockSupport.park() 阻塞 T2
└─ 等待被唤醒...
线程 T1 执行完任务,调用 release()
└─ tryRelease 成功 → state = 0(锁完全释放)
└─ unparkSuccessor(head):唤醒队列的第一个有效节点(即 T2)
线程 T2 被唤醒,从 park 返回
└─ 继续 acquireQueued 自旋
└─ 前驱 = head,尝试 tryAcquire
└─ 成功 → T2 成为新的 head,进入临界区
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 共享模式的不同点
以信号量/读锁为例
tryAcquireShared(arg)返回 剩余许可数(负数表示获取失败)。doAcquireShared逻辑与独占类似,但成功后会调用setHeadAndPropagate,根据剩余许可继续向后传播(让更多等待者醒来争取共享资源)。- 节点标记用
Node.SHARED,waitStatus中的PROPAGATE用于连续唤醒。
# Condition
两个队列各司其职
- 同步队列:竞争锁的排队区(挂
prev/next)。 - 条件队列:等待“条件满足”的停车区(挂
nextWaiter)。
# await() 流程
await():
1) 断言:当前线程必须持有锁
2) 把当前线程包装为 Node(CONDITION),入“条件队列”
3) 完全释放当前锁(把 state 归还干净)
4) 阻塞:反复 park,直到节点被转移到同步队列
5) 一旦被转移(signal 触发),就去同步队列里“重新竞争锁”
6) 重新拿到锁后返回(同时处理/恢复中断语义)
2
3
4
5
6
7
要点:
- await 会释放锁,否则别人无法让条件变真。
- 在条件队列时不参与获取锁,只有被
signal转移回同步队列后,才再次参与锁竞争。
# signal() 流程
signal():
1) 断言:当前线程必须持有锁
2) 从条件队列头取一个节点
3) 把它转移到“同步队列”(transferForSignal)
4) 被转移的线程随后会从 park 醒来,开始在同步队列里排队并竞争锁
2
3
4
5
# 和 wait/notify 的区别
await/signal绑定在Lock上;wait/notify绑定在对象监视器上(synchronized)。signal不能“存信号”(若先signal后await,信号会丢),这点与条件变量的语义一致。park/unpark能存“1 张票”;signal/await不能存票(所以要用条件判断 + while 循环防丢信号/虚假唤醒)。
正确用法(经典范式):
lock.lock();
try {
while (!条件成立) {
condition.await(); // 释放锁并等待
}
// 条件满足后的逻辑
} finally {
lock.unlock();
}
2
3
4
5
6
7
8
9
# 常见疑问
- 为什么要把前驱设为
SIGNAL再睡? 避免“释放者释放了,但没人负责叫醒我”的竞态;SIGNAL是一个明确的唤醒契约。 - 为什么会“自旋 + park”交替?
因为可能刚好临界条件变化、或虚假唤醒、或被中断,需要在再次
tryAcquire前判断是否安全入睡。 - 为什么要哑元
head? 简化并发入队/出队逻辑(真正队首是head.next),head自身不代表真实线程。 - 可见性靠什么保证?
state、head/tail、prev/next等是volatile,搭配 CAS 与park/unpark的happens-before 规则,保证内存可见性与顺序。
# 基于 AQS 实现自定义同步器
实现了一个自定义同步工具, 并用此实现 生产者-消费者模型
public class _05NonReentrantLock implements Lock, Serializable {
private static class Sync extends AbstractQueuedSynchronizer {
// 判断锁是否被持有
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 如果state=0, 尝试获取锁
public boolean tryAcquire(int acquires) {
assert acquires == 1;
if (compareAndSetState(0, 1)) {
// 记录当前独占锁持有的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁
public boolean tryRelease(int release) {
assert release == 1;
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 提供条件变量接口
Condition newCondition() {
return new ConditionObject();
}
}
// 创建一个Sync来做具体的工作
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
// 用自定义的同步锁 实现消费者 生产者模型
final static _05NonReentrantLock lock = new _05NonReentrantLock();
// 用于生产者等待“队列未满”
final static Condition notFull = lock.newCondition();
// 用于消费者等待“队列非空”
final static Condition notEmpty = lock.newCondition();
final static Queue<String> queue = new LinkedBlockingQueue<>();
final static int queueSize = 10;
@Test
public void testConsumeProduct() {
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
// 获取独占锁
lock.lock();
try {
// 如果队列满了 则等待消费者消费
while (queue.size() == queueSize) {
notFull.await();
}
// 未满增加元素
queue.add("elem");
// 唤醒消费线程
notFull.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
}
});
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
// 获取独占锁
lock.lock();
try {
// 队列为空, 停止消费
while (queue.size() == 0) {
notEmpty.await();
}
// 消费一个
String elem = queue.poll();
// 唤醒生产线程
notEmpty.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
}
});
producer.start();
consumer.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# 三、独占锁 ReentrantLock
ReentrantLock 是一种支持可重入的独占锁(即同时只允许一个线程获取锁),底层基于 AQS(AbstractQueuedSynchronizer) 实现。
- 可重入性:同一个线程多次获取锁不会被阻塞,锁的状态通过 AQS 的
state变量表示。 - 独占性:同一时刻只允许一个线程持有该锁,其余线程进入 AQS 队列中等待。
- 公平/非公平:通过构造函数决定锁的获取策略,公平锁遵循 FIFO,非公平锁则可能插队。
# 基本结构
ReentrantLock 的实现结构:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
// 默认构造:非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 可指定是否为公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
static final class NonfairSync extends Sync { ... }
static final class FairSync extends Sync { ... }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Sync:继承自 AQS 的抽象内部类,封装了获取/释放锁的核心逻辑。
NonfairSync / FairSync:Sync 的两个实现类,分别定义非公平锁和公平锁的获取策略。
# 加锁原理
# 非公平锁获取流程(默认)
private final ReentrantLock lock = new ReentrantLock(false);
@Test
public void testLock() {
lock.lock();
}
2
3
4
5
6
# 调用链解析
// ReentrantLock#lock
public void lock() {
sync.acquire(1); // 委托给 AQS 实现
}
// AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
// 先尝试获取锁, 获取成功则当前线程执行任务
// 获取失败, 说明锁已被占有, 将当前线程加入AQS阻塞队列, 通过park让当前线程阻塞
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// park在遇到中断时不会处理异常, 但是会消耗掉原有的标志位, 所以对原有的中断线程再次设置中断标志
selfInterrupt();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
如果 tryAcquire 成功,表示立即获取锁成功;
否则加入阻塞队列 addWaiter(),并通过 acquireQueued() 自旋等待锁可用。
tryAcquire 由具体子类实现
# 非公平锁的 tryAcquire 实现
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 当前无锁,尝试CAS抢占锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入逻辑
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 公平锁的 tryAcquire 实现
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平锁需先判断前驱是否有等待线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入逻辑
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
公平锁在获取前会检查等待队列是否有“前驱节点”;
非公平锁直接 CAS 抢占,可能“插队”。
# 释放锁原理
@Test
public void testUnlock() {
lock.unlock(); // 委托给 AQS 的 release 方法
}
2
3
4
// ReentrantLock#unlock
public void unlock() {
sync.release(1);
}
// AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒队列中下一个等待线程
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Sync 中 tryRelease 的实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 当前线程必须是锁的持有者
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null); // 清除持有者
}
setState(c); // 减少重入次数
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 四、共享锁ReentrantReadWriteLock
# 基本结构
ReentrantLock 是独占锁, 某一个时刻只能有一个线程获取锁, 在读的场景下, 应该使用共享锁, ReentrantLock 并不满足, ReentrantReadWriteLock 常用来读写分离。
读写锁的内部维护了一个 ReadLock 和一个 WriteLock,它们依赖 Sync 实现具体功能。而 Sync 继承自 AQS,并且也提供了公平和非公平的实现。 之前提过, state用来维护状态, 在读写锁中将state的高16位表示读状态, 表示获取读锁的次数; 使用低16位表示获取到写锁的线程可重入次数。
static final int SHARED_SHIFT = 16;
// 共享锁(读锁)状态
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 共享锁线程最大数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 排它锁(写锁)掩码
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 返回读锁线程数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回写锁可重入数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
2
3
4
5
6
7
8
9
10
11
读写锁需要记录每一个线程获取读锁的次数, 但是在大多数场景中只有一个线程在获取读锁, 为了对性能进行优化读写锁增加了 firstReader 用来记录第一个获取到读锁的线程
// 记录第一个获取锁的线程
private transient Thread firstReader;
// 记录第一个获取锁的线程的重入数
private transient int firstReaderHoldCount;
2
3
4
读锁是可以共享的, 但是每个线程获取锁都是可重入的, ReentrantReadWriteLock 将可重入次数存储在ThreadLocalHoldCounter中(除第一个获取读锁的线程), ThreadLocalHoldCounter继承了ThreadLocal, 相当于每个线程存储自己的可重入数在ThreadLocalMap中, 在获取时会消耗性能, 为了优化用cachedHoldCounter记录上一次线程的锁可重入数, 如果是同一个线程的情况下直接走缓存获取可重入数即可
private transient ThreadLocalHoldCounter readHolds;
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient HoldCounter cachedHoldCounter;
2
3
4
5
6
7
8
9
10
11
# 写锁的获取与释放
# 写锁的获取
在 ReentrantReadWriteLock 中写锁使用 WriteLock 来实现, 仅以lock()为例
private final ReentrantReadWriteLock fairLock = new ReentrantReadWriteLock(true);
@Test
public void testReentrantReadWriteLock() {
ReentrantReadWriteLock.WriteLock writeLock = fairLock.writeLock();
writeLock.lock();
writeLock.unlock();
}
2
3
4
5
6
7
跟着上述代码追下源码发现, 本质上也是调用AQS的acquire方法, 然后由自身实现tryAcquire方法完成锁的获取
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
5
6
7
8
9
分析ReentrantReadWriteLock.tryAcquire
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 这个状态高16位是读锁 低16位是写锁
int c = getState();
// 从低16位解析出写锁的持有数量
int w = exclusiveCount(c);
// c != 0, 说明写锁或读锁已经被某线程获取
if (c != 0) {
// 写独占, 写锁为0, 说明当前是读锁不支持获取锁
// 当前锁被其它线程占有, 不支持获取锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 可重入检查: 当前线程持有写锁的可重入次数不能超过最大值
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 增加锁的可重入次数(锁已经被线程持有 没有竞争关系 不需要 CAS)
setState(c + acquires);
return true;
}
// 公平锁或非公平锁是否应该阻塞当前线程(公平锁要求排队)
// CAS 设置独占锁状态
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 写锁的释放
尝试释放锁, 如果该线程持有锁状态值会-1, 状态值为0时释放锁; 如果该线程未持有锁直接调用会抛出异常, 具体看源码
private final ReentrantReadWriteLock fairLock = new ReentrantReadWriteLock(true);
ReentrantReadWriteLock.WriteLock writeLock = fairLock.writeLock();
writeLock.unlock();
2
3
public void unlock() {
// 调用 AQS 释放锁
sync.release(1);
}
public final boolean release(int arg) {
// 真正的释放锁由子类实现
if (tryRelease(arg)) {
// 激活阻塞队列的一个线程
Node h = head;
if (h != null && h.waitStatus != 0)
// 释放锁成功后, 从AQS的阻塞队列中unpark一个线程
unparkSuccessor(h);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// 当前线程未持有锁 抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 可重入数 - 释放, 判断线程是否还持有锁
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 锁持有的线程标识 置空
setExclusiveOwnerThread(null);
// 更新状态值
setState(nextc);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 读锁的获取与释放
# 读锁的获取
还是以lock unlock为例
ReentrantReadWriteLock.ReadLock readLock = fairLock.readLock();
readLock.lock();
public void lock() {
// 调用AQS的acquireShared
sync.acquireShared(1);
}
2
3
4
5
6
7
看下AQS的acquireShared
public final void acquireShared(int arg) {
// tryAcquireShared 由子类自身实现
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
2
3
4
5
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 判断 写锁是否被占用, 如果写锁是当前线程也可以读
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取高16位 即获取读锁的次数
int r = sharedCount(c);
// 尝试获取锁,多个读线程只有一个会成功,不成功的进入fullTryAcquireShared进行重试
// readerShouldBlock 用来做非公平锁的实现
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 优化点: 第一个线程获取读锁 用firstReader记录
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果是首个线程 直接通过firstReaderHoldCount来记录读锁被占有的次数
firstReaderHoldCount++;
} else {
// 其它线程获及可重入数的缓存
HoldCounter rh = cachedHoldCounter;
// 当前线程未被缓存
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
// 从ThreadLocal中获取当前线程的计数器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 当前线程之前释放过锁,重置进去
readHolds.set(rh);
// 当前线程的读锁获取次数 +1
rh.count++;
}
return 1;
}
// 通过自旋让其它线程尝试获取锁
return fullTryAcquireShared(current);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 读锁的释放
ReentrantReadWriteLock.ReadLock readLock = fairLock.readLock();
readLock.unlock();
public void unlock() {
sync.releaseShared(1);
}
2
3
4
5
6
这里实际上调用的是 AQS 的 releaseShared
public final boolean releaseShared(int arg) {
// 同理 该方法由子类实现
if (tryReleaseShared(arg)) {
// 在读锁释放成功后, 释放一个由于获取写锁的被阻塞的线程
doReleaseShared();
return true;
}
return false;
}
2
3
4
5
6
7
8
9
@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果是首个获取读锁的线程 直接通过 firstReader 进行可重入锁的释放
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 其它线程从 cachedHoldCounter 中获取
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
// 如果可重入次数<=1 释放后不需要再缓存该线程 移除即可
// ThreadLocal<HoldCounter> readHolds;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 虽然该HoldCounter从readHolds中移除, 但仍被当前线程引用, 有可能被复用 所以仍需要--
--rh.count;
}
// 自旋尝试变更状态值, 读锁是共享锁 只有当 state == 0,表示“最后一个读锁释放”
// 其它情况这边需要空转等待防止执行了 doReleaseShared() 唤醒了写线程
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
← 随机数、写时复制列表与原子操作解析 线程池 →