A queue is a very common data structure that allows removal (dequeue) only at the front (head) and insertion (enqueue) only at the back (tail). Like a stack, a queue is a linear list with restricted operations: the end where insertions happen is called the tail, and the end where removals happen is called the head.

When using a queue in a concurrent environment, we must account for multi-threaded reads and writes: there may be multiple threads enqueuing and multiple threads dequeuing at the same time. In this situation we want to guarantee that no data is lost or duplicated, and that the queue's semantics are preserved: first-in-first-out order, and as long as there is data, a dequeue can retrieve it.

Admittedly, concurrent access to a queue can be achieved with a mutex. A queue is generally implemented with pointers and only operates at the head and tail, so the critical section protected by the mutex contains no complex logic and executes quickly; a mutex-based queue is therefore already quite efficient. In some cases, however, a lock-free algorithm can further improve the performance of a concurrent queue.

This article introduces some background on lock-free queue algorithms, implements three concurrent queues, and presents performance test results.

The code can be found on GitHub: smallnest/queue

lock-free queue algorithm

Speaking of lock-free queue algorithms, we have to mention Maged M. Michael and Michael L. Scott's 1996 paper Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms, which reviews earlier implementations of concurrent queues and their limitations, proposes a very simple lock-free queue, and also provides a two-lock queue algorithm for machines without universal atomic primitives such as CAS. The paper has been cited nearly 1000 times.

It is worth mentioning that Java’s ConcurrentLinkedQueue is based on this algorithm:

This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.

Most lock-free algorithms are implemented with CAS (compare-and-swap) operations.

The paper provides pseudo-code for the lock-free queue algorithm; it is quite short, so it can easily be implemented in various programming languages. The pseudo-code is listed here:

 structure pointer_t {ptr: pointer to node_t, count: unsigned integer}
 structure node_t {value: data type, next: pointer_t}
 structure queue_t {Head: pointer_t, Tail: pointer_t}
 
 initialize(Q: pointer to queue_t)
    node = new_node()		// Allocate a free node
    node->next.ptr = NULL	// Make it the only node in the linked list
    Q->Head.ptr = Q->Tail.ptr = node	// Both Head and Tail point to it
 
 enqueue(Q: pointer to queue_t, value: data type)
  E1:   node = new_node()	// Allocate a new node from the free list
  E2:   node->value = value	// Copy enqueued value into node
  E3:   node->next.ptr = NULL	// Set next pointer of node to NULL
  E4:   loop			// Keep trying until Enqueue is done
  E5:      tail = Q->Tail	// Read Tail.ptr and Tail.count together
  E6:      next = tail.ptr->next	// Read next ptr and count fields together
  E7:      if tail == Q->Tail	// Are tail and next consistent?
              // Was Tail pointing to the last node?
  E8:         if next.ptr == NULL
                 // Try to link node at the end of the linked list
  E9:            if CAS(&tail.ptr->next, next, <node, next.count+1>)
 E10:               break	// Enqueue is done.  Exit loop
 E11:            endif
 E12:         else		// Tail was not pointing to the last node
                 // Try to swing Tail to the next node
 E13:            CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
 E14:         endif
 E15:      endif
 E16:   endloop
        // Enqueue is done.  Try to swing Tail to the inserted node
 E17:   CAS(&Q->Tail, tail, <node, tail.count+1>)
 
 dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean
  D1:   loop			     // Keep trying until Dequeue is done
  D2:      head = Q->Head	     // Read Head
  D3:      tail = Q->Tail	     // Read Tail
  D4:      next = head.ptr->next    // Read Head.ptr->next
  D5:      if head == Q->Head	     // Are head, tail, and next consistent?
  D6:         if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
  D7:            if next.ptr == NULL  // Is queue empty?
  D8:               return FALSE      // Queue is empty, couldn't dequeue
  D9:            endif
                 // Tail is falling behind.  Try to advance it
 D10:            CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
 D11:         else		     // No need to deal with Tail
                 // Read value before CAS
                 // Otherwise, another dequeue might free the next node
 D12:            *pvalue = next.ptr->value
                 // Try to swing Head to the next node
 D13:            if CAS(&Q->Head, head, <next.ptr, head.count+1>)
 D14:               break             // Dequeue is done.  Exit loop
 D15:            endif
 D16:         endif
 D17:      endif
 D18:   endloop
 D19:   free(head.ptr)		     // It is safe now to free the old node
 D20:   return TRUE                   // Queue was not empty, dequeue succeeded

