Introduction to Pool

Go is a garbage-collected language whose collector uses a concurrent tri-color mark-and-sweep algorithm to mark objects and reclaim them. If you want to build a high-performance application in Go, you have to consider the performance impact of garbage collection: a GC cycle includes stop-the-world (STW) pauses, and marking takes longer when there are many objects on the heap.

An object pool lets you reuse objects instead of allocating new ones on the heap every time you need them. This saves allocation overhead and reduces the work the GC has to do.

In Go, sync.Pool provides object pooling. It exposes two methods, Get and Put, plus a New field that holds the function used to create new objects. Here is a short example that illustrates how to use Pool.

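package main

import (
    "fmt"
    "sync"
)

var pool *sync.Pool

type Person struct {
    Name string
}

func init() {
    pool = &sync.Pool{
        // New is called by Get when the pool has no cached object.
        New: func() interface{} {
            fmt.Println("creating a new person")
            return new(Person)
        },
    }
}

func main() {
    // The pool is empty, so Get falls back to New.
    person := pool.Get().(*Person)
    fmt.Println("Get Pool Object:", person)

    // Modify the object and return it to the pool for reuse.
    person.Name = "first"
    pool.Put(person)

    // This Get typically returns the object that was just Put.
    fmt.Println("Get Pool Object:", pool.Get().(*Person))
    // The pool is empty again, so New is called once more.
    fmt.Println("Get Pool Object:", pool.Get().(*Person))
}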

Output:

creating a new person
Get Pool Object: &{}
Get Pool Object: &{first}
creating a new person
Get Pool Object: &{}

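Here the init function initializes the pool, and main then calls Get three times and Put once. When the pool holds no object, Get calls New to create one; otherwise it returns an object that was previously Put. Note that sync.Pool makes no guarantee that a Put object is still there later: pooled objects may be removed at any time (for example during a GC), so the second Get is not guaranteed to return &{first}.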
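A common way to use sync.Pool in practice is to pool temporary buffers. The sketch below is not from the original article, and bufPool and greet are hypothetical names. It highlights the rule the Person example glosses over: an object returned by Get may still hold state from its previous user, so reset it before use.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool caches *bytes.Buffer values so repeated calls can reuse them.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// greet builds a string in a pooled buffer.
func greet(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	// A pooled object may still hold data from its previous user, so reset it.
	buf.Reset()
	// Return the buffer to the pool when the function finishes.
	defer bufPool.Put(buf)

	buf.WriteString("hello, ")
	buf.WriteString(name)
	// String copies the bytes, so the result stays valid after Put.
	return buf.String()
}

func main() {
	fmt.Println(greet("gopher"))
	fmt.Println(greet("pool"))
}
```

Because String copies the buffer's contents, returning the string after the deferred Put is safe; returning buf.Bytes() instead would hand out memory that the next user of the pool may overwrite.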

Source code analysis

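type Pool struct {
    noCopy noCopy // lets go vet detect copies of a Pool after first use

    local     unsafe.Pointer // per-P pool array; actual type is [P]poolLocal
    localSize uintptr        // size of the local array

    victim     unsafe.Pointer // local from the previous GC cycle
    victimSize uintptr        // size of the victim array

    // New optionally specifies a function that generates a value
    // when Get would otherwise return nil.
    New func() interface{}
}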

The noCopy field marks Pool as a type that must not be copied after first use. Nothing enforces this at compile time or run time; the copy is only reported when you run the go vet tool.
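The sketch below recreates the idea with hypothetical types (noCopy, Config, describe); it is an illustration, not the sync package's code. The marker only needs pointer-receiver Lock and Unlock methods for go vet's copylocks check to treat any struct containing it like a lock.

```go
package main

import "fmt"

// noCopy is a hypothetical re-creation of the marker idea: an empty struct
// whose pointer has Lock and Unlock methods, which is what go vet's
// copylocks check looks for.
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

// Config must not be copied after first use.
type Config struct {
	noCopy noCopy // a named field, not embedded, so Config gets no Lock/Unlock methods
	Name   string
}

// describe takes a pointer, so no copy is made.
func describe(c *Config) string { return "config " + c.Name }

func main() {
	c := &Config{Name: "demo"}
	fmt.Println(describe(c))
	// d := *c; fmt.Println(d.Name) // compiles, but go vet reports the lock copy
}
```

The marker costs nothing at run time because it is an empty struct; its only job is to give go vet something to detect.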

local points to an array of poolLocal values, and localSize is the length of that array. victim likewise points to a poolLocal array: at every garbage collection, the Pool discards the objects in victim and then moves the local data into victim. The interplay between local and victim is described in detail below.

New is set when the pool is created. When Get finds no cached object, it calls New to generate a new one; if New is nil, Get returns nil.

Let’s move on to the structure of the pool.

[Figure: structure of sync.Pool]

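type poolLocal struct {
    poolLocalInternal
    // Pads the struct to a multiple of 128 bytes to prevent false sharing
    // between the poolLocal values of different Ps.
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}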

The local field points to an array of poolLocal values with one element per P; its length is runtime.GOMAXPROCS(0) at the time the array is allocated. When it is accessed, the id of the current P is used as the array index, so each Pool holds at most GOMAXPROCS poolLocal instances.

With this design each P has its own local storage, so goroutines running on different Ps can share one Pool with little contention. If you are unfamiliar with the G, M, and P of the Go scheduler, you may want to read the article The Go scheduler.

The pad field rounds the size of each poolLocal up to a multiple of 128 bytes so that the poolLocalInternal values of different Ps do not share a cache line, which would cause false sharing. The article What’s false sharing and how to solve it covers the problem in detail; in short:

False sharing happens when one core updates a variable and thereby forces other cores to reload the cache line that holds it, even though those cores are using different variables that merely sit on the same line.

type poolLocalInternal struct {
    private interface{} // Can be used only by the respective P.
    shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}

poolLocalInternal contains two fields: private and shared.

private holds a single cached element that only the owning P may access. Since a P runs only one goroutine at a time, and that goroutine is pinned while it touches private, there is no concurrency problem.

shared can be accessed by any P: only the owning P may call pushHead and popHead on it, while other Ps may only call popTail.

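type poolChain struct {
    // head is the poolDequeue to push to; only the producer (owning P) uses it.
    head *poolChainElt
    // tail is the poolDequeue to popTail from; consumers access it atomically.
    tail *poolChainElt
}

type poolChainElt struct {
    poolDequeue
    // next points toward the head (newer nodes), prev toward the tail (older nodes).
    next, prev *poolChainElt
}

type poolDequeue struct {
    // headTail packs a 32-bit head index (high bits) and a 32-bit tail index (low bits).
    headTail uint64
    // vals is the ring buffer; its length must be a power of two.
    vals []eface
}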

poolChain is a doubly linked list of poolDequeue nodes whose head and tail fields point to the two ends of the list. Each poolDequeue stores the actual data: it is a fixed-size, lock-free ring buffer with a single producer and multiple consumers. headTail is not a pointer; it is a 64-bit value that packs the head index into its high 32 bits and the tail index into its low 32 bits, and both are extracted with bit operations. The producer can push and pop at the head, while consumers can only pop from the tail.
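The packing scheme fits in a few lines. The sketch below mirrors the shape of poolDequeue's pack and unpack helpers as free functions, with dequeueBits = 32 as in the runtime; treat it as an illustration rather than a verbatim copy of the runtime code.

```go
package main

import "fmt"

// dequeueBits: head and tail each take 32 bits of the 64-bit headTail word.
const dequeueBits = 32

// pack stores head in the high 32 bits and tail in the low 32 bits.
func pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return uint64(head)<<dequeueBits | uint64(tail&mask)
}

// unpack splits a headTail word back into head and tail.
func unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

func main() {
	ptrs := pack(5, 3)
	fmt.Printf("%#x\n", ptrs) // 0x500000003
	// Adding 1<<dequeueBits increments head and leaves tail untouched.
	ptrs += 1 << dequeueBits
	fmt.Println(unpack(ptrs)) // 6 3
}
```

Keeping both indexes in one word is what allows a single atomic load or CAS to read or update head and tail together.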

The model of this double-ended queue is roughly like this.

[Figure: poolChain double-ended queue model]

The ring buffer inside each poolDequeue has a fixed size. As we will see in the source code later, when the ring buffer is full, a new one twice the size is created and linked into the chain. Keep this diagram in mind; we will come back to it several times.

Get method

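func (p *Pool) Get() interface{} {
    ...
    // 1. Pin the current goroutine to its P and get that P's poolLocal
    l, pid := p.pin()
    // 2. Try the private slot of the local poolLocal first
    x := l.private
    l.private = nil
    if x == nil {
        // 3. private is empty, so pop from the head of shared
        x, _ = l.shared.popHead()
        // 4. Still nothing, so try to steal from other Ps' poolLocals
        if x == nil {
            x = p.getSlow(pid)
        }
    }
    // Unpin, re-enabling preemption
    runtime_procUnpin()
    ...
    // 5. Nothing found: fall back to New to create a new object
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}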
  • Get first pins the current goroutine to its P and obtains that P's poolLocal, then tries to take the object from private. Since the object has now been taken, private is set to nil.
  • If private is empty, it pops from the head of shared.
  • If shared is empty too, it calls getSlow, which iterates over all the locals and tries to pop an element from the tail of their shared.
  • Finally, if nothing has been found, it calls the pre-set New function to create a new object.
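To make the lookup order concrete, here is a heavily simplified model of Get and Put. toyPool and shard are hypothetical types: a mutex replaces the lock-free poolChain, there is no victim cache, and the caller passes the P index explicitly because ordinary Go code cannot pin itself to a P.

```go
package main

import (
	"fmt"
	"sync"
)

// shard is a hypothetical stand-in for poolLocal: a private slot plus a
// shared list. A mutex replaces the lock-free poolChain to keep it short.
type shard struct {
	mu      sync.Mutex
	private interface{}
	shared  []interface{} // end of slice = head, index 0 = tail
}

// toyPool models the lookup order of Get and Put for an explicit pid.
type toyPool struct {
	shards []shard
	New    func() interface{}
}

func (p *toyPool) Get(pid int) interface{} {
	s := &p.shards[pid]
	s.mu.Lock()
	x := s.private // 1. private slot
	s.private = nil
	if x == nil && len(s.shared) > 0 { // 2. head of our own shared list
		x = s.shared[len(s.shared)-1]
		s.shared = s.shared[:len(s.shared)-1]
	}
	s.mu.Unlock()
	// 3. Steal from the tail of shared lists, starting at pid+1.
	for i := 0; x == nil && i < len(p.shards); i++ {
		o := &p.shards[(pid+i+1)%len(p.shards)]
		o.mu.Lock()
		if len(o.shared) > 0 {
			x = o.shared[0]
			o.shared = o.shared[1:]
		}
		o.mu.Unlock()
	}
	if x == nil && p.New != nil { // 4. fall back to New
		x = p.New()
	}
	return x
}

func (p *toyPool) Put(pid int, x interface{}) {
	if x == nil {
		return
	}
	s := &p.shards[pid]
	s.mu.Lock()
	if s.private == nil {
		s.private = x
	} else {
		s.shared = append(s.shared, x) // push at the head
	}
	s.mu.Unlock()
}

func main() {
	p := &toyPool{shards: make([]shard, 2), New: func() interface{} { return "new" }}
	p.Put(0, "a")         // goes to shard 0's private slot
	p.Put(0, "b")         // private is taken, so it goes to shard 0's shared list
	fmt.Println(p.Get(1)) // b: stolen from the tail of shard 0's shared list
	fmt.Println(p.Get(1)) // new: private slots are never stolen
	fmt.Println(p.Get(0)) // a: shard 0's private slot
}
```

The model preserves one property of the real design: private objects are only ever used by their own P, while shared objects can be stolen by others.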

pin

func (p *Pool) pin() (*poolLocal, int) {
    pid := runtime_procPin() 
    s := atomic.LoadUintptr(&p.localSize) // load-acquire
    l := p.local                          // load-consume
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    return p.pinSlow()
}

pin first calls runtime_procPin, which pins the current goroutine to its M by disabling preemption and returns the id of the P that the M is bound to. The pin is needed because pid is used afterwards: without it, the goroutine could be preempted and resumed on a different P while still using the old P's poolLocal.

If pid is greater than or equal to localSize, the Pool has not yet created a poolLocal for this P, so pinSlow is called to create it; otherwise indexLocal returns the poolLocal that corresponds to pid.

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
    return (*poolLocal)(lp)
}

indexLocal uses pointer arithmetic: i is the array index, so it multiplies i by the size of poolLocal{} to get the byte offset, adds that offset to the array's base address, and converts the result back to a *poolLocal.

pinSlow

func (p *Pool) pinSlow() (*poolLocal, int) {
    // Unpin: the goroutine must not stay pinned while waiting for the mutex
    runtime_procUnpin()
    // Take the global lock
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    // Pin again
    pid := runtime_procPin()
    s := p.localSize
    l := p.local
    // Re-check pid: another goroutine may have initialized local meanwhile
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    // Register the pool in allPools before its local is first initialized
    if p.local == nil {
        allPools = append(allPools, p)
    }
    // Current number of Ps
    size := runtime.GOMAXPROCS(0)
    local := make([]poolLocal, size)
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
    atomic.StoreUintptr(&p.localSize, uintptr(size))
    return &local[pid], pid
}

allPoolsMu is a global mutex, so acquiring it can be slow and may block. A goroutine must not block on a mutex while it is pinned, so pinSlow calls runtime_procUnpin before locking.

While the goroutine is unpinned, another goroutine may have run pinSlow and changed p.local, and this goroutine may even have moved to a different P. So after pinning again, pinSlow checks the new pid against localSize once more.

Finally, it allocates a new poolLocal array with one element per P, publishes local and localSize with atomic stores, and returns the poolLocal of the current P.

That completes the pin method. The whole process is summarized in the following diagram.

[Figure: flow of the pin method]

Now let's return to the Get method. Here is the code again for easier reading.

func (p *Pool) Get() interface{} {
    ...
    // 2. Try the private slot of the local poolLocal first
    x := l.private
    l.private = nil
    if x == nil {
        // 3. private is empty, so pop from the head of shared
        x, _ = l.shared.popHead()
        // 4. Still nothing, so try to steal from other Ps' poolLocals
        if x == nil {
            x = p.getSlow(pid)
        }
    }
    ...
    return x
}

If there is no value in private, then the popHead method of shared will be called to get the value.

popHead

func (c *poolChain) popHead() (interface{}, bool) {
    // c.head is the newest poolChainElt
    d := c.head
    // Walk the poolChain list from head toward tail
    for d != nil {
        // Try to pop from this node's ring buffer
        if val, ok := d.popHead(); ok {
            return val, ok
        }
        // This node is empty: load the previous (older) node
        d = loadPoolChainElt(&d.prev)
    }
    return nil, false
}

popHead starts from the head node of the poolChain; if you don't remember the poolChain data structure, it may help to scroll back up and look at it again.

A for loop then walks from the head node along the prev pointers, calling popHead on each poolDequeue until one returns an object. If every node is empty, it returns nil, false.

func (d *poolDequeue) popHead() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        // The high 32 bits of headTail are head, the low 32 bits are tail
        head, tail := d.unpack(ptrs)
        // head == tail means the queue is empty
        if tail == head {
            return nil, false
        }
        // head points at the next free slot, so decrement it before reading
        head--
        ptrs2 := d.pack(head, tail)
        // Claim the slot; retry if a consumer changed tail concurrently
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            slot = &d.vals[head&uint32(len(d.vals)-1)]
            break
        }
    }
    val := *(*interface{})(unsafe.Pointer(slot))
    // dequeueNil is the placeholder for a stored nil; turn it back into nil
    if val == dequeueNil(nil) {
        val = nil
    }
    // Clear the slot
    *slot = eface{}
    return val, true
}
  • poolDequeue's popHead first loads headTail atomically and calls unpack on it. headTail is a 64-bit value whose high 32 bits hold head and low 32 bits hold tail.
  • If head equals tail, the queue is empty.
  • Otherwise it decrements head before using it, because head points to the empty slot where the next new object will be stored; the most recent object sits at head-1.
  • It then uses CAS to install the new headTail, retrying if a consumer changed tail in the meantime. On success it locates the slot: since the length of vals is a power of two, 2^n, the low n bits of len(d.vals)-1 are all 1, so head&(len(d.vals)-1) keeps the low n bits of head, which is the slot index.
  • If the value in the slot is dequeueNil, a nil was stored, so val becomes nil. In either case the slot is then cleared and val is returned.
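The index arithmetic of popHead, and of popTail covered below, can be checked with a single-goroutine model. ring is a hypothetical type: it keeps the ever-growing uint32 head and tail counters and the power-of-two mask, but drops the atomics, the CAS loop, and the typ handshake, so it must not be used concurrently.

```go
package main

import "fmt"

// ring models poolDequeue's index arithmetic only; it is NOT safe for
// concurrent use. head and tail count up forever as uint32 values and are
// mapped into vals with a mask, which works because len(vals) is a power of two.
type ring struct {
	head, tail uint32
	vals       []interface{}
}

// newRing assumes n is a power of two.
func newRing(n int) *ring { return &ring{vals: make([]interface{}, n)} }

func (r *ring) mask(i uint32) uint32 { return i & uint32(len(r.vals)-1) }

func (r *ring) pushHead(v interface{}) bool {
	if r.head-r.tail == uint32(len(r.vals)) { // full
		return false
	}
	r.vals[r.mask(r.head)] = v // head is the next free slot
	r.head++
	return true
}

// popHead removes the newest element, which lives at head-1.
func (r *ring) popHead() (interface{}, bool) {
	if r.head == r.tail {
		return nil, false
	}
	r.head--
	slot := &r.vals[r.mask(r.head)]
	v := *slot
	*slot = nil
	return v, true
}

// popTail removes the oldest element, which lives at tail itself.
func (r *ring) popTail() (interface{}, bool) {
	if r.head == r.tail {
		return nil, false
	}
	slot := &r.vals[r.mask(r.tail)]
	v := *slot
	*slot = nil
	r.tail++
	return v, true
}

func main() {
	r := newRing(4)
	for _, v := range []string{"a", "b", "c"} {
		r.pushHead(v)
	}
	fmt.Println(r.popHead()) // c true
	fmt.Println(r.popTail()) // a true
}
```

Because the counters are never reset, the mask and unsigned subtraction keep working even after the uint32 values wrap around.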

If shared.popHead also returns nothing, Get falls back to getSlow.

getSlow

func (p *Pool) getSlow(pid int) interface{} {
    size := atomic.LoadUintptr(&p.localSize) // load-acquire
    locals := p.local                        // load-consume
    // Try to steal from the tail of other Ps' shared lists, starting at pid+1
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
    // Then try the victim cache
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    // Return this P's victim private object if there is one
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    // Try the tail of every victim shared list, starting with this P's own
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
    // The victim cache is empty: set victimSize to 0 so later Gets skip it
    atomic.StoreUintptr(&p.victimSize, 0)
    return nil
}

getSlow iterates over the locals array. Note that the iteration starts at the poolLocal with index pid+1, and for each poolLocal it calls popTail on shared to try to steal an object. If that fails, it looks in victim: first at this P's victim private slot, then at the tail of every victim shared list. If nothing is found there either, it sets victimSize to 0 so that later calls skip the victim cache.
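The visiting order of the first loop is easy to compute. stealOrder below is a hypothetical helper that reproduces the index expression (pid+i+1)%size. One plausible benefit of this order is that different Ps start stealing from different neighbors instead of all hitting index 0 first. The victim loop uses (pid+i)%size instead, so it starts with the current P's own entry.

```go
package main

import "fmt"

// stealOrder lists the poolLocal indexes that getSlow's first loop visits
// for a given pid: it starts at pid+1, wraps around, and reaches pid last.
func stealOrder(pid, size int) []int {
	order := make([]int, 0, size)
	for i := 0; i < size; i++ {
		order = append(order, (pid+i+1)%size)
	}
	return order
}

func main() {
	fmt.Println(stealOrder(1, 4)) // [2 3 0 1]
}
```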

poolChain.popTail

func (c *poolChain) popTail() (interface{}, bool) {
    d := loadPoolChainElt(&c.tail)
    // No tail node means the chain is empty
    if d == nil {
        return nil, false
    }

    for {
        // Load next before popping. In poolChain, next points from the
        // tail toward the head, the reverse of a typical doubly linked list
        d2 := loadPoolChainElt(&d.next)
        // Try to pop from the tail of this node's ring buffer
        if val, ok := d.popTail(); ok {
            return val, ok
        }

        if d2 == nil {
            return nil, false
        }
        // d has no data left: advance tail to d2 and unlink d by clearing d2.prev
        if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
            storePoolChainElt(&d2.prev, nil)
        }
        d = d2
    }
}
  • Load the tail node of the poolChain; if it is nil, the chain is empty, so return immediately.
  • Enter the for loop and load the next node of tail. Note that this doubly linked list runs in the opposite direction to a typical one: next points from the tail toward the head. If this is unclear, look back at the first diagram.
  • Call popTail on the poolDequeue; if it returns an object, return it.
  • If d2 is nil, the whole poolChain has been traversed and every node is empty, so return.
  • Otherwise d has no more data: use CAS to move tail to d2 and set d2's prev to nil, then set d to d2 and continue with the next iteration.

poolDequeue.popTail

func (d *poolDequeue) popTail() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        // Unpack headTail, as in popHead
        head, tail := d.unpack(ptrs)
        // head == tail: the queue is empty
        if tail == head {
            return nil, false
        }
        ptrs2 := d.pack(head, tail+1)
        // Advance tail with CAS; several consumers may race here
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            // The slot at the old tail holds the oldest element
            slot = &d.vals[tail&uint32(len(d.vals)-1)]
            break
        }
    }
    val := *(*interface{})(unsafe.Pointer(slot))
    // Turn the dequeueNil placeholder back into nil
    if val == dequeueNil(nil) {
        val = nil
    }
    // Clear the slot: val first, then typ, which tells pushHead the slot is free
    slot.val = nil
    atomic.StorePointer(&slot.typ, nil)
    return val, true
}

If you understand popHead, popTail is very similar.

popTail removes an element from the tail of the queue and returns false if the queue is empty. Because popTail may be called by several consumers at the same time, it retries the CAS in a loop until it claims a slot. Unlike head, tail points at a slot that already holds data, so popTail reads the slot at tail and advances tail by one instead of decrementing first as popHead does.

Finally, the slot is cleared: val is set to nil first, then typ is atomically set to nil, which tells pushHead that the slot may be reused.

You can review the code against the diagram again.

[Figure: poolChain and poolDequeue]

Put method

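func (p *Pool) Put(x interface{}) {
    // Putting nil is a no-op
    if x == nil {
        return
    }
    ...
    // Pin the goroutine to its P and get that P's poolLocal
    l, _ := p.pin()
    // Prefer the private slot if it is free
    if l.private == nil {
        l.private = x
        x = nil
    }
    // Otherwise push onto the head of this P's shared chain
    if x != nil {
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
    ...
}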

After Get, Put is much easier to follow. Put also starts by pinning the current goroutine to its P, then tries to store x in the private field. If private is already occupied, it calls pushHead to put x into the shared queue.

poolChain.pushHead

func (c *poolChain) pushHead(val interface{}) {
    d := c.head
    // Initialize the head node on first use
    if d == nil {
        const initSize = 8 // Must be a power of 2
        d = new(poolChainElt)
        d.vals = make([]eface, initSize)
        c.head = d
        storePoolChainElt(&c.tail, d)
    }
    // Try to push into the current head ring buffer
    if d.pushHead(val) {
        return
    }
    // The ring is full: the new one is twice as large...
    newSize := len(d.vals) * 2
    // ...but a single ring buffer may not exceed dequeueLimit (2^30)
    if newSize >= dequeueLimit {
        newSize = dequeueLimit
    }
    // Create the new ring buffer node and link it in front of d
    d2 := &poolChainElt{prev: d}
    d2.vals = make([]eface, newSize)
    c.head = d2
    storePoolChainElt(&d.next, d2)
    // Push into the new ring buffer
    d2.pushHead(val)
}

If the head node is nil, a new poolChainElt with a ring buffer of size 8 is created and used as both head and tail; then pushHead is called to put the value into the ring buffer.

If the push fails because the ring buffer is full, a new poolChainElt is created whose ring buffer is twice as long, capped at dequeueLimit, which is 2 to the 30th power.

The new node d2 and the old node d are then linked to each other, d2 becomes the new head, and the value is pushed into d2.
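The growth rule can be isolated in a small function. nextSize is a hypothetical helper extracted from pushHead, and dequeueLimit copies the runtime's definition (1<<dequeueBits)/4 with dequeueBits = 32. The example follows a chain that starts at 8 slots and grows four times.

```go
package main

import "fmt"

// dequeueLimit mirrors the runtime value (1<<dequeueBits)/4 with dequeueBits = 32.
const dequeueLimit = (1 << 32) / 4

// nextSize reproduces the growth rule in poolChain.pushHead.
func nextSize(cur int) int {
	n := cur * 2
	if n >= dequeueLimit {
		n = dequeueLimit
	}
	return n
}

func main() {
	size, total := 8, 8
	for i := 0; i < 4; i++ {
		size = nextSize(size)
		total += size
	}
	fmt.Println(size, total) // 128 248
}
```

Doubling means the number of chain nodes grows only logarithmically with the number of pooled objects, so popHead and popTail rarely have to walk more than a few nodes.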

poolDequeue.pushHead

func (d *poolDequeue) pushHead(val interface{}) bool {
    ptrs := atomic.LoadUint64(&d.headTail)
    // Unpack headTail
    head, tail := d.unpack(ptrs)
    // Check whether the queue is full
    if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
        return false
    }
    // Locate the slot at head
    slot := &d.vals[head&uint32(len(d.vals)-1)]
    // A non-nil typ means a popTail is still clearing this slot
    typ := atomic.LoadPointer(&slot.typ)
    if typ != nil {
        return false
    }
    // nil is stored as the dequeueNil placeholder
    if val == nil {
        val = dequeueNil(nil)
    }
    // Store val in the slot, then increment head by 1
    *(*interface{})(unsafe.Pointer(slot)) = val
    atomic.AddUint64(&d.headTail, 1<<dequeueBits)
    return true
}

pushHead first checks whether the queue is full with a bitwise operation: it adds len(d.vals) to tail and keeps the lower 32 bits (dequeueBits). Because head points to the slot that will be filled next, head is exactly len(d.vals) positions ahead of tail when the queue is full, so the result equals head. In that case pushHead returns false.

Next it locates the slot at head and checks whether its typ is nil. popTail clears val first and typ last, so a non-nil typ means a consumer has already advanced tail past this slot but has not finished clearing it; pushHead then treats the queue as full and returns false.

Finally, it stores val in the slot, increments head by atomically adding 1<<dequeueBits to headTail, and returns true.
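The full check keeps working when the uint32 indexes wrap around. isFull below is a hypothetical standalone copy of the condition from pushHead. With dequeueBits = 32 the mask keeps all 32 bits, so the addition behaves like ordinary wrapping uint32 arithmetic.

```go
package main

import "fmt"

const dequeueBits = 32

// isFull is the full check from poolDequeue.pushHead as a standalone function.
func isFull(head, tail uint32, n int) bool {
	return (tail+uint32(n))&(1<<dequeueBits-1) == head
}

func main() {
	tail := uint32(1<<32 - 2) // two steps before the uint32 index wraps
	fmt.Println(isFull(tail+8, tail, 8)) // true: 8 elements in an 8-slot ring
	fmt.Println(isFull(tail+7, tail, 8)) // false: one slot is still free
}
```

Here tail+8 wraps to 6, yet the check still recognizes that head is a full ring length ahead of tail.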

GC

The init function in pool.go registers poolCleanup, the function that cleans up Pools when a GC occurs.

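func init() {
    runtime_registerPoolCleanup(poolCleanup)
}

func poolCleanup() {
    // Drop the victim caches from the previous cycle
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }
    // Move each primary cache into the victim cache
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }
    // The current pools become the old pools for the next cycle
    oldPools, allPools = allPools, nil
}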

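poolCleanup is called with the world stopped, at the start of a garbage collection. It drops each Pool's victim and moves local into victim, so instead of every GC emptying all Pools, a cached object is released only after it has gone unused for two GC cycles. If a sync.Pool's objects are obtained and returned at a steady rate, Get can usually be served from victim right after a GC, so few new objects need to be allocated.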
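The two-generation behavior can be modeled without the runtime. genCache is a hypothetical single-goroutine type holding strings: get checks local first and then victim, as getSlow does, and cleanup plays the role of poolCleanup.

```go
package main

import "fmt"

// genCache mimics how poolCleanup rotates local into victim on every GC.
type genCache struct {
	local, victim []string
}

func (c *genCache) put(x string) { c.local = append(c.local, x) }

// get looks in local first, then falls back to victim.
func (c *genCache) get() (string, bool) {
	for _, gen := range []*[]string{&c.local, &c.victim} {
		if n := len(*gen); n > 0 {
			x := (*gen)[n-1]
			*gen = (*gen)[:n-1]
			return x, true
		}
	}
	return "", false
}

// cleanup drops the old victim and demotes local to victim.
func (c *genCache) cleanup() {
	c.victim = c.local
	c.local = nil
}

func main() {
	var c genCache
	c.put("a")
	c.cleanup() // first GC: "a" moves to victim
	x, ok := c.get()
	fmt.Println(x, ok) // a true
	c.put("b")
	c.cleanup() // "b" moves to victim
	c.cleanup() // second GC without a Get: "b" is dropped
	_, ok = c.get()
	fmt.Println(ok) // false
}
```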

Summary

Pooling is a very important tool in backend optimization: HTTP clients use connection pools, and applications use database connection pools to talk to databases. Reusing and pre-allocating objects in this way reduces the pressure on the server.

In the later stages of a project, if you find that GC takes a lot of time because the program creates a large number of temporary objects, consider using sync.Pool.

Similarly, if the system runs a very large number of goroutines, keep in mind that each goroutine starts with a 2048-byte stack, so running hundreds of thousands of them on one server costs a lot of memory and scheduling work. In that case, consider using a Worker Pool to limit the number of goroutines.
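The article does not show a Worker Pool. The sketch below is one common way to build one, using a hypothetical runWorkers helper: a fixed number of goroutines receive job indexes from a channel, so the goroutine count stays constant no matter how many jobs there are. It assumes workers is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers processes jobs with a fixed number of goroutines instead of one
// goroutine per job. Each index is received by exactly one worker, so no two
// goroutines write the same element of results.
func runWorkers(workers int, jobs []int, f func(int) int) []int {
	results := make([]int, len(jobs))
	idx := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range idx {
				results[i] = f(jobs[i])
			}
		}()
	}
	for i := range jobs {
		idx <- i
	}
	close(idx) // lets the workers' range loops finish
	wg.Wait()  // all writes to results happen before Wait returns
	return results
}

func main() {
	square := func(x int) int { return x * x }
	fmt.Println(runWorkers(3, []int{1, 2, 3, 4, 5}, square)) // [1 4 9 16 25]
}
```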