Go高级特性探究之协程池详解

 更新时间:2023年06月05日 09:13:42   作者:tracy小猫  
在并发编程中,协程是 Go 语言的核心特性之一,本文将介绍如何使用 Go 协程池构造一个协程池,并解决函数传参问题、优雅关闭协程池和保证协程安全的问题,感兴趣的可以了解一下

在并发编程中,协程是 Go 语言的核心特性之一,但是在实际应用中,协程的创建和销毁成本比较高。当需要同时处理大量的任务时,创建大量的协程会导致系统开销变大,进而影响程序的性能。这时候,就需要使用协程池来管理协程的生命周期,将协程的创建和销毁成本降至最小,提高程序的并发性能。

本文将介绍如何使用 Go 协程池构造一个协程池,并解决函数传参问题、优雅关闭协程池和保证协程安全的问题。

Pool

type Pool struct {
   capacity       uint64            // 最大协程数
   runningWorkers uint64            // 当前正在运行的协程数
   status         int64             // 协程池的状态
   chTask         chan *Task        // 执行任务的 channel
   PanicHandler   func(interface{}) // 处理协程中的 panic 异常
   sync.Once
   sync.Mutex
}

Pool 类型是协程池的主要类型,包含了以下属性:

  • capacity:最大协程数。
  • runningWorkers:当前正在运行的协程数。
  • status:协程池的状态。
  • chTask:执行任务的 channel。
  • PanicHandler:处理协程中的 panic 异常。
  • sync.Once:防止 Stop 函数被多次调用。
  • sync.Mutex: 用于锁定协程池的状态和 channel。

同时 Pool 类型包含以下函数:

  • NewPool:用于初始化协程池。
  • Submit:将任务放到 channel 中供协程进行任务处理。
  • createWorker:用于创建并启动一个协程来执行任务。
  • incRunning:增加协程池的运行协程数。
  • decRunning:减少协程池的运行协程数。
  • Stop:关闭协程池,停止接受任务并且等待所有任务执行完毕后关闭协程池。

NewPool 函数

NewPool 函数用于创建和初始化一个协程池。将最大协程数 n 和处理协程中 panic 异常的函数 panicHandler 传入函数中,创建一个 Pool 类型,并将属性初始化后返回一个 Pool 的指针类型。

func NewPool(n uint64, panicHandler func(interface{})) *Pool {
   return &Pool{
      capacity:     n,
      status:       Running,
      chTask:       make(chan *Task, n),
      PanicHandler: panicHandler,
   }
}

Submit 函数

Submit 函数用于将任务放到 channel 中供协程进行任务处理。首先判断协程池状态是否为 Stopped,如果已经关闭,则返回一个错误;接着加锁,并判断 channel 中是否已满,如果已经满了,则返回一个错误,否则将任务放到 channel 中并返回 nil。

// 将任务放到 channel 中供协程进行任务处理
func (p *Pool) Submit(t *Task) error {
   if p.status == Stopped {
      return errors.New("协程池已关闭,不能提交任务")
   }
   p.Lock()
   defer p.Unlock()
   if len(p.chTask) == int(p.capacity) {
      return errors.New("协程池已满,不能接受新任务")
   }
   p.chTask <- t
   return nil
}

createWorker 函数

createWorker 函数用于创建并启动一个协程来执行任务。首先增加当前运行的协程数,然后在一个 go 协程内执行任务。如果在执行任务的过程中出现 panic 异常,则调用 PanicHandler 处理函数,如果没有设置 PanicHandler 处理函数,则直接将异常信息打印出来。执行完任务后,减少当前运行的协程数。

// 初始化协程池的协程数量
func (p *Pool) createWorker() {
   p.incRunning()
   // 每一个协程获取一个任务,执行任务
   go func() {
      defer func() {
         if r := recover(); r != nil {
            if p.PanicHandler != nil {
               p.PanicHandler(r)
            } else {
               fmt.Println("Panic:", r)
            }
         }
         p.decRunning()
      }()
      for {
         select {
         case t := <-p.chTask:
            if t == nil {
               return
            }
            t.Handler(t.Params...)
         }
      }
   }()
}

incRunning、decRunning 函数

incRunning、decRunning 函数用于增加和减少协程池的运行协程数,使用了 atomic.AddUint64 函数来保证操作的原子性。

// 增加协程池的运行协程数
func (p *Pool) incRunning() {
   atomic.AddUint64(&p.runningWorkers, 1)
}
// 减少协程池的运行协程数
func (p *Pool) decRunning() {
   atomic.AddUint64(&p.runningWorkers, ^uint64(0))
}

Stop 函数

