Graceful shutdown matters when handling HTTP handlers, closing database connections, and similar tasks. Suppose a service runs several workers that process multiple jobs: how do you wait until every job has finished before shutting the service down and removing it (for example, when it runs in Docker)? Here is the whole process:

[Figure: go shutdown]

Problems encountered

When the service is shut down, or stopped with Ctrl+C, it should wait until the workers have finished every job. First, let's see what's wrong with the first version. When the developer presses Ctrl+C, cancel() is called and the context's Done() channel is closed. Here is how the workers were originally written (for the complete code, see here):

func (c *Consumer) worker(ctx context.Context, num int, wg *sync.WaitGroup) {
    defer wg.Done()
    log.Println("start the worker", num)
    for {
        select {
        case job := <-c.jobsChan:
            // If ctx is already cancelled, the job just received is dropped unprocessed.
            if ctx.Err() != nil {
                log.Println("get next job", job, "and close the worker", num)
                return
            }
            c.process(num, job)
        case <-ctx.Done():
            // Any jobs still buffered in jobsChan are abandoned here.
            log.Println("close the worker", num)
            return
        }
    }
}

Suppose 10 jobs arrive at the same time and four workers are processing them concurrently. Pressing Ctrl+C closes the ctx.Done() channel. Because select waits on two channels and picks one at random when both are ready, you cannot predict which case will run first. Either way, once the context is cancelled the worker returns (even a job it has just received is discarded by the ctx.Err() check), so any jobs still waiting in jobsChan are never processed. How can this be solved?
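Before looking at the fix, here is a small self-contained demo of the problem. It is a sketch, not the article's code: `selectWorker` is a hypothetical copy of the first-version worker that counts processed jobs instead of logging. To make the outcome deterministic, the context is cancelled before the workers start; in the real scenario some jobs would already be done, but the queued ones are lost in the same way.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// selectWorker mirrors the first-version worker: it selects on the job
// channel and ctx.Done(), and returns as soon as ctx is cancelled.
func selectWorker(ctx context.Context, jobs <-chan int, processed *atomic.Int64, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-jobs:
			if ctx.Err() != nil {
				return // the job just received is dropped
			}
			processed.Add(1)
		case <-ctx.Done():
			return // jobs left in the channel are abandoned
		}
	}
}

// runSelectDemo queues 10 jobs, cancels the context (as Ctrl+C would),
// starts four workers, and returns how many jobs were processed.
func runSelectDemo() int64 {
	jobs := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		jobs <- i
	}

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate Ctrl+C while all 10 jobs are still queued

	var processed atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go selectWorker(ctx, jobs, &processed, &wg)
	}
	wg.Wait()
	return processed.Load()
}

func main() {
	fmt.Println("processed:", runSelectDemo(), "of 10") // processed: 0 of 10
}
```

Whichever select case each worker picks, every path returns without processing, so none of the 10 queued jobs is handled.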

Rewriting the worker

The fix is simple: rewrite the worker so that it no longer uses select at all (see the code here).

func (c *Consumer) worker(num int, wg *sync.WaitGroup) {
    defer wg.Done()
    log.Println("start the worker", num)

    // range exits only after jobsChan has been closed and fully drained.
    for job := range c.jobsChan {
        c.process(num, job)
    }
}

When a for ... range loop reads from jobsChan, the loop ends only after the channel has been closed and every buffered job has been received. Several workers can range over jobsChan at the same time, each receiving different jobs. Once the loop exits, the deferred wg.Done() runs to tell the main program that this worker has finished, so the main program can proceed with shutdown. Note that the consumer receives the cancel() signal first and then closes jobsChan, but the workers can still read the remaining jobs out of the channel with range. Here is how the consumer is written (see here for the full code):
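The key property is that closing a buffered channel does not discard the values already in it. A minimal sketch (hypothetical names, not the article's code) with 10 queued jobs, a closed channel, and four range-based workers:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// rangeWorker mirrors the rewritten worker: it keeps receiving until
// jobs is closed AND empty, then signals the WaitGroup.
func rangeWorker(jobs <-chan int, processed *atomic.Int64, wg *sync.WaitGroup) {
	defer wg.Done()
	for range jobs {
		processed.Add(1)
	}
}

// runRangeDemo queues 10 jobs, closes the channel, then lets four
// workers drain it. It returns how many jobs were processed.
func runRangeDemo() int64 {
	jobs := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		jobs <- i
	}
	close(jobs) // receives still succeed until the buffer is empty

	var processed atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go rangeWorker(jobs, &processed, &wg)
	}
	wg.Wait() // returns only after every worker's range loop has ended
	return processed.Load()
}

func main() {
	fmt.Println("processed:", runRangeDemo(), "of 10") // processed: 10 of 10
}
```

Compared with the select version, no job is lost: the workers stop only when there is nothing left to receive.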

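func (c *Consumer) startConsumer(ctx context.Context) {
    for {
        select {
        case job := <-c.inputChan:
            select {
            case c.jobsChan <- job:
            default:
                // Reached when jobsChan's buffer is full (sending on a closed channel would panic).
                log.Println("job channel is full, dropping job:", job)
            }
            if ctx.Err() != nil {
                // Closing lets the workers drain what is left and then exit.
                close(c.jobsChan)
                return
            }
        case <-ctx.Done():
            close(c.jobsChan)
            return
        }
    }
}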
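Putting the pieces together, the sketch below runs the consumer and the range-based workers end to end. It is an assumption-laden illustration, not the article's full code: the `Consumer` fields, `process`, and `run` are hypothetical stand-ins, and `cancel()` is called directly to simulate Ctrl+C (a real service would get its context from `signal.NotifyContext`). `jobsChan` is buffered to hold every job so the consumer's `default` branch never drops one, which makes the result deterministic.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"sync/atomic"
	"time"
)

// Consumer is a hypothetical, minimal version of the article's type.
type Consumer struct {
	inputChan chan int
	jobsChan  chan int
	processed atomic.Int64
}

// process simulates a unit of work.
func (c *Consumer) process(num, job int) {
	time.Sleep(10 * time.Millisecond)
	c.processed.Add(1)
}

// worker drains jobsChan until it is closed and empty.
func (c *Consumer) worker(num int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range c.jobsChan {
		c.process(num, job)
	}
}

// startConsumer forwards input to jobsChan and closes it once ctx is cancelled.
func (c *Consumer) startConsumer(ctx context.Context) {
	for {
		select {
		case job := <-c.inputChan:
			select {
			case c.jobsChan <- job:
			default:
				log.Println("job channel is full, dropping job:", job)
			}
			if ctx.Err() != nil {
				close(c.jobsChan)
				return
			}
		case <-ctx.Done():
			close(c.jobsChan)
			return
		}
	}
}

// run enqueues numJobs jobs, simulates Ctrl+C, and waits for the workers.
func run(numJobs, numWorkers int) int64 {
	c := &Consumer{
		inputChan: make(chan int),          // unbuffered: a send returns once the consumer has the job
		jobsChan:  make(chan int, numJobs), // large enough that no job is dropped in this demo
	}
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go c.worker(i, &wg)
	}
	go c.startConsumer(ctx)

	for i := 1; i <= numJobs; i++ {
		c.inputChan <- i
	}
	cancel()  // simulate Ctrl+C after all jobs have been handed to the consumer
	wg.Wait() // workers return only after jobsChan is closed and drained
	return c.processed.Load()
}

func main() {
	fmt.Println("processed:", run(10, 4)) // processed: 10
}
```

Because the consumer only closes jobsChan after cancellation, and the workers only exit after draining it, `wg.Wait()` returns once every forwarded job has been processed.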

Summary

Whether to stop the workers immediately or to wait until all jobs are finished depends on the needs of the project. The difference between the two approaches is that the former has each worker select on two channels (jobsChan and ctx.Done()), while the latter only needs a for ... range loop that drains jobsChan before the worker exits.