Today I would like to introduce C++’s high-level threading APIs: std::promise, std::future, std::packaged_task and std::async. The content of this article can be summarised in the following diagram.

[Diagram: cpp threading]

Here, std::promise and std::future form a synchronisation channel between threads. The std::packaged_task class template is an adapter for a function or function object: it routes the function’s return value into a std::future, so that we can easily run any function with std::thread and still obtain its result. The std::async function is roughly equivalent to combining std::packaged_task with std::thread.

I will now introduce each API in turn.

  1. std::promise and std::future
  2. std::packaged_task
  3. std::async

std::promise and std::future

The std::promise<T> and std::future<T> class templates are defined in the <future> header file. Together they form a synchronisation channel in which std::promise is the sender and std::future is the receiver. They are used as follows.

#include <future>
#include <iostream>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  p.set_value(42);
  std::cout << f.get() << std::endl;
}

In the above example, we first create a std::promise<int> object. The int template argument means that this synchronisation channel will carry an int object. Next, we call p.get_future() to obtain the receiver. We then send 42 with p.set_value(42) and read it with f.get().

Next, let’s add a thread.

#include <future>
#include <iostream>
#include <thread>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  std::thread t(
      [](std::promise<int> p) {
        p.set_value(42);
      },
      std::move(p));

  std::cout << f.get() << std::endl;
  t.join();
}

In this example, we hand the std::promise<int> over to another thread, which sends the int object, and the main thread reads the int object with f.get(). If the main thread reaches f.get() first, it blocks until the other thread has finished p.set_value(42).

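Also, this synchronisation channel can only be used once. Calling p.set_value(...) a second time throws a std::future_error exception, and a second call to f.get() typically does the same.

Strictly speaking, the two cases differ. A second std::promise<T>::set_value() is required by the standard to throw std::future_error (with the error condition promise_already_satisfied). A second std::future<T>::get(), on the other hand, is undefined behaviour, because the first get() leaves the future without a shared state (valid() returns false). However, the C++ standard encourages implementations to detect this case and throw std::future_error.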

#include <future>
#include <iostream>
#include <thread>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  std::thread t(
      [](std::promise<int> p) {
        p.set_value(42);

        try {
          p.set_value(43);  // set the second value
        } catch (std::future_error &e) {
          std::cerr << "caught: " << e.what() << std::endl;
        }
      },
      std::move(p));

  std::cout << f.get() << std::endl;

  try {
    std::cout << f.get() << std::endl;  // get the second value
  } catch (std::future_error &e) {
    std::cerr << "caught: " << e.what() << std::endl;
  }

  t.join();
}

wait member function

On the receiver side, we can split ‘wait’ and ‘read’ into two separate steps. std::future<T> provides three member functions for waiting.

  • void wait(): waits until the object is ready to be read.
  • future_status wait_for(const std::chrono::duration<...> &): waits until the object is ready to be read or the given duration has elapsed.
  • future_status wait_until(const std::chrono::time_point<...> &): waits until the object is ready to be read or the given deadline is reached.

The last two functions return one of the following values.

  • future_status::deferred: this std::future object belongs to a lazily evaluated (deferred) task that has not started yet (see the std::async section).
  • future_status::ready: the object is ready to be read.
  • future_status::timeout: the wait timed out before the object became ready.

For example, if another thread takes a while to produce a return value, and we want the main thread to print something while waiting, we can use the wait_for member function.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  std::thread t(
      [](std::promise<int> p) {
        std::this_thread::sleep_for(std::chrono::seconds(5));
        p.set_value(42);
      },
      std::move(p));

  for (int i = 0; ; ++i) {
    std::cout << "waiting attempt " << i << " ..." << std::endl;
    std::future_status status = f.wait_for(std::chrono::seconds(1));
    if (status != std::future_status::timeout) {
      break;
    }
  }
  std::cout << f.get() << std::endl;

  t.join();
}

Another situation is where we only want to use std::promise and std::future to synchronise the point at which execution starts, without actually sending an object. In this case we can use std::promise<void> together with the wait member function.

#include <numeric>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  std::vector<long long int> vec;

  std::promise<void> p;

  std::thread t(
      [&vec](std::future<void> f) {
        std::cout << "thread: started\n" << std::flush;
        f.wait();
        std::cout << "thread: start computation\n" << std::flush;
        long long int sum = std::accumulate(vec.begin(), vec.end(), 0LL);
        std::cout << "thread: end computation\n" << std::flush;
        std::cout << "sum=" << sum << std::endl;
      },
      p.get_future());

  // Initialize the data
  for (long long int i = 0; i < 1000000; ++i) {
    vec.push_back(i);
  }

  std::cout << "main: notify thread\n" << std::flush;
  p.set_value();

  t.join();
}

Exception handling

We can also use std::promise to send an exception. If the sender wants to report an error to the receiver, it can call std::promise<T>::set_exception. When the receiver calls std::future<T>::get(), get() rethrows that exception.

#include <exception>
#include <future>
#include <stdexcept>
#include <iostream>
#include <thread>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  std::thread t(
      [](std::promise<int> p) {
        try {
          throw std::runtime_error("some exception");
        } catch (...) {
          p.set_exception(std::current_exception());
        }
      },
      std::move(p));

  try {
    std::cout << f.get() << std::endl;
  } catch (std::runtime_error &exp) {
    std::cout << "main thread: caught: " << exp.what() << std::endl;
  }

  t.join();
}

It is worth noting that p.set_exception(...) takes a std::exception_ptr argument, so we cannot pass a std::runtime_error object directly. In the example above, we throw the exception with a throw statement, obtain a std::exception_ptr with std::current_exception() in the catch clause, and pass it to set_exception. Alternatively, std::make_exception_ptr can create a std::exception_ptr from an exception object without a throw statement.

std::shared_future

The purpose of std::future and std::promise is to pass objects between threads, but a single std::future object must not be used by multiple threads at the same time. In the following example, the threads t1 and t2 both call f.get() on the same object. Because get() is not thread-safe (and a std::future may only be read once), the following code has undefined behaviour (and produces a segmentation fault on my machine).

#include <chrono>
#include <future>
#include <iostream>
#include <memory>
#include <sstream>
#include <thread>

int main() {
  std::promise<std::unique_ptr<int>> p;
  std::future<std::unique_ptr<int>> f = p.get_future();

  std::thread t1(
      [&f]() {
        std::cout << "t1: waiting\n" << std::flush;
        int value = *f.get();  // Race condition

        std::ostringstream ss;
        ss << "t1: " << value << "\n";
        std::cout << ss.str() << std::flush;
      });
  std::thread t2(
      [&f]() {
        std::cout << "t2: waiting\n" << std::flush;
        int value = *f.get();  // Race condition

        std::ostringstream ss;
        ss << "t2: " << value << "\n";
        std::cout << ss.str() << std::flush;
      });

  std::this_thread::sleep_for(std::chrono::seconds(1));
  p.set_value(std::make_unique<int>(42));

  t1.join();
  t2.join();

  return 0;
}

The solution is simple. If we want two (or more) threads to be receivers at the same time, we should first call the share() member function of std::future, which converts the std::future object into a std::shared_future object. We then copy the std::shared_future object so that the t1 and t2 threads each have their own copy. Unlike std::future<T>::get(), std::shared_future<T>::get() returns a const reference to the stored object and can be called more than once.

#include <chrono>
#include <future>
#include <iostream>
#include <memory>
#include <sstream>
#include <thread>

int main() {
  std::promise<std::unique_ptr<int>> p;
  std::future<std::unique_ptr<int>> f = p.get_future();
  std::shared_future<std::unique_ptr<int>> sf = f.share();  // Added

  std::thread t1(
      [sf]() {  // Copy sf by value
        std::cout << "t1: waiting\n" << std::flush;
        int value = *sf.get();

        std::ostringstream ss;
        ss << "t1: " << value << "\n";
        std::cout << ss.str() << std::flush;
      });
  std::thread t2(
      [sf]() {  // Copy sf by value
        std::cout << "t2: waiting\n" << std::flush;
        int value = *sf.get();

        std::ostringstream ss;
        ss << "t2: " << value << "\n";
        std::cout << ss.str() << std::flush;
      });

  std::this_thread::sleep_for(std::chrono::seconds(1));
  p.set_value(std::make_unique<int>(42));

  t1.join();
  t2.join();

  return 0;
}

std::packaged_task

The std::packaged_task class template is also defined in the <future> header file. Its purpose is to act as an adapter between a ‘function’ or ‘function object’ and std::thread. In general, before considering multi-threaded execution, we would define a function as follows.

ReturnType Function(ArgType1 arg1, ArgType2 arg2, ..., ArgTypeN argn)

However, if the above function is passed as the first argument of the std::thread constructor, std::thread ignores its return value. In addition, if the function exits by throwing an exception, std::thread calls std::terminate, which ends the entire program. To bridge this interface gap, the C++ standard library defines the std::packaged_task class template. Its get_future member function returns a std::future object that receives the return value (or the exception), and its operator() member function calls the original function.

A simplified implementation of std::packaged_task is as follows (for reference only, the actual implementation is more complex).

#include <exception>
#include <functional>
#include <future>
#include <utility>

template <typename Func>
class my_packaged_task;

template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
private:
  std::promise<Ret> promise_;
  std::function<Ret(Args...)> func_;

public:
  my_packaged_task(std::function<Ret(Args...)> func)
      : func_(std::move(func)) {}

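  // Simplified: assumes Ret is not void. Args are class template
  // parameters, so the arguments are taken exactly as declared.
  void operator()(Args... args) {
    try {
      promise_.set_value(func_(std::forward<Args>(args)...));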
    } catch (...) {
      promise_.set_exception(std::current_exception());
    }
  }

  std::future<Ret> get_future() {
    return promise_.get_future();
  }
};

The following is how std::packaged_task is used.

#include <future>
#include <iostream>

int compute(int a, int b) {
  return 42 + a + b;
}

int main() {
  std::packaged_task<int(int, int)> task(compute);
  std::future<int> f = task.get_future();
  task(3, 4);
  std::cout << f.get() << std::endl;

  return 0;
}

Without changing the compute function, we wrap the compute function in std::packaged_task. After calling task(3, 4), the return value can be obtained by calling f.get().

We can then add the thread.

#include <future>
#include <iostream>
#include <thread>

int compute(int a, int b) {
  return 42 + a + b;
}

int main() {
  std::packaged_task<int(int, int)> task(compute);
  std::future<int> f = task.get_future();

  std::thread t(std::move(task), 3, 4);  // Added
  t.detach();                            // Added

  std::cout << f.get() << std::endl;
  return 0;
}

The above code replaces the original task(3, 4) with code that creates a thread. Since std::packaged_task is move-only (it cannot be copied), we must transfer the std::packaged_task object to the std::thread constructor with std::move(task). Next, we call t.detach() so that std::thread’s destructor does not call std::terminate. The new thread then calls std::packaged_task::operator(), which executes the compute function for us on that thread. When compute finishes, the main thread receives the returned value via f.get().

std::async

Finally, I would like to introduce the std::async function template. When using std::async, we pass in a function or function object together with the arguments needed to call it. std::async then calls the function at some point in time, depending on the launch policy. The caller can read the function’s return value through the std::future<T> object that std::async returns.

There are two different launch policies for std::async.

  1. std::launch::async: runs the specified work on a new thread of execution and delivers the result through the returned std::future<T> object.
  2. std::launch::deferred: returns a std::future<T> object immediately and defers the specified work until std::future<T>::get() or wait() is called.

For example, the previous std::packaged_task example could also be rewritten as follows.

#include <future>
#include <iostream>

int compute(int a, int b) {
  return 42 + a + b;
}

int main() {
  std::future<int> f = std::async(std::launch::async, compute, 3, 4);
  // ...
  std::cout << f.get() << std::endl;
  return 0;
}

In the above code, std::async creates a thread to execute compute(3, 4). The main thread obtains the return value of compute(3, 4) from f.get().

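A simplified version of std::async(std::launch::async, ...) could look as follows (for reference only; the actual implementation is more complex). One notable difference is that the destructor of a std::future returned by the real std::async blocks until the task has finished, which this sketch does not reproduce.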

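#include <cassert>
#include <future>
#include <thread>
#include <type_traits>
#include <utility>

// Note: std::result_of is deprecated in C++17 and removed in C++20, where
// std::invoke_result replaces it.
template <typename Func, typename... Args>
std::future<typename std::result_of<
    typename std::decay<Func>::type(typename std::decay<Args>::type...)>::type>
my_async(std::launch policy, Func&& func, Args&&... args) {
  assert(policy == std::launch::async && "only async is supported");
  (void)policy;  // avoids an unused-parameter warning when asserts are off

  using Result = typename std::result_of<
      typename std::decay<Func>::type(typename std::decay<Args>::type...)>::type;
  // std::thread copies its arguments and passes them on as rvalues, so the
  // task signature uses the decayed argument types.
  std::packaged_task<Result(typename std::decay<Args>::type...)> task(
      std::forward<Func>(func));
  std::future<Result> future = task.get_future();
  std::thread t(std::move(task), std::forward<Args>(args)...);
  t.detach();
  return future;
}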

On the other hand, the std::launch::deferred policy does not create a new thread. It works by keeping additional state in the shared state behind the std::future<T> object. When the user calls std::future<T>::get (or wait), that call executes the function on the calling thread. If no one ever calls get or wait, std::async(std::launch::deferred, ...) never executes the function.

For example, in std::launch::deferred mode, the following code always prints the first line before the second line. If you delete the f.get() line, the program never prints the second line.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int compute(int a, int b) {
  std::cout << "this must be the second line\n" << std::flush;
  return 42 + a + b;
}

int main() {
  std::future<int> f = std::async(std::launch::deferred, compute, 3, 4);
  std::this_thread::sleep_for(std::chrono::seconds(1));
  std::cout << "this must be the first line\n" << std::flush;
  std::cout << f.get() << std::endl;
  return 0;
}