要寫一個 TCP 服務端,實現處理在純 TCP 流中傳輸的 Protocol buffers 數據。網絡框架很早就選好了,用性能傑出的 gnet,問題是 gnet 的示例庫裡面沒有直接解析純 Protocol buffers 的編解碼器,於是乎只能自己動手了...
協議分析#
從 TCP 流裡面傳過來的是經過簡單處理的 Protocol buffers 數據,他在數據的頭攜帶了這個數據包的長度信息,像是這樣
[ 頭 ][ 數據 ][ 頭 ][ 數據 ][ 頭 ][ 數據 ][ 頭 ][ 數據 ][ 頭 ][ 數據 ]
調用 golang 的 proto 官方庫中的 func DecodeVarint(b []byte) (uint64, int)
方法可以從數據中拿到兩個值,分別是 數據的完整長度、標明數據長度的頭信息的長度。
由於沒有特定的協議在包與包之間進行明顯的劃分,所以得用他的頭數據來進行分包。
解碼器#
// 儲存連接內的相關信息
type DataStruct struct {
fullLength int
lenNumLength int
fullData []byte
}
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
ctx, ok := c.Context().(context.Context)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
// 從上下文裡面拿出這個連接的編解碼器儲存 struct
r, ok := ctx.Value("codec").(DataStruct)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
// 讀取緩衝區內的所有信息
bytes := c.Read()
// 判斷是否已經開始讀取包
if len(r.fullData) == 0 {
// 調用函數獲取頭中帶的信息
var fullLength uint64
fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
r.fullLength = int(fullLength)
fmt.Println(r.fullLength, r.lenNumLength)
if r.fullLength == 0 {
return nil, nil
}
}
// 拿到當前時間已經被儲存進 struct 的數據的長度
fullDataLong := len(r.fullData)
// 把讀到的數據一把梭全部拼進 fullData
r.fullData = append(r.fullData, bytes...)
// 判斷長度是否符合要求
if len(r.fullData) >= r.fullLength+r.lenNumLength {
c.ShiftN(r.fullLength + r.lenNumLength - fullDataLong)
// 截取有效的數據
res := r.fullData[r.lenNumLength : r.fullLength+r.lenNumLength]
// 連接的緩存清空
r.fullData = []byte{}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(ctx)
return res, nil
}
// 移動讀取指針
c.ShiftN(len(bytes))
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(ctx)
return nil, nil
}
上面那種解碼方式是目前看運行狀況來說暫時沒有出現問題的方法,下面那一種則比較節省內存,兩種解碼方式區別主要是在於調用的 Read 函數不同,前者是把 gnet 的 ring buffer 裡面的內容全部讀取出來,而後者是先把頭讀取出來,拿到了完整的數據長度信息之後調用 ReadN 函數直接準確的將包體取出。
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
ctx, ok := c.Context().(context.Context)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
// 從上下文裡面拿出這個連接的編解碼器儲存 struct
r, ok := ctx.Value("codec").(DataStruct)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
if len(r.fullData) == 0 {
_, bytes := c.ReadN(10)
var fullLength uint64
fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
r.fullLength = int(fullLength)
fmt.Println(r.fullLength, r.lenNumLength)
if r.fullLength == 0 {
return nil, nil
}
}
fullDataLong := len(r.fullData)
n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
r.fullData = append(r.fullData, bytes...)
c.ShiftN(n)
if len(r.fullData) >= r.fullLength+r.lenNumLength {
res := r.fullData[r.lenNumLength :]
r.fullData = []byte{}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(ctx)
return res, nil
}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(ctx)
return nil, nil
}
在代碼中也可以看見,頭數據中的包體長度信息我是存在連接的上下文中的,所以在 gnet 觸發連接打開的事件時需要將儲存信息的 struct 塞進上下文中。
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
ctx := context.WithValue(context.Background(), "codec", DataStruct{})
c.SetContext(ctx)
return
}
編碼器#
編碼器這個部分就非常簡單了,直接調用 proto 庫裡面的 EncodeVarint 函數就可以生成這個包體的頭,將頭信息放在包體的前面就可以將這個數據發送到客戶端了。
func (d *Codec) Encode(c gnet.Conn, buf []byte) ([]byte, error) {
buf = append(proto.EncodeVarint(uint64(len(buf))), buf...)
return buf, nil
}
2021-11-09 更新#
大意了,之前用上下文存儲中間信息的方法有 嚴重的性能問題,在調用 golang 原生的 context.WithValue
方法時候,會在傳入的上下文下面創建一個子上下文,這就導致了在一次又一次解碼中,上下文樹越來越龐大,而且每一層上下文內部都存儲了本次解碼的 DataStruct
,造成內存洩漏的問題。
在苦苦查了好幾天,並且修了幾個有可能的內存洩漏隱患之後我才意識到這一點 (禿頭.jpg)
然後再看了下 gnet.Conn
的一個實現的 Context()
方法,發現他只是將我們傳進去的東西存在了一個 map 裡面,並不需要使用 context
相關的,所以簡單的解決方法就是直接將 DataStruct
傳進去,目前來看是解決了內存洩漏的問題,代碼如下
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
// 從上下文裡面拿出這個連接的編解碼器儲存 struct
r, ok := c.Context().(DataStruct)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
if len(r.fullData) == 0 {
_, bytes := c.ReadN(10)
var fullLength uint64
fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
r.fullLength = int(fullLength)
fmt.Println(r.fullLength, r.lenNumLength)
if r.fullLength == 0 {
return nil, nil
}
}
fullDataLong := len(r.fullData)
n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
r.fullData = append(r.fullData, bytes...)
c.ShiftN(n)
if len(r.fullData) >= r.fullLength+r.lenNumLength {
res := r.fullData[r.lenNumLength :]
r.fullData = []byte{}
c.SetContext(r)
return res, nil
}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(r)
return nil, nil
}
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
var r = DataStruct{}
c.SetContext(r)
return
}
2021-12-24 更新#
最近 gnet 發布了 v1.6.x 新版本,新版本的 編解碼器行為有所改變,所以需要改造一下代碼
主要的改動是在 gnet 庫的 eventloop_unix.go 文件的 d1ca7f3 commit 中將進入 React 的時間點從返回的 packet 不為 nil 改為了返回的 err 不為 nil,所以在升級後需要做對應的修改
var (
ContinueRead = errors.New("continue read")
)
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
// 從上下文裡面拿出這個連接的編解碼器儲存 struct
r, ok := c.Context().(DataStruct)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
if len(r.fullData) == 0 {
_, bytes := c.ReadN(10)
var fullLength uint64
fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
r.fullLength = int(fullLength)
fmt.Println(r.fullLength, r.lenNumLength)
if r.fullLength == 0 {
return nil, ContinueRead
}
}
fullDataLong := len(r.fullData)
n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
r.fullData = append(r.fullData, bytes...)
c.ShiftN(n)
if len(r.fullData) >= r.fullLength+r.lenNumLength {
res := r.fullData[r.lenNumLength :]
r.fullData = []byte{}
c.SetContext(r)
return res, nil
}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(r)
return nil, ContinueRead
}