johnpoint

johnpoint

(。・∀・)ノ゙嗨
github

用於 gnet 的 Protocol buffers 編解碼器

要寫一個 TCP 服務端,實現處理在純 TCP 流中傳輸的 Protocol buffers 數據。網絡框架很早就選好了,用性能傑出的 gnet,問題是 gnet 的示例庫裡面沒有直接解析純 Protocol buffers 的編解碼器,於是乎只能自己動手了...

協議分析#

從 TCP 流裡面傳過來的是經過簡單處理的 Protocol buffers 數據,他在數據的頭攜帶了這個數據包的長度信息,像是這樣

[ 頭 ][ 數據 ][ 頭 ][ 數據 ][ 頭 ][ 數據 ][ 頭 ][ 數據 ][ 頭 ][ 數據 ]

調用 golang 的 proto 官方庫中的 func DecodeVarint(b []byte) (uint64, int) 方法可以從數據中拿到兩個值,分別是 數據的完整長度、標明數據長度的頭信息的長度。

由於沒有特定的協議在包與包之間進行明顯的劃分,所以得用他的頭數據來進行分包。

解碼器#

// 儲存連接內的相關信息
type DataStruct struct {
	fullLength   int
	lenNumLength int
	fullData     []byte
}

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // 從上下文裡面拿出這個連接的編解碼器儲存 struct
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // 讀取緩衝區內的所有信息
	bytes := c.Read()

    // 判斷是否已經開始讀取包
	if len(r.fullData) == 0 {

        // 調用函數獲取頭中帶的信息
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}

    // 拿到當前時間已經被儲存進 struct 的數據的長度
	fullDataLong := len(r.fullData)

    // 把讀到的數據一把梭全部拼進 fullData
	r.fullData = append(r.fullData, bytes...)

    // 判斷長度是否符合要求
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		c.ShiftN(r.fullLength + r.lenNumLength - fullDataLong)

        // 截取有效的數據
		res := r.fullData[r.lenNumLength : r.fullLength+r.lenNumLength]

        // 連接的緩存清空
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}

    // 移動讀取指針
	c.ShiftN(len(bytes))
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}

上面那種解碼方式是目前看運行狀況來說暫時沒有出現問題的方法,下面那一種則比較節省內存,兩種解碼方式區別主要是在於調用的 Read 函數不同,前者是把 gnet 的 ring buffer 裡面的內容全部讀取出來,而後者是先把頭讀取出來,拿到了完整的數據長度信息之後調用 ReadN 函數直接準確的將包體取出。

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // 從上下文裡面拿出這個連接的編解碼器儲存 struct
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}

在代碼中也可以看見,頭數據中的包體長度信息我是存在連接的上下文中的,所以在 gnet 觸發連接打開的事件時需要將儲存信息的 struct 塞進上下文中。

func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	ctx := context.WithValue(context.Background(), "codec", DataStruct{})
	c.SetContext(ctx)
	return
}

編碼器#

編碼器這個部分就非常簡單了,直接調用 proto 庫裡面的 EncodeVarint 函數就可以生成這個包體的頭,將頭信息放在包體的前面就可以將這個數據發送到客戶端了。

func (d *Codec) Encode(c gnet.Conn, buf []byte) ([]byte, error) {
	buf = append(proto.EncodeVarint(uint64(len(buf))), buf...)
	return buf, nil
}

2021-11-09 更新#

大意了,之前用上下文存儲中間信息的方法有 嚴重的性能問題,在調用 golang 原生的 context.WithValue 方法時候,會在傳入的上下文下面創建一個子上下文,這就導致了在一次又一次解碼中,上下文樹越來越龐大,而且每一層上下文內部都存儲了本次解碼的 DataStruct,造成內存洩漏的問題。

在苦苦查了好幾天,並且修了幾個有可能的內存洩漏隱患之後我才意識到這一點 (禿頭.jpg)

然後再看了下 gnet.Conn 的一個實現的 Context() 方法,發現他只是將我們傳進去的東西存在了一個 map 裡面,並不需要使用 context 相關的,所以簡單的解決方法就是直接將 DataStruct 傳進去,目前來看是解決了內存洩漏的問題,代碼如下

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// 從上下文裡面拿出這個連接的編解碼器儲存 struct
	r, ok := c.Context().(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(r)
	return nil, nil
}
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	var r = DataStruct{}
	c.SetContext(r)
	return
}

2021-12-24 更新#

最近 gnet 發布了 v1.6.x 新版本,新版本的 編解碼器行為有所改變,所以需要改造一下代碼

主要的改動是在 gnet 庫的 eventloop_unix.go 文件的 d1ca7f3 commit 中將進入 React 的時間點從返回的 packet 不為 nil 改為了返回的 err 不為 nil,所以在升級後需要做對應的修改

var (
	ContinueRead = errors.New("continue read")
)


func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// 從上下文裡面拿出這個連接的編解碼器儲存 struct
	r, ok := c.Context().(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, ContinueRead
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(r)
	return nil, ContinueRead
}

參考資料#

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。