Java中的BlockingQueue阻塞队列原理以及实现详解
一,BlockingQueue
在最常见的使用到这个阻塞队列的地方,就是我们耳熟能详的线程池里面了,作为我们线程池的一大最大参与者,也是AQS的一个具体实现,因此可以好好的深入了解一下这个BlockingQueue阻塞队列。
用一句话描述这个阻塞队列就是:它是线程的一个通信工具,在任意时刻,不管并发有多高,在单jvm进程上,同一时间永远只有一个线程能够对队列进行入队和出队的操作,它的特性是在任意时刻只有一个线程可以进行take或者put操作。因此这个队列是一个线程安全的队列。
比较适用于生产者和消费者的场景,因此适用的应用场景如下 线程池,springCloud-Eureka的三级缓存,Nacos,Netty,RakectMq等
所有的阻塞队列都都实现了对这个BlockingQueue接口
public interface BlockingQueue<E> extends Queue<E>
1,主要常用的队列有如下
ArrayBlockingQueue: 由数组支持的有界队列
LinkedBlockingQueue: 由链接节点支持的可选有界队列
PriorityBlockingQueue: 由优先级堆支持的无界优先级队列
DelayQueue: 由优先级堆支持的、基于时间的调度队列
2,基本工作原理实现如下
1,以一个有界队列为例,首先消费者这边获取到锁,然后会生产商品,然后会往队列中填满数据,队列填满之后,生产者端会进行阻塞,同时会释放这把锁,并且会通知这个消费者赶紧去消费。当然内部也做了很多事情,不一定就是说一定要阻塞队列满了之后才会去唤醒生产者去消费,而是消费者那边也会有一个监听事件,只有队列不为空,就会有这个消费者来消费。
2,消费者在接收到生产者的通知之后呢,就会先去获取到这把锁,然后对里面的产品进行消费,当队列里面的产品都被消费完成之后,消费者这边又会释放这把锁,然后将自身阻塞,并同时去唤醒这个生产者继续生产产品。
3,生产者又获取到锁,然后重复执行第一步。
3,基本api使用如下
二,源码剖析
在了解过一定的工作原理之后,接下来可以对源码分析一波。
2.1,ArrayBlockingQueue
这里主要通过这个ArrayBlockingQueue为例,来描述一下这个阻塞队列的工作流程
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);
这个构造方法里面有如下参数
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); //非公平锁 notEmpty = lock.newCondition(); //条件对象,用于唤醒指定线程 notFull = lock.newCondition(); //条件对象 }
生产者会向队列中put产品,生产者后会持有锁,此时会向队列中存放产品,如果队列满了,则会阻塞自己,并且在最后会释放锁。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //生产者加锁 lock.lockInterruptibly(); try { while (count == items.length) //如果队列满了,则会阻塞 notFull.await(); enqueue(e); } finally { lock.unlock(); //释放锁 } }
既然涉及到ReentrantLock,那么就用从之前的AQS里面讲起了,这里面这要是一个CLH同步等待队列,由一个双向链表和一个同步阻塞器组成,同步阻塞器会有一个state和一个exclusiveOwnerThread状态组成,state=0表示当前没有对象获取到锁,可以来竞争锁。每个结点由一个前驱指针和一个后继指针,并且里面有一个waitStatus等待状态,该状态主要表示下一个结点的存活状态。
这里的话不会像之前一样使用这个CLH同步等待队列,而是加入了一种新的Condition条件等待队列,如下图。由firstWaiter和nextWaiter组成的单向链表队列,里面的waitStatus为CONDITION:-2 。也就是说如果当前生产者结点后面的结点又是一个生产者节点,因为期间可能存在多个生产者的线程,而为了唤醒接下来的消费者,就会创建一个条件等待队列,去存储后面的生产者结点。 就是说在CLH同步等待队列中,当前结点为生产者的话,在阻塞队列满了之后,如果CLH中的下一个节点还是生产者,则会将waitStatus的状态设置成-2,并将下一个节点移动到这个条件等待队列里面并进行排队,如果下一个结点还是,又会将下一个结点移动到这个条件等待队列里面并进行排队。知道下一个结点是消费者为止。
await()释放锁的流程如下
public final void await() throws InterruptedException { //线程是否被中断,如果被中断,直接抛异常 if (Thread.interrupted()) throw new InterruptedException(); //条件等待队列,会构建一个新的队列 Node node = addConditionWaiter(); //释放锁,并对对应的结点进行唤醒操作 int savedState = fullyRelease(node); int interruptMode = 0; //判断当前结点是在条件队列里面还是在同步队列里面 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
构建条件等待队列如下
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
出队,消费者在获取产品时,产品就会出队,与此同时,在队列出队成功之后,队列中就会有一个空位,会调用notFull.signal()方法,通知生产者可以去生产产品了。并将这个条件等待队列放回这个CLH队列里面,只有在CLH队列里面才会获取锁。最后在CLH中才能进行unPark释放锁的操作。
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //队列中有空位,通知生产者生产产品 notFull.signal(); return x; }
消费者获取产品
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
三,总结
BlockingQueue也是基于这个AQS的方式实现的,主要是利用这个生产者和消费者这个模型来实现。
通过这个AQS中的CLH同步队列来对节点的锁的阻塞和释放,期间利用了这个条件等待队列来实现,如果存在多个生产者的线程的情况下,就会将这些线程加入到一个条件等待的队列里面。
并将这个节点的状态改为-2,condition状态。
在全部进入条件等待队列之后,这个锁还在并没有释放,因此最后又需要将这个条件等待队列里面的结点加回到CLH同步队列中,再进行排队的释放这个锁。结点出队的时候,然后生产者会通过一个singal监听这个消费者,每当这个阻塞队列里面出队,有一个位置的的时候,生产者就会生产这个产品。
消费者也会监听这个队列,队列中只要不为空,就回去消费队列中的产品。
获取锁的条件 只有在CLH队列里等待的Node结点并且前驱结点的 waitStatus 为sinal = -1的可被唤醒的结点。
条件队列里面的这些节点是不能获取到锁的。
到此这篇关于Java中的BlockingQueue阻塞队列原理以及实现详解的文章就介绍到这了,更多相关BlockingQueue阻塞队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Springboot公共字段填充及ThreadLocal模块改进方案
这篇文章主要为大家介绍了Springboot公共字段填充及ThreadLocal模块改进方案详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2022-11-11
最新评论