Go I/O multiplexing

In the article “Go Classical Blocking TCP Stream Parsing in Practice”, we implemented a parser for a custom TCP stream protocol based on Go’s classical blocking I/O model. The advantage of this one-connection-per-goroutine model is that it is simple: easy to write and easy to understand, which reduces the mental burden on developers. However, the number of goroutines grows linearly with the number of connections. Faced with a massive number of connections, this model is overwhelmed: the system fills up with goroutines, and the overhead of scheduling and switching between them becomes too high.

So how should we handle the massive-connection scenario? The industry’s mature answer is the I/O multiplexing model. If you are familiar with the implementation of Go’s net package, you will know that Go itself uses I/O multiplexing under the hood, implemented as netpoll in the runtime. Each net.Conn (whether obtained via Accept or Dial) exhibits “blocking” behavior at the goroutine level, but the underlying fd (file descriptor) is non-blocking. The Go runtime invokes multiplexing mechanisms such as epoll to monitor whether these fds are readable or writable, and wakes up the owning goroutine to continue its network I/O at the right moment. This reduces system calls and reduces how often an M (OS thread) running a goroutine gets stuck waiting in kernel space on a system call, which in turn reduces the need to create new threads to replace the blocked M.
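To make that concrete, here is a minimal sketch of the goroutine-per-connection pattern (not from the demo repo): the Accept and Read calls below “block” only the goroutine, while the runtime’s netpoll watches the non-blocking fds underneath.

package main

import (
    "log"
    "net"
)

func main() {
    ln, err := net.Listen("tcp", ":8888")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn, err := ln.Accept() // parks this goroutine, not the OS thread
        if err != nil {
            continue
        }
        go func(c net.Conn) { // one goroutine per connection
            defer c.Close()
            buf := make([]byte, 4096)
            for {
                n, err := c.Read(buf) // goroutine sleeps until netpoll reports the fd readable
                if err != nil {
                    return
                }
                _ = buf[:n] // handle the data
            }
        }(conn)
    }
}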

So what are the downsides of building your own I/O multiplexing at the user level? It is complex: not easy to write and not easy to understand. But there doesn’t seem to be any better way; unless you switch languages, you have to tough it out ^_^. The good news is that the Go community already offers several solid user-level non-blocking I/O multiplexing frameworks, such as evio, gnet, and easygo. Here we choose gnet. Note that choosing does not mean recommending: gnet is used here only for this exercise, and whether to build production programs on it is something you need to evaluate for yourself.

1. Developing a TCP stream protocol parser based on gnet

One of the barriers to using any framework is that you must first learn the framework itself. Fortunately, gnet provides several very typical examples, and we can base our work on custom_codec to quickly develop our TCP stream protocol parser.

The figure below shows the key loop for implementing a custom codec on top of the gnet framework. Once we understand this loop, we know where to call the frame codec and the packet codec, which determines the structure of the subsequent demo program.

gnet custom_codec

The frame codec, packet codec, and React in the dashed box on the right of the figure are what the user implements. The gnet framework’s eventloop.loopRead method calls the frame codec and React in a loop to process the TCP stream and return responses. With such a “map”, we can pin down roughly where each package belongs in the demo program.
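A custom frame codec plugs into gnet by implementing its ICodec interface. The sketch below is inferred from the Decode/Encode method signatures our Frame type implements later in this article (gnet v1); consult the gnet documentation for the authoritative definition.

// gnet's ICodec interface as inferred from the Frame methods in this demo.
type ICodec interface {
    // Encode encodes the outbound frame before gnet writes it to the peer.
    Encode(c gnet.Conn, buf []byte) ([]byte, error)
    // Decode decodes one inbound frame from the TCP stream, or returns an
    // error if a complete frame has not arrived yet.
    Decode(c gnet.Conn) ([]byte, error)
}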

Our demo is adapted from gnet’s custom_codec example, and its main package structure follows that example.

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/cmd/server/main.go

type customCodecServer struct {
    *gnet.EventServer
    addr       string
    multicore  bool
    async      bool
    codec      gnet.ICodec
    workerPool *goroutine.Pool
}

func (cs *customCodecServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
    log.Printf("custom codec server is listening on %s (multi-cores: %t, loops: %d)\n",
        srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
    return
}

func customCodecServe(addr string, multicore, async bool, codec gnet.ICodec) {
    var err error
    codec = frame.Frame{} // always use our custom frame codec (main passes nil for codec in this demo)
    cs := &customCodecServer{addr: addr, multicore: multicore, async: async, codec: codec, workerPool: goroutine.Default()}
    err = gnet.Serve(cs, addr, gnet.WithMulticore(multicore), gnet.WithTCPKeepAlive(time.Minute*5), gnet.WithCodec(codec))
    if err != nil {
        panic(err)
    }
}

func main() {
    var port int
    var multicore bool

    // Example command: go run server.go --port 8888 --multicore=true
    flag.IntVar(&port, "port", 8888, "server port")
    flag.BoolVar(&multicore, "multicore", true, "multicore")
    flag.Parse()
    addr := fmt.Sprintf("tcp://:%d", port)
    customCodecServe(addr, multicore, false, nil)
}

Two things to note about the above code:

  • We pass false as the third parameter (async) of customCodecServe, i.e. we choose to send replies synchronously rather than asynchronously.
  • We pass an instance of our custom frame codec (which implements the gnet.ICodec interface) to the customCodecServer instance, so that what gnet’s loopRead subsequently invokes is our custom frame codec.

Following the order of the flowchart above, the byte stream gnet reads from the conn is handed to our frame codec. Below is the gnet-based implementation of the Frame codec (the definition of our custom protocol can be found in “Go Classical Blocking TCP Stream Parsing in Practice”).

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/pkg/frame/frame.go

type Frame []byte

func (cc Frame) Decode(c gnet.Conn) ([]byte, error) {
    // read length
    var frameLength uint32
    if n, header := c.ReadN(4); n == 4 {
        byteBuffer := bytes.NewBuffer(header)
        _ = binary.Read(byteBuffer, binary.BigEndian, &frameLength)

        if frameLength > 100 {
            c.ResetBuffer()
            return nil, errors.New("length value is wrong")
        }

        if n, wholeFrame := c.ReadN(int(frameLength)); n == int(frameLength) {
            c.ShiftN(int(frameLength)) // shift the whole frame (header + payload) out of the buffer
            return wholeFrame[4:], nil // return the frame payload (from the 5th byte on)
        } else {
            return nil, errors.New("not enough frame payload data")
        }
    }
    return nil, errors.New("not enough frame length data")
}

The Decode implementation of Frame above is responsible both for decoding the frame and for checking whether a complete frame has arrived. If a complete frame is not yet ready, Decode returns an error, and gnet will call Decode again the next time the connection (conn) becomes readable. Decode relies on gnet.Conn’s ReadN method, which is essentially a Peek operation (gnet calls this lazyRead): it only previews the data without moving the “read pointer” in the data stream. While a frame is not yet complete, gnet keeps the bytes that have already arrived in its internal RingBuffer. Once the frame is complete, Decode calls gnet.Conn’s ShiftN method to advance the read pointer past the whole frame.

If the pre-read frame length is too large (the 100 in the code is a magic number for demo purposes only; in practice, use the maximum frame size your protocol allows), the current buffer is reset and an error is returned. (gnet does not disconnect the client because of this, and whether that part of gnet’s behavior is reasonable remains open to question.)

If decoding goes smoothly, then per our custom protocol spec we return the payload of the frame, i.e. everything from the fifth byte onward.
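To make the frame layout concrete, here is a standalone snippet (independent of gnet, with made-up payload bytes) that builds and decodes a frame the same way: a 4-byte big-endian totalLength that counts itself, followed by the payload.

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    // illustrative payload: a commandID byte followed by an 8-byte ID
    payload := []byte{0x01, '0', '0', '0', '0', '0', '0', '0', '1'}

    // frame = 4-byte big-endian totalLength (counting itself) + payload
    frame := make([]byte, 4+len(payload))
    binary.BigEndian.PutUint32(frame[:4], uint32(4+len(payload)))
    copy(frame[4:], payload)

    frameLength := binary.BigEndian.Uint32(frame[:4])
    fmt.Printf("frameLength=%d, payload=% x\n", frameLength, frame[4:frameLength])
}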

As the figure above shows, the payload returned by Frame’s Decode is passed as input to the eventHandler.React method, which we also implement ourselves.

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/cmd/server/main.go

func (cs *customCodecServer) React(framePayload []byte, c gnet.Conn) (out []byte, action gnet.Action) {
    var p packet.Packet
    var ackFramePayload []byte
    p, err := packet.Decode(framePayload)
    if err != nil {
        fmt.Println("react: packet decode error:", err)
        action = gnet.Close // close the connection
        return
    }

    switch p.(type) {
    case *packet.Submit:
        submit := p.(*packet.Submit)
        fmt.Printf("recv submit: id = %s, payload=%s\n", submit.ID, string(submit.Payload))
        submitAck := &packet.SubmitAck{
            ID:     submit.ID,
            Result: 0,
        }
        ackFramePayload, err = packet.Encode(submitAck)
        if err != nil {
            fmt.Println("handleConn: packet encode error:", err)
            action = gnet.Close // close the connection
            return
        }
        out = ackFramePayload // ackFramePayload is already a []byte
        return
    default:
        return nil, gnet.Close // close the connection
    }
}

In React, we use the packet package to decode the incoming frame payload and process the resulting Packet; after processing, we encode the response packet, and the resulting byte sequence (ackFramePayload) is returned as React’s first return value, out.
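The packet package itself lives in the demo repo (its wire format is defined in the earlier article). For readers who have not read that article, here is a hedged sketch of the minimal API that React relies on, assuming a 1-byte commandID followed by an 8-byte ID and the body; details may differ from the real implementation.

// A hedged sketch of the packet API used by React; the real code is in
// the demo repo (pkg/packet). Wire layout assumed here: 1-byte commandID,
// then an 8-byte ID, then the body.
package packet

import "fmt"

const (
    CommandSubmit    byte = 0x01
    CommandSubmitAck byte = 0x02
)

type Packet interface{} // concrete types: *Submit, *SubmitAck

type Submit struct {
    ID      string
    Payload []byte
}

type SubmitAck struct {
    ID     string
    Result uint8
}

// Decode dispatches on the leading commandID byte of the frame payload.
func Decode(pkt []byte) (Packet, error) {
    if len(pkt) < 9 {
        return nil, fmt.Errorf("packet too short: %d bytes", len(pkt))
    }
    commandID, body := pkt[0], pkt[1:]
    switch commandID {
    case CommandSubmit:
        return &Submit{ID: string(body[:8]), Payload: body[8:]}, nil
    default:
        return nil, fmt.Errorf("unknown commandID [%d]", commandID)
    }
}

// Encode prepends the commandID that matches the concrete packet type.
func Encode(p Packet) ([]byte, error) {
    switch t := p.(type) {
    case *SubmitAck:
        out := make([]byte, 0, 10)
        out = append(out, CommandSubmitAck)
        out = append(out, t.ID[:8]...)
        out = append(out, t.Result)
        return out, nil
    default:
        return nil, fmt.Errorf("unsupported packet type %T", p)
    }
}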

gnet then hands the ackFramePayload returned by React to Frame’s Encode method, and the encoded byte sequence is written by gnet to the outbound TCP stream.

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/pkg/frame/frame.go

func (cc Frame) Encode(c gnet.Conn, framePayload []byte) ([]byte, error) {
    result := make([]byte, 0)

    buffer := bytes.NewBuffer(result)

    // encode frame length(4+ framePayload length)
    length := uint32(4 + len(framePayload)) // framePayload is already a []byte
    if err := binary.Write(buffer, binary.BigEndian, length); err != nil {
        s := fmt.Sprintf("Pack length error , %v", err)
        return nil, errors.New(s)
    }

    // encode frame payload
    n, err := buffer.Write(framePayload)
    if err != nil {
        s := fmt.Sprintf("Pack frame payload error , %v", err)
        return nil, errors.New(s)
    }

    if n != len(framePayload) {
        // err is nil on a short write here, so report the write counts instead
        s := fmt.Sprintf("Pack frame payload length error: wrote %d of %d bytes", n, len(framePayload))
        return nil, errors.New(s)
    }

    return buffer.Bytes(), nil
}

This completes one loopRead cycle. We can test this program using the client from “Go Classical Blocking TCP Stream Parsing in Practice”.

// client from demo2
$./client
2021/07/25 16:35:34 dial ok
send submit id = 00000001, payload=full-bluestreak-207e
the result of submit ack[00000001] is 0
send submit id = 00000002, payload=cosmic-spider-ham-2985
the result of submit ack[00000002] is 0
send submit id = 00000003, payload=true-forge-3552
the result of submit ack[00000003] is 0

// server from demo4
$./server
2021/07/25 16:35:31 custom codec server is listening on :8888 (multi-cores: true, loops: 8)
recv submit: id = 00000001, payload=full-bluestreak-207e
recv submit: id = 00000002, payload=cosmic-spider-ham-2985
recv submit: id = 00000003, payload=true-forge-3552

2. Load test comparison

gnet applies many optimizations around memory allocation, cache reuse, and so on. Let’s do a simple performance comparison against the blocking I/O model programs (as resources are limited, the load test here is the same as in the previous article: 100 client connections sending at best effort, rather than a truly massive number of connections).

The following compares the performance of demo1 (blocking I/O model, unoptimized), demo3 (blocking I/O model, optimized), and demo4 (I/O multiplexing model).

Load test comparison

Roughly speaking, the gnet I/O multiplexing program (demo4) performs on average 15%-20% better than the optimized blocking I/O program (demo3).

In addition, the system monitoring data collected by dstat shows that CPU system time (sys) is about 5 percentage points lower when running demo4 than when running demo3.

The output of dstat -tcdngym when running demo3:

----system---- ----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system-- ------memory-usage-----
     time     |usr sys idl wai hiq siq| read  writ| recv  send|  in   out | int   csw | used  buff  cach  free
23-07 17:03:17|  2   1  97   0   0   0|3458B   19k|   0     0 |   0     0 | 535  2475 |1921M  225M 5354M 8386M
23-07 17:03:18| 40  45   5   0   0  11|   0     0 |  66B   54B|   0     0 |  11k   15k|1922M  225M 5354M 8384M
23-07 17:03:19| 39  46   6   0   0   9|   0     0 |  66B 1158B|   0     0 |  12k   18k|1922M  225M 5354M 8384M
23-07 17:03:20| 35  48   7   0   0  11|   0     0 |  66B  462B|   0     0 |  12k   22k|1922M  225M 5354M 8385M
23-07 17:03:21| 39  44   7   0   0  10|   0    12k|  66B  462B|   0     0 |  11k   16k|1922M  225M 5354M 8385M
23-07 17:03:22| 38  45   6   0   0  10|   0     0 |  66B  102B|   0     0 |  11k   16k|1923M  225M 5354M 8384M
23-07 17:03:23| 38  45   7   0   0  10|   0     0 |  66B  470B|   0     0 |  12k   20k|1923M  225M 5354M 8384M
23-07 17:03:24| 39  46   6   0   0   9|   0     0 |  66B  462B|   0     0 |  11k   19k|1923M  225M 5354M 8384M

The output of dstat -tcdngym when running demo4:

----system---- ----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system-- ------memory-usage-----
     time     |usr sys idl wai hiq siq| read  writ| recv  send|  in   out | int   csw | used  buff  cach  free
24-07 20:28:38| 43  42   7   0   0   8|   0    20k|1050B   14k|   0     0 |  11k   18k|1954M  234M 5959M 7738M
24-07 20:28:39| 44  41   9   0   0   7|   0    16k| 396B 7626B|   0     0 |  11k   17k|1954M  234M 5959M 7739M
24-07 20:28:40| 43  42   6   0   0   8|   0     0 | 132B 7044B|   0     0 |  11k   16k|1954M  234M 5959M 7738M
24-07 20:28:41| 42  42   8   0   0   8|   0     0 | 630B   12k|   0     0 |  12k   20k|1955M  234M 5959M 7738M
24-07 20:28:42| 45  41   7   0   0   7|   0     0 | 726B 9980B|   0     0 |  11k   16k|1955M  234M 5959M 7738M

3. Asynchronous reply

In the example above, we used gnet’s synchronous reply mode. gnet also supports asynchronous replies: the ackFramePayload produced in React is submitted to a goroutine worker pool, and an idle goroutine from the pool subsequently encodes the ackFramePayload into a complete ackFrame and writes it back to the client.

To support asynchronous replies, we need to make a few changes to demo4 (see demo5); the main changes are in cmd/server/main.go.

The first change: when main calls customCodecServe, pass true as the third parameter (async).

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo5/cmd/server/main.go

func main() {
    ... ...
    customCodecServe(addr, multicore, true, nil)
}

The second change: in customCodecServer’s React method, after obtaining the encoded ackFramePayload, we no longer assign it to out and return immediately; instead we check whether the reply should be asynchronous. If so, the ackFramePayload is submitted to the workerPool, which later assigns a goroutine to write the reply back to the client via gnet’s AsyncWrite. If not, the ackFramePayload is assigned to out and returned as before.

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo5/cmd/server/main.go

func (cs *customCodecServer) React(framePayload []byte, c gnet.Conn) (out []byte, action gnet.Action) {
    ... ...
    switch p.(type) {
    case *packet.Submit:
        submit := p.(*packet.Submit)
        fmt.Printf("recv submit: id = %s, payload=%s\n", submit.ID, string(submit.Payload))
        submitAck := &packet.SubmitAck{
            ID:     submit.ID,
            Result: 0,
        }
        ackFramePayload, err = packet.Encode(submitAck)
        if err != nil {
            fmt.Println("handleConn: packet encode error:", err)
            action = gnet.Close // close the connection
            return
        }
    default:
        return nil, gnet.Close // close the connection
    }

    if cs.async {
        data := append([]byte{}, ackFramePayload...) // copy: the slice is handed to another goroutine after React returns
        _ = cs.workerPool.Submit(func() {
            fmt.Println("handleConn: async write ackFramePayload")
            c.AsyncWrite(data)
        })
        return
    }
    out = ackFramePayload
    return
}

Other than that, the rest of the code remains the same. We run the same load test to see how demo5’s asynchronous replies actually perform!

asynchronous reply

As the figure above shows, with asynchronous replies the performance is far lower, even lower than that of the blocking I/O model programs. I haven’t dug deeper into the cause, but my guess is that when many replies arrive in concentrated bursts, the worker pool spawns a large number of goroutines, which not only defeats the purpose of pooling but also adds the overhead of goroutine creation and scheduling.
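One way to probe this guess (untested here) would be to cap the worker pool at a fixed size and watch whether throughput recovers. gnet’s pkg/pool/goroutine wraps the ants library, so a hedged sketch using ants directly might look like the following; the pool size of 128 is an arbitrary assumption.

package main

import (
    "fmt"
    "sync"

    "github.com/panjf2000/ants/v2"
)

func main() {
    // cap the number of concurrently running reply goroutines
    pool, err := ants.NewPool(128)
    if err != nil {
        panic(err)
    }
    defer pool.Release()

    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        _ = pool.Submit(func() {
            defer wg.Done()
            // in the demo, c.AsyncWrite(data) would go here
        })
    }
    wg.Wait()
    fmt.Println("all replies submitted")
}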

4. Summary

In this article, we replaced the blocking I/O model with an I/O multiplexing model and re-implemented the custom TCP stream protocol parser on top of the gnet framework. With the synchronous reply strategy, the gnet-based TCP stream protocol parser shows improved performance over the blocking I/O model programs.

All the code covered in this article can be downloaded from here.