Golang使用singleflight解决并发重复请求

 更新时间:2023年08月01日 15:09:49   作者:写代码的lorre  
高并发的场景下,经常会出现并发重复请求资源的情况,singleflight是golang内置的一个包,这个包提供了对重复函数调用的抑制功能,所以下面我们就来看看如何使用它解决并发重复请求吧

背景

高并发的场景下,经常会出现并发重复请求资源的情况。

比如说,缓存失效时,我们去请求db获取最新的数据,如果这个key是一个热key,那么在缓存失效的瞬间,可能会有大量的并发请求访问到db,导致db访问量陡增,甚至是打崩db,这种场景也就是我们常说的缓存击穿。

针对同一个key的并发请求,这些请求和响应实际上都是一样的。所以我们可以把这种并发请求优化为:只进行一次实际请求去访问资源,然后得到实际响应,所有的并发请求共享这个实际响应的结果

针对分布式场景,我们可以使用分布式锁来实现

针对单机场景,我们可以使用singleflight来实现

singleflight

singleflight是golang内置的一个包,这个包提供了对重复函数调用的抑制功能,也就是保证并发请求只会有一个实际请求去访问资源,所有并发请求共享实际响应。

使用

singleflight在golang sdk源码中的路径为:src/internal/singleflight

但是internal是golang sdk内部的包,所以我们不能直接去使用

使用步骤:

  • 引入go mod
  • 使用singleflight包

引入go mod

go get golang.org/x/sync

使用singleflight包

singleflight包主要提供了三个方法

// 方法作用:保证并发请求只会执行一次函数,并共享实际响应
// 请求参数
// key:请求的唯一标识,相同的key会被视为并发请求
// fn:实际需要执行的函数
// 响应参数
// v:实际执行函数的返回值
// err:实际执行函数的错误
// shared:返回值v是否被共享,若存在并发请求,则为true;若不存在并发请求则为false
func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool)
// 方法作用:和Do类似,不过方法返回的是chan
func (g *Group) DoChan(key string, fn func() (any, error)) (<-chan Result, bool)
// 方法作用:删除key,一般来说不会直接使用这个方法
func (g *Group) ForgetUnshared(key string) bool

针对以上的三个方法,我们重点了解一下Do方法的使用即可

没有使用singleflight之前

package main
import (
   "fmt"
   "sync"
   "testing"
   "time"
)
var (
   mx        sync.Mutex
   wg        sync.WaitGroup
   cacheData = make(map[string]string, 0)
)
func TestSingleFlight(t *testing.T) {
   // 添加10个任务,模拟并发请求
   wg.Add(10)
   for i := 0; i < 10; i++ {
      go getData("demo")
   }
   // 等待所有任务完成
   wg.Wait()
}
func getData(key string) {
   data, _ := getDataFromCache(key)
   if len(data) == 0 {
      // 缓存没有找到,则进行回源
      data, _ = getDataFromDB(key)
      // 设置缓存
      mx.Lock()
      cacheData[key] = data
      mx.Unlock()
   }
   fmt.Println(data)
   // 任务完成
   wg.Done()
}
func getDataFromCache(key string) (string, error) {
   return cacheData[key], nil
}
func getDataFromDB(key string) (string, error) {
   fmt.Println("getDataFromDB key: ", key)
   // 模拟访问db的耗时
   time.Sleep(10 * time.Millisecond)
   return "db data", nil
}

执行TestSingleFlight函数后,会发现并发请求多次调用了getDataFromDB函数

使用singleflight之后

package main
import (
   "fmt"
   "golang.org/x/sync/singleflight"
   "sync"
   "testing"
   "time"
)
var (
   mx        sync.Mutex
   wg        sync.WaitGroup
   g         singleflight.Group
   cacheData = make(map[string]string, 0)
)
func TestSingleFlight(t *testing.T) {
   // 添加10个任务
   wg.Add(10)
   for i := 0; i < 10; i++ {
      go getDataSingleWarp("demo")
   }
   // 等待所有任务完成
   wg.Wait()
}
func getDataSingleWarp(key string) {
   data, _ := getDataFromCache(key)
   if len(data) == 0 {
      // 使用singleflight来避免并发请求,实际改动就这一行
      d, _, shared := g.Do(key, func() (interface{}, error) {
         return getDataFromDB(key)
      })
      fmt.Println(shared)
      data = d.(string)
      // 设置缓存
      mx.Lock()
      cacheData[key] = data
      mx.Unlock()
   }
   fmt.Println(data)
   wg.Done()
}
func getDataFromCache(key string) (string, error) {
   return cacheData[key], nil
}
func getDataFromDB(key string) (string, error) {
   fmt.Println("getDataFromDB key: ", key)
   // 模拟访问db的耗时
   time.Sleep(10 * time.Millisecond)
   return "db data", nil
}

