Personal blog
http://www.milovetingting.cn
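A Simple Look at AQS and Implementing Custom Locks

AQS

AQS, short for AbstractQueuedSynchronizer (a queue-based synchronizer), is the base framework for building locks and other synchronization components.

AQS represents the synchronization state with a single int field named state.

AQS follows the template method design pattern: a subclass extends AQS and manages the synchronization state by overriding its protected hook methods, such as tryAcquire and tryRelease. These hooks are not abstract; their default implementations throw UnsupportedOperationException.

AQS is used widely in the JDK, including in ReentrantLock, ReentrantReadWriteLock, Semaphore, and CountDownLatch.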
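To make the template method idea concrete, here is a minimal sketch of a one-shot latch built on AQS in shared mode. The class name OneShotLatch and its methods are hypothetical, chosen only for illustration; the design loosely follows the latch example in the AQS documentation. The subclass only decides what state means, and AQS handles queuing and blocking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class, not part of the JDK.
class OneShotLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        // state == 0 means closed, any other value means open.
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result lets the caller pass; a negative one makes AQS queue it.
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the latch
            return true; // tell AQS to wake up waiting threads
        }

        boolean isOpen() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    // Blocks until the latch is open; responds to interruption.
    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // Blocks until the latch is open; ignores interruption while waiting.
    void awaitUninterruptibly() {
        sync.acquireShared(1);
    }

    void open() {
        sync.releaseShared(1);
    }

    boolean isOpen() {
        return sync.isOpen();
    }
}

class OneShotLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        Thread waiter = new Thread(() -> {
            latch.awaitUninterruptibly();
            System.out.println("latch opened, waiter continues");
        });
        waiter.start();
        latch.open();
        waiter.join();
    }
}
```

Note that the outer class never touches state directly: getState and setState are protected, so they are called only inside the AQS subclass.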
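AQS in ReentrantLock

Taking ReentrantLock as an example, let's look at how AQS is used in practice. The listings below appear to follow the JDK 8 source; later JDK versions structure these classes differently.

ReentrantLock has a member variable sync of type Sync.

```java
private final Sync sync;
```

The definition of Sync:

```java
abstract static class Sync extends AbstractQueuedSynchronizer {
    // body omitted
}
```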
Sync is an abstract class that extends AQS and overrides only some of the hook methods. It has two subclasses:

NonfairSync: implements the non-fair lock

```java
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        // Barge first: try to grab the lock immediately, ignoring any queued threads.
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
```

FairSync: implements the fair lock

```java
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Fair: only attempt the CAS when no thread has been waiting longer.
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            // Reentrant acquire by the current owner.
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
```
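The Sync subclasses rely mainly on three AQS methods: getState(), setState(int newState), and compareAndSetState(int expect, int update).

```java
protected final int getState() {
    return state;
}
```

```java
protected final void setState(int newState) {
    state = newState;
}
```

```java
protected final boolean compareAndSetState(int expect, int update) {
    // unsafe and stateOffset are private static fields of AQS.
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
```

compareAndSetState uses CAS (compare-and-swap) to update state atomically: the write succeeds only if state still holds the expected value.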
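The CAS retry pattern behind compareAndSetState can be sketched as follows. CasCounter is a hypothetical class that extends AQS only to reach its protected state methods; it is an illustration of the read, compute, CAS loop, not how AQS is normally used.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical illustration: uses the AQS state field as a lock-free counter.
class CasCounter extends AbstractQueuedSynchronizer {

    int increment() {
        for (;;) {
            int current = getState();
            int next = current + 1;
            // Succeeds only if no other thread changed state since we read it;
            // otherwise loop and retry with the fresh value.
            if (compareAndSetState(current, next)) {
                return next;
            }
        }
    }

    int value() {
        return getState();
    }
}

class CasCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(counter.value()); // prints 40000
    }
}
```

No increment is lost even without a lock, because a failed CAS simply retries against the latest value.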
By default, ReentrantLock is a non-fair lock:

```java
public ReentrantLock() {
    sync = new NonfairSync();
}
```

We can also choose explicitly between a fair and a non-fair lock:

```java
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
```
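As a quick check of the two constructors, the following sketch uses ReentrantLock's public isFair() method to show which policy each one selects. The class name FairnessDemo is made up for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    public static void main(String[] args) {
        ReentrantLock defaultLock = new ReentrantLock();  // non-fair by default
        ReentrantLock fairLock = new ReentrantLock(true); // explicitly fair

        System.out.println(defaultLock.isFair()); // prints false
        System.out.println(fairLock.isFair());    // prints true
    }
}
```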
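CLH Queue Lock

CLH stands for Craig, Landin, and Hagersten, the authors of the CLH queue lock. It matters here because the AQS wait queue is a variant of a CLH lock queue.

A CLH queue lock is a scalable, high-performance, fair spin lock based on a linked list. A thread requesting the lock spins only on a local variable: it keeps polling the state of its predecessor and stops spinning as soon as it sees that the predecessor has released the lock.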
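The description above can be sketched as a textbook-style CLH spin lock. This is an illustrative sketch, not the AQS implementation: AQS uses a modified CLH queue and parks threads instead of spinning. The names ClhSpinLock, Node, myNode, and myPred are assumptions made for this example.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical textbook-style CLH spin lock; not how AQS itself is written.
class ClhSpinLock {

    private static final class Node {
        // true while the owning thread holds or is waiting for the lock
        volatile boolean locked;
    }

    // Tail of the implicit queue; starts with a dummy node that is already released.
    private final AtomicReference<Node> tail = new AtomicReference<>(new Node());
    private final ThreadLocal<Node> myNode = ThreadLocal.withInitial(Node::new);
    private final ThreadLocal<Node> myPred = new ThreadLocal<>();

    void lock() {
        Node node = myNode.get();
        node.locked = true;
        // Atomically enqueue ourselves and learn who is in front of us.
        Node pred = tail.getAndSet(node);
        myPred.set(pred);
        // Spin only on the predecessor's flag until it releases the lock.
        while (pred.locked) {
            Thread.yield();
        }
    }

    void unlock() {
        Node node = myNode.get();
        node.locked = false;      // lets our successor stop spinning
        myNode.set(myPred.get()); // recycle the predecessor's node for the next lock()
    }
}

class ClhSpinLockDemo {
    private static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        ClhSpinLock lock = new ClhSpinLock();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1_000; j++) {
                    lock.lock();
                    try {
                        counter++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(counter); // prints 4000
    }
}
```

Threads acquire the lock in the order in which they swapped themselves into tail, which is where the fairness comes from.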
Custom Locks

A Custom Non-Reentrant Lock

Next, let's implement a custom lock on top of AQS.

Define a class that implements the Lock interface:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class CustomLock implements Lock {

    @Override
    public void lock() {

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {

    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
```
The listing above shows the default method stubs generated once the class implements Lock.

Define a static nested class that extends AQS and overrides tryAcquire, tryRelease, and isHeldExclusively:

```java
private static class Sync extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int arg) {
        // state 0 = unlocked, 1 = locked; the CAS lets exactly one thread win.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {
        if (getState() == 0) {
            throw new IllegalMonitorStateException();
        }
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        // Held exclusively only if the calling thread is the owner.
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    Condition newCondition() {
        return new ConditionObject();
    }
}
```
Declare a field of type Sync and implement the methods defined by the Lock interface by delegating to it:

```java
// Inside CustomLock
private final Sync sync = new Sync();

@Override
public void lock() {
    System.out.println(Thread.currentThread().getName() + " ready get lock");
    sync.acquire(1);
    System.out.println(Thread.currentThread().getName() + " already got lock");
}

@Override
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
    return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
    System.out.println(Thread.currentThread().getName() + " ready release lock");
    sync.release(1);
    System.out.println(Thread.currentThread().getName() + " already released lock");
}

@Override
public Condition newCondition() {
    return sync.newCondition();
}
```
Usage

```java
Lock lock = new CustomLock();
// Acquire outside the try block so that unlock() runs only if lock() succeeded.
lock.lock();
try {
    // critical section
} finally {
    lock.unlock();
}
```

With these four steps, a simple custom lock is in place.
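To see the behavior of such a lock, here is a self-contained sketch using a compact, hypothetical MiniMutex that mirrors the non-reentrant Sync logic without the logging. It checks two things: a second tryLock() from the same thread fails, since the lock is not reentrant, and a shared counter stays consistent when several threads increment it under the lock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical compact mutex for illustration; mirrors the non-reentrant Sync logic.
class MiniMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);
    }

    boolean tryLock() {
        return sync.tryAcquire(1);
    }

    void unlock() {
        sync.release(1);
    }
}

class MiniMutexDemo {
    private static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        MiniMutex mutex = new MiniMutex();

        System.out.println(mutex.tryLock()); // prints true
        System.out.println(mutex.tryLock()); // prints false: not reentrant
        mutex.unlock();

        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    mutex.lock();
                    try {
                        counter++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(counter); // prints 40000
    }
}
```

Because the lock is not reentrant, a thread that calls lock() while already holding it would block forever; the demo uses tryLock() to show this without deadlocking.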
Complete code

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class CustomLock implements Lock {

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        System.out.println(Thread.currentThread().getName() + " ready get lock");
        sync.acquire(1);
        System.out.println(Thread.currentThread().getName() + " already got lock");
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName() + " ready release lock");
        sync.release(1);
        System.out.println(Thread.currentThread().getName() + " already released lock");
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 = unlocked, 1 = locked; the CAS lets exactly one thread win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            // Held exclusively only if the calling thread is the owner.
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }
}
```
A Custom Reentrant Lock

The steps are almost the same as above; only the method implementations in the AQS subclass need to change. Here state counts how many times the owner has acquired the lock.

```java
private static class Sync extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int arg) {
        if (compareAndSetState(0, arg)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        } else if (getExclusiveOwnerThread() == Thread.currentThread()) {
            // Reentrant acquire: only the owner reaches here, so a plain write is safe.
            setState(getState() + arg);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {
        if (getExclusiveOwnerThread() != Thread.currentThread()) {
            throw new IllegalMonitorStateException();
        }
        int next = getState() - arg;
        if (next < 0) {
            throw new IllegalMonitorStateException();
        }
        boolean free = (next == 0);
        if (free) {
            setExclusiveOwnerThread(null);
        }
        setState(next);
        // Report true only when fully released, so AQS wakes a successor at the right time.
        return free;
    }

    @Override
    protected boolean isHeldExclusively() {
        // Held exclusively only if the calling thread is the owner.
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    Condition newCondition() {
        return new ConditionObject();
    }
}
```
Complete code

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class CustomReentrantLock implements Lock {

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        System.out.println(Thread.currentThread().getName() + " ready get lock");
        sync.acquire(1);
        System.out.println(Thread.currentThread().getName() + " already got lock");
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName() + " ready release lock");
        sync.release(1);
        System.out.println(Thread.currentThread().getName() + " already released lock");
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, arg)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            } else if (getExclusiveOwnerThread() == Thread.currentThread()) {
                // Reentrant acquire: only the owner reaches here, so a plain write is safe.
                setState(getState() + arg);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            int next = getState() - arg;
            if (next < 0) {
                throw new IllegalMonitorStateException();
            }
            boolean free = (next == 0);
            if (free) {
                setExclusiveOwnerThread(null);
            }
            setState(next);
            // Report true only when fully released, so AQS wakes a successor at the right time.
            return free;
        }

        @Override
        protected boolean isHeldExclusively() {
            // Held exclusively only if the calling thread is the owner.
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }
}
```
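As a usage check for reentrancy, the sketch below uses a compact, hypothetical MiniReentrantLock whose Sync counts acquisitions in state, in the style of the JDK's ReentrantLock. The holdCount() helper is an assumption added only so the demo can print the counter. It shows the same thread acquiring the lock twice and needing two unlock() calls to release it fully.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical compact reentrant lock for illustration.
class MiniReentrantLock {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, arg)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                setState(c + arg); // reentrant acquire by the owner
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            int c = getState() - arg;
            boolean free = (c == 0);
            if (free) {
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free; // true only once the lock is fully released
        }

        int holdCount() {
            return getState();
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);
    }

    void unlock() {
        sync.release(1);
    }

    // Number of outstanding acquisitions; 0 means the lock is free.
    int holdCount() {
        return sync.holdCount();
    }
}

class MiniReentrantLockDemo {
    public static void main(String[] args) {
        MiniReentrantLock lock = new MiniReentrantLock();
        lock.lock();
        lock.lock(); // does not block: the same thread already owns the lock
        System.out.println(lock.holdCount()); // prints 2
        lock.unlock();
        System.out.println(lock.holdCount()); // prints 1
        lock.unlock();
        System.out.println(lock.holdCount()); // prints 0
    }
}
```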