initialize creates the queue with an auxiliary dummy node as the head, which simplifies the handling of both enqueue and dequeue.

In enqueue, E1~E3 first create a new node and store the enqueued value in it; the next step is to insert it at the tail of the queue.

E4~E16 is a loop that keeps trying to insert the node into the queue. Under concurrency a CAS may fail, so the loop retries; among the competing threads one will always succeed, which is what makes this a lock-free algorithm.

E5~E6 read the tail pointer and the next node it points to. Without concurrency, the next node after the tail is always empty. Under concurrency, however, by the time line E7 runs another thread may have appended a new node, or the previous tail node may have been dequeued, so E7 first checks for consistency and re-reads if the check fails.

If the condition at E8 holds, the tail pointer we read is still the real tail, so line E9 appends the node with a CAS and exits the loop; note that the tail pointer itself has not been updated yet. Otherwise another node was appended in the meantime, and line E13 tries to swing the tail pointer forward to the next node.

After the loop ends the node is definitely in the queue, so E17 tries to point the tail pointer at the newly inserted node. Another node may have been appended by then, causing this CAS to fail, but that does not matter: the node is already linked in, it is just no longer the last one, and the enqueue logic of the other thread will advance the tail to the newest node.

For dequeue, D2~D4 read the head pointer, the tail pointer, and the node after the head. D5 checks that the head pointer has not changed in the meantime, i.e. no other dequeue has interfered with these reads.

D6~D10 handle the case where the head and tail pointers reference the same node. There are two possibilities: (1) the queue is empty, so return false directly because there is nothing to dequeue; (2) a value was just enqueued but the tail pointer has not yet been advanced, so advance the tail pointer here and retry.

Otherwise, D12 first reads and saves the first value, then tries to swing the head pointer to that node with a CAS. On success the old head node is freed and the value is returned; the node the head now points to takes over as the auxiliary dummy node, so the value it stores is no longer needed.

Implementation

1. lock-free queue

According to the pseudo-code in the paper, we can implement a lock-free queue in Go. Here we use unsafe.Pointer for the pointers, which makes the CAS operations convenient.

package queue
import (
	"sync/atomic"
	"unsafe"
)
// LKQueue is a lock-free unbounded queue.
type LKQueue struct {
	head unsafe.Pointer
	tail unsafe.Pointer
}
type node struct {
	value interface{}
	next  unsafe.Pointer
}
// NewLKQueue returns an empty queue.
func NewLKQueue() *LKQueue {
	n := unsafe.Pointer(&node{})
	return &LKQueue{head: n, tail: n}
}
// Enqueue puts the given value v at the tail of the queue.
func (q *LKQueue) Enqueue(v interface{}) {
	n := &node{value: v}
	for {
		tail := load(&q.tail)
		next := load(&tail.next)
		if tail == load(&q.tail) { // are tail and next consistent?
			if next == nil {
				if cas(&tail.next, next, n) {
					cas(&q.tail, tail, n) // Enqueue is done.  try to swing tail to the inserted node
					return
				}
			} else { // tail was not pointing to the last node
				// try to swing Tail to the next node
				cas(&q.tail, tail, next)
			}
		}
	}
}
// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *LKQueue) Dequeue() interface{} {
	for {
		head := load(&q.head)
		tail := load(&q.tail)
		next := load(&head.next)
		if head == load(&q.head) { // are head, tail, and next consistent?
			if head == tail { // is queue empty or tail falling behind?
				if next == nil { // is queue empty?
					return nil
				}
				// tail is falling behind.  try to advance it
				cas(&q.tail, tail, next)
			} else {
				// read value before CAS otherwise another dequeue might free the next node
				v := next.value
				if cas(&q.head, head, next) {
					return v // Dequeue is done.  return
				}
			}
		}
	}
}
func load(p *unsafe.Pointer) (n *node) {
	return (*node)(atomic.LoadPointer(p))
}
func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
	return atomic.CompareAndSwapPointer(
		p, unsafe.Pointer(old), unsafe.Pointer(new))
}

