Go使用TimerController解决timer过多的问题

 更新时间:2024年12月24日 08:30:33   作者:AnthonyDong  
多路复用,实际上Go底层也是一种多路复用的思想去实现的timer,但是它是底层的timer,我们需要解决的问题就过多的timer问题!本文给大家介绍了Go使用TimerController解决timer过多的问题,需要的朋友可以参考下

背景

  • 在Go里面我们实现超时需要起一个goroutine才能实现,但是当我有大量的任务需要做超时控制就需要起大量的goroutine,实际上是一种开销和负担!
  • 有些时候需要注册一些Timer也是有需要起大量的 goroutine才能实现,比如我要异步定期刷新一个配置,异步的监听啥东西,此时简单做法就是使用大量的 goroutine + timer/sleep实现!

解决思路

多路复用,实际上Go底层也是一种多路复用的思想去实现的timer,但是它是底层的timer,我们需要解决的问题就过多的timer问题!

我们的思路是实现一个 TimerController 可以帮助我们管理很多个timer,并且可以开销做到最低!因此使用一个 小顶堆 + Timer调度器即可实现!

实现

小顶堆(最小堆)

使用Go自带的 container/heap 实现 小顶堆

import (
    "container/heap"
)

type HeapItem[T any] interface {
    Less(HeapItem[T]) bool
    GetValue() T
}

// 参考 IntHeap
type heapQueue[T any] []HeapItem[T]

func (h heapQueue[T]) Len() int           { return len(h) }
func (h heapQueue[T]) Less(i, j int) bool { return h[i].Less(h[j]) }
func (h heapQueue[T]) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *heapQueue[T]) Push(x any) {
    // Push and Pop use pointer receivers because they modify the slice's length,
    // not just its contents.
    *h = append(*h, x.(HeapItem[T]))
}

func (h *heapQueue[T]) Pop() any {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[0 : n-1]
    return x
}

type HeapQueue[T any] struct {
    queue heapQueue[T]
}

func (h *HeapQueue[T]) ptr() *heapQueue[T] {
    return &h.queue
}

// NewHeapQueue 非并发安全
func NewHeapQueue[T any](items ...HeapItem[T]) *HeapQueue[T] {
    queue := make(heapQueue[T], len(items))
    for index, item := range items {
       queue[index] = item
    }
    heap.Init(&queue)
    return &HeapQueue[T]{queue: queue}
}

func  (h *HeapQueue[T])  Push(item HeapItem[T]) { 
    heap.Push(h.ptr(), item)
}

func  (h *HeapQueue[T])  Pop() (T, bool ) { 
    if h.ptr().Len() == 0 {
       var Nil T
       return Nil, false
    }
    return heap.Pop(h.ptr()).(HeapItem[T]).GetValue(), true
}

// Peek 方法用于返回堆顶元素而不移除它
func  (h *HeapQueue[T])  Peek() (T, bool ) { 
    if h.ptr().Len() > 0 {
       return h.queue[0].GetValue(), true
    }
    var Nil T
    return Nil, false
}

func (h *HeapQueue[T]) Len() int {
    return h.ptr().Len()
}

调度器

type Timer struct {
    Timeout    time.Time
    Name       string
    NotifyFunc func()
}

func (t *Timer) GetCurTimeout() time.Duration {
    return t.Timeout.Sub(time.Now())
}

// Notify todo support async notify
func (t *Timer) Notify() {
    if t.NotifyFunc != nil {
       t.NotifyFunc()
    }
}

func (t *Timer) IsExpired() bool {
    return t.Timeout.Before(time.Now())
}

func (t *Timer) Less(v HeapItem[*Timer]) bool {
    return t.Timeout.Before(v.GetValue().Timeout)
}

func (t *Timer) GetValue() *Timer {
    return t
}

type TimerController struct {
    timers  chan *Timer
    minHeap *HeapQueue[*Timer]

    closeOnce sync.Once
    close     chan struct{}
}

func (t *TimerController) AddTimer(timer *Timer) bool {
    if timer == nil {
       return false
    }
    select {
    case <-t.close:
       return false
    default:
       t.timers <- timer
       return true
    }
}

func (t *TimerController) Close() {
    t.closeOnce.Do(func() { close(t.close) })
}

func NewTimerController(bufferSize int) *TimerController {
    return &TimerController{
       timers:  make(chan *Timer, bufferSize),
       minHeap: NewHeapQueue[*Timer](),
       close:   make(chan struct{}),
    }
}

func (t *TimerController) Start() {
    go t._start()
}
func (t *TimerController) _start() {
    const defaultTimeout = time.Hour * 24

    var (
       curMinTimer *Timer
       timeout     = time.NewTimer(defaultTimeout)
    )
    for {
       select {
       case <-t.close:
          close(t.timers)
          timeout.Stop()
          return
       case timer := <-t.timers:
          t.minHeap.Push(timer)
          curMinTimer, _ = t.minHeap.Peek()
          timeout.Reset(curMinTimer.GetCurTimeout())
          //fmt.Printf("timeout.Reset-1 name: %s, timeout: %s\n", curMinTimer.Name, curMinTimer.GetCurTimeout())
       case <-timeout.C:
          if curMinTimer != nil {
             curMinTimer.Notify()
             curMinTimer = nil
             t.minHeap.Pop()
          }
          curMinTimer, _ = t.minHeap.Peek()
          if curMinTimer == nil {
             timeout.Reset(defaultTimeout)
             continue
          }
          timeout.Reset(curMinTimer.GetCurTimeout())
          //fmt.Printf("timeout.Reset-2 name: %s, timeout: %s\n", curMinTimer.Name, curMinTimer.GetCurTimeout())
       }
    }
}

