Java的CyclicBarrier循环屏障解析

 更新时间:2023年12月16日 09:57:51   作者:niulx111  
这篇文章主要介绍了Java的CyclicBarrier循环屏障解析,CyclicBarrier和CountDownLatch一样,是一个同步工具类,它允许一组线程相互等待直到达到某个common barrier point,在程序中CyclicBarrier是非常有用的,它适用于一组线程必须互相等待的情况,需要的朋友可以参考下

CyclicBarrier循环屏障

CyclicBarrier和CountDownLatch一样,是一个同步工具类,它允许一组线程相互等待直到达到某个common barrier point。

在程序中CyclicBarrier是非常有用的,它适用于一组线程必须互相等待的情况。barrier被称为周期是因为等待的线程在释放后可以重用。

构造函数

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

CyclicBarrier有两种构造方法,默认是指定一个线程数量,并把它赋给等待状态的线程count,还有一个barrierAction任务,它是在满足barrier时,所有的线程准备好之前,才执行这个runnable,这是最后一个线程来执行的。parties是表示必须有几个线程要达到barrier,count是表示当前未达到barrier的线程数量,只有count=0时,线程才能继续执行。

看一下它的示例。

public static void main(String[] args) {
		CyclicBarrier c = new CyclicBarrier(5,new Runnable() {
			@Override
			public void run() {
				System.out.println("开始");
			}
		});
		ExecutorService es = Executors.newCachedThreadPool();
		for (int i = 0; i < 5; i++) {
			es.execute(new Task(i, c));
		}
		es.shutdown();
	}
	static public class Task implements Runnable {
		private int i;
		private CyclicBarrier cyclicBarrier;
		public Task(int i,CyclicBarrier cyclicBarrier) {
			this.i = i;
			this.cyclicBarrier = cyclicBarrier;
		}
		@Override
		public void run() {
			System.out.println(i+"准备完毕");
			try {
				cyclicBarrier.await();
				Thread.sleep(100);
				System.out.println(i+"加入了");
			} catch (InterruptedException e) {	
				e.printStackTrace();
			} catch (BrokenBarrierException e) {
				e.printStackTrace();
			}
		}
	}

执行结果

0准备完毕
4准备完毕
3准备完毕
1准备完毕
2准备完毕
开始
4加入了
1加入了
2加入了
3加入了
0加入了

看一下它的阻塞await方法。

    /**
     * 如果当前线程不是最后一个达到的它就会休眠直到出现下面的任意一种情况:
     *  1. 最后一个线程达到
     *  2. 一些其他的线程中断了当前线程
     *  3. 一些其他的线程中断了正在等待的线程
     *  4. 一些其他线程在等待过程中超时了
     *  5. 一些其他的线程在barrier被重置
     *  当前线程在等待的过程中如果被中断直接会抛出异常并且当前线程的中断状态会被清除
     *  如果在等待的过程中一些线程被中断,其他的等待线程就会抛出异常,barrier就会被设置成broken为true的状态
     *  如果当前线程是最后一个达到的话,如果构造方法中有runnable任务,直接由最后一个线程去执行,其他线程继续等待
     *  在barrier action中,如果出现异常会由当前线程传播并且barrier被设置成broken为true的状态
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // 永远不会进入catch
        }
    }

主要调用了dowait方法,看一下这个方法,方法比较长。

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

首先会创建一个静态内部类Generation,这个类中只有一个属性,broken,表示当前屏障是否被中断,默认为false,未中断.

如果当前broken为true,就会传播这个异常.

如果当前线程被中断,broken为中断状态,调用breakBarrier方法,把broken设为true,重置count.唤醒其他的线程.

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

如果前面的条件都不成立,那就count自减,表示当前线程已经完成了任务,当count=0的时候,表示所有的任务都已经完成,此时在判断是否构造函数中有传入的runnable,如果command不为空,总是最后一个线程去执行,如果在执行这个command的过程中出现异常,再次调用breakBarrier方法重置count.

执行成功,调用nextGeneration方法,唤醒所有在等待队列上的线程,重置count.再次把broken设为false.

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

如果当前count不为0,那就进行自旋,没有超时直接释放锁进入等待队列,如果线程在释放锁进入等待队列的过程中被中断直接重置count,抛出异常.

在自旋过程中,如果发生了broken状态的变化或者超时,直接抛出异常,并做相应的处理,最后释放锁.

总结

CyclicBarrier的count是可以重用的,这是与CountDownLatch最大的区别.

barrier中任务是由最有一个线程执行的,并且会执行构造方法中传入的任务.这个任务总是在到达barrier时执行.

到此这篇关于Java的CyclicBarrier循环屏障解析的文章就介绍到这了,更多相关CyclicBarrier循环屏障内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java8中新的Date和Time详解

    java8中新的Date和Time详解

    这篇文章主要是java8中新的Date和Time,探讨新Date类和Time类背后的设计原则,有所需要的小伙伴希望能帮助到你
    2016-07-07
  • Maven入门之使用Nexus搭建Maven私服及上传下载jar包

    Maven入门之使用Nexus搭建Maven私服及上传下载jar包

    这篇文章主要介绍了Maven入门之使用Nexus搭建Maven私服及上传下载jar包,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-12-12
  • Mybatis中TypeHandler使用小结

    Mybatis中TypeHandler使用小结

    MyBatis的TypeHandler是一个强大的机制,它为我们提供了一种灵活的方式来处理Java类型与数据库类型之间的转换,本文主要介绍了Mybatis中TypeHandler使用小结,具有一定的参考价值,感兴趣的可以了解一下
    2024-02-02
  • 浅入浅出的讲解Spring循环依赖问题

    浅入浅出的讲解Spring循环依赖问题

    循环依赖其实就是循环引用,也就是两个或则两个以上的bean互相持有对方,最终形成闭环,下面这篇文章主要给大家介绍了关于Spring循环依赖问题的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2021-10-10
  • Java中继承、多态、重载和重写介绍

    Java中继承、多态、重载和重写介绍

    这篇文章主要介绍了Java中继承、多态、重载和重写介绍,需要的朋友可以参考下
    2014-07-07
  • mybatis-plus条件构造器的操作代码

    mybatis-plus条件构造器的操作代码

    mybatis-plus提供了AbstractWrapper抽象类,提供了很多sql语法支持的方法,比如模糊查询,比较,区间,分组查询,排序,判断空,子查询等等,方便我们用面向对象的方式去实现sql语句,本文重点给大家介绍mybatis-plus条件构造器的操作代码,感兴趣的朋友一起看看吧
    2022-03-03
  • spring kafka @KafkaListener详解与使用过程

    spring kafka @KafkaListener详解与使用过程

    这篇文章主要介绍了spring-kafka @KafkaListener详解与使用,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-02-02
  • Java中的三种标准注解和四种元注解说明

    Java中的三种标准注解和四种元注解说明

    这篇文章主要介绍了Java中的三种标准注解和四种元注解说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • 基于Feign实现异步调用

    基于Feign实现异步调用

    近期,需要对之前的接口进行优化,缩短接口的响应时间,但是springcloud中的feign是不支持传递异步化的回调结果的,因此有了以下的解决方案,记录一下,需要的朋友可以参考下
    2021-05-05
  • SpringBoot配置文件方式,在线yml文件转properties

    SpringBoot配置文件方式,在线yml文件转properties

    这篇文章主要介绍了SpringBoot配置文件方式,在线yml文件转properties,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-07-07

最新评论