ReentrantLock
类关系
graph BT
Sync--extend-->AbstractQueuedSynchronizer
FairSync--extend-->Sync
NoFairSync--extend-->Sync
ReentrantLock--implement-->Lock
ReentrantLock-.内部类.->A["Sync/FairSync/NoFairSync"]
AbstractQueuedSynchronizer
/**
* Queries whether any threads are waiting to acquire. Note that
* because cancellations due to interrupts and timeouts may occur
* at any time, a {@code true} return does not guarantee that any
* other thread will ever acquire.
*
* @return {@code true} if there may be other threads waiting to acquire
*/
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
if (p.status >= 0)
return true;
return false;
}
ReentrantLock
final void lock() {
if (!initialTryLock())
acquire(1);
}
...
/**
* Returns a {@link Condition} instance for use with this
* {@link Lock} instance.
*
* <p>The returned {@link Condition} instance supports the same
* usages as do the {@link Object} monitor methods ({@link
* Object#wait() wait}, {@link Object#notify notify}, and {@link
* Object#notifyAll notifyAll}) when used with the built-in
* monitor lock.
*
* <ul>
*
* <li>If this lock is not held when any of the {@link Condition}
* {@linkplain Condition#await() waiting} or {@linkplain
* Condition#signal signalling} methods are called, then an {@link
* IllegalMonitorStateException} is thrown.
*
* <li>When the condition {@linkplain Condition#await() waiting}
* methods are called the lock is released and, before they
* return, the lock is reacquired and the lock hold count restored
* to what it was when the method was called.
*
* <li>If a thread is {@linkplain Thread#interrupt interrupted}
* while waiting then the wait will terminate, an {@link
* InterruptedException} will be thrown, and the thread's
* interrupted status will be cleared.
*
* <li>Waiting threads are signalled in FIFO order.
*
* <li>The ordering of lock reacquisition for threads returning
* from waiting methods is the same as for threads initially
* acquiring the lock, which is in the default case not specified,
* but for <em>fair</em> locks favors those threads that have been
* waiting the longest.
*
* </ul>
*
* @return the Condition object
*/
public Condition newCondition() {
return sync.newCondition();
}
Sync
final ConditionObject newCondition() {
return new ConditionObject();
}
NoFairSync
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// 直接尝试CAS
if (compareAndSetState(0, 1)) { // first attempt is unguarded
// 设置当前线程为锁持有线程
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
// 重入
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
FairSync
/**
* Acquires only if reentrant or queue is empty.
*/
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果c==0,即没有上锁
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
// 如果没有queuedThreads,且CAS成功
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
// 重入
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
ConditionObject
/**
* Implements interruptible condition wait.
* <ol>
* <li>If current thread is interrupted, throw InterruptedException.
* <li>Save lock state returned by {@link #getState}.
* <li>Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li>Block until signalled or interrupted.
* <li>Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li>If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 构建条件节点
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false, rejected = false;
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
条件队列的流程图
graph TD
Start(["开始:线程准备等待条件"])
Start --> A_await --> B_checkLock
subgraph AwaitFlow [等待流程]
A_await["调用: ConditionObject.await()系列方法"]
B_checkLock{"是否持有关联锁(ReentrantLock)?"}
Err["抛出异常: IllegalMonitorStateException"]
C_add["ConditionObject.addConditionWaiter()将线程加入条件队列尾部"]
D_release["ReentrantLock.fullyRelease(savedState)\n释放锁的重入计数并保存状态"]
E_park["阻塞: LockSupport.park(this)\n线程挂起等待唤醒/中断/超时"]
F_wakeup{"被唤醒的原因"}
B_checkLock -- 是 --> C_add --> D_release --> E_park --> F_wakeup
B_checkLock -- 否 --> Err
end
subgraph SignalFlow [唤醒与转移流程]
S_call["另一线程调用: ConditionObject.signal() 或 signalAll()\n(必须持锁)"]
G_transfer["ConditionObject.transferForSignal(node)\n将条件节点转为同步队列等待"]
SA_loop["signalAll(): 循环调用 transferForSignal 转移所有节点"]
S_call --> G_transfer
SA_loop --> G_transfer
end
subgraph SyncQueue [同步队列入队与竞争锁]
H_enq["AbstractQueuedSynchronizer.enq(node)\n将节点加入同步队列尾部"]
I_acquire["AbstractQueuedSynchronizer.acquireQueued(node, savedState)\n竞争重新获取锁,直至成功"]
J_unpark["LockSupport.unpark(thread) 或 同步队列竞争使线程恢复\nawait 方法返回"]
H_enq --> I_acquire --> J_unpark
end
subgraph CancelCleanup [取消与清理]
Cancel["ConditionObject.unlinkCancelledWaiters()\n移除已取消的条件队列节点,防止泄漏"]
Cancel --> H_enq
end
subgraph AfterWake [唤醒后行为]
K_recheck["重新检查条件谓词(通常在循环中)\n如果不满足则重新进入等待"]
End(["结束:线程返回并继续执行"])
J_unpark --> K_recheck --> End
end
%% 连接唤醒分支到转移流程或取消流程
F_wakeup -- 被 signal/signalAll 转移 --> G_transfer
F_wakeup -- 被中断 --> Cancel
F_wakeup -- 超时 --> Cancel
%% 转移后的入队与竞争
G_transfer --> H_enq
%% 其他连接
Err --> End
对比
graph TD
%% 总入口
Start(["开始:线程尝试执行临界段或等待条件"])
%% 共享子图:获取锁流程
subgraph LockAcquire ["获取锁(任一实现)"]
LA_lock["调用: 获取锁\n- ReentrantLock.lock() -> 抽象队列同步器.acquire\n- synchronized -> 尝试进入对象监视器"]
LA_fail["无法获得锁 -> 同步队列入队并阻塞"]
LA_enq["AbstractQueuedSynchronizer.enq(node)\n把线程加入同步队列尾部(仅 ReentrantLock 实现)"]
LA_park["阻塞: LockSupport.park() 或 等待对象监视器"]
LA_acquireReturn["获得锁后返回,进入临界区"]
LA_lock --无法获得--> LA_enq
LA_enq --> LA_park
LA_lock --获得--> LA_acquireReturn
LA_park --> LA_acquireReturn
end
%% Condition 等待流程
subgraph ConditionAwait ["使用 Condition 的等待流程"]
C1["线程调用: ConditionObject.await()"]
C2["ConditionObject.addConditionWaiter() -> 条件队列尾部\n节点标记为 CONDITION"]
C3["ReentrantLock.fullyRelease(savedState)\n释放锁的重入计数"]
C4["阻塞: LockSupport.park()(线程挂起)"]
C1 --> C2 --> C3 --> C4
end
%% Condition 唤醒流程(精准唤醒)
subgraph ConditionSignal ["Condition 的唤醒/转移流程(精准)"]
S_call["另一线程持锁调用: ConditionObject.signal() 或 signalAll()"]
Transfer["ConditionObject.transferForSignal(node)\n从条件队列移除特定节点,准备转入同步队列"]
Enq["AbstractQueuedSynchronizer.enq(node)\n把该节点加入同步队列尾部"]
MaybeUnpark["如果适当,调用 LockSupport.unpark(thread) 或等候 unlock 时由 release 唤醒队首"]
S_call --> Transfer --> Enq --> MaybeUnpark
end
%% synchronized wait/notifyAll 比较流程(粗暴唤醒)
subgraph MonitorWaitNotify ["使用 synchronized + wait/notify/notifyAll 的情况"]
M_wait["线程调用: object.wait()\n加入该对象的单一等待集并释放对象监视器,线程阻塞"]
M_notifyAll["另一线程持监视器调用: object.notifyAll()\n将等待集和中全部线程变为可运行(全部唤醒)"]
M_notify["另一线程持监视器调用: object.notify()\n任意选择一个等待线程唤醒(选择不可控)"]
M_wait -->|notifyAll| M_notifyAll
M_wait -->|notify| M_notify
end
%% notifyAll 后的结果(性能问题)
subgraph NotifyAllConsequence ["notifyAll 导致的后果(虚拟群起)"]
NA_awake["所有等待线程被唤醒并尝试重新获得监视器/锁"]
NA_compete["大量线程瞬间竞争锁 -> 许多上下文切换与自旋/排队开销"]
NA_wasted["不相关线程被唤醒导致无效重试(性能浪费)"]
M_notifyAll --> NA_awake --> NA_compete --> NA_wasted
end
%% Condition 的优势点(指向图中)
subgraph ConditionAdvantage ["Condition 的具体优势"]
Adv1["多个 Condition 支持多个独立等待集合\n可以按需要精确唤醒某一集合"]
Adv2["signal() 仅转移队首等待线程,避免全体唤醒"]
Adv3["FIFO 风格的条件队列减少不必要竞争"]
Adv1 --> Adv2 --> Adv3
end
%% 合并控制流
Start --> LA_lock
LA_acquireReturn -->|调用 await 时| ConditionAwait
ConditionAwait -->|等待中| LA_park
%% 当其他线程做出改变后,Condition 唤醒路径
LA_acquireReturn -->|修改状态并想唤醒等待者| ConditionSignal
%% 对比:若使用监视器等待
LA_acquireReturn -->|"使用 object.wait()/notifyAll()"| MonitorWaitNotify
%% 唤醒后回到获取锁的竞争
MaybeUnpark --> LA_enq
M_notify --> LA_enq
NA_awake --> LA_enq
%% 最终被唤醒线程重新竞争并继续
LA_enq --> LA_park --> LA_acquireReturn
%% 标注性能差异
Adv1 --- ConditionSignal
Adv2 --- MaybeUnpark
NA_wasted --- MonitorWaitNotify
%% 结束
LA_acquireReturn --> End(["结束:线程获得锁并继续执行临界区/后续逻辑"])
graph TD
subgraph "线程t1"
t1lock["t1: lock.lock()"]-->runT1["运行生产者"]
runT1-->full满了?
full--否-->runT1
full--是-->notFullAwait["notFull.await()"]
runT1-.->notEmptySignal["唤醒notEmpty等待队列中的一线程"]
notFullAwait-->addConditionNotFullWaiter[加入当前线程到notFull的等待队列中]
addConditionNotFullWaiter-->fullyT1Release["完全释放锁(savedState)"]
fullyT1Release-->parkT1["挂起T1线程"]
parkT1-.-unparkT1[被唤醒]
unparkT1-.->runT1
end
subgraph "线程t2"
t2lock["t2: lock.lock()"]-->runT2["运行消费者"]
runT2-->empty空了?
empty--否-->runT2
empty--是-->notEmptyAwait["notEmpty.await()"]
notEmptyAwait-->addConditionNotEmptyWaiter[加入当前线程到notEmpty的等待队列中]
addConditionNotEmptyWaiter-->fullyT2Release["完全释放锁(savedState)"]
fullyT2Release-->parkT2["挂起T2线程"]
runT2-.->notFullSignal["唤醒notFull等待队列中的一线程"]
parkT2-.-unparkT2[被唤醒]
unparkT2-.->runT2
end
notEmptySignal-.->unparkT2
notFullSignal-.->unparkT1
- t1: lock.lock() -> 执行 -> 如果满 -> notFull.await()
- await: addConditionWaiter() -> fullyRelease(savedState) -> LockSupport.park()
- t2: lock.lock() -> 获取锁(因为 t1 已释放) -> take() -> notFull.signal()
- signal: transferForSignal(node) -> AbstractQueuedSynchronizer.enq(node)(转入同步队列)
- t2: lock.unlock() -> AQS.release -> unparkSuccessor(head) -> LockSupport.unpark(t1)
- t1: park 返回 -> acquireQueued(node, savedState) -> 重新获得锁 -> await 返回 -> 继续执行 put()