Java中的阻塞队列BlockingQueue使用详解

 更新时间:2023年10月24日 10:38:23   作者:皓亮君  
这篇文章主要介绍了Java中的阻塞队列BlockingQueue使用详解,阻塞队列是一种线程安全的数据结构,用于在多线程环境下进行数据交换,它提供了一种阻塞的机制,当队列为空时,消费者线程将被阻塞,直到队列中有数据可供消费,需要的朋友可以参考下

1.BlockingQueue 简介

BlockingQuene是一个阻塞队列接口,当BlockingQueue操作无法立即响应时,有四种处理方式:

  • 抛出异常;
  • 返回特定的值,根据操作不同,可能是null或者false中的一个;
  • 无限制的阻塞当前线程,直到操作可以成功为止;
  • 根据阻塞超时设置来进行阻塞;
    BlockingQueue的核心和未响应处理方式的对应形式如下:
方式抛出异常返回特定值无限阻塞超时
插入add(e)offer (e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
查询element()peek()

2.ArrayBlockingQueue(有界队列)

ArrayBlockingQueue是基于数组实现的有界BlockingQueue,该队列满足先入先出(FIFO)的特性,当队列满时,存数据的操作会被阻塞;队列空的时候,取数据的操作会被阻塞。

/**
 * @Author Dominick Li
 * @CreateTime 2022/3/6 20:03
 * @Description 消息生产者
 **/
public class Product implements Runnable {
    private BlockingQueue<String> bq;
    /**
     * 多少秒生产一条任务
     */
    private int period;
    private Random r = new Random();
    /**
     * 生产者名称
     */
    private String name;
    Product(BlockingQueue<String> bq, int period, String name) {
        this.bq = bq;
        this.period=period;
        this.name=name;
    }
    @Override
    public void run() {
        try {
            while (true){
                Thread.sleep(period);
                String product=String.valueOf(r.nextInt(100));
                //如果队列满了则阻塞
                bq.put(product);
                System.out.println("生产者["+this.name+"]生产"+product+",当前队列中产品为:"+bq);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
/**
 * @Author Dominick Li
 * @CreateTime 2022/3/6 20:11
 * @Description 消费者
 **/
public class Cusumer implements Runnable {
    private BlockingQueue<String> bq;
    /**
     * 多少秒获取一条任务
     */
    private int period;
    /**
     * 消费者名称
     */
    private String name;
    Cusumer(BlockingQueue<String> bq, int period, String name) {
        this.bq = bq;
        this.period=period;
        this.name=name;
    }
    @Override
    public void run() {
        try {
            while (true){
                Thread.sleep(period);
                String value=bq.take();
                System.out.println("消费者["+this.name+"]消费"+value+",当前队列中产品为:"+bq);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
public class Test {
    public static void main(String[] args) {
        BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.execute(new Product(blockingQueue, 1000, "生产者"));
        pool.execute(new Cusumer(blockingQueue, 5000, "消费者001"));
        pool.execute(new Cusumer(blockingQueue, 5000, "消费者002"));
        pool.shutdown();
    }
}

运行效果如下

在这里插入图片描述

3.LinkedBlockingQueue(双锁线程安全队列)

与ArrayBlockingQueue相比,LinkedBlockingQueue的重入锁被分成了两份,分别对应存值和取值,这种实现方法被称为双锁队列算法,这样的好处是读写操作的lock操作由两个锁控制,因此可以同时进程读操作和写操作,这也是LinkedBlockingQueue吞吐量超出ArrayBlockingQueue的主要原因,但是使用两个锁比一个锁复杂很多,需要考虑各种死锁的状态。 使用方法和ArrayBlockingQueue一致

public class Test {
    public static void main(String[] args) {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(5);
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.execute(new Product(linkedBlockingQueue, 1000, "生产者"));
        pool.execute(new Cusumer(linkedBlockingQueue, 5000, "消费者1"));
        pool.shutdown();
    }
}

4.PriorityBlockingQueue(优先级队列)

优先级阻塞队列ProiorityBlockQueue不是FIFO(先入先出)队列,它要求使用者提供一个Comparetor比较器,或者队列内部元素实现Comparable接口,队头元素会是整个队列里的最小元素.

PriorityBlockQueue是用数组实现的最小堆结构,利用的原理是: 在数组实现的完全二叉树中根节点的下标为子节点的下标除以2,长度是不定的,会随着数据的增长而逐步扩容

public class PriorityProduct implements Comparable<PriorityProduct> {
    /**
     * 任务的优先级
     */
    private int priority;
    private String productName;
    public PriorityProduct(int priority, String productName) {
        this.priority = priority;
        this.productName = productName;
    }
    @Override
    public int compareTo(PriorityProduct o) {
        if (o == null) return -1;
        if (o == this) return 0;
        return o.priority - this.priority;
    }
    @Override
    public String toString(){
        return "{priority="+priority+",name="+this.productName;
    }
}
public class PriorityBlockQueueProduct implements Runnable {
    private PriorityBlockingQueue<PriorityProduct> bq;
    /**
     * 多少秒生产一条任务
     */
    private int period;
    private Random r = new Random();
    public PriorityBlockQueueProduct(PriorityBlockingQueue<PriorityProduct> bq, int period) {
        this.bq = bq;
        this.period = period;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                if(bq.size()>10){
                    //限制大小
                    continue;
                }
                  //随机生成优先级5以内的
                PriorityProduct priorityProduct = new PriorityProduct(r.nextInt(5), "test");
                //如果队列满了则阻塞
                bq.put(priorityProduct);
                //System.out.println("生产者商品[" +priorityProduct + "],当前队列中产品为:" + bq);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class PriorityBlockQueueCusumer implements Runnable {
    private PriorityBlockingQueue<PriorityProduct> bq;
    /**
     * 多少秒消费一条任务
     */
    private int period;
    public PriorityBlockQueueCusumer(PriorityBlockingQueue<PriorityProduct> bq, int period) {
        this.bq = bq;
        this.period = period;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                //如果队列满了则阻塞
                PriorityProduct priorityProduct=bq.take();
                System.out.println("消费产品[" +priorityProduct + "],当前队列中产品为:" + bq);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class PriorityTest {
    public static void main(String[] args) {
        PriorityBlockingQueue<PriorityProduct> priorityProducts=new PriorityBlockingQueue<>();
        ExecutorService executorService= Executors.newFixedThreadPool(2);
        executorService.execute(new PriorityBlockQueueProduct(priorityProducts,100));
        executorService.execute(new PriorityBlockQueueCusumer(priorityProducts,1000));
    }
}

运行结果如下,可以查看消费者在消费的时候只会消费任务队列中优先级最高的任务

在这里插入图片描述

到此这篇关于Java中的阻塞队列BlockingQueue使用详解的文章就介绍到这了,更多相关Java阻塞队列BlockingQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 如何在Spring中自定义scope的方法示例

    如何在Spring中自定义scope的方法示例

    这篇文章主要介绍了如何在Spring中自定义scope的方法示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-02-02
  • Java中Synchronized锁的使用和原理详解

    Java中Synchronized锁的使用和原理详解

    这篇文章主要介绍了Java中Synchronized锁的使用和原理详解,synchronized是 Java 内置的关键字,它提供了一种独占的加锁方式,synchronized的获取和释放锁由JVM实现,用户不需要显示的释放锁,非常方便,需要的朋友可以参考下
    2023-07-07
  • 详解Java类动态加载和热替换

    详解Java类动态加载和热替换

    本文主要介绍类加载器、自定义类加载器及类的加载和卸载等内容,并举例介绍了Java类的热替换。
    2021-05-05
  • MyBatis动态sql查询及多参数查询方式

    MyBatis动态sql查询及多参数查询方式

    这篇文章主要介绍了MyBatis动态sql查询及多参数查询方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-10-10
  • Java前端开发框架实现的流程和代码示例

    Java前端开发框架实现的流程和代码示例

    我们可以实现一个Java前端开发框架,这个框架包含了初始化、组件渲染、组件更新、事件监听和事件触发等功能,希望这个指南能够对刚入行的小白有所帮助
    2023-10-10
  • Maven项目如何在pom文件中引入lib下的第三方jar包并打包进去

    Maven项目如何在pom文件中引入lib下的第三方jar包并打包进去

    在使用Maven进行项目开发时,引入第三方私有的Jar包可能会遇到问题,一种常见的解决方案是将Jar包添加到项目的lib目录,并通过IDE进行配置,但这需要每个开发者单独操作,效率低下,更好的方法是通过Maven的pom.xml文件管理这些Jar包
    2024-09-09
  • 四种引用类型在JAVA Springboot中的使用详解

    四种引用类型在JAVA Springboot中的使用详解

    这篇文章主要介绍了springboot的四种引用类型,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-09-09
  • SpringBoot可以同时处理多少请求流程分析

    SpringBoot可以同时处理多少请求流程分析

    SpringBoot默认的内嵌容器是Tomcat,也就是我们的程序实际上是运行在Tomcat里的,所以与其说SpringBoot可以处理多少请求,到不如说Tomcat可以处理多少请求,这篇文章主要介绍了SpringBoot可以同时处理多少请求,需要的朋友可以参考下
    2023-02-02
  • Spring Boot Web应用开发 CORS 跨域请求支持

    Spring Boot Web应用开发 CORS 跨域请求支持

    本篇文章主要介绍了Spring Boot Web应用开发 CORS 跨域请求支持,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-05-05
  • SpringBoot 关于Feign的超时时间配置操作

    SpringBoot 关于Feign的超时时间配置操作

    这篇文章主要介绍了SpringBoot 关于Feign的超时时间配置操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09

最新评论