执行TestSingleFlight函数后,会发现只调用了一次getDataFromDB函数

源码分析

  • Group struct:封装并发请求
  • call struct:每一个需要执行的函数,都会被封装成一个call
  • func Do:对并发请求进行控制的方法
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
import (
   "bytes"
   "errors"
   "fmt"
   "runtime"
   "runtime/debug"
   "sync"
)
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")
// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
type panicError struct {
   value interface{}
   stack []byte
}
// Error implements error interface.
func (p *panicError) Error() string {
   return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func newPanicError(v interface{}) error {
   stack := debug.Stack()
   // The first line of the stack trace is of the form "goroutine N [status]:"
   // but by the time the panic reaches Do the goroutine may no longer exist
   // and its status will have changed. Trim out the misleading line.
   if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
      stack = stack[line+1:]
   }
   return &panicError{value: v, stack: stack}
}
// call is an in-flight or completed singleflight.Do call
type call struct {
   // 保证相同key,只会进行一次实际请求
   // 相同key的并发请求会共享返回
   wg sync.WaitGroup
   // These fields are written once before the WaitGroup is done
   // and are only read after the WaitGroup is done.
   // 实际执行函数的返回值和错误
   val interface{}
   err error
   // forgotten indicates whether Forget was called with this call's key
   // while the call was still in flight.
   // 是否已删除当前并发请求的key
   forgotten bool
   // These fields are read and written with the singleflight
   // mutex held before the WaitGroup is done, and are read but
   // not written after the WaitGroup is done.
   // 并发请求的次数
   dups  int
   chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
   mu sync.Mutex       // protects m
   // key代表请求的唯一标识,相同的key会被视为并发请求
   // value代表实际请求,每一个实际请求都会被封装为call  
   m  map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
   Val    interface{}
   Err    error
   Shared bool
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
   // 加锁
   g.mu.Lock()
   // 懒加载
   if g.m == nil {
      g.m = make(map[string]*call)
   }
   // 判断是否有并发请求,如果key已经存在,则说明存在并发请求
   if c, ok := g.m[key]; ok {
      // 并发请求次数+1
      c.dups++
      // 解锁
      g.mu.Unlock()
      // 等待实际请求执行完
      c.wg.Wait()
      if e, ok := c.err.(*panicError); ok {
         panic(e)
      } else if c.err == errGoexit {
         runtime.Goexit()
      }
      // 共享响应
      return c.val, c.err, true
   }
   c := new(call)
   c.wg.Add(1)
   // 添加并发请求key
   g.m[key] = c
   // 解锁
   g.mu.Unlock()
   // 进行实际请求
   g.doCall(c, key, fn)
   return c.val, c.err, c.dups > 0
}
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
   ch := make(chan Result, 1)
   g.mu.Lock()
   if g.m == nil {
      g.m = make(map[string]*call)
   }
   if c, ok := g.m[key]; ok {
      c.dups++
      c.chans = append(c.chans, ch)
      g.mu.Unlock()
      return ch
   }
   c := &call{chans: []chan<- Result{ch}}
   c.wg.Add(1)
   g.m[key] = c
   g.mu.Unlock()
   go g.doCall(c, key, fn)
   return ch
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
   // 正常返回标识
   normalReturn := false
   // 是否执行了recover标识
   recovered := false
   // use double-defer to distinguish panic from runtime.Goexit,
   // more details see https://golang.org/cl/134395
   defer func() {
      // the given function invoked runtime.Goexit
      if !normalReturn && !recovered {
         c.err = errGoexit
      }
      // 实际请求执行完成
      c.wg.Done()
      // 加锁
      g.mu.Lock()
      defer g.mu.Unlock()
      // 删除并发请求key
      if !c.forgotten {
         delete(g.m, key)
      }
      if e, ok := c.err.(*panicError); ok {
         // In order to prevent the waiting channels from being blocked forever,
         // needs to ensure that this panic cannot be recovered.
         if len(c.chans) > 0 {
            go panic(e)
            select {} // Keep this goroutine around so that it will appear in the crash dump.
         } else {
            panic(e)
         }
      } else if c.err == errGoexit {
         // Already in the process of goexit, no need to call again
      } else {
         // Normal return
         for _, ch := range c.chans {
            ch <- Result{c.val, c.err, c.dups > 0}
         }
      }
   }()
   // 匿名函数立即执行
   func() {
      defer func() {
         if !normalReturn {
            // Ideally, we would wait to take a stack trace until we've determined
            // whether this is a panic or a runtime.Goexit.
            //
            // Unfortunately, the only way we can distinguish the two is to see
            // whether the recover stopped the goroutine from terminating, and by
            // the time we know that, the part of the stack trace relevant to the
            // panic has been discarded.
            if r := recover(); r != nil {
               c.err = newPanicError(r)
            }
         }
      }()
      // 执行实际函数
      c.val, c.err = fn()
      // 正常返回
      normalReturn = true
   }()
   if !normalReturn {
      recovered = true
   }
}
// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
   g.mu.Lock()
   if c, ok := g.m[key]; ok {
      c.forgotten = true
   }
   delete(g.m, key)
   g.mu.Unlock()
}

