Countdownlatch

以核心函数为突破口countDown和await

CountDownLatch构造器

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    // 设置state为count
    setState(count);
}

CountDownLatch#countDown

public void countDown() {
    sync.releaseShared(1);
}

AbstractQueuedSynchronizer#releaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 如果state为0,走不到这里;如果cas进行state-1后state不为0,也走不到这里。详看tryReleaseShared
        doReleaseShared();
        return true;
    }
    return false;
}

CountDownLatch#tryReleaseShared

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        // 如果线程持有锁
        int nextc = c-1;
        // 可能并发的改state的值,所以这里需要cas
        if (compareAndSetState(c, nextc))
            // 如果nextc不为0,返回false
            return nextc == 0;
    }
}

AbstractQueuedSynchronizer#doReleaseShared

// 如果state不为0,并且cas进行state - 1后,state不为0,则执行doReleaseShared

private void doReleaseShared() {
   for (;;) {
       Node h = head;
       if (h != null && h != tail) {
           int ws = h.waitStatus;

           // signal : waitStatus value to indicate successor's thread needs unparking    
           // 后继节点等待被触发
           if (ws == Node.SIGNAL) {
               // 如果节点状态为signal,cas重置节点状态为0
               if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                   continue;            // loop to recheck cases
               // 如果重置成功,唤醒后继节点       
               unparkSuccessor(h);
           }
           // 如果头节点为状态为初始状态,尝试设置waitStatus为propagate
           // propagate: waitStatus value to indicate the next acquireShared should unconditionally propagate
           else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
               continue;                // loop on failed CAS
       }
       if (h == head)                   // loop if head changed
           break;
   }    }

AbstractQueuedSynchronizer#unparkSuccessor

private void unparkSuccessor(Node node) {
   int ws = node.waitStatus;
   // 唤醒后继节点,需要当前节点的waitStatus置为0或1
   if (ws < 0)
       compareAndSetWaitStatus(node, ws, 0);

   Node s = node.next;
   if (s == null || s.waitStatus > 0) {
       // 如果s为null或者s为取消状态    
       s = null;
       // 从后向前遍历node,直到第一个waitStatus不是cancel的node
       for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0)
               s = t;
   }
   // 然后unpark之。
   if (s != null)
       LockSupport.unpark(s.thread);    }

await

使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 包装为共享节点,然后添加进等待队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 参照下doAcquireInterruptibly,这里是不一样滴。
                // 试图在共享模式下获取对象状态
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 这里需要state为0

                    // 这里对比doAcquireInterruptibly
                    // 设置头节点 并传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 后继为空或者为共享模式
            doReleaseShared();
    }
}