0%

Java 并发编程 —— AQS

欢迎指正。

队列同步器(AbstractQueuedSynchronizer),是用来构建锁或者其他同步组件的基础框架,它使用了一个 int 成员变量表示同步状态, 通用过内置的 FIFO 队列来,可以用于构建锁或者其他同步装置,完成资源获取线程的排队工作。

同步器的主要使用方法是继承、子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态镜像更改,这时就需要使用同步器提供的方法(getState()、setState(in newSate) 和 compareAndSetState(int expect, int update))来进行操作。

AQS 是一个抽象类,内置自旋锁实现的同步队列,封装入队和出队的操作,提供独占、共享、中断等特性。

基于AQS构建的同步器:

  • ReentrantLock
  • Semaphore
  • CountDownLatch
  • ReentrantReadWriteLock
  • SynchronusQueue
  • FutureTask

AQS 核心知识

队列同步器的接口与实例

重写同步器指定的方法是,需要使用同步器提供的3个方法来访问或者修稿同步状态:

  • getState():获取当前同步状态
  • setState(in newSate):设置当前同步状态
  • ompareAndSetState(int expect, int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。

CLH 同步队列

CLH 同步队列(Craig、Landin、Hagersten)是一个FIFO双向对象,AQS 依赖它来完成同步状态的管理,当线程如果获取同步状态失败时,AQS 则会将当前线程等待状态等信息构造成一个节点(Node)并将其加入到 CLH 同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
static final class Node {
/** 共享*/
static final Node SHARED = new Node();
/** 独占*/
static final Node EXCLUSIVE = null;
/**因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态 */
static final int CANCELLED = 1;
/** 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */
static final int SIGNAL = -1;
/** 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中*/
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
* 表示下一次共享式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;
/** 等待状态 */
volatile int waitStatus;
/** 前驱节点*/
volatile Node prev;
/**后续节点*/
volatile Node next;
/** 获取同步状态的线程*/
volatile Thread thread;

Node nextWaiter;

/**Returns true if node is waiting in shared mode.*/
final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

AQS 实现细节

线程首先尝试获取锁,如果失败就将当前线程及等待状态等信息包装成一个node节点加入到FIFO队列中。 接着会不断的循环尝试获取锁,条件是当前节点为head的直接后继才会尝试。如果失败就会阻塞自己直到自己被唤醒。而当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。

Reference

客官,赏一杯coffee嘛~~~~