关于java自定义线程池的原理与实现
这一节来自定义一个简单的线程池。
一、自定义阻塞队列
生产者创建任务添加到线程池中,线程池中有若干线程来执行任务,如果任务数大于线程数,线程池中要有一个地方来存储多余的任务
线程池中需要一个存放任务的阻塞队列,所以需要先定义一个阻塞队列
class BlockingQueue<T> { static Logger LOG = LoggerFactory.getLogger(BlockingQueue.class); //队列 private Deque<T> queue = new ArrayDeque<>(); //队列的容量 private int capcity; private ReentrantLock lock= new ReentrantLock(); //获取元素时队列为空就到这个Condition中等待 private Condition emptySet = lock.newCondition(); // 添加元素时如果队列已到达最大容量就到这个condition等待 private Condition fullSet = lock.newCondition(); public BlockingQueue(int capcity) { this.capcity = capcity; } //添加元素 public void put(T t){ //queue是共享变量,多线程操作要加锁 try { lock.lock(); while(queue.size()==capcity){ //队列中元素已达到最大容量,添加元素的线程等待 try { LOG.info("队列元素已满,添加元素线程等待"); fullSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //走到这里表示队列中有空位了 queue.addLast(t); LOG.info("元素添加成功"); //唤醒等待的获取元素的线程 emptySet.signalAll(); } finally { lock.unlock(); } } //获取元素的方法 public T take(){ try { lock.lock(); while (queue.size()==0){ //队列中没有元素 try { LOG.info("队列为空,获取元素线程等待"); emptySet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //走到这里表示队列中有元素了 T t = queue.removeFirst(); //叫醒添加元素的等待线程 fullSet.signalAll(); return t; } finally { lock.unlock(); } } // 带超时时间的获取元素的方法 public T poll(long time, TimeUnit timeUnit){ try { lock.lock(); long nanos = timeUnit.toNanos(time); while (queue.size()==0){ //队列中没有元素 try { if(nanos<=0){ LOG.info("等待超时时间到,返回null"); return null; } LOG.info("队列为空,获取元素线程等待"); //这个方法的返回值表示剩余的等待时间,例如本来等待5s,等了三秒被叫醒了,返回值就是2 nanos = emptySet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } //走到这里表示队列中有元素了 T t = queue.removeFirst(); //叫醒添加元素的等待线程 fullSet.signalAll(); return t; } finally { lock.unlock(); } } }
我们的线程池中需要一个阻塞队列来存放任务,可以使用上边定义的这个
二、自定义线程池
线程池的代码
class ThreadPool { static Logger logger = LoggerFactory.getLogger(ThreadPool.class); //用来存储任务的队列 private BlockingQueue<Runnable> taskQueue; //核心数,即线程中可以创建的最大线程数 private int coreSize; //存放线程的集合 private HashSet<Worker> workers = new HashSet<>(); //线程的空闲时间,池中的一个线程如果在这段时间后还获取不到任务就会自动终止 private long time; private TimeUnit timeUnit; /** * * @param coreSize * @param capacity 线程池中任务队列的容量 * @param time * @param timeUnit */ public ThreadPool(int coreSize,int capacity, long time, TimeUnit timeUnit) { this.coreSize = coreSize; this.time = time; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(capacity); } //提交任务的方法 public void execute(Runnable task){ synchronized (workers){ if(workers.size()<coreSize){ //线程数小于核心数,启动新线程来执行任务 Worker worker = new Worker(task); workers.add(worker); logger.info("线程池新增线程执行任务"); worker.start(); } else{ //线程数已达到核心数,把任务添加到队列中 taskQueue.put(task); logger.info("线程池添加任务到队列中"); } } } //用来描述线程池中工作线程的类 class Worker extends Thread { private Runnable task; public Worker(Runnable task){ this.task =task; } @Override public void run() { //执行任务的逻辑 //一个任务执行完后继续从任务队列中获取任务来执行 while (task!=null || (task=taskQueue.poll(time,timeUnit))!=null){ try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { //任务执行完后清空 task = null; } } //走到这里表示没有从任务队列中获取到任务,当前线程将要结束 synchronized (workers){ //从线程集合中删除当前线程 workers.remove(this); } } } }
三、测试
public class Test9 { private static Logger LOG = LoggerFactory.getLogger(Test9.class); public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS); for (int i = 0; i < 4; i++) { int a=i; threadPool.execute(new Runnable() { @Override public void run() { LOG.info("第{}个任务",a); } }); } } }
四、拒绝策略
上边的线程池存在一个问题,当有大量任务提交到线程池超过了任务队列的容量时,提交任务的线程就会一直阻塞等待,
//核心1,队列容量1 ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS); for (int i = 0; i < 4; i++) { int a=i; threadPool.execute(new Runnable() { @Override public void run() { LOG.info("第{}个任务",a); try { Thread.sleep(1000000); } catch (InterruptedException e) { e.printStackTrace(); } } }); }
像这样把任务时间延长,提交的任务就会超过队列容量,这时主线程就会阻塞住
实际上应该提供一种拒绝策略来让提交任务的线程自己决定是阻塞死等还是放弃执行任务,
为了实现这个功能,先抽象一个接口来封装拒绝策略
interface RejectPolicy<T> { // 把任务队列和当前要提交的任务作为参数 public void applyPolicy(BlockingQueue<T> queue,T task); }
然后为了应用拒绝策略需要在阻塞队列BlockingQueue中添加一个tryPut方法
public void tryPut(RejectPolicy<T> rejectPolicy,T t){ try { lock.lock(); if(queue.size()>=capacity){ //应用拒绝测试 rejectPolicy.applyPolicy(this,t); } else { //还有空间正常添加任务 queue.addLast(t); } } finally { lock.unlock(); } }
然后修改线程池,添加一个rejectPolicy属性,在构造方法中由任务提交者来赋值
线程池新增的属性
private RejectPolicy<Runnable> rejectPolicy;
线程池的构造方法
public ThreadPool(int coreSize,int capacity, long time, TimeUnit timeUnit,RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.time = time; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(capacity); this.rejectPolicy=rejectPolicy; }
线程池的execute方法
public void execute(Runnable task){ synchronized (workers){ if(workers.size()<coreSize){ //线程数小于核心数,启动新线程来执行任务 Worker worker = new Worker(task); workers.add(worker); logger.info("线程池新增线程执行任务"); worker.start(); } else{ //线程数已达到核心数,把任务添加到队列中,传递拒绝策略 taskQueue.tryPut(rejectPolicy,task); logger.info("线程池添加任务到队列中"); } } }
可以看到,拒绝策略是有提交任务的线程指定,最终是由阻塞队列来执行,阻塞队列不知道拒绝策略具体是什么,这也是java多态的一种体现,面向抽象编程。
到此这篇关于关于java自定义线程池的文章就介绍到这了,更多相关java自定义线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
基于MockMvc进行springboot调试(SpringbootTest)
这篇文章主要介绍了基于MockMvc进行springboot调试(SpringbootTest),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下2019-10-10java Swing组件setBounds()简单用法实例分析
这篇文章主要介绍了java Swing组件setBounds()简单用法,结合实例形式分析了Swing组件setBounds()方法的功能与简单使用方法,需要的朋友可以参考下2017-11-11解决Test类中不能使用Autowired注入bean的问题
这篇文章主要介绍了解决Test类中不能使用Autowired注入bean的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-09-09创建动态代理对象bean,并动态注入到spring容器中的操作
这篇文章主要介绍了创建动态代理对象bean,并动态注入到spring容器中的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2021-02-02
最新评论