Recently I had a requirement at work: millions of timed tasks are created within a short period. When each task is created, the corresponding amount is added to a running total to prevent overselling. Half an hour later the data is checked again, and if it does not match, the added amount is subtracted back.

Doing this with Go's built-in Timer performs poorly: it is backed by a min-heap, so both creating and deleting a timer cost O(log n). A time wheel performs much better, because both operations are O(1).

Time wheels are very widely used, for example in Netty, Akka, Quartz, ZooKeeper, Kafka and many other components.

Introduction

Simple Time Wheel

A time wheel stores its tasks in a circular queue. The underlying implementation is an array, and each element of the array holds a list of timed tasks. That list is a circular doubly linked list, and each item in it is a timed task entry that wraps the actual task.

The time wheel consists of a number of time slots, each representing the basic time span (tickMs) of the wheel. The number of slots is fixed and is given by wheelSize, so the overall time span (interval) of the wheel is tickMs × wheelSize.

The wheel also has a dial pointer (currentTime) that indicates the wheel's current time; currentTime is always an integer multiple of tickMs. It points to the time slot that is currently due, meaning that every task in that slot's list needs to be processed.
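The layout described above can be sketched in a few lines of Go. This is only an illustration under stated assumptions: the names simpleWheel and newSimpleWheel are hypothetical and do not come from any library, and Go's container/list stands in for the task list of each slot.

```go
package main

import (
	"container/list"
	"fmt"
)

// simpleWheel is a hypothetical single-level time wheel, for illustration only.
type simpleWheel struct {
	tickMs      int64        // time span of one slot, in milliseconds
	wheelSize   int64        // number of slots
	interval    int64        // total span: tickMs * wheelSize
	currentTime int64        // dial pointer, always a multiple of tickMs
	slots       []*list.List // one task list per slot
}

// newSimpleWheel builds the wheel and trims startMs down to a multiple of tickMs.
func newSimpleWheel(tickMs, wheelSize, startMs int64) *simpleWheel {
	slots := make([]*list.List, wheelSize)
	for i := range slots {
		slots[i] = list.New()
	}
	return &simpleWheel{
		tickMs:      tickMs,
		wheelSize:   wheelSize,
		interval:    tickMs * wheelSize,
		currentTime: startMs - startMs%tickMs,
		slots:       slots,
	}
}

func main() {
	w := newSimpleWheel(1000, 10, 2345)
	fmt.Println(w.interval, w.currentTime, len(w.slots)) // 10000 2000 10
}
```

container/list is a doubly linked list that is internally organised as a ring around a sentinel root element, so it is a reasonable stand-in for the circular list described here.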

The following diagram shows a time wheel with a tickMs of 1s and a wheelSize of 10. Each slot holds a list of timed tasks, and the list contains the actual task items.


Initially the dial pointer currentTime points to slot 0. With a tickMs of 1s and a wheelSize of 10, the interval is 10s. The following diagram shows a task timed at 2s being inserted; it is stored in the task list of slot 2, marked in red. As time passes the pointer currentTime advances, and once 2s have elapsed currentTime points to slot 2 and that slot's task list is retrieved for processing.


If currentTime points to slot 2 and a 9s task is inserted, the task expires at 11s. The wheel wraps around, so the new task reuses an earlier slot and is stored in slot 1 (11 mod 10 = 1).
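The wrap-around is just integer division followed by a modulus. A minimal sketch, where slotIndex is a hypothetical helper name and all times are absolute milliseconds:

```go
package main

import "fmt"

// slotIndex returns the slot that holds a task expiring at expirationMs.
// Hypothetical helper, for illustration only.
func slotIndex(expirationMs, tickMs, wheelSize int64) int64 {
	return (expirationMs / tickMs) % wheelSize
}

func main() {
	const tickMs, wheelSize = 1000, 10

	// currentTime = 0s, a 2s task expires at 2s.
	fmt.Println(slotIndex(0+2000, tickMs, wheelSize)) // 2

	// currentTime = 2s, a 9s task expires at 11s and wraps around.
	fmt.Println(slotIndex(2000+9000, tickMs, wheelSize)) // 1
}
```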


The time wheel described so far is a simple, single-layer wheel whose overall time range runs from currentTime to currentTime + interval. To support a 15s timed task you would have to build a new wheel with a time span of at least 15s. This kind of expansion has no upper bound: a wheel covering 10,000 seconds at the same tick needs an array of that size, which takes up a lot of memory and also makes traversing the array less efficient.

This is why the concept of a hierarchical time wheel was introduced.

Hierarchical Time Wheel

The diagram shows a two-layer time wheel. The second layer also consists of 10 slots, each spanning 10s. The tickMs of the second-layer wheel is the interval of the first-layer wheel, i.e. 10s. The wheelSize of every layer is fixed at 10, so the overall time span (interval) of the second-layer wheel is 100s.

The diagram shows the expiry time range of each slot. The expiry range of slot 0 of the second-layer wheel is [0, 9], so one slot of the second-layer wheel covers all 10 slots of the first-layer wheel.

If a 15s task is added to this time wheel, the first layer cannot hold it, so it goes into the second layer and is inserted into the slot whose expiry range is [10, 19].


As time passes and only 5s of the original 15s remain, the task is demoted: the overall time span of the first-layer wheel is now sufficient, so the task is moved into slot 5 of the first-layer wheel. After another 5s the task actually expires and its expiry operation is finally executed.
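Placement and demotion follow from one comparison per layer. The sketch below is a simplification and not taken from any library: layerAt is a hypothetical name, every layer is assumed to share the same currentTime, and times are absolute milliseconds.

```go
package main

import "fmt"

// layerAt returns the 1-based layer whose span can hold a task that expires
// at expirationMs, given the wheel's current time. Simplified sketch: all
// layers are assumed to share one currentTime.
func layerAt(expirationMs, currentTimeMs, tickMs, wheelSize int64) int {
	layer := 1
	interval := tickMs * wheelSize
	for expirationMs >= currentTimeMs+interval {
		interval *= wheelSize // the next layer's span is wheelSize times larger
		layer++
	}
	return layer
}

func main() {
	const tickMs, wheelSize = 1000, 10

	// At 0s the 15s task does not fit into the 10s span of layer 1.
	fmt.Println(layerAt(15000, 0, tickMs, wheelSize)) // 2

	// At 10s only 5s remain, so the task fits into layer 1 again.
	fmt.Println(layerAt(15000, 10000, tickMs, wheelSize)) // 1

	// In layer 1 it lands in slot (15000 / 1000) % 10.
	fmt.Println((15000 / tickMs) % wheelSize) // 5
}
```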

Code Implementation

Our Go version of TimingWheel is modelled on Kafka's, so a few implementation details are worth noting.

  • Each slot's list in the TimingWheel has a root node that simplifies boundary conditions. It is an extra linked-list node that acts as the first node and stores nothing in its value field; it exists only to make list operations more convenient.
  • The start time (startMs) of every higher-level wheel is set to the currentTime of the wheel below it at the moment the higher level is created. The currentTime of each layer must be an integer multiple of its tickMs; if it is not, it is trimmed down to one: currentTime = startMs - (startMs % tickMs).
  • The timer in Kafka only holds a reference to the first-level TimingWheel and does not hold the higher-level wheels directly. Instead, each TimingWheel holds a reference (overflowWheel) to the wheel one level above it.
  • The timer in Kafka uses a DelayQueue to advance the time wheel. Every slot list that is in use is added to the DelayQueue, which is ordered by the expiry time of each slot, so the list that expires soonest sits at the head of the queue. A separate thread fetches the expired lists from the DelayQueue.

Structs

type TimingWheel struct {
    // Time span of one slot
    tick      int64 // in milliseconds
    // Number of slots in the wheel
    wheelSize int64
    // Total time span of the wheel
    interval    int64 // in milliseconds
    // Time the dial pointer currently points to
    currentTime int64 // in milliseconds
    // List of time slots
    buckets     []*bucket
    // Delay queue
    queue       *delayqueue.DelayQueue
    // Reference to the upper-level time wheel
    overflowWheel unsafe.Pointer // type: *TimingWheel

    exitC     chan struct{}
    waitGroup waitGroupWrapper
}

tick, wheelSize, interval and currentTime are all easy to understand. The buckets field is the list of time slots, queue is the delay queue through which all tasks are triggered, and overflowWheel is a reference to the upper-level time wheel.

type bucket struct {
    // Expiry time of the tasks in this bucket
    expiration int64

    mu     sync.Mutex
    // List of tasks that share the same expiry time
    timers *list.List
}

A bucket wraps the task list of one time slot. It holds the tasks that share the same expiry time, and once that time is reached the timers list is taken out for processing. One interesting point: because several goroutines access the bucket concurrently, the int64 expiration value is read and written through the atomic package, and on 32-bit systems 64-bit atomic operations require the value to be 64-bit aligned. That is why expiration is the first field of the struct.
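As a sketch of what such atomic accessors could look like: the method names Expiration and SetExpiration appear in the code of this article, but the bodies below are an assumption, and the struct is cut down to the single field that matters here.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// bucket is reduced to its expiration field for this illustration.
type bucket struct {
	// Kept as the first field so that it is 64-bit aligned on 32-bit platforms.
	expiration int64
}

// Expiration reads the expiry time atomically.
func (b *bucket) Expiration() int64 {
	return atomic.LoadInt64(&b.expiration)
}

// SetExpiration stores the new expiry time and reports whether it differs
// from the previous value.
func (b *bucket) SetExpiration(expiration int64) bool {
	return atomic.SwapInt64(&b.expiration, expiration) != expiration
}

func main() {
	b := &bucket{expiration: -1}
	fmt.Println(b.SetExpiration(11000)) // true: the value changed
	fmt.Println(b.SetExpiration(11000)) // false: same value again
	fmt.Println(b.Expiration())         // 11000
}
```

Returning "changed or not" from the setter lets a caller enqueue the bucket only once per expiry time without taking a lock.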

type Timer struct {
    // Expiry time
    expiration int64 // in milliseconds
    // The task to execute
    task       func()
    // Pointer to the bucket that holds this Timer
    b unsafe.Pointer // type: *bucket
    // The timer's element in the bucket's list
    element *list.Element
}

Timer is the smallest unit of execution in the time wheel. It wraps a timed task and calls task when the timer expires.


Initializing the time wheel

As an example, let's initialise a time wheel with a tick of 1s and a wheelSize of 10.

func main() {
    tw := timingwheel.NewTimingWheel(time.Second, 10)
    tw.Start()
}

func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
    // Convert the tick to milliseconds
    tickMs := int64(tick / time.Millisecond)
    // Panic if the tick is shorter than 1ms
    if tickMs <= 0 {
        panic(errors.New("tick must be greater than or equal to 1ms"))
    }
    // Set the start time
    startMs := timeToMs(time.Now().UTC())
    // Initialise the TimingWheel
    return newTimingWheel(
        tickMs,
        wheelSize,
        startMs,
        delayqueue.New(int(wheelSize)),
    )
}

func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {
    // Allocate the buckets
    buckets := make([]*bucket, wheelSize)
    for i := range buckets {
        buckets[i] = newBucket()
    }
    // Instantiate the TimingWheel
    return &TimingWheel{
        tick:        tickMs,
        wheelSize:   wheelSize,
        // currentTime must be a multiple of tickMs, so trim it with truncate
        currentTime: truncate(startMs, tickMs),
        interval:    tickMs * wheelSize,
        buckets:     buckets,
        queue:       queue,
        exitC:       make(chan struct{}),
    }
}

The initialisation is straightforward; the code comments above explain each step.

Starting the time wheel

Next, let's look at the Start method.

func (tw *TimingWheel) Start() {
    // Poll runs an infinite loop that puts expired elements into the queue's C channel
    tw.waitGroup.Wrap(func() {
        tw.queue.Poll(tw.exitC, func() int64 {
            return timeToMs(time.Now().UTC())
        })
    })
    // Start an infinite loop that reads from the queue's C channel
    tw.waitGroup.Wrap(func() {
        for {
            select {
            // Everything that comes out of the queue is an expired bucket
            case elem := <-tw.queue.C:
                b := elem.(*bucket)
                // Move currentTime forward to the bucket's expiry time
                tw.advanceClock(b.Expiration())
                // Take the timers out of the bucket and pass each one to addOrRun
                b.Flush(tw.addOrRun)
            case <-tw.exitC:
                return
            }
        }
    })
}

Start launches two goroutines through waitGroup.Wrap, which runs the function passed to it asynchronously in a new goroutine.

The first goroutine calls the Poll method of the delay queue, which loops forever and puts expired elements into the queue's C channel. The second goroutine loops forever reading from C; anything that arrives on C is a bucket that has expired. For each such bucket, advanceClock is called first to move currentTime forward to the bucket's expiry time, then Flush takes the timers out of the bucket and passes each one to addOrRun.
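The Wrap helper used by Start is not shown in this article. A plausible sketch, assuming it is a thin wrapper around sync.WaitGroup that starts a goroutine and marks it done when the callback returns:

```go
package main

import (
	"fmt"
	"sync"
)

// waitGroupWrapper embeds sync.WaitGroup. Assumed shape, for illustration only.
type waitGroupWrapper struct {
	sync.WaitGroup
}

// Wrap runs cb in a new goroutine and tracks it in the wait group.
func (w *waitGroupWrapper) Wrap(cb func()) {
	w.Add(1)
	go func() {
		defer w.Done()
		cb()
	}()
}

func main() {
	var wg waitGroupWrapper
	results := make(chan int, 2)

	wg.Wrap(func() { results <- 1 })
	wg.Wrap(func() { results <- 2 })

	wg.Wait() // blocks until both callbacks have returned
	close(results)

	sum := 0
	for v := range results {
		sum += v
	}
	fmt.Println(sum) // 3
}
```

Tracking the goroutines this way lets a later shutdown step wait until both loops have actually exited.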

func (tw *TimingWheel) advanceClock(expiration int64) {
    currentTime := atomic.LoadInt64(&tw.currentTime)
    // The expiry time is at least one tick ahead of the current time
    if expiration >= currentTime+tw.tick {
        // Advance currentTime to the expiry time, truncated to a multiple of tick
        currentTime = truncate(expiration, tw.tick)
        atomic.StoreInt64(&tw.currentTime, currentTime)

        // Try to advance the clock of the overflow wheel if present:
        // recurse into the upper-level time wheel
        overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
        if overflowWheel != nil {
            (*TimingWheel)(overflowWheel).advanceClock(currentTime)
        }
    }
}

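The advanceClock method advances the time wheel by setting currentTime to the given expiry time, truncated to a multiple of tick, and then recursively advances the overflow wheel if one exists.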
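To see the effect on two layers, here is a single-threaded simplification. The wheel type is hypothetical and leaves out the atomic operations, so it only illustrates the arithmetic, not the concurrency handling.

```go
package main

import "fmt"

// wheel is a stripped-down, non-concurrent stand-in for a time wheel layer.
type wheel struct {
	tick        int64
	currentTime int64
	overflow    *wheel
}

// advanceClock moves currentTime forward only when the expiry time is at
// least one tick ahead, then propagates the new time to the upper layer.
func (w *wheel) advanceClock(expiration int64) {
	if expiration >= w.currentTime+w.tick {
		w.currentTime = expiration - expiration%w.tick
		if w.overflow != nil {
			w.overflow.advanceClock(w.currentTime)
		}
	}
}

func main() {
	upper := &wheel{tick: 10000}
	lower := &wheel{tick: 1000, overflow: upper}

	lower.advanceClock(500) // less than one tick ahead: nothing moves
	fmt.Println(lower.currentTime, upper.currentTime) // 0 0

	lower.advanceClock(12000)
	fmt.Println(lower.currentTime, upper.currentTime) // 12000 10000
}
```

Each layer truncates to its own tick, which is why the upper layer ends up at 10000 while the lower layer sits at 12000.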

func (b *bucket) Flush(reinsert func(*Timer)) {
    var ts []*Timer

    b.mu.Lock()
    // Walk through the nodes of the bucket's list
    for e := b.timers.Front(); e != nil; {
        next := e.Next()

        t := e.Value.(*Timer)
        // Remove the head node from the bucket's list
        b.remove(t)
        ts = append(ts, t)

        e = next
    }
    b.mu.Unlock()

    b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()

    for _, t := range ts {
        reinsert(t)
    }
}

The Flush method walks through the bucket's timers list, removes each timer and appends it to the ts slice, then calls reinsert on every collected timer; here reinsert is the addOrRun method.

func (tw *TimingWheel) addOrRun(t *Timer) {
    // If the timer has already expired, run it right away
    if !tw.add(t) {
        // Run the timed task asynchronously
        go t.task()
    }
}

addOrRun calls add to check whether the timer has already expired. If it has, the task is run asynchronously in its own goroutine. The add method is analysed below.

The entire start execution flow is shown in the diagram.


  1. The Start method starts a goroutine that calls Poll to process the data that is due in the DelayQueue and put it into channel C.
  2. The Start method starts a second goroutine that loops over the data in the DelayQueue's channel C. Each element is a bucket. The goroutine iterates over the bucket's list of timers, runs each task asynchronously if it is due, and otherwise adds it back to the time wheel, which puts its bucket back into the DelayQueue.

Adding a task

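func main() {
    tw := timingwheel.NewTimingWheel(time.Second, 10)
    tw.Start()
    // Stop the time wheel when main returns
    defer tw.Stop()

    // Buffered so that the timer goroutine never blocks on send
    exitC := make(chan time.Time, 1)
    // Add a task
    tw.AfterFunc(time.Second*15, func() {
        fmt.Println("The timer fires")
        exitC <- time.Now().UTC()
    })

    // Wait for the timer to fire before exiting
    <-exitC
}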

We add a 15s timed task through the AfterFunc method; when it expires, the function passed in is executed.

func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {
    t := &Timer{
        expiration: timeToMs(time.Now().UTC().Add(d)),
        task:       f,
    }
    tw.addOrRun(t)
    return t
}

AfterFunc builds a Timer from the delay and the function to run on expiry, then calls addOrRun. As we saw above, addOrRun decides, based on the expiry time, whether the timed task needs to run immediately.

Let’s look at the add method.

func (tw *TimingWheel) add(t *Timer) bool {
    currentTime := atomic.LoadInt64(&tw.currentTime)
    if t.expiration < currentTime+tw.tick {
        // Already expired
        return false
    // The expiry time falls within this wheel's span
    } else if t.expiration < currentTime+tw.interval {
        // Put it into its own bucket:
        // work out the position in the time wheel
        virtualID := t.expiration / tw.tick
        b := tw.buckets[virtualID%tw.wheelSize]
        // Add the task to the bucket's list
        b.Add(t)
        // SetExpiration returns false if the expiry time is unchanged,
        // which prevents the bucket from being enqueued more than once
        if b.SetExpiration(virtualID * tw.tick) {
            // Add the bucket to the delay queue
            tw.queue.Offer(b, b.Expiration())
        }

        return true
    } else {
        // Out of the interval. Put it into the overflow wheel:
        // the expiry time is beyond this wheel, so hand it to the upper level
        overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
        if overflowWheel == nil {
            atomic.CompareAndSwapPointer(
                &tw.overflowWheel,
                nil,
                // Note that the upper wheel's tick is this wheel's interval
                unsafe.Pointer(newTimingWheel(
                    tw.interval,
                    tw.wheelSize,
                    currentTime,
                    tw.queue,
                )),
            )
            overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
        }
        // Recurse into the upper level
        return (*TimingWheel)(overflowWheel).add(t)
    }
}

The add method has three branches, chosen by the expiry time. In the first branch the expiry time is less than the current time plus tick, which means the timer has already expired; add returns false and the caller runs the task.

The second branch checks whether the expiry time falls within the span of the current time wheel. If it does, the timed task can be placed in this wheel: the bucket is found by taking the modulus, and the task is added to that bucket's list. SetExpiration reports whether the bucket's expiry time actually changed, and only then is the bucket offered to the delay queue, which prevents the same bucket from being inserted repeatedly.

The third branch means the timed task lies beyond the span of the current time wheel and must be passed up to the upper-level wheel. Note that the tick of the upper-level wheel is the interval of the current wheel, while the delay queue is shared. The new wheel is stored in the overflowWheel pointer, and add is then called recursively on the upper level.
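The three branches reduce to two comparisons. The sketch below isolates just that decision; classify is a hypothetical helper and the returned labels are only for illustration.

```go
package main

import "fmt"

// classify mirrors the three-way decision of add for a single wheel.
// All values are in milliseconds.
func classify(expiration, currentTime, tick, interval int64) string {
	switch {
	case expiration < currentTime+tick:
		return "expired"
	case expiration < currentTime+interval:
		return "current wheel"
	default:
		return "overflow wheel"
	}
}

func main() {
	const tick, interval = 1000, 10000

	fmt.Println(classify(500, 0, tick, interval))   // expired
	fmt.Println(classify(2000, 0, tick, interval))  // current wheel
	fmt.Println(classify(15000, 0, tick, interval)) // overflow wheel
}
```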

That completes the time wheel, but one point is worth noting: this implementation combines a DelayQueue with the circular array of buckets. TimingWheel inserts and deletes timed task items in O(1). The DelayQueue is backed by a priority queue whose operations are O(log n), but it holds buckets rather than individual tasks, and the number of buckets is small, so this does not affect performance.