在JAVA中大多数的同步类(比如ReentrantLockCountDownLatchSemaphore等)都是基于AbstractQueuedSynchronier(简称AQS)来实现的。AQS是一个提供了原子式的管理状态同步、线程阻塞以及线程唤醒的简单框架,他内部使用了模板方法模式,这就使得其他的同步类都可以有自己的实现(ReentrantLockCountDownLatch等都是通过变相继承AQS来实现同步功能的,注意看源码的Sync的实现)。

AQS可以做什么

借助于AQS,我们可以实现哪些功能呢?

比如我们可以自己写一个排他锁,可以自己实现公平锁和非公平锁,比如可以自己写一个共享读锁等等。

为什么AQS可以完成这些功能呢?它内部是如何实现的呢?

要回答上面的问题,我们需要了解AQS内部的几个重要的属性以及他们的数据存储结构。

在AQS内部,有三个特别重要的属性:

  • state

源码如下(为了减少篇幅,删减了注释部分):

1
2
3
4
5
6
7
8
9
10
11
12
/**
* The synchronization state.
*/
private volatile int state;

protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}

这个就是同步的标志位,外部程序可以通过改变state来实现同步状态控制。

  • Node

AQS内部维护了一个虚拟的FIFO同步队列(队列包含了头节点和尾节点的引用),当有线程尝试获取锁失败的时候,AQS就会把该线程打包成一个Node,然后放在同步队列中。当头部节点获取锁并且成功释放后,AQS会唤醒第二个节点的线程,然后去获取锁,依此类推…

Node节点内部维护了两个特别重要的属性:

1
2
volatile Node prev;  // 前一个节点
volatile Node next; // 后一个节点

这种结构,其实也就是双向链表。

  • Condition

Condition内部维护了一个等待队列(数据结构也是Node),当程序调用await()方法时,该线程会被从的同步队列中移出然后放入Condition的等待队列中,当调用了signal()方法的时候,等待队列中的线程会被移出然后重新放入到同步队列中。在一个AQS内部,同步队列只有一个,但是可以有多个等待队列。

AQS内部的结构如下:

利用AQS完成一个排他锁

如果要借助于AQS完成一个排他锁,我们首先要了解AQS通过模板方法模式提供了哪些可以让我们使用的方法。通过阅读AQS的源码,我们可以找到一个acquire(int arg),通过注释我们知道,这个方法就是通过独占模式获取锁的。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

通过分析,我们知道acquire(int arg)方法调用了tryAcquire(arg) ,但是tryAcquire(arg) 方法的默认实现却抛出了一个异常,这里就是我们要重写的方法。

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

通过上面的方法,我们同样可以找到这些需要重写的方法:

1
2
3
4
5
独占式获取  tryAcquire
独占式释放 tryRelease
共享式获取 tryAcquireShared
共享式释放 tryReleaseShared
这个同步器是否处于独占模式 isHeldExclusively

等等,不是说使用的模板方法吗?为什么不去找哪些需要重写的抽象方法呢?

因为AQS内部提供了两类获取锁的方式,使用了独占式就不会使用共享式(所以并不是所有的方法都需要重写的,如果独占锁的逻辑去调用了共享锁的方法,默认的实现也会抛出一个异常来),开发锁的人肯定知道自己要实现什么样的锁,也就知道自己要重写哪些方法,使用锁的人只关心锁的功能,更加不会关心内部实现了。

如果我们要实现一个独占式的锁,需要覆盖AQS的这三个方法:

1
2
3
4
5
6
7
8
9
10
11
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

JAVA中的锁一般都会实现Lock接口,下面我们就来完成一个独占锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class MyLock implements Lock {

// 自定义的同步器,继承AQS
Sync sync = new Sync();

private static class Sync extends AbstractQueuedSynchronizer{

// MyLock是一个独占锁,因此需要覆盖AQS中独占锁的方法
// acquire acquireInterruptibly tryAcquireNanos isHeldExclusively

/**
* 判断锁是否是占用状态
* 如果state == 1 说明锁被占用,state == 0 说明锁未被占用
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

/**
* 尝试加锁
* @param arg
* @return 加锁成功:true 反之:false
*/
@Override
protected boolean tryAcquire(int arg) {
// CAS加锁
if(compareAndSetState(0,1)){
// 设置加锁成功的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

/**
* 尝试释放锁
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg){
if(getState() == 0){
// 抛出异常
try {
throw new Exception("锁已经被释放!");
} catch (Exception e) {
e.printStackTrace();
}
}

// 没有线程独占
setExclusiveOwnerThread(null);
// 锁的状态改成0,这里为什么不用CAS去做释放?没有线程抢着去释放,只有拿到锁的线程才会释放锁,所以不用CAS
setState(0);
return true;
}

Condition newCondition(){
// 调用AQS的Condition
return new ConditionObject();
}


}

@Override
public void lock() {
sync.acquire(1); // 独占锁
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}

@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}
}

  • 测试代码

我们初始化10个线程,每个线程打印一行信息。因为这里使用的是排他锁,也就是说同一时间只有一个线程可以获取到锁,也就是同一时间只有一条打印信息,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TestMyLock {

static Lock lock = new MyLock();
// 测试时可以对比ReentrantLock锁来观察输出结果
// static Lock lock = new ReentrantLock();
public static void main(String[] args) {

int i = 10;
while(i > 0){
new Thread(new Worker()).start();
i--;
}
}

static class Worker implements Runnable{

@Override
public void run() {

lock.lock();
try{
System.out.println(new Timestamp(System.currentTimeMillis()) + "线程" + Thread.currentThread().getId() + "已成功获取到锁!");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
2021-03-01 12:30:29.036线程11已成功获取到锁!
2021-03-01 12:30:31.043线程12已成功获取到锁!
2021-03-01 12:30:33.047线程13已成功获取到锁!
2021-03-01 12:30:35.05线程14已成功获取到锁!
2021-03-01 12:30:37.055线程15已成功获取到锁!
2021-03-01 12:30:39.055线程16已成功获取到锁!
2021-03-01 12:30:41.057线程17已成功获取到锁!
2021-03-01 12:30:43.061线程18已成功获取到锁!
2021-03-01 12:30:45.065线程19已成功获取到锁!
2021-03-01 12:30:47.07线程20已成功获取到锁!

Process finished with exit code 0

为了验证代码的准确性,可以把锁换成ReentrantLock,然后再次运行测试代码,观察结果是否一致。