go高性能网络框架Netpoll-编解码

内容纲要
  • 概述

    Netpoll框架不提供编解码,这事得自己来。这次来探讨下编解码吧。编解码也叫序列化和反序列化,就是struct等转换为字节流,然后把字节流转换成struct等。

现在有很多编解码框架可以用,比如:protocolbuf等,但是如果你不想使用这些框架,你应该怎么进行编解码?

可选编码方法

  1. 文本类:JSON,XML,CSV,HTTP,asn.1:JER/XER/GXER
  2. 二进制:asn.1:BER/CER/DER/JER/PER
  3. 自定义二进制:自己根据编码规则进行自定义协议进行编解码

asn.1有点强大,强大则会相对复杂,go中也支持asn.1编码,不过只支持DER,大家有兴趣可以了解一下,包名:encoding/asn1。它提供了编码和解码的方法:

// 编码
func Marshal(val any) ([]byte, error)

// 解码
func Unmarshal(b []byte, val any) (rest []byte, err error) 

粘包问题

这个问题是网络通信中必须要解决的一个问题,因为你要解决一条完整消息的问题,专业数据好像叫:消息定界

粘包只有TCP才会发生(因为TCP是基于流的),UDP不会发生粘包,但是TCP应用的比较广泛(面试中也会被经常问到)。

消息定界

消息定界有太多方法了,参考asn.1中的TLV,LV,分隔符,固定长度等等。

我觉得既然自定义就简单点就用Length-Value编码就可以了。

关于Length-Value编码的话,我之前写过一篇文章:传输与存储-LV(Length-Value)编码

协议头

Length-Value编码用来进行消息定界,我们还得解决一些消息传输中的控制信息,通常我们使用消息头来解决。比如:我们可以要在消息里面定义一个消息版本,类似与ipv4和ipv6。

协议头也有定长协议头和变长协议头,他们的用途主要是:

  1. 定长协议头主要是用来进行快速解析和处理。
  2. 变长协议头一般用于对消息的描述,也就是真正消息的元数据。

理论应该差不多了,可以来一波自定义协议了,就采用Length-Value。

我们来实现一Length-Value编解码:LengthBaseCodec。

编解码基本描述

编解码接口定义

首先我们定义一个接口Codec 接口定义了 Encode 和 Decode 方法:


type Codec interface {
    Encode(frame *Frame) ([]byte, error)
    Decode(data []byte) (*Frame, error) 
}

数据帧定义

我们定义数据帧来存储数据,这个数据帧包含了:

  1. Version: 消息版本
  2. Type: 帧类型
  3. Headers: 变长协议头
  4. Payload: 有效载荷
    其中Version和Type是定长协议头,Headers变长协议头(采用 Length-Value 编码),有效载荷就是Byte数组了,具体由消息处理端来进行编解码,通用编解码这层不管由上层处理。
type Frame struct {
    Version  uint8            
    Type     uint8            
    Headers  map[string][]byte 
    Payload  []byte           
}

LengthBaseCodec实现

这个比较简单,定义一个struct来实现之前定义的Codec就可以了。

type LengthBaseCodec struct{}

// 编码实现
func (c *LengthBaseCodec) Encode(frame *Frame) ([]byte, error) 

// 解码实现
func (c *LengthBaseCodec) Decode(data []byte) (*Frame, error) 

编码

func (c *LengthBaseCodec) Encode(frame *Frame) ([]byte, error) {
    buf := new(bytes.Buffer)

    // 编码定长字段,使用 Varint 编码
    buf.Write(EncodeVarint(uint64(frame.Version)))
    buf.Write(EncodeVarint(uint64(frame.Type)))

    // 编码变长协议头
    headerBuf := new(bytes.Buffer)
    for key, value := range frame.Headers {
        headerBuf.Write(EncodeVarint(uint64(len(key))))
        headerBuf.WriteString(key)
        headerBuf.Write(EncodeVarint(uint64(len(value))))
        headerBuf.Write(value)
    }
    // 变长协议头写入buf中
    buf.Write(EncodeVarint(uint64(headerBuf.Len())))
    buf.Write(headerBuf.Bytes())

    // 编码 Payload
    if _, err := buf.Write(frame.Payload); err != nil {
        return nil, err
    }

    // 实际编码的时候只需要调用Writer.Write()
    // 先写EncodeVarint(uint64(buf.Available()))这几个字节就行了
    lvBuf := new(bytes.Buffer)
    lvBuf.Write(EncodeVarint(uint64(buf.Available())))
    lvBuf.Write(buf.Bytes())

    return lvBuf.Bytes(), nil
}

func EncodeVarint(variable uint64) []byte {
    var cmdBuf [binary.MaxVarintLen64]byte
    encodeLen := binary.PutUvarint(cmdBuf[:], variable)
    return cmdBuf[:encodeLen]
}

解码


// Decode 对数据进行解码,返回 Frame 指针
func (c *LengthBaseCodec) Decode(data []byte) (*Frame, error) {
    buf := bytes.NewReader(data)
    frame := &Frame{
        Headers: make(map[string][]byte),
    }

    // 实际编码中,调用Reader
    // 可以一个一个字节的读取
    // 判断这个字节最高位是否为0
    // 这个过程binary已经封装好了
    // len, err := binary.ReadUvarint(reader)
    len, _ := binary.ReadUvarint(buf)
    fmt.Printf("frame length: %d\n", len)

    // 解码定长字段,使用 Varint 解码
    version, _ := binary.ReadUvarint(buf)
    frame.Version = uint8(version)

    frameType, _ := binary.ReadUvarint(buf)
    frame.Type = uint8(frameType)

    // 解码变长协议头
    headerLen, _ := binary.ReadUvarint(buf)
    headLen := int(headerLen)
    for headLen > 0 {
        // 解析key长度
        keyLen64, _ := binary.ReadUvarint(buf)
        keyLen := int(keyLen64)
        // 这里我们已经知道了key的长度为1,所以就直接减1了,实际开发需要计算

        // 解析Key的值
        key := make([]byte, keyLen)
        buf.Read(key)

        headLen -= (keyLen + 1)

        // 解析Value的长度
        valLen64, _ := binary.ReadUvarint(buf)
        valLen := int(valLen64)
        // 解析Value的值
        value := make([]byte, valLen)
        buf.Read(value)
        frame.Headers[string(key)] = value
        headLen -= (valLen + 1)
    }

    // 其余的全部是 Payload
    frame.Payload, _ = io.ReadAll(buf)

    return frame, nil
}

代码运行起来


func main() {
    // 示例使用
    codec := LengthBaseCodec{}
    frame := &Frame{
        Version: 1,
        Type:    10,
        Headers: map[string][]byte{
            "Key1": []byte("Value1"),
            "Key2": []byte("Value2"),
        },
        Payload: []byte("Hello, World!"),
    }

    encodedFrame, _ := codec.Encode(frame)
    fmt.Println("Encoded Frame:", encodedFrame)

    decodedFrame, _ := codec.Decode(encodedFrame)
    for key, value := range decodedFrame.Headers {
        fmt.Println("Header:", key, "Value:", string(value))
    }
    fmt.Println("Version:", decodedFrame.Version)
    fmt.Println("Type:", decodedFrame.Type)
    fmt.Println("Payload:", string(decodedFrame.Payload))
}

file

总结

自定义编码和解码都是自己来实现,过程还是挺麻烦的。我提供的代码都是仅供参考,实际业务场景中你根据自己的情况来改一下也是可以使用,尤其是变长协议头这块你可以直接重构掉。

搞定收工

发表评论

您的电子邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部