测试

func TestTimerController(t *testing.T) {
    controller := NewTimerController(1024)
    controller.Start()
    defer controller.Close()
    now := time.Now()
    arrs := make([]string, 0)
    NewTimer := func(num int) *Timer {
       return &Timer{Timeout: now.Add(time.Duration(num) * time.Millisecond), Name: strconv.Itoa(num), NotifyFunc: func() {
          arrs = append(arrs, strconv.Itoa(num))
       }}
    }
    // 这里乱序的注册了8个timer
    controller.AddTimer(NewTimer(5))
    controller.AddTimer(NewTimer(6))
    controller.AddTimer(NewTimer(3))
    controller.AddTimer(NewTimer(4))
    controller.AddTimer(NewTimer(7))
    controller.AddTimer(NewTimer(8))
    controller.AddTimer(NewTimer(1))
    controller.AddTimer(NewTimer(2))

    time.Sleep(time.Second * 1)
    t.Logf("%#v\n", arrs)
    // 最终我们可以获取到 顺序执行的!
    assert.Equal(t, arrs, []string{"1", "2", "3", "4", "5", "6", "7", "8"})
}




func TestTimerController_Stable(t *testing.T) {
    controller := NewTimerController(1024)
    controller.Start()
    defer controller.Close()
    now := time.Now()
    arrs := make(map[string]bool, 0)
    NewTimer := func(num int, name string) *Timer {
       return &Timer{Timeout: now.Add(time.Duration(num) * time.Millisecond), Name: name, NotifyFunc: func() {
          arrs[name] = true
       }}
    }
    // 我们重复注册了相同实现执行的 timer,那么预期是每次执行的结果和注册顺序一致
    controller.AddTimer(NewTimer(2, "1"))
    controller.AddTimer(NewTimer(2, "2"))
    controller.AddTimer(NewTimer(2, "3"))
    controller.AddTimer(NewTimer(2, "4"))
    controller.AddTimer(NewTimer(2, "5"))

    time.Sleep(time.Second * 1)
    t.Logf("%#v\n", arrs)
    assert.Equal(t, arrs, map[string]bool{"1": true, "2": true, "3": true, "4": true, "5": true})
}

以上就是Go使用TimerController解决timer过多的问题的详细内容,更多关于Go TimerController解决timer过多的资料请关注脚本之家其它相关文章!

相关文章

  • Golang-如何判断一个 interface{} 的值是否为 nil

    Golang-如何判断一个 interface{} 的值是否为 nil

    interface 的内部实现包含了两个字段,一个是 type,一个是 data,这篇文章主要介绍了Golang-如何判断一个interface{}的值是否为nil,需要的朋友可以参考下
    2023-05-05
  • golang执行命令获取执行结果状态(推荐)

    golang执行命令获取执行结果状态(推荐)

    这篇文章主要介绍了golang执行命令获取执行结果状态的相关知识,非常不错,具有一定的参考借鉴价值,需要的朋友参考下吧
    2019-11-11
  • 浅谈golang二进制bit位的常用操作

    浅谈golang二进制bit位的常用操作

    这篇文章主要介绍了浅谈golang二进制bit位的常用操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • 一篇文章带你搞懂Go语言标准库Time

    一篇文章带你搞懂Go语言标准库Time

    在我们开发的过程中,每个项目都需要时间这一类的函数,此时对time这个包的研究深度就显得尤为重要,这篇文章主要给大家介绍了关于如何通过一篇文章带你搞懂Go语言标准库Time的相关资料,需要的朋友可以参考下
    2022-10-10
  • 详解Go语言Sync.Pool为何不加锁也能够实现线程安全

    详解Go语言Sync.Pool为何不加锁也能够实现线程安全

    在这篇文章中,我们将剖析sync.Pool内部实现中,介绍了sync.Pool比较巧妙的内部设计思路以及其实现方式。在这个过程中,也间接介绍了为何不加锁也能够实现线程安全,感兴趣的可以学习一下
    2023-04-04
  • Golang中互斥锁和读写互斥锁原理及示例代码

    Golang中互斥锁和读写互斥锁原理及示例代码

    在Golang中,互斥锁是一种基本的同步原语,用于实现对共享资源的互斥访问,读写互斥锁是一种特殊类型的互斥锁,它允许多个协程同时读取某个共享资源,本文将通过过示例代码详细介绍Golang中互斥锁和读写互斥锁,需要的朋友可以参考下
    2023-05-05
  • Go语言中三个输入函数(scanf,scan,scanln)的区别解析

    Go语言中三个输入函数(scanf,scan,scanln)的区别解析

    本文详细介绍了Go语言中三个输入函数Scanf、Scan和Scanln的区别,包括用法、功能和输入终止条件等,本文给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
    2024-10-10
  • go get 和 go install 对比介绍

    go get 和 go install 对比介绍

    go install和go get都是Go语言的工具命令,但它们之间有一些区别。go get:用于从远程代码存储库(如 GitHub)中下载或更新Go代码包。go install:用于编译并安装 Go 代码包,本文go get和go install对比介绍的非常详细,需要的朋友可以参考一下
    2023-04-04
  • Go 函数返回nil遇到问题避坑分析

    Go 函数返回nil遇到问题避坑分析

    这篇文章主要为大家介绍了Go 函数返回nil遇到的避坑问题分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • Golang在Window环境使用Imagick7的过程

    Golang在Window环境使用Imagick7的过程

    这篇文章主要介绍了Golang在Window环境使用Imagick7的过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2023-11-11

最新评论