01 channel definition

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// /usr/data/go1.17/go/src/runtime/chan.go

// hchan is the runtime representation of a channel; makechan allocates and
// initializes one for every make(chan T, n).
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16 // size of one element; makechan stores elem.size here, hence the 1<<16 limit
	closed   uint32 // non-zero once the channel is closed; chansend panics in that case
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters
	lock mutex // held by chansend while it inspects and updates the fields above
}

// waitq is a queue of waiting goroutines, each represented by a sudog.
// It only records the two ends of the queue; the links between elements
// presumably live in sudog itself (not shown in this excerpt).
type waitq struct {
	first *sudog // head of the queue
	last  *sudog // tail of the queue
}

The underlying data structure of a channel is hchan. Its buf field points to an array of dataqsiz elements that is used as a circular queue (ring buffer), indexed by sendx and recvx.

  • qcount indicates the number of elements currently stored in the channel's queue.
  • dataqsiz indicates the size of the ring queue.
  • elemsize indicates the occupancy size of the element type and is used to calculate the memory size occupied by the element of the current type.
  • sendx and recvx are used to identify the index in buf corresponding to the current channel send and receive, respectively.
  • sendq and recvq are the queues of goroutines blocked on sending (the buffer is full or there is no buffer) and on receiving (there is no data to receive), respectively. Each is a doubly linked list, and a waiting g is represented by a *sudog.
  • closed indicates whether channel has been closed.
  • lock is a mutex that protects the fields of hchan (not only buf) while an operation is performed on the channel.

02 Initialize channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// makechan allocates and initializes the hchan for a channel of type t whose
// buffer holds size elements. It throws on an invalid element type or bad
// alignment and panics with "makechan: size out of range" when the requested
// buffer is negative, overflows, or is too large to allocate.
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	// The element size must fit in hchan.elemsize, which is a uint16.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	// mem is the number of bytes needed for the buffer: elem.size * size,
	// computed with an explicit overflow check.
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	var c *hchan
	switch {
	case mem == 0:
		// No buffer memory is needed (size or element size is zero):
		// allocate only the hchan header. buf is set to c.raceaddr()
		// rather than left nil (presumably for the race detector; confirm
		// against the runtime source).
		c = (*hchan)(mallocgc(hchanSize, nil, true))		
		c.buf = c.raceaddr()
	// NOTE(review): ptrdata == 0 presumably means the element type contains
	// no pointers; in that case header and buffer share one allocation and
	// buf starts right after the header.
	case elem.ptrdata == 0:		
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Otherwise the header and the buffer are allocated separately,
		// and the buffer allocation is given the element type.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)
	...	
	return c
}

Initializing a channel means initializing its hchan: makechan calculates the size of the buffer needed and requests the corresponding memory. For example, a channel created with the following statement is initialized as shown in the comments below it.

1
2
3
4
5
6
ch := make(chan int,8)

// hchan.dataqsiz = 8
// hchan.elemtype = int
// hchan.elemsize = 8 (size of one int in bytes on a 64-bit platform)
// hchan.buf = [8]int{}

03 Sending data

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// chansend sends the value pointed to by ep on channel c. The paths shown in
// this excerpt return true once the value has been handed to a waiting
// receiver or copied into the buffer; sending on a closed channel panics with
// "send on closed channel". Parts of the runtime function are elided ("...").
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    // The closed check and everything below run with c.lock held.
    lock(&c.lock)
    if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
    
    // A receiver is already waiting in recvq: hand the value to it through
    // send, which is given a closure that unlocks c.lock.
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
	    return true
    }
    
    // The buffer has free space: copy the value into the slot at sendx.
    if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        ...
        typedmemmove(c.elemtype, qp, ep)
		// Advance the send index, wrapping around at the end of the ring.
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
    }
    // The buffer is full or there is no buffer: the sending goroutine queues
    // itself on sendq as a sudog and blocks until a receiver takes the value.
    ...    
    gp := getg()
	mysg := acquireSudog()
    ...   
    mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
    ...
}
  • A channel connects different goroutines, so it may be operated on by multiple Ps at the same time, which creates a data race. Therefore the channel must be locked when sending or receiving data to keep the data safe.
  • When there is a receiver waiting to be consumed, send directly by calling the send() method
  • When there is a buffer, write the data to the buffer.
  • When the buffer is full or there is no buffer, the sending goroutine blocks and waits for a receiver to receive the data.
1
2
// runtime/chan.go
typedmemmove(c.elemtype, qp, ep)

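Sending here means copying data: typedmemmove copies the value from the sending goroutine's memory (ep) into the channel buffer slot (qp). When a receiver is already waiting, send() instead copies the value directly to the memory address of the receiving goroutine.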

04 Receive data

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// chanrecv receives a value from channel c and, when ep is non-nil, copies the
// value into the memory ep points to. On the direct-receive path shown here it
// returns (true, true). Parts of the runtime function are elided ("...");
// c.lock is presumably acquired in the elided prologue, since the closure
// handed to recv releases it.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// A sender is already waiting in sendq: receive through recv, which is
	// given a closure that unlocks c.lock.
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// The buffer holds data: copy the element at recvx out of the buffer.
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		...
		// ep may be nil when the caller discards the received value.
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		...
	}
	// Nothing to receive: the goroutine queues itself on recvq as a sudog
	// and blocks until a sender arrives.
	...
	gp := getg()
	mysg := acquireSudog()
	...
	// Reset param before parking, as chansend does for the sending side.
	gp.param = nil
	c.recvq.enqueue(mysg)
	...
}
  • If a sending g is waiting in sendq, receive directly from that g.
  • If there is data in the buffer, copy it from the buffer.
  • Otherwise block and wait for a g to send.