JAVA多线程之实现用户任务排队并预估排队时长
实现流程
初始化一定数量的任务处理线程和缓存线程池,用户每次调用接口,开启一个线程处理。
假设初始化5个处理器,代码执行 BlockingQueue.take 时候,每次take都会处理器队列就会减少一个,当处理器队列为空时,take就是阻塞线程,当用户处理某某任务完成时候,调用资源释放接口,在处理器队列put 一个处理器对象,原来阻塞的take ,就继续执行。
排队论简介
排队论是研究系统随机聚散现象和随机系统工作工程的数学理论和方法,又称随机服务系统理论,为运筹学的一个分支。我们下面对排队论做下简化处理,先看下图:
代码具体实现
任务队列初始化 TaskQueue
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * 初始化队列及线程池 * @author tarzan * */ @Component public class TaskQueue { //处理器队列 public static BlockingQueue<TaskProcessor> taskProcessors; //等待任务队列 public static BlockingQueue<CompileTask> waitTasks; //处理任务队列 public static BlockingQueue<CompileTask> executeTasks; //线程池 public static ExecutorService exec; //初始处理器数(计算机cpu可用线程数) public static Integer processorNum=Runtime.getRuntime().availableProcessors(); /** * 初始化处理器、等待任务、处理任务队列及线程池 */ @PostConstruct public static void initEquipmentAndUsersQueue(){ exec = Executors.newCachedThreadPool(); taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum); //将空闲的设备放入设备队列中 setFreeDevices(processorNum); waitTasks =new LinkedBlockingQueue<CompileTask>(); executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum); } /** * 将空闲的处理器放入处理器队列中 */ private static void setFreeDevices(int num) { //获取可用的设备 for (int i = 0; i < num; i++) { TaskProcessor dc=new TaskProcessor(); try { taskProcessors.put(dc); } catch (InterruptedException e) { e.printStackTrace(); } } } public static CompileTask getWaitTask(Long clazzId) { return get(TaskQueue.waitTasks,clazzId); } public static CompileTask getExecuteTask(Long clazzId) { return get(TaskQueue.executeTasks,clazzId); } private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) { CompileTask compileTask =null; if (CollectionUtils.isNotEmpty(users)){ Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst(); if(optional.isPresent()){ compileTask = optional.get(); } } return compileTask; } public static Integer getSort(Long clazzId) { AtomicInteger index = new AtomicInteger(-1); BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks; if (CollectionUtils.isNotEmpty(compileTasks)){ compileTasks.stream() .filter(e -> { index.getAndIncrement(); return e.getClazzId().longValue() == clazzId.longValue(); }) .findFirst(); } return index.get(); } //单位秒 public static int estimatedTime(Long clazzId){ return estimatedTime(60,getSort(clazzId)+1); } //单位秒 public static int estimatedTime(int cellMs,int num){ int a= (num-1)/processorNum; int b= cellMs*(a+1); return b; }
编译任务类 CompileTask
import lombok.Data; import org.springblade.core.tool.utils.SpringUtil; import org.springblade.gis.common.enums.DataScheduleEnum; import org.springblade.gis.dynamicds.service.DynamicDataSourceService; import org.springblade.gis.modules.feature.schedule.service.DataScheduleService; import java.util.Date; @Data public class CompileTask implements Runnable { //当前请求的线程对象 private Long clazzId; //用户id private Long userId; //当前请求的线程对象 private Thread thread; //绑定处理器 private TaskProcessor taskProcessor; //任务状态 private Integer status; //开始时间 private Date startTime; //结束时间 private Date endTime; private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class); private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class); @Override public void run() { compile(); } /** * 编译 */ public void compile() { try { //取出一个设备 TaskProcessor taskProcessor = TaskQueue.taskProcessors.take(); //取出一个任务 CompileTask compileTask = TaskQueue.waitTasks.take(); //任务和设备绑定 compileTask.setTaskProcessor(taskProcessor); //放入 TaskQueue.executeTasks.put(compileTask); System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId); //切换用户数据源 dataSourceService.switchDataSource(userId); //添加进度 dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState()); } catch (InterruptedException e) { System.err.println( e.getMessage()); } } }
任务处理器 TaskProcessor
import lombok.Data; import java.util.Date; @Data public class TaskProcessor { /** * 释放 */ public static Boolean release(CompileTask task) { Boolean flag=false; Thread thread=task.getThread(); synchronized (thread) { try { if(null!=task.getTaskProcessor()){ TaskQueue.taskProcessors.put(task.getTaskProcessor()); TaskQueue.executeTasks.remove(task); task.setEndTime(new Date()); long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime(); flag=true; System.out.println("用户"+task.getClazzId()+"耗时"+intervalMilli+"ms"); } } catch (InterruptedException e) { e.printStackTrace(); } return flag; } } }
Controller控制器接口实现
import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springblade.core.tool.api.R; import org.springblade.gis.multithread.TaskProcessor; import org.springblade.gis.multithread.TaskQueue; import org.springblade.gis.multithread.CompileTask; import org.springframework.web.bind.annotation.*; import java.util.Date; @RestController @RequestMapping("task") @Api(value = "数据编译任务", tags = "数据编译任务") public class CompileTaskController { @ApiOperation(value = "添加等待请求 @author Tarzan Liu") @PostMapping("compile/{clazzId}") public R<Integer> compile(@PathVariable("clazzId") Long clazzId) { CompileTask checkUser=TaskQueue.getWaitTask(clazzId); if(checkUser!=null){ return R.fail("已经正在排队!"); } checkUser=TaskQueue.getExecuteTask(clazzId); if(checkUser!=null){ return R.fail("正在执行编译!"); } //获取当前的线程 Thread thread=Thread.currentThread(); //创建当前的用户请求对象 CompileTask compileTask =new CompileTask(); compileTask.setThread(thread); compileTask.setClazzId(clazzId); compileTask.setStartTime(new Date()); //将当前用户请求对象放入队列中 try { TaskQueue.waitTasks.put(compileTask); } catch (InterruptedException e) { e.printStackTrace(); } TaskQueue.exec.execute(compileTask); return R.data(TaskQueue.waitTasks.size()-1); } @ApiOperation(value = "查询当前任务前还有多少任务等待 @author Tarzan Liu") @PostMapping("sort/{clazzId}") public R<Integer> sort(@PathVariable("clazzId") Long clazzId) { return R.data(TaskQueue.getSort(clazzId)); } @ApiOperation(value = "查询当前任务预估时长 @author Tarzan Liu") @PostMapping("estimate/time/{clazzId}") public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) { return R.data(TaskQueue.estimatedTime(clazzId)); } @ApiOperation(value = "任务释放 @author Tarzan Liu") @PostMapping("release/{clazzId}") public R<Boolean> release(@PathVariable("clazzId") Long clazzId) { CompileTask task=TaskQueue.getExecuteTask(clazzId); if(task==null){ return R.fail("资源释放异常"); } return R.status(TaskProcessor.release(task)); } @ApiOperation(value = "执行 @author Tarzan Liu") @PostMapping("exec") public R exec() { Long start=System.currentTimeMillis(); for (Long i = 1L; i < 100; i++) { compile(i); } System.out.println("消耗时间:"+(System.currentTimeMillis()-start)+"ms"); return R.status(true); } }
接口测试
根据任务id查询该任务前还有多少个任务待执行
根据任务id查询该任务预估执行完成的剩余时间,单位秒
补充知识
BlockingQueue
BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:
在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
阻塞与非阻塞
入队
offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞
put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞
offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:-->阻塞
被唤醒
等待时间超时
当前线程被中断
出队
poll():如果没有元素,直接返回null;如果有元素,出队
take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞
poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:
被唤醒
等待时间超时
当前线程被中断
到此这篇关于JAVA多线程之实现用户任务排队并预估排队时长的文章就介绍到这了,更多相关JAVA 多线程 用户任务排队内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
mybatis中insert主键ID获取和多参数传递的示例代码
这篇文章主要介绍了mybatis中insert主键ID获取和多参数传递的示例代码,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2021-03-03基于Spring Boot使用JpaRepository删除数据时的注意事项
这篇文章主要介绍了Spring Boot使用JpaRepository删除数据时的注意事项,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-06-06Spring Integration 实现消息驱动的详细步骤
Spring Integration是一个用于构建消息驱动的中间件轻量级框架,它提供了一种模型和工具,用于在Spring应用程序中实现企业集成模式,这篇文章主要介绍了Spring Integration 实现消息驱动,需要的朋友可以参考下2024-05-05
最新评论