For a non-stop restart of a Go program, we need to solve two problems:

  1. The restart must not close the listening port.
  2. Requests already in flight must either be processed to completion or time out.

We’ll see later how endless handles both.

Basic concepts

Before looking at endless, here is a brief introduction to two concepts that the rest of the article relies on.

Signal handling

Go delivers signals by sending os.Signal values on a channel. If we press Ctrl+C, for example, the OS sends SIGINT to the process; the Go runtime catches it and forwards it to every channel registered for that signal with signal.Notify, where our own code can receive it and decide what to do.

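package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    sigs := make(chan os.Signal, 1)
    done := make(chan bool, 1)
    // Register for the signals we want to receive
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        // Blocks until a signal arrives
        sig := <-sigs
        fmt.Println()
        fmt.Println(sig)
        done <- true
    }()
    fmt.Println("awaiting signal")
    // Wait until the goroutine has received a signal
    <-done
    fmt.Println("exiting")
}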

With a few lines of code, we can listen for SIGINT and SIGTERM. When the Go runtime receives one of these signals from the OS, it puts the signal value into the sigs channel for processing.

Fork subprocesses

Go’s os/exec package wraps fork and exec for us, and its ExtraFiles field lets the child process inherit open files from the parent.

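// netListener is assumed to be a *net.TCPListener
file, err := netListener.File() // File returns a dup()ed copy of the listening fd
if err != nil {
    log.Fatalf("gracefulRestart: Failed to get listener file, error: %v", err)
}
path := "/path/to/executable"
args := []string{"-graceful"}
// Create the Cmd instance
cmd := exec.Command(path, args...)
// Share the parent's standard output
cmd.Stdout = os.Stdout
// Share the parent's standard error
cmd.Stderr = os.Stderr
// Files the child inherits, numbered from fd 3
cmd.ExtraFiles = []*os.File{file}
// Start the command
err = cmd.Start()
if err != nil {
    log.Fatalf("gracefulRestart: Failed to launch, error: %v", err)
}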

exec.Command returns a Cmd instance for the given path (the program to run) and args (its arguments). The ExtraFiles field lists additional open files for the new process to inherit, and the Start method finally creates the child process.

Here netListener.File makes a copy of the file descriptor with the system call dup.

func Dup(oldfd int) (fd int, err error) {
    r0, _, e1 := Syscall(SYS_DUP, uintptr(oldfd), 0, 0)
    fd = int(r0)
    if e1 != 0 {
        err = errnoErr(e1)
    }
    return
}

The dup man page describes the call as follows:

dup and dup2 create a copy of the file descriptor oldfd.
After successful return of dup or dup2, the old and new descriptors may
be used interchangeably. They share locks, file position pointers and
flags; for example, if the file position is modified by using lseek on
one of the descriptors, the position is also changed for the other.

The two descriptors do not share the close-on-exec flag, however.

As the description shows, the new file descriptor refers to the same open file as oldfd and shares its locks, file position, and flags. The close-on-exec flag, however, belongs to each descriptor separately and is not shared. The two descriptors are also closed independently, so closing oldfd does not affect reading from or writing to newfd.
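The independence of the two descriptors can be checked with a small experiment. The sketch below is only an illustration with a made-up helper name (acceptOnDuplicate): it duplicates a listener's descriptor with File, closes the original listener, and still accepts a connection through the duplicate. It assumes a local TCP port can be opened on 127.0.0.1.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"os"
	"time"
)

// acceptOnDuplicate is a hypothetical helper. It opens a listener, duplicates
// its descriptor, closes the original, and accepts one connection through
// the duplicate.
func acceptOnDuplicate(msg string) (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0") // port 0: the OS picks a free port
	if err != nil {
		return "", err
	}
	addr := ln.Addr().String()

	f, err := ln.(*net.TCPListener).File() // duplicate of the listening descriptor
	if err != nil {
		ln.Close()
		return "", err
	}
	ln.Close() // the original descriptor is gone; the socket stays open via f

	dup, err := net.FileListener(f)
	f.Close() // FileListener works on its own copy, so f is no longer needed
	if err != nil {
		return "", err
	}
	defer dup.Close()
	if t, ok := dup.(*net.TCPListener); ok {
		t.SetDeadline(time.Now().Add(2 * time.Second)) // do not block forever
	}

	// Client side: connect, send the message, and close.
	go func() {
		c, err := net.Dial("tcp", addr)
		if err != nil {
			return
		}
		defer c.Close()
		io.WriteString(c, msg)
	}()

	conn, err := dup.Accept()
	if err != nil {
		return "", err
	}
	defer conn.Close()
	data, err := io.ReadAll(conn)
	return string(data), err
}

func main() {
	got, err := acceptOnDuplicate("ping")
	if err != nil {
		fmt.Println("error:", err)
		os.Exit(1)
	}
	fmt.Println("accepted via duplicate:", got)
}
```

This is the same effect a restart depends on: the parent can close its own listener while the descriptor held by the child keeps the port open.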

[Figure: fork and the file descriptor table]

The diagram above shows a fork: the child process receives a copy of the parent process’s file descriptor table.

endless restart example

Here is a short introduction to endless for those who haven’t used it before; skip this part if you are already familiar with it.

package main

import (
    "log"
    "net/http"
    "os"
    "sync"
    "time"

    "github.com/fvbock/endless"
    "github.com/gorilla/mux"
)

func handler(w http.ResponseWriter, r *http.Request) {
    duration, err := time.ParseDuration(r.FormValue("duration"))
    if err != nil {
        http.Error(w, err.Error(), 400)
        return
    }
    time.Sleep(duration)
    w.Write([]byte("Hello World"))
}

func main() {
    mux1 := mux.NewRouter()
    mux1.HandleFunc("/sleep", handler)

    w := sync.WaitGroup{}
    w.Add(1)
    go func() {
        err := endless.ListenAndServe("127.0.0.1:5003", mux1)
        if err != nil {
            log.Println(err)
        }
        log.Println("Server on 5003 stopped")
        w.Done()
    }()
    w.Wait()
    log.Println("All servers stopped. Exiting.")

    os.Exit(0)
}

Let’s verify the non-stop restart that endless provides:

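# Build the project for the first time
go build main.go
# Run it; from now on the source can be modified
./main &
# Send a request that returns after 60s
curl "http://127.0.0.1:5003/sleep?duration=60s" &
# Rebuild the project with the modified source
go build main.go
# Restart; 17171 is the pid of the running process
kill -1 17171
# Send a new request
curl "http://127.0.0.1:5003/sleep?duration=1s"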

After running these commands, the first request returns Hello World. Before sending the second request, I changed the return value in the handler to Hello world2222, rebuilt, and restarted, so the second request is served by the new code.

Since the first request takes 60s and the second takes 1s, the response to the second request arrives first, followed by the response to the first.

The whole timeline is shown below.

[Figure: timeline of the two requests across the restart]

While waiting for the first request to return, you can see two processes running at the same time:

$ ps -ef |grep main
root      84636  80539  0 22:25 pts/2    00:00:00 ./main
root      85423  84636  0 22:26 pts/2    00:00:00 ./main

Once the first request has been answered, listing the processes again shows that the parent process has exited, which completes the seamless switch from parent to child.

Implementation Principles

The explanation below follows the endless implementation, so the principles and code are described through its source.

[Figure: flow of a non-stop restart]

The figure above shows the principle of the non-stop restart we want:

  1. Listen for the SIGHUP signal.
  2. On receiving the signal, fork a child process (using the same start command) and pass it the file descriptor of the socket the service is listening on.
  3. The child process listens on the parent’s socket; at this point both parent and child can accept requests.
  4. After a successful start, the child process sends SIGTERM to the parent, which then stops accepting new connections and waits for existing connections to finish (or time out).
  5. The parent process exits and the upgrade is complete.

Code Implementation

As we can see from the example above, the entry point for endless is the ListenAndServe function.

func ListenAndServe(addr string, handler http.Handler) error {
    // Initialise the server
    server := NewServer(addr, handler)
    // Listen for and handle requests
    return server.ListenAndServe()
}

This function has two parts: it first initialises the server, then listens for and handles requests.

Initialising the Server

Let’s first look at what an endless service’s Server structure looks like.

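type endlessServer struct {
    // Embeds http.Server
    http.Server
    // Listener for client requests
    EndlessListener  net.Listener
    // Tracks how many client requests are still outstanding
    wg               sync.WaitGroup
    // Channel used to receive signals
    sigChan          chan os.Signal
    // Marks whether this process is the new (child) process during a restart
    isChild          bool
    // State of the current process
    state            uint8
    ...
}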

In addition to embedding http.Server and thereby exposing all of its fields, endlessServer adds several fields of its own, because it needs to listen for signals and determine whether it is a new process.

  • wg: tracks how many client requests are still outstanding.
  • sigChan: the channel used to receive signals.
  • isChild: marks whether this process is the new process during a restart.
  • state: the state of the current process.
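One way to keep such a count is to wrap the listener so that every accepted connection is added to the WaitGroup and released when it is closed. The following is a minimal sketch of that idea; countingListener, countingConn, and serveOne are names invented for this illustration and are not the types endless itself defines.

```go
package main

import (
	"fmt"
	"net"
	"os"
	"sync"
)

// countingListener is a hypothetical wrapper that tracks open connections.
type countingListener struct {
	net.Listener
	wg *sync.WaitGroup
}

// Accept registers every accepted connection with the WaitGroup.
func (l *countingListener) Accept() (net.Conn, error) {
	c, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	l.wg.Add(1)
	return &countingConn{Conn: c, wg: l.wg}, nil
}

// countingConn releases its slot in the WaitGroup when closed.
type countingConn struct {
	net.Conn
	wg   *sync.WaitGroup
	once sync.Once
}

// Close calls Done exactly once, even if Close is called repeatedly.
func (c *countingConn) Close() error {
	err := c.Conn.Close()
	c.once.Do(c.wg.Done)
	return err
}

// serveOne accepts one connection, closes it, and waits for the count to drain.
func serveOne() error {
	inner, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	defer inner.Close()

	var wg sync.WaitGroup
	ln := &countingListener{Listener: inner, wg: &wg}

	// Client side: connect and disconnect immediately.
	go func() {
		if c, err := net.Dial("tcp", inner.Addr().String()); err == nil {
			c.Close()
		}
	}()

	conn, err := ln.Accept()
	if err != nil {
		return err
	}
	conn.Close()
	wg.Wait() // returns because the only tracked connection is closed
	return nil
}

func main() {
	if err := serveOne(); err != nil {
		fmt.Println("error:", err)
		os.Exit(1)
	}
	fmt.Println("all connections drained")
}
```

With such a wrapper, a shutting-down process only has to call Wait on the WaitGroup to know when its last connection has finished.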

Let’s see how to initialize endlessServer.

func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
    runningServerReg.Lock()
    defer runningServerReg.Unlock()

    socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")
    // Determine from the environment whether this is a child process
    isChild = os.Getenv("ENDLESS_CONTINUE") != ""
    // Multiple servers are supported, so record each server's order
    if len(socketOrder) > 0 {
        for i, addr := range strings.Split(socketOrder, ",") {
            socketPtrOffsetMap[addr] = uint(i)
        }
    } else {
        socketPtrOffsetMap[addr] = uint(len(runningServersOrder))
    }

    srv = &endlessServer{
        wg:      sync.WaitGroup{},
        sigChan: make(chan os.Signal),
        isChild: isChild,
        ...
        state: STATE_INIT,
        lock:  &sync.RWMutex{},
    }

    srv.Server.Addr = addr
    srv.Server.ReadTimeout = DefaultReadTimeOut
    srv.Server.WriteTimeout = DefaultWriteTimeOut
    srv.Server.MaxHeaderBytes = DefaultMaxHeaderBytes
    srv.Server.Handler = handler

    runningServers[addr] = srv
    ...
    return
}

The initialisation sets the usual net/http parameters, including ReadTimeout, WriteTimeout, and the request Handler.

Note that the ENDLESS_CONTINUE environment variable determines whether the process is a child; the parent sets it when it forks the child. Since endless supports multiple servers, the ENDLESS_SOCKET_ORDER variable records the order of the servers.
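As a small illustration of how the order variable can be turned into descriptor positions, the sketch below mirrors the loop in NewServer. The helper name socketOffsets and the two addresses are assumptions made for this example.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// socketOffsets is a hypothetical helper that mirrors the loop in NewServer:
// it maps each address in a comma-separated list to its position in the list.
func socketOffsets(order string) map[string]uint {
	offsets := make(map[string]uint)
	if order == "" {
		return offsets
	}
	for i, addr := range strings.Split(order, ",") {
		offsets[addr] = uint(i)
	}
	return offsets
}

func main() {
	// A non-empty ENDLESS_CONTINUE marks the process as a child.
	isChild := os.Getenv("ENDLESS_CONTINUE") != ""
	fmt.Println("child:", isChild)

	// Example value of ENDLESS_SOCKET_ORDER for two servers.
	offsets := socketOffsets("127.0.0.1:5003,127.0.0.1:5004")
	// Inherited files start at fd 3, so the second server uses fd 4.
	fmt.Println("fd for 127.0.0.1:5004:", 3+offsets["127.0.0.1:5004"])
}
```

The position in the list matters because it is the only link between an address and the inherited descriptor that belongs to it.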

ListenAndServe

func (srv *endlessServer) ListenAndServe() (err error) {
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    // Handle signals asynchronously
    go srv.handleSignals()
    // Obtain the listener for the port
    l, err := srv.getListener(addr)
    if err != nil {
        log.Println(err)
        return
    }
    // Wrap the listener in an endlessListener
    srv.EndlessListener = newEndlessListener(l, srv)

    // If this is the child process, send SIGTERM to the parent
    if srv.isChild {
        syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
    }

    srv.BeforeBegin(srv.Addr)
    // Serve requests arriving on the listener
    return srv.Serve()
}

This method closely resembles its net/http counterpart: it first obtains a listener for the port and then calls Serve to handle incoming requests.

There are two differences, however. To achieve a smooth restart, endless has to listen for signals, and getListener works differently: the child process must inherit the listening fd from the parent so that the port is never closed.

handleSignals Signal Handling

[Figure: signal handling flow]

Signal handling is essentially a loop that waits for signals and dispatches on the signal received.

func (srv *endlessServer) handleSignals() {
    var sig os.Signal
    // Register for signal notifications
    signal.Notify(
        srv.sigChan,
        hookableSignals...,
    )
    // Get the pid
    pid := syscall.Getpid()
    for {
        sig = <-srv.sigChan
        // Run hooks before handling the signal
        srv.signalHooks(PRE_SIGNAL, sig)
        switch sig {
        // Graceful restart signal
        case syscall.SIGHUP:
            log.Println(pid, "Received SIGHUP. forking.")
            err := srv.fork()
            if err != nil {
                log.Println("Fork err:", err)
            }
        // Shutdown signal
        case syscall.SIGINT:
            log.Println(pid, "Received SIGINT.")
            srv.shutdown()
        // Shutdown signal
        case syscall.SIGTERM:
            log.Println(pid, "Received SIGTERM.")
            srv.shutdown()
        ...
        // Run hooks after handling the signal
        srv.signalHooks(POST_SIGNAL, sig)
    }
}

This code is straightforward: when we run kill -1 $pid, srv.sigChan receives SIGHUP and execution enters the case syscall.SIGHUP branch.

Note that the SIGTERM sent by the child process to the parent in the ListenAndServe method above is also handled here, where it triggers the shutdown logic.

The SIGHUP branch calls the fork function, so let’s look at the fork logic next.

func (srv *endlessServer) fork() (err error) {
    runningServerReg.Lock()
    defer runningServerReg.Unlock()

    // Check whether we have already forked
    if runningServersForked {
        return errors.New("Another process already forked. Ignoring this one.")
    } 
    runningServersForked = true

    var files = make([]*os.File, len(runningServers))
    var orderArgs = make([]string, len(runningServers))
    // There may be multiple servers, so collect every listen fd
    for _, srvPtr := range runningServers { 
        switch srvPtr.EndlessListener.(type) {
        case *endlessListener: 
            files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()
        default: 
            files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()
        }
        orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr
    }
    // Environment variables
    env := append(
        os.Environ(),
        // endless checks this variable at startup to decide whether it is a child
        "ENDLESS_CONTINUE=1",
    )
    if len(runningServers) > 1 {
        env = append(env, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(orderArgs, ",")))
    }

    // Path of the running program
    path := os.Args[0]
    var args []string
    // Arguments
    if len(os.Args) > 1 {
        args = os.Args[1:]
    }

    cmd := exec.Command(path, args...)
    // Standard output
    cmd.Stdout = os.Stdout
    // Standard error
    cmd.Stderr = os.Stderr
    cmd.ExtraFiles = files
    cmd.Env = env  
    err = cmd.Start()
    if err != nil {
        log.Fatalf("Restart: Failed to launch, error: %v", err)
    } 
    return
}

fork first collects the listen fd of every server into a list of files, then passes them to the child through the ExtraFiles field of cmd, so that the child can seamlessly take over the port the parent is listening on.

Note the ENDLESS_CONTINUE entry in the env list; endless checks it at startup:

func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
    runningServerReg.Lock()
    defer runningServerReg.Unlock()

    socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")
    isChild = os.Getenv("ENDLESS_CONTINUE") != ""
  ...
}

Let’s see what shutdown does when the process receives SIGTERM.

func (srv *endlessServer) shutdown() {
    if srv.getState() != STATE_RUNNING {
        return
    }

    srv.setState(STATE_SHUTTING_DOWN)
    // DefaultHammerTime defaults to 60 seconds
    if DefaultHammerTime >= 0 {
        go srv.hammerTime(DefaultHammerTime)
    }
    // Disable keep-alives on existing connections
    srv.SetKeepAlivesEnabled(false)
    err := srv.EndlessListener.Close()
    if err != nil {
        log.Println(syscall.Getpid(), "Listener.Close() error:", err)
    } else {
        log.Println(syscall.Getpid(), srv.EndlessListener.Addr(), "Listener closed.")
    }
}

shutdown first disables keep-alives and closes the listener: the child process is already running, so this process should no longer accept new requests and has to give up the port. It also calls srv.hammerTime asynchronously, which gives the parent process up to 60 seconds to finish its in-flight requests before it is shut down.

getListener Get Port Listener

func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {
    // Child process
    if srv.isChild {
        var ptrOffset uint = 0
        runningServerReg.RLock()
        defer runningServerReg.RUnlock()
        // Again handles the multi-server case
        if len(socketPtrOffsetMap) > 0 {
            // Look up the listen fd index from the server order
            ptrOffset = socketPtrOffsetMap[laddr] 
        }
        // fds 0, 1, and 2 are reserved for stdin, stdout, and stderr, so start at 3
        f := os.NewFile(uintptr(3+ptrOffset), "")
        l, err = net.FileListener(f)
        if err != nil {
            err = fmt.Errorf("net.FileListener error: %v", err)
            return
        }
    } else {
        // Parent process: create the listener directly
        l, err = net.Listen("tcp", laddr)
        if err != nil {
            err = fmt.Errorf("net.Listen error: %v", err)
            return
        }
    }
    return
}

For the parent process there is little to say: it creates a listener on the port and returns it.

The child process takes a small detour, starting with why the os.NewFile argument starts at 3. In the child, fds 0, 1, and 2 are reserved for standard input, output, and error, so the first fd passed by the parent is numbered 3.

As shown below, the first three fds are reserved for standard input, output, and error, and the files in the ExtraFiles array are numbered from fd 3 upward in order.

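[Figure: fds 0, 1, and 2 for standard input, output, and error, followed by the ExtraFiles entries from fd 3]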
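The fd numbering can also be observed without endless. In the minimal sketch below, the parent hands the read end of a pipe to a child through ExtraFiles, and the child reads it as fd 3. The helper name readThroughFD3 is made up, and the example assumes a Unix-like system with sh and cat available.

```go
package main

import (
	"bytes"
	"fmt"
	"os"
	"os/exec"
)

// readThroughFD3 is a hypothetical helper: it passes the read end of a pipe
// to a child process via ExtraFiles and returns what the child read from fd 3.
func readThroughFD3(msg string) (string, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return "", err
	}
	defer r.Close()

	// The shell redirects its fd 3 to cat's standard input.
	cmd := exec.Command("sh", "-c", "cat <&3")
	cmd.ExtraFiles = []*os.File{r} // entry 0 becomes fd 3 in the child
	var out bytes.Buffer
	cmd.Stdout = &out

	if err := cmd.Start(); err != nil {
		w.Close()
		return "", err
	}
	_, werr := w.WriteString(msg)
	w.Close() // closing the write end lets cat see end-of-file
	if err := cmd.Wait(); err != nil {
		return "", err
	}
	return out.String(), werr
}

func main() {
	got, err := readThroughFD3("hello from the parent")
	if err != nil {
		fmt.Println("error:", err)
		os.Exit(1)
	}
	fmt.Println("child read from fd 3:", got)
}
```

A second entry in ExtraFiles would appear as fd 4 in the child, which is why getListener adds the server's offset to 3.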

We can verify this with the example from earlier:

# Build the project for the first time
go build main.go
# Run it; from now on the source can be modified
./main &
# Look at the files the parent process has open
lsof  -P -p 17116
COMMAND   PID USER   FD      TYPE  DEVICE SIZE/OFF     NODE NAME
...
main    18942 root    0u      CHR   136,2      0t0        5 /dev/pts/2
main    18942 root    1u      CHR   136,2      0t0        5 /dev/pts/2
main    18942 root    2u      CHR   136,2      0t0        5 /dev/pts/2
main    18942 root    3u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)
# Send a request that returns after 60s
curl "http://127.0.0.1:5003/sleep?duration=60s" &
# Restart; 17116 is the pid of the parent process
kill -1 17116
# There should now be two main processes
ps -ef |grep ./main
root      17116  80539  0 04:19 pts/2    00:00:00 ./main
root      18110  17116  0 04:21 pts/2    00:00:00 ./main
# The child pid is 18110; look at the files it has open
lsof  -P -p 18110
COMMAND   PID USER   FD      TYPE  DEVICE SIZE/OFF     NODE NAME
...
main    19073 root    0r      CHR     1,3      0t0     1028 /dev/null
main    19073 root    1u      CHR   136,2      0t0        5 /dev/pts/2
main    19073 root    2u      CHR   136,2      0t0        5 /dev/pts/2
main    19073 root    3u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)
main    19073 root    4u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)
# Send a new request
curl "http://127.0.0.1:5003/sleep?duration=1s" 

Summary

Through endless we have seen how to restart a service without stopping it. This feature is useful in many scenarios, and readers who have not used it yet can try it on their own systems.

In general, a hot restart restarts the service without interrupting established connections: the old process stops accepting new connections, and the new process accepts them instead. For connections already established in the old process, the process can stop reading new requests from them and wait until the in-flight requests have been processed and the connections are idle before exiting.

This ensures that established connections are not interrupted and that the new process can accept connection requests normally.