TCP サーバーを作成し、純粋な TCP ストリームで送信される Protocol buffers データを処理することを実現します。ネットワークフレームワークは早い段階で性能に優れた gnet を選択しましたが、gnet のサンプルライブラリには純粋な Protocol buffers を直接解析するコーデックがないため、自分で作成する必要がありました...
プロトコル解析#
TCP ストリームから送信されるのは、簡単に処理された Protocol buffers データで、データのヘッダーにはこのデータパケットの長さ情報が含まれています。以下のような形式です。
[ ヘッダー ][ データ ][ ヘッダー ][ データ ][ ヘッダー ][ データ ][ ヘッダー ][ データ ][ ヘッダー ][ データ ]
golang の proto 公式ライブラリの func DecodeVarint(b []byte) (uint64, int)
メソッドを呼び出すことで、データから 2 つの値を取得できます。これらはそれぞれデータの完全な長さと、データ長を示すヘッダー情報の長さです。
特定のプロトコルがパケット間で明確な区切りを行わないため、ヘッダーデータを使用してパケットを分割する必要があります。
デコーダ#
// 接続内の関連情報を保存
type DataStruct struct {
fullLength int
lenNumLength int
fullData []byte
}
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
ctx, ok := c.Context().(context.Context)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
// コンテキストからこの接続のコーデックの保存 struct を取得
r, ok := ctx.Value("codec").(DataStruct)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
// バッファ内のすべての情報を読み取る
bytes := c.Read()
// パケットの読み取りが開始されたかどうかを判断
if len(r.fullData) == 0 {
// ヘッダーに含まれる情報を取得するために関数を呼び出す
var fullLength uint64
fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
r.fullLength = int(fullLength)
fmt.Println(r.fullLength, r.lenNumLength)
if r.fullLength == 0 {
return nil, nil
}
}
// 現在の時間に保存された struct のデータの長さを取得
fullDataLong := len(r.fullData)
// 読み取ったデータをすべて fullData に追加
r.fullData = append(r.fullData, bytes...)
// 長さが要件を満たしているかどうかを判断
if len(r.fullData) >= r.fullLength+r.lenNumLength {
c.ShiftN(r.fullLength + r.lenNumLength - fullDataLong)
// 有効なデータを切り取る
res := r.fullData[r.lenNumLength : r.fullLength+r.lenNumLength]
// 接続のバッファをクリア
r.fullData = []byte{}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(ctx)
return res, nil
}
// 読み取りポインタを移動
c.ShiftN(len(bytes))
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(ctx)
return nil, nil
}
上記のデコード方法は、現在のところ問題が発生していない方法ですが、以下の方法はメモリを節約することができます。2 つのデコード方法の主な違いは、呼び出す Read 関数が異なることです。前者は gnet のリングバッファ内の内容をすべて読み取りますが、後者は最初にヘッダーを読み取り、完全なデータ長情報を取得した後に ReadN 関数を呼び出してパケット本体を正確に取得します。
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
ctx, ok := c.Context().(context.Context)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
// コンテキストからこの接続のコーデックの保存 struct を取得
r, ok := ctx.Value("codec").(DataStruct)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
if len(r.fullData) == 0 {
_, bytes := c.ReadN(10)
var fullLength uint64
fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
r.fullLength = int(fullLength)
fmt.Println(r.fullLength, r.lenNumLength)
if r.fullLength == 0 {
return nil, nil
}
}
fullDataLong := len(r.fullData)
n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
r.fullData = append(r.fullData, bytes...)
c.ShiftN(n)
if len(r.fullData) >= r.fullLength+r.lenNumLength {
res := r.fullData[r.lenNumLength :]
r.fullData = []byte{}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(ctx)
return res, nil
}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(ctx)
return nil, nil
}
コードの中でも、ヘッダーデータのパケット長情報は接続のコンテキストに保存されているため、gnet が接続オープンイベントをトリガーする際に、保存情報の struct をコンテキストに挿入する必要があります。
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
ctx := context.WithValue(context.Background(), "codec", DataStruct{})
c.SetContext(ctx)
return
}
エンコーダ#
エンコーダの部分は非常にシンプルで、proto ライブラリ内の EncodeVarint 関数を呼び出すことで、このパケットのヘッダーを生成し、ヘッダー情報をパケットの前に置いてデータをクライアントに送信することができます。
func (d *Codec) Encode(c gnet.Conn, buf []byte) ([]byte, error) {
buf = append(proto.EncodeVarint(uint64(len(buf))), buf...)
return buf, nil
}
2021-11-09 更新#
大意を失い、以前はコンテキストに中間情報を保存する方法に 深刻なパフォーマンス問題 がありました。golang のネイティブな context.WithValue
メソッドを呼び出すと、渡されたコンテキストの下に子コンテキストが作成され、これが繰り返しデコードされるたびにコンテキストツリーがどんどん大きくなり、各レベルのコンテキスト内部に今回のデコードの DataStruct
が保存され、メモリリークの問題を引き起こしました。
数日間苦しんでいくつかの可能性のあるメモリリークの危険を修正した後、私はこの点に気づきました(ハゲ.jpg)
その後、gnet.Conn
の実装の Context()
メソッドを見てみると、私たちが渡したものを単にマップに保存しているだけで、context
関連のものを使用する必要がないことがわかりました。したがって、簡単な解決策は DataStruct
を直接渡すことです。現時点ではメモリリークの問題が解決されたようです。コードは以下の通りです。
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
// コンテキストからこの接続のコーデックの保存 struct を取得
r, ok := c.Context().(DataStruct)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
if len(r.fullData) == 0 {
_, bytes := c.ReadN(10)
var fullLength uint64
fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
r.fullLength = int(fullLength)
fmt.Println(r.fullLength, r.lenNumLength)
if r.fullLength == 0 {
return nil, nil
}
}
fullDataLong := len(r.fullData)
n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
r.fullData = append(r.fullData, bytes...)
c.ShiftN(n)
if len(r.fullData) >= r.fullLength+r.lenNumLength {
res := r.fullData[r.lenNumLength :]
r.fullData = []byte{}
c.SetContext(r)
return res, nil
}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(r)
return nil, nil
}
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
var r = DataStruct{}
c.SetContext(r)
return
}
2021-12-24 更新#
最近 gnet が v1.6.x の新バージョンをリリースしました。この新バージョンでは、コーデックの動作が変更されたため、コードを改造する必要があります。
主な変更点は、gnet ライブラリの eventloop_unix.go ファイルの d1ca7f3 コミットで、React に入るタイミングが返されるパケットが nil でない場合から、返されるエラーが nil でない場合に変更されたため、アップグレード後に対応する修正が必要です。
var (
ContinueRead = errors.New("continue read")
)
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
// コンテキストからこの接続のコーデックの保存 struct を取得
r, ok := c.Context().(DataStruct)
if !ok {
err := c.Close()
if err != nil {
return nil, nil
}
}
if len(r.fullData) == 0 {
_, bytes := c.ReadN(10)
var fullLength uint64
fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
r.fullLength = int(fullLength)
fmt.Println(r.fullLength, r.lenNumLength)
if r.fullLength == 0 {
return nil, ContinueRead
}
}
fullDataLong := len(r.fullData)
n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
r.fullData = append(r.fullData, bytes...)
c.ShiftN(n)
if len(r.fullData) >= r.fullLength+r.lenNumLength {
res := r.fullData[r.lenNumLength :]
r.fullData = []byte{}
c.SetContext(r)
return res, nil
}
ctx = context.WithValue(ctx, "codec", r)
c.SetContext(r)
return nil, ContinueRead
}