Java中的CountDownLatch原理深入解析

 更新时间:2024年01月22日 10:21:28   作者:我不是欧拉_  
这篇文章主要介绍了Java中的CountDownLatch原理深入解析,CountDownLatch是多线程控制的一种同步工具类,它被称为门阀、 计数器或者闭锁,这个工具经常用来用来协调多个线程之间的同步,或者说起到线程之间的通信,需要的朋友可以参考下

1. CountDownLatch是什么?

CountDownLatch是多线程控制的一种同步工具类,它被称为门阀、 计数器或者闭锁。这个工具经常用来用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

当然利用ReentrantLock + Condition也可以实现线程之间通信,达到同样的效果

2. 类图

 可以看出CountDownLatch只有一个内部类Sync,Sync继承AbstractQueuedSynchronizer

3. 实现原理

3.1 示例用法

// N个线程等待主线程
class Driver { // ...
    void main() throws InterruptedException {
        // 开始信号
        CountDownLatch startSignal = new CountDownLatch(1);
        // 完成信号
        CountDownLatch doneSignal = new CountDownLatch(N);
        for (int i = 0; i < N; ++i) // create and start threads
        // 创建N个工作线程并开始运行
        new Thread(new Worker(startSignal, doneSignal)).start();
        // 做准备工作
        doSomethingElse();            // don't let run yet
        // 准备完毕,唤醒工作线程
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        // 等待工作线程结束
        doneSignal.await();           // wait for all to finish
    }
  }
class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    // 构造方法创建工作线程
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }
    public void run() {
        try {
            // 工作线程进入等待状态
            startSignal.await();
            // 工作线程工作
            doWork();
            // 完成工作后,countDown
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }
void doWork() { ... }
}
// 主线程等到N个线程
class Driver2 { // ...
    void main() throws InterruptedException {
        // 完成信号
        CountDownLatch doneSignal = new CountDownLatch(N);
        // 创建线程执行器
        Executor e = ...
        for (int i = 0; i < N; ++i) // create and start threads
            // 创建并执行N个准备工作线程
            e.execute(new WorkerRunnable(doneSignal, i));
            // 主线程等到准备工作线程执行完毕
            doneSignal.await();           // wait for all to finish
        }
}
class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;
    // 构造方法
    WorkerRunnable(CountDownLatch doneSignal, int i) {
        this.doneSignal = doneSignal;
        this.i = i;
    }
    // run
    public void run() {
        try {
            // 完成准备工作
            doWork(i);
            // countDown
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }
void doWork() { ... }
}

3.2 Sync

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        // State即同步状态,在不同的实现中叫法不一样,只是为了方便理解
        // 构造方法初始化计数器计数值(即同步状态值)
        Sync(int count) {
            setState(count);
        }
        // 获取计数值
        int getCount() {
            return getState();
        }
        // 共享模式获取
        protected int tryAcquireShared(int acquires) {
            // 体现出只有计数值为0时,才能算获取成功
            return (getState() == 0) ? 1 : -1;
        }
        // 共享模式释放
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                // 如果计数值已经为0,直接返回false,结束自旋
                if (c == 0)
                    return false;
                // 否则计数 - 1
                int nextc = c-1;
                // 通过自旋 + CAS方式改变剩余计数
                if (compareAndSetState(c, nextc))
                    // 如果计数为0返回true,否则返回false,结束自旋
                    // 返回true表示可以唤醒等待的线程
                    return nextc == 0;
            }
        }
    }

通过上面代码解析可知, CountDownLatch的实现方法都是在内部类Sync里面。

3.3 CountDownLatch

public class CountDownLatch {
    // 同步队列
    private final Sync sync;
    // 构造方法初始化计数值
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    // 线程等待
    public void await() throws InterruptedException {
        // 调用AQS的acquireSharedInterruptibly方法
        // 即共享模式响应中断的获取
        sync.acquireSharedInterruptibly(1);
    }
    // 计数 - 1
    public void countDown() {
        sync.releaseShared(1);
    }
}

3.3.1 await() 方法解析

// CountDownLatch
public void await() throws InterruptedException {
        // 调用AQS的acquireSharedInterruptibly方法
        sync.acquireSharedInterruptibly(1);
    }
// 进入AQS
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 中断判断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果没有获取到同步状态,或者说计数值不为0
        // 则调用doAcquireSharedInterruptibly方法,进入同步队列
        // 如果计数值为0则执行后续业务逻辑
        if (tryAcquireShared(arg) < 0)
            // 该方法的解析参考文章结尾的链接,此处不再赘述
            doAcquireSharedInterruptibly(arg);
    }
// CountDownLatch 中tryAcquireShared的实现
protected int tryAcquireShared(int acquires) {
            // 当计数为0时,线程才不会进入同步队列
            return (getState() == 0) ? 1 : -1;
        }

通过上面代码可以知道,如果计数值为0,表示获取成功。这就是CountDownLatch的机制,尝试获取latch的线程只有当latch的值减到0的时候,才能获取成功。

3.3.2 countDown() 方法解析

// CountDownLatch
public void countDown() {
        // 调用AQS的releaseShared方法
        sync.releaseShared(1);
    }
