JUC(13) AbstractQueuedSynchronizer
2024-02-12 23:16:32 # Language # Java

1. AQS

1.1 AQS理论知识

抽象的队列同步器

  • 是用来实现锁或者其他同步器组件的公共基础部分的抽象实现
  • 是重量级基础框架及整个JUC体系的基石, 主要用于解决锁分配给”谁“的问题。
  • 通过内置的CLH(FIFO)队列的变种来完成资源获取线程的排队工作, 将每条将要去抢占资源的线程封装成一个Node节点来实现锁的分配, 有一个int类变量表示持有锁的状态(private volatile int state), 通过CAS完成对state值的修改(0表示空闲, 大于等于1表示占有)
    • CLH: Craig、Landin and Hagersten 队列, 是一个单向链表, AQS中的队列是CLH变体的虚拟双向队列FIFO
package java.util.concurrent.locks;

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

public abstract class AbstractQueuedLongSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {

CLH 队列

1.2 AQS是JUC的基石

image-20240205151132153

  • 锁:面向锁的使用者, 定义了使用层API, 隐藏了实现细节
  • 同步器:面向锁的实现者, DougLee提出了统一规范并简化了锁的实现, 将其抽象出来, 屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等, 是一切锁和同步组件实现的公共基础部分

1.3 AQS内部体系架构

img

Node

static final class Node {
/** Marker to indicate a node is waiting in shared mode 共享模式*/
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode 独占模式*/
static final Node EXCLUSIVE = null;

/** 线程被取消 */
static final int CANCELLED = 1;
/** 后续线程需要唤醒 */
static final int SIGNAL = -1;
/** 等待condition唤醒 */
static final int CONDITION = -2;
/** 共享式同步状态获取将会无条件的传播下去 */
static final int PROPAGATE = -3;

/** 初始为0, 状态是上述几种 */
volatile int waitStatus;
// 前置节点
volatile Node prev;
// 后继节点
volatile Node next;

volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

// @return the predecessor of this node
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() {} // Used to establish initial head or SHARED marker

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

img

2. 源码分析

ReentrantLock 为例, Lock接口的实现类, 基本都是通过聚合了一个队列同步器的子类完成线程访问控制的

img

2.1 构造函数

public ReentrantLock() {
sync = new NonfairSync(); // 非公平锁
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

2.2 lock() & unlock()

public void lock() {
sync.lock();
}

public void unlock() {
sync.release(1);
}

公平锁

final void lock() {
acquire(1);
}

非公平锁

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

2.3 acquire()

// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// AQS中直接抛出异常, 并没有实现, 子类要重写该方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

img

tryAcquire() & hasQueuedPredecessors()

公平锁

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // 这是与非公平锁唯一的区别
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

非公平锁

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 体现了可重入性
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

对比公平锁和非公平锁的 tryAcquire() 方法, 其实差异就在于非公平锁获取锁时比公平锁中少了一个判断!hasQueuedPredecessors()

hasQueuedPredecessors() 是公平锁加锁时判断等待队列中是否存在有效节点的方法

// 如果当前线程前面有一个排队的线程, 则返回 true
// 如果当前线程位于队列的顶部或队列为空, 则返回 false
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

addWaiter() & enq()

如果 tryAcquire 失败, 则会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 入队
return node;
}
/**
* Inserts node into queue, initializing if necessary.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) { // 自旋
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // 头节点是虚节点
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

双向链表中, 第一个节点为虚节点(也叫哨兵节点), 其实并不存储任何信息, 只是占位。真正的第一个有数据的节点, 是从第二个节点开始的

enq中不断通过自旋执行入队

acquireQueued()

/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 是否中断, 例如系统出问题了
for (;;) {
final Node p = node.predecessor(); // 获得前置节点
if (p == head && tryAcquire(arg)) { //如果执行到这里, 说明前面持有资源的线程已经unpark了
setHead(node); // 将node内容清空, 变为虚节点
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire()

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 获得前置节点的状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 这里会将前置节点的status从初始化的0变为-1, 用于后续唤醒操作
}
return false;
}

parkAndCheckInterrupt()

/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 线程挂起, 程序不会继续向下执行
return Thread.interrupted(); // 在调用unpark方法后, 会继续执行
}

cancelAcquire()

/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

2.4 release()

public final boolean release(int arg) {
if (tryRelease(arg)) { // 释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒头结点的后置节点
return true;
}
return false;
}

tryRelease()

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null); // 将持有资源的线程设置为null
}
setState(c);
return free;
}

unparkSuccessor()

private void unparkSuccessor(Node node) { // node是头节点
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 哨兵节点状态置0

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 为线程发放许可
}

此时唤醒节点B(队列中的第一个节点), 之前线程B在 parkAndCheckInterrupt() 被挂起

返回return Thread.interrupted();, 因为线程B从未中断过, 所以返回false。

  • interrupted()执行后具有清除状态标志值并设置为false的功能

acquireQueued() 因为返回 false, 所以继续循环, 获取B节点前驱节点p, p就是头结点, 然后执行 tryAcquire() 尝试获取锁, 因为刚才已经把锁释放了, 所以state等于0, tryAcquire() 能够获取锁成功。