详解Java并发工具类之CountDownLatch和CyclicBarrier

 更新时间:2023年06月19日 14:40:20   作者:nerkeler  
在JDK的并发包中,有几个非常有用的并发工具类,它们分别是:CountDownLatch、CyclicBarrier、Semaphore和Exchanger,本文主要来讲讲其中CountDownLatch和CyclicBarrier的使用,感兴趣的可以了解一下

在JDK的并发包中,有几个非常有用的并发工具类,它们分别是:CountDownLatchCyclicBarrierSemaphoreExchanger

  • CountDownLatch(倒计时门闩):它允许一个或多个线程等待其他线程完成操作后再继续执行。它通过一个计数器来实现,线程通过调用 countDown() 方法来减少计数器的值,await()方法进行阻塞等待计数器减少,当计数器达到零时,等待的线程将被释放。
  • CyclicBarrier(循环屏障):它允许一组线程互相等待,直到到达一个共同的屏障点,然后继续执行后续操作。与CountDownLatch不同的是,CyclicBarrier的计数器可以重复使用(reset()方法),当所有等待线程都到达屏障点后,计数器会重置,线程可以继续下一次等待。
  • Semaphore(信号量):它用于控制对某个资源的访问权限。Semaphore维护了一组许可证,线程在访问资源前需要获取许可证,如果许可证不可用,则线程必须等待,直到有可用的许可证。
  • Exchanger(交换器):它提供了一种线程间交换数据的机制。两个线程可以通过Exchanger交换数据,当两个线程都调用 exchange() 方法后,他们会彼此交换数据,并继续执行后续操作。

CountDownLatch

Latch(门闩)设计模式

当多个线程并发执行任务,然后只有等待所有子任务全部完成进行汇总,程序的门闩才能打开让程序继续往下执行。它指定了一个屏障,只有所有条件都满足的时候,门阀才能打开。

比如小明和小红相约周末去爬山,约定在人民广场碰头,然后一同出发去爬山,他们各自从家里出发,无论是其中某一个先到达了人民广场都要等待另一个到达之后才可以继续进行下去,这里的人民广场碰头就相当于上述的门闩。

示例

还是使用上面的例子,我们模拟小明和小红从家出发,设定不同的等待时间模拟到达人民广场的路程耗时。代码如下

public static void main(String[] args) throws InterruptedException, ExecutionException {
        final int threadNum = 2;
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
​
        executorService.execute(() -> {
​
            System.out.println("小明开始出发");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();  // 计数器 -1
            System.out.println("小明到达人民广场");
​
​
        });
        executorService.execute(() -> {
​
            System.out.println("小红开始出发");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();  // 计数器 -1
            System.out.println("小红到达人民广场");
​
​
        });
        countDownLatch.await();
        System.out.println("小明和小红都到达了人民广场,开始一起出发去爬山");
        executorService.shutdown();
​
    }
​

结果

小明开始出发
小红开始出发
小明到达人民广场  // 2s后打印
小红到达人民广场  // 3s后打印
小明和小红都到达了人民广场,开始一起出发去爬山 //3s后打印

Process finished with exit code 0

与Join()的区别

可能这里会有疑问,使用Thread.join()也可以实现相同的功能,这与使用CountDownLatch有什么区别呢?

join()的实现

public static void main(String[] args) throws InterruptedException {
    Thread thread1 = new Thread(() -> {
        System.out.println("小明开始出发");
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("小明到达人民广场");
​
    }, "thread1");
    Thread thread2 = new Thread(() -> {
        System.out.println("小红开始出发");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("小红到达人民广场");
​
    }, "thread2");
​
    thread1.start();
    thread2.start();
    thread1.join();
    thread2.join();
    System.out.println("小明和小红都到达了人民广场,开始一起出发去爬山");
​
}

结果

小明开始出发
小红开始出发
小明到达人民广场  // 2s后打印
小红到达人民广场  // 3s后打印
小明和小红都到达了人民广场,开始一起出发去爬山 //3s后打印

Process finished with exit code 0

发现使用join()实现和countDownCatch实现好像在代码上的体现并没有太大差异,不急,我们接着往下看

join()实现原理

我们点进去join的jdk源码查看它的实现逻辑

public final void join() throws InterruptedException {
    join(0);
}
​
public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;
​
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        // 调用join真正执行的方法
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
​

我们看到他的核心代码就几行

 while (isAlive()) {
     wait(0);
 }

