Section1 channel use

1.1 make channel

Buffered and unbuffered channels are created as follows.

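// buffered channel with room for 3 elements
ch := make(chan Task, 3)
// unbuffered channel (a different name avoids redeclaring ch in the same scope)
unbuffered := make(chan int)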

buffered channel

If we create a channel with a buffer, the underlying data model is as follows.

buffered channel

When we write data to the channel, it is stored directly in the circular queue at the send index (sendx). When the queue is full, it is in the following state.

circular queue

When dequeueing an element, the following is shown.

dequeueing an element

From the above figure, we can see that recvx has increased by one, indicating that an element has been dequeued. The buffer is a circular array, which is how the channel achieves FIFO semantics.

This raises another question: when we create a new channel, where is the memory for the underlying hchan data structure allocated? As the source code analysis of makechan in Section3 shows, hchan is allocated on the heap.

The following figure shows this.

heap

When we use make to create a channel, it actually returns a pointer to the channel, so we can pass the channel object directly between functions instead of passing a pointer to the channel.
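The point about passing a channel by value can be checked with a minimal sketch. The helper names produce and sameChannel are made up for this illustration; the sketch only assumes that copying a channel value copies the reference and not the buffer.

```go
package main

import "fmt"

// produce receives the channel by value. Only the reference is copied,
// so the send below lands in the caller's channel.
func produce(ch chan int) {
	ch <- 42
}

// sameChannel reports whether a send made through a copy of the channel
// value is visible through the original variable.
func sameChannel() bool {
	ch := make(chan int, 1)
	alias := ch // copies the reference, not the buffer
	produce(alias)
	return len(ch) == 1 && <-ch == 42 && alias == ch
}

func main() {
	fmt.Println(sameChannel()) // true
}
```

No pointer to the channel is needed anywhere: the channel value itself already refers to the shared hchan.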

1.2 sends and receives

When different goroutines read from and write to the same channel, the process is more complicated, as the following figure shows.

sends and receives

In the above diagram, G1 writes data to the channel and G2 reads data from the channel.

The flow of G1 acting on the underlying hchan is as follows.

G1

  1. first, acquire the channel’s lock.
  2. then enqueue the element (by copying it into the buffer).
  3. finally, release the lock.

The flow of G2 reading from the underlying hchan is shown in the following diagram.

G2

  1. first, acquire the channel’s lock.
  2. then dequeue the element (by copying it out of the buffer).
  3. finally, release the lock.

The read and write logic above is very simple. Apart from the hchan data structure itself, G1 and G2 share no memory: each element is copied into the channel and copied out again. This is the idea behind the Go proverb: do not communicate by sharing memory; instead, share memory by communicating.
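The copy semantics can be observed from user code. The following is a small sketch in which the Task type and the function name sendThenMutate are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// Task is a hypothetical element type used only for this illustration.
type Task struct {
	ID int
}

// sendThenMutate sends a Task, changes the sender's variable afterwards,
// and returns the ID observed by the receiver.
func sendThenMutate() int {
	ch := make(chan Task, 1)
	t := Task{ID: 1}
	ch <- t   // the value is copied into the channel buffer
	t.ID = 99 // modifies only the sender's copy
	got := <-ch
	return got.ID
}

func main() {
	fmt.Println(sendThenMutate()) // 1
}
```

Note that this holds for value types. If the element type is a pointer, only the pointer is copied and both goroutines can still reach the same object.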

Writing a full channel scenario

As shown in the figure below, the queue is full after 3 tasks are written to the channel. What happens when G1 writes the fourth task?

Writing a full channel scenario

G1 will pause at this point until a receiver appears.

This is where Go’s scheduler needs to be introduced. Goroutines are user-space threads: they are created and managed by Go’s runtime, not by the operating system.

However, Go’s runtime executes goroutines on top of OS threads, as the following diagram shows.

Go’s runtime schedules

What happens when data is written to an already full channel? The following diagram shows the process.

written to an already full channel

The flow of the above diagram is roughly as follows.

  1. the current goroutine (G1) calls the gopark function, which puts G1 into the waiting state.
  2. the binding between M (the OS thread) and G1 is removed.
  3. the scheduler picks another runnable goroutine and binds it to M, and M then runs that G.

So the OS thread keeps running and is not blocked just because G1 is blocked. Before it parks, G1 is recorded in the channel’s sender queue (the queue element is a sudog that holds a reference to G1).

So how is a blocked G1 restored? G1 is restored when a receiver receives channel data.
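A short sketch of this scenario from the user’s side follows. The function name fillAndUnblock is made up for this example. Whether the sending goroutine has actually parked by the time the receive happens is a scheduling detail that user code cannot observe, but the values come out in the same order either way.

```go
package main

import "fmt"

// fillAndUnblock fills a 3-slot channel, starts a goroutine that sends a
// fourth value, frees one slot by receiving, and returns every value in
// the order it was received.
func fillAndUnblock() []int {
	ch := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		ch <- i // the buffer is full after this loop
	}

	done := make(chan struct{})
	go func() {
		ch <- 4 // cannot complete until a receiver frees a slot
		close(done)
	}()

	first := <-ch // frees one slot, letting the fourth send finish
	<-done

	out := []int{first}
	for len(ch) > 0 {
		out = append(out, <-ch)
	}
	return out
}

func main() {
	fmt.Println(fillAndUnblock()) // [1 2 3 4]
}
```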

In fact, the hchan data structure also stores the wait queues of the channel’s senders and receivers. The data structure is as follows.

channel

The wait queue is a doubly linked list of sudog structures. Each sudog holds g, a reference to the waiting goroutine, and elem, a pointer to the element being sent or received. When G1 executes ch <- task4, G1 creates a sudog and appends it to the sendq queue. The resulting hchan structure is as follows.

wait queue

At this time, if G2 reads from the channel, the state before and after the read is as follows.

channel

The whole process is as follows.

  1. G2 calls t := <-ch to get an element.
  2. the element task1 is taken from the channel’s buffer.
  3. a sudog is popped from the sender wait queue.
  4. task4 is copied into the slot that task1 occupied, and the sendx and recvx indexes of the buffer are updated.
  5. G1 is then set to the runnable state, indicating that G1 is ready to resume.

G2 calls goready(G1) to wake up G1. The flow is shown in the following figure.

G2 will call goready(G1) to wake up G1

  1. first, G2 calls goready(G1), which invokes the scheduler.
  2. G1 is set to the runnable state.
  3. G1 is placed on the local run queue of the current P and waits to be run.

Scenario of reading an empty channel

Suppose the channel’s buffer is empty and G2 initiates a read first, as in the following figure.

G2

A sudog representing G2 is created and placed in the recvq wait queue. G2 then calls the gopark function to enter the waiting state and gives up the OS thread.

At this time, if G1 performs a write, the most intuitive flow is:

  1. G1 stores the task in the buffer.
  2. goready(G2) wakes up G2.

But there is a smarter way: direct send. G1 writes the data directly to the elem of G2’s sudog, which avoids copying the element from G1 into the buffer and then from the buffer to G2, as follows.

direct send

The specific process is that G1 writes data directly to G2’s stack. This way G2 does not need to get the global lock of the channel and operate the buffer.
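From user code, the closest observable analogue is a hand-off on an unbuffered channel, where there is no buffer to pass through at all. The sketch below uses a made-up function name, handOff. It does not prove which internal path the runtime takes, since that depends on which goroutine reaches the channel first.

```go
package main

import "fmt"

// handOff starts a receiver on an unbuffered channel and then sends one
// value. It returns the value the receiver saw and the channel capacity.
func handOff() (int, int) {
	ch := make(chan int) // unbuffered: there is no buffer to pass through
	result := make(chan int, 1)

	go func() {
		v := <-ch // the receiver waits here until a sender arrives
		result <- v
	}()

	ch <- 7 // completes only once the receiver takes the value
	return <-result, cap(ch)
}

func main() {
	v, c := handOff()
	fmt.Println(v, c) // 7 0
}
```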

Section2 channel source code

2.1 channel data storage structure

The channel data model is defined in the source code runtime/chan.go, which can be understood as a buffered queue. This buffered queue is used to store elements and provides the semantics of a FIFO. The source code is as follows.

type hchan struct {
    // number of elements currently in the queue
    qcount   uint           // total data in the queue
    // capacity of the circular queue; 0 for an unbuffered channel
    dataqsiz uint           // size of the circular queue
    // the buffer, an array of dataqsiz elements
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    // size of one element in bytes
    elemsize uint16
    // closed flag; non-zero means the channel is closed
    closed   uint32
    // type of the elements in the queue
    elemtype *_type // element type
    // index at which the next element is written
    sendx    uint   // send index
    // index from which the next element is read
    recvx    uint   // receive index
    // queue of goroutines waiting to receive from the channel
    recvq    waitq  // list of recv waiters
    // queue of goroutines waiting to send to the channel
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    // the channel's mutex
    lock mutex
}

The data structure of a channel is relatively simple and consists of three main parts.

  1. a ring queue implemented as an array, which stores the channel’s buffered data; two indexes into the array mark the read and write positions.
  2. the channel’s send and recv wait queues, which hold the sudog elements of blocked goroutines and are implemented as doubly linked lists.
  3. the channel’s lock.

2.2 Ring Queue

chan internally implements a ring queue as its buffer. The length of the queue is specified when the chan is created.

The following diagram shows a schematic of a chan that can buffer 6 elements.

Ring Queue

  • dataqsiz: indicates that the queue length is 6, i.e., 6 elements can be buffered.
  • buf: points to the memory of the queue, with two elements remaining in the queue.
  • qcount: indicates that there are two elements left in the queue.
  • sendx: indicates the location where the subsequent writes are stored, taking the values [0, 6).
  • recvx: indicates the location from which data is read, takes the values [0, 6).
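How these fields interact can be sketched with a simplified model. The ring type below is hypothetical: it borrows the field names from hchan but stores plain ints, does no locking, and is not the runtime’s implementation.

```go
package main

import "fmt"

// ring is a simplified, hypothetical model of the hchan buffer fields.
// It is not the runtime's implementation and performs no locking.
type ring struct {
	buf      []int
	qcount   int // number of elements currently stored
	dataqsiz int // capacity of the ring
	sendx    int // index of the next write
	recvx    int // index of the next read
}

func newRing(size int) *ring {
	return &ring{buf: make([]int, size), dataqsiz: size}
}

// push stores v at sendx and reports false when the ring is full.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx = (r.sendx + 1) % r.dataqsiz // wrap around at the end
	r.qcount++
	return true
}

// pop removes the element at recvx and reports false when the ring is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx = (r.recvx + 1) % r.dataqsiz // wrap around at the end
	r.qcount--
	return v, true
}

func main() {
	r := newRing(6)
	for i := 1; i <= 6; i++ {
		r.push(i)
	}
	r.pop()
	r.pop()
	r.push(7) // the write wraps around to index 0
	fmt.Println(r.qcount, r.sendx, r.recvx) // 5 1 2
}
```

Both indexes stay within [0, dataqsiz) because they are advanced modulo the capacity, which is what makes the array behave as a ring.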

2.3 Waiting Queue

When a goroutine reads from a channel whose buffer is empty, or which has no buffer and no waiting sender, the goroutine is blocked. When a goroutine writes to a channel whose buffer is full, or which has no buffer and no waiting receiver, the goroutine is blocked.

The blocked goroutine is parked in the channel’s wait queue.

  • A goroutine blocked on a read is woken up by a goroutine that writes data to the channel.
  • A goroutine blocked on a write is woken up by a goroutine that reads data from the channel.

The following figure shows a channel without buffers with several goroutines blocking waiting to read data.

channel without buffers with several goroutines blocking waiting to read data

Note that in general at least one of recvq and sendq is empty. The only exception is when the same goroutine uses a select statement to write data to a channel while reading data from it.

2.4 Type Information

A channel can only pass one type of value, and the type information is stored in the hchan data structure.

  • elemtype represents the type and is used for assignment during data transfer.
  • elemsize represents the type size and is used to locate the element position in the buf.

2.5 Locks

The lock field of hchan ensures that only one goroutine at a time can read from or write to a channel. For simplicity, the rest of this chapter does not cover locking and unlocking when describing the read/write process.
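One practical consequence is that several goroutines can use the same channel without any additional locking in user code. A minimal sketch, with a made-up function name concurrentSum:

```go
package main

import (
	"fmt"
	"sync"
)

// concurrentSum has several goroutines send on one channel with no extra
// locking and returns the sum of everything received.
func concurrentSum(senders, perSender int) int {
	ch := make(chan int, 4)
	var wg sync.WaitGroup

	for s := 0; s < senders; s++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perSender; i++ {
				ch <- 1
			}
		}()
	}

	go func() {
		wg.Wait()
		close(ch) // close only after every sender has finished
	}()

	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(concurrentSum(8, 1000)) // 8000
}
```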

Section3 channel reading and writing

3.1 Creating a Channel

When we create a new channel, we usually use a make(chan T, n) statement. The compiler rewrites it into a call to the makechan function in chan.go. The source code of the function is as follows.

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case size == 0 || elem.size == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = unsafe.Pointer(c)
    case elem.kind&kindNoPointers != 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

The function takes two arguments: the type of the elements stored in the channel and the buffer capacity (0 means an unbuffered channel). The creation process is as follows.

  • Allocate memory according to the buffer size and the element type:
    • If size is zero (an unbuffered channel) or the element size is zero, allocate only the memory for the hchan data structure.
    • If the elements contain no pointers, allocate the hchan and the buffer in a single call.
    • Otherwise, new an hchan object and allocate the buffer memory separately.
  • Update the key attributes of the circular queue in chan: elemsize, elemtype, dataqsiz.

Creating a channel therefore amounts to initializing the hchan structure. The type information and buffer length are passed in by the make statement, and the buf size is the element size multiplied by the buffer length.

The pseudo code for creating a channel is shown below.

func makechan(t *chantype, size int) *hchan {
    var c *hchan
    c = new(hchan)
    c.buf = malloc(elemSize * size) // elemSize: size of the element type
    c.elemsize = elemSize
    c.elemtype = elemType           // elemType: the element type
    c.dataqsiz = size

    return c
}
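The effect of makechan is visible from ordinary Go code through cap and len, and the size check in the source corresponds to a run-time panic when the size is negative. The helper names capAndLen and makePanics below are made up for this sketch.

```go
package main

import "fmt"

// capAndLen creates a buffered channel, sends n values (n must not exceed
// size), and returns the channel's capacity and current length.
func capAndLen(size, n int) (int, int) {
	ch := make(chan int, size)
	for i := 0; i < n; i++ {
		ch <- i
	}
	return cap(ch), len(ch)
}

// makePanics reports whether make panics for the given buffer size.
func makePanics(size int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch := make(chan int, size)
	_ = cap(ch)
	return false
}

func main() {
	c, l := capAndLen(6, 2)
	fmt.Println(c, l)           // 6 2
	fmt.Println(makePanics(-1)) // true
	fmt.Println(makePanics(0))  // false
}
```

Here cap reports dataqsiz and len reports qcount. A negative constant size is rejected at compile time, so the sketch passes the size through a variable.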

3.2 Sending data to a channel

All code that executes c <- ep to send ep to a channel ends up calling the chansend function in chan.go.

The function is defined as follows.

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
}

The function has four parameters: c is the channel’s data structure, ep is a pointer to the data to be written, block indicates whether the write is blocking, and callerpc is the caller’s program counter.

The main flow of writing data to a channel is as follows:

  • CASE1: The channel is nil (uninitialized). If block is true, the send blocks forever; if block is false, the function returns immediately.
  • CASE2: block is false, the channel is not closed (data cannot be written to a closed channel), and either the channel is unbuffered with an empty receiver wait queue or the channel is buffered and its queue is full. The function returns immediately without taking the lock.
  • If neither case applies, call lock(&c.lock) to acquire the channel’s lock.
  • CASE3: The channel is closed. Sending to a closed channel causes a panic.
  • CASE4: The recv queue of the channel is not empty. Skip the channel’s buffer and send the message directly to the receiving goroutine:
    • Call the sendDirect function to copy the message to the receiving goroutine;
    • Release the channel’s lock;
    • Call the goready function to set the receiving goroutine to the ready state and wait for scheduling.
  • CASE5: The buffer is not full. Copy the message into the buffer, then release the lock.
  • CASE6: The buffer is full and the receiver wait queue recvq is empty. Add the current goroutine to the send queue:
    • Get a sudog for the current goroutine and put it into the send queue of the channel;
    • Park the current goroutine.

The simple process of writing data to a channel is as follows.

  1. if the receive wait queue recvq is not empty, the buffer is empty or there is no buffer. Take a G out of recvq, write the data directly to it, wake that G up, and end the sending process.
  2. if there is free space in the buffer, write the data to the buffer and end the sending process.
  3. if there is no free space in the buffer, record the data to be sent in the current G’s sudog, add it to sendq, and go to sleep until a reading goroutine wakes it up.
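Several of these outcomes can be observed from user code. In the sketch below, a select with a default branch serves as the user-level counterpart of a non-blocking send; the helper names trySend and sendPanics are made up for this illustration.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was sent.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendPanics reports whether sending on ch panics.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

func main() {
	var nilCh chan int
	fmt.Println(trySend(nilCh, 1)) // false: a nil channel is never ready

	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1)) // true: the buffer has a free slot
	fmt.Println(trySend(buffered, 2)) // false: the buffer is full

	unbuffered := make(chan int)
	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting

	closed := make(chan int, 1)
	close(closed)
	fmt.Println(sendPanics(closed)) // true: send on a closed channel
}
```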

The flowchart is as follows.

process of writing data to a channel

3.3 Receiving data from a channel

All code that receives from a channel, such as ep := <-c, ends up calling the chanrecv function in chan.go.

The function is defined as follows.

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......
}

As you can see from the source comment, this function receives data from the channel and writes the received data to the object pointed to by the ep pointer.

There is also a block argument, which indicates whether the channel blocks when it fails to return data. When block=false and there is no data in the channel, the function returns (false,false).

The flow of receiving data from a channel is as follows:

  • CASE1: The channel is nil:
    • If block is false, return immediately;
    • If block is true, call gopark() to park the current goroutine forever; the throw that follows gopark is not expected to be reached.
  • Fast path: if block is false, the channel is not closed, and either the channel is unbuffered with an empty sender wait queue or the channel is buffered with no elements in its queue, return immediately without taking the lock.
  • Otherwise, call lock(&c.lock) to acquire the channel’s lock.
  • CASE2: The channel has been closed and there is no more data in the channel buffer. Release the lock, clear the object pointed to by ep, and return (true, false).
  • CASE3: The sender queue is not empty. Call func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) to handle it:
    • If the channel is unbuffered, call the recvDirect function to copy the element directly from the sender to ep, so that only one copy is made;
    • If the channel is buffered, a non-empty sender queue means the buffer must be full:
      • First copy the head element of the channel buffer to the receiver (that is, ep);
      • Then copy the element held by the head of the sender queue into the slot that was just vacated, so that the buffer does not have to move any data.
    • Release the channel’s lock;
    • Call the goready function to mark the sender’s goroutine as ready to run.
  • CASE4: The sender queue is empty and the buffer is not empty. Take the element directly from the buffer and advance the recvx index.
  • CASE5: The sender queue is empty, the buffer is also empty, and block is false. Return (false, false).
  • CASE6: The sender queue is empty, the buffer is empty, and block is true. Add the current goroutine to the recv queue and park it.

The simple process of reading data from a channel is as follows.

  1. if the send wait queue sendq is not empty and there is no buffer, take a G out of sendq, read its data directly, wake that G up, and end the reading process.
  2. if the send wait queue sendq is not empty and there is a buffer, the buffer is full. Read the data at the head of the buffer, write the data from G to the tail of the buffer, wake G up, and end the reading process.
  3. if there is data in the buffer, take it from the buffer and end the reading process.
  4. otherwise, add the current goroutine to recvq, go to sleep, and wait to be woken up by a writing goroutine.
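The receive side can be explored the same way. The sketch below uses a made-up helper, tryRecv, which wraps a receive in a select with a default branch and additionally reports whether the receive would have blocked.

```go
package main

import "fmt"

// tryRecv attempts a non-blocking receive.
// ready is false when the receive would have blocked.
func tryRecv(ch chan int) (v int, ok, ready bool) {
	select {
	case v, ok = <-ch:
		return v, ok, true
	default:
		return 0, false, false
	}
}

func main() {
	ch := make(chan int, 2)
	fmt.Println(tryRecv(ch)) // 0 false false: empty buffer, would block

	ch <- 10
	ch <- 20
	fmt.Println(tryRecv(ch)) // 10 true true: FIFO order from the buffer

	close(ch)
	fmt.Println(tryRecv(ch)) // 20 true true: buffered data survives close
	fmt.Println(tryRecv(ch)) // 0 false true: closed and drained
}
```

The last line shows why the two-value form v, ok := <-ch matters: a closed, drained channel yields the zero value with ok set to false instead of blocking.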

The simple flowchart is as follows.

simple process of reading data from a channel

3.4 Closing a channel

Calling close on a channel ends up in the closechan function in chan.go.

The main flow of closing is shown below.

  1. acquire the channel’s lock.
  2. set the closed flag of the hchan data structure.
  3. collect the reading goroutines waiting on the channel into a list.
  4. collect the writing goroutines waiting on the channel and append them to the same list.
  5. release the lock.
  6. wake up all the collected read and write goroutines.

When the channel is closed, all Gs in recvq are woken up, and each of them receives the zero value of the element type. All Gs in sendq are woken up as well, but these Gs panic.

In addition, there are some common scenarios where panic occurs.

  1. closing a channel with a nil value
  2. closing a channel that has already been closed
  3. writing data to a closed channel
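These panic scenarios, and the wake-up of a waiting receiver, can be reproduced with a short sketch. The helper names panics and wakeReceiver are made up for this example; the receiver may or may not have parked before close runs, but it observes the same result in both cases.

```go
package main

import "fmt"

// panics runs f and reports whether it panicked.
func panics(f func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	f()
	return false
}

// wakeReceiver starts a receiver on an empty channel, closes the channel,
// and returns what the receiver observed.
func wakeReceiver() (int, bool) {
	type result struct {
		v  int
		ok bool
	}
	ch := make(chan int)
	out := make(chan result, 1)

	go func() {
		v, ok := <-ch // returns once the channel is closed
		out <- result{v, ok}
	}()

	close(ch)
	r := <-out
	return r.v, r.ok
}

func main() {
	var nilCh chan int
	fmt.Println(panics(func() { close(nilCh) })) // true

	ch := make(chan int)
	close(ch)
	fmt.Println(panics(func() { close(ch) })) // true
	fmt.Println(panics(func() { ch <- 1 }))   // true

	fmt.Println(wakeReceiver()) // 0 false
}
```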

Section4 Common Uses

4.1 Unidirectional Channels

A unidirectional channel is one that can only be used to send or receive data.

We know that a channel can be passed as an argument, so a unidirectional channel is simply a restriction on how a channel may be used, much as C uses const to mark function arguments as read-only.

  • func readChan(chanName <-chan int): the parameter type restricts the function to reading from the channel.
  • func writeChan(chanName chan<- int): the parameter type restricts the function to writing to the channel.

A simple example program is as follows.

package main

// readChan can only receive from chanName.
func readChan(chanName <-chan int) {
    <-chanName
}

// writeChan can only send to chanName.
func writeChan(chanName chan<- int) {
    chanName <- 1
}

func main() {
    var mychan = make(chan int, 10)

    writeChan(mychan)
    readChan(mychan)
}

mychan is a normal channel, while the readChan() parameter restricts the incoming channel to read only, and the writeChan() parameter restricts the incoming channel to write only.

4.2 select

You can use select to monitor multiple channels, for example to read data from whichever of them becomes readable first.

A simple example program is as follows.

package main
 
import (
    "fmt"
    "time"
)
 
func addNumberToChan(chanName chan int) {
    for {
        chanName <- 1
        time.Sleep(1 * time.Second)
    }
}
 
func main() {
    var chan1 = make(chan int, 10)
    var chan2 = make(chan int, 10)
 
    go addNumberToChan(chan1)
    go addNumberToChan(chan2)
 
    for {
        select {
        case e := <-chan1:
            fmt.Printf("Get element from chan1: %d\n", e)
        case e := <-chan2:
            fmt.Printf("Get element from chan2: %d\n", e)
        default:
            fmt.Printf("No element in chan1 and chan2.\n")
            time.Sleep(1 * time.Second)
        }
    }
}

Two channels are created in the program: chan1 and chan2. Two goroutines running addNumberToChan() write data to the channels periodically. The select statement monitors both channels and reads from whichever one is readable.

The program output is as follows.

D:\SourceCode\GoExpert\src>go run main.go
Get element from chan1: 1
Get element from chan2: 1
No element in chan1 and chan2.
Get element from chan2: 1
Get element from chan1: 1
No element in chan1 and chan2.
Get element from chan2: 1
Get element from chan1: 1
No element in chan1 and chan2.

As you can see from the output, the order in which the data is read from the channels is random. In fact, when several cases of a select statement are ready, one of them is chosen at random; how select is implemented is covered in a separate section.

The point of this example is that the receives in the select’s cases do not block even when there is no data in the channels, because the select has a default branch. In that situation the receive is attempted in non-blocking mode: when no data is available, the current goroutine is not added to the channel’s wait queue, and control passes straight to default.

4.3 range

range can be used to read data from a channel continuously, much like traversing an array. When there is no data in the channel, it blocks the current goroutine, using the same blocking mechanism as an ordinary channel read.

func chanRange(chanName chan int) {
    for e := range chanName {
        fmt.Printf("Get element from chan: %d\n", e)
    }
}

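Note: range stops only when the channel is closed. If the goroutine writing to this channel exits without closing it, the range blocks permanently; if every goroutine in the program ends up blocked this way, the runtime detects the deadlock and aborts with a fatal error.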
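A minimal sketch of a range loop that terminates cleanly follows, assuming the writer is responsible for closing the channel. The function names collect and produceAndCollect are made up for this example.

```go
package main

import "fmt"

// collect ranges over ch until it is closed and returns the values read.
func collect(ch <-chan int) []int {
	var out []int
	for e := range ch {
		out = append(out, e)
	}
	return out
}

// produceAndCollect starts a writer that sends n values and then closes
// the channel, and returns everything the range loop received.
func produceAndCollect(n int) []int {
	ch := make(chan int)
	go func() {
		defer close(ch) // closing the channel is what ends the range loop
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	return collect(ch)
}

func main() {
	fmt.Println(produceAndCollect(3)) // [0 1 2]
}
```

Removing the deferred close makes this program stop with a deadlock error after the third value, because main is then the only goroutine left and it is blocked in the range loop.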