johnpoint

johnpoint

(。・∀・)ノ゙嗨
github

用于 gnet 的 Protocol buffers 编解码器

要写一个 TCP 服务端,实现处理在纯 TCP 流中传输的 Protocol buffers 数据。网络框架很早就选好了,用性能杰出的 gnet,问题是 gnet 的示例库里面没有直接解析纯 Protocol buffers 的编解码器,于是乎只能自己动手了...

协议分析#

从 TCP 流里面传过来的是经过简单处理的 Protocol buffers 数据,他在数据的头携带了这个数据包的长度信息,像是这样

[ 头 ][ 数据 ][ 头 ][ 数据 ][ 头 ][ 数据 ][ 头 ][ 数据 ][ 头 ][ 数据 ]

调用 golang 的 proto 官方库中的 func DecodeVarint(b []byte) (uint64, int) 方法可以从数据中拿到两个值,分别是 数据的完整长度、标明数据长度的头信息的长度。

由于没有特定的协议在包与包之间进行明显的划分,所以得用他的头数据来进行分包。

解码器#

// 储存连接内的相关信息
type DataStruct struct {
	fullLength   int
	lenNumLength int
	fullData     []byte
}

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // 从上下文里面拿出这个连接的编解码器储存 struct
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // 读取缓冲区内的所有信息
	bytes := c.Read()

    // 判断是否已经开始读取包
	if len(r.fullData) == 0 {

        // 调用函数获取头中带的信息
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}

    // 拿到当前时间已经被储存进 struct 的数据的长度
	fullDataLong := len(r.fullData)

    // 把读到的数据一把梭全部拼进 fullData
	r.fullData = append(r.fullData, bytes...)

    // 判断长度是否符合要求
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		c.ShiftN(r.fullLength + r.lenNumLength - fullDataLong)

        // 截取有效的数据
		res := r.fullData[r.lenNumLength : r.fullLength+r.lenNumLength]

        // 连接的缓存清空
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}

    // 移动读取指针
	c.ShiftN(len(bytes))
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}

上面那种解码方式是目前看运行状况来说暂时没有出现问题的方法,下面那一种则比较节省内存,两种解码方式区别主要是在于调用的 Read 函数不同,前者是把 gnet 的 ring buffer 里面的内容全部读取出来,而后者是先把头读取出来,拿到了完整的数据长度信息之后调用 ReadN 函数直接准确的将包体取出。

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // 从上下文里面拿出这个连接的编解码器储存 struct
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}

在代码中也可以看见,头数据中的包体长度信息我是存在连接的上下文中的,所以在 gnet 触发连接打开的事件时需要将储存信息的 struct 塞进上下文中。

func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	ctx := context.WithValue(context.Background(), "codec", DataStruct{})
	c.SetContext(ctx)
	return
}

编码器#

编码器这个部分就非常简单了,直接调用 proto 库里面的 EncodeVarint 函数就可以生成这个包体的头,将头信息放在包体的前面就可以将这个数据发送到客户端了。

func (d *Codec) Encode(c gnet.Conn, buf []byte) ([]byte, error) {
	buf = append(proto.EncodeVarint(uint64(len(buf))), buf...)
	return buf, nil
}

2021-11-09 更新#

大意了,之前用上下文存储中间信息的方法有 严重的性能问题,在调用 golang 原生的 context.WithValue 方法时候,会在传入的上下文下面创建一个子上下文,这就导致了在一次又一次解码中,上下文树越来越庞大,而且每一层上下文内部都存储了本次解码的 DataStruct,造成内存泄漏的问题。

在苦苦查了好几天,并且修了几个有可能的内存泄漏隐患之后我才意识到这一点 (秃头.jpg)

然后再看了下 gnet.Conn 的一个实现的 Context() 方法,发现他只是将我们传进去的东西存在了一个 map 里面,并不需要使用 context 相关的,所以简单的解决方法就是直接将 DataStruct 传进去,目前来看是解决了内存泄漏的问题,代码如下

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// 从上下文里面拿出这个连接的编解码器储存 struct
	r, ok := c.Context().(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(r)
	return nil, nil
}
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	var r = DataStruct{}
	c.SetContext(r)
	return
}

2021-12-24 更新#

最近 gnet 发布了 v1.6.x 新版本,新版本的 编解码器行为有所改变,所以需要改造一下代码

主要的改动是在 gnet 库的 eventloop_unix.go 文件的 d1ca7f3 commit 中将进入 React 的时间点从返回的 packet 不为 nil 改为了返回的 err 不为 nil,所以在升级后需要做对应的修改

var (
	ContinueRead = errors.New("continue read")
)


func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// 从上下文里面拿出这个连接的编解码器储存 struct
	r, ok := c.Context().(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, ContinueRead
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(r)
	return nil, ContinueRead
}

参考资料#

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。