Java中的BlockingQueue阻塞队列原理以及实现详解

 更新时间:2023年12月22日 08:36:25   作者:huisheng_qaq  
这篇文章主要介绍了Java中的BlockingQueue阻塞队列原理以及实现详解,在最常见的使用到这个阻塞队列的地方,就是我们耳熟能详的线程池里面了,作为我们线程池的一大最大参与者,也是AQS的一个具体实现,需要的朋友可以参考下

一,BlockingQueue

在最常见的使用到这个阻塞队列的地方,就是我们耳熟能详的线程池里面了,作为我们线程池的一大最大参与者,也是AQS的一个具体实现,因此可以好好的深入了解一下这个BlockingQueue阻塞队列。

用一句话描述这个阻塞队列就是:它是线程的一个通信工具,在任意时刻,不管并发有多高,在单jvm进程上,同一时间永远只有一个线程能够对队列进行入队和出队的操作,它的特性是在任意时刻只有一个线程可以进行take或者put操作。因此这个队列是一个线程安全的队列。

比较适用于生产者和消费者的场景,因此适用的应用场景如下 线程池,springCloud-Eureka的三级缓存,Nacos,Netty,RakectMq等

所有的阻塞队列都都实现了对这个BlockingQueue接口

public interface BlockingQueue<E> extends Queue<E>

1,主要常用的队列有如下

ArrayBlockingQueue: 由数组支持的有界队列

LinkedBlockingQueue: 由链接节点支持的可选有界队列

PriorityBlockingQueue: 由优先级堆支持的无界优先级队列

DelayQueue: 由优先级堆支持的、基于时间的调度队列

2,基本工作原理实现如下

1,以一个有界队列为例,首先消费者这边获取到锁,然后会生产商品,然后会往队列中填满数据,队列填满之后,生产者端会进行阻塞,同时会释放这把锁,并且会通知这个消费者赶紧去消费。当然内部也做了很多事情,不一定就是说一定要阻塞队列满了之后才会去唤醒生产者去消费,而是消费者那边也会有一个监听事件,只有队列不为空,就会有这个消费者来消费。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PwPGs9DF-1657455705341)(C:\Users\HULOUBO\AppData\Roaming\Typora\typora-user-images\1657445347685.png)]

2,消费者在接收到生产者的通知之后呢,就会先去获取到这把锁,然后对里面的产品进行消费,当队列里面的产品都被消费完成之后,消费者这边又会释放这把锁,然后将自身阻塞,并同时去唤醒这个生产者继续生产产品。

在这里插入图片描述

3,生产者又获取到锁,然后重复执行第一步。

3,基本api使用如下

在这里插入图片描述

在这里插入图片描述

二,源码剖析

在了解过一定的工作原理之后,接下来可以对源码分析一波。

2.1,ArrayBlockingQueue

这里主要通过这个ArrayBlockingQueue为例,来描述一下这个阻塞队列的工作流程

BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);

这个构造方法里面有如下参数

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); //非公平锁
    notEmpty = lock.newCondition(); //条件对象,用于唤醒指定线程
    notFull =  lock.newCondition(); //条件对象
}

生产者会向队列中put产品,生产者后会持有锁,此时会向队列中存放产品,如果队列满了,则会阻塞自己,并且在最后会释放锁。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock; //生产者加锁
    lock.lockInterruptibly();
    try {
        while (count == items.length) //如果队列满了,则会阻塞
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock(); //释放锁
    }
}

既然涉及到ReentrantLock,那么就用从之前的AQS里面讲起了,这里面这要是一个CLH同步等待队列,由一个双向链表和一个同步阻塞器组成,同步阻塞器会有一个state和一个exclusiveOwnerThread状态组成,state=0表示当前没有对象获取到锁,可以来竞争锁。每个结点由一个前驱指针和一个后继指针,并且里面有一个waitStatus等待状态,该状态主要表示下一个结点的存活状态。

在这里插入图片描述

这里的话不会像之前一样使用这个CLH同步等待队列,而是加入了一种新的Condition条件等待队列,如下图。由firstWaiter和nextWaiter组成的单向链表队列,里面的waitStatus为CONDITION:-2 。也就是说如果当前生产者结点后面的结点又是一个生产者节点,因为期间可能存在多个生产者的线程,而为了唤醒接下来的消费者,就会创建一个条件等待队列,去存储后面的生产者结点。 就是说在CLH同步等待队列中,当前结点为生产者的话,在阻塞队列满了之后,如果CLH中的下一个节点还是生产者,则会将waitStatus的状态设置成-2,并将下一个节点移动到这个条件等待队列里面并进行排队,如果下一个结点还是,又会将下一个结点移动到这个条件等待队列里面并进行排队。知道下一个结点是消费者为止。

