Java并发工具类Phaser详解

 更新时间:2023年11月30日 08:58:44   作者:啊几  
这篇文章主要介绍了Java并发工具类Phaser详解,Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行,它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行,需要的朋友可以参考下

前言

Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行。

它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行。

Phaser可以被视为CyclicBarrierCountDownLatch的进化版,它能够自适应地调整并发线程数,可以动态地增加或减少参与线程的数量。

所以Phaser特别适合使用在重复执行或者重用的情况。

在这里插入图片描述

常用API

构造方法

  • Phaser(): 参与任务数0
  • Phaser(int parties) :指定初始参与任务数
  • Phaser(Phaser parent) :指定parent阶段器, 子对象作为一个整体加入parent对象, 当子对象中没有参与者时,会自动从parent对象解除注册
  • Phaser(Phaser parent,int parties) : 集合上面两个方法

增减参与任务数方法

  • int register() 增加一个任务数,返回当前阶段号。
  • int bulkRegister(int parties) 增加指定任务个数,返回当前阶段号。
  • int arriveAndDeregister() 减少一个任务数,返回当前阶段号。

到达、等待方法

  • int arrive() 到达(任务完成),返回当前阶段号。
  • int arriveAndAwaitAdvance() 到达后等待其他任务到达,返回到达阶段号。
  • int awaitAdvance(int phase) 在指定阶段等待(必须是当前阶段才有效)
  • int awaitAdvanceInterruptibly(int phase) 阶段到达触发动作
  • int awaitAdvanceInterruptiBly(int phase,long timeout,TimeUnit unit)
  • protected boolean onAdvance(int phase,int registeredParties)类似CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作,该方法返回true将终结Phaser对象。

Phaser使用

多线程批量处理数据

public class PhaserBatchProcessorDemo {

    private final List<String> data;
    private final int batchsize;//一次处理多少数据
    private final int threadCount;//处理的线程数
    private final Phaser phaser;
    private final List<String> processedData;

    public PhaserBatchProcessorDemo(List<String> data,int batchsize,int threadCount){
        this.data = data;
        this.batchsize = batchsize;
        this.threadCount = threadCount;
        this.phaser = new Phaser(1);
        //this.phaser = new Phaser();
        this.processedData = new ArrayList<>();
    }

    public void process() throws InterruptedException {
        for(int i = 0;i<threadCount;i++){
            System.out.println("phaser.register():"+phaser.register());
            new Thread(new BatchProcessor(i)).start();
            Thread.sleep(50);
        }
        phaser.arriveAndDeregister();//主线程执行结束
        System.out.println("结束");
    }

    private class BatchProcessor implements Runnable{
        private final int threadIndex;
        public BatchProcessor(int threadIndex){this.threadIndex = threadIndex;}

