Java 并发编程核心在于 juc 包,而其中大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是 AQS。
AQS 定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(volatile int state)的同步器。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.
AQS 特性:
- 阻塞等待队列
- 共享、独占
- 公平、非公平
- 可重入
- 允许中断

一般都是通过定义内部类 Sync 继承 AQS
将同步器所有调用都映射到 Sync 对应的方法
AQS 内部维护属性 volatile int state 表示资源的可用状态
state 三种访问方式
getState、setState、compareAndSetState
AQS 定义两种资源共享方式
- Exclusive,独占,只有一个线程能执行,如 ReentrantLock
- Share,共享,多个线程可以同时执行,入 Semaphore、CountDownLatch
AQS 定义两种队列
- 同步等待队列
- 条件等待队列
AQS,重点
自旋、加锁(CAS)、LockSupport、队列
下面通过 ReentrantLock 看下 AQS 的实现。
ReentrantLock
ReentrantLock 内部 定义一个内部类 Sync,Sync 继承自 AQS,Sync 还有两个子类,FairSync、NonfairSync 区别公平锁与非公平锁。
new ReentrantLock(true) 为公平锁
构造函数:
// 默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁,按照线程过来的顺序依次去获取锁,FIFO
非公平锁,可以插队
加锁过程
假设现在 ReentrantLock 是 FairSync 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// new ReentrantLock().lock() 会调用到这里
final void lock() {
acquire(1);
}
// acquire 在 AQS 的代码
// public final void acquire(int arg) {
// if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
//}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 先获取 state 的值
int c = getState();
// 这一步是 第一次加锁
// 如果 state 的值为 0,并且队列中没有等待的线程,并且 CAS 设置 state 值为1成功,执行 setExclusiveOwnerThread ,设置当前线程为 current,表示当前线程获得了锁
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 这一步是为了重入锁
// 如果 state 不是0,并且 AQS 中的当前执行线程就是当前线程,
// state + 1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// false 就是没有获得到锁,没有获取锁,会在执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这个逻辑
return false;
}
}
当调用 ReentrantLock 的 lock 方法时会调用到 FairSync 类的 lock 方法。
acquire(1) 这个方法主要是加锁,然后设置 state 为 1,会先调用本类实现的 tryAcquire 方法 尝试去加锁,这个方法里面分为了第一次为当前线程加锁,和当前线程的重入锁两种情况,
如果是 state 为 0 ,hasQueuedPredecessors 方法,判断队列是否为空,或者当前线程是否是队列里第一个线程,
如果队列为空,然后 CAS 比较并交换 state 的值为 1,这两步都成功了,则当前线程获取了锁,设置 AQS 内部的当前线程变量为 current。
如果 state 不是 0,判断 AQS 中的内部线程变量与 current 是否相等,是不是同一个线程来获取锁,是同一个线程,state + 1。可重入锁
如果当前线程没有获取锁,会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 操作,将线程先添加队列,然后阻塞当前线程
Node.EXCLUSIVE 表示独占锁,addWaiter 这一步就是往队列里添加当前线程,添加到队列的尾部,Node 里有对当前线程的引用
private Node addWaiter(Node mode) {
// 新建 Node,绑定当前线程 与 独占模式
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// tail 为null,表示队列为空 ??tail 初始值是不是 null
// 不为 null,表示队列不为空,将 node 添加到队尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 自旋,保证当前node 添加成功
enq(node);
return node;
}
private Node enq(final Node node) {
// 自旋 与 CAS 保证入队的线程 是一个一个入队的,原子性、同步
// 并且双向队列指针指向正确,此处也有并发情况,如果不是同步进行的有可能队列乱掉
for (;;) {
Node t = tail;
// 队列为空,表明是第一个线程进来,创建一个头结点,CAS 确保并发情况下只有一个线程能创建成功
// 设置头结点,尾结点等于头结点,相当于 初始化 队列,
// 但是没有对当前 node 操作,也就是说当前这个 node 还没有入队
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//队列不为空,添加到队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter 方法中,第一个线程获取lock 的时候,tail应该是 null ,然后 enq 执行的时候,如果 tail 是 null,只是进行了初始化,并没有将 node 设置到队尾,在下一次循环的时候添加到队尾,这样就 head 就是头结点,然后第一个线程对应的 node 其实是第二个节点???
arg 就是 state,acquireQueued 这一步是为了阻塞队列中的线程,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 当前 node 的前一个 node
final Node p = node.predecessor();
// 如果 p 是头结点,说明队列中只有一个node ,
// 则再去重试获取锁,如果获取成功了,当前线程就不用阻塞,直接执行
// 下一步 selfInterrupt 就不用执行,一定程度上提高了并发
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断 前置节点 p 的 waitStatus
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前置节点的 ws,等于这个 Node.SIGNAL 的时候,才可以阻塞当前调用线程
// 当前节点能否被 park 取决于其前置节点的 waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 大于0 的就只有 CANCELLED,删除大于 0 的node
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前置节点的状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 阻塞当前调用线程
LockSupport.park(this);
// 返回线程中断状态,清除中断状态
return Thread.interrupted();
}
// 中断当前调用线程,上面的 parkAndCheckInterrupt 方法清理掉了线程的中断信号状态
static void selfInterrupt() {
// 中断线程
Thread.currentThread().interrupt();
}
Node 的 waitStatus
CANCELLED = 1,表示已经失效的,需要删除
SIGNAL = -1,表示可以被阻塞
CONDITION = -2
PROPAGATE = -3
shouldParkAfterFailedAcquire
当前节点对应的线程是否要 park,取决于其前置节点的 waitStatus 的值,当是 -1 的时候返回 true,表示当前节点对应的线程可以被阻塞,
如果是初始化状态,waitStatus = 0,则设置其前置节点的 waitStatus 为 -1。
ReentrantLock unlock 解锁过程
unlock 会调用 release 方法。唤醒队列头部线程
线程中断唤醒
public final boolean release(int arg) {
// 尝试释放锁,修改 state 值为 原值减去 arg 的值
if (tryRelease(arg)) {
Node h = head;
// 头结点不为 null,并且头结点的 waitStatus 为 -1 时,才去 unpark 当前线程,然后把头结点的 waitStatus 改为 0
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// node 为头结点,头结点的 waitStatus = -1 时,执行该方法
// 将 头结点的 waitStatus 通过 CAS 方式 修改为 0
// unpark 头结点后面第一个 waitStatus <= 0 的节点对应的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
上面加锁的过程,不惜循环两次,把头结点的 waitStatus 由 初始值 0 改为了 -1,
解锁的过程又把头结点的 waitStatus 由 -1 改为 0
非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 这里是插队,新来的线程是否能够立即获取到锁,
// 如果获取到则执行新来的线程,获取不到则添加到队列尾部
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 与公平锁的区别 少了一步判断 !hasQueuedPredecessors()
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
LockSupport
当线程调用 LockSupport.park 方法后,线程会阻塞在这,有两种方式唤醒,
一种是 外部唤醒,unpark
另一种是线程中断
Thread t0 = new Thread(new Runnable() {
@Override
public void run() {
Thread thread = Thread.currentThread();
String name = thread.getName();
System.out.println( name + " 开始执行");
for (;;){
System.out.println("准备 park 当前线程 " + name);
LockSupport.park();
System.out.println("当前线程 " + name + " 已被唤醒");
if (thread.isInterrupted()){
System.out.println("线程已被中断");
return;
}
}
}
});
t0.start();
TimeUnit.SECONDS.sleep(2);
System.out.println("准备唤醒 " + t0.getName());
LockSupport.unpark(t0);
TimeUnit.SECONDS.sleep(2);
t0.interrupt();
--- 运行结果
Thread-0 开始执行
准备 park 当前线程 Thread-0
准备唤醒 Thread-0
当前线程 Thread-0 已被唤醒
准备 park 当前线程 Thread-0
当前线程 Thread-0 已被唤醒
线程已被中断
interrupt() 方法中断线程,但并不是立即停止线程,而是等待线程运行结束后中断