Stop 函数用于关闭协程池,停止接受任务并且等待所有任务执行完毕后关闭协程池。首先判断协程池状态是否为 Running,如果已经关闭,则直接返回;接着将协程池状态设置为 Stopped,然后使用 sync.Once 确保关闭 channel 的操作仅被执行一次,同时创建运行的协程数个协程,等待它们执行完毕后关闭协程池。

// 关闭协程池
func (p *Pool) Stop() {
   if p.status == Running {
      p.status = Stopped
      p.Once.Do(func() {
         close(p.chTask)
         for i := uint64(0); i < p.runningWorkers; i++ {
            p.createWorker()
         }
      })
   }
}

解决函数传参问题

在使用协程池时,需要向协程池提交任务,但是协程池内部的协程如何知道要执行什么样的任务,参数又应该如何传递呢?

为了解决这个问题,可以定义一个 Task 结构体,用于存储要执行的函数和函数参数,如下所示:

type Task struct {
    Handler func(v ...interface{})
    Params  []interface{}
}

Task 类型是一个结构体,用于封装协程池的任务。其中 Handler 是一个函数类型,用于任务执行的函数;Params 是一个可变参数,调用 Handler 时传递给它的参数。

其中,Handler 是一个无返回值的函数,且该函数可接受变长参数,Params 是一个任意类型的切片,用于传递函数的参数列表。

在向协程池提交任务时,可以将 Task 对象作为参数进行提交。

pool.Submit(&Task{
    Handler: func(v ...interface{}) {
        // 执行任务的代码
    },
    Params: []interface{}{...}, // 任务的参数列表
})

在协程内部,可以通过调用 Task.Handler 方法,并将 Task.Params 作为参数传递进去,来运行具体的任务。

select {
case t := <-p.chTask:
    if t == nil {
        return
    }
    t.Handler(t.Params...)
}

通过这种方式,协程池就能够动态地执行不同的任务,并且传递任意类型和数量的参数。

优雅关闭协程池

在使用协程池时,如何正确地关闭协程池,以避免因未正确关闭而导致的内存泄漏和程序崩溃呢?

首先,需要明确协程池的运行状态,通过内部的 status 参数控制协程池的开关。当协程池处于运行状态时,协程池才能够接受新的任务,否则应该拒绝新的任务请求,并尽快释放内部的资源。

其次,在关闭协程池时,需要确保所有的已运行的协程都已经执行完任务并退出。这时,可以使用 sync.Once 来执行一次协程池的清理工作。当协程池处于关闭状态时,不再接受新的任务,并通知所有的协程退出任务循环,最终实现协程池的优雅关闭。

func (p *Pool) Stop() {
 if p.status == Running {
  p.status = Stopped
  p.Once.Do(func() {
   close(p.chTask)
   for i := uint64(0); i < p.runningWorkers; i++ {
    p.createWorker()
   }
  })
 }
}

保证协程安全

在使用协程池时,需要注意线程安全问题,尤其是在多个协程同时访问协程池时,需要保证协程池的内部状态是线程安全的。

同时对于状态的变更以及数量的增减,还需要保证代码的安全性。

为了保证线程安全,可以使用互斥锁 sync.Mutex 来锁定协程池,以避免多个协程同时读写协程池的运行状态和其他内部参数。

在协程池的内部实现中,使用的 sync.Once 只会单次执行的特性可以保证协程池只会初始化一次,防止因多次初始化而导致的内存泄漏或其他异常。

测试用例

为了测试协程池的正确性,以下是一个简单的测试用例。该测试用例创建一个容量为 3 的协程池,并向其中提交 10 个任务,每个任务随机睡眠一段时间,并输出当前时间。

package main
import (
 "fmt"
 "math/rand"
 "sync"
 "testing"
 "time"
)
func TestPool(t *testing.T) {
 pool := NewPool(3, func(err interface{}) {
  fmt.Println("发生 panic,错误信息:", err)
 })
 var wg sync.WaitGroup
 for i := 0; i < 10; i++ {
  wg.Add(1)
  go func(id int) {
   defer wg.Done()
   task := \&Task{
    Handler: func(v ...interface{}) {
     fmt.Printf("任务 %d 开始执行,时间:%v\n", id, time.Now().Format("2006-01-02 15:04:05"))
     rand.Seed(time.Now().UnixNano())
     time.Sleep(time.Duration(rand.Intn(5)) \* time.Second)
     fmt.Printf("任务 %d 执行完毕,时间:%v\n", id, time.Now().Format("2006-01-02 15:04:05"))
    },
    Params: \[]interface{}{},
   }
   pool.Submit(task)
  }(i)
 }
 wg.Wait()
}

输出结果如下:

任务 0 开始执行,时间:2021-10-05 16:52:22
任务 1 开始执行,时间:2021-10-05 16:52:22
任务 2 开始执行,时间:2021-10-05 16:52:22
任务 0 执行完毕,时间:2021-10-05 16:52:27
任务 3 开始执行,时间:2021-10-05 16:52:27
任务 4 开始执行,时间:2021-10-05 16:52:27
任务 1 执行完毕,时间:2021-10-05 16:52:28
任务 5 开始执行,时间:2021-10-05 16:52:28
任务 6 开始执行,时间:2021-10-05 16:52:28
任务 7 开始执行,时间:2021-10-05 16:52:28
任务 4 执行完毕,时间:2021-10-05 16:52:29
任务 8 开始执行,时间:2021-10-05 16:52:29
任务 9 开始执行,时间:2021-10-05 16:52:29
任务 2 执行完毕,时间:2021-10-05 16:52:32
任务 5 执行完毕,时间:2021-10-05 16:52:33
任务 7 执行完毕,时间:2021-10-05 16:52:33
任务 6 执行完毕,时间:2021-10-05 16:52:34
任务 3 执行完毕,时间:2021-10-05 16:52:35
任务 9 执行完毕,时间:2021-10-05 16:52:35
任务 8 执行完毕,时间:2021-10-05 16:52:37

从输出结果可以看出,协程池成功并行处理了所有的任务,并且在容量限制的情况下,成功地保证了协程池的线程安全性。

改进

可考虑增加对协程池容量的动态调整算法,例如在高峰期时增加协程池的容量,低谷期时降低协程池的容量。另外可以增加协程池的超时控制机制,以避免任务执行时间过长导致系统资源浪费和性能下降。

总结

协程池是 Go 语言中一种重要的并发编程模式,通过协程池可以高效地管理协程的生命周期、避免协程的频繁创建和销毁,提高程序的并发性能。在使用协程池时,需要注意解决函数传参问题、优雅关闭协程池和保证协程安全的问题,通过合理使用互斥锁和 sync.Once 可以有效解决这些问题,从而保证协程池的正确性和高效性。

以上就是Go高级特性探究之协程池详解的详细内容,更多关于Go协程池的资料请关注脚本之家其它相关文章!

相关文章

  • golang并发编程中Goroutine 协程的实现

    golang并发编程中Goroutine 协程的实现

    Go语言中的协程是一种轻量级线程,通过在函数前加go关键字来并发执行,具有动态栈、快速启动和低内存使用等特点,本文就来详细的介绍一下,感兴趣的可以了解一下
    2024-10-10
  • 搭建Go语言的ORM框架Gorm的具体步骤(从Java到go)

    搭建Go语言的ORM框架Gorm的具体步骤(从Java到go)

    很多朋友不知道如何使用Goland软件,搭建一个ORM框架GORM,今天小编给大家分享一篇教程关于搭建Go语言的ORM框架Gorm的具体步骤(从Java到go),感兴趣的朋友跟随小编一起学习下吧
    2022-09-09
  • Go的os/exec执行超时导致程序死机的解决方案

    Go的os/exec执行超时导致程序死机的解决方案

    这篇文章主要介绍了Go的os/exec执行超时导致程序死机的几种解决方案,文中通过代码示例给大家介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-04-04
  • Golang中设置全局变量并在其他文件中使用

    Golang中设置全局变量并在其他文件中使用

    全局变量是被整个程序都可见的变量,通常用于存储程序中需要共享的数据,本文就来介绍一下Golang中设置全局变量并在其他文件中使用的方法,感兴趣的可以了解一下
    2024-01-01
  • Go map发生内存泄漏解决方法

    Go map发生内存泄漏解决方法

    这篇文章主要介绍了Go map发生内存泄漏解决方法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-11-11
  • 聊聊Golang的语言结构和变量问题

    聊聊Golang的语言结构和变量问题

    这篇文章主要介绍了Golang的语言结构和变量问题,在golang中定义变量的一般形式是使用 var 关键字,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2021-11-11
  • 深入剖析Go语言中数组和切片的区别

    深入剖析Go语言中数组和切片的区别

    本文将深入探讨 Go 语言数组和切片的区别,包括它们的定义、内存布局、长度和容量、初始化和操作等方面。从而更好地在实际开发中选择和使用合适的数据结构,提高代码的效率和可维护性,需要的可以参考一下
    2023-05-05
  • Go 语言进阶freecache源码学习教程

    Go 语言进阶freecache源码学习教程

    这篇文章主要为大家介绍了Go 语言进阶freecache源码学习教程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04
  • 深入了解Go语言中context的用法

    深入了解Go语言中context的用法

    这篇文章主要为大家详细介绍了Go语言中context用法的相关知识,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-07-07
  • Golang int函数使用实例全面教程

    Golang int函数使用实例全面教程

    这篇文章主要为大家介绍了Golang int函数使用实例全面教程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10

最新评论