1. Data reception process

For a computer system, the path external data takes from the moment it enters the NIC until the application finally receives it is relatively long. The approximate flow is as follows.

  • The NIC receives the data, copies it into kernel memory (the receive ring buffer) via DMA, and then raises a hard interrupt to the CPU.
  • The CPU handles the hard interrupt with minimal work and hands the rest over to a soft interrupt (softirq), processed by the ksoftirqd kernel thread.
  • ksoftirqd calls the poll() method registered by the NIC driver to pull packets off the ring buffer and pass them up the protocol stack into the socket's receive queue. This is the point at which IO multiplexing mechanisms such as select and epoll get notified.
  • With epoll, for example, the socket's fd is then added to the epoll instance's ready list.
  • A callback (again taking epoll as the example) wakes up the process waiting on that fd so it can read the data.

The ksoftirqd kernel threads (one per CPU) handle soft interrupts and call the registered poll method to start receiving packets.

➜  ps -aux | grep ksoft
root          12  0.0  0.0      0     0 ?        S    Sep06   0:01 [ksoftirqd/0]
root          20  0.0  0.0      0     0 ?        S    Sep06   0:00 [ksoftirqd/1]
root          26  0.0  0.0      0     0 ?        S    Sep06   0:00 [ksoftirqd/2]
root          32  0.0  0.0      0     0 ?        S    Sep06   0:02 [ksoftirqd/3]

2. The overall hardware and software interaction of epoll

The interaction between the kernel, external devices, and applications is roughly as follows.

  • At initialization, epoll_create is called to create an epoll management object (in Go, the runtime does this lazily, the first time a pollable fd is opened).
  • At startup, the application binds and listens on a socket to serve application-layer (layer 7) traffic such as HTTP or RPC.
  • It registers the events it cares about with the kernel (epoll_ctl) and calls epoll_wait to block until data is ready.
  • When data is ready, the waiting thread is woken up and the data is copied from kernel space to user space.
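The four steps above map onto a handful of Linux system calls. The Go sketch below walks through them with the standard syscall package and runs on Linux only. To stay self-contained it assumes a Unix socketpair in place of a real HTTP/RPC listening socket, and the helper names `waitReady` and `epollRoundTrip` are hypothetical.

```go
package main

import (
	"fmt"
	"syscall"
)

// waitReady calls epoll_wait, retrying if a signal interrupts it (EINTR).
func waitReady(epfd int, events []syscall.EpollEvent, timeoutMs int) (int, error) {
	for {
		n, err := syscall.EpollWait(epfd, events, timeoutMs)
		if err != syscall.EINTR {
			return n, err
		}
	}
}

// epollRoundTrip (hypothetical helper) runs create -> register -> wait -> read.
func epollRoundTrip(msg string) (string, error) {
	// Step 1: create the epoll management object (epfd).
	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		return "", err
	}
	defer syscall.Close(epfd)

	// Step 2 stand-in: a connected socket pair instead of bind/listen/accept.
	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		return "", err
	}
	defer syscall.Close(fds[0])
	defer syscall.Close(fds[1])

	// Step 3: register interest in readable events on fds[0] ...
	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fds[0])}
	if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fds[0], &ev); err != nil {
		return "", err
	}

	// The peer sends data, standing in for traffic arriving from the network.
	if _, err := syscall.Write(fds[1], []byte(msg)); err != nil {
		return "", err
	}

	// ... and block in epoll_wait until an fd is ready (1 s timeout).
	events := make([]syscall.EpollEvent, 8)
	n, err := waitReady(epfd, events, 1000)
	if err != nil {
		return "", err
	}
	if n != 1 || events[0].Fd != int32(fds[0]) {
		return "", fmt.Errorf("unexpected epoll result: n=%d", n)
	}

	// Step 4: the fd is ready, so copy the data from kernel space to user space.
	buf := make([]byte, 64)
	m, err := syscall.Read(fds[0], buf)
	if err != nil {
		return "", err
	}
	return string(buf[:m]), nil
}

func main() {
	got, err := epollRoundTrip("hello epoll")
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // hello epoll
}
```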

3. Initialize epfd

// runtime/netpoll.go

var (
	netpollInitLock mutex
	netpollInited   uint32 // accessed atomically
)

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}

An atomic load followed by a second check under a lock (double-checked locking) implements a singleton, so netpoll is initialized only once even if several threads get here concurrently. The actual initialization happens in netpollinit().

// runtime/netpoll_epoll.go

func netpollinit() {
	// Try epollcreate1 first, which sets close-on-exec atomically.
	epfd = epollcreate1(_EPOLL_CLOEXEC)
	if epfd < 0 {
		// Fall back to epollcreate on kernels without epoll_create1.
		epfd = epollcreate(1024)
		if epfd < 0 {
			println("runtime: epollcreate failed with", -epfd)
			throw("runtime: netpollinit failed")
		}
		// epollcreate takes no flags, so close-on-exec is set separately.
		closeonexec(epfd)
	}
	// Create a non-blocking pipe used to interrupt a blocked epollwait.
	r, w, errno := nonblockingPipe()
	if errno != 0 {
		println("runtime: pipe failed with", -errno)
		throw("runtime: pipe failed")
	}
	// Watch the read end for readability, tagging the event with
	// &netpollBreakRd so netpoll can recognize it later.
	ev := epollevent{
		events: _EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
	if errno != 0 {
		println("runtime: epollctl failed with", -errno)
		throw("runtime: epollctl failed")
	}
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}

epollcreate

Because fork uses copy-on-write (COW), a child process starts with the same data space, stack, and fd table as its parent. When the child then calls exec to run a new program, its memory image is replaced, but the inherited fds stay open unless they are marked close-on-exec. The new program knows nothing about them and cannot manage them, so they leak (an inherited epfd, for example, keeps the epoll instance alive). Such fds should therefore be closed before the new code runs, and the close-on-exec flag makes the kernel do this automatically at exec time.

func epollcreate(size int32) int32

func epollcreate1(flags int32) int32

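// With flags=_EPOLL_CLOEXEC, the kernel marks the new fd close-on-exec atomically,
// so it is closed automatically on exec and no separate closeonexec() call is needed.
// The epollcreate fallback takes no flags, which is why netpollinit calls closeonexec() on that path.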
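Whether an fd ended up close-on-exec can be checked with fcntl(F_GETFD). This Linux-only sketch creates one epoll fd with EPOLL_CLOEXEC and one with flags 0, then reads back the FD_CLOEXEC bit. Calling fcntl through syscall.Syscall is an assumption of the sketch (the runtime does not do this), and `hasCloexec` and `epollCloexecFlags` are hypothetical names.

```go
package main

import (
	"fmt"
	"syscall"
)

// hasCloexec (hypothetical helper) reports whether fd has FD_CLOEXEC set.
func hasCloexec(fd int) (bool, error) {
	flags, _, errno := syscall.Syscall(syscall.SYS_FCNTL, uintptr(fd), syscall.F_GETFD, 0)
	if errno != 0 {
		return false, errno
	}
	return flags&syscall.FD_CLOEXEC != 0, nil
}

// epollCloexecFlags creates one epoll fd the way epollcreate1(_EPOLL_CLOEXEC)
// does and one with flags 0, and reports whether each is close-on-exec.
func epollCloexecFlags() (withFlag, withoutFlag bool, err error) {
	a, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		return false, false, err
	}
	defer syscall.Close(a)

	b, err := syscall.EpollCreate1(0)
	if err != nil {
		return false, false, err
	}
	defer syscall.Close(b)

	if withFlag, err = hasCloexec(a); err != nil {
		return
	}
	withoutFlag, err = hasCloexec(b)
	return
}

func main() {
	with, without, err := epollCloexecFlags()
	if err != nil {
		panic(err)
	}
	fmt.Println("EPOLL_CLOEXEC:", with, "flags=0:", without) // EPOLL_CLOEXEC: true flags=0: false
}
```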

nonblockingPipe

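IO multiplexing means that a single thread can monitor many file descriptors at once (in netpoll, the thread blocked in epollwait watches every registered socket). The non-blocking pipe created in netpollinit is not a network connection; it exists so that other threads can interrupt that blocked epollwait call, which is what netpollBreak does.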

r, w, errno := nonblockingPipe()
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)

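r and w are the read and write ends of that pipe, saved as netpollBreakRd and netpollBreakWr. Only the read end is registered with epoll. When another thread needs a thread blocked in epollwait to return early (for example, because a new timer must fire sooner than the current wait would end), netpollBreak writes one byte to netpollBreakWr; netpollBreakRd becomes readable and epollwait returns. Network fds are not routed through this pipe; each one is registered with epfd separately (see netpollopen below).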

epollctl

In the netpoll model, every change to the epoll instance (adding, modifying, or removing a watched fd) goes through epollctl. The epoll instance is itself a file descriptor, called epfd in the code.

errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)

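Here the read end of the pipe, netpollBreakRd, is registered for readable events (_EPOLLIN), and ev.data is set to &netpollBreakRd. When netpoll later sees an event whose data equals &netpollBreakRd, it knows this is a wake-up from netpollBreak rather than a network fd. No network data is written through this pipe.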

4. Bind events to user fd

// runtime/netpoll_epoll.go

func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
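	// Readable, writable, peer hang-up (RDHUP), edge-triggered.
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	// Store the pollDesc pointer in ev.data so netpoll can find it when the fd is ready.
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd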
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

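This part is relatively simple: epollctl registers the fd for readable, writable, and peer hang-up (_EPOLLRDHUP) events in edge-triggered mode (_EPOLLET), and ev.data stores a pointer to the fd's pollDesc. When the fd becomes readable or writable, netpoll can therefore go straight from the reported event to the pollDesc and the goroutines waiting on it.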

5. Unbind events from user fd

// runtime/netpoll_epoll.go

func netpollclose(fd uintptr) int32 {
	var ev epollevent
	return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}

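Similarly, netpollclose calls epollctl with the operation _EPOLL_CTL_DEL. The empty ev is still passed because Linux kernels before 2.6.9 required a non-NULL event pointer even for _EPOLL_CTL_DEL.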
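After _EPOLL_CTL_DEL, the epoll instance stops reporting an fd even if unread data is still pending. Below is a Linux-only sketch with the standard syscall package (`readyCount` and `deregisterDemo` are hypothetical names); like netpollclose, it passes an empty, non-nil event to the delete call.

```go
package main

import (
	"fmt"
	"syscall"
)

// readyCount returns how many fds epoll reports within timeoutMs, retrying on EINTR.
func readyCount(epfd, timeoutMs int) (int, error) {
	events := make([]syscall.EpollEvent, 4)
	for {
		n, err := syscall.EpollWait(epfd, events, timeoutMs)
		if err != syscall.EINTR {
			return n, err
		}
	}
}

// deregisterDemo registers a socket, makes it readable, and counts ready fds
// before and after removing it with EPOLL_CTL_DEL.
func deregisterDemo() (before, after int, err error) {
	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		return
	}
	defer syscall.Close(epfd)

	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		return
	}
	defer syscall.Close(fds[0])
	defer syscall.Close(fds[1])

	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fds[0])}
	if err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fds[0], &ev); err != nil {
		return
	}
	if _, err = syscall.Write(fds[1], []byte("x")); err != nil {
		return
	}
	// Level-triggered and unread, so the fd is reported as ready.
	if before, err = readyCount(epfd, 100); err != nil {
		return
	}

	// Unbind, as netpollclose does; the empty event is ignored by modern kernels.
	var empty syscall.EpollEvent
	if err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, fds[0], &empty); err != nil {
		return
	}
	// The data is still unread, but the fd is no longer watched.
	after, err = readyCount(epfd, 100)
	return
}

func main() {
	before, after, err := deregisterDemo()
	if err != nil {
		panic(err)
	}
	fmt.Println("ready before DEL:", before, "after DEL:", after) // ready before DEL: 1 after DEL: 0
}
```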

6. Listening for events

// runtime/netpoll_epoll.go

// netpoll checks for ready network connections and returns the list of
// goroutines (gList) that become runnable.
// delay < 0: block indefinitely; delay == 0: poll without blocking;
// delay > 0: block for up to delay nanoseconds.
func netpoll(delay int64) gList {
	// If epfd has not been created, there is no fd to wait on, so return immediately.
	if epfd == -1 {
		return gList{}
	}
	...
	var events [128]epollevent
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		...
		// If a timed sleep was interrupted, return so the caller can
		// recalculate how long to sleep; otherwise wait again.
		if waitms > 0 {
			return gList{}
		}
		goto retry
	}
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		// No events reported for this entry; skip it.
		if ev.events == 0 {
			continue
		}
		// The wake-up pipe became readable: some thread called netpollBreak.
		if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
			if ev.events != _EPOLLIN {
				println("runtime: netpoll: break fd ready for", ev.events)
				throw("runtime: netpoll: break fd ready for something unexpected")
			}
			if delay != 0 {
				// Only a blocking poll drains the pipe and resets netpollWakeSig;
				// a non-blocking poll leaves the wake-up for a blocking one.
				var tmp [16]byte
				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
				atomic.Store(&netpollWakeSig, 0)
			}
			continue
		}
		var mode int32
		// Readable, peer hang-up, hang-up, or error: wake the reader.
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		// Writable, hang-up, or error: wake the writer.
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.everr = false
			if ev.events == _EPOLLERR {
				pd.everr = true
			}
			// Add the goroutine(s) waiting on this ready fd to toRun.
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		// Queue the goroutine blocked on reading to be made runnable.
		toRun.push(rg)
	}
	if wg != nil {
		// Queue the goroutine blocked on writing to be made runnable.
		toRun.push(wg)
	}
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

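The listening part is not a dedicated loop: the scheduler calls netpoll repeatedly (for example from findrunnable and sysmon), and each call waits in epollwait for the events registered via epollctl in netpollopen. When a registered fd becomes readable or writable, netpoll takes its pollDesc from ev.data and collects the goroutines blocked on it. Events on netpollBreakRd are only wake-ups from netpollBreak, so they are drained and skipped.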
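The mode calculation in netpoll and the fan-out in netpollready can be isolated into a small, portable function. This sketch hard-codes the Linux values of the epoll flags instead of importing them, and `eventMode` and `wakeTargets` are hypothetical names; it only shows which waiters (reader, writer, or both) a given event mask would wake.

```go
package main

import "fmt"

// Linux values of the epoll event flags used by netpoll.
const (
	epollIn    = 0x1
	epollOut   = 0x4
	epollErr   = 0x8
	epollHup   = 0x10
	epollRdHup = 0x2000
)

// eventMode mirrors the mode calculation in netpoll: 'r', 'w', 'r'+'w', or 0.
func eventMode(events uint32) int32 {
	var mode int32
	if events&(epollIn|epollRdHup|epollHup|epollErr) != 0 {
		mode += 'r'
	}
	if events&(epollOut|epollHup|epollErr) != 0 {
		mode += 'w'
	}
	return mode
}

// wakeTargets mirrors netpollready: which waiting goroutines to wake.
func wakeTargets(mode int32) (reader, writer bool) {
	reader = mode == 'r' || mode == 'r'+'w'
	writer = mode == 'w' || mode == 'r'+'w'
	return
}

func main() {
	for _, ev := range []uint32{epollIn, epollOut, epollIn | epollOut, epollHup, epollRdHup, 0} {
		r, w := wakeTargets(eventMode(ev))
		fmt.Printf("events=%#x wake reader=%v writer=%v\n", ev, r, w)
	}
}
```

A hang-up or error wakes both sides, so a goroutine blocked in either direction gets a chance to observe the failure.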

The main steps are as follows.

  1. Initialize the wake-up pipe: netpollBreakRd and netpollBreakWr are the read and write ends of a non-blocking pipe, not sockets. Only netpollBreakRd is registered with epoll, so that writing to netpollBreakWr can interrupt a blocked epollwait.
  2. Initialize epfd: the epoll instance created by epollcreate1 (or epollcreate), through which the read and write events of all other sockets are managed. epfd is the core structure of the epoll model.
  3. Register events: when a network fd is opened (for example by accept or dial), the runtime calls netpollopen, which uses epollctl to add the fd to epfd for readable, writable, and hang-up events in edge-triggered mode. When a goroutine's read or write on that fd would block (EAGAIN), the goroutine is parked and waits to be woken.
  4. Handle wake-ups: a thread that needs to interrupt a blocked epollwait calls netpollBreak, which writes a byte to netpollBreakWr. netpoll sees netpollBreakRd become readable, drains it (for a blocking poll), and moves on; no network data passes through this pipe.
  5. Listen for events: netpoll waits on epfd in epollwait. When a registered event occurs (readable, writable, hang-up, or error), it collects the goroutines bound to that fd's pollDesc into the toRun list above, and the scheduler makes them runnable.
  6. Goroutine ready: the woken goroutine retries its read or write on the fd. When the connection is eventually closed, netpollclose calls epollctl with _EPOLL_CTL_DEL to remove the fd from epfd.
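From application code none of this is visible: a blocking-looking conn.Read parks the goroutine, and netpoll makes it runnable again when the socket becomes readable. The portable sketch below uses only the standard net and io packages over loopback TCP; `echoOnce` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// echoOnce (hypothetical helper) starts a loopback TCP server whose handler
// goroutine blocks in Read until the client sends msg, then echoes it back.
func echoOnce(msg string) (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()

	go func() {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		buf := make([]byte, len(msg))
		// If the data has not arrived yet, the underlying read returns EAGAIN
		// and this goroutine parks; on Linux the socket was registered with
		// epoll by netpollopen, and netpoll wakes the goroutine when it is readable.
		if _, err := io.ReadFull(conn, buf); err != nil {
			return
		}
		conn.Write(buf)
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	reply := make([]byte, len(msg))
	if _, err := io.ReadFull(conn, reply); err != nil {
		return "", err
	}
	return string(reply), nil
}

func main() {
	got, err := echoOnce("hello netpoll")
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // hello netpoll
}
```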