Golang中tinyrpc框架的源码解读详解

 更新时间:2023年01月14日 16:58:20   作者:骑牛上青山  
tinyrpc是一个高性能的基于protocol buffer的rpc框架。项目代码非常少,很适合初学者进行golang的学习。本文将从源码的角度带大家了解tinyrpc框架的使用,需要的可以参考一下

tinyrpc是一个高性能的基于protocol buffer的rpc框架。项目代码非常少,很适合初学者进行golang的学习。

tinyrpc功能

tinyrpc基于TCP协议,支持各种压缩格式,基于protocol buffer的序列化协议。其rpc是基于golang原生的net/rpc开发而成。

tinyrpc项目结构

tinyrpc基于net/rpc开发而成,在此基础上集成了额外的能力。项目结构如图:

功能目录如下:

  • codec 编码模块
  • compressor 压缩模块
  • header 请求/响应头模块
  • protoc-gen-tinyrpc 代码生成插件
  • serializer 序列化模块

tinyrpc源码解读

客户端和服务端构建

客户端是以net/rpcrpc.Client为基础构建,在此基础上定义了Option以配置压缩方式和序列化方式:

type Option func(o *options)

type options struct {
	compressType compressor.CompressType
	serializer   serializer.Serializer
}

在创建客户端的时候将配置好的压缩算法和序列化方式作为创建客户端的参数:

func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
	options := options{
		compressType: compressor.Raw,
		serializer:   serializer.Proto,
	}
	for _, option := range opts {
		option(&options)
	}
	return &Client{rpc.NewClientWithCodec(
		codec.NewClientCodec(conn, options.compressType, options.serializer))}
}

服务端是以net/rpcrpc.Server为基础构建,在此基础上扩展了Server的定义:

type Server struct {
	*rpc.Server
	serializer.Serializer
}

在创建客户端和开启服务时传入序列化方式:

func NewServer(opts ...Option) *Server {
	options := options{
		serializer: serializer.Proto,
	}
	for _, option := range opts {
		option(&options)
	}

	return &Server{&rpc.Server{}, options.serializer}
}

func (s *Server) Serve(lis net.Listener) {
	log.Printf("tinyrpc started on: %s", lis.Addr().String())
	for {
		conn, err := lis.Accept()
		if err != nil {
			continue
		}
		go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))
	}
}

压缩算法compressor

压缩算法的实现中首先是定义了压缩的接口:

type Compressor interface {
	Zip([]byte) ([]byte, error)
	Unzip([]byte) ([]byte, error)
}

压缩的接口包含压缩和解压方法。

压缩算法使用的是uint类型,使用iota来初始化,并且使用map来进行所有压缩算法实现的管理:

type CompressType uint16

const (
	Raw CompressType = iota
	Gzip
	Snappy
	Zlib
)

// Compressors which supported by rpc
var Compressors = map[CompressType]Compressor{
	Raw:    RawCompressor{},
	Gzip:   GzipCompressor{},
	Snappy: SnappyCompressor{},
	Zlib:   ZlibCompressor{},
}

序列化 serializer

序列化部分代码非常简单,提供了一个接口:

type Serializer interface {
	Marshal(message interface{}) ([]byte, error)
	Unmarshal(data []byte, message interface{}) error
}

目前只有ProtoSerializer一个实现,ProtoSerializer内部的实现是基于"google.golang.org/protobuf/proto"来实现的,并没有什么特殊的处理,因此就不花费笔墨详述了。

请求/响应头 header

tinyrpc定义了自己的请求头和响应头:

// RequestHeader request header structure looks like:
// +--------------+----------------+----------+------------+----------+
// | CompressType |      Method    |    ID    | RequestLen | Checksum |
// +--------------+----------------+----------+------------+----------+
// |    uint16    | uvarint+string |  uvarint |   uvarint  |  uint32  |
// +--------------+----------------+----------+------------+----------+
type RequestHeader struct {
	sync.RWMutex
	CompressType compressor.CompressType
	Method       string
	ID           uint64
	RequestLen   uint32
	Checksum     uint32
}

请求头由压缩类型,方法,id,请求长度和校验码组成。

