java简单手写版本实现时间轮算法

 更新时间:2021年04月07日 17:31:27   作者:扶苏l  
这篇文章主要为大家详细介绍了java简单手写版本实现时间轮算法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

时间轮

关于时间轮的介绍,网上有很多,这里就不重复了

核心思想

  • 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度
  • 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)
  • 每个槽对应一个环形链表存储该时间应该被执行的任务
  • 需要一个线程去驱动指针运转,获取到期任务

以下给出java 简单手写版本实现

代码实现

时间轮主数据结构

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:31
 */
@Slf4j
public class TimeWheel {
 /**
  * 一个槽的时间间隔(时间轮最小刻度)
  */
 private long tickMs;

 /**
  * 时间轮大小(槽的个数)
  */
 private int wheelSize;

 /**
  * 一轮的时间跨度
  */
 private long interval;

 private long currentTime;

 /**
  * 槽
  */
 private TimerTaskList[] buckets;

 /**
  * 上层时间轮
  */
 private volatile TimeWheel overflowWheel;

 /**
  * 一个timer只有一个delayqueue
  */
 private DelayQueue<TimerTaskList> delayQueue;

 public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
  this.currentTime = currentTime;
  this.tickMs = tickMs;
  this.wheelSize = wheelSize;
  this.interval = tickMs * wheelSize;
  this.buckets = new TimerTaskList[wheelSize];
  this.currentTime = currentTime - (currentTime % tickMs);
  this.delayQueue = delayQueue;
  for (int i = 0; i < wheelSize; i++) {
   buckets[i] = new TimerTaskList();
  }
 }

 public boolean add(TimerTaskEntry entry) {
  long expiration = entry.getExpireMs();
  if (expiration < tickMs + currentTime) {
   //到期了
   return false;
  } else if (expiration < currentTime + interval) {
   //扔进当前时间轮的某个槽里,只有时间大于某个槽,才会放进去
   long virtualId = (expiration / tickMs);
   int index = (int) (virtualId % wheelSize);
   TimerTaskList bucket = buckets[index];
   bucket.addTask(entry);
   //设置bucket 过期时间
   if (bucket.setExpiration(virtualId * tickMs)) {
    //设好过期时间的bucket需要入队
    delayQueue.offer(bucket);
    return true;
   }
  } else {
   //当前轮不能满足,需要扔到上一轮
   TimeWheel timeWheel = getOverflowWheel();
   return timeWheel.add(entry);
  }
  return false;
 }


 private TimeWheel getOverflowWheel() {
  if (overflowWheel == null) {
   synchronized (this) {
    if (overflowWheel == null) {
     overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
    }
   }
  }
  return overflowWheel;
 }

 /**
  * 推进指针
  *
  * @param timestamp
  */
 public void advanceLock(long timestamp) {
  if (timestamp > currentTime + tickMs) {
   currentTime = timestamp - (timestamp % tickMs);
   if (overflowWheel != null) {
    this.getOverflowWheel().advanceLock(timestamp);
   }
  }
 }
}

定时器接口

/**
 * 定时器
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:30
 */
public interface Timer {

 /**
  * 添加一个新任务
  *
  * @param timerTask
  */
 void add(TimerTask timerTask);


 /**
  * 推动指针
  *
  * @param timeout
  */
 void advanceClock(long timeout);

 /**
  * 等待执行的任务
  *
  * @return
  */
 int size();

 /**
  * 关闭服务,剩下的无法被执行
  */
 void shutdown();
}

定时器实现

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:33
 */
@Slf4j
public class SystemTimer implements Timer {
 /**
  * 底层时间轮
  */
 private TimeWheel timeWheel;
 /**
  * 一个Timer只有一个延时队列
  */
 private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
 /**
  * 过期任务执行线程
  */
 private ExecutorService workerThreadPool;
 /**
  * 轮询delayQueue获取过期任务线程
  */
 private ExecutorService bossThreadPool;


