Java线程池队列PriorityBlockingQueue原理分析

 更新时间:2023年12月18日 08:32:50   作者:Brycen Liu  
这篇文章主要介绍了Java线程池队列PriorityBlockingQueue原理分析,PriorityBlockingQueue队列是 JDK1.5 的时候出来的一个阻塞队列,但是该队列入队的时候是不会阻塞的,永远会加到队尾,需要的朋友可以参考下

一、什么是PriorityBlockingQueue?

PriorityBlockingQueue队列是 JDK1.5 的时候出来的一个阻塞队列。但是该队列入队的时候是不会阻塞的,永远会加到队尾。下面我们介绍下它的几个特点:

  • PriorityBlockingQueue 和 ArrayBlockingQueue 一样是基于数组实现的,但后者在初始化时需要指定长度,前者默认长度是 11
  • 该队列可以说是真正的无界队列,它在队列满的时候会进行扩容,而前面说的无界阻塞队列其实都有有界,只是界限太大可以忽略(最大值是 2147483647)
  • 该队列属于权重队列,可以理解为它可以进行排序,但是排序不是从小到大排或从大到小排,是基于数组的堆结构(具体如何排下面会进行分析)
  • 出队方式和前面的也不同,是根据权重来进行出队,和前面所说队列中那种先进先出或者先进后出方式不同。
  • 其存入的元素必须实现Comparator,或者在创建队列的时候自定义Comparator

注意:

  1. 堆结构实际上是一种完全二叉树,建议学习前了解一下二叉树。
  2. 堆又分为大顶堆小顶堆。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的。

二、PriorityBlockingQueue类图:

在这里插入图片描述

三、源码解析

1、字段讲解

从下面的字段我们可以知道,该队列可以排序,使用显示锁来保证操作的原子性,在空队列时,出队线程会堵塞等。

/**
	* 默认数组长度
	*/
	private static final int DEFAULT_INITIAL_CAPACITY = 11;
    /**
	 * 最大达容量,分配时超出可能会出现 OutOfMemoryError 异常
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    /**
	 * 队列,存储我们的元素
     */
    private transient Object[] queue;
    /**
     * 队列长度
     */
    private transient int size;
    /**
     * 比较器,入队进行权重的比较
     */
    private transient Comparator<? super E> comparator;
    /**
     * 显示锁
     */
    private final ReentrantLock lock;
    /**
     * 空队列时进行线程阻塞的 Condition 对象
     */
    private final Condition notEmpty;

2、构造方法

/**
	* 默认构造,使用长度为 11 的数组,比较器为空
	*/
 	public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
	/**
	* 自定义数据长度构造,比较器为空
	*/
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
	/**
	* 自定义数组长度,可以自定义比较器
	*/
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

3、入队方法

3.1、put方法

入队方法,下面可以看到 put 方法最终会调用 offer 方法,所以我们只看 offer 方法即可。

    public void put(E e) {
        offer(e); // never need to block
    }
 	public boolean offer(E e) {
 		//判断是否为空
        if (e == null)
            throw new NullPointerException();
       	//显示锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        //定义临时对象
        int n, cap;
        Object[] array;
        //判断数组是否满了
        while ((n = size) >= (cap = (array = queue).length))
        	//数组扩容
            tryGrow(array, cap);
        try {
        	//拿到比较器
            Comparator<? super E> cmp = comparator;
            //判断是否有自定义比较器
            if (cmp == null)
            	//堆上浮
                siftUpComparable(n, e, array);
            else
            	//使用自定义比较器进行堆上浮
                siftUpUsingComparator(n, e, array, cmp);
            //队列长度 +1
            size = n + 1;
            //唤醒休眠的出队线程
            notEmpty.signal();
        } finally {
        	//释放锁
            lock.unlock();
        }
        return true;
    }

3.2、上浮调整比较器方法的实现

private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
        	//无符号向左移,目的是找到放入位置的父节点
            int parent = (k - 1) >>> 1;
            //拿到父节点的值
            Object e = array[parent];
            //比较是否大于该元素,不大于就没比较交换
            if (key.compareTo((T) e) >= 0)
                break;
            //以下都是元素位置交换
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

根据上面的代码,可以看出这是完全二叉树在进行上浮调整。调整入队的元素,找出最小的,将元素排列有序化。简单理解就是:父节点元素值一定要比它的子节点得小,如果父节点大于子节点了,那就两者位置进行交换。

3.3、入队图解

说的可能很模糊,我们先写个 demo,根据 demo 来进行图解分析:

/**
 * @Auther: Gentle
 * @Date: 2019/4/14 15:11
 * @Description: PriorityBlockingQueue 简单演示 demo 
 */
public class TestPriorityBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>();
        concurrentLinkedQueue.offer(10);
        concurrentLinkedQueue.offer(20);
        concurrentLinkedQueue.offer(5);
        concurrentLinkedQueue.offer(1);
        concurrentLinkedQueue.offer(25);
        concurrentLinkedQueue.offer(30);
        //输出元素排列
        concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  "));
        //取出元素
		Integer take = concurrentLinkedQueue.take();
        System.out.println();
        concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  "));
    }
}

上面可以看出,我们要入队的元素是 [10,20,5,1,21,30],接下来我们用图来演示一步步入队情况。

队列初始化时:

在这里插入图片描述

这时,我们开始将元素 元素 10 入队,并用二叉树辅助理解:

在这里插入图片描述

我们在将元素 20 入队:

在这里插入图片描述

将元素 5 入队后发现父节点大于子节点,这时需要进行上浮调整

在这里插入图片描述

开始进行上浮调整,将元素 10 和元素 5进行位置调换,结果如下:

在这里插入图片描述

接着将元素 1 入队后发现父节点大于子节点,继续进行调整:

