在JAVA中大多数的同步类(比如ReentrantLock
、CountDownLatch
、Semaphore
等)都是基于AbstractQueuedSynchronier
(简称AQS)来实现的。AQS是一个提供了原子式的管理状态同步、线程阻塞以及线程唤醒的简单框架,他内部使用了模板方法 模式,这就使得其他的同步类都可以有自己的实现(ReentrantLock
、CountDownLatch
等都是通过变相继承AQS来实现同步功能的,注意看源码的Sync
的实现)。
AQS可以做什么 借助于AQS,我们可以实现哪些功能呢?
比如我们可以自己写一个排他锁,可以自己实现公平锁和非公平锁,比如可以自己写一个共享读锁等等。
为什么AQS可以完成这些功能呢?它内部是如何实现的呢?
要回答上面的问题,我们需要了解AQS内部的几个重要的属性以及他们的数据存储结构。
在AQS内部,有三个特别重要的属性:
源码如下(为了减少篇幅,删减了注释部分):
1 2 3 4 5 6 7 8 9 10 11 12 /** * The synchronization state. */ private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; }
这个就是同步的标志位,外部程序可以通过改变state
来实现同步状态控制。
AQS内部维护了一个虚拟的FIFO同步队列(队列包含了头节点和尾节点的引用),当有线程尝试获取锁失败的时候,AQS就会把该线程打包成一个Node
,然后放在同步队列中。当头部节点获取锁并且成功释放后,AQS会唤醒第二个节点的线程,然后去获取锁,依此类推…
Node
节点内部维护了两个特别重要的属性:
1 2 volatile Node prev; // 前一个节点 volatile Node next; // 后一个节点
这种结构,其实也就是双向链表。
Condition内部维护了一个等待队列(数据结构也是Node
),当程序调用await()
方法时,该线程会被从的同步队列中移出然后放入Condition的等待队列中,当调用了signal()
方法的时候,等待队列中的线程会被移出然后重新放入到同步队列中。在一个AQS内部,同步队列只有一个,但是可以有多个等待队列。
AQS内部的结构如下:
利用AQS完成一个排他锁 如果要借助于AQS完成一个排他锁,我们首先要了解AQS通过模板方法模式提供了哪些可以让我们使用的方法。通过阅读AQS的源码,我们可以找到一个acquire(int arg)
,通过注释我们知道,这个方法就是通过独占模式获取锁的。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 /** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
通过分析,我们知道acquire(int arg)
方法调用了tryAcquire(arg)
,但是tryAcquire(arg)
方法的默认实现却抛出了一个异常,这里就是我们要重写的方法。
1 2 3 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
通过上面的方法,我们同样可以找到这些需要重写的方法:
1 2 3 4 5 独占式获取 tryAcquire 独占式释放 tryRelease 共享式获取 tryAcquireShared 共享式释放 tryReleaseShared 这个同步器是否处于独占模式 isHeldExclusively
等等,不是说使用的模板方法吗?为什么不去找哪些需要重写的抽象方法呢?
因为AQS内部提供了两类获取锁的方式,使用了独占式就不会使用共享式(所以并不是所有的方法都需要重写的,如果独占锁的逻辑去调用了共享锁的方法,默认的实现也会抛出一个异常来),开发锁的人肯定知道自己要实现什么样的锁,也就知道自己要重写哪些方法,使用锁的人只关心锁的功能,更加不会关心内部实现了。
如果我们要实现一个独占式的锁,需要覆盖AQS的这三个方法:
1 2 3 4 5 6 7 8 9 10 11 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
JAVA中的锁一般都会实现Lock接口,下面我们就来完成一个独占锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 public class MyLock implements Lock { // 自定义的同步器,继承AQS Sync sync = new Sync(); private static class Sync extends AbstractQueuedSynchronizer{ // MyLock是一个独占锁,因此需要覆盖AQS中独占锁的方法 // acquire acquireInterruptibly tryAcquireNanos isHeldExclusively /** * 判断锁是否是占用状态 * 如果state == 1 说明锁被占用,state == 0 说明锁未被占用 * @return */ @Override protected boolean isHeldExclusively() { return getState() == 1; } /** * 尝试加锁 * @param arg * @return 加锁成功:true 反之:false */ @Override protected boolean tryAcquire(int arg) { // CAS加锁 if(compareAndSetState(0,1)){ // 设置加锁成功的线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 尝试释放锁 * @param arg * @return */ @Override protected boolean tryRelease(int arg){ if(getState() == 0){ // 抛出异常 try { throw new Exception("锁已经被释放!"); } catch (Exception e) { e.printStackTrace(); } } // 没有线程独占 setExclusiveOwnerThread(null); // 锁的状态改成0,这里为什么不用CAS去做释放?没有线程抢着去释放,只有拿到锁的线程才会释放锁,所以不用CAS setState(0); return true; } Condition newCondition(){ // 调用AQS的Condition return new ConditionObject(); } } @Override public void lock() { sync.acquire(1); // 独占锁 } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
我们初始化10个线程,每个线程打印一行信息。因为这里使用的是排他锁,也就是说同一时间只有一个线程可以获取到锁,也就是同一时间只有一条打印信息,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class TestMyLock { static Lock lock = new MyLock(); // 测试时可以对比ReentrantLock锁来观察输出结果 // static Lock lock = new ReentrantLock(); public static void main(String[] args) { int i = 10; while(i > 0){ new Thread(new Worker()).start(); i--; } } static class Worker implements Runnable{ @Override public void run() { lock.lock(); try{ System.out.println(new Timestamp(System.currentTimeMillis()) + "线程" + Thread.currentThread().getId() + "已成功获取到锁!"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } }
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 2021-03-01 12:30:29.036线程11已成功获取到锁! 2021-03-01 12:30:31.043线程12已成功获取到锁! 2021-03-01 12:30:33.047线程13已成功获取到锁! 2021-03-01 12:30:35.05线程14已成功获取到锁! 2021-03-01 12:30:37.055线程15已成功获取到锁! 2021-03-01 12:30:39.055线程16已成功获取到锁! 2021-03-01 12:30:41.057线程17已成功获取到锁! 2021-03-01 12:30:43.061线程18已成功获取到锁! 2021-03-01 12:30:45.065线程19已成功获取到锁! 2021-03-01 12:30:47.07线程20已成功获取到锁! Process finished with exit code 0
为了验证代码的准确性,可以把锁换成ReentrantLock,然后再次运行测试代码,观察结果是否一致。