本章重点讲解内容如下:
1、什么是CLH同步队列
2、为什么需要CLH同步队列
3、CLH同步队列原理(即队列如何入队、出队)
一 什么是CLH队列
AbstractQueuedSynchronizer类文件开头,作者Doug Lea一大篇幅来介绍CLH队列,大意如下:
CLH队列是一个FIFO的双向链表:由head、tail、node中间节点组成,每个Node节点包含:thread、waitStatus、next、pre属性
当线程获取同步状态失败后,会将当前线程构造成一个Node节点插入链表(如果第一次插入会初始化head节点为虚拟节点),插入链表都是尾部插入并且setTail为当前节点,同时会阻塞当前线程(调用LockSupport.park方法)。
当线程释放同步状态后,会唤醒当前节点的next节点,next节点会抢占同步资源,抢占失败后重新阻塞,成功后next节点会重新setHead为当前线程的节点,将之前的head废弃。
二 为什么需要CLH队列
是为了减少多线程抢占资源造成不必要的cpu上下文切换开销。通过看AQS源码我们知道抢占同步器状态是调用UnSafe.compareAndSwapInt方法,其实底层就是调用的jvm的cas函数。当多个线程同时在cas的时候,最多只能有一个抢占成功,其余的都在自旋,这样就造成了不必要的cpu开销。
若引入CLH队列队列,至于pre执行完毕,才唤醒next节点,这样最多只有next节点和新进入的线程抢占cpu资源,其余的线程都是阻塞状态,极大的减少了不必要的cpu开销。
三 CLH队列原理(如何入队、出队)
1)入队
入队代码如下:
1 //获取锁 2 public final void acquire(int arg) { 3 //tryAcquire尝试获取锁,Semaphore、coutDownLatch等各个工具类实现不一致 4 if (!tryAcquire(arg) && 5 //acquireQueued:tryAcquire成功就setHead为当前节点,失败则阻塞当前线程 6 //addWaiter加入同步等待队列 7 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 8 selfInterrupt(); 9 } 10 //加入等待队列 11 private Node addWaiter(Node mode) { 12 Node node = new Node(Thread.currentThread(), mode); 13 // Try the fast path of enq; backup to full enq on failure 14 // 非首次插入,可直接setTail 15 // 设置老的tail为当天tail的pre节点 16 Node pred = tail; 17 if (pred != null) { 18 node.prev = pred; 19 if (compareAndSetTail(pred, node)) { 20 pred.next = node; 21 return node; 22 } 23 } 24 //首次插入,需要创建虚拟的head节点 25 enq(node); 26 return node; 27 } 28 private Node enq(final Node node) { 29 for (;;) { 30 Node t = tail; 31 // 如果 tail 是 null,就创建一个虚拟节点,同时指向 head 和 tail,称为 初始化。 32 if (t == null) { // Must initialize 33 if (compareAndSetHead(new Node())) 34 tail = head; 35 } else {// 如果不是 null 36 // 和 上个方法逻辑一样,将新节点追加到 tail 节点后面,并更新队列的 tail 为新节点。 37 // 只不过这里是死循环的,失败了还可以再来 。 38 node.prev = t; 39 if (compareAndSetTail(t, node)) { 40 t.next = node; 41 return t; 42 } 43 } 44 } 45 }
View Code
阻塞当前线程代码如下:
1 // 这里返回的节点是新创建的节点,arg 是请求的数量 2 final boolean acquireQueued(final Node node, int arg) { 3 boolean failed = true; 4 try { 5 boolean interrupted = false; 6 for (;;) { 7 // 找上一个节点 8 final Node p = node.predecessor(); 9 // 如果上一个节点是 head ,就尝试获取锁 10 // 如果 获取成功,就将当前节点设置为 head,注意 head 节点是永远不会唤醒的。 11 if (p == head && tryAcquire(arg)) { 12 setHead(node); 13 p.next = null; // help GC 14 failed = false; 15 return interrupted; 16 } 17 // 在获取锁失败后,就需要阻塞了。 18 // shouldParkAfterFailedAcquire ---> 检查上一个节点的状态,如果是 SIGNAL 就阻塞,否则就改成 SIGNAL。 19 if (shouldParkAfterFailedAcquire(p, node) && 20 //parkAndCheckInterrupt就是调用park方法阻塞当前线程,等待被唤起后,重新进入当前自旋操作,可重新获取锁 21 parkAndCheckInterrupt()) 22 interrupted = true; 23 } 24 } finally { 25 if (failed) 26 cancelAcquire(node); 27 } 28 }
View Code
2)出队
1 //释放当前线程 2 public final boolean release(int arg) { 3 //实际操作就是cas把AQS的state状态-1 4 if (tryRelease(arg)) { 5 Node h = head; 6 if (h != null && h.waitStatus != 0) 7 //核心方法,见后面详解 8 unparkSuccessor(h); 9 return true; 10 } 11 return false; 12 } 13 14 //释放锁核心方法 15 private void unparkSuccessor(Node node) { 16 int ws = node.waitStatus; 17 if (ws < 0) 18 // 将 head 节点的 ws 改成 0,清除信号。表示,他已经释放过了。不能重复释放。 19 compareAndSetWaitStatus(node, ws, 0); 20 21 Node s = node.next; 22 // 如果 next 是 null,或者 next 被取消了。就从 tail 开始向上找节点。 23 if (s == null || s.waitStatus > 0) { 24 s = null; 25 // 从尾部开始,向前寻找未被取消的节点,直到这个节点是 null,或者是 head。 26 // 也就是说,如果 head 的 next 是 null,那么就从尾部开始寻找,直到不是 null 为止,找到这个 head 就不管了。 27 // 如果是 head 的 next 不是 null,但是被取消了,那这个节点也会被略过。 28 for (Node t = tail; t != null && t != node; t = t.prev) 29 if (t.waitStatus <= 0) 30 s = t; 31 } 32 // 唤醒 head.next 这个节点。 33 // 通常这个节点是 head 的 next。 34 // 但如果 head.next 被取消了,就会从尾部开始找。 35 if (s != null) 36 LockSupport.unpark(s.thread); 37 }
View Code
3)如何联动起来(即如何按FIFO顺序唤醒队列)
调用Lock.release()方法会触发unparkSuccessor()–> LockSupport.unpark(node.next)
唤醒next节点,next节点会继续在acquireQueued()方法里自旋,若tryAcquire()成功,执行setHead(node)方法,把CLH队列head设置为当前节点,然后等待当前节点执行逻辑完毕再次调用release方法,重复执行上述逻辑。
若获取锁失败,继续进入阻塞状态,等待下一次被唤醒。