在这里插入图片描述

第一次调整将元素 20 和元素 1 进行位置交换,交换完毕后结果如下:

在这里插入图片描述

交换完毕后,我们发现父节点的元素值还是大于子节点,说明还需要进行一次交换,最后交换结果如下:

在这里插入图片描述

接下来将元素 25 和 30 入队,结果如下:

在这里插入图片描述

注: 最小堆的的顶端一定是元素值最小的那个。

4、出队方法

4.1、take方法

出队方法,该方法会阻塞

public E take() throws InterruptedException {
	//显示锁
    final ReentrantLock lock = this.lock;
    //可中断锁
    lock.lockInterruptibly();
    //结果接受对象
    E result;
    try {
    	//判读队列是否为空
        while ( (result = dequeue()) == null)
        	//线程阻塞
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

4.2、dequeue方法

具体出队方法的实现

 private E dequeue() {
	//长度减少 1
    int n = size - 1;
    //判断队列中是否又元素
    if (n < 0)
        return null;
    else {
    	//队列对象
        Object[] array = queue;
        //取出第一个元素
        E result = (E) array[0];
        //拿出最后一个元素
        E x = (E) array[n];
        //置空
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
        	//下沉调整
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
       	//成功则减少队列中的元素数量
        size = n;
        return result;
    }
}

总体就是找到父节点与两个子节点中最小的一个节点,然后进行交换位置,不断重复,由上而下的交换。

4.3、下沉调整比较器方法的实现

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
    //判断队列长度
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        //找到队列最后一个元素的父节点的索引。
        //如下图最大元素是30 父节点是 10,对于索引是 2
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
        	//拿到 k 节点下的左子节点
            int child = (k << 1) + 1; // assume left child is least
            //取得子节点对应的值
            Object c = array[child];
            //取得 k 右子节点的索引
            int right = child + 1;
            //比较右节点的索引是否小于队列长度和左右子节点的值进行比较
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            //比较父节点值是否大于子节点
            if (key.compareTo((T) c) <= 0)
                break;
            //下面都是元素替换
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

4.4、出队图解

在这里插入图片描述

这时,我们需要从队列中取出第一个元素 1,元素 1 取出时会与队列中最后一个元素进行交换,并将最后一个元素置空。(实际上源码不是这么做的,源代码中是用变量来保存索引,直到全部下沉调整完成才进行替换)

在这里插入图片描述

替换后,结果就如下图显示一样。我们发现父节点大于子节点了,所以还需要再一次进行替换操作。 在这里插入图片描述再一次替换后,将元素 30 下沉到下一个左边子节点,子节点上浮到原父节点位置。这就完成了下沉调整了。

在这里插入图片描述

四、总结

PriorityBlockingQueue 真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

到此这篇关于Java线程池队列PriorityBlockingQueue原理分析的文章就介绍到这了,更多相关Java的PriorityBlockingQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring Security 自定义授权服务器实践记录

    Spring Security 自定义授权服务器实践记录

    授权服务器(Authorization Server)目前并没有集成在Spring Security项目中,而是作为独立项目存在于Spring生态中,这篇文章主要介绍了Spring Security 自定义授权服务器实践,需要的朋友可以参考下
    2022-08-08
  • Java正则验证IP的方法实例分析【测试可用】

    Java正则验证IP的方法实例分析【测试可用】

    这篇文章主要介绍了Java正则验证IP的方法,结合实例形式对比分析了网上常见的几种针对IP的正则验证方法,最终给出了一个比较靠谱的IP正则验证表达式,需要的朋友可以参考下
    2017-08-08
  • Java InputStream实战之轻松读取操作文件流

    Java InputStream实战之轻松读取操作文件流

    在Java中,输入输出是非常重要的基础功能,其中,InputStream是Java中的一个重要输入流类,用于从输入源读取数据,下面我们就来学习一下InputStream类的相关知识吧
    2023-10-10
  • MyBatis直接执行SQL的工具SqlMapper

    MyBatis直接执行SQL的工具SqlMapper

    今天小编就为大家分享一篇关于MyBatis直接执行SQL的工具SqlMapper,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-12-12
  • java实现猜数字小游戏(Swing版)

    java实现猜数字小游戏(Swing版)

    这篇文章主要介绍了java实现猜数字小游戏,Swing编程版的猜数字游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-05-05
  • 一文浅析Java中的值传递

    一文浅析Java中的值传递

    今天在解决一个问题时,程序总是不能输出正确值,分析逻辑思路没问题后,发现原来是由于函数传递导致了这个情况,下面我们就来看看Java中的值传递到底是什么情况吧
    2023-08-08
  • Mybatis源码分析之存储过程调用和运行流程

    Mybatis源码分析之存储过程调用和运行流程

    这一篇我们学习一下Mybatis调用存储过程的使用和运行流程,首先我们创建一个简单的存储过程,具体创建过程大家可以通过本文学习下
    2016-11-11
  • Java提示缺少返回语句的解决办法

    Java提示缺少返回语句的解决办法

    在本篇文章里小编给大家分享了关于Java提示缺少返回语句的解决办法以及相关知识点,需要的朋友们参考下。
    2019-07-07
  • java多线程:基础详解

    java多线程:基础详解

    这篇文章主要介绍了java多线程编程实例,分享了几则多线程的实例代码,具有一定参考价值,加深多线程编程的理解还是很有帮助的,需要的朋友可以参考下。
    2021-08-08
  • 详解springboot中redis的使用和分布式session共享问题

    详解springboot中redis的使用和分布式session共享问题

    这篇文章主要介绍了详解springboot中redis的使用和分布式session共享问题,详细的介绍了解决分布式系统的session如何共享问题,有兴趣的可以了解一下
    2017-11-11

最新评论