到此这篇关于Golang使用singleflight解决并发重复请求的文章就介绍到这了,更多相关Go singleflight内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 浅析Go语言编程当中映射和方法的基本使用

    浅析Go语言编程当中映射和方法的基本使用

    这篇文章主要介绍了浅析Go语言编程当中映射和方法的基本使用,是golang入门学习中的基础知识,需要的朋友可以参考下
    2015-10-10
  • Go语言中定时任务库Cron使用方法介绍

    Go语言中定时任务库Cron使用方法介绍

    cron的意思计划任务,说白了就是定时任务。我和系统约个时间,你在几点几分几秒或者每隔几分钟跑一个任务(job),今天通过本文给大家介绍下Go语言中定时任务库Cron使用方法,感兴趣的朋友一起看看吧
    2022-03-03
  • Go 1.21中引入的新包maps和cmp功能作用详解

    Go 1.21中引入的新包maps和cmp功能作用详解

    这篇文章主要为大家介绍了Go 1.21中引入的新包maps和cmp功能作用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-11-11
  • 使用Gin框架搭建一个Go Web应用程序的方法详解

    使用Gin框架搭建一个Go Web应用程序的方法详解

    在本文中,我们将要实现一个简单的 Web 应用程序,通过 Gin 框架来搭建,主要支持用户注册和登录,用户可以通过注册账户的方式创建自己的账号,并通过登录功能进行身份验证,感兴趣的同学跟着小编一起来看看吧
    2023-08-08
  • golang grpc 负载均衡的方法

    golang grpc 负载均衡的方法

    这篇文章主要介绍了golang grpc 负载均衡的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07
  • 源码分析Go语言使用cgo导致线程增长的原因

    源码分析Go语言使用cgo导致线程增长的原因

    这篇文章主要从一个cgo调用开始解析Go语言源码,从而分析一下造成线程增长的原因,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一学习一下
    2023-06-06
  • 详解golang开发中select多路选择

    详解golang开发中select多路选择

    这篇文章主要介绍了golang开发中select多路选择,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • Golang实现支持多种类型的set

    Golang实现支持多种类型的set

    在项目开发中,常常会用到set去重,为什么不写一个set呢,而且go现在支持了泛型,所以本文就来用Golang实现一个支持多种类型的set呢
    2023-05-05
  • Golang时间处理中容易踩的坑分析解决

    Golang时间处理中容易踩的坑分析解决

    这篇文章主要为大家介绍了Golang时间处理中容易踩的坑分析解决,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • 一文搞懂Golang中iota的用法和原理

    一文搞懂Golang中iota的用法和原理

    我们知道iota是go语言的常量计数器,本文尝试全面总结其使用用法以及其实现原理,需要的朋友可以参考以下内容,希望对大家有所帮助
    2022-08-08

最新评论