Generic Queues

In Kubernetes, Go's native channels cannot satisfy the application's scenarios, such as delaying and rate limiting, so client-go provides three kinds of queues: the common queue, the delaying queue, and the rate-limiting queue.

Interface

Interface is defined as an abstraction of all queues.

type Interface interface {
    Add(item interface{})                   // adds an item to the queue
    Len() int                               // current queue length
    Get() (item interface{}, shutdown bool) // blocks until an item is available
    Done(item interface{})                  // marks the item as finished processing
    ShutDown()                              // shuts the queue down
    ShuttingDown() bool                     // reports whether the queue is shutting down
}

Implementation

type Type struct { // a work queue
    queue []t // queue uses a slice for storage
    dirty set // dirty bit: the set of elements that need processing; like an OS dirty bit, it marks data that has been modified but not yet written
    processing set // the set of elements currently being processed
    cond *sync.Cond
    shuttingDown bool
    metrics queueMetrics
    unfinishedWorkUpdatePeriod time.Duration
    clock                      clock.Clock
}
type empty struct{}
type t interface{} // t is an element in the queue
type set map[t]empty // the elements held in dirty and processing

You can see that the core properties are queue, dirty, and processing.
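As a rough illustration of how the three properties cooperate, here is a minimal usage sketch of the real workqueue package (the item name is made up): Add puts an item into queue and dirty unless it is currently being processed; Get moves it from queue/dirty into processing; Done removes it from processing and requeues it if it went dirty again in the meantime.

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    q := workqueue.New()
    q.Add("pod-a") // enters both queue and dirty
    q.Add("pod-a") // deduplicated: already in dirty, not queued twice

    item, _ := q.Get() // moves the item from queue/dirty into processing
    fmt.Println(item)  // pod-a

    q.Add("pod-a")  // re-added while processing: parked in dirty only
    q.Done("pod-a") // done processing; since it is dirty again, it is requeued

    fmt.Println(q.Len()) // 1
    q.ShutDown()
}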

Delay queue

Before looking at the priority queue, you need some understanding of the heap, because the delaying queue is built on top of a heap.

Heap

A heap is a special tree-based data structure: it is a complete binary tree, and it comes in two types.

  • If B is a child node of A, then $key(A) \geq key(B)$. This means the element with the maximum key is always at the root node; this type of heap is called a MaxHeap.
  • If every parent node's value is less than or equal to the values of its left and right children, the heap is called a MinHeap.

Storage rules for a binary heap:

  • Each node contains elements that are greater than or equal to the elements of the node’s children.
  • The tree is a complete binary tree.

So which of the following images is the heap?

heap

Implementation of heap

Example: the process of adding an element with the value 42 to the heap

Step 1: Put the new element into the first available position in the heap. This will keep the structure as a complete binary tree, but it may no longer be a heap, as the new element may have a larger value than its parent.

Step 2: While the value of the new element is greater than that of its parent, swap the new element with its parent. This stops when the new element reaches the root or when its parent's value is greater than or equal to the new element's.

heap

This process is called reheapification upward.

Example: Removing the root

Step 1: Copy the root element into the variable used to return the value, copy the last element at the deepest level to the root, and then remove that last node from the tree. The moved element is called the out-of-place element.

Removing the root

Step 2: While the out-of-place element is less than one of its children, swap it with its greater child. Finally, return the value saved in step 1.

Removing the root

This process is called reheapification downward.
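Both operations are easy to see in code. Below is a minimal, self-contained sketch of a MaxHeap stored in a slice (the usual 2i+1 / 2i+2 child layout); it is illustrative only and not the client-go code:

package main

import "fmt"

// up moves the element at index i toward the root while it is
// greater than its parent (reheapification upward).
func up(h []int, i int) {
    for i > 0 {
        parent := (i - 1) / 2
        if h[i] <= h[parent] {
            break
        }
        h[i], h[parent] = h[parent], h[i]
        i = parent
    }
}

// down moves the element at index i toward the leaves, swapping it
// with its greater child (reheapification downward).
func down(h []int, i int) {
    n := len(h)
    for {
        l, r := 2*i+1, 2*i+2
        largest := i
        if l < n && h[l] > h[largest] {
            largest = l
        }
        if r < n && h[r] > h[largest] {
            largest = r
        }
        if largest == i {
            break
        }
        h[i], h[largest] = h[largest], h[i]
        i = largest
    }
}

func main() {
    h := []int{50, 30, 40, 10, 20}
    h = append(h, 42) // step 1: put 42 in the first available position
    up(h, len(h)-1)   // step 2: reheapification upward
    fmt.Println(h)    // [50 30 42 10 20 40]

    // removing the root: save h[0], move the last element to the root,
    // shrink the slice, then reheapify downward
    root := h[0]
    h[0] = h[len(h)-1]
    h = h[:len(h)-1]
    down(h, 0)
    fmt.Println(root, h) // 50 [42 30 40 10 20]
}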

Priority queue

Behavior of the priority queue:

  • Elements are placed in the queue and then taken out.
  • Each element in the priority queue has a number associated with it, called the priority.
  • When an element leaves the priority queue, the element with the highest priority leaves first.

How it is implemented:

  • In the priority queue, each node of the heap contains an element together with the element’s priority, and the tree is maintained so that it follows the heap storage rules, comparing nodes by the element’s priority:
    • Each node contains an element whose priority is greater than or equal to the priority of the node’s child elements.
    • The tree is a complete binary tree.
  • Implemented code: golang priorityQueue

Reference: heap

Client-go’s delaying queue

The delaying queue in Kubernetes is a beautiful piece of design: a delayed queue implemented with a heap, combined with the pass-through common queue described above.

// the source comments mention a hot-loop; the delaying behavior is implemented through that loop
type DelayingInterface interface {
    Interface // inherits the common workqueue's functionality
    AddAfter(item interface{}, duration time.Duration) // adds the item to the work queue after the given duration
}

delayingType is the concrete implementation of DelayingInterface.

type delayingType struct {
    Interface // the generic queue
    clock clock.Clock // the clock used for time comparisons, which also provides timer functionality
    stopCh chan struct{} // stops the loop
    stopOnce sync.Once // guarantees shutdown is triggered only once
    heartbeat clock.Ticker // a ticker that bounds the loop's maximum wait when no events arrive
    waitingForAddCh chan *waitFor // a plain channel that receives items to be inserted into the delaying queue
    metrics retryMetrics // retry metrics
}

// clock.Clock, expanded for reference:
type Clock interface {
    PassiveClock // Now() time.Time and Since(time.Time) time.Duration
    After(time.Duration) <-chan time.Time
    NewTimer(time.Duration) Timer
    Sleep(time.Duration)
    NewTicker(time.Duration) Ticker
}

The overall data structure of the delaying queue is shown in the figure below.

data structure of the delay queue

As mentioned in the section above, the core of this delaying queue is a priority queue, which in turn needs to satisfy:

  • Each element in the priority queue has an associated number, called the priority.
  • When an element leaves the priority queue, the element with the highest priority is the first to leave.

waitFor is the element type of this priority queue:

type waitFor struct {
    data    t // the data
    readyAt time.Time // the time at which the item should be added to the work queue
    index int // index in the priority queue
}

The waitForPriorityQueue is an implementation of container/heap's heap.Interface; its data structure is a MinHeap that keeps the element with the smallest readyAt at the root.

type Interface interface {
    sort.Interface
    Push(x interface{}) // add x as element Len()
    Pop() interface{}   // remove and return element Len() - 1.
}

And the implementation of this is waitForPriorityQueue.

type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
    return len(pq)
}
// the most important method: it decides which field is the sort key, and it is what heap.up and heap.down use
func (pq waitForPriorityQueue) Less(i, j int) bool {
    return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
    pq[i].index = i
    pq[j].index = j
}
// Push and Pop must be invoked via heap.Push and heap.Pop
func (pq *waitForPriorityQueue) Push(x interface{}) {
    n := len(*pq)
    item := x.(*waitFor)
    item.index = n
    *pq = append(*pq, item)
}


func (pq *waitForPriorityQueue) Pop() interface{} {
    n := len(*pq)
    item := (*pq)[n-1]
    item.index = -1
    *pq = (*pq)[0:(n - 1)]
    return item
}

// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
    return pq[0]
}
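To see the MinHeap ordering in action, here is a self-contained sketch that mirrors the types above in a main package (mirrored only for illustration; the real types are unexported in workqueue):

package main

import (
    "container/heap"
    "fmt"
    "time"
)

// standalone mirrors of the waitFor types above
type waitFor struct {
    data    interface{}
    readyAt time.Time
    index   int
}

type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int           { return len(pq) }
func (pq waitForPriorityQueue) Less(i, j int) bool { return pq[i].readyAt.Before(pq[j].readyAt) }
func (pq waitForPriorityQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
    pq[i].index, pq[j].index = i, j
}
func (pq *waitForPriorityQueue) Push(x interface{}) {
    item := x.(*waitFor)
    item.index = len(*pq)
    *pq = append(*pq, item)
}
func (pq *waitForPriorityQueue) Pop() interface{} {
    old := *pq
    item := old[len(old)-1]
    item.index = -1
    *pq = old[:len(old)-1]
    return item
}

func main() {
    now := time.Now()
    pq := &waitForPriorityQueue{}
    heap.Init(pq)
    // regardless of insertion order, heap.Push keeps the smallest readyAt at the root
    heap.Push(pq, &waitFor{data: "late", readyAt: now.Add(3 * time.Second)})
    heap.Push(pq, &waitFor{data: "soon", readyAt: now.Add(1 * time.Second)})
    heap.Push(pq, &waitFor{data: "mid", readyAt: now.Add(2 * time.Second)})
    for pq.Len() > 0 {
        fmt.Println(heap.Pop(pq).(*waitFor).data) // soon, mid, late
    }
}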

The core of the whole delaying queue is waitingLoop, which serves as its main logic: it checks waitingForAddCh for items to be delayed, places them into the heap, moves entries whose time has come into the work queue, and bounds the maximum blocking period.

func (q *delayingType) waitingLoop() {
    defer utilruntime.HandleCrash()
    never := make(<-chan time.Time) // placeholder channel that never fires
    var nextReadyAtTimer clock.Timer // timer for the next entry to become ready
    waitingForQueue := &waitForPriorityQueue{} // the priority queue (heap)
    heap.Init(waitingForQueue)
    waitingEntryByData := map[t]*waitFor{} // guards against duplicate additions

    for {
        if q.Interface.ShuttingDown() {
            return
        }

        now := q.clock.Now()
        for waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            if entry.readyAt.After(now) {
                break // not ready yet; stop processing
            }

            entry = heap.Pop(waitingForQueue).(*waitFor) // pop one entry from the priority queue
            q.Add(entry.data) // add it to the underlying work queue
            delete(waitingEntryByData, entry.data) // remove it from the dedup map
        }

        // if entries remain, set a timer for the next one to become ready
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            if nextReadyAtTimer != nil {
                nextReadyAtTimer.Stop()
            }
            entry := waitingForQueue.Peek().(*waitFor) // peek at [0] without removing it
            nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // create a timer
            nextReadyAt = nextReadyAtTimer.C()
        }

        select {
        case <-q.stopCh: // exit
            return
        case <-q.heartbeat.C(): // rerun the loop after the heartbeat period even if nothing happened
        case <-nextReadyAt: // an entry's time has come; continue the loop so it is processed above
        case waitEntry := <-q.waitingForAddCh:
            if waitEntry.readyAt.After(q.clock.Now()) { // not ready yet: compare readyAt against the clock's now
                // insert into both delaying structures: waitingForQueue and waitingEntryByData
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                q.Add(waitEntry.data)
            }

            drained := false // makes sure q.waitingForAddCh is drained completely
            for !drained {
                select {
                // waitingForAddCh is a buffered channel; it must be read until empty
                case waitEntry := <-q.waitingForAddCh:
                    if waitEntry.readyAt.After(q.clock.Now()) {
                        insert(waitingForQueue, waitingEntryByData, waitEntry)
                    } else {
                        q.Add(waitEntry.data)
                    }
                default:
                    // guarantees exit: when the case above would block, the channel
                    // has been drained, so take this branch and stop the inner loop
                    drained = true
                }
            }
        }
    }
}
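A minimal usage sketch of the delaying queue (workqueue.NewDelayingQueue is the real constructor; the item and duration are made up):

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    q := workqueue.NewDelayingQueue()
    defer q.ShutDown()

    start := time.Now()
    q.AddAfter("task", 2*time.Second) // sent through waitingForAddCh into the heap

    item, _ := q.Get() // blocks until waitingLoop moves the item into the work queue
    fmt.Printf("%v arrived after %v\n", item, time.Since(start).Round(time.Second)) // task arrived after 2s
    q.Done(item)
}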

Rate Limiting Queue

The rate-limiting queue, RateLimitingInterface, is an extension of the delaying queue.

type RateLimitingInterface interface {
    DelayingInterface // inherits the delaying queue
    // adds the item to the queue once the rate limiter allows it (i.e., once it is compliant)
    AddRateLimited(item interface{})
    // drops the item, whether it succeeded or failed
    Forget(item interface{})
    // the number of times the item has been requeued
    NumRequeues(item interface{}) int
}

You can see that the rate-limiting queue abstraction is just a delaying queue that additionally satisfies AddRateLimited(), Forget(), and NumRequeues(). With the rules understood, the concrete implementation can be analyzed.

type rateLimitingType struct {
    DelayingInterface
    rateLimiter RateLimiter
}

func (q *rateLimitingType) AddRateLimited(item interface{}) {
    q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
    return q.rateLimiter.NumRequeues(item)
}

func (q *rateLimitingType) Forget(item interface{}) {
    q.rateLimiter.Forget(item)
}

rateLimitingType is the implementation of the RateLimitingInterface abstraction; as the code shows, it simply attaches a rate limiter, RateLimiter, to the delaying queue.

type RateLimiter interface {
    // When decides how long the item has to wait
    When(item interface{}) time.Duration
    // drops the item, e.g. on failure; or for success, we'll stop tracking it
    Forget(item interface{})
    // the number of times the item has been requeued
    NumRequeues(item interface{}) int
}

The RateLimiter abstraction is implemented by BucketRateLimiter, ItemBucketRateLimiter, ItemExponentialFailureRateLimiter, ItemFastSlowRateLimiter, and MaxOfRateLimiter; these rate limiters are analyzed below.

BucketRateLimiter

BucketRateLimiter is a token bucket: it wraps rate.Limiter to implement the abstract RateLimiter. It is what workqueue's DefaultControllerRateLimiter() uses for initialization.

func DefaultControllerRateLimiter() RateLimiter {
    return NewMaxOfRateLimiter(
        NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
        // 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
        &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    )
}
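As a hedged illustration of the token-bucket behavior, the sketch below builds a BucketRateLimiter directly with 1 QPS and a burst of 3; When reserves a token and returns how long the caller must wait, so the approximate outputs assume a fresh limiter:

package main

import (
    "fmt"
    "time"

    "golang.org/x/time/rate"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    // 1 token per second, bucket size 3: the first 3 calls pass immediately,
    // later calls wait for token generation
    bucket := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 3)}
    for i := 0; i < 5; i++ {
        fmt.Printf("item-%d waits %v\n", i, bucket.When(fmt.Sprintf("item-%d", i)).Round(time.Second))
    }
    // output (approximately): 0s, 0s, 0s, 1s, 2s
}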

ItemBucketRateLimiter

The ItemBucketRateLimiter is an implementation that keeps a separate token bucket per key, stored in a map, so that every item gets its own dedicated limiter.

type ItemBucketRateLimiter struct {
    r     rate.Limit
    burst int

    limitersLock sync.Mutex
    limiters     map[interface{}]*rate.Limiter
}

func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
    return &ItemBucketRateLimiter{
        r:        r,
        burst:    burst,
        limiters: make(map[interface{}]*rate.Limiter),
    }
}
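The listing above omits the methods. Presumably When lazily creates a per-key limiter on first use and then delegates to it; a hedged sketch of what that looks like, assuming the struct above and golang.org/x/time/rate:

// hedged sketch: how ItemBucketRateLimiter's When presumably works
func (r *ItemBucketRateLimiter) When(item interface{}) time.Duration {
    r.limitersLock.Lock()
    defer r.limitersLock.Unlock()

    limiter, ok := r.limiters[item]
    if !ok {
        // first time this key is seen: give it its own token bucket
        limiter = rate.NewLimiter(r.r, r.burst)
        r.limiters[item] = limiter
    }
    return limiter.Reserve().Delay() // wait according to this key's bucket only
}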

ItemExponentialFailureRateLimiter

As the name suggests, ItemExponentialFailureRateLimiter is an exponential-failure limiter: the delay grows exponentially with the number of failures. You can see that When alone determines the traffic-shaping delay, extending the retry time exponentially with the failure count: $delay = baseDelay \times 2^{failures}$, capped at maxDelay.

type ItemExponentialFailureRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int // failure count per item

    baseDelay time.Duration // base delay
    maxDelay  time.Duration // maximum delay
}

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    exp := r.failures[item]
    r.failures[item] = r.failures[item] + 1

    // The backoff is capped such that 'calculated' value never overflows.
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }

    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
        return r.maxDelay
    }

    return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    delete(r.failures, item)
}
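A short worked illustration with the same parameters as DefaultControllerRateLimiter (5ms base, 1000s cap); the expected outputs are in the comments:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    r := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
    for i := 0; i < 5; i++ {
        fmt.Println(r.When("item")) // 5ms 10ms 20ms 40ms 80ms
    }
    fmt.Println(r.NumRequeues("item")) // 5
    r.Forget("item") // success: stop tracking, the next When starts from 5ms again
    fmt.Println(r.NumRequeues("item")) // 0
}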

ItemFastSlowRateLimiter

ItemFastSlowRateLimiter retries quickly for a fixed number of attempts and then switches to a slow retry interval.

type ItemFastSlowRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    maxFastAttempts int // maximum number of fast attempts
    fastDelay       time.Duration // the fast delay
    slowDelay       time.Duration // the slow delay
}


func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
    return &ItemFastSlowRateLimiter{
        failures:        map[interface{}]int{},
        fastDelay:       fastDelay,
        slowDelay:       slowDelay,
        maxFastAttempts: maxFastAttempts,
    }
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    r.failures[item] = r.failures[item] + 1
    // use the fast delay while the failure count is within the fast threshold, otherwise use the slow delay
    if r.failures[item] <= r.maxFastAttempts {
        return r.fastDelay
    }

    return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    delete(r.failures, item)
}
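A short worked illustration (made-up parameters: three fast attempts at 5ms, then 10s):

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // 3 fast attempts at 5ms, then every further attempt waits 10s
    r := workqueue.NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
    for i := 0; i < 5; i++ {
        fmt.Println(r.When("item")) // 5ms 5ms 5ms 10s 10s
    }
}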

MaxOfRateLimiter

MaxOfRateLimiter consults every limiter in its list and returns the largest delay.

type MaxOfRateLimiter struct {
    limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
    ret := time.Duration(0)
    for _, limiter := range r.limiters {
        curr := limiter.When(item)
        if curr > ret {
            ret = curr
        }
    }

    return ret
}

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
    return &MaxOfRateLimiter{limiters: limiters}
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
    ret := 0
    // query NumRequeues (the failure count) of every limiter in the list and take the maximum
    for _, limiter := range r.limiters {
        curr := limiter.NumRequeues(item)
        if curr > ret {
            ret = curr
        }
    }

    return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
    for _, limiter := range r.limiters {
        limiter.Forget(item)
    }
}
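A hedged sketch of the combination used by DefaultControllerRateLimiter: for repeated failures of a single item within the bucket's burst budget, the per-item exponential delay is the larger value and therefore wins:

package main

import (
    "fmt"
    "time"

    "golang.org/x/time/rate"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    r := workqueue.NewMaxOfRateLimiter(
        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    )
    // while the bucket is within its burst (0s), the exponential delay wins
    fmt.Println(r.When("item"), r.When("item")) // ~5ms ~10ms
    fmt.Println(r.NumRequeues("item"))          // 2 (the exponential limiter's count)
}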

How to use Kubernetes’ rate-limiter

Rate-limiting queues fit traffic-control scenarios where items arrive in large bursts but need to be shaped: the add operation delays each item by the wait time computed in When(). Different limiters yield different delay behaviors.

package main

import (
    "log"
    "strconv"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    stopCh := make(chan struct{})
    timeLayout := "2006-01-02:15:04:05.0000"
    limiter := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    length := 20 // 20 requests in total
    for i := 0; i < length; i++ {
        go func(taskId string) {
            item := "Task-" + taskId + time.Now().Format(timeLayout)
            log.Println(item + " Added.")
            limiter.AddRateLimited(item) // added to the work queue after the delay computed by When()
        }(strconv.FormatInt(int64(i), 10))
    }

    go func() {
        for {
            key, quit := limiter.Get()
            if quit {
                return
            }
            log.Printf("%s process done", key)
            limiter.Done(key) // must not be deferred inside the loop, or Done would never run
        }
    }()
    <-stopCh
}

Because the default rate limiter does not let you set the QPS at initialization, modify the bucket in the source code to $BT(1, 5)$ (1 QPS, bucket size 5). The execution result shows that when a large burst of traffic exhausts the tokens in the bucket, the remaining items are released at the token-generation rate.

In the figure, the tasks are added in a burst and the “Added” logs all print at the same moment (the log line is written before the delayed add takes effect), while the consumer side shows the items are actually delayed: with a configuration of one token per second, the traffic is indeed released at one item per second.

log