Copyright © 2015 Powered by MWeb, Theme used GitHub CSS.
Java 并发编程核心在于 juc 包,而其中大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是 AQS。
AQS 定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(volatile int state)的同步器。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.
AQS 特性:

一般都是通过定义内部类 Sync 继承 AQS
将同步器所有调用都映射到 Sync 对应的方法
AQS 内部维护属性 volatile int state 表示资源的可用状态
state 三种访问方式
getState、setState、compareAndSetState
AQS 定义两种资源共享方式
AQS 定义两种队列
AQS,重点
自旋、加锁(CAS)、LockSupport、队列
下面通过 ReentrantLock 看下 AQS 的实现。
ReentrantLock 内部 定义一个内部类 Sync,Sync 继承自 AQS,Sync 还有两个子类,FairSync、NonfairSync 区别公平锁与非公平锁。
new ReentrantLock(true) 为公平锁
构造函数:
// 默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁,按照线程过来的顺序依次去获取锁,FIFO
非公平锁,可以插队
假设现在 ReentrantLock 是 FairSync 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// new ReentrantLock().lock() 会调用到这里
final void lock() {
acquire(1);
}
// acquire 在 AQS 的代码
// public final void acquire(int arg) {
// if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
//}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 先获取 state 的值
int c = getState();
// 这一步是 第一次加锁
// 如果 state 的值为 0,并且队列中没有等待的线程,并且 CAS 设置 state 值为1成功,执行 setExclusiveOwnerThread ,设置当前线程为 current,表示当前线程获得了锁
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 这一步是为了重入锁
// 如果 state 不是0,并且 AQS 中的当前执行线程就是当前线程,
// state + 1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// false 就是没有获得到锁,没有获取锁,会在执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这个逻辑
return false;
}
}
当调用 ReentrantLock 的 lock 方法时会调用到 FairSync 类的 lock 方法。
acquire(1) 这个方法主要是加锁,然后设置 state 为 1,会先调用本类实现的 tryAcquire 方法 尝试去加锁,这个方法里面分为了第一次为当前线程加锁,和当前线程的重入锁两种情况,
如果是 state 为 0 ,hasQueuedPredecessors 方法,判断队列是否为空,或者当前线程是否是队列里第一个线程,
如果队列为空,然后 CAS 比较并交换 state 的值为 1,这两步都成功了,则当前线程获取了锁,设置 AQS 内部的当前线程变量为 current。
如果 state 不是 0,判断 AQS 中的内部线程变量与 current 是否相等,是不是同一个线程来获取锁,是同一个线程,state + 1。可重入锁
如果当前线程没有获取锁,会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 操作,将线程先添加队列,然后阻塞当前线程
Node.EXCLUSIVE 表示独占锁,addWaiter 这一步就是往队列里添加当前线程,添加到队列的尾部,Node 里有对当前线程的引用
private Node addWaiter(Node mode) {
// 新建 Node,绑定当前线程 与 独占模式
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// tail 为null,表示队列为空 ??tail 初始值是不是 null
// 不为 null,表示队列不为空,将 node 添加到队尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 自旋,保证当前node 添加成功
enq(node);
return node;
}
private Node enq(final Node node) {
// 自旋 与 CAS 保证入队的线程 是一个一个入队的,原子性、同步
// 并且双向队列指针指向正确,此处也有并发情况,如果不是同步进行的有可能队列乱掉
for (;;) {
Node t = tail;
// 队列为空,表明是第一个线程进来,创建一个头结点,CAS 确保并发情况下只有一个线程能创建成功
// 设置头结点,尾结点等于头结点,相当于 初始化 队列,
// 但是没有对当前 node 操作,也就是说当前这个 node 还没有入队
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//队列不为空,添加到队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter 方法中,第一个线程获取lock 的时候,tail应该是 null ,然后 enq 执行的时候,如果 tail 是 null,只是进行了初始化,并没有将 node 设置到队尾,在下一次循环的时候添加到队尾,这样就 head 就是头结点,然后第一个线程对应的 node 其实是第二个节点???
arg 就是 state,acquireQueued 这一步是为了阻塞队列中的线程,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 当前 node 的前一个 node
final Node p = node.predecessor();
// 如果 p 是头结点,说明队列中只有一个node ,
// 则再去重试获取锁,如果获取成功了,当前线程就不用阻塞,直接执行
// 下一步 selfInterrupt 就不用执行,一定程度上提高了并发
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断 前置节点 p 的 waitStatus
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前置节点的 ws,等于这个 Node.SIGNAL 的时候,才可以阻塞当前调用线程
// 当前节点能否被 park 取决于其前置节点的 waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 大于0 的就只有 CANCELLED,删除大于 0 的node
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前置节点的状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 阻塞当前调用线程
LockSupport.park(this);
// 返回线程中断状态,清除中断状态
return Thread.interrupted();
}
// 中断当前调用线程,上面的 parkAndCheckInterrupt 方法清理掉了线程的中断信号状态
static void selfInterrupt() {
// 中断线程
Thread.currentThread().interrupt();
}
Node 的 waitStatus
CANCELLED = 1,表示已经失效的,需要删除
SIGNAL = -1,表示可以被阻塞
CONDITION = -2
PROPAGATE = -3
shouldParkAfterFailedAcquire
当前节点对应的线程是否要 park,取决于其前置节点的 waitStatus 的值,当是 -1 的时候返回 true,表示当前节点对应的线程可以被阻塞,
如果是初始化状态,waitStatus = 0,则设置其前置节点的 waitStatus 为 -1。
unlock 会调用 release 方法。唤醒队列头部线程
线程中断唤醒
public final boolean release(int arg) {
// 尝试释放锁,修改 state 值为 原值减去 arg 的值
if (tryRelease(arg)) {
Node h = head;
// 头结点不为 null,并且头结点的 waitStatus 为 -1 时,才去 unpark 当前线程,然后把头结点的 waitStatus 改为 0
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// node 为头结点,头结点的 waitStatus = -1 时,执行该方法
// 将 头结点的 waitStatus 通过 CAS 方式 修改为 0
// unpark 头结点后面第一个 waitStatus <= 0 的节点对应的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
上面加锁的过程,不惜循环两次,把头结点的 waitStatus 由 初始值 0 改为了 -1,
解锁的过程又把头结点的 waitStatus 由 -1 改为 0
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 这里是插队,新来的线程是否能够立即获取到锁,
// 如果获取到则执行新来的线程,获取不到则添加到队列尾部
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 与公平锁的区别 少了一步判断 !hasQueuedPredecessors()
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
当线程调用 LockSupport.park 方法后,线程会阻塞在这,有两种方式唤醒,
一种是 外部唤醒,unpark
另一种是线程中断
Thread t0 = new Thread(new Runnable() {
@Override
public void run() {
Thread thread = Thread.currentThread();
String name = thread.getName();
System.out.println( name + " 开始执行");
for (;;){
System.out.println("准备 park 当前线程 " + name);
LockSupport.park();
System.out.println("当前线程 " + name + " 已被唤醒");
if (thread.isInterrupted()){
System.out.println("线程已被中断");
return;
}
}
}
});
t0.start();
TimeUnit.SECONDS.sleep(2);
System.out.println("准备唤醒 " + t0.getName());
LockSupport.unpark(t0);
TimeUnit.SECONDS.sleep(2);
t0.interrupt();
--- 运行结果
Thread-0 开始执行
准备 park 当前线程 Thread-0
准备唤醒 Thread-0
当前线程 Thread-0 已被唤醒
准备 park 当前线程 Thread-0
当前线程 Thread-0 已被唤醒
线程已被中断
interrupt() 方法中断线程,但并不是立即停止线程,而是等待线程运行结束后中断

Copyright © 2015 Powered by MWeb, Theme used GitHub CSS.