Java中的阻塞队列BlockingQueue使用详解
1.BlockingQueue 简介
BlockingQuene是一个阻塞队列接口,当BlockingQueue操作无法立即响应时,有四种处理方式:
- 抛出异常;
- 返回特定的值,根据操作不同,可能是null或者false中的一个;
- 无限制的阻塞当前线程,直到操作可以成功为止;
- 根据阻塞超时设置来进行阻塞;
BlockingQueue的核心和未响应处理方式的对应形式如下:
方式 | 抛出异常 | 返回特定值 | 无限阻塞 | 超时 |
插入 | add(e) | offer (e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
查询 | element() | peek() |
2.ArrayBlockingQueue(有界队列)
ArrayBlockingQueue是基于数组实现的有界BlockingQueue,该队列满足先入先出(FIFO)的特性,当队列满时,存数据的操作会被阻塞;队列空的时候,取数据的操作会被阻塞。
/** * @Author Dominick Li * @CreateTime 2022/3/6 20:03 * @Description 消息生产者 **/ public class Product implements Runnable { private BlockingQueue<String> bq; /** * 多少秒生产一条任务 */ private int period; private Random r = new Random(); /** * 生产者名称 */ private String name; Product(BlockingQueue<String> bq, int period, String name) { this.bq = bq; this.period=period; this.name=name; } @Override public void run() { try { while (true){ Thread.sleep(period); String product=String.valueOf(r.nextInt(100)); //如果队列满了则阻塞 bq.put(product); System.out.println("生产者["+this.name+"]生产"+product+",当前队列中产品为:"+bq); } }catch (Exception e){ e.printStackTrace(); } } } /** * @Author Dominick Li * @CreateTime 2022/3/6 20:11 * @Description 消费者 **/ public class Cusumer implements Runnable { private BlockingQueue<String> bq; /** * 多少秒获取一条任务 */ private int period; /** * 消费者名称 */ private String name; Cusumer(BlockingQueue<String> bq, int period, String name) { this.bq = bq; this.period=period; this.name=name; } @Override public void run() { try { while (true){ Thread.sleep(period); String value=bq.take(); System.out.println("消费者["+this.name+"]消费"+value+",当前队列中产品为:"+bq); } }catch (Exception e){ e.printStackTrace(); } } } public class Test { public static void main(String[] args) { BlockingQueue blockingQueue = new ArrayBlockingQueue(5); ExecutorService pool = Executors.newCachedThreadPool(); pool.execute(new Product(blockingQueue, 1000, "生产者")); pool.execute(new Cusumer(blockingQueue, 5000, "消费者001")); pool.execute(new Cusumer(blockingQueue, 5000, "消费者002")); pool.shutdown(); } }
运行效果如下
3.LinkedBlockingQueue(双锁线程安全队列)
与ArrayBlockingQueue相比,LinkedBlockingQueue的重入锁被分成了两份,分别对应存值和取值,这种实现方法被称为双锁队列算法,这样的好处是读写操作的lock操作由两个锁控制,因此可以同时进程读操作和写操作,这也是LinkedBlockingQueue吞吐量超出ArrayBlockingQueue的主要原因,但是使用两个锁比一个锁复杂很多,需要考虑各种死锁的状态。 使用方法和ArrayBlockingQueue一致
public class Test { public static void main(String[] args) { LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(5); ExecutorService pool = Executors.newCachedThreadPool(); pool.execute(new Product(linkedBlockingQueue, 1000, "生产者")); pool.execute(new Cusumer(linkedBlockingQueue, 5000, "消费者1")); pool.shutdown(); } }
4.PriorityBlockingQueue(优先级队列)
优先级阻塞队列ProiorityBlockQueue不是FIFO(先入先出)队列,它要求使用者提供一个Comparetor比较器,或者队列内部元素实现Comparable接口,队头元素会是整个队列里的最小元素.
PriorityBlockQueue是用数组实现的最小堆结构,利用的原理是: 在数组实现的完全二叉树中根节点的下标为子节点的下标除以2,长度是不定的,会随着数据的增长而逐步扩容
public class PriorityProduct implements Comparable<PriorityProduct> { /** * 任务的优先级 */ private int priority; private String productName; public PriorityProduct(int priority, String productName) { this.priority = priority; this.productName = productName; } @Override public int compareTo(PriorityProduct o) { if (o == null) return -1; if (o == this) return 0; return o.priority - this.priority; } @Override public String toString(){ return "{priority="+priority+",name="+this.productName; } } public class PriorityBlockQueueProduct implements Runnable { private PriorityBlockingQueue<PriorityProduct> bq; /** * 多少秒生产一条任务 */ private int period; private Random r = new Random(); public PriorityBlockQueueProduct(PriorityBlockingQueue<PriorityProduct> bq, int period) { this.bq = bq; this.period = period; } @Override public void run() { try { while (true) { Thread.sleep(period); if(bq.size()>10){ //限制大小 continue; } //随机生成优先级5以内的 PriorityProduct priorityProduct = new PriorityProduct(r.nextInt(5), "test"); //如果队列满了则阻塞 bq.put(priorityProduct); //System.out.println("生产者商品[" +priorityProduct + "],当前队列中产品为:" + bq); } } catch (Exception e) { e.printStackTrace(); } } } public class PriorityBlockQueueCusumer implements Runnable { private PriorityBlockingQueue<PriorityProduct> bq; /** * 多少秒消费一条任务 */ private int period; public PriorityBlockQueueCusumer(PriorityBlockingQueue<PriorityProduct> bq, int period) { this.bq = bq; this.period = period; } @Override public void run() { try { while (true) { Thread.sleep(period); //如果队列满了则阻塞 PriorityProduct priorityProduct=bq.take(); System.out.println("消费产品[" +priorityProduct + "],当前队列中产品为:" + bq); } } catch (Exception e) { e.printStackTrace(); } } } public class PriorityTest { public static void main(String[] args) { PriorityBlockingQueue<PriorityProduct> priorityProducts=new PriorityBlockingQueue<>(); ExecutorService executorService= Executors.newFixedThreadPool(2); executorService.execute(new PriorityBlockQueueProduct(priorityProducts,100)); executorService.execute(new PriorityBlockQueueCusumer(priorityProducts,1000)); } }
运行结果如下,可以查看消费者在消费的时候只会消费任务队列中优先级最高的任务
到此这篇关于Java中的阻塞队列BlockingQueue使用详解的文章就介绍到这了,更多相关Java阻塞队列BlockingQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Maven项目如何在pom文件中引入lib下的第三方jar包并打包进去
在使用Maven进行项目开发时,引入第三方私有的Jar包可能会遇到问题,一种常见的解决方案是将Jar包添加到项目的lib目录,并通过IDE进行配置,但这需要每个开发者单独操作,效率低下,更好的方法是通过Maven的pom.xml文件管理这些Jar包2024-09-09Spring Boot Web应用开发 CORS 跨域请求支持
本篇文章主要介绍了Spring Boot Web应用开发 CORS 跨域请求支持,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧2017-05-05
最新评论