深度解析Java中CountDownLatch的原理

 更新时间:2023年07月19日 09:13:08   作者:我是小趴菜  
在高并发编程中,AbstractQueuedSynchronizer(简称AQS)抽象的队列同步器是我们必须掌握的,本文将通过CountDownLatch底层实现原理来了解AQS共享锁模式的实现原理,快跟随小编一起学习学习吧

在高并发编程中,AbstractQueuedSynchronizer(简称AQS)抽象的队列同步器是我们必须掌握的,AQS底层提供了二种锁模式

  • 独占锁:ReentrantLock就是基于独占锁模式实现的
  • 共享锁:CountDownLatch,ReadWriteLock,Semplere都是基于共享锁模式实现的

接下来我们通过CountDownLatch底层实现原理来了解AQS共享锁模式的实现原理

CountDownLatch用法

CountDownLatch一般是在需要等待多个线程全部执行完毕之后才继续执行剩下的业务逻辑,举个例子,比如你现在去餐厅吃饭点了份辣子鸡。

这时候餐厅有处理鸡块的,有配置调料的,还有烧菜的等多个厨师一起协作最后才能完成一道辣子鸡,而且这几个步骤可以是一起执行的。一个厨师在配置调料的同时,另外一个厨师正在处理鸡块,还有一个厨师正在热油等。

但是作为顾客的我们来说,我们必须等到这几个厨师全部执行完毕之后我们才能吃到辣子鸡

public static void main(String[] args) throws Exception{
    CountDownLatch countDownLatch = new CountDownLatch(3);
    new Thread(() -> {
        System.out.println("处理鸡块");
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
        countDownLatch.countDown();
    }).start();
    new Thread(() -> {
        System.out.println("配置调料");
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
        countDownLatch.countDown();
    }).start();
    new Thread(() -> {
        System.out.println("起锅热油");
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
        countDownLatch.countDown();
    }).start();
    //会阻塞,等待所有的线程执行结束之后才会继续执行剩下的逻辑
    countDownLatch.await();
    //执行剩下业务逻辑
}

首先我们看 countDownLatch.await(); 这段阻塞的代码,看下底层是如何让线程进入阻塞等待的

进入之后到CountDownLatch类中,然后继续这个方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

此时就会进去AQS的内部实现中

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

首先我们看下 tryAcquireShared(arg) < 0 这个判断是干嘛的,他是进入到CountDownLatch的类中,这里判断 state的值是否等于0,在初始化 CountDownLatch 的时候,我们将state的值初始化成了3,只有当执行一次 countDownLatch.countDown(); 的时候,这个值才会减1,但是此时我们的线程还没有执行结束,所以这个值不会等于0,那么这时候就会返回 -1

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

返回-1以后,就会执行 doAcquireSharedInterruptibly(arg); 这个业务逻辑了

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //创建一个新的共享的Node节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //尝试判断state是否已经等于0了,如果是,那么主线程就不用阻塞了,
                //可以继续执行了,以此来提高程序性能
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())  //这里才是真正让主线程阻塞的核心方法
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

我们看一下这里的 addWaiter(Node.SHARED)方法

private Node addWaiter(Node mode) {
    //把当前线程,也就是main线程封装成一个Node,并设置成共享模式
    Node node = new Node(Thread.currentThread(), mode);
    //在第一次的时候,这个tail节点是为null的
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //初始化链表
    enq(node);
    return node;
}

分析下初始化双向链表逻辑

