The Go source directory structure and the corresponding code files give an insight into how Go implements network I/O on different platforms: it is based on epoll on Linux, kqueue on FreeBSD, and IOCP on Windows.

Since our code is deployed on Linux, this article uses the epoll wrapper implementation as an example to explain the source code implementation of I/O multiplexing in Go.

Introduction

I/O multiplexing

The term I/O multiplexing refers to the select/epoll family of multiplexers: they allow a single thread to monitor multiple file descriptors (I/O events) at the same time, block while waiting, and be notified when one of the file descriptors becomes readable or writable. In case some readers are not familiar with select or epoll, here are a few words about these two multiplexers.

First, let's talk about what a file descriptor (commonly abbreviated FD) is: an abstraction used to represent a reference to an open file. It is an index into a per-process table of open files maintained by the kernel. When a program opens an existing file or creates a new file, the kernel returns a file descriptor to the process.
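
For instance, a Go program can print the descriptor the kernel handed back for a file it just opened (a minimal sketch; the path /etc/hosts is only an arbitrary example):

package main

import (
    "fmt"
    "os"
)

func main() {
    // Opening a file makes the kernel allocate a file descriptor for this process.
    f, err := os.Open("/etc/hosts")
    if err != nil {
        fmt.Println("open error:", err)
        return
    }
    defer f.Close()

    // Fd returns the integer index the kernel uses to refer to this open file.
    fmt.Println("file descriptor:", f.Fd())
}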

select

int select(int nfds,
            fd_set *restrict readfds,
            fd_set *restrict writefds,
            fd_set *restrict errorfds,
            struct timeval *restrict timeout);

readfds, writefds, and errorfds (often called exceptfds) are three sets of file descriptors. select scans the first nfds descriptors of each set to find the ones that are readable, writable, or have an exceptional condition pending, collectively known as the ready descriptors.

The timeout parameter controls how long select may block. If none of the file descriptors are ready, the calling process blocks until a descriptor becomes ready or the timeout expires, and then returns. If timeout is NULL, select blocks indefinitely until a descriptor is ready; if timeout is set to 0, it returns immediately without blocking.

When the select function returns, the ready descriptors are found by iterating over the fd_set.
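
As a rough sketch of the same flow in Go, using the golang.org/x/sys/unix wrappers around the raw system call (the watched fd, set size and 5-second timeout here are arbitrary choices for illustration):

package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

func main() {
    fd := 0 // stdin, used here only to have something to watch

    var readfds unix.FdSet
    readfds.Zero()
    readfds.Set(fd)

    // Block for at most 5 seconds waiting for fd to become readable.
    tv := unix.Timeval{Sec: 5}
    n, err := unix.Select(fd+1, &readfds, nil, nil, &tv)
    if err != nil {
        fmt.Println("select error:", err)
        return
    }
    // After select returns, check which descriptors are still set in the fd_set.
    if n > 0 && readfds.IsSet(fd) {
        fmt.Println("fd is readable")
    } else {
        fmt.Println("timed out")
    }
}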


select also has several disadvantages:

  • The biggest drawback of select is that the number of fd’s a single process can monitor is limited by FD_SETSIZE, which defaults to 1024;
  • Every call to select has to copy the fd sets from user space to kernel space, which is a significant overhead when there are many fd’s;
  • The kernel has to scan the entire fd_set linearly on every call, so I/O performance degrades linearly as the number of monitored fd’s grows.

epoll

epoll is an enhanced version of select, avoiding the disadvantages of “high performance overhead” and “small number of file descriptors”.

To understand what follows, let’s look at the usage of epoll.

int listenfd = socket(AF_INET, SOCK_STREAM, 0);
bind(listenfd, ...)
listen(listenfd, ...)

int epfd = epoll_create(...);
epoll_ctl(epfd, ...); // add all the fds that need to be monitored to epfd

while(1){
    int n = epoll_wait(...)
    for(each socket that received data){
        // handle it
    }
}

First, epoll_create creates an epoll instance and returns a file descriptor epfd that references it. This descriptor only points to the epoll instance; it does not refer to a real file on disk.

Internally, an epoll instance maintains two data structures:

  • listener list: all the file descriptors being monitored, stored in a red-black tree.
  • ready list: all the ready file descriptors, stored in a linked list.

The fds to be monitored are then added to epfd via epoll_ctl: a callback is registered for each fd, the events of interest are recorded, and the fd is inserted into the listener list. When an event occurs on the fd, the callback is invoked and the fd is added to the ready list of the epoll instance.

Finally, epoll_wait is called to block and wait for I/O events on any fd registered with the epoll instance. If the ready list is non-empty, epoll_wait returns immediately, avoiding the full scan that select has to perform on every call.

Advantages of epoll:

The epoll listener list is stored in a red-black tree. The fd’s added by the epoll_ctl function are placed in nodes of this red-black tree, which offers stable insertion and deletion performance with O(log N) time complexity and can hold a large number of fd’s, avoiding select’s limit of 1024.

epoll_ctl registers a callback for each file descriptor that adds it to the ready list when it becomes ready, so unlike select there is no need to scan every file descriptor; the kernel only has to check whether the ready list is empty.
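
To make the earlier pseudocode concrete, here is a minimal Go translation of the same epoll flow using golang.org/x/sys/unix (the port, backlog and event-buffer size are arbitrary choices; a real server would also register each accepted connection with epoll):

package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

func main() {
    // socket / bind / listen, mirroring the C-style pseudocode above.
    listenfd, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, 0)
    if err != nil {
        panic(err)
    }
    if err := unix.Bind(listenfd, &unix.SockaddrInet4{Port: 8888}); err != nil {
        panic(err)
    }
    if err := unix.Listen(listenfd, 128); err != nil {
        panic(err)
    }

    // Create the epoll instance and register the listening socket with it.
    epfd, err := unix.EpollCreate1(0)
    if err != nil {
        panic(err)
    }
    ev := unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(listenfd)}
    if err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, listenfd, &ev); err != nil {
        panic(err)
    }

    events := make([]unix.EpollEvent, 128)
    for {
        // Block until at least one registered fd is ready.
        n, err := unix.EpollWait(epfd, events, -1)
        if err != nil {
            continue // e.g. interrupted by a signal (EINTR)
        }
        for i := 0; i < n; i++ {
            if int(events[i].Fd) == listenfd {
                connfd, _, err := unix.Accept(listenfd)
                if err != nil {
                    continue
                }
                fmt.Println("accepted connection, fd =", connfd)
                unix.Close(connfd)
            }
        }
    }
}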

Analysis

netpoll is essentially a wrapper around the OS’s I/O multiplexing facility, so its usage naturally mirrors epoll’s, with the following steps:

  1. netpoll creation and its initialization.
  2. adding the tasks to be monitored to the netpoll.
  3. fetching the triggered events from netpoll.

Go wraps the three epoll operations behind the following functions:

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList

The netpollinit function is responsible for initializing netpoll.

netpollopen is responsible for registering a file descriptor so that its events are monitored.

netpoll blocks waiting for I/O events and returns the list of goroutines that are ready to run.

The following is a TCP server written in the Go language.

func main() {
    listen, err := net.Listen("tcp", ":8888")
    if err != nil {
        fmt.Println("listen error: ", err)
        return
    }
    for {
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("accept error: ", err)
            break
        }
        // start a goroutine to handle the reads and writes on this connection
        go HandleConn(conn)
    }
}
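
HandleConn is not defined in the snippet above; a minimal sketch of what such a handler might look like (a plain echo loop over the same net.Conn) is:

// HandleConn is not part of the quoted snippet; this is one possible
// implementation: echo whatever the client sends back to it.
func HandleConn(conn net.Conn) {
    defer conn.Close()
    buf := make([]byte, 4096)
    for {
        // conn.Read blocks the goroutine; under the hood netpoll parks it
        // until the fd becomes readable.
        n, err := conn.Read(buf)
        if err != nil {
            return
        }
        if _, err := conn.Write(buf[:n]); err != nil {
            return
        }
    }
}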

Here we follow the source code of this TCP server to see where netpoll is used to complete the epoll call.

net.Listen

This TCP server calls net.Listen to create a socket and get the corresponding fd, which is used to initialize the listener’s netFD (Go’s wrapper around a network file descriptor); it then calls the netFD’s listenStream method to perform the bind & listen operations and finish initializing the netFD.

The call process is as follows.


func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // create a socket
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    ...
    // create the netFD
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            // call netFD's listenStream method to bind&listen on the socket and initialize the netFD
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            ...
        }
    }
    ...
    return fd, nil
}

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
    ret := &netFD{
        pfd: poll.FD{
            Sysfd:         sysfd,
            IsStream:      sotype == syscall.SOCK_STREAM,
            ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
        },
        family: family,
        sotype: sotype,
        net:    net,
    }
    return ret, nil
}

The sysSocket method issues a system call to create a socket, newFD creates a netFD from it, and then the netFD’s listenStream method is called to perform the bind & listen operations and initialize the netFD.


netFD is a wrapper around a network file descriptor. It contains a poll.FD structure, which in turn holds two important fields, Sysfd and pollDesc: Sysfd is the socket file descriptor returned by sysSocket, and pollDesc is used to monitor whether that file descriptor is readable or writable.
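
For reference, the structures involved look roughly like this (abridged from net/fd_posix.go and internal/poll/fd_unix.go; fields not discussed here are omitted):

// net/fd_posix.go (abridged)
type netFD struct {
    pfd poll.FD

    // immutable until Close
    family int
    sotype int
    net    string
    laddr  Addr
    raddr  Addr
}

// internal/poll/fd_unix.go (abridged)
type FD struct {
    // System file descriptor. Immutable until Close.
    Sysfd int

    // I/O poller.
    pd pollDesc

    // Whether this is a streaming descriptor, as opposed to a
    // packet-based descriptor like a UDP socket.
    IsStream bool

    // Whether a zero byte read indicates EOF.
    ZeroReadIsEOF bool
}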

Let’s move on to listenStream.

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    ...
    // bind the socket
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    // listen on the socket
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    // initialize the fd
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

The listenStream method calls syscall.Bind to bind the fd, then listenFunc to start listening, and finally the fd’s init method to initialize the FD and its pollDesc.

func (pd *pollDesc) init(fd *FD) error {
    // calls into runtime.poll_runtime_pollServerInit
    serverInit.Do(runtime_pollServerInit)
    // calls into runtime.poll_runtime_pollOpen
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
    return nil
}

runtime_pollServerInit is wrapped in Once to ensure that it can only be called once; this function creates an epoll file descriptor instance on Linux platforms.

poll_runtime_pollOpen calls netpollopen which registers the fd to the epoll instance and returns a pollDesc.

netpollinit initialization

func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

netpollGenericInit calls the platform-specific implementation of netpollinit, which on Linux is the netpollinit function in netpoll_epoll.go.

var (
    epfd int32 = -1 // epoll descriptor
)

func netpollinit() {
    // create a new epoll file descriptor
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    ...
    // create a pipe used to wake up the poller
    r, w, errno := nonblockingPipe()
    ...
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    // add the read end of the pipe to the epoll listener
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    ...
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

Calling epollcreate1 creates an epoll instance; note that epfd is a package-level global. A non-blocking pipe is then created for waking up the poller, and epollctl adds the pipe’s read end to the epoll listener.

netpollopen event registration

Here’s another look at the poll_runtime_pollOpen method.

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    pd.self = pd
    unlock(&pd.lock)

    var errno int32
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

The poll_runtime_pollOpen method obtains a pollDesc structure via pollcache.alloc, which allocates pollDescs in blocks of roughly 4 KB. It then resets the fields of pd and calls netpollopen to register a new polling event with the epoll instance epfd, listening for the readable and writable state of the file descriptor.
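
The pd being reset here is the runtime-side pollDesc; abridged, and noting that the exact field set varies slightly across Go versions, it looks like this:

// runtime/netpoll.go (abridged)
type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    lock    mutex   // protects the following fields
    fd      uintptr
    closing bool
    everr   bool    // an event scanning error happened
    rseq    uintptr // protects from stale read timers
    rg      uintptr // pdReady, pdWait, G waiting for read, or 0
    rd      int64   // read deadline
    wseq    uintptr // protects from stale write timers
    wg      uintptr // pdReady, pdWait, G waiting for write, or 0
    wd      int64   // write deadline
    self    *pollDesc // storage for indirect interface
}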

Now let’s look at how pollCache allocates and initializes pollDesc.

type pollCache struct {
    lock  mutex
    first *pollDesc
}

const pollBlockSize = 4 * 1024

func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    // initialize the head node
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        }
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        // build the pollDesc free list
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    pd := c.first
    c.first = pd.link
    lockInit(&pd.lock, lockRankPollDesc)
    unlock(&c.lock)
    return pd
}

If the pollCache’s linked-list head is empty, alloc allocates a block of memory with persistentalloc and links the pollDescs in it into a free list; each call then pops an unused pollDesc off the head of that list and returns it.
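
Because the memory comes from persistentalloc, pollDescs are never returned to the garbage collector; when a descriptor is closed they simply go back onto this free list. The matching free path (abridged from runtime/netpoll.go) is just:

func (c *pollCache) free(pd *pollDesc) {
    lock(&c.lock)
    // push the pollDesc back onto the head of the free list
    pd.link = c.first
    c.first = pd
    unlock(&c.lock)
}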


This completes the analysis of net.Listen, so let’s look at listener.Accept.

Listener.Accept

The Listener.Accept method eventually calls into the netFD’s accept method.


func (fd *netFD) accept() (netfd *netFD, err error) {
    // call netFD.pfd.Accept to accept a new socket connection and return the new socket's fd
    d, rsa, errcall, err := fd.pfd.Accept()
    ...
    // construct a new netFD
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    // call the netFD's init method to complete the initialization
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

This method first calls the FD’s Accept to accept a new socket connection and get the fd of the new socket, then calls newFD to construct a new netFD and initializes it with the init method.

We have already seen the init method above, so let’s look at the Accept method.

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
    for {
        // use the Linux accept system call to accept a new connection and create the corresponding socket
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // if the expected I/O event has not happened yet, waitRead parks the goroutine and blocks here
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            continue
        }
        return -1, nil, errcall, err
    }
}

The FD.Accept method accepts new connections with the Linux accept system call and creates the corresponding socket; if no connection is ready yet, waitRead parks the current goroutine. These parked goroutines are woken up later when the scheduler calls runtime.netpoll.

pollWait event wait

pollDesc.waitRead actually calls runtime.poll_runtime_pollWait.
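
The chain from internal/poll down into the runtime looks roughly like this (abridged from internal/poll/fd_poll_runtime.go; note that this pollDesc only holds runtimeCtx, a pointer to the runtime-side pollDesc shown earlier):

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    // runtime_pollWait is linked to runtime.poll_runtime_pollWait
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

func (pd *pollDesc) waitRead(isFile bool) error { return pd.wait('r', isFile) }

func (pd *pollDesc) waitWrite(isFile bool) error { return pd.wait('w', isFile) }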

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ...
    // enter netpollblock and check whether the expected I/O event has occurred
    for !netpollblock(pd, int32(mode), false) {
        ...
    }
    return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // this loop waits for the state to become io ready or io wait
    for {
        old := *gpp
        // gpp == pdReady means the expected I/O event has already occurred,
        // so return directly, unblock the current goroutine and perform the I/O
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // if the expected I/O event has not occurred, atomically set gpp to pdWait and exit the loop
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // yield the current thread, put the goroutine to sleep and wait for the runtime to wake it up
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

poll_runtime_pollWait calls the netpollblock function in a for loop to check whether the expected I/O event has occurred; it does not exit the loop until netpollblock returns true, indicating that I/O is ready.

The netpollblock method checks whether the state is already pdReady; if so it returns true immediately. Otherwise it sets gpp to pdWait via CAS and exits the for loop, and the current goroutine is parked by gopark until a read/write or other I/O event occurs on the corresponding fd.
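
gopark is given netpollblockcommit as its commit function: after the goroutine has been suspended, it stores the g pointer into gpp (pd.rg or pd.wg), which is how netpollunblock can later hand the goroutine back to the scheduler. Abridged from runtime/netpoll.go:

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    // replace pdWait with the goroutine pointer; if this fails, the fd
    // became ready concurrently and the goroutine should not block
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // bump the number of goroutines waiting on the poller
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}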

These parked goroutines are woken up when the scheduler calls runtime.netpoll.

netpoll polling and waiting

The core logic of runtime.netpoll is to: compute the epoll_wait timeout from the input delay; call epoll_wait to fetch the list of I/O-ready fd’s from the epoll instance’s eventpoll.rdllist doubly linked list; and iterate over the fd’s returned by epoll_wait, using the pollDesc that was attached when the fd was registered via epoll_ctl to recover the runnable goroutines and return them.

After netpoll executes, it returns a list of goroutines corresponding to the ready fd’s, and these goroutines are then added to the run queue to wait to be scheduled.

func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    // delay is in nanoseconds; convert it to milliseconds below
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    var events [128]epollevent
retry:
    // wait for file descriptors to become readable or writable
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    // on a negative return value, call epollwait again
    if n < 0 {
        ...
        goto retry
    }
    var toRun gList
    // n > 0 means some monitored file descriptors have pending events
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        ...
        // determine the event type: read and/or write
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // take out the pollDesc stored in the epollevent
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // call netpollready with the pollDesc of the ready fd
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

netpoll calls epollwait to get the list of ready fd’s; the corresponding epoll function is epoll_wait. toRun is a list of g’s holding the goroutines to be resumed, which is finally returned to the caller. If the n returned by epollwait is greater than zero, the monitored file descriptors have pending events, and a for loop processes them: inside the loop, mode is set according to the event type, the corresponding pollDesc is extracted from the event data, and netpollready is called.

Here we look at netpollready again.

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    // get the pointers to the corresponding g's
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    // add the corresponding g's to the toRun list
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    // pick the read or write semaphore based on mode
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        // read the g stored in gpp
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // CAS the read or write semaphore to pdReady
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdWait {
                old = 0
            }
            // return the corresponding g pointer
            return (*g)(unsafe.Pointer(old))
        }
    }
}

Having covered the source code of runtime.netpoll, one more thing is worth noting: runtime.netpoll is called from two places.

  • The scheduler executes runtime.schedule(), which calls runtime.findrunnable(); runtime.findrunnable() calls runtime.netpoll to obtain goroutines that are ready to run.
  • The Go runtime starts a separate sysmon monitoring thread when the program starts. sysmon runs every 20us~10ms, and on each run it checks whether the last netpoll execution was more than 10ms ago; if so, it calls runtime.netpoll once. Both call sites are sketched below.
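
Roughly, the two call sites look like this (heavily abridged from runtime/proc.go; the exact checks vary between Go versions):

// In findrunnable: a non-blocking poll when looking for work.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
    if list := netpoll(0); !list.empty() {
        gp := list.pop()
        injectglist(&list)
        casgstatus(gp, _Gwaiting, _Grunnable)
        return gp, false
    }
}

// In sysmon: poll if netpoll has not run for more than 10ms.
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
    atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
    list := netpoll(0) // non-blocking
    if !list.empty() {
        injectglist(&list)
    }
}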

If you are interested in these entry calls, you can check them out for yourself.

Summary

This article started from I/O multiplexing, explaining select and epoll, and then returned to the Go language to see how it builds on them. By tracing the source code, you can see that Go wraps its own functions on top of epoll:

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList

These three functions are used to create instances, register, and wait for events on epoll.

Students who do not know much about I/O multiplexing can also take this opportunity to learn more about network programming and expand their knowledge.