johnpoint

johnpoint

(。・∀・)ノ゙嗨
github

gnet の Protocol buffers コーデック

TCP サーバーを作成し、純粋な TCP ストリームで送信される Protocol buffers データを処理することを実現します。ネットワークフレームワークは早い段階で性能に優れた gnet を選択しましたが、gnet のサンプルライブラリには純粋な Protocol buffers を直接解析するコーデックがないため、自分で作成する必要がありました...

プロトコル解析#

TCP ストリームから送信されるのは、簡単に処理された Protocol buffers データで、データのヘッダーにはこのデータパケットの長さ情報が含まれています。以下のような形式です。

[ ヘッダー ][ データ ][ ヘッダー ][ データ ][ ヘッダー ][ データ ][ ヘッダー ][ データ ][ ヘッダー ][ データ ]

golang の proto 公式ライブラリの func DecodeVarint(b []byte) (uint64, int) メソッドを呼び出すことで、データから 2 つの値を取得できます。これらはそれぞれデータの完全な長さと、データ長を示すヘッダー情報の長さです。

特定のプロトコルがパケット間で明確な区切りを行わないため、ヘッダーデータを使用してパケットを分割する必要があります。

デコーダ#

// 接続内の関連情報を保存
type DataStruct struct {
	fullLength   int
	lenNumLength int
	fullData     []byte
}

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // コンテキストからこの接続のコーデックの保存 struct を取得
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // バッファ内のすべての情報を読み取る
	bytes := c.Read()

    // パケットの読み取りが開始されたかどうかを判断
	if len(r.fullData) == 0 {

        // ヘッダーに含まれる情報を取得するために関数を呼び出す
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}

    // 現在の時間に保存された struct のデータの長さを取得
	fullDataLong := len(r.fullData)

    // 読み取ったデータをすべて fullData に追加
	r.fullData = append(r.fullData, bytes...)

    // 長さが要件を満たしているかどうかを判断
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		c.ShiftN(r.fullLength + r.lenNumLength - fullDataLong)

        // 有効なデータを切り取る
		res := r.fullData[r.lenNumLength : r.fullLength+r.lenNumLength]

        // 接続のバッファをクリア
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}

    // 読み取りポインタを移動
	c.ShiftN(len(bytes))
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}

上記のデコード方法は、現在のところ問題が発生していない方法ですが、以下の方法はメモリを節約することができます。2 つのデコード方法の主な違いは、呼び出す Read 関数が異なることです。前者は gnet のリングバッファ内の内容をすべて読み取りますが、後者は最初にヘッダーを読み取り、完全なデータ長情報を取得した後に ReadN 関数を呼び出してパケット本体を正確に取得します。

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // コンテキストからこの接続のコーデックの保存 struct を取得
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}

コードの中でも、ヘッダーデータのパケット長情報は接続のコンテキストに保存されているため、gnet が接続オープンイベントをトリガーする際に、保存情報の struct をコンテキストに挿入する必要があります。

func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	ctx := context.WithValue(context.Background(), "codec", DataStruct{})
	c.SetContext(ctx)
	return
}

エンコーダ#

エンコーダの部分は非常にシンプルで、proto ライブラリ内の EncodeVarint 関数を呼び出すことで、このパケットのヘッダーを生成し、ヘッダー情報をパケットの前に置いてデータをクライアントに送信することができます。

func (d *Codec) Encode(c gnet.Conn, buf []byte) ([]byte, error) {
	buf = append(proto.EncodeVarint(uint64(len(buf))), buf...)
	return buf, nil
}

2021-11-09 更新#

大意を失い、以前はコンテキストに中間情報を保存する方法に 深刻なパフォーマンス問題 がありました。golang のネイティブな context.WithValue メソッドを呼び出すと、渡されたコンテキストの下に子コンテキストが作成され、これが繰り返しデコードされるたびにコンテキストツリーがどんどん大きくなり、各レベルのコンテキスト内部に今回のデコードの DataStruct が保存され、メモリリークの問題を引き起こしました。

数日間苦しんでいくつかの可能性のあるメモリリークの危険を修正した後、私はこの点に気づきました(ハゲ.jpg)

その後、gnet.Conn の実装の Context() メソッドを見てみると、私たちが渡したものを単にマップに保存しているだけで、context 関連のものを使用する必要がないことがわかりました。したがって、簡単な解決策は DataStruct を直接渡すことです。現時点ではメモリリークの問題が解決されたようです。コードは以下の通りです。

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// コンテキストからこの接続のコーデックの保存 struct を取得
	r, ok := c.Context().(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(r)
	return nil, nil
}
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	var r = DataStruct{}
	c.SetContext(r)
	return
}

2021-12-24 更新#

最近 gnet が v1.6.x の新バージョンをリリースしました。この新バージョンでは、コーデックの動作が変更されたため、コードを改造する必要があります。

主な変更点は、gnet ライブラリの eventloop_unix.go ファイルの d1ca7f3 コミットで、React に入るタイミングが返されるパケットが nil でない場合から、返されるエラーが nil でない場合に変更されたため、アップグレード後に対応する修正が必要です。

var (
	ContinueRead = errors.New("continue read")
)

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// コンテキストからこの接続のコーデックの保存 struct を取得
	r, ok := c.Context().(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, ContinueRead
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(r)
	return nil, ContinueRead
}

参考資料#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。