Golang中channel的原理解读(推荐)

 更新时间:2021年10月15日 10:50:32   作者:可问春风丶  
channel主要是为了实现go的并发特性,用于并发通信的,也就是在不同的协程单元goroutine之间同步通信。接下来通过本文给大家介绍Golang中channel的原理解读,感兴趣的朋友一起看看吧

数据结构

channel的数据结构在$GOROOT/src/runtime/chan.go文件下:

type hchan struct {

   qcount   uint           // 当前队列中剩余元素个数

   dataqsiz uint           // 环形队列长度,即可以存放的元素个数

   buf      unsafe.Pointer // 环形队列指针

   elemsize uint16         // 每个元素的大小

   closed   uint32         // 标记是否关闭

   elemtype *_type         // 元素类型

   sendx    uint           // 队列下标,指向元素写入时存放到队列中的位置

   recvx    uint           // 队列下标,指向元素从队列中读出的位置

   recvq    waitq          // 等待读消息的groutine队列

   sendq    waitq          // 等待写消息的groutine队列

   lock     mutex          // 互斥锁

}

chan内部实现了一个环形队列作为缓冲区,队列的长度在创建chan时指定:

在这里插入图片描述

等待队列(recvq/sendq)使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog结构:

type waitq struct {
   first *sudog
   last  *sudog
}

type sudog struct {
   g            *g
   next         *sudog
   prev         *sudog
   elem         unsafe.Pointer // data element (may point to stack)
   
   acquiretime  int64
   releasetime  int64
   ticket       uint32
   isSelect     bool
   
   parent       *sudog // semaRoot binary tree
   waitlink     *sudog // g.waiting list or semaRoot
   waittail     *sudog // semaRoot
   c            *hchan // channel
}

创建channel

通常使用make(channel string, 0)的方式创建无缓存的channel,使用make(channel string, 10)创建有缓存的channel。

源码:

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   if hchanSize%maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }
   var c *hchan
   switch {
   
   case mem == 0:
   // 如果当前 Channel 中不存在缓冲区,那么就只会为 runtime.hchan 分配一段内存空间;
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
   // 如果当前 Channel 中存储的类型不是指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间;
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
   //单独为 runtime.hchan 和缓冲区分配内存;
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)
   // 在函数的最后会统一更新elemsize、elemtype 和 dataqsiz 几个字段;
   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

channel读写

  1. 当有新数据来时,首先判断recvq中是否有groutine存在,如果recvq不为空,则说明缓冲区为空,或者没有缓冲区,因为如果缓冲区有数据会被recvq里面的groutine消费。此时从recvq中拿出一个groutine并绑定数据,唤醒该groutine执行任务,这个过程跳过了将数据写入缓冲区的过程。
  2. 如果缓冲区有数据并有空余位置,将数据放入缓冲区。
  3. 如果缓冲区有数据但没有空余位置,当前groutine绑定数据并放入sendx,进入睡眠,等待被唤醒。

在这里插入图片描述

源码:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   .....
   lock(&c.lock)

   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   // 如果Channel 没有被关闭并且已经有处于读等待的 Goroutine,
   // 那么从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据
   if sg := c.recvq.dequeue(); sg != nil {
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }
   
   // 如果recvq为空且缓冲区中还有剩余空间
   if c.qcount < c.dataqsiz {
   // 计算出下一个可以存储数据的位置,
      qp := chanbuf(c, c.sendx)
      // raceenabled: 是否启用数据竞争检测,在编译时指定,默认为false
      if raceenabled {
      // 发出数据竞争警告
         raceacquire(qp)
         racerelease(qp)
      }
      // 将发送的数据拷贝到缓冲区中,产生内存拷贝
      typedmemmove(c.elemtype, qp, ep)
      // 增加 sendx 索引
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      // 增加计数器
      c.qcount++
      unlock(&c.lock)
      return true
   }
   
   if !block {
      unlock(&c.lock)
      return false
   }

   // 将channel数据绑定到当前groutine并使groutine休眠
   // 获取发送数据使用的 Goroutine
   gp := getg()
   // 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,
   // 例如发送的 Channel、是否在 select 中和待发送数据的内存地址等
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // 将刚刚创建并初始化的 mysg 加入发送等待队列,并设置到当前 Goroutine的waiting上,
   // 表示 Goroutine 正在等待该sudog准备就绪
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)
   // 休眠groutine
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   // 保证传入的数据不被GC
   KeepAlive(ep)

   // someone woke us up.
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   return true
}

  1. 如果sendx不为空且缓冲区不为空,从缓冲区头部读出数据并在当前G执行任务,在sendx中拿出一个G,将其数据写入缓冲区尾部并唤醒该G。
  2. 如果sendx不为空且缓冲区为空,直接从sendx中拿出一个G,将G中数据取出并唤醒该G。
  3. 如果sendx为空且缓冲区不为空,则从缓冲区头部拿出一个数据。
  4. 如果sendx为空且缓冲区为空,将该G放入recvq,进入休眠,等待被唤醒。

