Reflecting channels in gong

1. The “limitations” of channel syntax

The Go language implements a concurrency scheme based on the theory of CSP (Communicating Sequential Processes). The scheme consists of two important elements: the Goroutine, which is the basic building block and execution unit for concurrent design of Go applications, and the channel, which plays an important role in the concurrency model. channel can be used to implement both inter-Goroutine communication and inter-Goroutine synchronization.

Let’s start with a brief review of the general syntax about channels.

We can create an instance of a channel type with element type T and capacity n by making(chan T, n), e.g.

1
2
ch1 := make(chan int)    // Create an unbuffered channel instance ch1
ch2 := make(chan int, 5)  // Create a buffered channel instance ch2

Go provides the “<-” operator for sending and receiving operations on channel type variables. Here are some code examples for sending and receiving operations on the above channels ch1 and ch2.

1
2
3
4
ch1 <- 13    // Send the integer literal value 13 to the unbuffered channel type variable ch1
n := <- ch1  // Receive an integer value from the unbuffered channel type variable ch1 and store it in the integer variable n
ch2 <- 17    // Send the integer literal value 17 to the buffered channel type variable ch2
m := <- ch2  // Receive an integer value from the buffered channel type variable ch2 and store it in the integer variable m

Go not only provides syntax to operate on channels individually, but also provides select-case syntax to operate on multiple channels at the same time, such as the following code.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
select {
case x := <-ch1:     // Receive data from channel ch1
  ... ...

case y, ok := <-ch2: // Receive data from channel ch2, and determine if ch2 is closed based on the ok value
  ... ...

case ch3 <- z:       // Send the z-value to channel ch3:
  ... ...

default:             // The default branch is executed when none of the channel communications in the case above are triggered
}

We see that the number of cases in the select syntax must be fixed, and we can only prepare the channels to be “listened” by select in advance and tile them in the select statement. This is a limitation of the regular syntax of the select statement, i.e., select syntax does not support dynamic case collections. If the number of channels we want to listen to is indeterminate and changes dynamically at runtime, then select syntax will not meet our requirements.

So how do you break this limitation? You can use with the reflect package.

2. reflect.Select and reflect.SelectCase

Many of you, like me, may have assumed that reflect’s ability to manipulate channels was only available in newer versions of Go because you hadn’t used the reflect package to manipulate channels. Select, the function used to manipulate channels in the reflect package, and SelectCase, the element type for its slice parameter, were added to Go as early as Go 1.1.

Reflect.Select and reflect.SelectCase

So how does it work? Let’s look at some examples.

First, let’s look at the first and best understood case, i.e. the select of receive operations from a dynamic set of channels.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-recv/main.go
package main

import (
    "fmt"
    "math/rand"
    "reflect"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    var rchs []chan int
    for i := 0; i < 10; i++ {
        rchs = append(rchs, make(chan int))
    }

    var cases = createRecvCases(rchs)

    // Consumer goroutine
    go func() {
        defer wg.Done()
        for {
            chosen, recv, ok := reflect.Select(cases)
            if ok {
                fmt.Printf("recv from channel [%d], val=%v\n", chosen, recv)
                continue
            }
            // one of the channels is closed, exit the goroutine
            fmt.Printf("channel [%d] closed, select goroutine exit\n", chosen)
            return
        }
    }()

    // Producer goroutine
    go func() {
        defer wg.Done()
        var n int
        s := rand.NewSource(time.Now().Unix())
        r := rand.New(s)
        for i := 0; i < 10; i++ {
            n = r.Intn(10)
            rchs[n] <- n
        }
        close(rchs[n])
    }()

    wg.Wait()
}

func createRecvCases(rchs []chan int) []reflect.SelectCase {
    var cases []reflect.SelectCase

    for _, ch := range rchs {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        })
    }
    return cases
}

In this example, we create a slice of element type reflect.SelectCase with the function createRecvCases, and then use reflect.Select to listen to this slice collection and, just like the regular select syntax, randomly select one from the set of recv channels with data and return it.

Reflect.SelectCase has three fields.

1
2
3
4
5
6
// $GOROOT/src/reflect/value.go
type SelectCase struct {
    Dir  SelectDir // direction of case
    Chan Value     // channel to use (for send or receive)
    Send Value     // value to send (for send)
}

The value of the Dir field is an “enumeration” with the following values.

1
2
3
4
5
6
7
// $GOROOT/src/reflect/value.go
const (
    _             SelectDir = iota
    SelectSend              // case Chan <- Send
    SelectRecv              // case <-Chan:
    SelectDefault           // default
)

