Golang 实现Redis 协议解析器的解决方案
本文是 《用 Golang 实现一个 Redis》系列文章第二篇,本文将分别介绍Redis 通信协议 以及 协议解析器 的实现,若您对协议有所了解可以直接阅读协议解析器部分。
Redis 通信协议
Redis 自 2.0 版本起使用了统一的协议 RESP (REdis Serialization Protocol),该协议易于实现,计算机可以高效的进行解析且易于被人类读懂。
RESP 是一个二进制安全的文本协议,工作于 TCP 协议上。RESP 以行作为单位,客户端和服务器发送的命令或数据一律以 \r\n (CRLF)作为换行符。
二进制安全是指允许协议中出现任意字符而不会导致故障。比如 C 语言的字符串以 \0
作为结尾不允许字符串中间出现\0
, 而 Go 语言的 string 则允许出现 \0
,我们说 Go 语言的 string 是二进制安全的,而 C 语言字符串不是二进制安全的。
RESP 的二进制安全性允许我们在 key 或者 value 中包含 \r
或者 \n
这样的特殊字符。在使用 redis 存储 protobuf、msgpack 等二进制数据时,二进制安全性尤为重要。
RESP 定义了5种格式:
- 简单字符串(Simple String): 服务器用来返回简单的结果,比如"OK"。非二进制安全,且不允许换行。
- 错误信息(Error): 服务器用来返回简单的错误信息,比如"ERR Invalid Synatx"。非二进制安全,且不允许换行。
- 整数(Integer): llen、scard 等命令的返回值, 64位有符号整数
- 字符串(Bulk String): 二进制安全字符串, 比如 get 等命令的返回值
- 数组(Array, 又称 Multi Bulk Strings): Bulk String 数组,客户端发送指令以及 lrange 等命令响应的格式
RESP 通过第一个字符来表示格式:
- 简单字符串:以"+" 开始, 如:"+OK\r\n"
- 错误:以"-" 开始,如:"-ERR Invalid Synatx\r\n"
- 整数:以":"开始,如:":1\r\n"
- 字符串:以
$
开始 - 数组:以
*
开始
Bulk String有两行,第一行为 $
+正文长度,第二行为实际内容。如:
$3\r\nSET\r\n
Bulk String 是二进制安全的可以包含任意字节,就是说可以在 Bulk String 内部包含 "\r\n" 字符(行尾的CRLF被隐藏):
$4a\r\nb
$-1
表示 nil, 比如使用 get 命令查询一个不存在的key时,响应即为$-1
。
Array 格式第一行为 "*"+数组长度,其后是相应数量的 Bulk String。如, ["foo", "bar"]
的报文:
*2 $3 foo $3 bar
客户端也使用 Array 格式向服务端发送指令。命令本身将作为第一个参数,如 SET key value
指令的RESP报文:
*3 $3 SET $3 key $5 value
将换行符打印出来:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
协议解析器
我们在 实现TCP服务器 一文中已经介绍过TCP服务器的实现,协议解析器将实现其 Handler 接口充当应用层服务器。
协议解析器将接收 Socket 传来的数据,并将其数据还原为 [][]byte
格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n"
将被还原为 ['SET', 'key', 'value']
。
本文完整代码: github.com/hdt3213/godis/redis/parser
来自客户端的请求均为数组格式,它在第一行中标记报文的总行数并使用CRLF
作为分行符。
bufio
标准库可以将从 reader 读到的数据缓存到 buffer 中,直至遇到分隔符或读取完毕后返回,所以我们使用 reader.ReadBytes('\n')
来保证每次读取到完整的一行。
需要注意的是RESP是二进制安全
的协议,它允许在正文中使用CRLF
字符。举例来说 Redis 可以正确接收并执行SET "a\r\nb" 1
指令, 这条指令的正确报文是这样的:
*3 $3 SET $4 a\r\nb $7 myvalue
当 ReadBytes
读取到第五行 "a\r\nb\r\n"时会将其误认为两行:
*3 $3 SET $4 a // 错误的分行 b // 错误的分行 $7 myvalue
因此当读取到第四行$4
后, 不应该继续使用 ReadBytes('\n')
读取下一行, 应使用 io.ReadFull(reader, msg)
方法来读取指定长度的内容。
msg = make([]byte, 4 + 2) // 正文长度4 + 换行符长度2 _, err = io.ReadFull(reader, msg)
首先我们来定义解析器的接口:
// Payload stores redis.Reply or error type Payload struct { Data redis.Reply Err error } // ParseStream 通过 io.Reader 读取数据并将结果通过 channel 将结果返回给调用者 // 流式处理的接口适合供客户端/服务端使用 func ParseStream(reader io.Reader) <-chan *Payload { ch := make(chan *Payload) go parse0(reader, ch) return ch } // ParseOne 解析 []byte 并返回 redis.Reply func ParseOne(data []byte) (redis.Reply, error) { ch := make(chan *Payload) reader := bytes.NewReader(data) go parse0(reader, ch) payload := <-ch // parse0 will close the channel if payload == nil { return nil, errors.New("no reply") } return payload.Data, payload.Err }
接下来我们可以看一下解析器核心流程的伪代码,您可以在parser.go看到完整代码:
func parse0(reader io.Reader, ch chan<- *Payload) { // 初始化读取状态 readingMultiLine := false expectedArgsCount := 0 var args [][]byte var bulkLen int64 for { // 上文中我们提到 RESP 是以行为单位的 // 因为行分为简单字符串和二进制安全的BulkString,我们需要封装一个 readLine 函数来兼容 line, err = readLine(reader, bulkLen) if err != nil { // 处理错误 return } // 接下来我们对刚刚读取的行进行解析 // 我们简单的将 Reply 分为两类: // 单行: StatusReply, IntReply, ErrorReply // 多行: BulkReply, MultiBulkReply if !readingMultiLine { if isMulitBulkHeader(line) { // 我们收到了 MulitBulkReply 的第一行 // 获得 MulitBulkReply 中 BulkString 的个数 expectedArgsCount = parseMulitBulkHeader(line) // 等待 MulitBulkReply 后续行 readingMultiLine = true } else if isBulkHeader(line) { // 我们收到了 BulkReply 的第一行 // 获得 BulkReply 第二行的长度, 通过 bulkLen 告诉 readLine 函数下一行 BulkString 的长度 bulkLen = parseBulkHeader() // 这个 Reply 中一共有 1 个 BulkString expectedArgsCount = 1 // 等待 BulkReply 后续行 readingMultiLine = true } else { // 处理 StatusReply, IntReply, ErrorReply 等单行 Reply reply := parseSingleLineReply(line) // 通过 ch 返回结果 emitReply(ch) } } else { // 进入此分支说明我们正在等待 MulitBulkReply 或 BulkReply 的后续行 // MulitBulkReply 的后续行有两种,BulkHeader 或者 BulkString if isBulkHeader(line) { bulkLen = parseBulkHeader() } else { // 我们正在读取一个 BulkString, 它可能是 MulitBulkReply 或 BulkReply args = append(args, line) } if len(args) == expectedArgsCount { // 我们已经读取了所有后续行 // 通过 ch 返回结果 emitReply(ch) // 重置状态, 准备解析下一条 Reply readingMultiLine = false expectedArgsCount = 0 args = nil bulkLen = 0 } } } }
贴一下工具函数的实现:
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { // read simple line msg, err = bufReader.ReadBytes('\n') if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("protocol error: " + string(msg)) } } else { // read bulk line (binary safe) msg = make([]byte, state.bulkLen+2) _, err = io.ReadFull(bufReader, msg) if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("protocol error: " + string(msg)) } state.bulkLen = 0 } return msg, false, nil } func parseMultiBulkHeader(msg []byte, state *readState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { // first line of multi bulk reply state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseBulkHeader(msg []byte, state *readState) error { var err error state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { // null bulk return nil } else if state.bulkLen > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseSingleLineReply(msg []byte) (redis.Reply, error) { str := strings.TrimSuffix(string(msg), "\n") str = strings.TrimSuffix(str, "\r") var result redis.Reply switch msg[0] { case '+': // status reply result = reply.MakeStatusReply(str[1:]) case '-': // err reply result = reply.MakeErrReply(str[1:]) case ':': // int reply val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.MakeIntReply(val) default: // parse as text protocol strs := strings.Split(str, " ") args := make([][]byte, len(strs)) for i, s := range strs { args[i] = []byte(s) } result = reply.MakeMultiBulkReply(args) } return result, nil }
到此这篇关于Golang 实现 Redis 协议解析器的文章就介绍到这了,更多相关go redis 协议解析器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
最新评论