在这里插入图片描述

await()释放锁的流程如下

public final void await() throws InterruptedException {
    //线程是否被中断,如果被中断,直接抛异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //条件等待队列,会构建一个新的队列
    Node node = addConditionWaiter();
    //释放锁,并对对应的结点进行唤醒操作
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断当前结点是在条件队列里面还是在同步队列里面
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

构建条件等待队列如下

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

出队,消费者在获取产品时,产品就会出队,与此同时,在队列出队成功之后,队列中就会有一个空位,会调用notFull.signal()方法,通知生产者可以去生产产品了。并将这个条件等待队列放回这个CLH队列里面,只有在CLH队列里面才会获取锁。最后在CLH中才能进行unPark释放锁的操作。

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //队列中有空位,通知生产者生产产品
    notFull.signal();
    return x;
}

消费者获取产品

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

三,总结

BlockingQueue也是基于这个AQS的方式实现的,主要是利用这个生产者和消费者这个模型来实现。

通过这个AQS中的CLH同步队列来对节点的锁的阻塞和释放,期间利用了这个条件等待队列来实现,如果存在多个生产者的线程的情况下,就会将这些线程加入到一个条件等待的队列里面。

并将这个节点的状态改为-2,condition状态。

在全部进入条件等待队列之后,这个锁还在并没有释放,因此最后又需要将这个条件等待队列里面的结点加回到CLH同步队列中,再进行排队的释放这个锁。结点出队的时候,然后生产者会通过一个singal监听这个消费者,每当这个阻塞队列里面出队,有一个位置的的时候,生产者就会生产这个产品。

消费者也会监听这个队列,队列中只要不为空,就回去消费队列中的产品。

获取锁的条件 只有在CLH队列里等待的Node结点并且前驱结点的 waitStatus 为sinal = -1的可被唤醒的结点。

条件队列里面的这些节点是不能获取到锁的。

到此这篇关于Java中的BlockingQueue阻塞队列原理以及实现详解的文章就介绍到这了,更多相关BlockingQueue阻塞队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 多模块项目使用枚举配置spring-cache缓存方案详解

    多模块项目使用枚举配置spring-cache缓存方案详解

    这篇文章主要为大家介绍了多模块项目使用枚举配置spring-cache缓存的方案详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-05-05
  • Java 程序设计总复习题(java基础代码)

    Java 程序设计总复习题(java基础代码)

    这篇文章主要介绍了Java 程序设计总复习题,主要是java基础代码,方便学习java的同学
    2021-05-05
  • Java Runtime类详解_动力节点Java学院整理

    Java Runtime类详解_动力节点Java学院整理

    Runtime类封装了运行时的环境。每个 Java 应用程序都有一个 Runtime 类实例,使应用程序能够与其运行的环境相连接。下面通过本文给大家分享Java Runtime类详解,需要的朋友参考下吧
    2017-04-04
  • idea常用习惯操作设置方法图解

    idea常用习惯操作设置方法图解

    这篇文章主要介绍了idea常用习惯操作设置方法,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-03-03
  • java对象转换String类型的三种方法

    java对象转换String类型的三种方法

    在很多情况下我们都需要将一个对象转换为String类型。一般来说有三种方法可以实现:Object.toString()、(String)Object、String.valueOf(Object)。下面对这三种方法一一分析
    2013-11-11
  • java报错非法的前向引用问题

    java报错非法的前向引用问题

    这篇文章主要介绍了java报错非法的前向引用问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-05-05
  • java防反编译最简单的技巧分享

    java防反编译最简单的技巧分享

    这篇文章主要给大家分享了关于java防反编译最简单的技巧,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考借鉴,下面随着小编来一起学习学习吧。
    2017-09-09
  • 深入讲解SPI 在 Spring 中的应用

    深入讲解SPI 在 Spring 中的应用

    这篇文章主要介绍了深入讲解SPI在Spring中的应用,SPI是Java内置的一种服务提供发现机制,可以用来提高框架的扩展性,主要用于框架的开发中
    2022-06-06
  • Springboot公共字段填充及ThreadLocal模块改进方案

    Springboot公共字段填充及ThreadLocal模块改进方案

    这篇文章主要为大家介绍了Springboot公共字段填充及ThreadLocal模块改进方案详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-11-11
  • Netty分布式Server启动流程服务端初始化源码分析

    Netty分布式Server启动流程服务端初始化源码分析

    本章主要讲解server启动的关键步骤, 读者只需要了解server启动的大概逻辑, 知道关键的步骤在哪个类执行即可, 并不需要了解每一步的运作机制, 之后会对每个模块进行深度分析
    2022-03-03

最新评论