I recently used Gunicorn’s Graceful Shutdown feature in a project, so I read the code to learn about Gunicorn’s signal processing.

Master

Gunicorn starts with the following call chain:

WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
    BaseApplication().run()
        Arbiter(self).run()

The Master's main control logic, including signal handling and the main loop, is implemented in Arbiter. Calling Arbiter().run() eventually calls Arbiter().init_signals(), which registers a handler for every signal the Master needs to handle.

Arbiter().run():
    Arbiter().start() 
        Arbiter().init_signals() # initialize the Master's signal handling
            # initialize all signals
            for s in self.SIGNALS: # "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH"
                signal.signal(s, self.signal)
            signal.signal(signal.SIGCHLD, self.handle_chld)

Every signal in SIGNALS is handled by the same function, Arbiter().signal(). It appends the received signal to Arbiter.SIG_QUEUE, which holds at most 5 signals, and then calls Arbiter().wakeup(), which writes a byte to the PIPE in order to wake up the master loop.

def signal(self, sig, frame):
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
        self.wakeup()

def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')
    except IOError as e:
        if e.errno not in [errno.EAGAIN, errno.EINTR]:
            raise
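
The queue-and-wakeup mechanism can be tried in isolation. The following is a minimal sketch, not Gunicorn's code: MiniArbiter, MAX_QUEUED, sig_queue and the sleep() body are hypothetical stand-ins, and it assumes a POSIX system where os.set_blocking works on pipes.

```python
import errno
import os
import select


class MiniArbiter:
    """Hypothetical stand-in for the Arbiter's queue-and-wakeup mechanism."""

    MAX_QUEUED = 5  # same bound as in the excerpt above

    def __init__(self):
        self.sig_queue = []
        self.pipe = os.pipe()
        for fd in self.pipe:
            # Non-blocking, so neither the writer nor the reader can hang.
            os.set_blocking(fd, False)

    def signal(self, sig, frame=None):
        # Remember the signal (dropping it if the queue is full) and wake the loop.
        if len(self.sig_queue) < self.MAX_QUEUED:
            self.sig_queue.append(sig)
            self.wakeup()

    def wakeup(self):
        try:
            os.write(self.pipe[1], b".")
        except OSError as e:
            if e.errno not in (errno.EAGAIN, errno.EINTR):
                raise

    def sleep(self, timeout=1.0):
        """Wait for a wakeup byte; return True if woken, False on timeout."""
        ready, _, _ = select.select([self.pipe[0]], [], [], timeout)
        if not ready:
            return False
        try:
            # Drain every pending byte so the next sleep() really waits.
            while os.read(self.pipe[0], 1):
                pass
        except OSError as e:
            if e.errno not in (errno.EAGAIN, errno.EINTR):
                raise
        return True
```

Keeping the handler down to "append and write one byte" means the real work happens later in the main loop, outside the signal handler.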

TERM, INT, and QUIT all end by raising StopIteration. The difference is that handle_int and handle_quit first call Arbiter().stop(False) (False here is the graceful flag), while handle_term raises immediately. The HUP handler calls Arbiter().reload() instead.

def handle_hup(self):
    """\
    HUP handling.
    - Reload configuration
    - Start the new worker processes with a new configuration
    - Gracefully shutdown the old worker processes
    """
    self.log.info("Hang up: %s", self.master_name)
    self.reload()

def handle_term(self):
    "SIGTERM handling"
    raise StopIteration

def handle_int(self):
    "SIGINT handling"
    self.stop(False)
    raise StopIteration

def handle_quit(self):
    "SIGQUIT handling"
    self.stop(False)
    raise StopIteration

def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1
    self.manage_workers()

After the Master has initialized its signal handling, it enters the main loop:

try:
    # Start Workers: spawn more if there are too few, kill the surplus if there are too many.
    self.manage_workers()

    while True:
        self.maybe_promote_master()

        sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
        if sig is None:
            # No signal is queued: `Arbiter().sleep()` waits on `Arbiter.PIPE`
            # and returns once the pipe has data (or its select() timeout expires).
            self.sleep()
            # After waking up, clean up Workers that are no longer usable.
            self.murder_workers()
            # Run the spawn/kill Worker logic again.
            self.manage_workers()
            # Go to the next iteration, where sig is the signal that just woke the Master.
            continue

        if sig not in self.SIG_NAMES:  # ignore signals that do not need handling
            self.log.info("Ignoring unknown signal: %s", sig)
            continue

        signame = self.SIG_NAMES.get(sig) # get the signal name
        handler = getattr(self, "handle_%s" % signame, None) # look up the Master's handler for this signal
        if not handler:
            self.log.error("Unhandled signal: %s", signame)
            continue
        self.log.info("Handling signal: %s", signame)
        handler() # run the signal handler
        self.wakeup()
except (StopIteration, KeyboardInterrupt): # the TERM, INT, and QUIT handlers raise StopIteration, which leads to `Arbiter().halt()`: clean up and exit the process
    self.halt()
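
The name-based dispatch in this loop can be sketched on its own. This is a simplified illustration under assumptions, not the Arbiter itself: Dispatcher, its calls list and run() are hypothetical, and the SIG_NAMES table is rebuilt here from the standard signal module. It assumes a POSIX platform where SIGHUP and SIGUSR2 exist.

```python
import signal

# Map signal numbers to short lowercase names, e.g. SIGHUP -> "hup".
# Constants such as SIG_DFL and SIG_IGN are skipped.
SIG_NAMES = {
    getattr(signal, name): name[3:].lower()
    for name in dir(signal)
    if name.startswith("SIG") and not name.startswith("SIG_")
}


class Dispatcher:
    """Hypothetical object with handle_<name> methods, like the Master."""

    def __init__(self):
        self.calls = []

    def handle_hup(self):
        self.calls.append("reload")

    def handle_term(self):
        # Exceptions are used as the "leave the loop" signal.
        raise StopIteration

    def dispatch(self, sig):
        signame = SIG_NAMES.get(sig)
        handler = getattr(self, "handle_%s" % signame, None)
        if handler is None:
            self.calls.append("unhandled")
            return
        handler()


def run(dispatcher, queued):
    """Process queued signals until a handler asks to stop."""
    try:
        for sig in queued:
            dispatcher.dispatch(sig)
    except StopIteration:
        dispatcher.calls.append("halt")
```

With this layout, supporting a new signal only requires adding a handle_<name> method; the loop itself does not change.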

Worker

When the master creates a worker, it first executes worker.init_process() to initialize the process:

self.init_signals()
...
# Enter main run loop
self.booted = True
self.run()

Worker.init_signals() resets the handlers inherited from the Master to their defaults and then registers the Worker's own:

def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
        signal.signal(s, signal.SIG_DFL)

    # init new signaling 
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    signal.signal(signal.SIGWINCH, self.handle_winch)
    signal.signal(signal.SIGUSR1, self.handle_usr1)
    signal.signal(signal.SIGABRT, self.handle_abort)

    # Don't let SIGTERM and SIGUSR1 disturb active requests
    # by interrupting system calls
    signal.siginterrupt(signal.SIGTERM, False)
    signal.siginterrupt(signal.SIGUSR1, False)

    if hasattr(signal, 'set_wakeup_fd'):
        signal.set_wakeup_fd(self.PIPE[1])

In the Worker, each handler runs directly when its signal arrives; there is no SIG_QUEUE as in the Master.

def handle_usr1(self, sig, frame):
    self.log.reopen_files()

def handle_exit(self, sig, frame):
    self.alive = False

def handle_quit(self, sig, frame):
    self.alive = False
    # worker_int callback
    self.cfg.worker_int(self)
    time.sleep(0.1)
    sys.exit(0)

def handle_abort(self, sig, frame):
    self.alive = False
    self.cfg.worker_abort(self)
    sys.exit(1)

The worker's run loop checks self.alive at the top of every iteration and keeps accepting and processing requests only while it is True:

def run_for_one(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        self.notify()

        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue

        except EnvironmentError as e:
            if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                               errno.EWOULDBLOCK):
                raise

        if not self.is_parent_alive():
            return

        try:
            self.wait(timeout)
        except StopWaiting:
            return
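
The effect of the alive flag can be shown with a small model. This is a sketch under assumptions rather than Gunicorn's worker: MiniWorker, pending, completed and the "deliver-term" request are hypothetical, and the arrival of SIGTERM is simulated by calling the handler directly so the example stays deterministic.

```python
import signal


class MiniWorker:
    """Hypothetical worker; only the alive flag mirrors the excerpts above."""

    def __init__(self, requests):
        self.alive = True
        self.pending = list(requests)
        self.completed = []

    def init_signals(self):
        # In a real worker process this must run in the main thread.
        signal.signal(signal.SIGTERM, self.handle_exit)

    def handle_exit(self, sig, frame):
        # Only flip the flag; never abort the request in progress.
        self.alive = False

    def handle_request(self, request):
        if request == "deliver-term":
            # Simulate SIGTERM arriving in the middle of this request.
            self.handle_exit(signal.SIGTERM, None)
        self.completed.append(request)

    def run(self):
        # The flag is only checked between requests, so the request that
        # was running when the signal arrived still finishes.
        while self.alive and self.pending:
            self.handle_request(self.pending.pop(0))
```

For example, MiniWorker(["a", "deliver-term", "c"]).run() completes "a" and "deliver-term" and leaves "c" unprocessed.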

Complete process

Take the HUP signal as an example. According to the Gunicorn documentation, HUP gracefully restarts the worker processes:

HUP: Reload the configuration, start the new worker processes with a new configuration and gracefully shutdown older workers. If the application is not preloaded (using the preload_app option), Gunicorn will also load the new version of it.

When the master process receives the HUP signal:

  • The master loop wakes up from sleep() and runs murder_workers() and manage_workers().
  • On the next iteration, the loop takes HUP from SIG_QUEUE and looks up its handler.
  • It executes Arbiter().handle_hup().
  • handle_hup() calls Arbiter().reload(), which reloads the configuration, reloads the WSGI app, and starts new workers according to the worker configuration. It then runs manage_workers(). Because the number of workers is now double the expected number, the original workers are stopped through Arbiter().kill_worker(), which calls os.kill with the TERM signal.
  • At this point the master's signal handling is complete. Arbiter().wakeup() writes to the pipe, so the loop runs its worker checks once more and then blocks in sleep() again.
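
The reload steps above can be modeled without real processes. The sketch below is an assumption-laden simplification: ReloadModel, its workers dict and the sent list are hypothetical, workers are plain integers, and killed workers are removed from the table immediately rather than being reaped later.

```python
import signal


class ReloadModel:
    """Hypothetical model of "spawn new workers, then retire the oldest"."""

    def __init__(self, num_workers):
        self.num_workers = num_workers
        self.workers = {}  # worker id -> age (higher means newer)
        self.age = 0
        self.sent = []     # (worker id, signal) pairs, recorded instead of os.kill
        self.manage_workers()

    def spawn_worker(self):
        self.age += 1
        self.workers[self.age] = self.age

    def kill_worker(self, wid, sig):
        # Simplification: a real master would call os.kill and wait for the child.
        self.sent.append((wid, sig))
        del self.workers[wid]

    def manage_workers(self):
        while len(self.workers) < self.num_workers:
            self.spawn_worker()
        oldest_first = sorted(self.workers, key=self.workers.get)
        while len(self.workers) > self.num_workers:
            self.kill_worker(oldest_first.pop(0), signal.SIGTERM)

    def reload(self):
        # Start a full set of new workers first, which doubles the count ...
        for _ in range(self.num_workers):
            self.spawn_worker()
        # ... then let manage_workers() send TERM to the surplus, oldest first.
        self.manage_workers()
```

Starting the replacements before signalling the old workers means there is always a full set of workers available to accept connections during the reload.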

When the Worker process receives the TERM signal:

  • It executes Worker().handle_exit(), which sets self.alive to False.
  • The worker's loop checks self.alive at the start of each iteration. Once it is False, the loop exits after the current request has been processed, which is how graceful shutdown is achieved.