We can also see from the constant names that Dir is used to identify the type of case, SelectRecv means that this is a case that does receive from a channel, SelectSend means that this is a case that does send to a channel, and SelectDefault means that this is a default case.

Once the slices of the SelectCase are constructed, we can pass them to reflect.Select. the semantics of the Select function are the same as the semantics of the select keyword, and it will listen to all incoming SelectCases. in the example above, if all channels have no data, then reflect.Select will block until a channel has data or is closed.

The Select function has three return values.

1
2
// $GOROOT/src/reflect/value.go
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

For the above example, if a case listening has data, the return value of Select stores the index of the channel in the cases slice in chosen and the value received from the channel in recv. recvOK is equivalent to ok in comma, ok mode. When the value sent by the send channel operation is received normally, recvOK is true, if the channel is closed, recvOK is false.

The above example starts two goroutines. One goroutine acts as a consumer, listening to a set of channels by reflect.Select, and exits when a channel is closed. The other goroutine sends data to these channels randomly, and after 10 sends, closes one of the channels to notify the consumer to exit.

Let’s run the sample program and get the following results.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
$go run main.go
recv from channel [1], val=1
recv from channel [4], val=4
recv from channel [5], val=5
recv from channel [8], val=8
recv from channel [1], val=1
recv from channel [1], val=1
recv from channel [8], val=8
recv from channel [3], val=3
recv from channel [5], val=5
recv from channel [9], val=9
channel [9] closed, select goroutine exit

We often add default branches to select statements in our everyday coding to prevent select from blocking completely, so let’s revamp the example to add support for default branches.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-recv-with-default/main.go

package main

import (
    "fmt"
    "math/rand"
    "reflect"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    var rchs []chan int
    for i := 0; i < 10; i++ {
        rchs = append(rchs, make(chan int))
    }

    var cases = createRecvCases(rchs, true)

    // Consumer goroutine
    go func() {
        defer wg.Done()
        for {
            chosen, recv, ok := reflect.Select(cases)
            if cases[chosen].Dir == reflect.SelectDefault {
                fmt.Println("choose the default")
                continue
            }
            if ok {
                fmt.Printf("recv from channel [%d], val=%v\n", chosen, recv)
                continue
            }
            // one of the channels is closed, exit the goroutine
            fmt.Printf("channel [%d] closed, select goroutine exit\n", chosen)
            return
        }
    }()

    // Producer goroutine
    go func() {
        defer wg.Done()
        var n int
        s := rand.NewSource(time.Now().Unix())
        r := rand.New(s)
        for i := 0; i < 10; i++ {
            n = r.Intn(10)
            rchs[n] <- n
        }
        close(rchs[n])
    }()

    wg.Wait()
}

func createRecvCases(rchs []chan int, withDefault bool) []reflect.SelectCase {
    var cases []reflect.SelectCase

    for _, ch := range rchs {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        })
    }

    if withDefault {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectDefault,
            Chan: reflect.Value{},
            Send: reflect.Value{},
        })
    }

    return cases
}

In this example, our createRecvCases function adds a withDefault boolean parameter, and when withDefault is true, the returned cases slice will contain a default case. we see that when creating a defaultCase, the fields Chan and Send need to be passed in with an empty reflect.Value.

In the consumer goroutine, we determine if the default case is selected by whether the Dir field of the selected case is reflect.SelectDefault or not, and the rest of the processing logic remains the same.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$go run main.go
recv from channel [8], val=8
recv from channel [8], val=8
choose the default
choose the default
choose the default
choose the default
choose the default
recv from channel [1], val=1
choose the default
choose the default
choose the default
recv from channel [3], val=3
recv from channel [6], val=6
choose the default
choose the default
recv from channel [0], val=0
choose the default
choose the default
choose the default
recv from channel [5], val=5
recv from channel [2], val=2
choose the default
choose the default
choose the default
recv from channel [2], val=2
choose the default
choose the default
recv from channel [2], val=2
choose the default
choose the default
channel [2] closed, select goroutine exit

We see that the default case has a pretty good chance of being selected.

Finally, let’s see how to use the reflect package to send data to the channel, see the following sample code.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-send/main.go

package main

