Java如何自定义线程池中队列

 更新时间:2022年07月07日 11:40:40   作者:​ 你呀不牛 ​  
这篇文章主要介绍了Java如何自定义线程池中队列,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下

背景

业务交互的过程中涉及到了很多关于SFTP下载的问题,因此在代码中定义了一些线程池,使用中发现了一些问题,

代码类似如下所示:

public class ExecutorTest {
    private static ExecutorService es = new ThreadPoolExecutor(2,
            100, 1000, TimeUnit.MILLISECONDS
            , new ArrayBlockingQueue<>(10));
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            es.submit(new MyThread());
        }
    }
    static class MyThread implements Runnable {
        @Override
        public void run() {
            for (; ; ) {
                System.out.println("Thread name=" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

如上面的代码所示,定义了一个初始容量为2,最大容量为100,队列长度为10的线程池,期待的运行结果为:

Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-3
Thread name=pool-1-thread-4
Thread name=pool-1-thread-5
Thread name=pool-1-thread-6
Thread name=pool-1-thread-7
Thread name=pool-1-thread-8
Thread name=pool-1-thread-9
Thread name=pool-1-thread-10
Thread name=pool-1-thread-3
Thread name=pool-1-thread-5
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-4
Thread name=pool-1-thread-10
Thread name=pool-1-thread-7
Thread name=pool-1-thread-6
Thread name=pool-1-thread-9
Thread name=pool-1-thread-8
Thread name=pool-1-thread-3
Thread name=pool-1-thread-4
Thread name=pool-1-thread-1
Thread name=pool-1-thread-5
Thread name=pool-1-thread-2
Thread name=pool-1-thread-8
Thread name=pool-1-thread-6
Thread name=pool-1-thread-7
Thread name=pool-1-thread-9
Thread name=pool-1-thread-10

期待十个线程都可以运行,但实际的执行效果如下:

Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1

对比可以看出,用上面的方式定义线程池,最终只有两个线程可以运行,即线程池的初始容量大小。其余线程都被阻塞到了队列ArrayBlockingQueue<>(10)

问题分析

我们知道,Executors框架提供了几种常见的线程池分别为:

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

如果将代码中自定义的线程池改为 :

private static ExecutorService es = Executors.newCachedThreadPool();

运行发现,提交的十个线程都可以运行

Executors.newCachedThreadPool()的源码如下:

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

通过对比发现,newCachedThreadPool使用的是 SynchronousQueue<>()而我们使用的是ArrayBlockingQueue<>(10) 因此可以很容易的发现问题出在队列上。

问题解决

将ArrayBlockingQueue改为SynchronousQueue 问题解决,代码如下:

public class ExecutorTest {
    private static ExecutorService es = new ThreadPoolExecutor(2,
            100, 1000, TimeUnit.MILLISECONDS
            , new SynchronousQueue<>());
    private static ExecutorService es2 = Executors.newCachedThreadPool();
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            es.submit(new MyThread());
        }
    }
    static class MyThread implements Runnable {
        @Override
        public void run() {
            for (; ; ) {
                System.out.println("Thread name=" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

总结

两个队列的UML关系图

从图上我们可以看到,两个队列都继承了AbstractQueue实现了BlockingQueue接口,因此功能应该相似

SynchronousQueue的定义

* <p>Synchronous queues are similar to rendezvous channels used in
* CSP and Ada. They are well suited for handoff designs, in which an
* object running in one thread must sync up with an object running
* in another thread in order to hand it some information, event, or
* task.

SynchronousQueue类似于一个传递通道,只是通过他传递某个元素,并没有任何容量,只有当第一个元素被取走,才能在给队列添加元素。

ArrayBlockingQueue的定义

* A bounded {@linkplain BlockingQueue blocking queue} backed by an
* array.  This queue orders elements FIFO (first-in-first-out).  The
* <em>head</em> of the queue is that element that has been on the
* queue the longest time.  The <em>tail</em> of the queue is that
* element that has been on the queue the shortest time. New elements
* are inserted at the tail of the queue, and the queue retrieval
* operations obtain elements at the head of the queue.

ArrayBlockingQueue从定义来看就是一个普通的队列,先入先出,当队列为空时,获取数据的线程会被阻塞,当队列满时,添加队列的线程会被阻塞,直到队列可用。

分析

从上面队列的定义中可以看出,导致线程池没有按照预期运行的原因不是因为队列的问题,应该是关于线程池在提交任务时,从队列取数据的方式不同导致的。

jdk源码中关于线程池队列的说明

* <dt>Queuing</dt>
*
* <dd>Any {@link BlockingQueue} may be used to transfer and hold
* submitted tasks.  The use of this queue interacts with pool sizing:
*
* <ul>
*
* <li> If fewer than corePoolSize threads are running, the Executor
* always prefers adding a new thread
* rather than queuing.</li>
*
* <li> If corePoolSize or more threads are running, the Executor
* always prefers queuing a request rather than adding a new
* thread.</li>
*
* <li> If a request cannot be queued, a new thread is created unless
* this would exceed maximumPoolSize, in which case, the task will be
* rejected.</li>

从说明中可以看到,如果正在运行的线程数必初始容量corePoolSize小,那么Executor会从创建一个新线程去执行任务,如果正在执行的线程数必corePoolSize大,那么Executor会将新提交的任务放到阻塞队列,除非当队列的个数超过了队列的最大长度maxmiumPooSize。

从源码中找到关于提交任务的方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

从源码中看到 subimit实际上是调用了execute方法

execute方法的源码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

源码中可以看出,提交任务时,首先会判断正在执行的线程数是否小于corePoolSize,如果条件成立那么会直接创建线程并执行任务。如果条件不成立,且队列没有满,那么将任务放到队列,如果条件不成立但是队列满了,那么同样也新创建线程并执行任务。

到此这篇关于Java如何自定义线程池中队列的文章就介绍到这了,更多相关Java 队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java双冒号(::)运算符使用详解

    Java双冒号(::)运算符使用详解

    之前没用过::这个东西,今天看flink的时候发现官网有个例子用到了这个符号,本文就详细的来介绍一下Java双冒号(::)运算符使用详解,感兴趣的可以了解一下
    2021-09-09
  • MyBatis快速入门之环境搭建和单表映射

    MyBatis快速入门之环境搭建和单表映射

    一说起对象关系映射框架,大家第一时间想到的肯定是Hibernate。Hibernate作为一个著名的框架,功能十分强大。但是由于Hibernate如此强大的功能,导致了它的缺点。好吧,不多说了,具体详情大家通过本文一起学习吧
    2017-03-03
  • SpringBoot中使用多线程的方法示例

    SpringBoot中使用多线程的方法示例

    这篇文章主要介绍了SpringBoot中使用多线程的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-03-03
  • Mybatis-Plus中的MetaObjectHandler组件的使用

    Mybatis-Plus中的MetaObjectHandler组件的使用

    MetaObjectHandler是Mybatis-Plus中一个实用组件,专门用于自动处理实体对象中的特定字段,如创建时间、更新时间、创建人和修改人等,该接口允许开发者在不修改业务代码的情况下,实现自动填充功能,极大地简化了代码的复杂性,感兴趣的可以了解一下
    2024-10-10
  • 深入理解Java中的接口

    深入理解Java中的接口

    下面小编就为大家带来一篇深入理解Java中的接口。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-05-05
  • SpringBoot整合Netty的流程步骤

    SpringBoot整合Netty的流程步骤

    Netty是一个基于Java的开源网络应用框架,它提供了高性能、异步事件驱动的网络编程能力,Netty旨在帮助开发者构建高性能、高可靠性的网络应用程序,本文给大家详细介绍了SpringBoot整合Netty的流程步骤,需要的朋友可以参考下
    2023-09-09
  • java设计模式理解依赖于抽象不依赖具体的分析

    java设计模式理解依赖于抽象不依赖具体的分析

    这篇文章主要为大家介绍了java设计模式的规则,理解依赖于抽象不依赖具体的示例分析,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2021-10-10
  • java获取IP归属地全网显示开源库使用

    java获取IP归属地全网显示开源库使用

    这篇文章主要为大家介绍了java获取IP归属地全网显示的开源库使用示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-07-07
  • Java中创建线程的两种方式详细说明

    Java中创建线程的两种方式详细说明

    这篇文章主要介绍了Java中创建线程的两种方式详细说明,Java使用java.lang.Thread类代表线程,所有的线程对象都必须是Thread类或其子类的实例,每个线程的作用是完成一定的任务,实际上就是执行一段程序流即一段顺序执行的代码,需要的朋友可以参考下
    2023-11-11
  • Java实现FTP批量大文件上传下载篇2

    Java实现FTP批量大文件上传下载篇2

    这篇文章主要为大家详细介绍了Java实现FTP批量大文件上传下载的强化篇,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-08-08

最新评论