 public SystemTimer() {
  this.timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
  this.workerThreadPool = Executors.newFixedThreadPool(100);
  this.bossThreadPool = Executors.newFixedThreadPool(1);
  //20ms推动一次时间轮运转
  this.bossThreadPool.submit(() -> {
   for (; ; ) {
    this.advanceClock(20);
   }
  });
 }


 public void addTimerTaskEntry(TimerTaskEntry entry) {
  if (!timeWheel.add(entry)) {
   //已经过期了
   TimerTask timerTask = entry.getTimerTask();
   log.info("=====任务:{} 已到期,准备执行============",timerTask.getDesc());
   workerThreadPool.submit(timerTask);
  }
 }

 @Override
 public void add(TimerTask timerTask) {
  log.info("=======添加任务开始====task:{}", timerTask.getDesc());
  TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis());
  timerTask.setTimerTaskEntry(entry);
  addTimerTaskEntry(entry);
 }

 /**
  * 推动指针运转获取过期任务
  *
  * @param timeout 时间间隔
  * @return
  */
 @Override
 public synchronized void advanceClock(long timeout) {
  try {
   TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
   if (bucket != null) {
    //推进时间
    timeWheel.advanceLock(bucket.getExpiration());
    //执行过期任务(包含降级)
    bucket.clear(this::addTimerTaskEntry);
   }
  } catch (InterruptedException e) {
   log.error("advanceClock error");
  }
 }

 @Override
 public int size() {
  //todo
  return 0;
 }

 @Override
 public void shutdown() {
  this.bossThreadPool.shutdown();
  this.workerThreadPool.shutdown();
  this.timeWheel = null;
 }
}

存储任务的环形链表

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@Data
@Slf4j
class TimerTaskList implements Delayed {
 /**
  * TimerTaskList 环形链表使用一个虚拟根节点root
  */
 private TimerTaskEntry root = new TimerTaskEntry(null, -1);

 {
  root.next = root;
  root.prev = root;
 }

 /**
  * bucket的过期时间
  */
 private AtomicLong expiration = new AtomicLong(-1L);

 public long getExpiration() {
  return expiration.get();
 }

 /**
  * 设置bucket的过期时间,设置成功返回true
  *
  * @param expirationMs
  * @return
  */
 boolean setExpiration(long expirationMs) {
  return expiration.getAndSet(expirationMs) != expirationMs;
 }

 public boolean addTask(TimerTaskEntry entry) {
  boolean done = false;
  while (!done) {
   //如果TimerTaskEntry已经在别的list中就先移除,同步代码块外面移除,避免死锁,一直到成功为止
   entry.remove();
   synchronized (this) {
    if (entry.timedTaskList == null) {
     //加到链表的末尾
     entry.timedTaskList = this;
     TimerTaskEntry tail = root.prev;
     entry.prev = tail;
     entry.next = root;
     tail.next = entry;
     root.prev = entry;
     done = true;
    }
   }
  }
  return true;
 }

 /**
  * 从 TimedTaskList 移除指定的 timerTaskEntry
  *
  * @param entry
  */
 public void remove(TimerTaskEntry entry) {
  synchronized (this) {
   if (entry.getTimedTaskList().equals(this)) {
    entry.next.prev = entry.prev;
    entry.prev.next = entry.next;
    entry.next = null;
    entry.prev = null;
    entry.timedTaskList = null;
   }
  }
 }

 /**
  * 移除所有
  */
 public synchronized void clear(Consumer<TimerTaskEntry> entry) {
  TimerTaskEntry head = root.next;
  while (!head.equals(root)) {
   remove(head);
   entry.accept(head);
   head = root.next;
  }
  expiration.set(-1L);
 }

 @Override
 public long getDelay(TimeUnit unit) {
  return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
 }

 @Override
 public int compareTo(Delayed o) {
  if (o instanceof TimerTaskList) {
   return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
  }
  return 0;
 }
}

存储任务的容器entry

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@Data
class TimerTaskEntry implements Comparable<TimerTaskEntry> {
 private TimerTask timerTask;
 private long expireMs;
 volatile TimerTaskList timedTaskList;
 TimerTaskEntry next;
 TimerTaskEntry prev;