private Node enq(final Node node) {
    for (;;) {  //注意:这里是死循环
        Node t = tail;
        //第一次进来,因为tail=null,所以会进入到if里面去
        if (t == null) { // Must initialize
            //这里新创建一个空的Node节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第一次进来:因为第一次进来的时候tail=null,所以会进入到if中去,然后创建一个新的空的节点,然后将头节点和尾节点都指向这个节点

然后进入第二次循环:这时候tail已经不为空了,所以会进入到else分支里面去,所以的操作就是将当前线程封装成的Node设置尾巴节点,然后设置前置节点和后置节点的关系

现在回头addWaiter()方法已经清楚了,继续分析剩下的逻辑

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //创建一个新的共享的Node节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) { //这里还是个死循环
            //拿到头节点
            final Node p = node.predecessor();
            if (p == head) {
                //继续判断state的值是否等于0,如果已经等于0了,那么主线程就不需要阻塞等待了,可以继续执行了
                int r = tryAcquireShared(arg);
                //如果state的值等于0,这里r=1,不等于0,r=-1
                //我们假设现在就是不等于0,也就是其它线程还没有执行结束,所以不会进入到if
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

进入 shouldParkAfterFailedAcquire()方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //这里获取Node的waitStatus,在Node初始化之后,默认是是0,
    //所以会进入到 else 分支里面去,将Node的waitStatus的值修改成Node.SIGNAL
    //但是在上一步中是一个死循环,所以会再次进入到这个方法中,这时候waitStatus的值是Node.SIGNAL
    //所以会进入到第一个if分支里面去,最后返回true
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这时候shouldParkAfterFailedAcquire()方法返回了true,就会执行 parkAndCheckInterrupt()方法了

private final boolean parkAndCheckInterrupt() {
    //真正让线程阻塞的核心方法
    LockSupport.park(this);
    return Thread.interrupted();
}

当主线程挂起之后,只有全部线程执行结束了,才会继续执行,所以我们来分析下 countDownLatch.countDown();

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

进入tryReleaseShared(arg)方法,是判断state是否等于0的,

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        //第一次进来,因为state=3,所以不会进入if,只有在初始化的时候将state设置成0,
        //或者你有10个资源,但是有11个线程来获取资源,最后一个线程进来的时候也会等于0
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

一直到第三次进来之后,nextc就会等于0,因为一共减了三次1,也就是最后一个线程执行到这里来了,最后返回true,返回true以后就会执行doReleaseShared();方法了

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 核心方法,唤醒阻塞线程,这里传入的是头节点  
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        if (h == head)                   
            break;
    }
}
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    //拿到真正封装了当前线程的Node
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 执行唤醒操作
        LockSupport.unpark(s.thread);
}

以上就是深度解析Java中CountDownLatch的原理的详细内容,更多关于Java CountDownLatch的资料请关注脚本之家其它相关文章!

相关文章

  • Java关键字synchronized基本使用详解

    Java关键字synchronized基本使用详解

    这篇文章主要给大家介绍了关于Java关键字synchronized基本使用的相关资料,synchronized可以用来同步静态和非静态方法,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-01-01
  • 快速掌握Java中注解与反射

    快速掌握Java中注解与反射

    本文详细介绍了Java中注解和反射的概念及应用,注解是用于给代码添加元数据的标记,如@Override、@Deprecated等,反射机制则是在运行时获取和操作类的内部信息,提高了代码的灵活度,感兴趣的朋友跟随小编一起看看吧
    2023-06-06
  • Java UrlRewriter伪静态技术运用深入分析

    Java UrlRewriter伪静态技术运用深入分析

    通常我们为了更好的缓解服务器压力,和增强搜索引擎的友好面,都将文章内容生成静态页面,这就产生了伪静态技术,也就是我们常说的Url Rewriter重写技术
    2012-12-12
  • Java Swing中JDialog实现用户登陆UI示例

    Java Swing中JDialog实现用户登陆UI示例

    这篇文章主要介绍了Java Swing中JDialog实现用户登陆UI功能,结合完整实例形式分析了Swing使用JDialog实现用户登陆UI界面窗口功能的步骤与相关操作技巧,需要的朋友可以参考下
    2017-11-11
  • Spring中初始化泛型类的方法实例

    Spring中初始化泛型类的方法实例

    这篇文章主要给大家介绍了Spring中如何初始化泛型类,文中给出详细的介绍和方法实例,对大家的理解和学习具有一定的参考借鉴价值,有需要的朋友可以参考学习,下面来一起看看吧。
    2017-01-01
  • 简单了解java ibatis #及$的区别和用法

    简单了解java ibatis #及$的区别和用法

    这篇文章主要介绍了简单了解java ibatis #及$的区别和用法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-09-09
  • Spring循环依赖的解决方法详解

    Spring循环依赖的解决方法详解

    Spring的解决循环依赖是有前置条件的,要解决循环依赖我们首先要了解Spring Bean对象的创建过程和依赖注入的方式。依赖注入方式,我之前的博客有所分享,大家可以在看本篇文章之前进行一下小小的回顾
    2022-08-08
  • Java设计模式之策略模式(Strategy模式)介绍

    Java设计模式之策略模式(Strategy模式)介绍

    这篇文章主要介绍了Java设计模式之策略模式(Strategy模式)介绍,Strategy是属于设计模式中对象行为型模式,要是定义一系列的算法,这些算法一个个封装成单独的类,需要的朋友可以参考下
    2015-03-03
  • Seata AT模式TM处理流程图文示例详解

    Seata AT模式TM处理流程图文示例详解

    这篇文章主要为大家介绍了Seata AT模式TM处理流程图文示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-09-09
  • mybatis项目实现动态表名的三种方法

    mybatis项目实现动态表名的三种方法

    有时在开发过程中java代码中的表名和数据库的表名并不是一致的,此时我们就需要动态的设置表名,本文主要介绍了mybatis项目实现动态表名的三种方法,具有一定的参考价值,感兴趣的可以了解一下
    2024-01-01

最新评论