// 进入AQS
public final boolean releaseShared(int arg) {
        // 共享模式释放
        if (tryReleaseShared(arg)) {
            // 如果释放成功则唤醒等待的线程,并返回true
            // 具体唤醒逻辑不再赘述,参考AQS解析文章
            doReleaseShared();
            return true;
        }
        return false;
    }
// CountDownLatch 中tryReleaseShared的实现
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                 // 通过自旋 + CAS方式改变剩余计数
                if (compareAndSetState(c, nextc))
                    // 如果计数为0返回true,否则返回false,结束自旋
                    // 返回true表示可以唤醒等待的线程
                    return nextc == 0;
            }
        }

3.3.3 CountDownLatch如何唤醒所有调用 await() 等待的线程呢?

当调用doReleaseShared()唤醒后继节点后,回到线程被挂起的地方,也就是doAcquireSharedInterruptibly(int arg)方法中

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 将当前线程加入同步队列的尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            // 自旋
            for (;;) {
                // 获取当前节点的前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点是头结点,则尝试获取同步状态
                if (p == head) {
                    // 当前节点尝试获取同步状态
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 如果获取成功,则设置当前节点为头结点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                // 如果当前节点的前驱不是头结点,尝试挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

当头结点的后继节点被唤醒后,线程将从挂起的地方醒来,继续执行,因为没有return,所以进入下一次循环。

此时,获取同步状态成功,执行setHeadAndPropagate(node, r)。

    // 如果执行这个函数,那么propagate一定等于1
    private void setHeadAndPropagate(Node node, int propagate) {
        // 获取头结点
        Node h = head;
        // 因为当前节点被唤醒,设置当前节点为头结点
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 获取当前节点的下一个节点
            Node s = node.next;
            // 如果下一个节点为null或者节点为shared节点
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    private void doReleaseShared() {
    // 自旋
    for (;;) {
        Node h = head;
        // 如果队列存在排队的节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // CAS设置不成功则不断循环
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // CAS操作成功后释放后继节点,并唤醒线程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 队列不存在排队的节点,直接结束自旋
        if (h == head)                   // loop if head changed
            break;
    }
 

调用doReleaseShared方法唤醒后继节点,后继节点又回到线程被挂起的地方,也就是doAcquireSharedInterruptibly(int arg)方法中,实现循环唤醒所有await的线程。

此篇文章只解析了CountDownLatch的实现,它就是一个基于 AQS 的计数器,它内部的方法都是围绕 AQS 框架来实现的。

建议感兴趣的同学先去了解AQS原理,只要明白了AQS的实现原理,再来看CountDownLatch、Semaphore、ReentrantLock等实现原理就一目了然了。

到此这篇关于Java中的CountDownLatch原理深入解析的文章就介绍到这了,更多相关CountDownLatch原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringMVC @RequestBody自动转json Http415错误的解决

    SpringMVC @RequestBody自动转json Http415错误的解决

    这篇文章主要介绍了SpringMVC @RequestBody自动转json Http415错误的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-04-04
  • Spring整合Junit详解

    Spring整合Junit详解

    Spring 是目前主流的 Java Web 开发框架,是 Java 世界最为成功的框架。该框架是一个轻量级的开源框架,具有很高的凝聚力和吸引力,本篇文章带你了解如何配置数据源、注解开发以及整合Junit
    2022-07-07
  • Java之理解Redis回收算法LRU案例讲解

    Java之理解Redis回收算法LRU案例讲解

    这篇文章主要介绍了Java之理解Redis回收算法LRU案例讲解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • springboot整合腾讯云短信开箱即用的示例代码

    springboot整合腾讯云短信开箱即用的示例代码

    这篇文章主要介绍了springboot整合腾讯云短信开箱即用,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-03-03
  • 浅谈java泛型的作用及其基本概念

    浅谈java泛型的作用及其基本概念

    下面小编就为大家带来一篇浅谈java泛型的作用及其基本概念。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-08-08
  • Java后端学习精华之TCP通信传输协议详解

    Java后端学习精华之TCP通信传输协议详解

    TCP/IP是一种面向连接的、可靠的、基于字节流的传输层通信协议,它会保证数据不丢包、不乱序。TCP全名是Transmission Control Protocol,它是位于网络OSI模型中的第四层
    2021-09-09
  • Java高级特性(基础)

    Java高级特性(基础)

    这篇文章主要介绍了Java高级特性(基础),需要的朋友可以参考下
    2017-04-04
  • Java中的异常Exception详细解析

    Java中的异常Exception详细解析

    这篇文章主要介绍了Java中的异常Exception详细解析,Java语言中,将程序执行中发生的不正常情况称为"异常",(开发过程中的语法错误和逻辑错误不是异常),异常分为两大类,运行时异常和编译时异常,需要的朋友可以参考下
    2024-01-01
  • 详解log4j-over-slf4j与slf4j-log4j12共存stack overflow异常分析

    详解log4j-over-slf4j与slf4j-log4j12共存stack overflow异常分析

    这篇文章主要介绍了详解log4j-over-slf4j与slf4j-log4j12共存stack overflow异常分析,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07
  • 如何在springboot中实现页面的国际化

    如何在springboot中实现页面的国际化

    今天带大家学习如何在springboot中实现页面的国际化,文中有非常详细的图文解说及代码示例,对正在学习java的小伙伴们有很好地帮助,需要的朋友可以参考下
    2021-05-05

最新评论