 public TimerTaskEntry(TimerTask timedTask, long expireMs) {
  this.timerTask = timedTask;
  this.expireMs = expireMs;
  this.next = null;
  this.prev = null;
 }

 void remove() {
  TimerTaskList currentList = timedTaskList;
  while (currentList != null) {
   currentList.remove(this);
   currentList = timedTaskList;
  }
 }

 @Override
 public int compareTo(TimerTaskEntry o) {
  return ((int) (this.expireMs - o.expireMs));
 }
}

任务包装类(这里也可以将工作任务以线程变量的方式去传入)

@Data
@Slf4j
class TimerTask implements Runnable {
 /**
  * 延时时间
  */
 private long delayMs;
 /**
  * 任务所在的entry
  */
 private TimerTaskEntry timerTaskEntry;

 private String desc;

 public TimerTask(String desc, long delayMs) {
  this.desc = desc;
  this.delayMs = delayMs;
  this.timerTaskEntry = null;
 }

 public synchronized void setTimerTaskEntry(TimerTaskEntry entry) {
  // 如果这个timetask已经被一个已存在的TimerTaskEntry持有,先移除一个
  if (timerTaskEntry != null && timerTaskEntry != entry) {
   timerTaskEntry.remove();
  }
  timerTaskEntry = entry;
 }

 public TimerTaskEntry getTimerTaskEntry() {
  return timerTaskEntry;
 }

 @Override
 public void run() {
  log.info("============={}任务执行", desc);
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

您可能感兴趣的文章:

相关文章

  • spring注解 @Valid 的作用说明

    spring注解 @Valid 的作用说明

    这篇文章主要介绍了spring注解 @Valid 的作用说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • Mybatis动态SQL之if、choose、where、set、trim、foreach标记实例详解

    Mybatis动态SQL之if、choose、where、set、trim、foreach标记实例详解

    动态SQL就是动态的生成SQL。接下来通过本文给大家介绍Mybatis动态SQL之if、choose、where、set、trim、foreach标记实例详解的相关知识,感兴趣的朋友一起看看吧
    2016-09-09
  • JavaSwing坦克大战游戏的设计和实现

    JavaSwing坦克大战游戏的设计和实现

    JavaSwing坦克大战游戏的设计要有图形用户界面,界面能够反映游戏所有的细节,在最终呈现的游戏中也要满足所有需求,感兴趣的小伙伴一起来看看吧
    2021-08-08
  • Java inputstream和outputstream使用详解

    Java inputstream和outputstream使用详解

    这篇文章主要介绍了Java inputstream和outputstream使用详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • 基于JDBC封装的BaseDao(实例代码)

    基于JDBC封装的BaseDao(实例代码)

    下面小编就为大家带来一篇基于JDBC封装的BaseDao(实例代码)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-07-07
  • Spring Boot中@Import三种使用方式实例详解

    Spring Boot中@Import三种使用方式实例详解

    这篇文章主要介绍了Spring Boot中@Import三种使用方式,主要有引入普通类,引入importSelector的实现类及引入importBeanDefinitionRegister的实现类,结合实例代码给大家讲解的非常详细,需要的朋友可以参考下
    2022-11-11
  • 多层嵌套的json的值如何解析/替换

    多层嵌套的json的值如何解析/替换

    这篇文章主要介绍了多层嵌套的json的值如何解析/替换的方法示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • 详解Java8 Collect收集Stream的方法

    详解Java8 Collect收集Stream的方法

    这篇文章主要介绍了Java8-Collect收集Stream的方法,提到了收集器的作用,连接收集器的方法,需要的朋友可以参考下
    2018-04-04
  • 详解Spring Security如何配置JSON登录

    详解Spring Security如何配置JSON登录

    这篇文章主要介绍了详解Spring Security如何配置JSON登录,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • java解一个比较特殊的数组合并题

    java解一个比较特殊的数组合并题

    这篇文章主要介绍了java解一个比较特殊的数组合并题,需要的朋友可以参考下
    2014-06-06

最新评论