I needed to write a TCP server that handles Protocol Buffers data sent over a raw TCP stream. I settled on the high-performance gnet framework early on, but gnet's example library has no ready-made codec for parsing raw Protocol Buffers, so I had to write one myself...
Protocol Analysis
The data arriving on the TCP stream is serialized Protocol Buffers data. Each packet starts with a header that carries the length of its body, like this:
[ Header ][ Data ][ Header ][ Data ][ Header ][ Data ][ Header ][ Data ][ Header ][ Data ]
Calling func DecodeVarint(b []byte) (uint64, int) from the official Go protobuf library (github.com/golang/protobuf/proto) on the data gives us two values: the length of the packet body, and the number of bytes the header encoding that length occupies. If b does not yet hold a complete varint, both values are 0.
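To make the header format concrete, here is a small standalone sketch. It assumes the standard library's encoding/binary as a stand-in for the proto helpers: binary.Uvarint and binary.PutUvarint use the same base-128 varint encoding as Protocol Buffers. decodeHeader and encodeHeader are hypothetical names; decodeHeader imitates the DecodeVarint contract, including the (0, 0) result for an incomplete header.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeHeader mirrors the contract of proto.DecodeVarint: it returns the
// body length and the number of header bytes, or (0, 0) when buf does not
// start with a complete, valid varint.
func decodeHeader(buf []byte) (bodyLen uint64, headerLen int) {
	v, n := binary.Uvarint(buf)
	if n <= 0 {
		return 0, 0
	}
	return v, n
}

// encodeHeader returns the varint header announcing a body of bodyLen bytes.
func encodeHeader(bodyLen uint64) []byte {
	buf := make([]byte, binary.MaxVarintLen64) // 10 bytes, the varint maximum
	n := binary.PutUvarint(buf, bodyLen)
	return buf[:n]
}

func main() {
	header := encodeHeader(300)
	fmt.Printf("% X\n", header) // AC 02

	packet := append(header, make([]byte, 300)...)
	fmt.Println(decodeHeader(packet)) // 300 2

	// Only the first header byte has arrived: the length is not known yet.
	fmt.Println(decodeHeader(header[:1])) // 0 0
}
```

The 10-byte maximum varint length is also why the later decoders peek at 10 bytes before parsing the header.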
Since the stream has no other delimiter between packets, the length header is the only thing we can use to split it into individual packets.
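Length-prefix framing boils down to a simple loop: decode a header, check whether the whole body has arrived, cut it off, repeat, and keep any incomplete tail for later. The sketch below shows that loop on an in-memory byte slice, outside gnet. splitFrames and appendFrame are hypothetical helpers, and binary.Uvarint stands in for proto.DecodeVarint.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendFrame appends body to dst, prefixed with its varint length header.
func appendFrame(dst, body []byte) []byte {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(body)))
	return append(append(dst, hdr[:n]...), body...)
}

// splitFrames cuts a byte stream into length-prefixed packets. It returns
// every complete packet body plus the unconsumed tail, which must be kept
// and prepended to the next bytes read from the socket.
func splitFrames(stream []byte) (frames [][]byte, rest []byte) {
	for len(stream) > 0 {
		bodyLen, headerLen := binary.Uvarint(stream)
		if headerLen <= 0 {
			// 0: header incomplete, wait for more data.
			// <0: malformed header; a real server should drop the connection.
			break
		}
		if bodyLen > uint64(len(stream)-headerLen) {
			break // body incomplete: wait for more data
		}
		end := headerLen + int(bodyLen)
		frames = append(frames, stream[headerLen:end])
		stream = stream[end:]
	}
	return frames, stream
}

func main() {
	var stream []byte
	for _, msg := range []string{"hello", "gnet", ""} {
		stream = appendFrame(stream, []byte(msg))
	}
	stream = append(stream, 0x05, 'p', 'a') // a packet whose body is cut off

	frames, rest := splitFrames(stream)
	for _, f := range frames {
		fmt.Printf("%q\n", f)
	}
	fmt.Println(len(rest)) // 3
}
```

Note that an empty message (header byte 0x00) is a valid packet and has to be passed on, not mistaken for "no data yet".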
Decoder
import (
	"context"
	"errors"
	"fmt"

	"github.com/golang/protobuf/proto"
	"github.com/panjf2000/gnet"
)

// Codec implements gnet.ICodec for varint-length-prefixed Protocol Buffers
type Codec struct{}

// DataStruct stores the decoding state of one connection
type DataStruct struct {
	fullLength   int    // body length, decoded from the header
	lenNumLength int    // number of bytes the header itself occupies
	fullData     []byte // header + body bytes collected so far
}

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		// No state to work with: close the connection and stop decoding
		_ = c.Close()
		return nil, errors.New("codec: connection has no context")
	}
	// Retrieve the codec storage struct for this connection from the context
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		_ = c.Close()
		return nil, errors.New("codec: no DataStruct in context")
	}
	// Read everything currently buffered (Read does not consume it)
	bytes := c.Read()
	// If no packet is in progress, parse its header first
	if len(r.fullData) == 0 {
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		// lenNumLength == 0 means the header is incomplete; checking
		// fullLength == 0 instead would stall on empty messages
		if r.lenNumLength == 0 {
			return nil, nil
		}
	}
	// Length of the data already stored in the struct
	fullDataLong := len(r.fullData)
	// Append everything that was read to fullData
	r.fullData = append(r.fullData, bytes...)
	// Check whether the whole packet has arrived
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		// Consume only this packet's bytes; anything after it stays buffered
		c.ShiftN(r.fullLength + r.lenNumLength - fullDataLong)
		// Extract the packet body
		res := r.fullData[r.lenNumLength : r.fullLength+r.lenNumLength]
		// Reset the accumulated data for the next packet
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}
	// Incomplete packet: consume what we stored and wait for more data
	c.ShiftN(len(bytes))
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}
The decoder above currently runs without problems, while the one below is more memory-efficient. The difference lies in which read function is called. The former calls Read, which returns everything held in gnet's ring buffer, and copies all of it (including bytes that belong to later packets) into fullData on every call. The latter first peeks at the header with ReadN, obtains the full packet length from it, and then calls ReadN again to take exactly the bytes of that packet.
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		_ = c.Close()
		return nil, errors.New("codec: connection has no context")
	}
	// Retrieve the codec storage struct for this connection from the context
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		_ = c.Close()
		return nil, errors.New("codec: no DataStruct in context")
	}
	if len(r.fullData) == 0 {
		// Peek at up to 10 bytes (the maximum varint length) to parse the header
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		// Header incomplete (empty messages have lenNumLength 1): wait for more data
		if r.lenNumLength == 0 {
			return nil, nil
		}
	}
	fullDataLong := len(r.fullData)
	// Read only the bytes still missing from this packet, then consume them
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength:]
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}
As the code shows, the per-connection state (the body length from the header and the bytes collected so far) is kept in the connection's context, so we need to put the storage struct into the context when gnet fires the connection-opened event:
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	ctx := context.WithValue(context.Background(), "codec", DataStruct{})
	c.SetContext(ctx)
	return
}
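To try the peek-then-read strategy of the second decoder without running a gnet server, here is a minimal, self-contained sketch. The inbound type is a hypothetical stand-in for the connection's read buffer (peek plays the role of ReadN, shift the role of ShiftN), state plays the role of DataStruct, and errNeedMore, errBadHeader and maxPacketSize are assumptions of this sketch rather than gnet APIs. binary.Uvarint again stands in for proto.DecodeVarint.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Hypothetical sentinels and size limit for this sketch.
var (
	errNeedMore  = errors.New("need more data")
	errBadHeader = errors.New("malformed or oversized header")
)

const maxPacketSize = 1 << 20

// inbound is a hypothetical stand-in for a connection's read buffer.
type inbound struct{ buf []byte }

// peek returns up to n buffered bytes without consuming them.
func (b *inbound) peek(n int) []byte {
	if n > len(b.buf) {
		n = len(b.buf)
	}
	return b.buf[:n]
}

// shift discards the first n buffered bytes.
func (b *inbound) shift(n int) { b.buf = b.buf[n:] }

// state plays the role of the article's DataStruct.
type state struct {
	bodyLen, headerLen int
	data               []byte // header + body bytes collected so far
}

// decode returns one complete packet body, or errNeedMore while the packet
// is incomplete. A partial packet survives in s between calls.
func (s *state) decode(in *inbound) ([]byte, error) {
	if len(s.data) == 0 {
		v, n := binary.Uvarint(in.peek(binary.MaxVarintLen64))
		if n == 0 {
			return nil, errNeedMore
		}
		if n < 0 || v > maxPacketSize {
			return nil, errBadHeader
		}
		s.bodyLen, s.headerLen = int(v), n
	}
	chunk := in.peek(s.headerLen + s.bodyLen - len(s.data))
	s.data = append(s.data, chunk...) // copies, so the input buffer can be reused
	in.shift(len(chunk))
	if len(s.data) < s.headerLen+s.bodyLen {
		return nil, errNeedMore
	}
	body := s.data[s.headerLen:]
	s.data = nil // the next packet starts in a fresh slice
	return body, nil
}

func main() {
	var in inbound
	var s state
	// Deliver one packet a byte at a time, as a worst-case TCP split.
	for _, b := range []byte{0x05, 'h', 'e', 'l', 'l', 'o'} {
		in.buf = append(in.buf, b)
		if body, err := s.decode(&in); err == nil {
			fmt.Printf("%s\n", body) // hello
		}
	}
}
```

The size limit is a design choice worth considering for a real server: without it, a corrupted header could make the server wait for, or try to buffer, an absurdly large packet.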
Encoder
The encoder is very simple: we can directly call the EncodeVarint function from the proto library to generate the packet header, place it in front of the packet body, and send the result to the client.
func (d *Codec) Encode(c gnet.Conn, buf []byte) ([]byte, error) {
	// Prefix the body with its varint-encoded length
	buf = append(proto.EncodeVarint(uint64(len(buf))), buf...)
	return buf, nil
}
2021-11-09 Update
I overlooked a serious problem with the previous approach of storing intermediate state in the context. Each call to the standard library's context.WithValue creates a child context that wraps the one passed in, so every Decode call adds another layer to an ever-growing chain of contexts. Each layer keeps a reference to its parent and to the DataStruct of that decoding step, so none of them can be garbage-collected while the connection is alive, which causes a memory leak.
Only after several days of struggling, and fixing several other potential memory leaks along the way, did I realize this (bald.jpg).
Then I looked at the implementation of the Context() method on gnet.Conn and found that it simply keeps whatever value we pass to SetContext as a plain interface{}, without relying on any context.Context features. So the simple solution is to store the DataStruct there directly. So far this seems to have resolved the memory leak; the code is as follows:
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// Retrieve the codec storage struct for this connection from the context
	r, ok := c.Context().(DataStruct)
	if !ok {
		_ = c.Close()
		return nil, errors.New("codec: no DataStruct in connection context")
	}
	if len(r.fullData) == 0 {
		// Peek at up to 10 bytes (the maximum varint length) to parse the header
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		// Header incomplete (empty messages have lenNumLength 1): wait for more data
		if r.lenNumLength == 0 {
			return nil, nil
		}
	}
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength:]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	// Store the updated state directly, with no context.Context involved
	c.SetContext(r)
	return nil, nil
}
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	var r = DataStruct{}
	c.SetContext(r)
	return
}
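One possible variation, not what the code above does, is to store a pointer to DataStruct instead of the struct itself. Changes made through the pointer are then visible on the next call, so SetContext only needs to be called once, and each call no longer copies the struct into a fresh interface value. In this sketch fakeConn is a hypothetical stand-in for the Context/SetContext pair of gnet.Conn, and collect is a hypothetical helper.

```go
package main

import "fmt"

// fakeConn is a hypothetical stand-in for gnet.Conn's Context/SetContext pair.
type fakeConn struct{ ctx interface{} }

func (c *fakeConn) Context() interface{}     { return c.ctx }
func (c *fakeConn) SetContext(v interface{}) { c.ctx = v }

// DataStruct matches the article's per-connection state.
type DataStruct struct {
	fullLength   int
	lenNumLength int
	fullData     []byte
}

// collect appends bytes to the connection's state through the stored
// pointer; no SetContext call is needed afterwards.
func collect(c *fakeConn, b []byte) error {
	r, ok := c.Context().(*DataStruct)
	if !ok {
		return fmt.Errorf("unexpected context type %T", c.Context())
	}
	r.fullData = append(r.fullData, b...)
	return nil
}

func main() {
	c := &fakeConn{}
	c.SetContext(&DataStruct{}) // once, e.g. in OnOpened
	_ = collect(c, []byte("ab"))
	_ = collect(c, []byte("c"))
	fmt.Println(string(c.Context().(*DataStruct).fullData)) // abc
}
```

The trade-off is that the state is now shared by reference, which is fine as long as only the code handling that connection touches it.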
2021-12-24 Update
Recently gnet released v1.6.x, which changes how codecs are driven, so the code needs to be modified.
The main change is in the library's eventloop_unix.go file, where commit d1ca7f3 changed the condition for entering React from "the returned packet is not nil" to "the returned error is nil". A decoder that does not have a complete packet yet must therefore return a non-nil error; returning nil, nil would now lead into React with an empty packet. The decoder needs the corresponding changes after the upgrade:
// ContinueRead signals that no complete packet is available yet,
// so gnet (v1.6+) should stop decoding instead of entering React
var (
	ContinueRead = errors.New("continue read")
)

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// Retrieve the codec storage struct for this connection from the context
	r, ok := c.Context().(DataStruct)
	if !ok {
		_ = c.Close()
		return nil, errors.New("codec: no DataStruct in connection context")
	}
	if len(r.fullData) == 0 {
		// Peek at up to 10 bytes (the maximum varint length) to parse the header
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		// Header incomplete (empty messages have lenNumLength 1): wait for more data
		if r.lenNumLength == 0 {
			return nil, ContinueRead
		}
	}
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength:]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	// Packet still incomplete: save the state and ask gnet to wait for more data
	c.SetContext(r)
	return nil, ContinueRead
}
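To illustrate why the sentinel error is needed, here is a simplified model of a read loop that, like the v1.6 behaviour described above, keeps passing packets to a React-style callback for as long as the decoder returns a nil error. It is not gnet's actual code: decode, drain and the react callback are hypothetical names, and decode uses binary.Uvarint as a stand-in for proto.DecodeVarint.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// ContinueRead plays the same role as the sentinel in the decoder above.
var ContinueRead = errors.New("continue read")

// decode takes one length-prefixed packet off the front of *buf, or returns
// ContinueRead if the packet at the front is not complete yet.
func decode(buf *[]byte) ([]byte, error) {
	bodyLen, headerLen := binary.Uvarint(*buf)
	if headerLen <= 0 || bodyLen > uint64(len(*buf)-headerLen) {
		return nil, ContinueRead
	}
	end := headerLen + int(bodyLen)
	packet := (*buf)[headerLen:end]
	*buf = (*buf)[end:]
	return packet, nil
}

// drain is a simplified model of a v1.6-style read loop: it keeps calling
// decode and react for as long as decode returns a nil error.
func drain(buf []byte, react func([]byte)) []byte {
	for packet, err := decode(&buf); err == nil; packet, err = decode(&buf) {
		react(packet)
	}
	return buf
}

func main() {
	// "hi", an empty message, then a packet whose body is cut off.
	stream := []byte{0x02, 'h', 'i', 0x00, 0x03, 'a'}
	rest := drain(stream, func(p []byte) { fmt.Printf("%q\n", p) })
	fmt.Println(len(rest)) // 2
}
```

The empty message is still delivered, and the loop ends only because decode returns ContinueRead for the partial packet. If decode returned nil, nil there instead, the loop condition would stay true and the loop would never exit.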