In this post we explore the key techniques behind thread pools by implementing one step by step.

Preliminary discussion

Why do I need thread pools?

Using threads in C++ has been easy since C++11. At the most basic level, a thread can be managed with std::thread. For asynchronous execution of tasks, std::async and std::future are also convenient. With all this infrastructure in place, why do we need a thread pool? Or rather, when do we need one?

Threads are a system resource: they take time to create and destroy. If the time spent creating and destroying threads is of the same order of magnitude as the time required to perform a task, the overhead of frequent thread creation and destruction becomes significant. That is when a thread pool is worth considering.
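
To get a feel for this overhead, here is a minimal, self-contained sketch comparing a thread-per-task scheme against a single reused thread. The task body and the count are arbitrary, and the numbers will vary by platform; this is only meant to make the cost visible.

#include <chrono>
#include <cstdio>
#include <thread>

int main() {
  constexpr int kTasks = 10000;  // arbitrary task count
  auto tiny_task = [] { /* work far cheaper than launching a thread */ };

  auto t0 = std::chrono::steady_clock::now();
  for (int i = 0; i < kTasks; ++i) {
    std::thread(tiny_task).join();  // create and destroy a thread per task
  }
  auto t1 = std::chrono::steady_clock::now();

  std::thread reused([&] {  // one reused thread runs all tasks
    for (int i = 0; i < kTasks; ++i) tiny_task();
  });
  reused.join();
  auto t2 = std::chrono::steady_clock::now();

  using ms = std::chrono::duration<double, std::milli>;
  std::printf("thread per task: %8.1f ms\n", ms(t1 - t0).count());
  std::printf("reused thread:   %8.1f ms\n", ms(t2 - t1).count());
}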

What should be the characteristics of a thread pool?

A thread pool is essentially a set of threads standing by for work. In C++, it can be represented as an array or a vector of std::thread. In practice, to leave room for future extensions, std::vector<std::thread> is clearly the more appropriate choice.

Each thread in the pool may receive a task at some point, but the exact task is not known when the thread is created. Expressed in C++ terms, this means that a thread in a thread pool:

  • should be able to execute arbitrary functions, supporting any parameter list and any return type;
  • should be able to send the result of a task's execution back to the publisher of the task;
  • should be able to be woken up to perform a task when needed, without consuming excessive CPU when idle;
  • should be controllable by the master thread, so that tasks can be paused, new tasks refused, unfinished tasks discarded, etc., when appropriate.

For the first requirement, the modern C++ approach is to use the infrastructure provided by the <functional> header (std::bind, std::function, etc.) in combination with template parameter packs. For the second, the old-fashioned approach is to register a callback function when the task is published; the modern C++ approach is to use std::packaged_task in combination with std::future. For the third, consider std::condition_variable when tasks arrive infrequently, or std::this_thread::yield when they arrive very frequently. For the fourth, set internal variables as tokens and have each worker thread check those tokens periodically.
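
As a taste of the second technique, here is a minimal sketch of std::packaged_task plus std::future, independent of any thread pool. The function add is made up purely for illustration.

#include <future>
#include <iostream>
#include <thread>

int add(int a, int b) { return a + b; }

int main() {
  // Wrap an arbitrary callable; the result travels back through a future.
  std::packaged_task<int(int, int)> task(add);
  std::future<int> result = task.get_future();

  std::thread worker(std::move(task), 1, 2);  // execute on another thread
  std::cout << result.get() << '\n';          // blocks until ready; prints 3
  worker.join();
}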

This discussion brings us to tasks: obviously, we will need a thread-safe queue to manage the tasks posted by other threads.

Thread-safe queues

Let’s start directly with the code and analyze it step by step.

#include <mutex>
#include <queue>
#include <shared_mutex>

template <typename T>
class blocking_queue : protected std::queue<T> {  // 1.
 public:
  using wlock = std::unique_lock<std::shared_mutex>;  // 2.a
  using rlock = std::shared_lock<std::shared_mutex>;  // 2.b

 public:
  blocking_queue() = default;
  ~blocking_queue() {
    clear();
  }
  blocking_queue(const blocking_queue&) = delete;  // 3.a
  blocking_queue(blocking_queue&&) = delete;  // 3.b
  blocking_queue& operator=(const blocking_queue&) = delete; // 3.c
  blocking_queue& operator=(blocking_queue&&) = delete;  // 3.d

 public:
  bool empty() const {
    rlock lock(mtx_);  // 4.a
    return std::queue<T>::empty();
  }

  size_t size() const {
    rlock lock(mtx_);  // 4.b
    return std::queue<T>::size();
  }

 public:
  void clear() {
    wlock lock(mtx_);
    while (!std::queue<T>::empty()) {
      std::queue<T>::pop();
    }
  }

  void push(const T& obj) {
    wlock lock(mtx_);  // 5.a
    std::queue<T>::push(obj);
  }

  template <typename... Args>
  void emplace(Args&&... args) {
    wlock lock(mtx_);  // 5.b
    std::queue<T>::emplace(std::forward<Args>(args)...);
  }

  bool pop(T& holder) {  // 6.
    wlock lock(mtx_);
    if (std::queue<T>::empty()) {
      return false;
    } else {
      holder = std::move(std::queue<T>::front());
      std::queue<T>::pop();
      return true;
    }
  }

 private:
  mutable std::shared_mutex mtx_;  // 7.
};
  1. blocking_queue inherits from std::queue and leaves the implementation of the most basic queue operations to the standard library.
  2. We use std::shared_mutex in combination with std::unique_lock and std::shared_lock to implement a read/write lock.
  3. Here we have disabled the copy and move constructors and the corresponding assignment operators, purely because we do not use them in the thread pool implementation. They can be implemented on demand if needed.
  4. We use read locks in both observers, so concurrent reads do not block one another.
  5. push and emplace are similar operations; both append an element to the end of the queue. They differ in the same way as the corresponding member functions of the standard library containers. Note that emplace uses perfect forwarding.
  6. pop here would more properly be called try_pop: the pop action does not necessarily succeed, and the function returns false when the queue is empty, without making any changes to the queue.
  7. This is a coarse-grained lock over the entire queue. Since the push and pop ends of a queue are somewhat independent, it is possible, with some care, to implement a fine-grained version of the locking, with significant performance gains when both operations are frequent. We may discuss this point in a separate article later.
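
As a quick sanity check, here is a minimal producer/consumer sketch using the queue above. The consumer spins on pop for simplicity; in the thread pool below, that busy loop is replaced by a condition variable.

#include <iostream>
#include <thread>

int main() {
  blocking_queue<int> q;

  std::thread producer([&q] {
    for (int i = 0; i < 100; ++i) q.push(i);
  });

  std::thread consumer([&q] {
    int value = 0;
    int received = 0;
    while (received < 100) {
      if (q.pop(value)) ++received;  // try_pop semantics: may come up empty
    }
    std::cout << "received " << received << " items\n";
  });

  producer.join();
  consumer.join();
}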

Thread Pooling

Interface definition

Following the previous discussion, we can put together a rough idea of what a thread pool looks like. Besides the public interface, the declaration below also lists the private synchronization members (state flags, a shared mutex, a condition variable) that the implementations in the following sections rely on.

#include <condition_variable>
#include <functional>
#include <future>
#include <thread>
#include <vector>

class threadpool {
 public:  // 1.
  void init(int num);
  void terminate();  // stop and process all delegated tasks
  void cancel();     // stop and drop all tasks remained in queue

 public:  // 2.
  bool inited() const;
  bool is_running() const;
  int size() const;

 public:  // 3.
  template <class F, class... Args>
  auto async(F&& f, Args&&... args) const -> std::future<decltype(f(args...))>;

 private:
  void spawn();              // worker thread body, defined below
  bool _is_running() const;  // unsynchronized; callers must hold mtx_

 private:
  using wlock = std::unique_lock<std::shared_mutex>;
  using rlock = std::shared_lock<std::shared_mutex>;

  bool inited_{false};
  bool stop_{false};
  bool cancel_{false};
  std::vector<std::thread> workers_;  // 4.
  mutable blocking_queue<std::function<void()>> tasks_;  // 5.
  mutable std::shared_mutex mtx_;
  mutable std::condition_variable_any cond_;  // _any: can wait on shared_mutex locks
  std::once_flag once_;
};
  1. The first group of three interfaces is the control interface for the entire thread pool. The init interface starts the thread pool; its parameter num is the number of threads in the pool. The terminate interface shuts the pool down: it stops accepting new tasks but ensures that already accepted tasks are processed. cancel is similar to terminate, but discards accepted-but-unprocessed tasks.
  2. The three interfaces in the second group are all observers.
  3. The only interface in the third group is the thread pool's interface for accepting external tasks. It is almost identical to std::async from the standard library: it accepts an arbitrary function and returns a std::future.
  4. This is the pool of worker threads itself.
  5. This is the task queue.
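
Before filling in the implementations, here is a minimal sketch of how the class is intended to be used, assuming the threadpool developed in this post is in scope; the lambda and the thread count are arbitrary.

#include <future>
#include <iostream>

int main() {
  threadpool pool;
  pool.init(4);  // start four worker threads

  std::future<int> fut = pool.async([](int x) { return x * x; }, 21);
  std::cout << fut.get() << '\n';  // blocks until a worker runs the task: 441

  pool.terminate();  // stop accepting tasks, drain the queue, join workers
}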

The control interface of the thread pool

Next we discuss the specific implementation of the control interface.

inline void threadpool::init(int num) {
  std::call_once(once_, [this, num]() {  // 1.
    wlock lock(mtx_);  // 2.
    stop_ = false;
    cancel_ = false;
    workers_.reserve(num);
    for (int i = 0; i < num; ++i) {
      workers_.emplace_back(std::bind(&threadpool::spawn, this));  // 3.a
    }
    inited_ = true;
  });
}

inline void threadpool::spawn() {  // 3.b
  for (;;) {
    bool pop = false;
    std::function<void()> task;
    {
      wlock lock(mtx_);
      cond_.wait(lock, [this, &pop, &task] {
        pop = tasks_.pop(task);
        return cancel_ || stop_ || pop;  // 4.
      });
    }
    if (cancel_ || (stop_ && !pop)) {  // 5.a
      return;
    }
    task();  // 5.b
  }
}

inline void threadpool::terminate() {  // 6.a
  {
    wlock lock(mtx_);
    if (_is_running()) {
      stop_ = true;  // 7.a
    } else {
      return;
    }
  }
  cond_.notify_all();
  for (auto& worker : workers_) {
    worker.join();
  }
}

inline void threadpool::cancel() {  // 6.b
  {
    wlock lock(mtx_);
    if (_is_running()) {
      cancel_ = true;  // 7.b
    } else {
      return;
    }
  }
  tasks_.clear();  // 8.
  cond_.notify_all();
  for (auto& worker : workers_) {
    worker.join();
  }
}

inline bool threadpool::_is_running() const {
  return inited_ && !stop_ && !cancel_;
}
  1. The work done by init can logically only be done once. However, we cannot guarantee that the user code will actually execute as we think it should. Therefore, we use std::call_once to ensure that the work in question is executed only once.
  2. Because it involves modifying the state of the threadpool, we use a write lock here.
  3. The spawn interface is the thread function, i.e., the function each worker thread runs for its entire lifetime.
  4. When a thread is woken up (whether spuriously or by a notify_* call), it goes back to sleep if the thread pool has been neither canceled nor terminated and no task could be taken from the queue; otherwise it returns from the wait and continues. Note that cond_ must be a std::condition_variable_any, since it waits on a lock over a std::shared_mutex rather than a plain std::mutex.
  5. If the thread pool is canceled, no further task is executed; if the thread pool is stopped and no task was removed from the queue, there is nothing left to execute either. Otherwise, the retrieved task is executed.
  6. The implementations of terminate and cancel are almost identical,
  7. except that terminate sets the stop_ flag while cancel sets the cancel_ flag.
  8. In addition, the cancel interface explicitly empties the task queue.
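
To make the difference between the two shutdown paths concrete, here is a small sketch; the sleep durations are arbitrary and it again assumes the threadpool from this post.

#include <chrono>
#include <iostream>
#include <thread>

int main() {
  using namespace std::chrono_literals;

  threadpool pool;
  pool.init(2);
  for (int i = 0; i < 8; ++i) {
    pool.async([] { std::this_thread::sleep_for(100ms); });
  }

  pool.terminate();  // returns only after all eight tasks have run
  // With pool.cancel() instead, queued-but-unstarted tasks would be dropped;
  // their futures would then throw std::future_error (broken_promise) on get().
  std::cout << "all tasks processed\n";
}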

Observer for thread pools

The observers are relatively simple; the only thing worth mentioning is the use of read locks.

inline bool threadpool::inited() const {
  rlock lock(mtx_);
  return inited_;
}

inline bool threadpool::is_running() const {
  rlock lock(mtx_);
  return _is_running();
}

inline int threadpool::size() const {
  rlock lock(mtx_);
  return static_cast<int>(workers_.size());
}

Task interface

template <class F, class... Args>
auto threadpool::async(F&& f, Args&&... args) const
    -> std::future<decltype(f(args...))> {
  using return_t = decltype(f(args...));  // 1.a
  using future_t = std::future<return_t>;  // 1.b
  using task_t = std::packaged_task<return_t()>;  // 1.c

  {
    rlock lock(mtx_);  // 2.
    if (stop_ || cancel_)
      throw std::runtime_error(
          "Delegating task to a threadpool "
          "that has been terminated or canceled.");
  }

  auto bind_func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);  // 3.
  std::shared_ptr<task_t> task = std::make_shared<task_t>(std::move(bind_func));  // 4.a
  future_t fut = task->get_future();  // 4.b
  tasks_.emplace([task]() -> void { (*task)(); });  // 5.
  cond_.notify_one();  // 6.
  return fut;  // 4.c
}
  1. The three type aliases defined with using: the task's return type, the future of that return type, and the corresponding packaged-task type.
  2. No modification of the thread pool state is involved here, so a read lock is sufficient. Obviously, posting tasks to a thread pool that has already been terminated or canceled is forbidden.
  3. Since the task queue only accepts callable objects of type std::function<void()>, we first use std::bind to bind the argument list.
  4. Here we use std::packaged_task to associate the task with a std::future, which is returned to the caller so that the task's publisher can retrieve the result of the execution later. Note that std::packaged_task is move-only, while std::function requires a copyable callable; holding the task through a std::shared_ptr is what makes the wrapping lambda copyable.
  5. Here we use a lambda that both executes the task and erases the return type (the result is still delivered through the future) to match std::function<void()>.
  6. Here we wake up the worker thread via a condition variable.
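
The shared_ptr trick in steps 4 and 5 can be demonstrated in isolation. A minimal sketch, independent of the thread pool:

#include <functional>
#include <future>
#include <iostream>
#include <memory>

int main() {
  // std::packaged_task is move-only, but std::function requires a copyable
  // callable; holding the task via a shared_ptr makes the lambda copyable.
  auto task = std::make_shared<std::packaged_task<int()>>([] { return 42; });
  std::future<int> fut = task->get_future();

  std::function<void()> erased = [task] { (*task)(); };  // return type erased
  erased();                        // what a worker thread would execute
  std::cout << fut.get() << '\n';  // 42, delivered through the future
}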

Full implementation

The full implementation can be found at Liam0205/toy-threadpool, which includes unit tests and performance comparisons against std::async.

Finally

Here we’ve implemented a thread pool that you can study and use. But as the GitHub repo name implies, it is still just a toy. A number of optimizations would be needed before using it in a real project. For example:

  • optimizing the thread-safe queue, using finer-grained locks (already present in the full implementation; see the sketch after this list) or switching to a lock-free implementation;
  • extending the thread pool so that, beyond the states discussed in this article, it can also pause, expand (automatically adding threads when there are too many tasks), and shrink (automatically retiring threads when too many are idle).
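
As an illustration of the first direction, here is a minimal sketch of the classic two-lock queue in the style of Michael & Scott: push and pop contend on separate mutexes, so producers and consumers rarely block each other. This is a simplified stand-in for illustration, not the version used in the full implementation.

#include <memory>
#include <mutex>

template <typename T>
class two_lock_queue {
  struct node {
    std::shared_ptr<T> data;     // null in the dummy tail node
    std::unique_ptr<node> next;
  };

  std::unique_ptr<node> head_;   // oldest node; starts as a dummy
  node* tail_;                   // dummy node that push fills in
  std::mutex head_mtx_;
  std::mutex tail_mtx_;

 public:
  two_lock_queue() : head_(new node), tail_(head_.get()) {}

  void push(T value) {
    auto data = std::make_shared<T>(std::move(value));
    std::unique_ptr<node> dummy(new node);        // becomes the new tail
    node* new_tail = dummy.get();
    std::lock_guard<std::mutex> lock(tail_mtx_);  // only the tail is locked
    tail_->data = std::move(data);
    tail_->next = std::move(dummy);
    tail_ = new_tail;
  }

  bool pop(T& holder) {
    std::lock_guard<std::mutex> head_lock(head_mtx_);
    {
      std::lock_guard<std::mutex> tail_lock(tail_mtx_);
      if (head_.get() == tail_) return false;     // empty: head is the dummy
    }
    holder = std::move(*head_->data);
    std::unique_ptr<node> old_head = std::move(head_);
    head_ = std::move(old_head->next);
    return true;
  }
};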

All of these directions are worth digging into and optimizing further.