Go has two kinds of channels, unbuffered and buffered, and we are familiar with both.

Why am I suddenly talking about a channel with an unlimited buffer? I was recently reviewing a colleague’s code and ran into a design problem that such a channel would have solved. The design was roughly this:

  • A dispatcher contains a channel, which holds the URLs to be processed
  • A bunch of workers read tasks from the channel, download and parse the page, extract the links, and put the links into the dispatcher.channel

My colleague used the more complex sync.Mutex and sync.Cond to handle the concurrency and defined a number of methods around them. Here I want to describe my own mistaken idea.

When I reviewed the code, I thought it could be simpler: each worker runs in its own goroutine, processes a URL, and puts the extracted links back into the channel, with no need for the complicated Mutex+Cond. But I had overlooked something. If the channel is full, none of the workers can put their parsed results into it; they are all “blocked” on the send, and no worker is left to consume URLs from the channel.
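The blocking described above can be reproduced in a few lines. The following is a minimal sketch, not the colleague's code: the helper enqueueAll, the queue size of 2, and the example.com URLs are all made up for illustration. It uses a non-blocking send to count how many links would have blocked a worker.

```go
package main

import "fmt"

// enqueueAll (hypothetical helper) tries to put every link into queue
// without blocking and returns how many links did not fit.
func enqueueAll(queue chan string, links []string) (stuck int) {
	for _, l := range links {
		select {
		case queue <- l:
		default:
			// A plain `queue <- l` would block here. If every worker
			// is blocked like this, nobody is left to receive.
			stuck++
		}
	}
	return stuck
}

func main() {
	queue := make(chan string, 2) // deliberately tiny buffer
	queue <- "https://example.com/"

	url := <-queue // the only worker takes the seed URL
	links := []string{url + "a", url + "b", url + "c"}

	fmt.Println(enqueueAll(queue, links)) // 1
}
```

With one page yielding three links and room for only two, the third send has nowhere to go. A real crawler has more workers and a bigger buffer, but the arithmetic is the same once pages produce links faster than workers consume them.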

Of course, you could create a channel with a very large buffer to avoid filling it up. But first, a very large buffer takes up a lot of memory, and second, how large is large enough? The key point is that you can’t guarantee the channel will never be full.

It would be nice to have a channel with an unlimited buffer.

In 2017, someone filed a request with the Go team (#20352) for a channel with unlimited capacity.

After a lengthy discussion, the Go team decided not to provide an implementation for this “rare” scenario and suggested handling it in a third-party library. Griesemer offered an idea for implementing such a channel: use a ring buffer as the cache.

Two articles, Why Go channels limit the buffer size and Building an Unbounded Channel in Go, each present an implementation, and the two are similar.

Based on the first implementation, I have wrapped a library, chanx, that provides a general-purpose channel with an unlimited buffer.

You can star this library and put it in your codebase; it may help you one day. I have also prepared a generic design, which I will switch to as soon as Go generics are available.

A channel with an unlimited buffer has the following characteristics:

  1. Writes do not block. It can always accept written data, placing it either in the channel to be read or in the cache.
  2. Reads block when there is no data. The goroutine reading from the channel is blocked until data becomes readable.
  3. Reads and writes are done through channel operations. The internal cache is not exposed.
  4. The number of items waiting to be read can be queried. Since there may be pending data in the cache as well, this is len(buffer)+len(chan).
  5. After the channel is closed, unread data can still be read, and the channel is finished only once everything has been read. This is the same logic as a normal channel and is called “draining” the unread data.

Because we cannot modify the channel’s internal structure, nor overload the send (ch <- v) and receive (<-ch) operators, the only option is to wrap two channels in a data structure, one for writing and one for reading.

This data structure is:

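// T is assumed here to be a placeholder element type,
// to be replaced by a type parameter once generics are available.
type T interface{}

// UnboundedChan is a channel with an unlimited buffer.
type UnboundedChan struct {
    In     chan<- T // channel for write
    Out    <-chan T // channel for read
    buffer []T      // buffer
}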

The In channel is used for writing and the Out channel for reading. You can close the In channel; the Out channel is closed automatically once all remaining data has been read. You cannot close the Out channel yourself, because it is of type <-chan and the compiler rejects closing a receive-only channel.
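The direction restriction can be seen with plain channels. This is a small sketch, and the helper name split is made up for illustration. It exposes a single channel through a send-only and a receive-only view, whereas UnboundedChan uses two separate channels connected by a goroutine.

```go
package main

import "fmt"

// split (hypothetical helper) exposes one bidirectional channel as a
// send-only view and a receive-only view. The conversion is implicit.
func split(c chan int) (chan<- int, <-chan int) {
	return c, c
}

func main() {
	in, out := split(make(chan int, 1))

	in <- 42
	fmt.Println(<-out) // 42

	close(in) // allowed: a send-only channel can be closed
	_, ok := <-out
	fmt.Println(ok) // false: the channel is closed and drained

	// close(out) would not compile, because out is receive-only.
}
```

Typing the Out field as <-chan T therefore makes "only the library closes Out" a compile-time guarantee rather than a convention.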

The Len method returns the total number of items waiting to be read. BufLen returns only the number of items in the cache, excluding those already in the Out channel.

// Len returns len of Out plus len of buffer.
// The buffer is read without synchronization, so treat the result as an approximate snapshot.
func (c UnboundedChan) Len() int {
    return len(c.buffer) + len(c.Out)
}

// BufLen returns len of the buffer.
func (c UnboundedChan) BufLen() int {
    return len(c.buffer)
}

Now for the key part: the main implementation logic is shown below. I have added comments to the code; together with the code, they should make the whole implementation easy to follow:

func NewUnboundedChan(initCapacity int) *UnboundedChan {
	// Create the UnboundedChan with its three fields. It is returned as a pointer
	// so that Len and BufLen see the same buffer the goroutine below updates.
	in := make(chan T, initCapacity)
	out := make(chan T, initCapacity)
	ch := &UnboundedChan{In: in, Out: out, buffer: make([]T, 0, initCapacity)}
	// A goroutine continuously reads data from in and puts it into out or buffer
	go func() {
		defer close(out) // once in is closed and all data has been read, out is closed too
	loop:
		for {
			val, ok := <-in
			if !ok { // in has been closed, exit the loop
				break loop
			}
			// Otherwise try to put the data read from in into out
			select {
			case out <- val:
				// Success means out was not full and the buffer holds no pending
				// data, so go back to the beginning of the loop
				continue
			default:
			}
			// out is full, so put the data into the cache
			ch.buffer = append(ch.buffer, val)
			// Keep trying to move data from the cache into out until the cache is empty.
			// To avoid blocking writers, also keep reading from in; out is full at this
			// point, so that data goes straight into the cache
			for len(ch.buffer) > 0 {
				select {
				case val, ok := <-in: // read from in and append to the cache; if in is closed, exit the loop
					if !ok {
						break loop
					}
					ch.buffer = append(ch.buffer, val)
				case out <- ch.buffer[0]: // send the oldest cached item to out, then drop it from the cache
					ch.buffer = ch.buffer[1:]
					if len(ch.buffer) == 0 { // avoid a memory leak: once the cache is empty, restore the initial state
						ch.buffer = make([]T, 0, initCapacity)
					}
				}
			}
		}
		// After in is closed and the loop exits, the buffer may still hold unprocessed
		// data that must be pushed into out. This step is called "drain"; once it
		// finishes, the deferred close(out) runs
		for len(ch.buffer) > 0 {
			out <- ch.buffer[0]
			ch.buffer = ch.buffer[1:]
		}
	}()
	return ch
}

The logic of this code is quite clear; it is the details that need attention. It is also good material for learning how to use channels.
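As a complement to the listing above, here is a minimal sketch of how the same idea might look with type parameters (Go 1.18 or later). It is not chanx's code: the names Unbounded and NewUnbounded are hypothetical, Len and BufLen are omitted, and the loop uses the nil-channel select idiom instead of the nested loops above.

```go
package main

import "fmt"

// Unbounded is a hypothetical generic counterpart of UnboundedChan.
type Unbounded[T any] struct {
	In  chan<- T // write side
	Out <-chan T // read side
}

// NewUnbounded starts a goroutine that moves values from In to Out,
// parking them in a slice whenever Out cannot accept more.
func NewUnbounded[T any](capacity int) *Unbounded[T] {
	in := make(chan T, capacity)
	out := make(chan T, capacity)
	go func() {
		defer close(out)
		var buf []T
		recv := (<-chan T)(in)
		for recv != nil || len(buf) > 0 {
			// A nil channel is never selected, so the send case is
			// only active while something is buffered.
			var send chan<- T
			var head T
			if len(buf) > 0 {
				send, head = out, buf[0]
			}
			select {
			case v, ok := <-recv:
				if !ok {
					recv = nil // In is closed: stop receiving, keep draining
					continue
				}
				buf = append(buf, v)
			case send <- head:
				buf = buf[1:] // simple but allocation-heavy, as in the slice version
			}
		}
	}()
	return &Unbounded[T]{In: in, Out: out}
}

func main() {
	ch := NewUnbounded[int](4)
	for i := 0; i < 1000; i++ {
		ch.In <- i // far more values than the channels can hold, with no reader yet
	}
	close(ch.In)

	sum := 0
	for v := range ch.Out { // drain: Out closes after the last value
		sum += v
	}
	fmt.Println(sum) // 499500
}
```

The usage in main exercises the characteristics listed earlier: writes proceed without a reader, values come out in order, and closing In lets the reader drain everything before Out is closed.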

Updated May 13

I was on a business trip today and had nothing to do for the long 5 hours on the high speed train, so I grabbed my laptop and did two things, one of which was to optimize the infinite cache channel.

The one thing I was not happy with is that its buffer cannot be reused. This comes from ch.buffer = ch.buffer[1:]: even when the underlying array is very large, the slice keeps moving forward and the space at the front is never reused, so append eventually has to allocate a new array, resulting in more frequent heap allocations.

Following the design suggested by Griesemer, one of Go’s three original designers, the underlying buffer is best implemented as a ring buffer that expands automatically when it fills up:

Such a library should do well in cases of very fast, “bursty” messages. A large enough buffered channel should be able to absorb bursts while a fast dedicated goroutine drains the channel into a ring buffer from which the messages are delivered at a slower pace to the final consumer of the messages. That ring buffer will need to be efficiently implemented, and will need to be able to grow efficiently (irrespective of size) and that will require some careful engineering. Better to leave that code to a library that can be tuned as needed than baking it into the runtime (and then possibly being at the mercy of release cycles).

So I implemented a ring buffer. It is relatively simple because concurrency is not a concern: the ring buffer is only ever used by a single goroutine. You just need to handle two boundary cases properly, the read position catching up with the write position, and the buffer being full. With the ring buffer, the implementation above becomes the following code, which further reduces over-allocation during write bursts.

func NewUnboundedChan(initCapacity int) UnboundedChan {
	in := make(chan T, initCapacity)
	out := make(chan T, initCapacity)
	ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initCapacity)}
	go func() {
		defer close(out)
	loop:
		for {
			val, ok := <-in
			if !ok { // in is closed
				break loop
			}
			// out is not full
			select {
			case out <- val:
				continue
			default:
			}
			// out is full
			ch.buffer.Write(val)
			for !ch.buffer.IsEmpty() {
				select {
				case val, ok := <-in:
					if !ok { // in is closed
						break loop
					}
					ch.buffer.Write(val)
				case out <- ch.buffer.Peek():
					ch.buffer.Pop()
					if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize { // after burst
						ch.buffer.Reset()
					}
				}
			}
		}
		// drain
		for !ch.buffer.IsEmpty() {
			out <- ch.buffer.Pop()
		}
		ch.buffer.Reset()
	}()
	return ch
}
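The listing above relies on a ring buffer with Write, Peek, Pop, IsEmpty and Reset, plus the size and initialSize fields. The following is a minimal sketch of what such a type could look like. The method and field names mirror the calls above, but the bodies are assumptions, not chanx's source. It tracks an element count instead of separate read and write positions, which sidesteps the "read catches up with write" versus "buffer full" ambiguity.

```go
package main

import "fmt"

// T is a placeholder element type (assumption, as in the struct above).
type T interface{}

// RingBuffer is a growable FIFO meant for use by a single goroutine,
// so it needs no locking.
type RingBuffer struct {
	buf         []T
	initialSize int
	size        int // current capacity
	r           int // index of the oldest element
	n           int // number of stored elements
}

func NewRingBuffer(initialSize int) *RingBuffer {
	if initialSize < 1 {
		initialSize = 1
	}
	return &RingBuffer{
		buf:         make([]T, initialSize),
		initialSize: initialSize,
		size:        initialSize,
	}
}

func (b *RingBuffer) IsEmpty() bool { return b.n == 0 }

// Write appends v, doubling the backing array when it is full.
func (b *RingBuffer) Write(v T) {
	if b.n == b.size {
		b.grow()
	}
	b.buf[(b.r+b.n)%b.size] = v
	b.n++
}

// grow copies the elements in logical order so the oldest lands at index 0.
func (b *RingBuffer) grow() {
	bigger := make([]T, b.size*2)
	for i := 0; i < b.n; i++ {
		bigger[i] = b.buf[(b.r+i)%b.size]
	}
	b.buf, b.size, b.r = bigger, b.size*2, 0
}

// Peek returns the oldest element without removing it; it panics when empty.
func (b *RingBuffer) Peek() T {
	if b.n == 0 {
		panic("ringbuffer: empty")
	}
	return b.buf[b.r]
}

// Pop removes and returns the oldest element; it panics when empty.
func (b *RingBuffer) Pop() T {
	v := b.Peek()
	b.buf[b.r] = nil // drop the reference so it can be collected
	b.r = (b.r + 1) % b.size
	b.n--
	return v
}

// Reset shrinks the buffer back to its initial capacity.
func (b *RingBuffer) Reset() {
	b.buf = make([]T, b.initialSize)
	b.size, b.r, b.n = b.initialSize, 0, 0
}

func main() {
	rb := NewRingBuffer(2)
	for i := 1; i <= 5; i++ {
		rb.Write(i) // capacity grows 2 -> 4 -> 8
	}
	fmt.Println(rb.Pop(), rb.Pop(), rb.size) // 1 2 8
}
```

Slots freed by Pop are reused by later writes, so steady traffic causes no further allocations. Only a burst that exceeds the current capacity triggers a grow, and Reset gives the memory back afterwards.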

Reference https://colobu.com/2021/05/11/unbounded-channel-in-go/