在这里插入图片描述

源码:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
 // block:这次接收是否阻塞
   if debugChan {
      print("chanrecv: chan=", c, "\n")
   }

   if c == nil {
      if !block {
         return
      }
      // 从一个空 Channel 接收数据时会直接让出处理器的使用权
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   // Fast path: check for failed non-blocking operation without acquiring the lock.
   if !block && empty(c) {
     // 如果channel为空并且未关闭,直接返回
      if atomic.Load(&c.closed) == 0 {
         return
      }

      if empty(c) {
         // The channel is irreversibly closed and empty.
         if raceenabled {
            raceacquire(c.raceaddr())
         }
         if ep != nil {
         // 手动标记清楚对象
            typedmemclr(c.elemtype, ep)
         }
         return true, false
      }
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   lock(&c.lock)
    //如果channel为空,并且已关闭,说明对象不可达
   if c.closed != 0 && c.qcount == 0 {
      if raceenabled {
         raceacquire(c.raceaddr())
      }
      unlock(&c.lock)
      if ep != nil {
      // 手动标记清除
         typedmemclr(c.elemtype, ep)
      }
      return true, false
   }
    // 如果sendq不为空,直接消费,避免sendq --> queue --> recvx的过程
   if sg := c.sendq.dequeue(); sg != nil {
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }
    
    // 当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 
    // recvx 的索引位置中取出数据进行处理
   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         raceacquire(qp)
         racerelease(qp)
      }
      // 如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove将缓冲区中的数据拷贝到内存中
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // 使用 runtime.typedmemclr清除队列中的数据并完成收尾工作
      typedmemclr(c.elemtype, qp)
      c.recvx++
      // recvx位置归零
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount-- // 计数减一
      unlock(&c.lock) 
      return true, true
   }

   if !block {
      unlock(&c.lock)
      return false, false
   }

   // 当 sendq不为空 并且缓冲区中也不存在任何数据时,阻塞并休眠当前groutine
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

   // someone woke us up
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return true, !closed
}

到此这篇关于Golang中channel的原理解读的文章就介绍到这了,更多相关Golang channel原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Go语言kube-scheduler深度剖析开发之scheduler初始化

    Go语言kube-scheduler深度剖析开发之scheduler初始化

    这篇文章主要介绍了Go语言kube-scheduler深度剖析开发之scheduler初始化实现过程示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04
  • golang整合日志zap的实现示例

    golang整合日志zap的实现示例

    Go语言中的zap库提供了强大的日志管理功能,支持日志记录到文件、日志切割、多日志级别、结构化日志输出等,它通过三种方法zap.NewProduction()、zap.NewDevelopment()和zap.NewExample(),快速构建适用于不同环境的logger,感兴趣的可以了解一下
    2024-10-10
  • Golang time.Sleep()用法及示例讲解

    Golang time.Sleep()用法及示例讲解

    Go语言中的Sleep()函数用于在至少规定的持续时间d内停止最新的go-routine,这篇文章主要介绍了Golang time.Sleep()用法及示例讲解,需要的朋友可以参考下
    2023-02-02
  • golang并发编程的实现

    golang并发编程的实现

    这篇文章主要介绍了golang并发编程的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-01-01
  • Go编译32位GNU静态链接库的方法

    Go编译32位GNU静态链接库的方法

    Go链接库系统的难用可谓是人尽皆知,不同Go版本编译出来的不兼容,而且只支持GNU的,不能编译出Windows上的dll和lib。这篇文章给大家介绍Go编译32位GNU静态链接库的方法,感兴趣的朋友一起看看吧
    2020-05-05
  • Go Excelize API源码阅读GetPageLayout及SetPageMargins

    Go Excelize API源码阅读GetPageLayout及SetPageMargins

    这篇文章主要为大家介绍了Go Excelize API源码阅读GetPageLayout及SetPageMargins的方法示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • golang struct 实现 interface的方法

    golang struct 实现 interface的方法

    这篇文章主要介绍了golang struct 实现 interface的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07
  • go语言 bool类型的使用操作

    go语言 bool类型的使用操作

    这篇文章主要介绍了go语言 bool类型的使用操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • Go语言入门之函数的定义与使用

    Go语言入门之函数的定义与使用

    函数是一段代码的片段,包含连续的执行语句,它可以将零个或多个输入参数映射到零个或多个参数输出。本文将通过示例和大家详细聊聊Go语言中函数的定义与使用,感兴趣的可以了解一下
    2022-11-11
  • Golang Makefile示例深入讲解使用

    Golang Makefile示例深入讲解使用

    一次偶然的机会,在 github 上看到有人用 Makefile,就尝试了一下,发现真的非常合适,Makefile 本身就是用来描述依赖的,可读性非常好,而且与强大的 shell 结合在一起,基本可以实现任何想要的功能
    2023-01-01

最新评论