AQS解析
AQS通过FIFO的等待队列,提供了一个用来实现阻塞锁和相关的同步器(信号量、事件机制)的框架,它是很多同步器实现的底层基础,内部基于一个int原子变量来代表状态,子类必须实现protect方法来改变状态,这些方法定义怎么去获取锁和释放锁。AQS的其他方法则定义了入队以及阻塞的机制,子类可以定义其他的状态变量,但是只有通过getState,SetState以及compareAndSetState来原子更新的state变量能代表同步状态。子类一般定义为非public的内部类,AQS提供了独占模式和共享模式获取锁,当在独占模式,尝试获取其他线程占用的锁将会失败,共享模式下多线程则可能会成功。不同模式下的等待线程公用同一个FIFO队列,通常,子类只提供其中一种模式的实现,ReadWriteLock除外。
该类的内部定义了一个嵌套的ConditionObject类用来作为Condition的实现,用来支撑独占模式的实现
AQS类的继承关系:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
AbstractOwnableSynchronizer:同步器可能被一个线程独占,因此抽象出该类保存独占的线程。主要属性和方法:
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
Node节点
AQS最主要的是其中的节点类Node,用来表示阻塞的等待队列中的节点(下面这几段话翻译自java doc,有点拗口)
等待队列是CLH锁队列的变种,CLH锁一般用来作为自旋锁,这里用来进行阻塞同步,但是使用的理论都是持有前置节点的状态来进行同步等待。每个节点有一个"status"属性,用来表示线程是否block,当前置节点释放锁,当前节点会被通知。队列里面的每个节点就像持有单个等待线程的monitor一样。这个status属性不会控制线程是否获取到锁。当一个线程在队头的时候会尝试获取锁,但是排在第一位并不能保证能够获取成功,它只是表示有权利去竞争。所以释放锁的线程再去获取锁时可能需要等待。
当入CLH锁队列时,会通过原子操作将入队节点排在队尾,出队的时候,只需要设置头结点的状态(置为空)
+------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+
插入到CLH队列中,只需要对“尾巴”执行一次原子操作,因此存在一个简单的原子分界点,即从未排队到排队。同样,出队仅涉及更新“头”。但是,节点需要花费更多的精力来确定其后继者是谁,因为要处理由于超时和中断而可能导致的取消。前置链接需要处理取消的情况,如果一个节点已经取消了,它的后继节点将会重新链接到新的非取消的前置节点。同时这里又使用next链接来实现阻塞机制,每个节点的线程ID保存在自己的节点中,所以当一个前置节点唤醒后面的节点,通过遍历next链接决定哪个线程被唤醒。
确定后继者必须避免与新排队的节点竞争,竞争是通过设置他们前置节点的next属性产生。这里的解决办法是,如果需要,通过向后遍历检查队列中的节点的后继节点为null(换句话说,next链接是一种优化,能够避免向后扫描)
由于我们必须轮询其他节点的取消情况,因此我们可能没有注意到已取消的节点是在我们前面还是后面。要解决此问题,必须在取消时,总是unpark后继节点,使他们能够稳定在新的前置节点身上,除非我们可以确定未取消的前置节点。
在条件上等待的线程使用同样的节点,但是使用额外的链接。
关于CLH锁:博客1,博客2,一个CLH锁的实现,方便我们理解AQS队列:
public class ClhSpinLock {
// 前置节点
private final ThreadLocal<Node> pred;
// 当前情况节点
private final ThreadLocal<Node> node;
// 尾部节点
private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());
public ClhSpinLock() {
// 这两个节点是线程内部变量,tail是多线程共享变量
this.node = ThreadLocal.withInitial(() -> new Node());
this.pred = ThreadLocal.withInitial(() -> null);
}
public void lock() {
// 当前持有锁的节点
final Node node = this.node.get();
// 表示当前准备持有锁
node.locked = true;
// 加锁都会直接设置tail节点,并获取之前的tail节点
// 有竞争时,只有获取到锁的才会执行完lock,其他都会在while中等待
// 对于同一个线程都是获取同一个node,对于不同线程会获取之前的tail,从而形成一个链
Node pred = this.tail.getAndSet(node);
// 每次后来的接单会覆盖前置节点,形成一个链
this.pred.set(pred);
while (pred.locked) {}
}
public void unlock() {
// 获取当前线程的锁节点
final Node node = this.node.get();
// 释放锁,上面的while会退出从而获取到锁
node.locked = false;
// 当前节点设置为前置节点,开始时,pred相当于一个虚拟的头结点
this.node.set(this.pred.get());
}
private static class Node {
private volatile boolean locked;
}
}
AQS中的等待队列Node代码如下:
static final class Node {
/** 表示节点在共享模式下等待 */
static final Node SHARED = new Node();
/** 表示节点在独占模式下等待 */
static final Node EXCLUSIVE = null;
/** waitStatus的值,表示取消 */
static final int CANCELLED = 1;
/** waitStatus的值,表示后继节点需要唤醒 */
static final int SIGNAL = -1;
/** waitStatus的值,表示线程正在条件等待 */
static final int CONDITION = -2;
/**
* waitStatus的值,表示下一个获取共享锁需要无条件传播
*/
static final int PROPAGATE = -3;
/**
* 非负的数值表示节点不需要唤醒,在同步节点中,该值初始化为0
* Status field, taking on only the values:
* SIGNAL: 当前节点获取锁后,后继节点将会被被阻塞,因此当当前节点释放锁或者取消的时候,当前节点需要唤醒后继节点,
为了避免竞争,acquire方法必须一开要说明他们需要signal,然后进行原子地acquire,然后失败的时候阻塞
The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: 当前节点超时或者被中断,节点进入到这个状态后不会再改变
This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: 当前节点处于等待条件队列中,它知道状态发生改变,变成0时,才会作为一个同步队列节点,使用这个值和该属性的其他使用无关,只是为了简化机制
This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: 一个释放的共享状态需要传播到其他节点,只针对头节点设置,并在doReleaseShared设置,来保证传播的继续,即使其他操作已经介入
A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above 其他
* 非负值表示节点不需要唤醒操作,所以大部分代码不需要检查特定值,直接赋值即可
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
* 对于一般的同步节点来说初始值是0,对于条件等待节点来说是CONDITION,该值的修改使用CAS
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
* 前驱节点:入队的时候赋值,出队的时候赋值为null
前驱节点的链接,在入队的时候赋值,在出队的时候赋值为null。如果一个前驱节点取消,会通过短路操作找到一个非取消的节点,
该节点一定存在,因为头节点永远不会取消。当且仅当获取锁成功,一个节点才会变成头结点。一个取消的节点永远不会成功的获取锁,
并且一个线程只会取消自己,不会取消其他节点
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
后继节点的链接,入队的时候赋值,当绕过取消节点的时候会调整,当出队的时候会赋值为null。直到关联后,入队操作才会赋值前置节点的next属性,所以当看到next属性是null,并不代表它是队列的最后一个节点。然后,如果next属性是null,我们可以从尾部向后扫描prev节点进行double-check。取消节点的next属性设置成该节点自己,而不是null,这是为了方便isOnSyncQueue方法
*/
volatile Node next;
/** 当前入队节点的线程
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
下一个在条件上等待的节点,或者是特殊节点SHARED。因为条件队列仅在独占模式下才会访问,所以我们只需要一个简单的队列来存放等待条件的节点。
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
子类实现需要重写的方法
继承AQS的实现类需要重写这几个方法:
//尝试在独占模式中获取锁,这个方法调用之前需要查询当前对象的状态是否在独占模式下允许获取,如果获取失败,该方法可能会将当前线程入队
protected boolean tryAcquire(int arg)
//在独占模式下,线程释放锁,该方法会尝试设置同步器的状态
protected boolean tryRelease(int arg)
//尝试在共享模式下获取锁
protected int tryAcquireShared(int arg)
//尝试在共享模式下释放锁
protected boolean tryReleaseShared(int arg)
AQS其他字段属性:
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
等待队列的头结点,采用延迟初始化,如果头存在,waitStatus保证不能是取消状态
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
* 尾结点
*/
private transient volatile Node tail;
/**
* 同步状态,为0表示没加锁
*/
private volatile int state;
// 为了提高性能直接使用本地方法CAS修改相关字段的值,所以需要Unsafe获取到每个属性的内存偏移
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
acquire方法
AQS的主要方法:
acquire 获取允许
release释放允许
下面讲解下acquire 方法,配合ReentrantLock中的实现讲解,参考博客
//获取信号量的过程,tryAcquire逻辑由子类实现,如果获取失败,进入队列,在队列中获取,如果中断了 则中断
public final void acquire(int arg) { // Method 1
/**如果tryAcquire成功。不会走到后面的acquireQueued,
如果tryAcquire失败,会走到acquireQueued,表示在队里中去等待获取锁*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**这里看下ReentrantLock中的公平锁的tryAcquire的实现*/
protected final boolean tryAcquire(int acquires) { // Method 2
final Thread current = Thread.currentThread();
// 调用的AQS代码,获取当前状态
int c = getState();
// 如果当前状态是0,
if (c == 0) {
// hasQueuedPredecessors代码在下面,看下队列中是否还有前置节点
// 如果没有前置节点,就去获取信号量,获取的数量是acquires
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 设置当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 这里表示有竞争,但是重入的情况还是可以获取锁的
else if (current == getExclusiveOwnerThread()) {
// 如果是重入的情况,直接设置信号量
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 表示有竞争,或者有前置节点,需要等待
return false;
}
/**
* 查询当前是否有其他的线程比当前线程等待的更久,当前方法的逻辑类似于下面的代码(效率比JDK的代码低):
*
* getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()
*
* 注意,由于中断或者超时产生的取消可能发生在任何时候,所以返回true不能保证有其他线程会比当前线程更先获取到锁,同样,当方法返回false(判断了队列为空),也可能出现另一个线程在方法调用之后赢得竞争入队(也就是该方法结果不可信)
* 这个方法主要被公平锁用来避免竞争,当这个方法返回true(非重入情况),公平锁的tryAcquire方法应该直接返回false,以及tryAcquireShared返回一个负值,下面是一个代码案例:
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* // 重入,增加计数
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }
*/
public final boolean hasQueuedPredecessors() { // Method 3
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
// 结果的正确性依赖头节点比尾节点更早初始化,以及如果当前线程是队列中的第一个,head.next是准确的
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 头不等于尾,头等于尾表示队列为空,在初始化的时候
// 头的下一个节点为null或者当前线程不能等于头节点的线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
// 假设tryAcquire获取失败,也就是有竞争的情况,那后进入的线程肯定要等待,等待需要进入等待队列,需要调用addWaiter方法
// 创建当前线程节点并入队列,返回当前线程节点
private Node addWaiter(Node mode) { // Method 4
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 拿到尾节点,尝试在尾部加节点
Node pred = tail;
if (pred != null) {
// 当前节点的prev链接指向pred
node.prev = pred;
//CAS设置尾结点,设置成功返回true,设置失败表示入队有竞争
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果设置失败,获取的尾结点已经不是尾结点
// 或者尾节点为null,还没有初始化,再尝试入队
enq(node);
// 返回新入队的节点
return node;
}
// 循环入队
private Node enq(final Node node) {
//循环操作
for (;;) {
Node t = tail;
//如果尾结点为null 表示没有初始化,开始初始化头结点和尾结点为同一节点
//这里使用了延迟初始化,初始化时,头节点等于尾节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 当前节点的pre节点指向尾节点
// 注意此处的设置链接和下面的设置next链接不是原子操作
// 因此在找取消节点的是,是从后向前遍历,因为next指针可能没连上
node.prev = t;
// 设置新的尾结点
if (compareAndSetTail(t, node)) {
t.next = node;
// 返回旧的尾节点
return t;
}
}
}
}
// 现在节点已经入队了
// node是新创建的入队的节点
// 该方法是节点在队列中唤醒然后获取锁这一逻辑的实现
final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//循环入队
for (;;) {
final AbstractQueuedSynchronizer.Node p = node.predecessor();
// 如果前置节点是头节点,再获取资源
if (p == head && tryAcquire(arg)) {
// 设置新的头结点,相当于获取到锁后取代新的头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 是否需要park,如果需要则park
// 如果不需要直接进入下一轮循环
if (shouldParkAfterFailedAcquire(p, node) &&
// 用来阻塞等待(park),醒来后返回是否中断过,如果中断过返回true
// 然后设置interrupted为true
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// pred是前置节点,node是当前节点
// 判断是否需要park,如果不park 继续循环
private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
// 获取当前节点状态
int ws = pred.waitStatus;
if (ws == AbstractQueuedSynchronizer.Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* 如果节点的等待状态时SIGNAL,表示已经在请求SIGNAL,所以可以安全的等待
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 表示前置节点取消了,跳过前置节点
*/
do {
// pred = pred.prev 前置节点设置成前置节点的前置节点
// node.prev = pred 当前节点的前置节点变成前前置节点
// 也就是跳过前置节点,一直这样向后遍历跳过取消的节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
* 表示等待状态是 0 或者PROPAGATE 表明需要唤醒,但还没有park,调用方需要重试来保证在park之前不会获取到锁
* 设置前置节点为SIGNAL,表示要唤醒后继节点
* 正常的情况会走这里,表示想要获取信号量,就要把其前置节点的waiteStatus修改
* 当前线程对应的节点进行入队至队尾(挂起之前),那么其前驱节点的状态就必须为SIGNAL,以便后者取消或释放时将当前节点唤醒
*/
compareAndSetWaitStatus(pred, ws, AbstractQueuedSynchronizer.Node.SIGNAL);
}
//如果不是第一种情况,比如第三种,设置完前置节点状态后 又进入acquireQueued继续循环
return false;
}
//判断线程是否中断过
private final boolean parkAndCheckInterrupt() {
//阻塞线程
LockSupport.park(this);
return Thread.interrupted();
}
// 如果acquireQueued的过程中被中断,从而导致失败,会取消acquire
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 线程置为空
node.thread = null;
// 重置pre节点,跳过取消的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 获取pre的next节点,用来做CAS重新设置next节点
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 当前节点状态设置为取消
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// 如果当前节点是尾部节点,尾节点设置成前置节点,next值置为空
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
// 如果前置节点是SIGNAL表示要唤醒后继节点
// 并且如果状态不是取消,也尝试设置SIGNAL标记,唤醒后面的节点(自己能唤醒说明自己的前置状态是SIGNAL,自己既然取消了,SIGNAL状态要让现在的pre继承)
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 前置节点的next指针指向当前节点的next指针,跳过当前节点
compareAndSetNext(pred, predNext, next);
} else {
// pred是头节点,尝试唤醒后面的节点
unparkSuccessor(node);
}
// 指向自己,方便GC,当前节点消亡
node.next = node; // help GC
}
}
release方法
释放的主要方法:
//独占模式下释放信号
public final boolean release(int arg) {
//如果释放锁成功
if (tryRelease(arg)) {
// 判断当前头节点
AbstractQueuedSynchronizer.Node h = head;
// 0 表示不是初始化状态
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
/**看下ReentrantLock的Sync锁的实现:
* release返回true表名解锁成功
*/
protected final boolean tryRelease(int releases) {
// 减去当前的信号量
int c = getState() - releases;
// 如果当前线程不是获取锁的线程,不能释放
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果信号量是0
free = true;
setExclusiveOwnerThread(null);
}
// 设置信号量
setState(c);
return free;
}
// 当tryRelease成功后,需要唤醒后继节点,这里的node是头结点
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
* 如果状态是是负数(可能是需要SIGNAL),预期需要SINGAL,因此尝试清除状态值,
* 如果清楚失败或者状态已经被改变都不会影响正确性
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* upark下一个节点,通常情况及时下一个节点,但是可能存在取消或者明显的就是null情况,从尾部向后遍历去找一个非取消的节点
* 从tail往前找一个没有取消的节点
* 为什么从尾部找:是因为enq的时候不是原子操作,在cas设置尾部节点的时候,指向前面的连接已经创建好,但是指向后面的连接没有
*/
AbstractQueuedSynchronizer.Node s = node.next;
// 拿到头节点的下一个节点,如果是取消状态或者下一个节点是null
// 从尾部向后遍历找一个非取消的节点唤醒
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部向后遍历,如果t不等于当前节点
for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
// 非取消状态,s更新为当前遍历的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
acquireShared
/** tryAcquireShared:在共享模式下获取锁*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// CountDownLatch的实现
protected int tryAcquireShared(int acquires) {
// 如果获取的状态是0,表示没有被加锁,返回1
return (getState() == 0) ? 1 : -1;
}
// tryAcquireShared失败,进入队列
private void doAcquireShared(int arg) {
// 添加shared节点,注意,这个Node.SHARED是一个静态变量,主要方便isShared方法判断节点类型
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 拿到前置节点
final Node p = node.predecessor();
if (p == head) {
// 如果前置节点是头节点,说明该尝试获取锁了
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果大于等于0表示获取成功
// 设置新的头 并传播状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 设置队列的头结点,检查是否后继节点在共享模式下等待,如果是,当propagate>0 或者PROPAGATE状态已经设置,则进行传播
// 为什么acquire中会调用doReleaseShared,原因在于共享锁是可以支持多个线程获取的,所以要通知其他线程来获取锁
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 获取到资源后,设置新的头
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 当在下列的情况下尝试去唤醒下一个节点:
// 调用者需要传播 propagate>0 或者 传播已经有记录了
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 看下一个节点是否是共享的,如果是共享的则传播
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 保证release是传播的,即使当前有其他正在acquire/release的调用。
// 如果head的后继节点需要SIGNAL,尝试unpark head的后继节点,如果不需要SIGNAL,status设置为PROPAGATE,来保证传播继续.另外,必须要循环处理,避免在我们执行的时候有其他节点添加到队列中,和其他的unparkSuccessor使用不同,我们需要知道CAS是否重置status失败,如果失败重新检查
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 说明有后继节点唤醒
// 如果是SIGANL,表示要唤醒后续节点,设置成0,然后唤醒后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 失败说明有其他节点竞争
continue; // loop to recheck cases
// unpark后继节点,其他节点会获取资源,然后设置头节点,此时头结点会变化
unparkSuccessor(h);
}
// 有新的head产生,新head的ws初始值为0
// 如果compareAndSetWaitStatus失败,表示有竞争,继续执行
// 设置head为PROPAGATE,表示传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点没有发生变化,表示没有后续的节点获取到资源,直接退出
// 其他共享节点获取资源,会设置头为Shared状态,这样一直唤醒下去
if (h == head) // loop if head changed
break;
}
}
releaseShared
releaseShared实际就是调用的上面的doReleaseShared
Condition使用
synchronized和notify/wait 组合是和lock和condition的await以及signal 组合等价。await在AQS的一个内部类中实现:
条件队列和等待队列是两个不同的队列,但是使用的都是Node类,条件队列结束等待会进入到等待队列中去
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件队列的第一个节点. */
private transient Node firstWaiter;
/** 最后一个节点. */
private transient Node lastWaiter;
public ConditionObject() { }
// Internal methods
// 添加一个新的条件节点
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果尾节点是取消的状态,则清理
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 重新构造等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 开始的情况
firstWaiter = node;
else
// 构造链接
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 取消链接waiter
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 拿到下一个节点
Node next = t.nextWaiter;
// 判断是否取消
if (t.waitStatus != Node.CONDITION) {
// 切断链接
t.nextWaiter = null;
if (trail == null)
// 头节点是取消的,头节点直接等于下一个节点
firstWaiter = next;
else
// 重新构造链接
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
// 上一次遍历的节点
trail = t;
// t遍历到下一个
t = next;
}
}
}
await方法
//AQS方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加到condition队列
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//第一次总是返回false,因为上面已经释放锁了
while (!isOnSyncQueue(node)) {
// 如果没有在同步队列,阻塞
LockSupport.park(this);
// 如果中断退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//醒来之后的操作,会尝试拿锁,同时设置标志位
// acquireQueued进入队列阻塞等待被唤醒
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清楚cancel节点
unlinkCancelledWaiters();
if (interruptMode != 0)
//根据interruptMode要不抛异常后者自我中断
reportInterruptAfterWait(interruptMode);
}
//释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 首先获取当前线程所在的AQS队列的状态值
int savedState = getState();
// 尝试去释放信号量
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 释放资源
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果释放成功,唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//判断当前线程是否在同步队列,一开始是在条件队列
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
// 第一次走这个逻辑
return false;
if (node.next != null) // If has successor, it must be on queue
// 如果有next链接,肯定是在等待队列上
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// node.prev是非空的,但是可能还没有加入队列中个,因为CAS操作可能失败
// 所以从尾部向头遍历的时候需要确定它已经加入到队列中
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* 从尾向后查找同步队列,查看节点是否存在
*/
private boolean findNodeFromTail(AbstractQueuedSynchronizer.Node node) {
AbstractQueuedSynchronizer.Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
//检查是否中断,如果中断了,转移到同步等待队列
private int checkInterruptWhileWaiting(Node node) {
// 如果中断则转换节点,如果转换节点成功,抛出异常,失败则自我中断
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 为什么取消要进入同步队列:取消只是不再等待,可以再去获取锁
final boolean transferAfterCancelledWait(Node node) {
// 设置WS为0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 入同步队列
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 循环等待入队列,知道入队成功
// 走到这说明有竞争力 已经在入队了
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
unlinkCancelledWaiters方法的示意图
signal
// 移除等待最长的线程,从条件队列移到同步队列中去
public final void signal() {
// 如果获取锁的线程不是当前线程,抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 向前遍历,如果没有取消,则唤醒
// 如果遇到取消节点,则继续找下一个节点处理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 设置节点状态,如果不能设置,表示节点已经取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 入队,返回前一个节点
Node p = enq(node);
int ws = p.waitStatus;
// ws>0 表示前置节点取消了 或者设置状态失败,即节点发生变化
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒当前节点,重新同步状态
LockSupport.unpark(node.thread);
return true;
}
ReentrantLock
前面在讲解AQS的时候已经介绍了ReentrantLock公平锁的代码,下面完整介绍下ReentrantLock
以ReenterantLock为例,该类中定义几个同步器:
Sync是基类,是公平锁和非公平锁实现的基础
tryAcquire由于公平锁和非公平锁实现不一样 因此在Sync子类实现
下图是非公平锁NonfairSync的调用逻辑,公平锁类似
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
允许在非公平锁情况下实现快速lock
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
* 尝试获取信号量,也就是设置state为非0数值
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//获取
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程已经获取了,增加获取的量
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//表示没获取到,已经被其他人获取了
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//只有当前显示是获取信号量的线程才能释放
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//判断当前线程是否独占
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
//state为0表示没加锁
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
//state的数值表示获取锁的数量
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
非公平锁的实现:
static final class NonfairSync extends ReentrantLock.Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 尝试获取锁,将state从0更新成1,如果失败直接使用acquire
* 获取锁的主要逻辑acquire是AQS中的方法,其中又会调用tryAcquire
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//AQS子类关联的实现方法,可以看到调用的是上面的nonfairTryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
公平锁的实现:
static final class FairSync extends ReentrantLock.Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果是锁的重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//查询是否有线程比当前线程等待的时间更长
//由于并发的实时性 该方法返回的结果不一定是正确的
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
AbstractQueuedSynchronizer.Node t = tail; // Read fields in reverse initialization order
AbstractQueuedSynchronizer.Node h = head;
AbstractQueuedSynchronizer.Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}