Golang sync.Pool的源码解析
实际使用
Pool
是用于存放临时对象的集合,这些对象是为了后续的使用,以达到复用对象的效果。其目的是缓解频繁创建对象造成的gc压力。在许多开源组件中均使用了此组件,例如bolt
、gin
等。
下面是一组在非并发和并发场景是否使用Pool
的benchmark:
package pool import ( "io/ioutil" "sync" "testing" ) type Data [1024]byte // 直接创建对象 func BenchmarkWithoutPool(t *testing.B) { for i := 0; i < t.N; i++ { var data Data ioutil.Discard.Write(data[:]) } } // 使用Pool复用对象 func BenchmarkWithPool(t *testing.B) { pool := &sync.Pool{ // 若没有可用对象,则调用New创建一个对象 New: func() interface{} { return &Data{} }, } for i := 0; i < t.N; i++ { // 取 data := pool.Get().(*Data) // 用 ioutil.Discard.Write(data[:]) // 存 pool.Put(data) } } // 并发的直接创建对象 func BenchmarkWithoutPoolConncurrency(t *testing.B) { t.RunParallel(func(pb *testing.PB) { for pb.Next() { var data Data ioutil.Discard.Write(data[:]) } }) } // 使用Pool并发的复用对象 func BenchmarkWithPoolConncurrency(t *testing.B) { pool := &sync.Pool{ // 若没有可用对象,则调用New创建一个对象 New: func() interface{} { return &Data{} }, } t.RunParallel(func(pb *testing.PB) { for pb.Next() { // 取 data := pool.Get().(*Data) // 用 ioutil.Discard.Write(data[:]) // 存 pool.Put(data) } }) }
实际运行效果如下图所示,可以看出sync.Pool
不管是在并发还是非并发场景下,在速度和内存分配上表现均远远优异于直接创建对象。
goos: darwin goarch: amd64 pkg: leetcode/pool cpu: Intel(R) Core(TM) i5-1038NG7 CPU @ 2.00GHz BenchmarkWithoutPool-8 7346660 148.1 ns/op 1024 B/op 1 allocs/op BenchmarkWithPool-8 80391398 14.41 ns/op 0 B/op 0 allocs/op BenchmarkWithoutPoolConncurrency-8 7893248 153.3 ns/op 1024 B/op 1 allocs/op BenchmarkWithPoolConncurrency-8 363329767 4.245 ns/op 0 B/op 0 allocs/op PASS ok leetcode/pool 6.590s
实现原理
Pool
基本结构如下
type Pool struct { noCopy noCopy local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array victim unsafe.Pointer // local from previous cycle victimSize uintptr // size of victims array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. New func() any }
其中最为主要的是属性 local
,是一个和P数量一致的切片,每个P的id都对应切片中的一个元素。为了高效的利用CPU多核,元素中间填充了pad,具体细节可以参考后续的 CacheLine
。
// Local per-P Pool appendix. type poolLocalInternal struct { private any // Can be used only by the respective P. shared poolChain // Local P can pushHead/popHead; any P can popTail. } type poolLocal struct { poolLocalInternal // Prevents false sharing on widespread platforms with // 128 mod (cache line size) = 0 . pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte }
CacheLine
CPU
缓存会按照CacheLine大小来从内存复制数据,相邻的数据可能会处于同一个 CacheLine
。如果这些数据被多核使用,那么系统需要耗费较大的资源来保持各个cpu缓存中数据的一致性。当一个线程修改某个 CacheLine
中数据的时候,其他读此 CacheLine
数据的线程会被锁给阻塞。
下面是一组在并发场景下原子性的操作对象Age
属性的benchmark:
import ( "sync/atomic" "testing" "unsafe" ) type StudentWithCacheLine struct { Age uint32 _ [128 - unsafe.Sizeof(uint32(0))%128]byte } // 有填充的场景下,并发修改Age func BenchmarkWithCacheLine(b *testing.B) { count := 10 students := make([]StudentWithCacheLine, count) b.RunParallel(func(pb *testing.PB) { for pb.Next() { for j := 0; j < count; j++ { atomic.AddUint32(&students[j].Age, 1) } } }) } type StudentWithoutCacheLine struct { Age uint32 } // 无填充的场景下,并发修改Age func BenchmarkWithoutCacheLine(b *testing.B) { count := 10 students := make([]StudentWithoutCacheLine, count) b.RunParallel(func(pb *testing.PB) { for pb.Next() { for j := 0; j < count; j++ { atomic.AddUint32(&students[j].Age, 1) } } }) }
StudentWithCacheLine
中填充了pad来保证切片中不同的Age处于不同的CacheLine, StudentWithoutCacheLine
中的Age未做任何处理。通过图可以知道根据 CacheLine
填充了pad的Age
原子操作速度远远快于未做任何处理的。
goos: darwin goarch: amd64 pkg: leetcode/pool cpu: Intel(R) Core(TM) i5-1038NG7 CPU @ 2.00GHz BenchmarkWithCacheLine-8 17277380 70.18 ns/op 0 B/op 0 allocs/op BenchmarkWithoutCacheLine-8 8916874 133.8 ns/op 0 B/op 0 allocs/op
生产消费者模型
Pool
的高性能不仅仅使用CacheLine
避免了多核之间的数据竞争,还根据GMP
模型使用了生产者消费者模型来减少数据竞争,每个P都对应一个poolLocalInternal
。以较为复杂的Get
的流程,取数流程如下:
// Get selects an arbitrary item from the Pool, removes it from the // Pool, and returns it to the caller. // Get may choose to ignore the pool and treat it as empty. // Callers should not assume any relation between values passed to Put and // the values returned by Get. // // If Get would otherwise return nil and p.New is non-nil, Get returns // the result of calling p.New. func (p *Pool) Get() any { if race.Enabled { race.Disable() } // 1. 找到当前goroutine所在的P对应的poolLocalInternal和P对应的id l, pid := p.pin() x := l.private l.private = nil // 2. 如果private是空,在从shared进行popHead if x == nil { // Try to pop the head of the local shard. We prefer // the head over the tail for temporal locality of // reuse. // 3. 尝试从shared上取值 x, _ = l.shared.popHead() // 4. 如果还如空,则尝试从其他P的poolLocalInternal或者victim中获取 if x == nil { x = p.getSlow(pid) } } runtime_procUnpin() if race.Enabled { race.Enable() if x != nil { race.Acquire(poolRaceAddr(x)) } } // 5. 如果还如空,则直接使用New初始化一个 if x == nil && p.New != nil { x = p.New() } return x }
优先在当前goroutine所在P对应的
poolLocalInternal
上找,先找private
,再找shared
判断
private
是否有值。对于每个P来说是单线程的,取private
的时候是不用锁,仅仅简单判断即可。如果有值直接返回即可;如果为空,再查找shared
。查看
shared
是否有值。shared是一个双向链表,链起来的是ringbuf(环形数组),在添加ringbuf的时候,其大小是前一个的两倍。对于goroutine来说,既是当前P上取值的消费者,又是当前P上存值的生产者。在这两种场景是使用方法分别是:取值使用
**popHead**
;存值使用**pushHead**
。均是从head
取数据。
// poolChain is a dynamically-sized version of poolDequeue. // // This is implemented as a doubly-linked list queue of poolDequeues // where each dequeue is double the size of the previous one. Once a // dequeue fills up, this allocates a new one and only ever pushes to // the latest dequeue. Pops happen from the other end of the list and // once a dequeue is exhausted, it gets removed from the list. type poolChain struct { // head is the poolDequeue to push to. This is only accessed // by the producer, so doesn't need to be synchronized. head *poolChainElt // tail is the poolDequeue to popTail from. This is accessed // by consumers, so reads and writes must be atomic. tail *poolChainElt } type poolChainElt struct { poolDequeue // next and prev link to the adjacent poolChainElts in this // poolChain. // // next is written atomically by the producer and read // atomically by the consumer. It only transitions from nil to // non-nil. // // prev is written atomically by the consumer and read // atomically by the producer. It only transitions from // non-nil to nil. next, prev *poolChainElt }
如果
private
和shared
均没值,就尝试从其他 P 的poolLocalInternal
上取值。这个时候就是goroutine扮演的就是消费者的角色了,使用的方式是**
popTail
。**从 tail 取数据。如果其他
poolLocalInternal
上也没有值的话,就需要从victim
中取值了。这个victim
就是跨越GC
遗留下的数据。如果都没有的话,就只能使用
New
创建一个新的值了。
此模型减少了数据的竞争,保证了CAS的高效率。对于处于一个P上的多个goroutine来说是单线程的,数据之间不会有竞争关系。每个goroutine取值的时候,优先从对应P上的链表头部取值。只有在链表无数据的时候,才会尝试从其他P上的对应的链表尾部取值。也就是说出现竞争的可能性的地方在于,一个goruotine从链表头部取值或者塞值,另外一个goroutine从链表尾部取值,两者出现冲突的可能性较小。
结论
总的来说Pool在热点数据竞争上做了很多优化,比如CacheLine、GMP、Ringbuf,CAS;另外还跨越GC周期的缓存数据。
本文主要就CacheLine和生产者消费者模式做了介绍,其他部分感兴趣的话可以自行查看源码。
以上就是Golang sync.Pool的源码解析的详细内容,更多关于Go sync.Pool源码的资料请关注脚本之家其它相关文章!
最新评论