2. two-lock queue

The lock-free queue above implements an efficient concurrent queue via CAS. The paper also presents a two-lock algorithm that can be used on multiprocessors lacking such atomic primitives.

package queue
import (
	"sync"
)
// CQueue is a concurrent unbounded queue which uses the two-lock concurrent queue algorithm.
type CQueue struct {
	head  *cnode
	tail  *cnode
	hlock sync.Mutex
	tlock sync.Mutex
}
type cnode struct {
	value interface{}
	next  *cnode
}
// NewCQueue returns an empty CQueue.
func NewCQueue() *CQueue {
	n := &cnode{}
	return &CQueue{head: n, tail: n}
}
// Enqueue puts the given value v at the tail of the queue.
func (q *CQueue) Enqueue(v interface{}) {
	n := &cnode{value: v}
	q.tlock.Lock()
	q.tail.next = n // Link node at the end of the linked list
	q.tail = n      // Swing Tail to node
	q.tlock.Unlock()
}
// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *CQueue) Dequeue() interface{} {
	q.hlock.Lock()
	n := q.head
	newHead := n.next
	if newHead == nil {
		q.hlock.Unlock()
		return nil
	}
	v := newHead.value
	newHead.value = nil
	q.head = newHead
	q.hlock.Unlock()
	return v
}

3. mutex-based queue

Traditionally, we can implement a simple queue with a mutex plus a slice, without chasing the last bit of performance in either time or space.

package queue
import "sync"
// SliceQueue is an unbounded queue which uses a slice as underlying.
type SliceQueue struct {
	data []interface{}
	mu   sync.Mutex
}
// NewSliceQueue returns an empty queue.
// You can give an initial capacity.
func NewSliceQueue(n int) (q *SliceQueue) {
	return &SliceQueue{data: make([]interface{}, 0, n)}
}
// Enqueue puts the given value v at the tail of the queue.
func (q *SliceQueue) Enqueue(v interface{}) {
	q.mu.Lock()
	q.data = append(q.data, v)
	q.mu.Unlock()
}
// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *SliceQueue) Dequeue() interface{} {
	q.mu.Lock()
	if len(q.data) == 0 {
		q.mu.Unlock()
		return nil
	}
	v := q.data[0]
	q.data[0] = nil // drop the reference so the value can be garbage collected
	q.data = q.data[1:]
	q.mu.Unlock()
	return v
}

Performance

goos: darwin
goarch: amd64
pkg: github.com/smallnest/queue
BenchmarkQueue/lock-free_queue#4-4           	 8399941	       177 ns/op
BenchmarkQueue/two-lock_queue#4-4            	 7544263	       155 ns/op
BenchmarkQueue/slice-based_queue#4-4         	 6436875	       194 ns/op
BenchmarkQueue/lock-free_queue#32-4          	 8399769	       140 ns/op
BenchmarkQueue/two-lock_queue#32-4           	 7486357	       155 ns/op
BenchmarkQueue/slice-based_queue#32-4        	 4572828	       235 ns/op
BenchmarkQueue/lock-free_queue#1024-4        	 8418556	       140 ns/op
BenchmarkQueue/two-lock_queue#1024-4         	 7888488	       155 ns/op
BenchmarkQueue/slice-based_queue#1024-4      	 8902573	       218 ns/op

Reference https://colobu.com/2020/08/14/lock-free-queue-in-go/