No discussion of the Go language is complete without concurrent programming.

Go supports concurrent programming at the language level. Other languages such as Python and Java instead use a threading library to create new threads and thread-safe queues to share data between them. Go's concurrency support rests on two fundamental concepts: Goroutines and Channels.

Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.

In other words, concurrency means structuring a program so that it can handle multiple tasks whose lifetimes overlap, for example by interleaving them on a single CPU core, which keeps the CPU busy. Parallelism means executing multiple tasks at literally the same instant, which requires multiple CPU cores.

Goroutine

Goroutines are one of the core concepts of concurrent programming in Go. They are lighter-weight concurrent units than threads: they live entirely in user space and are managed by the Go runtime rather than the operating system. A new Goroutine starts with a very small stack (2 KB in current Go releases) that grows and shrinks as needed, and the runtime multiplexes Goroutines onto a small number of system threads, so a dozen Goroutines may be backed by only five or six threads underneath. As a result, thousands of concurrent tasks can run at once. Goroutines also share the same address space, which makes them more efficient and easier to use than threads, and we don't need an obscure thread-safe queue library to synchronize data between them.
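To get a feel for how cheap Goroutines are, the minimal sketch below starts 10,000 of them, each incrementing a shared counter once. It assumes sync.WaitGroup (covered later in this article) to wait for all of them and sync/atomic for the counter; the function name runMany and the count are illustrative choices.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runMany starts n goroutines that each increment a shared counter once,
// waits for all of them to finish, and returns the final count.
func runMany(n int) int64 {
	var counter int64
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&counter, 1) // atomic add: no lock needed
		}()
	}
	wg.Wait() // block until every goroutine has called Done
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(runMany(10000)) // 10000
}
```

The program finishes almost instantly on a typical machine; doing the same with 10,000 OS threads would be far more expensive.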

Creating a Goroutine

To create a Goroutine, we just put the go keyword in front of a function call, and the Go scheduler automatically runs it on an appropriate system thread:

go f(x, y, z)

A new Goroutine will be started and f(x, y, z) will be executed.

Note: When starting a new Goroutine above, the evaluation of f , x , y and z occurs in the current Goroutine, while the execution of f occurs in the new Goroutine.
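The evaluation rule above can be observed directly. In this small sketch (evalAtGo is an illustrative name), the argument x is copied when the go statement runs, so changing x afterwards does not affect what the new Goroutine sees.

```go
package main

import "fmt"

// evalAtGo shows that arguments of a go statement are evaluated
// in the calling goroutine at the moment the go statement runs.
func evalAtGo() int {
	x := 1
	result := make(chan int)
	go func(v int) {
		result <- v // v is a copy of x taken when the go statement ran
	}(x)
	x = 2 // changing x afterwards does not affect the new goroutine
	return <-result
}

func main() {
	fmt.Println(evalAtGo()) // 1
}
```

A closure that read x directly instead of taking it as a parameter would have no such guarantee and would also race with the assignment.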

In concurrent programming, a large task is often split into several smaller tasks that can run in parallel, with one Goroutine per smaller task. When a program starts, its main function runs in its own Goroutine, called the main goroutine; from there, we use the go keyword to create other Goroutines.

package main

import (
    "fmt"
    "time"
)

func subTask() {
    i := 0
    for {
        i++
        fmt.Printf("new goroutine: i = %d\n", i)
        time.Sleep(1 * time.Second)
    }
}

func main() {
    go subTask() // create a goroutine to execute the subtask

    i := 0
    // main goroutine
    for {
        i++
        fmt.Printf("main goroutine: i = %d\n", i)
        time.Sleep(1 * time.Second)
    }
}

Note that when the main goroutine returns, the whole program exits and every other Goroutine is terminated immediately, whether or not it has finished. So when programming concurrently, make sure the other Goroutines have finished or have been shut down gracefully before the main goroutine returns.
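One common way to do this is for the main goroutine to wait for a signal from its workers before returning. In the minimal sketch below (sumWithWorker is an illustrative name, and it uses channels, which are introduced later in this article), closing the jobs channel tells the worker there is no more work, and receiving from done blocks until the worker has drained the jobs and finished.

```go
package main

import "fmt"

// sumWithWorker hands items to a worker goroutine and waits for it to
// finish before returning, so the worker is never cut off mid-task.
func sumWithWorker(items []int) int {
	jobs := make(chan int)
	done := make(chan int)

	go func() {
		total := 0
		for v := range jobs { // loop ends when jobs is closed
			total += v
		}
		done <- total // report back before exiting
	}()

	for _, v := range items {
		jobs <- v
	}
	close(jobs)   // no more work: lets the worker's range loop finish
	return <-done // wait for the worker to shut down cleanly
}

func main() {
	fmt.Println(sumWithWorker([]int{1, 2, 3, 4})) // 10
}
```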

runtime

Speaking of Goroutine, we have to mention the runtime package.

Package runtime contains operations that interact with Go’s runtime system, such as functions to control goroutines. It also includes the low-level type information used by the reflect package; see reflect’s documentation for the programmable interface to the run-time type system.

In other words, runtime is the interface to the Go runtime: it controls Goroutines and exposes low-level information such as the type data that the reflect package builds on. Here, we focus on the functions in the runtime package that control Goroutines.

1. Gosched

runtime.Gosched() makes the current Goroutine voluntarily yield the processor so that the Go scheduler can run other waiting Goroutines. The yielding Goroutine is not suspended: it is rescheduled and later resumes from the same point. This is very similar to Thread.yield() in Java.

For example:

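package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(1) // a single P: Goroutines take turns on one thread
    exit := make(chan int)
    go func() {
        defer close(exit) // tell main that "b" has been printed
        fmt.Println("b")
    }()

    for i := 0; i < 6; i++ {
        if i == 4 {
            runtime.Gosched() // yield so the other Goroutine can run
        }
        fmt.Println("a:", i)
    }
    <-exit
}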

With GOMAXPROCS(1), the output typically looks like the following; the exact interleaving is up to the scheduler and is not guaranteed.

a: 0
a: 1
a: 2
a: 3
b
a: 4
a: 5

2. Goexit

runtime.Goexit() immediately terminates the Goroutine that calls it; other Goroutines are not affected. Before the Goroutine terminates, all of its deferred calls still run.

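package main

import (
    "fmt"
    "runtime"
)

func main() {
    done := make(chan struct{})
    go func() {
        defer close(done) // deferred first, so it runs last
        defer fmt.Println("A.defer")

        func() {
            defer fmt.Println("B.defer")
            runtime.Goexit() // terminate the current Goroutine; deferred calls still run
            fmt.Println("B") // never executed
        }()

        fmt.Println("A") // never executed either
    }()

    <-done // wait for the Goroutine instead of busy-looping with for {}
}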

The output of the program is as follows.

B.defer
A.defer

3. GOMAXPROCS

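To let Goroutines use multiple cores, call runtime.GOMAXPROCS() to set the maximum number of CPUs that can execute Go code simultaneously. It returns the previous setting; if the argument is less than 1, the setting is left unchanged. By default, GOMAXPROCS equals the number of available CPUs. Try the example below with 1 and with 2 and compare how the letters and numbers interleave in the output.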

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    // runtime.GOMAXPROCS(1) // try 1 instead of 2 to run both Goroutines on one P
    runtime.GOMAXPROCS(2)

    var wg sync.WaitGroup
    wg.Add(2)

    fmt.Println("Starting Go Routines")
    go func() {
        defer wg.Done()

        for char := 'a'; char < 'a'+26; char++ {
            fmt.Printf("%c ", char)
        }
    }()

    go func() {
        defer wg.Done()

        for number := 1; number < 27; number++ {
            fmt.Printf("%d ", number)
        }
    }()

    fmt.Println("Waiting To Finish")
    wg.Wait()

    fmt.Println("\nTerminating Program")
}

Channel

With Goroutines, we can run multiple subtasks concurrently, which can greatly improve throughput. But what if the subtasks need to share data? Suppose subtask 2 cannot start until subtask 1 has produced some data: how do we guarantee that ordering and share the data safely?

Goroutines run in the same address space, so shared memory can be protected with the locks provided by the sync package. Idiomatic Go, however, less often uses locks for this and more often uses the more convenient Channel.

A channel is a conduit through which concurrent Goroutines communicate with each other, similar to a Unix pipe and backed by a first-in-first-out (FIFO) queue.
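The two-subtask scenario above can be sketched with a single channel that both carries the data and enforces the ordering. In this illustrative example (twoStage is a hypothetical name), subtask 1 squares the inputs in its own Goroutine, and subtask 2, running in the calling Goroutine, blocks on each receive until subtask 1 has sent a value.

```go
package main

import "fmt"

// twoStage runs two subtasks: stage 1 squares the inputs and stage 2,
// which must wait for stage 1's data, sums them.
func twoStage(input []int) int {
	squares := make(chan int)

	// Subtask 1: produce data.
	go func() {
		for _, v := range input {
			squares <- v * v
		}
		close(squares) // tell subtask 2 that no more data is coming
	}()

	// Subtask 2: each receive blocks until subtask 1 has sent a value.
	sum := 0
	for sq := range squares {
		sum += sq
	}
	return sum
}

func main() {
	fmt.Println(twoStage([]int{1, 2, 3})) // 14
}
```

No lock is needed: at any moment, each value is owned by exactly one Goroutine.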

Types of Channels

Channels are typed: a channel carries values of exactly one element type. The channel type is defined as follows:

ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType

The arrow (<-) points to the direction of the data flow. If no direction is specified, then the channel is bidirectional and can either receive or send data. ElementType specifies the type of data passed by the channel.

chan T          // can send and receive values of type T
chan<- T        // can only send values of type T
<-chan T        // can only receive values of type T
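Directional channel types are most useful in function signatures, where the compiler then enforces who may send and who may receive. A minimal sketch with illustrative function names produce and consume:

```go
package main

import "fmt"

// produce may only send on out; a receive from out would not compile.
func produce(out chan<- int, n int) {
	for i := 0; i < n; i++ {
		out <- i
	}
	close(out) // the sending side closes the channel
}

// consume may only receive from in; sending on it would not compile.
func consume(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional; converts implicitly to either direction
	go produce(ch, 5)
	fmt.Println(consume(ch)) // 0+1+2+3+4 = 10
}
```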

The channel must be created before it can be used, and like the map and slice data types, we use the make function to create the channel.

make(chan T[, capacity])

The optional second parameter, capacity, sets the size of the channel's buffer, that is, the maximum number of elements the channel can hold. If the capacity is omitted or set to 0, the channel is unbuffered: a send and a receive complete only when both the sender and the receiver are ready, and whichever side arrives first blocks until the other does. If the channel is buffered, the sender blocks only when the buffer is full, and the receiver blocks only when the buffer is empty.

Note: a nil channel never communicates; sends and receives on it block forever.
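The buffering rules above can be checked with the built-in len and cap functions. In this sketch (bufferedLenCap and nilChannelReady are illustrative names), two sends on a channel of capacity 3 complete with no receiver at all, an unbuffered send needs a receiver in another Goroutine, and a select with a default clause shows that a receive on a nil channel is never ready.

```go
package main

import "fmt"

// bufferedLenCap sends two values into a channel with capacity 3
// without any receiver and reports how many are queued and the capacity.
func bufferedLenCap() (int, int) {
	buf := make(chan int, 3)
	buf <- 1 // does not block: the buffer has free space
	buf <- 2
	return len(buf), cap(buf)
}

// nilChannelReady reports whether a receive on a nil channel can proceed.
func nilChannelReady() bool {
	var nilCh chan int // the zero value of a channel type is nil
	select {
	case <-nilCh:
		return true // never chosen: a nil channel never communicates
	default:
		return false
	}
}

func main() {
	fmt.Println(bufferedLenCap()) // 2 3

	// Unbuffered: the send blocks until main is ready to receive.
	unbuf := make(chan int)
	go func() { unbuf <- 42 }()
	fmt.Println(<-unbuf) // 42

	fmt.Println(nilChannelReady()) // false
}
```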

Channel operations

Channel’s operator is the arrow <- and also supports multi-valued assignment.

ch <- v       // send value v to channel ch
v := <-ch     // receive a value from channel ch and assign it to v
v, ok := <-ch // receive a value into v and report in ok whether it came from a send

In the third form, ok reports whether the value was delivered by a send. If ok is false, the channel has been closed and all buffered values have already been received, and v holds the zero value of the channel's element type. An open but empty channel does not return ok == false; the receive simply blocks.
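A minimal sketch of the two-value receive (drainClosed is an illustrative name): a value buffered before close is still delivered with ok == true, and only once the closed channel is drained does the receive return the zero value with ok == false.

```go
package main

import "fmt"

// drainClosed receives twice from a closed channel that still holds one value.
func drainClosed() (v1 int, ok1 bool, v2 int, ok2 bool) {
	ch := make(chan int, 1)
	ch <- 42
	close(ch) // no more sends; the buffered 42 is still receivable

	v1, ok1 = <-ch // 42 true: the value was delivered by a send
	v2, ok2 = <-ch // 0 false: closed and drained
	return
}

func main() {
	fmt.Println(drainClosed()) // 42 true 0 false
}
```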

1. Sending

The send operation is used to send data to the Channel, such as ch <- 3, which is defined as follows.

SendStmt = Channel "<-" Expression
Channel  = Expression

Both the channel and the value expression are evaluated before communication begins. In the example below, 3 + 4 is computed first, and the result is then sent to the channel.

package main

import "fmt"

func main() {
    c := make(chan int)
    go func() { c <- 3 + 4 }() // 3 + 4 is evaluated first, then 7 is sent
    i := <-c
    fmt.Println(i) // 7
}

A send blocks until it can proceed. As mentioned earlier, on an unbuffered channel the send proceeds only when a receiver is ready; on a buffered channel, it proceeds immediately if the buffer is not full.

Note: there are two other things to keep in mind about sends.

  • Sending on a closed channel causes a run-time panic.
  • Sending on a nil channel blocks forever.
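Both rules can be demonstrated without hanging the program. In this sketch (sendOnClosed and sendOnNil are illustrative names), recover catches the panic from sending on a closed channel, and a select with a default clause shows that a send on a nil channel is never ready.

```go
package main

import "fmt"

// sendOnClosed attempts a send on a closed channel and returns the
// recovered panic message.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r) // "send on closed channel"
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics even though the buffer has room
	return "no panic"
}

// sendOnNil reports whether a send on a nil channel can proceed.
func sendOnNil() bool {
	var ch chan int
	select {
	case ch <- 1:
		return true // never chosen: sends on a nil channel block forever
	default:
		return false
	}
}

func main() {
	fmt.Println(sendOnClosed()) // send on closed channel
	fmt.Println(sendOnNil())    // false
}
```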

2. Receive

<-ch receives a value from the channel. On an unbuffered channel, the receive proceeds only when a sender is ready; on a buffered channel, it proceeds immediately if the buffer is not empty.

The sender can close a channel with the built-in close() function to indicate that no more values will be sent. The receiver can test whether the channel has been closed by assigning a second variable in the receive expression: after v, ok := <-ch, ok is false if there are no more values to receive and the channel is closed.

Note:

  • Receiving from a nil channel blocks forever.
  • Only the sender should close a channel, never the receiver. (Closing a receive-only channel of type <-chan T does not even compile.)
  • Receiving from a closed channel never blocks: any values still in the buffer are received first, and after that every receive immediately returns the zero value of the element type.

3. Range

The Go language often uses the for ... range to read all values from the channel until it is closed.

package main

import "fmt"

func main() {
    c := make(chan int)
    go func() {
        for i := 0; i < 10; i = i + 1 {
            c <- i
        }
        close(c)
    }()
    for i := range c {
        fmt.Println(i)
    }
    fmt.Println("Finished")
}

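In the code above, range c yields each value sent to the channel and keeps iterating until channel c is closed. If you comment out close(c), the loop never terminates: after the last value, main blocks in the for ... range statement waiting for a value that will never arrive, and because no other Goroutine can make progress, the runtime aborts with "fatal error: all goroutines are asleep - deadlock!".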

4. Select

The select statement resembles a switch statement, but every case is a channel communication: it lets a Goroutine wait on several channel operations at once. Each case clause is either a send or a receive, and there may also be a default clause. A select statement behaves as follows:

  • if one or more cases can proceed, select picks one of them uniformly at random;
  • if no case can proceed and there is a default clause, the default clause runs;
  • if no case can proceed and there is no default clause, select blocks until one of the cases can proceed.

Note: at most one default clause is allowed. It may appear anywhere in the list of cases, but it is usually placed last. Without a default clause, a select whose cases all use nil channels blocks forever.

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}
func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

The select statement, like the switch statement, is not a loop; it will only select a case to process, and if you want to keep processing the channel, you can add an infinite for loop to the outside.

for {
    select {
    case c <- x:
        x, y = y, x+y
    case <-quit:
        fmt.Println("quit")
        return
    }
}
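A default clause also turns select into a non-blocking channel operation, because it runs whenever no other case is ready. A minimal sketch of a non-blocking receive (tryRecv is an illustrative name):

```go
package main

import "fmt"

// tryRecv receives from ch if a value is ready and otherwise returns
// immediately, thanks to the default clause.
func tryRecv(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false // no case ready: select does not block
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch)) // 0 false
	ch <- 5
	fmt.Println(tryRecv(ch)) // 5 true
}
```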

5. Timer & Ticker

A very important application of the select statement is timeout handling. As mentioned before, if there is no case to process, the select statement will keep blocking, so we need a case that will handle the timeout.

In the example below, a value is sent to channel c after 2 seconds, but select times out after 1 second, so the program prints timeout 1 instead of result 1.

package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan string, 1)
    go func() {
        time.Sleep(time.Second * 2)
        c <- "result 1"
    }()
    select {
    case res := <-c:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }
}

The timeout uses time.After, which returns a receive-only channel of type <-chan Time; once the specified duration has elapsed, the current time is sent on that channel.

time.After is a convenience wrapper around a timer: time.After(d) is equivalent to time.NewTimer(d).C. A timer represents a single event in the future. When you create one, you tell it how long to wait, and it provides a channel that receives the current time when that moment arrives. The second line in the example below blocks for about 2 seconds, until the timer fires, before execution continues.

timer1 := time.NewTimer(time.Second * 2)
<-timer1.C
fmt.Println("Timer 1 expired")

ticker is a timer that is triggered at regular intervals. It sends an event (current time) to the channel at an interval, and the receiver can read the event from the channel at a fixed interval. The following example is triggered every 500 milliseconds, and you can observe the output time.

ticker := time.NewTicker(time.Millisecond * 500)
go func() {
    for t := range ticker.C {
        fmt.Println("Tick at", t)
    }
}()

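Both timers and tickers can be stopped with the Stop() method. After Stop, no more values are sent on the channel. Note that Stop does not close the channel, so a Goroutine that ranges over ticker.C, like the one above, will not exit on its own.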

Synchronization Primitives and Locks

Go natively supports concurrent programming, and concurrent programming inevitably involves locks. Locks are synchronization primitives that let multiple Goroutines access the same piece of memory without problems such as race conditions.

Go provides some basic primitives for synchronization in the sync package, including the common sync.Mutex, sync.RWMutex, sync.WaitGroup, sync.Once, and sync.Cond.

Note: the primitives in sync are relatively low-level. Apart from sync.Once and sync.WaitGroup, they are mostly intended for low-level library code; in most cases, higher-level synchronization is better done with channels.

1. Mutex

In the standard library source of the version described here, sync.Mutex consists of two fields, state and sema: state holds the current state of the mutex, and sema is a semaphore used to block and wake waiting Goroutines.

type Mutex struct {
    state int32
    sema  uint32
}

Together the two fields occupy only 8 bytes. Different bits of the int32 state field encode different states: the lowest three bits are mutexLocked, mutexWoken and mutexStarving, and the remaining bits count how many Goroutines are waiting for the mutex to be released. In the zero value of a mutex, all of these bits are 0, meaning it is unlocked.

sync.Mutex operates in two modes: normal mode and starvation mode. In normal mode, waiters queue in first-in-first-out order, but a waiter that has just been woken must compete for the lock with newly arriving Goroutines that are already running on a CPU, and it is likely to lose. To limit this, once a waiter has failed to acquire the lock for more than 1 millisecond, the mutex switches to starvation mode. In starvation mode, the unlocking Goroutine hands ownership directly to the waiter at the front of the queue, and newly arriving Goroutines queue up behind it instead of competing, which keeps waiting Goroutines from starving.

From a programming point of view, the sync.Mutex mutex lock involves two methods: Lock and Unlock.

We can guarantee mutually exclusive execution of a piece of code by calling the Lock method before the code and the Unlock method after the code. You can also use the defer statement to ensure that the mutually exclusive lock will be unlocked.

package main

import (
    "fmt"
    "sync"
)

// SafeCounter is safe to use concurrently.
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(key string) {
    // Lock so only one goroutine at a time can access the map c.v.
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}

// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(key string) int {
    // Reads must be locked too; an unlocked read would race with Inc.
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

func main() {
    c := SafeCounter{v: make(map[string]int)}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Inc("somekey")
        }()
    }
    wg.Wait() // wait for every increment instead of sleeping and hoping
    fmt.Println(c.Value("somekey")) // 1000
}

2. RWMutex

The read-write mutex sync.RWMutex is a finer-grained lock: it allows any number of concurrent readers, but a write cannot run concurrently with reads or with another write. Many services read far more often than they write, and reads do not interfere with each other, so separating read locks from write locks can improve a service's throughput.

Operation | Read | Write
Read      | Y    | N
Write     | N    | N

The sync.RWMutex contains the following 5 fields in total.

type RWMutex struct {
    w           Mutex
    writerSem   uint32
    readerSem   uint32
    readerCount int32
    readerWait  int32
}

Here, w is an embedded Mutex that serializes writers; writerSem is the semaphore a writer sleeps on while waiting for active readers to finish, and readerSem is the semaphore readers sleep on while waiting for a writer to finish; readerCount is the number of pending readers; and readerWait is the number of active readers that a blocked writer is still waiting for.

  • To acquire the write lock, Lock first locks w, which excludes other writers, then makes readerCount negative so that new readers block, and finally waits for the readers already holding the lock to leave. On Unlock, the writer releases the blocked readers before unlocking w, so readers are not starved by a continuous stream of writers.
  • To acquire a read lock, RLock adds one to readerCount with sync/atomic.AddInt32. If the result is negative, a writer holds or is waiting for the lock, and the current Goroutine sleeps until the writer releases it; if the result is non-negative, no writer is involved and RLock returns immediately. Releasing a read lock is essentially the reverse process.
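A typical use of sync.RWMutex is a read-mostly cache. In this minimal sketch (Cache, NewCache, Get and Set are illustrative names), Get takes the shared read lock so that concurrent lookups do not block each other, while Set takes the exclusive write lock.

```go
package main

import (
	"fmt"
	"sync"
)

// Cache is a map guarded by an RWMutex: many readers may hold the read
// lock at once, while a writer needs exclusive access.
type Cache struct {
	mu sync.RWMutex
	m  map[string]string
}

func NewCache() *Cache { return &Cache{m: make(map[string]string)} }

// Get takes the shared read lock, so concurrent Gets do not block each other.
func (c *Cache) Get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.m[key]
	return v, ok
}

// Set takes the exclusive write lock, waiting for active readers to finish.
func (c *Cache) Set(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[key] = value
}

func main() {
	c := NewCache()
	c.Set("lang", "Go")

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if v, ok := c.Get("lang"); ok {
				fmt.Println(v) // readers may run concurrently
			}
		}()
	}
	wg.Wait()
}
```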

3. WaitGroup

sync.WaitGroup waits for a group of Goroutines to finish; a common use is issuing a batch of RPC or HTTP requests concurrently.

requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        res, err := service.call(r)
        _, _ = res, err // placeholder: use res and err here; Go rejects unused variables
    }(request)
}
wg.Wait()

With sync.WaitGroup, code that would otherwise be executed sequentially is executed concurrently in multiple Goroutines, speeding up program processing.
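The fragment above relies on a Request type and a service.call method that the text does not define. A self-contained sketch of the same pattern, assuming a hypothetical fetch function as a stand-in for the RPC or HTTP call, with each Goroutine writing its result into its own slot:

```go
package main

import (
	"fmt"
	"sync"
)

// fetch is a hypothetical stand-in for an RPC or HTTP request.
func fetch(id int) (string, error) {
	return fmt.Sprintf("response %d", id), nil
}

// fetchAll issues every request in its own goroutine and waits for all of them.
func fetchAll(ids []int) ([]string, []error) {
	results := make([]string, len(ids))
	errs := make([]error, len(ids))

	var wg sync.WaitGroup
	wg.Add(len(ids)) // one count per goroutine, added before starting them
	for i, id := range ids {
		go func(i, id int) {
			defer wg.Done()
			// Each goroutine writes only its own index, so no lock is needed.
			results[i], errs[i] = fetch(id)
		}(i, id)
	}
	wg.Wait()
	return results, errs
}

func main() {
	res, _ := fetchAll([]int{1, 2, 3})
	fmt.Println(res) // [response 1 response 2 response 3]
}
```

Calling wg.Add before starting the Goroutines matters: if each Goroutine called Add itself, Wait could observe a zero counter and return before they had started.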

The sync.WaitGroup structure definition contains only two member variables.

type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

Here, noCopy is a special private struct that lets the copylocks check of go vet report code that copies a sync.WaitGroup; it does not prevent copying at compile time or run time. state1 stores the state (the counter and the number of waiters) and the semaphore.

The sync.WaitGroup structure exposes three methods to the public: sync.WaitGroup.Add, sync.WaitGroup.Wait and sync.WaitGroup.Done.

sync.WaitGroup.Done simply passes -1 to sync.WaitGroup.Add. A sync.WaitGroup can be reused, but new Add calls must happen only after the previous sync.WaitGroup.Wait calls have returned.

Because sync.WaitGroup.Done is just a thin wrapper around sync.WaitGroup.Add, we can also pass any negative number to Add to bring the counter to zero at once and wake the waiting Goroutines, as long as the counter never becomes negative (that causes a panic). Several Goroutines can wait on the same sync.WaitGroup at the same time, and they are all woken together when the counter reaches zero.
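Both behaviors can be sketched together: several Goroutines block in Wait on one WaitGroup, and a single Add with a negative delta drops the counter to zero and releases all of them. wakeAll is an illustrative name, and the pretend count of three outstanding tasks is an assumption of the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// wakeAll parks several goroutines in Wait on the same WaitGroup, then drops
// the counter to zero with a single negative Add, which releases all of them.
func wakeAll(waiters int) int32 {
	var wg sync.WaitGroup
	wg.Add(3) // pretend three tasks are outstanding

	var woken int32
	var finished sync.WaitGroup
	finished.Add(waiters)
	for i := 0; i < waiters; i++ {
		go func() {
			defer finished.Done()
			wg.Wait() // all waiters block on the same counter
			atomic.AddInt32(&woken, 1)
		}()
	}

	wg.Add(-3) // counter goes 3 -> 0; going below zero would panic
	finished.Wait()
	return atomic.LoadInt32(&woken)
}

func main() {
	fmt.Println(wakeAll(4)) // 4
}
```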

4. Once

sync.Once guarantees that a piece of code will only be executed once during the runtime of a Go program, for example.

package main

import (
    "fmt"
    "sync"
)

func main() {
    o := &sync.Once{}
    for i := 0; i < 10; i++ {
        o.Do(func() {
            fmt.Println("only once")
        })
    }
}

The program runs with the following results.

$ go run main.go
only once

The sync.Once structure contains only a done flag, which records whether the function has already run, and a mutex m of type sync.Mutex.

type Once struct {
    done uint32
    m    Mutex
}

sync.Once.Do is the only exported method of the sync.Once structure. It takes a function with no parameters: if a function has already been run through this sync.Once, Do returns immediately; otherwise, it calls sync.Once.doSlow to run the function. In the standard library source it looks like this:

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

sync.Once combines a mutex with the sync/atomic package to guarantee that a function runs at most once during the lifetime of the program. Keep two things in mind when using it: the function passed to sync.Once.Do runs only once even if it panics, and if sync.Once.Do is called twice with different functions, only the function from the first call runs.
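Both caveats can be seen in a few lines. In this sketch (oncePanic is an illustrative name), the first Do call panics and the panic is recovered; the second Do call, with a different function, is silently skipped because the Once has already been used.

```go
package main

import (
	"fmt"
	"sync"
)

// oncePanic shows that a panicking function still "uses up" a sync.Once:
// the second Do call, even with a different function, does nothing.
func oncePanic() (firstPanicked, secondRan bool) {
	var once sync.Once

	func() {
		defer func() { firstPanicked = recover() != nil }()
		once.Do(func() { panic("init failed") })
	}()

	once.Do(func() { secondRan = true }) // skipped: Do already ran once
	return firstPanicked, secondRan
}

func main() {
	fmt.Println(oncePanic()) // true false
}
```

If a failed initialization needs to be retried, sync.Once on its own is therefore not enough; the retry logic has to live outside it.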

5. Cond

sync.Cond allows a group of Goroutines to be woken up when certain conditions are met, and the sync.Cond structure is initialized with a mutex lock. For example.

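package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var status int64

func main() {
    c := sync.NewCond(&sync.Mutex{})
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            listen(c)
        }()
    }
    time.Sleep(1 * time.Second) // give the listeners time to start waiting
    go broadcast(c)

    wg.Wait() // return once all 10 listeners have printed
}

func broadcast(c *sync.Cond) {
    c.L.Lock()
    atomic.StoreInt64(&status, 1)
    c.Broadcast() // wake every Goroutine blocked in c.Wait
    c.L.Unlock()
}

func listen(c *sync.Cond) {
    c.L.Lock()
    for atomic.LoadInt64(&status) != 1 { // re-check the condition after every wake-up
        c.Wait() // unlocks c.L while sleeping and relocks it before returning
    }
    fmt.Println("listen")
    c.L.Unlock()
}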

The program runs with the following results.

$ go run main.go
listen
...
listen

The code above starts 10 Goroutines that wait in sync.Cond.Wait for a specific condition to be met; another Goroutine then calls sync.Cond.Broadcast to wake all of them. After the broadcast, each listener prints "listen" and returns, so "listen" appears 10 times.

The exported sync.Cond.Wait method atomically unlocks c.L and puts the current Goroutine to sleep; when the Goroutine is woken, Wait relocks c.L before returning.

sync.Cond.Signal and sync.Cond.Broadcast both wake sleeping Goroutines, with one difference: sync.Cond.Signal wakes a single Goroutine (in the current implementation, the one at the head of the wait queue), while sync.Cond.Broadcast wakes every Goroutine in the queue.

Note that sync.Cond is not a commonly used synchronization mechanism, but when a condition may stay unmet for a long time, it improves CPU utilization by letting waiters give up the processor instead of busy-waiting in a for {} loop. Keep the following in mind:

  • c.L must be held when sync.Cond.Wait is called; otherwise Wait's internal unlock fails and the program crashes with "fatal error: sync: unlock of unlocked mutex".
  • sync.Cond.Signal wakes one Goroutine, in the current implementation the one that has been waiting longest.
  • sync.Cond.Broadcast wakes all waiting Goroutines; they then reacquire c.L one at a time.
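sync.Cond.Signal fits cases where each event can be handled by a single waiter. The minimal sketch below is a hypothetical blocking queue (Queue, NewQueue, Put and Get are illustrative names): Put appends an item and signals one waiter, and Get holds the lock and waits in a loop while the queue is empty, re-checking the condition after every wake-up.

```go
package main

import (
	"fmt"
	"sync"
)

// Queue is an unbounded FIFO whose Get blocks until an item is available.
type Queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func NewQueue() *Queue {
	q := &Queue{}
	q.cond = sync.NewCond(&q.mu) // the Cond shares the queue's mutex
	return q
}

// Put appends an item and wakes one waiting Get with Signal.
func (q *Queue) Put(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal() // allowed with or without holding q.mu
}

// Get waits while the queue is empty; Wait releases the lock while sleeping
// and reacquires it before returning, so the condition is re-checked in a loop.
func (q *Queue) Get() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := NewQueue()
	go func() {
		for i := 1; i <= 3; i++ {
			q.Put(i)
		}
	}()
	for i := 0; i < 3; i++ {
		fmt.Println(q.Get()) // 1, 2, 3 in FIFO order
	}
}
```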