chan introduction

package main
import "fmt"

func main() {
    c := make(chan int)

    go func() {
        c <- 1 // send to channel
    }()

    x := <-c // recv from channel

    fmt.Println(x)
}

We can view the assembly results like this.

go tool compile -N -l -S hello.go
-N disables compiler optimizations
-l disables inlining
-S prints the assembly listing

The output shows directly which runtime functions the chan operations are compiled into.

Source code analysis

Structs and creation

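type hchan struct {
    qcount   uint           // number of elements currently in the ring buffer
    dataqsiz uint           // capacity of the ring buffer
    buf      unsafe.Pointer // pointer to the ring buffer
    elemsize uint16         // size of one element in the chan
    closed   uint32         // non-zero once the chan has been closed
    elemtype *_type         // type of the elements in the chan
    sendx    uint           // index in the buffer for the next send
    recvx    uint           // index in the buffer for the next receive
    recvq    waitq          // queue of waiting receivers
    sendq    waitq          // queue of waiting senders
    // mutex protecting the fields above
    lock mutex
}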

qcount is the number of elements currently held in the buffer, that is, sent but not yet received. The built-in len returns the value of this field.

dataqsiz is the capacity of the ring buffer, which the built-in cap returns. buf points to the buffer itself, a fixed-length circular array.

elemtype and elemsize are the type and the size of the elements in the chan.

sendx: the index in the buffer at which the next sent element will be stored.

recvx: the index in the buffer from which the next element will be received.

recvq and sendq are the queues of goroutines waiting to receive data and waiting to send data, respectively.
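The relationship between qcount, dataqsiz and the built-ins len and cap can be observed from ordinary Go code. The following is a minimal sketch; the helper name bufferState is made up for illustration and is not part of the runtime.

```go
package main

import "fmt"

// bufferState is a hypothetical helper: it reports what len and cap
// return for a channel, which correspond to qcount and dataqsiz.
func bufferState(c chan int) (length, capacity int) {
	return len(c), cap(c)
}

func main() {
	c := make(chan int, 3) // capacity 3

	c <- 10
	c <- 20
	l, k := bufferState(c)
	fmt.Println(l, k) // 2 3: two elements buffered, capacity 3

	<-c
	l, k = bufferState(c)
	fmt.Println(l, k) // 1 3: one element left after a receive
}
```

For an unbuffered channel both values are always 0.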

Both sendq and recvq have the type waitq.

type waitq struct {
    first *sudog
    last  *sudog
}

A waitq is a doubly linked list of sudog entries, each of which holds a waiting goroutine.

[Figure: overall structure of a chan]

Now let's look at how a chan is created. The assembly output shows that make(chan int) is compiled into a call to the runtime function makechan.

const (
    maxAlign  = 8
    hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) 
)

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // validation code omitted
    ... 
    // compute the amount of memory needed for buf
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case mem == 0:
        // the chan capacity or the element size is 0: no buf needs to be allocated
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // used by the race detector
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // elements contain no pointers: allocate hchan and buf as one contiguous block
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        // buf starts immediately after hchan in memory
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // elements contain pointers: allocate buf separately
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    return c
}

First we can see the calculation of hchanSize.

maxAlign  = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

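maxAlign is 8, so maxAlign-1 is 111 in binary. ANDing it with -int(unsafe.Sizeof(hchan{})) keeps the low three bits of the negated size, which is exactly the number of padding bytes needed to reach the next multiple of 8. Adding that padding makes hchanSize a multiple of 8, which is what alignment requires.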
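The rounding trick can be tried outside the runtime. The sketch below applies the same expression to a few sample sizes; the function names padding and alignedSize and the sample values are chosen for illustration only and do not reflect the real size of hchan.

```go
package main

import "fmt"

const maxAlign = 8

// padding returns how many bytes must be added to n to reach the
// next multiple of maxAlign, using the same expression as hchanSize.
func padding(n uintptr) uintptr {
	return uintptr(-int(n) & (maxAlign - 1))
}

// alignedSize rounds n up to a multiple of maxAlign.
func alignedSize(n uintptr) uintptr {
	return n + padding(n)
}

func main() {
	for _, n := range []uintptr{88, 96, 97, 103} {
		fmt.Println(n, padding(n), alignedSize(n))
	}
	// Output:
	// 88 0 88
	// 96 0 96
	// 97 7 104
	// 103 1 104
}
```

A size that is already a multiple of 8 gets no padding, so the expression never wastes a full alignment unit.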

The switch has three cases. In the first case the required buffer size is 0, so only hchanSize bytes need to be allocated for hchan.

In the second case the required buffer size is not 0 and the element type contains no pointers, so hchan and buf are allocated as one contiguous block of memory. Note that the element type, pointer or not, is chosen when the channel is created.

// the chan stores pointers to int
c1 := make(chan *int)
// the chan stores int values
c2 := make(chan int)

In the third case the required buffer size is not 0 and the element type contains pointers. Instead of using add to place buf right after hchan, a separate block of memory is allocated for buf.

Sending data

Blocking and non-blocking channel operations

Before we look at the code for sending data, let’s look at what is blocking and non-blocking for a channel.

In general, a goroutine that sends data to a channel blocks until the send succeeds. In that case the runtime function is called with block=true, i.e. it is a blocking call.

A send is non-blocking only in the following form.

select {
case c <- v:
    ... foo
default:
    ... bar
}

The compiler rewrites it as follows.

if selectnbsend(c, v) {
    ... foo
} else {
    ... bar
}

selectnbsend calls chansend with block set to false.

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc())
}
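From user code, the non-blocking path is reached with a select that has a single send case and a default branch. The following sketch wraps that pattern in a helper; the name trySend is hypothetical.

```go
package main

import "fmt"

// trySend is a hypothetical helper that attempts a non-blocking send
// and reports whether the value was accepted.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // true: the buffer has room
	fmt.Println(trySend(c, 2)) // false: the buffer is full and nobody is receiving

	u := make(chan int)
	fmt.Println(trySend(u, 3)) // false: unbuffered and no receiver is waiting
}
```

The return value plays the same role as the selected result of selectnbsend: true takes the case branch, false takes the default branch.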

chansend method

The assembly output shows that sending data to a channel is implemented in the runtime by chansend. The function is rather long, so we will go through it in sections.

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        // non-blocking send: return immediately
        if !block {
            return false
        }
        // blocking send: park the goroutine
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    ...
}

chansend first checks whether the chan is nil. If it is, a non-blocking send returns false immediately, while a blocking send parks the goroutine, which never resumes.
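The nil case can be observed safely through the non-blocking form. The sketch below sends on a nil channel inside a select with a default branch; the helper name sendOrSkip is hypothetical.

```go
package main

import "fmt"

// sendOrSkip is a hypothetical helper: it performs a non-blocking send
// and reports which branch of the select was taken.
func sendOrSkip(c chan string, v string) string {
	select {
	case c <- v:
		return "sent"
	default:
		return "skipped"
	}
}

func main() {
	var c chan string // nil: declared but never created with make

	fmt.Println(sendOrSkip(c, "hello")) // skipped

	// A plain `c <- "hello"` here would park this goroutine forever.
	// If no other goroutine can run, the runtime reports a deadlock.
}
```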

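func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    // Non-blocking case: return false if the channel is not closed
    // and one of the following holds:
    // 1. the channel is unbuffered and no receiver is waiting
    // 2. the channel is buffered and the buffer is full
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }
    ...
}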

This check runs without holding the lock, so nothing is guaranteed: right after closed has been read as 0, the channel may still change from the unclosed state to the closed state. That leaves two possibilities.

  • the channel is not closed and it is full: returning false is correct.
  • the channel has just been closed and is full: a non-blocking send still returns false, which is also acceptable.

The checks above are called the fast path. Taking the lock is a comparatively heavy operation, so any check that allows an early return is best done before the lock is acquired.

Here’s a look at the locking part of the code.

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    // acquire the lock
    lock(&c.lock)
    // sending on a closed channel panics
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // dequeue a receiver from recvq
    if sg := c.recvq.dequeue(); sg != nil { 
        // a receiver is waiting: send the data to it directly, bypassing the buffer
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    ...
}

Once the lock is held, the closed state has to be checked again. Then a receiver is dequeued from recvq. If one is waiting, the value being sent is handed directly to that first receiver. Note that if a receiver is waiting in the queue, the buffer must be empty at that moment.

Since we are analyzing the code line by line, let’s go to send to see the implementation.

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    if sg.elem != nil {
        // copy the data being sent directly into the receiver's stack space
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // wake up the receiving goroutine
    goready(gp, skip+1)
}

In send, sg is the sudog that wraps the waiting goroutine and ep is the pointer to the data being sent. sendDirect calls memmove to copy the data, and goready then marks the receiving goroutine as runnable so that the scheduler can resume it.

Go back to the chansend method and continue on.

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    // the buffer is not full: copy the data being sent into the buffer
    if c.qcount < c.dataqsiz {
        // locate the slot in buf that will receive the data
        qp := chanbuf(c, c.sendx)
        ...
        // copy the data into the buffer
        typedmemmove(c.elemtype, qp, ep)
        // advance the send index, wrapping around to 0 at the end
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // increment the element count, release the lock and return
        c.qcount++
        unlock(&c.lock)
        return true
    }
    ...
}

This part checks whether the buffer is full. If it is not, it locates the slot in buf that will receive the data, calls typedmemmove to copy the data into it, advances sendx (wrapping around to 0 at the end of the buffer) and increments qcount.
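The index handling can be modelled with a small slice-backed ring. This is a simplified sketch, not the runtime code: the type ring and its method put are hypothetical, elements are plain ints, and there is no locking.

```go
package main

import "fmt"

// ring is a simplified stand-in for the buffer-related fields of hchan.
type ring struct {
	buf    []int // plays the role of buf; len(buf) is dataqsiz
	sendx  int   // index for the next send
	qcount int   // number of stored elements
}

// put mirrors the buffered branch of chansend: store at sendx,
// advance sendx with wrap-around, and increment qcount.
func (r *ring) put(v int) bool {
	if r.qcount == len(r.buf) {
		return false // full: the real code would block or return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0
	}
	r.qcount++
	return true
}

func main() {
	r := &ring{buf: make([]int, 3)}
	for v := 1; v <= 4; v++ {
		ok := r.put(v)
		fmt.Println(v, ok, r.sendx, r.qcount)
	}
	// Output:
	// 1 true 1 1
	// 2 true 2 2
	// 3 true 0 3
	// 4 false 0 3
}
```

After the third put the buffer is full and sendx has wrapped back to 0.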

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    // no room in the buffer: a non-blocking call returns immediately
    if !block {
        unlock(&c.lock)
        return false
    }
    // create a sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // enqueue the sudog on sendq
    c.sendq.enqueue(mysg)
    // park the goroutine
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    ...
}

A non-blocking call returns immediately. A blocking call creates a sudog, enqueues it on sendq, and then calls gopark, which puts the goroutine into the waiting state and releases the lock. From the user's point of view, this is the moment at which the statement that sends data to the channel blocks.

Note also that an unbuffered channel (buffer size 0) with no waiting receiver ends up here as well and blocks in gopark immediately. When using one, remember to receive the data, otherwise the sending side blocks forever, e.g.

func process(timeout time.Duration) bool {
    ch := make(chan bool)

    go func() {
        // simulate a time-consuming task
        time.Sleep(timeout + time.Second)
        ch <- true // blocks forever if nobody receives
        fmt.Println("exit goroutine")
    }()
    select {
    case result := <-ch:
        return result
    case <-time.After(timeout):
        return false
    }
}

If the select returns through the timeout branch without executing result := <-ch, the goroutine blocks forever on the send.
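One common way to avoid this leak is to give the channel a buffer of one element, so the send takes the buffered branch of chansend and never parks. The sketch below is a variant of process written under that assumption; the names processBuffered and runDemo, the extra done channel, and the durations are added purely for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// processBuffered is a variant of process that uses a channel with
// capacity 1. done is closed once the worker goroutine has finished.
func processBuffered(timeout, work time.Duration, done chan<- struct{}) bool {
	ch := make(chan bool, 1)

	go func() {
		time.Sleep(work) // simulate a time-consuming task
		ch <- true       // does not block: the buffer has room
		close(done)
	}()

	select {
	case result := <-ch:
		return result
	case <-time.After(timeout):
		return false
	}
}

// runDemo lets the call time out and then checks that the worker
// goroutine still runs to completion.
func runDemo() (result, workerExited bool) {
	done := make(chan struct{})
	result = processBuffered(20*time.Millisecond, 200*time.Millisecond, done)
	select {
	case <-done:
		workerExited = true
	case <-time.After(2 * time.Second):
	}
	return result, workerExited
}

func main() {
	result, exited := runDemo()
	fmt.Println(result, exited) // false true
}
```

The caller still gets false on timeout, but the worker goroutine finishes instead of staying parked on the send.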

That concludes the send path. The whole process is roughly as follows.

For example, suppose we execute: ch<-10

  1. check whether recvq is empty; if it is not, take a goroutine from the head of recvq and send the data to it directly.
  2. if recvq is empty and buf is not full, copy the data into buf.
  3. if buf is full, pack the data to be sent and the current goroutine into a sudog, enqueue it on sendq, and park the current goroutine in the waiting state.

Receiving data

The function that receives data from a chan is chanrecv. Let's look at its implementation.

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    if c == nil {
        // c is nil and the call is non-blocking: return (false,false)
        if !block {
            return
        }
        // a blocking call parks forever
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // Non-blocking case with the chan not closed:
    // if the chan is unbuffered with no waiting sender, or buffered
    // with no data, return (false,false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }
    // acquire the lock
    lock(&c.lock)
    // the chan is closed and holds no data: return (true,false)
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
    ...
}

Like chansend, chanrecv starts with a nil check. If the chan has not been initialized, a non-blocking call returns (false,false) immediately and a blocking call parks forever.

The next two if statements are explained together, because this is where chanrecv differs from chansend: it has to return different results depending on the state of the chan.

The check before locking handles the boundary conditions. For a non-blocking call, if the chan is unbuffered with no waiting sender (dataqsiz is 0 and the send queue is empty), or the chan's buffer is empty (dataqsiz>0 and qcount==0), and the chan is not closed, then chanrecv returns (false,false). If the chan is closed and there is no data in buf, it must return (true,false) instead.

To meet this requirement, the boundary conditions inside chanrecv are read with atomic operations.

A closed chan with an empty buf has to yield (true,false) rather than (false,false). Because this check runs before the lock is taken, the reads of qcount and closed must not be reordered, and the atomic loads are what establishes that order (a happens-before relationship).
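The two return values can be observed from user code with a non-blocking receive. In the sketch below, the helper tryRecv is hypothetical: selected reports whether the receive case was taken and received is the ok value of the receive expression, which is how the two results of chanrecv surface at the language level.

```go
package main

import "fmt"

// tryRecv is a hypothetical helper that performs a non-blocking receive.
// selected is true when the receive case ran; received is the "ok"
// value, false when the channel is closed and drained.
func tryRecv(c chan int) (v int, selected, received bool) {
	select {
	case v, received = <-c:
		return v, true, received
	default:
		return 0, false, false
	}
}

func main() {
	c := make(chan int, 1)

	fmt.Println(tryRecv(c)) // 0 false false: open and empty

	c <- 7
	fmt.Println(tryRecv(c)) // 7 true true: a value was buffered

	close(c)
	fmt.Println(tryRecv(c)) // 0 true false: closed and empty
}
```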

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    // check the queue of waiting senders
    if sg := c.sendq.dequeue(); sg != nil { 
        // a sender is waiting: take the data with the sender's help
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    } 
    ...
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // unbuffered chan
    if c.dataqsiz == 0 {
        ...
        if ep != nil {
            // copy the data directly from the sender
            recvDirect(c.elemtype, sg, ep)
        }
    // buffered chan
    } else { 
        // locate the slot in buf at recvx
        qp := chanbuf(c, c.recvx) 
        ...
        // copy the data from the buffer to the receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        } 
        // copy the sender's data into the buffer
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // wake up the sender
    goready(gp, skip+1)
}

If a sender is waiting in the queue, recv is called to complete the receive and wake the sender up. Note that because a sender is waiting, the buffer, if there is one, must be full.

Before waking the sender, recv checks whether the chan is buffered. If it is unbuffered, the data is copied directly from the sender. If it is buffered, recv first locates the slot at recvx, copies the data in that slot from the buffer to the receiver, and then copies the sender's data into the buffer.

recvx is then incremented, which in effect places the new data at the end of the queue, and the value of recvx is assigned to sendx. Finally goready wakes the sender. This is a bit roundabout, so the two figures below walk through it.

[Figure: chansend copying data into the ring buffer]

The first figure shows chansend copying data into the buffer. When the last slot is filled, sendx is set back to 0, so when the buf ring queue is full sendx equals recvx.

The second figure shows how chanrecv hands over the data in the buffer when a sender is waiting in the send queue.

[Figure: chanrecv receiving from a full buffer while a sender is waiting]

The data at recvx (slot 0 here) is copied from the buffer to the receiver, and the sender's data is then copied into that same slot. recvx is incremented and assigned to sendx. Because the buffer is full, incrementing recvx is what makes the newly added data the last element of the queue.
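The slot reuse can be traced with a small model. This is a simplified sketch under stated assumptions: the type ring and the method recvFromFull are hypothetical, elements are plain ints, the buffer is assumed to be full, and pending stands for the value held by the waiting sender.

```go
package main

import "fmt"

// ring is a simplified stand-in for the buffer-related fields of hchan.
type ring struct {
	buf   []int
	sendx int
	recvx int
}

// recvFromFull mirrors the buffered branch of recv. It assumes the
// buffer is full and that a waiting sender holds the value pending.
func (r *ring) recvFromFull(pending int) int {
	v := r.buf[r.recvx]      // oldest element goes to the receiver
	r.buf[r.recvx] = pending // sender's value takes the freed slot
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.sendx = r.recvx // the buffer is still full, so the indices stay equal
	return v
}

func main() {
	// A full buffer: sendx has wrapped, so sendx == recvx == 0.
	r := &ring{buf: []int{1, 2, 3}}

	v := r.recvFromFull(4)
	fmt.Println(v, r.buf, r.recvx, r.sendx) // 1 [4 2 3] 1 1

	v = r.recvFromFull(5)
	fmt.Println(v, r.buf, r.recvx, r.sendx) // 2 [4 5 3] 2 2
}
```

Reading from recvx onward, the queue order after the first call is 2, 3, 4, so the sender's value is indeed last in line.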

Let's move on to the next part of chanrecv.

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    // the buffer holds data
    if c.qcount > 0 { 
        qp := chanbuf(c, c.recvx)
        ...
        // copy the data from the buffer to ep
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        // advance the receive index
        c.recvx++
        // ring queue: wrap around to 0 at the end
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // decrement the number of elements in the buffer
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    ...
}

This branch is reached when the buffer holds data but no sender is waiting. The data is copied to the receiving goroutine, the receive index is advanced (wrapping around to 0 at the end of the queue), the count of buffered elements is decremented, and the lock is released.

The following is the case when there is no data in the buffer.

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    // non-blocking call: return immediately
    if !block {
        unlock(&c.lock)
        return false, false
    } 
    // create a sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    } 
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // enqueue the sudog on recvq
    c.recvq.enqueue(mysg)
    // park the goroutine until it is woken up
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    ...
}

A non-blocking call returns immediately. A blocking call wraps the current goroutine in a sudog, adds the sudog to the receive queue, and calls gopark to park the goroutine until it is woken up.

Closing the channel

Closing a channel calls the closechan function.

func closechan(c *hchan) {
    // 1. check that the chan has been initialized
    if c == nil {
        panic(plainError("close of nil channel"))
    }
    // acquire the lock
    lock(&c.lock)
    // a chan that is already closed cannot be closed again
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    ...
    // mark the chan as closed
    c.closed = 1
    // list that collects the goroutines taken from the wait queues
    var glist gList

    // 2. collect all waiting receivers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        // add to the list
        glist.push(gp)
    }

    // collect all waiting senders
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        // add to the list
        glist.push(gp)
    }
    unlock(&c.lock)

    // 3. wake up every goroutine in glist
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

  1. the function first checks that the chan has been initialized, then takes the lock and checks whether the chan is already closed; if both checks pass, it sets the closed field to 1.
  2. it walks through all waiting receivers and senders and adds their goroutines to glist.
  3. it marks every goroutine in glist as runnable so that the scheduler can resume it. Note that a sender woken up this way will panic.
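The effects of close that are visible to user code can be checked with a short program. The sketch below covers the simpler variant in which the send happens after the close, rather than a sender that was already parked; the helper name sendPanics is hypothetical.

```go
package main

import "fmt"

// sendPanics is a hypothetical helper that reports whether sending
// on c panics. It assumes c is closed or has room, so it never blocks.
func sendPanics(c chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	c <- 1
	return false
}

func main() {
	c := make(chan int, 2)
	c <- 42
	close(c)

	v, ok := <-c
	fmt.Println(v, ok) // 42 true: buffered data can still be received after close

	v, ok = <-c
	fmt.Println(v, ok) // 0 false: closed and drained, zero value is returned

	fmt.Println(sendPanics(c)) // true: send on closed channel panics
}
```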

Summary

chan is a very powerful tool in Go and can be used to build many things, but to use it efficiently we should also understand how it is implemented. This article walked through the implementation of Go's chan step by step, covering what to watch out for when using it and how the buf ring queue is maintained. I hope it helps.