这几行代码不难理解,通过不停的检查join线程是否存活,如果线程状态是活动的,那么就一直等待下去(wait(0)表示永久等待),直到join线程中止后,线程的this.notifyAll()方法会被调用,不过调用notifyAll()方法是在JVM里 实现的,所以在JDK里看不到。

Join()与countDownLatch比较

回到上一个问题,join到底和countDownLatch有什么区别,countDownLatch底层使用了计数器来控制线程的唤醒,提供了更细粒度的线程控制,比如我们运行了100个线程,但是只需要80个线程执行结束就可以继续下去,那么使用join就不合适了。

综上所述 CountDownLatch相对于Join的优势:

  • CountDownLatch可以等待多个线程的完成,而Join只能等待一个线程。
  • CountDownLatch可以灵活地设置计数器的值,不仅仅限于线程数,可以根据需要自由控制。
  • CountDownLatch提供了更细粒度的线程间协作和控制,可以在任意位置进行countDown()和await()的调用,更灵活地控制线程的流程。

CountDownLatch程序实现

上面说了很多CountDownLatch的示例和与join比较,也提了一下CountDownLatch底层的原理,下面就看一下如何实现一个简单的CountDownLatch

程序

我们先新建一个抽象类,包含countDownLatch需要的参数和方法

public abstract class Latch {
    // 控制了多少线程完成后门阀才能打开
    protected int limit;
​
    // 构造函数
    public Latch(int limit){
        this.limit = limit;
    }
​
    // 方法使得线程一直等待
    public abstract void await() throws InterruptedException;
​
    // 当前任务线程完成工作之后调用该方法使得计数器减一
    public abstract void countDown();
​
    // 获取当前还有多少个线程没有完成任务
    public abstract int getUnArrived();
​
}

然后实现这个抽象类,并写入具体逻辑代码

public class CountDownLatch extends Latch {
​
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
​
​
    public CountDownLatch(int limit) {
        super(limit);
    }
​
    @Override
    public void await() throws InterruptedException {
        lock.lock();
        while (limit > 0){
            condition.await();
        }
        lock.unlock();
    }
​
    @Override
    public void countDown() {
​
        lock.lock();
        if(limit < 0){
            throw new IllegalStateException();
        }
        limit--;
        condition.signalAll();
​
        lock.unlock();
​
    }
​
    @Override
    public int getUnArrived() {
        return limit;
    }
}

测试

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        Latch latch = new CountDownLatch(2);
​
        ExecutorService executorService = Executors.newFixedThreadPool(2);
​
        executorService.execute(()->{
            System.out.println("小明开始出发");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();  // 计数器 -1
            System.out.println("小明到达人民广场");
        });
        executorService.execute(()->{
            System.out.println("小红开始出发");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();  // 计数器 -1
            System.out.println("小红到达人民广场");
        });
​
​
        latch.await();
        System.out.println("小明和小红都到达了人民广场,开始一起出发去爬山");
​
        executorService.shutdown();
​
    }
}

结果

小明开始出发
小红开始出发
小明到达人民广场
小红到达人民广场
小明和小红都到达了人民广场,开始一起出发去爬山

Process finished with exit code 0

可以看到结果如前文一致,这就实现了一个简单的CountDownLatch,当然具体实现还有更多的细节,如有需要,请翻阅源码。

总结

通过上面的简单实现,我们可以看到CountDownLatch基于计数器实现了多线程之间的门阀拦截,底层还是通过线程之间的通讯、锁和计数器控制。

CyclicBarrier

除了使用CountDownLatch来实现多线程之间的阻塞同步,也可以使用CyclicBarrier来实现,并且CyclicBarrier提供了比CountDownLatch更强大的功能。

CyclicBarrier的字面意思是可循环使用的屏障。它提供了一种同步机制,使一组线程能够在达到屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开启,所有被阻塞的线程才能继续执行。

网上找的一张示意图

示例

还是用之前的例子,模拟小明和小红去爬山,代码如下,结果就不赘述了。

public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
​
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
​
    ExecutorService executorService = Executors.newFixedThreadPool(2);
​
    executorService.execute(()->{
        System.out.println("小明开始出发");
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("小明到达人民广场");
            cyclicBarrier.await(); // 计数器 -1
        } catch (Exception e) {
            e.printStackTrace();
        }
​
    });
    executorService.execute(()->{
        System.out.println("小红开始出发");
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println("小红到达人民广场");
            cyclicBarrier.await(); // 计数器 -1
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    cyclicBarrier.await();
    System.out.println("小明和小红都到达了人民广场,开始一起出发去爬山");
​
    executorService.shutdown();
​
}

不同的是,这里我设置了三个屏障点 cyclicBarrier.await();,而使用CountDownLatch只用了两个计数器减一操作 + 一个wait()方法,使用起来很相似,我们说cyclicBarrierCountDownLatch 功能更强大,那么强大在哪里呢?

重置计数器和获取状态

重置计数器

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。

public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
​
    final int threadNum = 3;
    ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
    CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
        System.out.println("所有线程都到达屏障");
    });
