I came across fasthttp, a Go HTTP framework that claims to be up to ten times faster than net/http. Let's dig in and see which design choices make that possible.

A typical HTTP service should look like this:

[Figure: HTTP service]

The standard model of a service built on HTTP consists of two ends, a client and a server. The client sends an HTTP request; the server receives it, processes it and returns a response. So the job of an HTTP server is to accept requests from clients and return responses to them.

This article covers the server-side implementation.

Implementation principles

net/http versus fasthttp implementation

As we discussed when looking at net/http, its processing flow is roughly as follows:

[Figure: net/http]

  1. Handlers are registered in a hash table, so that routes can be matched by key.
  2. After registration, a loop listens on the port, and a Goroutine is created for every connection that is accepted.
  3. The new Goroutine waits in a loop for request data, matches the request address against the routing table, and passes the request to the matching handler.

This is fine when the number of connections is small, but when the number of connections is very large, creating a Goroutine for each connection puts a strain on the system. This creates a bottleneck in net/http when dealing with high concurrency.

Now let's look at how fasthttp does it.

  1. Start listening.
  2. Accept connections from the port in a loop.
  3. When a connection arrives, first try to take a workerChan from the ready queue; if none is idle, get one from the object pool.
  4. Pass the accepted connection into the workerChan's channel.
  5. Each workerChan has a Goroutine that loops over the channel; when it receives a connection, it processes the request and returns the response.

As described above, a workerChan is a connection handler object with a channel through which connections are passed to it; each workerChan has a background Goroutine that loops over the connections in the channel and processes them. If the maximum number of simultaneous connections is not set, it defaults to 256 * 1024. This cap keeps the number of Goroutines bounded, so the service stays available even under high concurrency.

In addition, the implementation uses sync.Pool to reuse a large number of objects and reduce memory allocation: workerChanPool, ctxPool, readerPool, writerPool and others, more than 30 sync.Pools in total.
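As a rough illustration of the object reuse pattern, here is a minimal sync.Pool sketch. The names bufPool and render are hypothetical and do not come from fasthttp; the point is only that Get returns a previously used object when one is available, so the buffer is not allocated on every call.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers; New runs only when the pool is empty.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// render builds a greeting using a pooled buffer instead of
// allocating a fresh buffer on every call.
func render(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset() // a pooled buffer may still hold data from its last use
	defer bufPool.Put(buf)

	buf.WriteString("Hello, ")
	buf.WriteString(name)
	return buf.String() // String copies, so the result outlives the buffer
}

func main() {
	fmt.Println(render("fasthttp"))
	fmt.Println(render("net/http"))
}
```

The Reset call matters: objects coming out of a pool carry whatever state they had when they were put back.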

In addition to reusing objects, fasthttp also reuses slices: idioms such as s = s[:0] and s = append(s[:0], b...) keep the existing backing array instead of allocating a new slice.
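The slice idioms can be seen in a small standalone sketch. The helper copyInto is a hypothetical name used for illustration; it shows that when the destination already has enough capacity, the copy lands in the same backing array.

```go
package main

import "fmt"

// copyInto copies src into dst's existing backing array when it is
// large enough, so repeated calls need no new allocation.
func copyInto(dst, src []byte) []byte {
	return append(dst[:0], src...)
}

func main() {
	buf := make([]byte, 0, 64)

	buf = copyInto(buf, []byte("first request"))
	fmt.Printf("%q len=%d cap=%d\n", buf, len(buf), cap(buf))

	buf = copyInto(buf, []byte("second"))
	fmt.Printf("%q len=%d cap=%d\n", buf, len(buf), cap(buf))

	buf = buf[:0] // empty the slice but keep its capacity for the next use
	fmt.Println(len(buf), cap(buf))
}
```

The capacity stays at 64 throughout, because append only allocates when the data no longer fits.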

fasthttp handles a lot of string data, so it also tries in a number of ways to avoid the allocation and copy cost of converting []byte to string.
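One common way to avoid that cost is a zero-copy conversion through the unsafe package. The sketch below shows the general technique under the name b2s; treat it as an illustration rather than fasthttp's exact helper, which may differ between versions. It is only safe if the byte slice is never modified while the string is in use.

```go
package main

import (
	"fmt"
	"unsafe"
)

// b2s reinterprets b as a string without copying.
// The result shares memory with b, so b must not be modified afterwards.
func b2s(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	b := []byte("GET /index.html")
	s := b2s(b)       // no allocation, no copy
	safe := string(b) // regular conversion: allocates and copies
	fmt.Println(s == safe, len(s))
}
```

The trade-off is that the compiler can no longer guarantee string immutability, which is why such helpers tend to stay internal to a library.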

Summary

In summary, fasthttp improves performance in three main ways:

  1. It caps the number of connections processed concurrently by worker Goroutines, with a default maximum of 256 * 1024.
  2. It uses sync.Pool to reuse a large number of objects, and reuses slices, to reduce memory allocation.
  3. It avoids, as far as possible, the memory allocation and copying caused by []byte to string conversions.

Source code analysis

Let’s start with a simple example.

package main

import (
    "fmt"
    "log"

    "github.com/valyala/fasthttp"
)

func main() {
    if err := fasthttp.ListenAndServe(":8088", requestHandler); err != nil {
        log.Fatalf("Error in ListenAndServe: %s", err)
    }
}

func requestHandler(ctx *fasthttp.RequestCtx) {
    fmt.Fprintf(ctx, "Hello, world!\n\n")
}

Calling the ListenAndServe function starts the service, which listens on the port and waits for requests to process.

The ListenAndServe function actually calls the Server's ListenAndServe method, so let's first look at the fields of the Server structure.

[Figure: common fields of the Server structure]

The diagram above lists some common fields of the Server structure, including the request handler, the service name, the request read timeout, the response write timeout, and the maximum number of requests per connection. Many other fields tune further aspects of server behaviour.

The Server's ListenAndServe method creates the TCP listener and then calls the Serve method, which carries out the server-side logic.

The Serve method does several things:

  1. initializes and starts the worker pool.
  2. accepts incoming connections.
  3. passes each connection to the worker pool for processing.

func (s *Server) Serve(ln net.Listener) error {
    ...
    s.mu.Unlock()
    // initialize the worker pool
    wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
        connState:       s.setState,
    }
    // start the worker pool
    wp.Start()
    // accept and process connections in a loop
    for {
        // accept a connection
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
            wp.Stop()
            if err == io.EOF {
                return nil
            }
            return err
        }
        s.setState(c, StateNew)
        atomic.AddInt32(&s.open, 1)
        // hand the connection to the worker pool
        if !wp.Serve(c) {
            // entering this branch means the concurrency limit has been reached
            ...
        }
        c = nil
    }
}

worker Pool

The worker pool handles all incoming connections, so let's take a brief look at the fields of the workerPool structure.

  • WorkerFunc: the function that serves a connection; it matches each request to its handler and executes it.
  • MaxWorkersCount: the maximum number of connections that can be processed simultaneously.
  • ready: the queue of idle workerChans.
  • workerChanPool: a pool of workerChan objects, of type sync.Pool.
  • workersCount: the number of workers currently running.

Let’s look at the Start method of the workerPool.

func (wp *workerPool) Start() {
    if wp.stopCh != nil {
        panic("BUG: workerPool already started")
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    // set the function that creates new workerChan objects for the pool
    wp.workerChanPool.New = func() interface{} {
        return &workerChan{
            ch: make(chan net.Conn, workerChanCap),
        }
    }
    go func() {
        var scratch []*workerChan
        for {
            // periodically clean up workerChans that have been idle for too long
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                // the default interval is 10 s
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

The Start method mainly does two things:

  1. sets the creation function of workerChanPool.
  2. starts a Goroutine that periodically cleans up idle workerChans stored in the workerPool's ready queue, by default every 10s.

Getting a connection

func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
    for {
        c, err := ln.Accept()
        if err != nil {
            if c != nil {
                panic("BUG: net.Listener returned non-nil conn and non-nil error")
            }
            ...
            return nil, io.EOF
        }
        if c == nil {
            panic("BUG: net.Listener returned (nil, nil)")
        }
        // check the number of connections per IP
        if s.MaxConnsPerIP > 0 {
            pic := wrapPerIPConn(s, c)
            if pic == nil {
                if time.Since(*lastPerIPErrorTime) > time.Minute {
                    s.logger().Printf("The number of connections from %s exceeds MaxConnsPerIP=%d",
                        getConnIP4(c), s.MaxConnsPerIP)
                    *lastPerIPErrorTime = time.Now()
                }
                continue
            }
            c = pic
        }
        return c, nil
    }
}

There's not much to say about getting a connection: just as in the net/http library, the listener's Accept method is called to obtain a TCP connection. The only addition is the optional per-IP connection limit, MaxConnsPerIP.
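The internals of wrapPerIPConn are not shown above. As a rough idea of how a per-IP limit can be enforced, here is a minimal counter sketch; perIPLimiter, acquire and release are hypothetical names and not fasthttp's API.

```go
package main

import (
	"fmt"
	"sync"
)

// perIPLimiter is a hypothetical counter of open connections per client IP.
type perIPLimiter struct {
	mu    sync.Mutex
	max   int
	conns map[string]int
}

func newPerIPLimiter(max int) *perIPLimiter {
	return &perIPLimiter{max: max, conns: make(map[string]int)}
}

// acquire records a new connection from ip and reports whether it is allowed.
func (l *perIPLimiter) acquire(ip string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.conns[ip] >= l.max {
		return false
	}
	l.conns[ip]++
	return true
}

// release must be called when a connection from ip is closed.
func (l *perIPLimiter) release(ip string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.conns[ip] > 1 {
		l.conns[ip]--
	} else {
		delete(l.conns, ip) // drop the entry so the map does not grow forever
	}
}

func main() {
	lim := newPerIPLimiter(2)
	fmt.Println(lim.acquire("10.0.0.1"), lim.acquire("10.0.0.1"), lim.acquire("10.0.0.1"))
	lim.release("10.0.0.1")
	fmt.Println(lim.acquire("10.0.0.1"))
}
```

In the accept loop above, a rejected connection corresponds to the branch where pic is nil and the loop continues with the next Accept.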

Handling connections

The first part of processing a connection is to get a workerChan, which contains two fields: lastUseTime and ch.

type workerChan struct {
    lastUseTime time.Time
    ch          chan net.Conn
}

  • lastUseTime records the time when the workerChan was last used.
  • ch is the channel used to pass connections to it.

Once a connection has been accepted, it is passed into the workerChan's channel; each workerChan has its own Goroutine that handles the connections arriving on that channel.

Get workerChan

func (wp *workerPool) Serve(c net.Conn) bool {
    // get a workerChan
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    // send the connection into the workerChan's channel
    ch.ch <- c
    return true
}

The Serve method essentially gets the workerChan via the getCh method and passes the current Connection into the workerChan’s channel.

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    // try to take a workerChan from the ready (idle) queue
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()
    // if none is idle, get one from the object pool
    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get()
        ch = vch.(*workerChan)
        // start a goroutine for the new workerChan
        go func() {
            // process the connections arriving on the channel
            wp.workerFunc(ch)
            // once the worker exits, return the workerChan to the object pool
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

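The getCh method first tries to take a workerChan from the ready queue of idle workers. If none is idle and workersCount is still below MaxWorkersCount, it gets a workerChan from the object pool and starts a Goroutine for it to process the connections in its channel. If the limit has been reached, it returns nil and Serve reports failure.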
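The bookkeeping in getCh can be isolated in a small standalone sketch. It is deliberately reduced: it leaves out the sync.Pool and the worker Goroutine and keeps only the last-in, first-out ready stack and the worker limit. The types worker and pool and the methods get and release are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for workerChan.
type worker struct {
	ch chan int // carries ints instead of net.Conn to keep the sketch small
}

type pool struct {
	mu         sync.Mutex
	ready      []*worker // idle workers, most recently used last
	count      int       // workers currently alive
	maxWorkers int
}

// get returns an idle worker, creates a new one if the pool is below its
// limit, or returns nil when the limit is reached. created reports
// whether a new worker was made.
func (p *pool) get() (w *worker, created bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.ready) - 1; n >= 0 {
		w = p.ready[n]
		p.ready[n] = nil // clear the slot so it does not keep w reachable
		p.ready = p.ready[:n]
		return w, false
	}
	if p.count >= p.maxWorkers {
		return nil, false
	}
	p.count++
	return &worker{ch: make(chan int, 1)}, true
}

// release puts an idle worker back on top of the ready stack.
func (p *pool) release(w *worker) {
	p.mu.Lock()
	p.ready = append(p.ready, w)
	p.mu.Unlock()
}

func main() {
	p := &pool{maxWorkers: 2}
	a, _ := p.get()
	b, _ := p.get()
	c, _ := p.get() // limit reached: no worker available
	fmt.Println(a != nil, b != nil, c == nil)

	p.release(a)
	d, created := p.get() // reuses the released worker
	fmt.Println(d == a, created)
}
```

Taking the most recently released worker first means rarely used workers drift to the front of the queue, where an idle cleanup can find them.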

Handling connections

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn

    var err error
    // consume connections from the channel
    for c = range ch.ch {
        if c == nil {
            break
        }
        // read the request and write the response
        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
            ...
        }
        c = nil
        // put the current workerChan back into the ready queue
        if !wp.release(ch) {
            break
        }
    }

    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}

This loop receives each connection passed through the workerChan's channel and calls WorkerFunc to process its requests; afterwards the workerChan is put back into the ready queue for reuse.

Note that the loop exits when it receives a nil connection. That nil is sent into the channel by the workerPool's clean method, which runs asynchronously and detects workerChans that have been idle for too long.

The WorkerFunc set here is the Server's serveConn method, which reads the request, calls the handler that corresponds to it, and returns the response.

Summary

That concludes our analysis of how fasthttp is implemented. Comparing its design with that of net/http shows roughly why fasthttp is fast, and its implementation details show how it reduces memory allocation to improve performance.