import (
    "fmt"
    "reflect"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    ch0, ch1, ch2 := make(chan int), make(chan int), make(chan int)
    var schs = []chan int{ch0, ch1, ch2}

    var cases = createCases(schs)

    // Producer goroutine
    go func() {
        defer wg.Done()
        for range cases {
            chosen, _, _ := reflect.Select(cases)
            fmt.Printf("send to channel [%d], val=%v\n", chosen, cases[chosen].Send)
            cases[chosen].Chan = reflect.Value{}
        }
        fmt.Println("select goroutine exit")
        return
    }()

    // Consumer goroutine
    go func() {
        defer wg.Done()
        for range schs {
            var v int
            select {
            case v = <-ch0:
                fmt.Printf("recv %d from ch0\n", v)
            case v = <-ch1:
                fmt.Printf("recv %d from ch1\n", v)
            case v = <-ch2:
                fmt.Printf("recv %d from ch2\n", v)
            }
        }
    }()

    wg.Wait()
}

func createCases(schs []chan int) []reflect.SelectCase {
    var cases []reflect.SelectCase

    for i, ch := range schs {
        n := i + 100
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectSend,
            Chan: reflect.ValueOf(ch),
            Send: reflect.ValueOf(n),
        })
    }

    return cases
}

In this example, we have created SelectCases for three channels: ch0, ch1 and ch2, and the Send field of each SelectCase is assigned a value to be sent to that channel, here “100+index” is used.

One thing that is “different” in the producer goroutine is that every time a write operation is triggered, I reset the Chan in that SelectCase to an empty Value to prevent the channel from being re-elected next time.

1
cases[chosen].Chan = reflect.Value{}

Run the example and the output will be as follows.

1
2
3
4
5
6
7
8
$go run main.go
recv 101 from ch1
send to channel [1], val=101
send to channel [0], val=100
recv 100 from ch0
recv 102 from ch2
send to channel [2], val=102
select goroutine exit

Through the above examples we see that reflect.Select has equivalent semantics with select, and also supports dynamic addition, deletion and modification of case, the function can not be not powerful, now there is still a point to focus on, that is, how is its performance? Let’s move on to the next.

3. reflect.Select’s performance

Let’s use benchmark test to compare the performance difference between regular select and reflect.Select.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-benchmark/benchmark_test.go
package main

import (
    "reflect"
    "testing"
)

func createCases(rchs []chan int) []reflect.SelectCase {
    var cases []reflect.SelectCase

    for _, ch := range rchs {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        })
    }
    return cases
}

func BenchmarkSelect(b *testing.B) {
    var c1 = make(chan int)
    var c2 = make(chan int)
    var c3 = make(chan int)

    go func() {
        for {
            c1 <- 1
        }
    }()
    go func() {
        for {
            c2 <- 2
        }
    }()
    go func() {
        for {
            c3 <- 3
        }
    }()

    b.ReportAllocs()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        select {
        case <-c1:
        case <-c2:
        case <-c3:
        }
    }
}

func BenchmarkReflectSelect(b *testing.B) {
    var c1 = make(chan int)
    var c2 = make(chan int)
    var c3 = make(chan int)

    go func() {
        for {
            c1 <- 1
        }
    }()
    go func() {
        for {
            c2 <- 2
        }
    }()
    go func() {
        for {
            c3 <- 3
        }
    }()

    chs := createCases([]chan int{c1, c2, c3})

    b.ReportAllocs()
    b.ResetTimer()

    for i := 0; i < b.N; i++ {
        _, _, _ = reflect.Select(chs)
    }
}

Run the benchmark.

1
2
3
4
5
6
7
8
9
$go test -bench .
goos: darwin
goarch: amd64
pkg: github.com/bigwhite/experiments/reflect-operate-channel/select-benchmark
... ...
BenchmarkSelect-8            2765396           427.8 ns/op         0 B/op          0 allocs/op
BenchmarkReflectSelect-8     1839706           806.0 ns/op       112 B/op          6 allocs/op
PASS
ok      github.com/bigwhite/experiments/reflect-operate-channel/select-benchmark    3.779s

We see that the execution efficiency of reflect.Select is still worse than select, and additional memory allocation has to be done during its execution.

4. Summary

This article introduced the structure of reflec.Select and SelectCase and how to use them to manipulate channels in different scenarios. but in most cases, we don’t need to use reflect.Select, the regular select syntax is sufficient for our requirements. And reflect.Select has a constraint on the number of cases, the maximum number of cases supported is 65536, although this constraint is sufficient for most occasions.

5. Ref

  • https://tonybai.com/2022/11/15/using-reflect-to-manipulate-channels/