​
    for (int i = 0; i < threadNum; i++) {
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " 到达屏障");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    }
​
    Thread.sleep(2000); // 等待一段时间,确保所有线程都到达屏障
​
    cyclicBarrier.reset(); // 重置屏障
​
    System.out.println("屏障已重置");
​
    for (int i = 0; i < threadNum-1; i++) {
        executorService.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 到达屏障");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    }
​
    System.out.println("第二次进入 循环屏障");
    cyclicBarrier.await();
    System.out.println("第二次循环  迈过屏障");
​
​
    executorService.shutdown();
​
​
}

结果

pool-1-thread-2 到达屏障
pool-1-thread-3 到达屏障
pool-1-thread-1 到达屏障
所有线程都到达屏障
屏障已重置
第二次进入 循环屏障
pool-1-thread-2 到达屏障
pool-1-thread-1 到达屏障
所有线程都到达屏障
第二次循环  迈过屏障

Process finished with exit code 0

先说一下CyclicBarrier提供的另一个构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,也就是上方代码中用到的这几段

CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
    System.out.println("所有线程都到达屏障");
});

这里用于提示所有的线程到达屏障。紧接着是比较常规的代码,循环构造线程并在线程中执行了 cyclicBarrier.await();到达屏障。重点是 cyclicBarrier.reset();重置屏障后,我留下一个屏障给主线程测试使用,而在新构造的线程中停留1s, System.out.println("第二次循环 迈过屏障");打印在 System.out.println(Thread.currentThread().getName() + " 到达屏障");之后,说明屏障计数器已经重置并且生效了。

获取状态

除了上述的基本功能外,CyclicBarrier也提供了以下API用来查看状态,

  • getNumberWaiting() // 顾名思义,获取目前正在屏障处阻塞等待的线程数量。
  • getParties() // 获取屏障数量 也就是我们传入构造函数中的parties参数
  • isBroken() // 查询阻塞的线程是否被中断

以上就是详解Java并发工具类之CountDownLatch和CyclicBarrier的详细内容,更多关于Java CountDownLatch CyclicBarrier的资料请关注脚本之家其它相关文章!

相关文章

  • java8新特性之stream流中reduce()求和知识总结

    java8新特性之stream流中reduce()求和知识总结

    今天带大家回顾Java8的新特性,文中对stream流中reduce()求和的相关知识作了详细的介绍,对正在学习java的小伙伴们有很好地帮助,需要的朋友可以参考下
    2021-05-05
  • java web支持jsonp的实现代码

    java web支持jsonp的实现代码

    这篇文章主要介绍了java web支持jsonp的实现代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-11-11
  • springboot集成@DS注解实现数据源切换的方法示例

    springboot集成@DS注解实现数据源切换的方法示例

    本文主要介绍了springboot集成@DS注解实现数据源切换的方法示例,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03
  • 浅谈java指令重排序的问题

    浅谈java指令重排序的问题

    下面小编就为大家带来一篇浅谈java指令重排序的问题。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-09-09
  • 使用jxls自定义命令设置动态行高

    使用jxls自定义命令设置动态行高

    这篇文章主要介绍了使用jxls自定义命令设置动态行高,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • 初识Spark入门

    初识Spark入门

    这篇文章主要介绍了初识Spark入门,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-06-06
  • 深入了解Java线程池:从设计思想到源码解读

    深入了解Java线程池:从设计思想到源码解读

    这篇文章将从设计思想到源码解读,带大家深入了解Java的线程池,文中的示例代码讲解详细,对我们的学习或工作有一定的帮助,需要的可以参考一下
    2021-12-12
  • 老生常谈Java动态编译(必看篇)

    老生常谈Java动态编译(必看篇)

    下面小编就为大家带来一篇老生常谈Java动态编译(必看篇)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-05-05
  • java性能优化之代码缓存优化

    java性能优化之代码缓存优化

    这篇文章主要介绍了java性能优化之代码缓存优化,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-07-07
  • Java实现滑块拼图验证码

    Java实现滑块拼图验证码

    这篇文章主要为大家详细介绍了Java实现滑块拼图验证码,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-07-07

最新评论