// ResponseHeader request header structure looks like:
// +--------------+---------+----------------+-------------+----------+
// | CompressType |    ID   |      Error     | ResponseLen | Checksum |
// +--------------+---------+----------------+-------------+----------+
// |    uint16    | uvarint | uvarint+string |    uvarint  |  uint32  |
// +--------------+---------+----------------+-------------+----------+
type ResponseHeader struct {
	sync.RWMutex
	CompressType compressor.CompressType
	ID           uint64
	Error        string
	ResponseLen  uint32
	Checksum     uint32
}

响应头由压缩类型,id,错误信息,返回长度和校验码组成。

为了实现头的重用,tinyrpc为头构建了缓存池:

var (
	RequestPool  sync.Pool
	ResponsePool sync.Pool
)

func init() {
	RequestPool = sync.Pool{New: func() interface{} {
		return &RequestHeader{}
	}}
	ResponsePool = sync.Pool{New: func() interface{} {
		return &ResponseHeader{}
	}}
}

在使用时get出来,生命周期结束后放回池子,并且在put之前需要进行重置:

    h := header.RequestPool.Get().(*header.RequestHeader)
	defer func() {
		h.ResetHeader()
		header.RequestPool.Put(h)
	}()
// ResetHeader reset request header
func (r *RequestHeader) ResetHeader() {
	r.Lock()
	defer r.Unlock()
	r.ID = 0
	r.Checksum = 0
	r.Method = ""
	r.CompressType = 0
	r.RequestLen = 0
}

// ResetHeader reset response header
func (r *ResponseHeader) ResetHeader() {
	r.Lock()
	defer r.Unlock()
	r.Error = ""
	r.ID = 0
	r.CompressType = 0
	r.Checksum = 0
	r.ResponseLen = 0
}

搞清楚了头的结构以及对象池的复用逻辑,那么具体的头的编码与解码就是很简单的拆装工作,就不在此一行一行解析了,大家有兴趣可以自行去阅读。

编码 codec

由于tinyrpc是基于net/rpc开发,那么其codec模块自然也是依赖于net/rpcClientCodecServerCodec接口来实现的。

客户端实现

客户端是基于ClientCodec实现的能力:

type ClientCodec interface {
	WriteRequest(*Request, any) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(any) error

	Close() error
}

client定义了一个clientCodec类型,并且实现了ClientCodec的接口方法:

type clientCodec struct {
	r io.Reader
	w io.Writer
	c io.Closer

	compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
	serializer serializer.Serializer
	response   header.ResponseHeader // rpc response header
	mutex      sync.Mutex            // protect pending map
	pending    map[uint64]string
}

WriteRequest实现:

// WriteRequest Write the rpc request header and body to the io stream
func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
	c.mutex.Lock()
	c.pending[r.Seq] = r.ServiceMethod
	c.mutex.Unlock()

	if _, ok := compressor.Compressors[c.compressor]; !ok {
		return NotFoundCompressorError
	}
	reqBody, err := c.serializer.Marshal(param)
	if err != nil {
		return err
	}
	compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
	if err != nil {
		return err
	}
	h := header.RequestPool.Get().(*header.RequestHeader)
	defer func() {
		h.ResetHeader()
		header.RequestPool.Put(h)
	}()
	h.ID = r.Seq
	h.Method = r.ServiceMethod
	h.RequestLen = uint32(len(compressedReqBody))
	h.CompressType = compressor.CompressType(c.compressor)
	h.Checksum = crc32.ChecksumIEEE(compressedReqBody)

	if err := sendFrame(c.w, h.Marshal()); err != nil {
		return err
	}
	if err := write(c.w, compressedReqBody); err != nil {
		return err
	}

	c.w.(*bufio.Writer).Flush()
	return nil
}

可以看到代码的实现还是比较清晰的,主要分为几个步骤:

  • 将数据进行序列化构成请求体
  • 选择相应的压缩算法进行压缩
  • 从Pool中获取请求头实例将数据全部填入其中构成最后的请求头
  • 分别通过io操作发送处理过的请求头和请求体

ReadResponseHeader实现:

// ReadResponseHeader read the rpc response header from the io stream
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
	c.response.ResetHeader()
	data, err := recvFrame(c.r)
	if err != nil {
		return err
	}
	err = c.response.Unmarshal(data)
	if err != nil {
		return err
	}
	c.mutex.Lock()
	r.Seq = c.response.ID
	r.Error = c.response.Error
	r.ServiceMethod = c.pending[r.Seq]
	delete(c.pending, r.Seq)
	c.mutex.Unlock()
	return nil
}

此方法作用是读取返回的响应头,并解析成具体的结构体

ReadResponseBody实现:

func (c *clientCodec) ReadResponseBody(param interface{}) error {
	if param == nil {
		if c.response.ResponseLen != 0 {
			if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
				return err
			}
		}
		return nil
	}

	respBody := make([]byte, c.response.ResponseLen)
	err := read(c.r, respBody)
	if err != nil {
		return err
	}

	if c.response.Checksum != 0 {
		if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
			return UnexpectedChecksumError
		}
	}

	if c.response.GetCompressType() != c.compressor {
		return CompressorTypeMismatchError
	}

	resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
	if err != nil {
		return err
	}

	return c.serializer.Unmarshal(resp, param)
}

此方法是用于读取返回的响应结构体,流程如下:

  • 读取流获取响应体
  • 根据响应头中的校验码来比对响应体是否完整
  • 根据压缩算法来解压具体的结构体
  • 进行反序列化

服务端实现

服务端是基于ServerCodec实现的能力:

type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(any) error
	WriteResponse(*Response, any) error

	// Close can be called multiple times and must be idempotent.
	Close() error
}

和客户端类似,server定义了一个serverCodec类型,并且实现了ServerCodec的接口方法:

type serverCodec struct {
	r io.Reader
	w io.Writer
	c io.Closer

	request    header.RequestHeader
	serializer serializer.Serializer
	mutex      sync.Mutex // protects seq, pending
	seq        uint64
	pending    map[uint64]*reqCtx
}

ReadRequestHeader实现:

// ReadRequestHeader read the rpc request header from the io stream
func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
	s.request.ResetHeader()
	data, err := recvFrame(s.r)
	if err != nil {
		return err
	}
	err = s.request.Unmarshal(data)
	if err != nil {
		return err
	}
	s.mutex.Lock()
	s.seq++
	s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}
	r.ServiceMethod = s.request.Method
	r.Seq = s.seq
	s.mutex.Unlock()
	return nil
}

此方法用于读取请求头并解析成结构体

ReadRequestBody实现:

// ReadRequestBody read the rpc request body from the io stream
func (s *serverCodec) ReadRequestBody(param interface{}) error {
	if param == nil {
		if s.request.RequestLen != 0 {
			if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
				return err
			}
		}
		return nil
	}

	reqBody := make([]byte, s.request.RequestLen)

	err := read(s.r, reqBody)
	if err != nil {
		return err
	}

	if s.request.Checksum != 0 {
		if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {
			return UnexpectedChecksumError
		}
	}

	if _, ok := compressor.
		Compressors[s.request.GetCompressType()]; !ok {
		return NotFoundCompressorError
	}

	req, err := compressor.
		Compressors[s.request.GetCompressType()].Unzip(reqBody)
	if err != nil {
		return err
	}

	return s.serializer.Unmarshal(req, param)
}

此方法用于读取请求体,流程和读取响应体差不多,大致如下:

  • 读取流并解析成请求体
  • 根据请求头中的校验码进行校验
  • 根据压缩算法进行解压
  • 反序列化

WriteResponse实现:

// WriteResponse Write the rpc response header and body to the io stream
func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {
	s.mutex.Lock()
	reqCtx, ok := s.pending[r.Seq]
	if !ok {
		s.mutex.Unlock()
		return InvalidSequenceError
	}
	delete(s.pending, r.Seq)
	s.mutex.Unlock()

	if r.Error != "" {
		param = nil
	}
	if _, ok := compressor.
		Compressors[reqCtx.compareType]; !ok {
		return NotFoundCompressorError
	}

	var respBody []byte
	var err error
	if param != nil {
		respBody, err = s.serializer.Marshal(param)
		if err != nil {
			return err
		}
	}

	compressedRespBody, err := compressor.
		Compressors[reqCtx.compareType].Zip(respBody)
	if err != nil {
		return err
	}
	h := header.ResponsePool.Get().(*header.ResponseHeader)
	defer func() {
		h.ResetHeader()
		header.ResponsePool.Put(h)
	}()
	h.ID = reqCtx.requestID
	h.Error = r.Error
	h.ResponseLen = uint32(len(compressedRespBody))
	h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
	h.CompressType = reqCtx.compareType

	if err = sendFrame(s.w, h.Marshal()); err != nil {
		return err
	}

	if err = write(s.w, compressedRespBody); err != nil {
		return err
	}
	s.w.(*bufio.Writer).Flush()
	return nil
}

