在并发编程中,ReentrantLock作为一个非常重要同步组件,基层是基于AQS同步器构建,我们一起以ReentrantLock源码,分析AQS工作原理。
AQS(AbstractQueuedSynchronizer)使用一个int成员变量state表示同步状态(state为0,表示当前资源没有被占用,state>0,表示当前资源已经被占用),结合内置的一个同步队列(FIFO)完成资源获取线程的等待排队工作,如果当前线程没有获取到锁,则将线程封装成一个Node节点,加入到同步队列,等待唤醒,通过自旋的方式尝试获得锁。
Node节点组成
waitStatus:等待状态
Node prev:前驱节点
Node next:后继节点
Thread thread:获取同步状态的线程
Node nextWaiter:等待队列中的后继节点
Node节点状态:waitStatus
INITIAL=0:初始状态
SIGNAL=-1:后继节点的线程处于等待状态,当前节点如果释放了同步状态,将通知后继节点
CONDITION=-2:节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal方法后,该节点将从等待队列转移到同步队列中,加入到对同步状态的获取中
PROPAGATE=-3:表示下一次共享式同步状态获取将会无条件的被传播下去
CANNELED=1:在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消等待,节点进入该状态将不会变化
1. 加锁
首先从ReentrantLock的lock方法开始。
public void lock() {
sync.lock();
}
可以发现ReentrantLock实际调用的sync对象的lock方法,sync对象是ReentrantLock内部的一个静态内部类。
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
}
sync类有两个是实现:FairSync、NonfairSync,分别对应公平锁和非公平锁。 先从非公平锁的角度分析源码,下文用sync代替非公平锁NonfairSync。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
sync类的lock方法先尝试通过CAS修改state的状态位1,如果修改成功,表示当前线程获取到锁。否则调用acquire方法尝试加锁。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire方法内部再次尝试通过tryAcquire获取锁。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
从这个方法可以看出,如果当前线程已经拥有锁,则将状态+acquires(获取锁的次数,需要释放相同的次数,可重入)直接返回加锁成功。如果没有获取到锁,则返回false。
回到acquire方法,假设tryAcquire没有加锁成功,继续跟踪addWaiter,通过方法名称可以猜出来,这个方法是将当前线程封装成一个Node。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先将当前线程封装成一个Node对象,如果当前队列是空的情况下,head和tail默认是都null,if (pred != null) 条件不成立,继续跟踪enq方法。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一次循环,t为null,if (t == null) 条件成立,通过CAS设置head节点为一个空的Node节点(哨兵节点),然后让tail和head同时指向哨兵节点。
哨兵节点表示当前已经获得锁,正在执行业务逻辑的线程
第二次循环,t不为null(t指向哨兵节点),node(当前线程封装的对象)的前驱指针指向t,即第一次循环创建的哨兵节点,通过CAS让tail指向node节点,然后将哨兵节点的后继指针指向node节点,结束循环。
通过自旋的方式初始化了哨兵节点,并且将当前线程Node节点添加到同步队列中,假设此时有多个线程抢占锁,最后都会进入同步队列等待。
继续跟踪acquireQueued方法,首先获取当前线程Node节点的前驱节点p,判断前驱节点p是否是头节点。通过上面的分析可知,p是头结点,然后通过通过tryAcquire方法尝试获取锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
头结点当前等待状态是初始状态,所以进入else代码块,设置头结点状态为SIGNAL(-1),继续进行下一次循环。 假设下一次循环依旧没有加锁成功,再次进入shouldParkAfterFailedAcquire方法,当前头结点状态已经变成SIGNAL,返回true,继续调用parkAndCheckInterrupt方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
可以看出,当前线程被阻塞,进入等待状态。
至此已经完成了线程加锁失败,进入同步队列排队等待源码分析,下面继续看等待线程如何被唤醒。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2. 释放锁
排队线程的唤醒必须要由其他线程释放锁触发,所以我们继续跟踪ReentrantLock的unlock方法。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock通过Sync的release方法释放锁,release调用了tryRelease方法,跟踪tryRelease方法源码。
判断当前线程是否是锁的独占线程,如果不是,抛出异常。所以我们使用ReentrantLock的时候,必须要加锁成功,才能执行unlock方法。 如果state-releases的结果等于0,说明已经释放锁成功,清空当前锁的独占线程,重置状态为0。 这里必须要注意,我们加锁几次,就要释放几次。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
继续回到Sync.release方法,释放成功之后,如果头节点h不等于空,并且头结点h的等待状态不是0,调用unparkSuccessor方法(通过上面的分析可知,头结点的状态是-1,表示头结点的后继节点处于等待状态)。
头结点h的状态等于SIGNAL(-1),CAS设置头结点状态为0,节点s(指向线程A对应的Node节点)不等于空,并且节点s的状态为初始状态(0),不会进入if条件,通过LockSupport.unpark唤醒线程A。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
如果线程A等待超时或者被中断的情况
如果节点s的状态大于0(即线程A等待超时或者被中断),执行for循环,此时tail指向线程B对应的Node节点。
至此,s指向Node B,继续往下执行,通过LockSupport.unpark唤醒线程B。
页面更新:2024-05-19
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号