Go has two kinds of channels, unbuffered and buffered, and we are all familiar with both.
So why am I suddenly talking about an unbounded-buffer channel? Mainly because I was recently reviewing a colleague's code and took issue with one of its design ideas, a problem that an unbounded channel would have solved neatly. The design was roughly:
- A dispatcher holds a channel containing the URLs to be processed
- A group of workers read tasks from the channel, download and parse the pages, extract the links, and put those links back into dispatcher.channel
To solve the concurrency problems here, my colleague had to use the more complex sync.Mutex and sync.Cond and define a pile of methods to handle the logic. Before getting to that, I want to talk about my own wrong idea.
When I reviewed this code, I thought: why not have each worker run in a goroutine, process a URL, and then put the discovered links back into the channel, without the complicated Mutex+Cond machinery? But I overlooked something: if the channel is full, none of the workers can put their parsed results into it. They are all blocked, and then there is no worker left to consume URLs from the channel.
Of course, you could create a channel with a very large buffer to avoid it filling up. But first, a very large buffer takes a lot of memory; and second, how big is big enough? The key point is that you can never guarantee the channel won't fill up.
It would be nice to have a channel with an unlimited buffer.
Back in 2017, an issue (#20352) was filed asking the Go team to provide channels with unlimited capacity.
After a lengthy discussion, the Go team decided not to provide an implementation for this "rare" scenario, suggesting instead that it be handled by a third-party library, and Griesemer offered an idea for implementing such a channel with a ring buffer as the cache.
Based on the first implementation, I wrapped up a library, chanx, which provides a general-purpose unbounded channel.
Feel free to star this library and keep it in your toolbox; it may help you one day. I have also prepared a generics-friendly design, which I will switch to a true generic implementation as soon as Go generics land.
An unbounded channel has the following characteristics:
- Writes never block. A write can always be accepted: the data goes either into the channel, to be read directly, or into the cache.
- Reads block when there is no data. A goroutine reading from the channel blocks when there is no readable data.
- Reads and writes are done through channel operations. The internal cache is not exposed.
- The current number of unread items can be queried. Since there may be pending data in the cache as well, this must return len(buffer) + len(chan).
- After the channel is closed, unread data can still be read, and only once it has all been read is the channel finished. This is the same logic as a normal channel, called "draining" the unread data.
Because we can't modify the internal channel structure, nor can we overload the ch <- and <-ch operators, we can only encapsulate a data structure built on two channels to provide reads and writes.
This data structure is: the In channel is used to write data, while the Out channel is used to read data. You can close the In channel, and the Out channel will be closed automatically after all the data has been read. You cannot close the Out channel yourself, and indeed you can't, because its type is <-chan. You can get the total amount of unread data with the Len method, or, with BufLen, only the amount of data in the cache, excluding what is already sitting in the Out channel.
Now for the point: the main logic of the implementation is as follows. I added comments to the code; between the comments and the code you can understand the entire implementation logic:
The logic here is quite clear; it is the details that need attention. It is also good material for learning how to use channels.
Updated May 13
I was on a business trip today, and with nothing to do during the long five hours on the high-speed train, I grabbed my laptop and did two things, one of which was optimizing the unbounded channel.
The only thing I was not too happy about is that its buffer cannot be reused, which comes from the handling of ch.buffer = ch.buffer[1:]: the slice keeps marching forward through the underlying array, so even when that array is very large, new arrays still have to be allocated, resulting in more frequent heap allocations.
According to the design suggested by Griesemer, one of the three original authors of Go, the underlying buffer is best implemented as a ring buffer, one that can automatically grow when it fills up:
> Such a library should do well in cases of very fast, "bursty" messages. A large enough buffered channel should be able to absorb bursts while a fast dedicated goroutine drains the channel into a ring buffer from which the messages are delivered at a slower pace to the final consumer of the messages. That ring buffer will need to be efficiently implemented, and will need to be able to grow efficiently (irrespective of size) and that will require some careful engineering. Better to leave that code to a library that can be tuned as needed than baking it into the runtime (and then possibly being at the mercy of release cycles).
So I implemented a ring buffer. It is relatively simple, because we don't need to worry about concurrency here: the ring buffer is only ever used by a single goroutine, so the implementation just has to handle two boundary cases, "the reader catching up with the writer" and "the buffer being full". With the ring buffer, the implementation above can be changed accordingly, which further reduces over-allocation during write bursts.