此方法用于写入响应体,大致与写入请求体差不多,流程如下:

  • 将响应体序列化
  • 使用压缩算法将响应体进行压缩
  • 使用Pool管理响应头
  • 分别发送返回头和返回体

总结

tinyrpc是基于golang原生的net/rpc包实现,在此基础上实现了压缩和序列化等能力扩展。整体来看tinyrpc的代码非常简单,比较适合刚接触golang的程序员来进行阅读学习,学习一些golang的基础的开发技巧和一些语言特性。

以上就是Golang中tinyrpc框架的源码解读详解的详细内容,更多关于Golang tinyrpc框架的资料请关注脚本之家其它相关文章!

相关文章

  • Golang拾遗之实现一个不可复制类型详解

    Golang拾遗之实现一个不可复制类型详解

    在这篇文章中我们将实现一个无法被复制的类型,顺便加深对引用类型、值传递以及指针的理解。文中的示例代码讲解详细,感兴趣的可以了解一下
    2023-02-02
  • Golang算法问题之数组按指定规则排序的方法分析

    Golang算法问题之数组按指定规则排序的方法分析

    这篇文章主要介绍了Golang算法问题之数组按指定规则排序的方法,结合实例形式分析了Go语言数组排序相关算法原理与操作技巧,需要的朋友可以参考下
    2017-02-02
  • 详解Go如何优雅的对时间进行格式化

    详解Go如何优雅的对时间进行格式化

    这篇文章主要为大家详细介绍了Go语言中是如何优雅的对时间进行格式化的,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起了解一下
    2023-06-06
  • golang实现简单的tcp数据传输

    golang实现简单的tcp数据传输

    这篇文章主要为大家介绍了golang实现简单的tcp数据传输,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12
  • 详解Golang函数式选项(Functional Options)模式

    详解Golang函数式选项(Functional Options)模式

    什么是函数式选项模式,为什么要这么写,这个编程模式解决了什么问题呢?其实就是为了解决动态灵活的配置不同的参数的问题。下面通过本文给大家介绍Golang函数式选项(Functional Options)模式的问题,感兴趣的朋友一起看看吧
    2021-12-12
  • Golang单元测试与断言编写流程详解

    Golang单元测试与断言编写流程详解

    这篇文章主要介绍了Golang单元测试与断言编写流程,单元测试也是一个很重要的事情。单元测试是指在开发中,对一个函数或模块的测试。其强调的是对单元进行测试
    2022-12-12
  • golang recover函数使用中的一些坑解析

    golang recover函数使用中的一些坑解析

    这篇文章主要为大家介绍了golang recover函数使用中的一些坑解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-02-02
  • Go语言基础之网络编程全面教程示例

    Go语言基础之网络编程全面教程示例

    这篇文章主要为大家介绍了Go语言基础之网络编程全面教程示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • 使用systemd部署和守护golang应用程序的操作方法

    使用systemd部署和守护golang应用程序的操作方法

    systemd是一个流行的守护进程管理器,可以轻松管理服务的启动、停止、重启等操作,让我们的应用程序始终保持在线,本文介绍了如何使用systemd部署和守护golang应用程序,感兴趣的朋友一起看看吧
    2023-10-10
  • Go方法简单性和高效性的充分体现详解

    Go方法简单性和高效性的充分体现详解

    本文深入探讨了Go语言中方法的各个方面,包括基础概念、定义与声明、特性、实战应用以及性能考量,文章充满技术深度,通过实例和代码演示,力图帮助读者全面理解Go方法的设计哲学和最佳实践
    2023-10-10

最新评论