How to implement a thread pool

Thread pooling is a thread usage pattern. Too many threads introduce scheduling overhead, which in turn hurts cache locality and overall performance. Instead, a thread pool maintains a set of threads waiting for a supervisor to assign tasks that can be executed concurrently. This avoids the cost of creating and destroying threads for short-lived tasks. A thread pool not only keeps the CPU cores fully utilized but also prevents over-scheduling. The number of threads should depend on the number of available processors, cores, memory, network sockets, and so on. For computationally intensive tasks, for example, a common heuristic is the number of CPUs plus 2; any more just adds thread-switching overhead.
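As a quick illustration (separate from the pool itself), std::thread::available_parallelism (stable since Rust 1.59) is the standard way to query how many hardware threads are available, which can feed such a sizing heuristic:

use std::thread;

fn main() {
    // Query how many hardware threads the OS exposes.
    let cpus = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    // A common heuristic for CPU-bound work: number of CPUs + 2.
    let pool_size = cpus + 2;
    println!("sizing pool with {} workers", pool_size);
}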

How should we define the thread pool Pool? First of all, the maximum number of threads clearly belongs in the pool as a field, and the specified number of threads should be created when the Pool is constructed.

Thread pool Pool

pub struct Pool {
    max_workers: usize, // maximum number of worker threads
}

impl Pool {
    fn new(max_workers: usize) -> Pool {
        Pool { max_workers }
    }
    fn execute<F>(&self, f: F) where F: FnOnce() + 'static + Send {
        // to be filled in below
    }
}

We use execute to run a task. The bound F: FnOnce() + 'static + Send is exactly what thread::spawn requires of its argument: F is a closure that can be called once, owns everything it captures ('static), and can be sent to another thread (Send).
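For intuition, here is a minimal sketch of why those bounds exist: thread::spawn itself requires F: FnOnce() -> T + Send + 'static, and a move closure is the usual way to satisfy 'static:

use std::thread;

fn main() {
    let msg = String::from("hello");
    // `move` transfers ownership of `msg` into the closure, so the
    // closure borrows nothing from this stack frame ('static), and
    // String is Send, so the closure can cross the thread boundary.
    let handle = thread::spawn(move || {
        println!("{} from the spawned thread", msg);
    });
    handle.join().unwrap();
}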

The next natural step is to give the Pool a collection of threads to execute tasks, something like Vec<Thread>. But a bare thread is not quite right: what we want is a long-lived entity that keeps receiving tasks and executing them. Think of it as a Worker that wraps a thread and continuously pulls tasks to run.

struct Worker {
    _id: usize, // worker id
}

How do we send tasks to the workers? mpsc (multi-producer, single-consumer) channels fit our needs: let (tx, rx) = mpsc::channel() yields a sender/receiver pair. The Pool sends tasks into the channel, and the workers consume them.

The receiving end of the channel has to be shared safely among multiple threads, so it is wrapped in Arc<Mutex<T>>, i.e., a lock resolves the concurrent access.
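A standalone sketch of this sharing pattern (the worker count and messages here are arbitrary): the Receiver is neither Clone nor Sync, so all workers pull from a single receiver behind Arc<Mutex<..>>:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    // Receiver is neither Clone nor Sync: share one behind Arc<Mutex<..>>.
    let rx = Arc::new(Mutex::new(rx));

    let mut handles = Vec::new();
    for id in 0..2 {
        let rx = Arc::clone(&rx);
        handles.push(thread::spawn(move || loop {
            // Hold the lock only long enough to pull one message.
            let msg = rx.lock().unwrap().recv();
            match msg {
                Ok(n) => println!("worker[{}] got {}", id, n),
                Err(_) => break, // channel closed, no more senders
            }
        }));
    }

    for n in 0..4 {
        tx.send(n).unwrap();
    }
    drop(tx); // dropping the sender closes the channel

    for h in handles {
        h.join().unwrap();
    }
}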

The full definition of Pool

pub struct Pool {
    workers: Vec<Worker>,
    max_workers: usize,
    sender: mpsc::Sender<Message>
}

It’s time to define the Message we want to send to the Worker.

Define the following enum:

type Job = Box<dyn FnOnce() + 'static + Send>;
enum Message {
    ByeBye,
    NewJob(Job),
}

Job is a boxed closure to be sent to a Worker for execution. ByeBye notifies the Worker that it should stop waiting for work and exit its thread.
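To see the types at work, here is a tiny standalone sketch of boxing a closure as a Job and dispatching on a Message:

type Job = Box<dyn FnOnce() + 'static + Send>;

enum Message {
    ByeBye,
    NewJob(Job),
}

fn main() {
    let job: Job = Box::new(|| println!("running a boxed job"));
    match Message::NewJob(job) {
        // Calling a Box<dyn FnOnce()> consumes the box and the closure.
        Message::NewJob(job) => job(),
        Message::ByeBye => println!("shutting down"),
    }
}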

Only the specific logic for implementing Worker and Pool remains.

Worker’s implementation.

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let t = thread::spawn(move || {
            loop {
                // Lock, pull one message, and release the lock before
                // running the job: the temporary guard is dropped at the
                // end of this statement.
                let message = receiver.lock().unwrap().recv().unwrap();
                match message {
                    Message::NewJob(job) => {
                        println!("do job from worker[{}]", id);
                        job();
                    },
                    Message::ByeBye => {
                        println!("ByeBye from worker[{}]", id);
                        break
                    },
                }
            }
        });

        Worker {
            _id: id,
            t: Some(t),
        }
    }
}

In let message = receiver.lock().unwrap().recv().unwrap(); we acquire the lock and pull the message from the receiver in one statement. The MutexGuard is a temporary whose lifetime ends with the statement, so Rust releases the lock automatically before the job runs.

But if written as

while let message = receiver.lock().unwrap().recv().unwrap() {
};

then the entire block after the while let forms one scope, and the lock is released only when that scope ends, which is far longer than with the let message form above. Rust's Mutex has no explicit unlock method; unlocking happens when the MutexGuard is dropped at the end of its lifetime.
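The difference is easy to demonstrate in isolation; a small sketch contrasting the two guard lifetimes:

use std::sync::{mpsc, Mutex};

fn main() {
    let (tx, rx) = mpsc::channel();
    let rx = Mutex::new(rx);

    tx.send(1).unwrap();
    // The MutexGuard is a temporary: it is dropped, and the lock
    // released, as soon as this statement ends.
    let n = rx.lock().unwrap().recv().unwrap();
    println!("got {}", n);

    tx.send(2).unwrap();
    {
        // Binding the guard to a name keeps the lock held until the
        // end of this block.
        let guard = rx.lock().unwrap();
        let n = guard.recv().unwrap();
        println!("got {} while still holding the lock", n);
    } // guard dropped here -> lock released
}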

We implement the Drop trait for Pool so that destroying the Pool automatically shuts down the worker threads.

impl Drop for Pool {
    fn drop(&mut self) {
        // First tell every worker to stop...
        for _ in 0..self.max_workers {
            self.sender.send(Message::ByeBye).unwrap();
        }
        // ...then wait for each worker thread to finish.
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                t.join().unwrap();
            }
        }
    }
}

Why does the drop method use two separate loops instead of doing both things in one, like this?

for w in self.workers.iter_mut() {
    if let Some(t) = w.t.take() {
        self.sender.send(Message::ByeBye).unwrap();
        t.join().unwrap();
    }
}

This hides a trap that can cause a deadlock. Suppose there are two workers, and we iterate over them in a single loop, calling join right after sending the termination message. We expect the first worker to receive the message and then wait for it to finish. But if the second worker grabs the message instead, the first worker never receives one, and the join on it blocks forever: a deadlock.

Notice that the JoinHandle is wrapped in an Option. There are two points to note here:

  1. t.join() needs to take ownership of t.
  2. In drop we only have &mut self, so self.workers can only be iterated by reference; we cannot move a JoinHandle out of it directly.

The solution is to have Worker hold an Option<JoinHandle<()>> and call take on the Option to move the Some value out, leaving None in its place. In other words, a running worker holds Some(handle); when cleaning up, take swaps in None, leaving the worker without a thread to join twice.

struct Worker {
    _id: usize,
    t: Option<JoinHandle<()>>,
}
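Option::take is the key trick: it moves the value out through a mutable reference and leaves None behind. A quick sketch:

fn main() {
    let mut slot: Option<String> = Some(String::from("join handle"));
    // take() moves the value out via &mut, leaving None in its place,
    // so we gain ownership without moving the containing struct.
    if let Some(v) = slot.take() {
        println!("took: {}", v);
    }
    assert!(slot.is_none());
}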

Summary of key points

  • Mutex relies on the guard's lifetime for lock release, so pay attention to how long the guard lives and whether you are holding a lock longer than intended.
  • Vec<Option<T>> solves the scenario where you sometimes need to take ownership of a T inside a collection you can only borrow.

Full Code

use std::thread::{self, JoinHandle};
use std::sync::{Arc, mpsc, Mutex};


type Job = Box<dyn FnOnce() + 'static + Send>;
enum Message {
    ByeBye,
    NewJob(Job),
}

struct Worker {
    _id: usize,
    t: Option<JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let t = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv().unwrap();
                match message {
                    Message::NewJob(job) => {
                        println!("do job from worker[{}]", id);
                        job();
                    },
                    Message::ByeBye => {
                        println!("ByeBye from worker[{}]", id);
                        break
                    },
                }  
            }
        });

        Worker {
            _id: id,
            t: Some(t),
        }
    }
}

pub struct Pool {
    workers: Vec<Worker>,
    max_workers: usize,
    sender: mpsc::Sender<Message>
}

impl Pool {
    pub fn new(max_workers: usize) -> Pool {
        if max_workers == 0 {
            panic!("max_workers must be greater than zero!")
        }
        let (tx, rx) = mpsc::channel();

        let mut workers = Vec::with_capacity(max_workers);
        let receiver = Arc::new(Mutex::new(rx));
        for i in 0..max_workers {
            workers.push(Worker::new(i, Arc::clone(&receiver)));
        }

        Pool { workers, max_workers, sender: tx }
    }
    
    pub fn execute<F>(&self, f: F) where F: FnOnce() + 'static + Send
    {
        let job = Message::NewJob(Box::new(f));
        self.sender.send(job).unwrap();
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        for _ in 0..self.max_workers {
            self.sender.send(Message::ByeBye).unwrap();
        }
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                t.join().unwrap();
            }
        }
    }
}


#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn it_works() {
        let p = Pool::new(4);
        p.execute(|| println!("do new job1"));
        p.execute(|| println!("do new job2"));
        p.execute(|| println!("do new job3"));
        p.execute(|| println!("do new job4"));
    }
}
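
Beyond the test, here is a usage sketch (assuming the Pool above is in scope). Because all NewJob messages are queued ahead of the ByeBye messages that Drop sends, every submitted job finishes before the pool's threads are joined:

use std::thread;
use std::time::Duration;

fn main() {
    let p = Pool::new(4);
    for i in 0..8 {
        p.execute(move || {
            // Simulate a short task so several workers overlap.
            thread::sleep(Duration::from_millis(100));
            println!("job {} done", i);
        });
    }
    // When `p` goes out of scope, Drop sends ByeBye to every worker
    // and joins their threads, so all queued jobs run to completion.
}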