        @Override
        public void run() {
            int index = 0;
            while(true){
                //所有线程都到达这个点之前会阻塞
                System.out.println("线程"+threadIndex+"phaser.arriveAndAwaitAdvance1():");
                phaser.arriveAndAwaitAdvance();

                //从未处理数据中找到一个可以处理的批次
                List<String> batch = new ArrayList<>();
                synchronized (data){
                    while (index < data.size()&&batch.size()<batchsize){
                        String d = data.get(index);
                        if(!processedData.contains(d)){
                            batch.add(d);
                            processedData.add(d);
                        }
                        index++;
                    }
                }
                //处理数据
                for(String d:batch){
                    System.out.println("线程"+threadIndex+"处理数据"+d);
                }
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                //所有数据都处理完当前批次之前会阻塞
                System.out.println("线程"+threadIndex+"phaser.arriveAndAwaitAdvance2():");
                phaser.arriveAndAwaitAdvance();
                //所有线程都处理完当前批次并且未处理数据已经处理完之前会阻塞
                if(batch.isEmpty()||index >= data.size()){
                    System.out.println("线程"+threadIndex+"phaser.arriveAndDeregister()"+phaser.arriveAndDeregister());
                    break;
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> data = new ArrayList<>();
        for(int i = 1;i<=15;i++){
            data.add(String.valueOf(i));
        }

        PhaserBatchProcessorDemo processor = new PhaserBatchProcessorDemo(data,4,3);
        processor.process();
    }
}

/**
 * phaser.register():0
 * 线程0phaser.arriveAndAwaitAdvance1():
 * phaser.register():0
 * 线程1phaser.arriveAndAwaitAdvance1():
 * phaser.register():0
 * 线程2phaser.arriveAndAwaitAdvance1():
 * 结束
 * 线程2处理数据9
 * 线程2处理数据10
 * 线程1处理数据5
 * 线程1处理数据6
 * 线程1处理数据7
 * 线程1处理数据8
 * 线程2处理数据11
 * 线程2处理数据12
 * 线程0处理数据1
 * 线程0处理数据2
 * 线程0处理数据3
 * 线程0处理数据4
 * 线程2phaser.arriveAndAwaitAdvance2():
 * 线程0phaser.arriveAndAwaitAdvance2():
 * 线程1phaser.arriveAndAwaitAdvance2():
 * 线程2phaser.arriveAndAwaitAdvance1():
 * 线程1phaser.arriveAndAwaitAdvance1():
 * 线程0phaser.arriveAndAwaitAdvance1():
 * 线程0处理数据13
 * 线程0处理数据14
 * 线程0处理数据15
 * 线程0phaser.arriveAndAwaitAdvance2():
 * 线程1phaser.arriveAndAwaitAdvance2():
 * 线程2phaser.arriveAndAwaitAdvance2():
 * 线程1phaser.arriveAndDeregister()4
 * 线程2phaser.arriveAndDeregister()4
 * 线程0phaser.arriveAndDeregister()4
 */

这里提出一个问题:为什么要给主线程也注册呢?如果不给主线程注册会怎么样呢?

在这里插入图片描述

这里就要提及register() 增加任务数量和Phaser()初始化定义任务数量的区别

register()有一个需要注意的点是,如果主线程执行速度缓慢的话,很有可能在第一个线程已经arrive的时候,第二个线程任务还没增加,导致第一个线程因为parties只有1,而没有阻塞等待就进入下一阶段了。

如果不给主线程注册添加任务,运行结果如下

phaser.register():0
线程0phaser.arriveAndAwaitAdvance1():
线程0处理数据1
线程0处理数据2
线程0处理数据3
线程0处理数据4
phaser.register():1
线程1phaser.arriveAndAwaitAdvance1():
phaser.register():1
线程2phaser.arriveAndAwaitAdvance1():
结束
线程0phaser.arriveAndAwaitAdvance2():
线程0phaser.arriveAndAwaitAdvance1():
线程2处理数据5
线程2处理数据6
线程2处理数据7
线程2处理数据8
线程1处理数据9
线程1处理数据10
线程1处理数据11
线程1处理数据12
线程2phaser.arriveAndAwaitAdvance2():
线程1phaser.arriveAndAwaitAdvance2():
线程2phaser.arriveAndAwaitAdvance1():
线程1phaser.arriveAndAwaitAdvance1():
线程0处理数据13
线程0处理数据14
线程0处理数据15
线程0phaser.arriveAndAwaitAdvance2():
线程0phaser.arriveAndDeregister()4
线程1phaser.arriveAndAwaitAdvance2():
线程2phaser.arriveAndAwaitAdvance2():
线程2phaser.arriveAndDeregister()5
线程1phaser.arriveAndDeregister()5

而Phaser()初始化就定义了parties,会让所有线程都必须到达之前都阻塞才能进入下一阶段。

给主线程也增加一个任务的目的就在于此 如果主线程也有任务,就算主线程执行缓慢,第一个线程也必须阻塞等待主线程在第一阶段之前,把所有线程都start()启动。

阶段性任务:模拟伙伴出游

public class PhaserDemo {
    public static void main(String[] args) {
        final Phaser phaser = new Phaser(){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                //参与者数量,去除主线程
                int persons = registeredParties - 1;
                switch (phase){
                    case 0:
                        System.out.println("大家都到佘山站了,可以出发去佘山了,人数:"+persons);
                        break;
                    case 1:
                        System.out.println("大家都到佘山了,出发去爬山,人数:"+persons);
                        break;
                    case 2:
                        System.out.println("大家都到山顶了,开始休息,人数:"+persons);
                        break;

                }
                //判断是否只剩下一个主线程,如果是,返回true,代表终止
                return registeredParties ==1;
            }
        };

        phaser.register();
        final PersonTask personTask = new PersonTask();
        //3个全程参加的伙伴
        for(int i = 0;i<3;i++){

            phaser.register();
            new Thread(()->{
                try{
                    personTask.step1Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step2Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step3Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step4Task();
                    phaser.arriveAndDeregister();


                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }).start();
        }
        

        //两个在山腰半路返回
        for(int i = 0;i<2;i++){
            phaser.register();
            new Thread(()->{
                try{
                    personTask.step1Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step2Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step3Task();
                    System.out.println("伙伴【"+Thread.currentThread().getName()+"】中途山腰返回");
                    phaser.arriveAndDeregister();

                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }).start();
        }

        while (!phaser.isTerminated()) {
            int phase = phaser.arriveAndAwaitAdvance();
            if (phase == 2) {
                //两个在佘山直接会合
                for(int i = 0;i<2;i++){
                    phaser.register();
                    new Thread(()->{
                        try{
                            System.out.println("伙伴【"+Thread.currentThread().getName()+"】中途加入");

                            personTask.step3Task();
                            phaser.arriveAndAwaitAdvance();

                            personTask.step4Task();
                            phaser.arriveAndDeregister();

                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }).start();
                }
            }
        }
    }

    static final Random random = new Random();

    static class PersonTask{
        public void step1Task() throws InterruptedException {
            String person = "伙伴【"+Thread.currentThread().getName()+"】";
            System.out.println(person+"从家出发了......");
            Thread.sleep(random.nextInt(5000));
            System.out.println(person+"到达佘山站");
        }

        public void step2Task() throws InterruptedException {
            String person = "伙伴【"+Thread.currentThread().getName()+"】";
            System.out.println(person+"出发去佘山");
            Thread.sleep(random.nextInt(5000));
            System.out.println(person+"到达佘山");
        }

        public void step3Task() throws InterruptedException {
            String person = "伙伴【"+Thread.currentThread().getName()+"】";
            System.out.println(person+"出发去爬山");
            Thread.sleep(random.nextInt(5000));
            System.out.println(person+"到达山顶");
        }

        public void step4Task() throws InterruptedException {
            String person = "伙伴【"+Thread.currentThread().getName()+"】";
            System.out.println(person+"开始休息");
            Thread.sleep(random.nextInt(5000));
            System.out.println(person+"休息结束,下山回家");
        }
    }
}

/**
 * 伙伴【Thread-3】从家出发了......
 * 伙伴【Thread-1】从家出发了......
 * 伙伴【Thread-2】从家出发了......
 * 伙伴【Thread-4】从家出发了......
 * 伙伴【Thread-0】从家出发了......
 * 伙伴【Thread-1】到达佘山站
 * 伙伴【Thread-4】到达佘山站
 * 伙伴【Thread-3】到达佘山站
 * 伙伴【Thread-0】到达佘山站
 * 伙伴【Thread-2】到达佘山站
 * 大家都到佘山站了,可以出发去佘山了,人数:5
 * 伙伴【Thread-3】出发去佘山
 * 伙伴【Thread-2】出发去佘山
 * 伙伴【Thread-4】出发去佘山
 * 伙伴【Thread-1】出发去佘山
 * 伙伴【Thread-0】出发去佘山
 * 伙伴【Thread-1】到达佘山
 * 伙伴【Thread-4】到达佘山
 * 伙伴【Thread-3】到达佘山
 * 伙伴【Thread-0】到达佘山
 * 伙伴【Thread-2】到达佘山
 * 大家都到佘山了,出发去爬山,人数:5
 * 伙伴【Thread-1】出发去爬山
 * 伙伴【Thread-4】出发去爬山
 * 伙伴【Thread-0】出发去爬山
 * 伙伴【Thread-2】出发去爬山
 * 伙伴【Thread-3】出发去爬山
 * 伙伴【Thread-6】中途加入
 * 伙伴【Thread-6】出发去爬山
 * 伙伴【Thread-5】中途加入
 * 伙伴【Thread-5】出发去爬山
 * 伙伴【Thread-2】到达山顶
 * 伙伴【Thread-5】到达山顶
 * 伙伴【Thread-0】到达山顶
 * 伙伴【Thread-4】到达山顶
 * 伙伴【Thread-4】中途山腰返回
 * 伙伴【Thread-6】到达山顶
 * 伙伴【Thread-3】到达山顶
 * 伙伴【Thread-3】中途山腰返回
 * 伙伴【Thread-1】到达山顶
 * 大家都到山顶了,开始休息,人数:5
 * 伙伴【Thread-5】开始休息
 * 伙伴【Thread-2】开始休息
 * 伙伴【Thread-6】开始休息
 * 伙伴【Thread-0】开始休息
 * 伙伴【Thread-1】开始休息
 * 伙伴【Thread-2】休息结束,下山回家
 * 伙伴【Thread-0】休息结束,下山回家
 * 伙伴【Thread-1】休息结束,下山回家
 * 伙伴【Thread-5】休息结束,下山回家
 * 伙伴【Thread-6】休息结束,下山回家
 */

应用场景总结

以下是一些常见的 Phaser 应用场景:

  1. 多线程任务分配:Phaser 可以用于将复杂的任务分配给多个线程执行,并协调线程间的合作。
  2. 多级任务流程:Phaser 可以用于实现多级任务流程,在每一级任务完成后触发下一级任务的开始。
  3. 模拟并行计算:Phaser 可以用于模拟并行计算,协调多个线程间的工作。
  4. 阶段性任务:Phaser 可以用于实现阶段性任务,在每一阶段任务完成后触发下一阶段任务的开始。

到此这篇关于Java并发工具类Phaser详解的文章就介绍到这了,更多相关Java的Phaser内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot访问不到controller的解决方案

    SpringBoot访问不到controller的解决方案

    这篇文章主要介绍了SpringBoot访问不到controller的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • Spring Boot 项目启动自动执行方法的两种实现方式

    Spring Boot 项目启动自动执行方法的两种实现方式

    这篇文章主要介绍了Spring Boot 项目启动自动执行方法的两种实现方式,帮助大家更好的理解和学习使用Spring Boot框架,感兴趣的朋友可以了解下
    2021-05-05
  • 彻底解决Spring MVC中文乱码问题的方案

    彻底解决Spring MVC中文乱码问题的方案

    这篇文章主要介绍了彻底解决Spring MVC中文乱码问题的方案,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
    2016-11-11
  • 掷6面骰子6000次每个点数出现的概率

    掷6面骰子6000次每个点数出现的概率

    今天小编就为大家分享一篇关于掷6面骰子6000次每个点数出现的概率,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-02-02
  • 解析Spring中@Controller@Service等线程安全问题

    解析Spring中@Controller@Service等线程安全问题

    这篇文章主要为大家介绍解析了Spring中@Controller@Service等线程的安全问题,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-03-03
  • SpringMvc直接接收json数据自动转化为Map的实例

    SpringMvc直接接收json数据自动转化为Map的实例

    今天小编就为大家分享一篇SpringMvc直接接收json数据自动转化为Map的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-08-08
  • Javaweb接收表单数据并处理中文乱码

    Javaweb接收表单数据并处理中文乱码

    这篇文章主要介绍了Javaweb接收表单数据并处理中文乱码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • Java操作Redis详细介绍

    Java操作Redis详细介绍

    这篇文章主要介绍了Java操作Redis详细介绍,涉及对key的操作,string数据类型,list数据类型等相关内容,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • Java编程学习的几个典型实例详解

    Java编程学习的几个典型实例详解

    这篇文章主要给大家介绍了Java编程学习的几个典型实例,其中包括模拟酒店房间管理系统、螺旋矩阵 例或者百鸡问题的变形等经典实例,具体来一起看详细内容吧,需要的朋友可以参考学习。
    2017-02-02
  • Java 使用keytool创建CA证书的操作

    Java 使用keytool创建CA证书的操作

    这篇文章主要介绍了Java 使用keytool创建CA证书的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01

最新评论