I recently came across a hotly debated article, “Concurrency is still not easy”, which even made it to Hacker News. If you are interested, go take a look; this post uses a real-life example to show why the author says writing concurrent code is not so easy.

As you know, in Go you can start concurrent work simply with the go keyword, and multiple goroutines communicate through channels. There are plenty of concurrency patterns floating around the web, but the official Go standard library doesn’t ship them, so it’s easy to implement one yourself and not notice the problem until much later. The article describes how gops hit a bug that hung the whole program while implementing limited concurrency. What is limited concurrency? It’s when the system has many jobs to run at the same time but needs to cap how many run concurrently, so that they don’t eat up all the resources. The rest of this post walks through the problem described in the article.

Limit Concurrency problem

gops is a CLI tool that lists all the Go processes running on the system. Simply install the gops command and run it to see results like the following.

$ gops
98   1    com.docker.vmnetd  go1.13.14 com.docker.vmnetd
2319 2275 gopls              go1.15    /Users/appleboy/go/bin/gopls
4083 452  gops               go1.15    /Users/appleboy/go/bin/gops

Then run the following program enough times to bring the number of Go processes on the system above 10, and you will find that gops no longer prints anything at all and never finishes.

package main

import (
    "fmt"
    "time"
)

func main() {

    timer1 := time.NewTicker(1 * time.Second)

    for v := range timer1.C {
        fmt.Println(v)
    }
}

The problem was introduced on 8/5, when a user submitted a PR to limit concurrency; the change caused the hang described above, leaving the CLI stuck until you end it with ctrl + c. Below is the modified code:

// FindAll returns all the Go processes currently running on this host.
func FindAll() []P {
    const concurrencyProcesses = 10 // limit the maximum number of concurrent reading process tasks
    pss, err := ps.Processes()
    if err != nil {
        return nil
    }

    var wg sync.WaitGroup
    wg.Add(len(pss))
    found := make(chan P)
    limitCh := make(chan struct{}, concurrencyProcesses)

    for _, pr := range pss {
        limitCh <- struct{}{}
        pr := pr
        go func() {
            defer func() { <-limitCh }()
            defer wg.Done()

            path, version, agent, ok, err := isGo(pr)
            if err != nil {
                // TODO(jbd): Return a list of errors.
            }
            if !ok {
                return
            }
            found <- P{
                PID:          pr.Pid(),
                PPID:         pr.PPid(),
                Exec:         pr.Executable(),
                Path:         path,
                BuildVersion: version,
                Agent:        agent,
            }
        }()
    }
    go func() {
        wg.Wait()
        close(found)
    }()
    var results []P
    for p := range found {
        results = append(results, p)
    }
    return results
}

I simplified the above example into a single main function that behaves the same way:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    const concurrencyProcesses = 10 // limit the maximum number of concurrent reading process tasks
    const jobCount = 100

    var wg sync.WaitGroup
    wg.Add(jobCount)
    found := make(chan int)
    limitCh := make(chan struct{}, concurrencyProcesses)

    for i := 0; i < jobCount; i++ {
        limitCh <- struct{}{}
        go func(val int) {
            defer func() {
                wg.Done()
                <-limitCh
            }()
            waitTime := rand.Int31n(1000)
            fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
            time.Sleep(time.Duration(waitTime) * time.Millisecond)
            found <- val
        }(i)
    }
    go func() {
        wg.Wait()
        close(found)
    }()
    var results []int
    for p := range found {
        fmt.Println("Finished job:", p)
        results = append(results, p)
    }

    fmt.Println("result:", results)
}

Here I replaced the ps.Processes() data with a plain job count so I can explain why this code hangs. The culprit is the limitCh <- struct{}{} inside the for loop, a buffered channel meant to allow only 10 goroutines to run at a time.

    for i := 0; i < jobCount; i++ {
        limitCh <- struct{}{}
        go func(val int) {
            ....
        }(i)
    }

This is a classic limited-concurrency bug. As each of the first 10 jobs is picked up, an empty struct is pushed into the limitCh channel and a goroutine is started; after the first job, limitCh still has room for 9 more, and so on. Each of those goroutines then blocks on found <- val, because found is unbuffered and the main goroutine, still inside the for loop, has not yet reached the for p := range found receiver. So when the 11th job comes around, limitCh is full and nobody will ever drain it: the main goroutine blocks forever at limitCh <- struct{}{}, the code after the for loop never executes, and the whole program deadlocks.

From this we can see that with 10 or fewer processes the problem never shows up and everything works normally (try replacing the job count in the example with 10). Below I introduce two ways to get around the problem; see which one suits you.

Throw limitCh <- struct{}{} into the background

Many developers’ first thought may be: since the program is stuck at limitCh <- struct{}{}, just throw that line into a goroutine as well.

    found := make(chan int)
    limitCh := make(chan struct{}, concurrencyProcesses)

    for i := 0; i < jobCount; i++ {
        go func() {
            limitCh <- struct{}{}
        }()
        go func(val int) {
            defer func() {
                <-limitCh
                wg.Done()
            }()
            waitTime := rand.Int31n(1000)
            fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
            time.Sleep(time.Duration(waitTime) * time.Millisecond)
            found <- val
        }(i)
    }

This approach does make the hang go away. But have you noticed that the code no longer limits concurrency at all? All 100 jobs now run at the same time until they finish. It cures the symptom, but the whole point of the exercise was to limit concurrency.

Using a worker queue

This pattern is quite common: since we need to limit how many jobs run at the same time, we instead create a fixed number of workers, and each worker reads jobs out of a channel. The first step is to create a queue channel and push all the jobs into it.

    found := make(chan int)
    queue := make(chan int)

    go func(queue chan<- int) {
        for i := 0; i < jobCount; i++ {
            queue <- i
        }
        close(queue)
    }(queue)

This producer also runs in its own goroutine to avoid blocking the main program. Next, a fixed number of workers are created to handle all the jobs.

    for i := 0; i < concurrencyProcesses; i++ {
        go func(queue <-chan int, found chan<- int) {
            for val := range queue {
                waitTime := rand.Int31n(1000)
                fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
                time.Sleep(time.Duration(waitTime) * time.Millisecond)
                found <- val
                // Call Done once per job; a defer inside the loop would
                // pile up and only run when the worker goroutine exits.
                wg.Done()
            }
        }(queue, found)
    }

As you can see, the for loop here is bounded by concurrencyProcesses: each worker goroutine keeps reading from the queue channel until the channel is closed and drained, at which point the worker exits. Besides these two approaches there are still other ways to implement this, which I’ll leave for you to explore.

Summary

I suspect part of the reason the article got so popular is that the repo is officially maintained by Google, and every PR has to pass a Googler’s strict review before it can be merged into the master branch. Yet even then, a subtle detail like this is genuinely hard to catch unless it is actually tested.