Recently, I encountered some bugs in my project: the receiver of a tokio channel was dropped for some reason, resulting in a send error.

After debugging, I found that it was actually caused by hyper's cancel propagation. The issue can be reproduced with the following changes to examples/web_api.rs in the hyper source tree:

diff --git a/Cargo.toml b/Cargo.toml
index 862c20f9..694b8855 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -73,7 +73,7 @@ pnet_datalink = "0.27.2"
 
 [features]
 # Nothing by default
-default = []
+default = ["full"]
 
 # Easily turn it all on
 full = [
diff --git a/examples/web_api.rs b/examples/web_api.rs
index 5226249b..6de7f682 100644
--- a/examples/web_api.rs
+++ b/examples/web_api.rs
@@ -56,6 +56,12 @@ async fn api_post_response(req: Request<Body>) -> Result<Response<Body>> {
 
 async fn api_get_response() -> Result<Response<Body>> {
     let data = vec!["foo", "bar"];
+    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+    tokio::spawn(async move {
+        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+        tx.send(()).unwrap();
+    });
+    rx.await.unwrap();
     let res = match serde_json::to_string(&data) {
         Ok(json) => Response::builder()
             .header(header::CONTENT_TYPE, "application/json")

async fn api_get_response is an asynchronous handler through which hyper processes HTTP requests. It spawns a task inside to do some time-consuming work; we use sleep to simulate an operation that takes 5 seconds, and finally the processed data/result is sent back to async fn api_get_response through a oneshot channel. If the client actively closes the connection before the server responds, hyper cancels async fn api_get_response, so rx is dropped and the subsequent send fails.

Problems caused by cancel not propagating

A large number of malicious requests that actively disconnect

This leads to a problem: the client has already disconnected without waiting for the response, yet the server is still querying the database in the spawned task. For example, if someone maliciously sends 100,000 requests that take a long time to process and cancels each one immediately after sending it, the server keeps processing these "already disconnected" requests, which wastes resources.

If the client disconnects, it is reasonable for all the asynchronous tasks/Futures associated with the request to be canceled, with the cancellation propagating from the root node down through every descendant.

Otherwise, the client has long since disconnected while the server foolishly keeps querying a bunch of databases, consuming a lot of resources to produce the data it intends to return, only to hit a send error when it tries.

systemctl stop timeout

For example, a web server process typically registers a libc::signal callback so that, upon receiving a shutdown signal, it can gracefully shut down the web server.

After receiving the signal, the server generally propagates the cancellation to each of its tasks, but some stubborn tasks live a long time (such as polling tasks that sleep in a loop).

However, sending a kill signal to force the process out so a deployment update can finish is not a good option, because a libc::signal callback cannot (and will not) handle SIGKILL, so no custom resource-cleanup code gets a chance to run.

(Go has this problem too, in the form of goroutine leaks; Go's only remedy is to pair each spawned goroutine with a done channel/ctx notification mechanism passed down from the parent task. The same pattern is sketched in tokio terms after the log below.)

Dec 18 10:39:21 ww systemd[715]: Stopping graph...
Dec 18 10:39:21 ww atlasd[1518986]: 2021-12-18 10:39:21.588323  INFO atlasd: Signal SIGTERM received, stopping this daemon server
Dec 18 10:39:21 ww atlasd[1518986]: 2021-12-18 10:39:21.588408  INFO server::graph: Prepare to stop graph server
Dec 18 10:39:21 ww atlasd[1518986]: 2021-12-18 10:39:21.588744  INFO start_prometheus_exporter{ip=0.0.0.0 port=19100 instance_kind=Graph}:prometheus_exporter(accept): common::metrics::prome>
Dec 18 10:39:21 ww atlasd[1518986]: 2021-12-18 10:39:21.588830  INFO web::server: graceful shutdown web server
Dec 18 10:40:51 ww systemd[715]: graph.service: State 'stop-sigterm' timed out. Killing.
Dec 18 10:40:51 ww systemd[715]: graph.service: Killing process 1518986 (atlasd) with signal SIGKILL.
Dec 18 10:40:51 ww systemd[715]: graph.service: Killing process 1518988 (tokio-runtime-w) with signal SIGKILL.
Dec 18 10:40:51 ww systemd[715]: graph.service: Killing process 1518989 (tokio-runtime-w) with signal SIGKILL.
Dec 18 10:40:51 ww systemd[715]: graph.service: Killing process 1518993 (tokio-runtime-w) with signal SIGKILL.
Dec 18 10:40:51 ww systemd[715]: graph.service: Killing process 1519000 (tokio-runtime-w) with signal SIGKILL.
Dec 18 10:40:51 ww systemd[715]: graph.service: Killing process 1519002 (tokio-runtime-w) with signal SIGKILL.
Dec 18 10:40:51 ww systemd[715]: graph.service: Killing process 1519007 (tokio-runtime-w) with signal SIGKILL.
Dec 18 10:40:51 ww systemd[715]: graph.service: Main process exited, code=killed, status=9/KILL
Dec 18 10:40:51 ww systemd[715]: graph.service: Failed with result 'timeout'.
Dec 18 10:40:51 ww systemd[715]: Stopped graph.
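The done channel/ctx idea maps naturally onto tokio. Here is a minimal sketch of my own (not code from the service above), assuming tokio's sync and time features, where a long-lived polling task selects on a watch channel so a SIGTERM handler can tell it to exit instead of systemd escalating to SIGKILL:

use std::time::Duration;

#[tokio::main]
async fn main() {
    // The parent holds the sender; each child task gets (a clone of) the receiver.
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);

    let child = tokio::spawn(async move {
        loop {
            tokio::select! {
                // The polling work this task normally loops on.
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    println!("polling...");
                }
                // Exit promptly once the parent broadcasts shutdown.
                _ = shutdown_rx.changed() => {
                    println!("shutdown received, cleaning up");
                    return;
                }
            }
        }
    });

    // Stand-in for a SIGTERM handler: broadcast shutdown, then wait for
    // the child to finish its cleanup instead of SIGKILLing it.
    tokio::time::sleep(Duration::from_secs(3)).await;
    let _ = shutdown_tx.send(true);
    child.await.unwrap();
}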

Future pruning?

async fn api_get_response can be abstracted as the root node of a Future tree, and each Future spawned inside it as a child node.

I assumed that when hyper kills the root Future of an RPC request handler because the client disconnected, all the leaf Future nodes under it would be canceled along with it, but tokio actually has no such API or design.

What I would like is a constraint like scoped_thread (child threads live shorter than the parent), so that any Future spawned by async fn api_get_response is forced to live shorter than it. That would reduce the mental burden of worrying about a child Future holding the parent's resources after the parent is dropped.

Sounds a bit like a scoped future
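tokio later grew something close to this: tokio::task::JoinSet (not yet available when this was written) aborts every task still in the set when it is dropped, which gives exactly the "children live shorter than the parent" constraint. A minimal sketch, assuming a recent tokio:

async fn api_get_response_scoped() -> String {
    let mut children = tokio::task::JoinSet::new();
    children.spawn(async {
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        String::from("slow result")
    });

    // If hyper drops this handler future because the client disconnected,
    // `children` is dropped with it and JoinSet's Drop aborts the spawned
    // task: the child cannot outlive the parent.
    match children.join_next().await {
        Some(Ok(s)) => s,
        _ => String::from("task failed"),
    }
}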

How to get hyper’s cancel to propagate

let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let task_handle = tokio::spawn(async move {
    dbg!("task start");
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    dbg!("task finished, prepare send back to parent future Node");
    tx.send(()).unwrap();
});

The line tx.send(()).unwrap(); will panic when the client disconnects early.

To make the task created by tokio::spawn abort early when the request is cancelled, more boilerplate needs to be introduced:

struct ReqHandlerContext {
    task_handle: tokio::task::JoinHandle<()>,
    // rx: tokio::sync::oneshot::Receiver<()>
}
impl Drop for ReqHandlerContext {
    fn drop(&mut self) {
        dbg!("fn api_get_response() ReqHandlerContext in drop");
        self.task_handle.abort();
    }
}
let ctx = ReqHandlerContext {
    task_handle,
};
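For completeness, here is how the two snippets above fit together inside the handler (my own sketch; the response-building part is elided). Note the guard must be bound to a named variable: a plain let _ = … would drop it immediately.

async fn api_get_response() {
    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
    let task_handle = tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        // The receiver may already be gone if the request was cancelled,
        // so don't unwrap here.
        let _ = tx.send(());
    });
    // Keep the guard alive for the rest of the handler: if hyper drops this
    // future early, `_ctx` is dropped and its Drop calls task_handle.abort().
    let _ctx = ReqHandlerContext { task_handle };
    let _ = rx.await;
    // ... build and return the response as before ...
}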

When the client disconnects early, hyper’s log is as follows

Listening on http://127.0.0.1:1337
[examples/web_api.rs:60] "task start" = "task start"
[examples/web_api.rs:72] "fn api_get_response() ReqHandlerContext in drop" = "fn api_get_response() ReqHandlerContext in drop"

The line dbg!("task finished, prepare send back to parent future Node") was never executed: the spawned task was aborted before the sleep completed, as expected.
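The same pattern is available with less hand-rolled code from the tokio_util crate (my suggestion, not something the setup above uses): CancellationToken plus its drop_guard() plays the role of ReqHandlerContext, assuming a recent tokio_util. The function name here is just for illustration.

use tokio_util::sync::CancellationToken;

async fn api_get_response_with_token() {
    let token = CancellationToken::new();
    let child = token.child_token();
    // The guard cancels the token when this handler future is dropped.
    let _guard = token.drop_guard();

    tokio::spawn(async move {
        tokio::select! {
            // Request was cancelled: stop early instead of finishing the work.
            _ = child.cancelled() => {}
            _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
                // The slow work finished normally.
            }
        }
    });
}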

You must ask your superiors before writing a spawn

To avoid bugs caused by mishandled tokio::spawn, its use must be strictly limited through code review.

Our company requires that before introducing a new tokio::spawn you must request approval from your supervisor, explaining why you want to use spawn, and include in the comments a detailed explanation of why the spawn is used, how long it will live, and what external resources it captures.

We also require that the JoinHandle of the spawn be stored explicitly so that handle.abort() can be called, usually in a Drop implementation.

spawn is not a silver bullet. If the goal is merely to avoid blocking the surrounding code, there are other ways to write it; think carefully before reaching for spawn.
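One common alternative: if a handler only wants two pieces of work to run concurrently, composing the futures in place avoids spawn entirely, and cancellation then propagates for free because the handler owns both futures. A sketch with hypothetical fetch functions:

async fn fetch_user() -> u64 { 1 }
async fn fetch_posts() -> u64 { 2 }

async fn handler() -> u64 {
    // Both futures run concurrently but stay owned by this handler; if hyper
    // drops the handler, both are dropped with it, with no JoinHandle
    // bookkeeping and no orphaned task.
    let (user, posts) = tokio::join!(fetch_user(), fetch_posts());
    user + posts
}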

Some thoughts on tokio issues

tokio::task::JoinHandle

The documentation says that "A JoinHandle detaches the associated task when it is dropped."

Personally, I don't find that as convenient as glommio's detach or libpthread.so's pthread_detach, which let the caller decide whether to detach or not.

pin Future to CPU core

Does tokio lack a concept like glommio's pin-to-core, similar to libpthread.so's pthread_setaffinity_np? It seems tokio indeed has no such thing.

This is also a limitation of tokio being cross-platform: some APIs are available on Linux, but tokio may not provide them because Windows/macOS lack them.

Could tokio pin a given Future to a given CPU core to avoid the context-switching overhead of executing across multiple cores? This seems to conflict with tokio's desire to load-balance across all cores with work stealing, since tokio does not want any core to sit idle.
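That said, core pinning can be approximated outside tokio: spawn one OS thread per core, pin each thread, and run a current-thread runtime on it, so every future spawned there stays on its core. A sketch using the core_affinity crate (my choice of crate, not something tokio provides):

use std::time::Duration;

fn main() {
    let cores = core_affinity::get_core_ids().unwrap();
    let handles: Vec<_> = cores
        .into_iter()
        .map(|core| {
            std::thread::spawn(move || {
                // Pin this OS thread to one core...
                core_affinity::set_for_current(core);
                // ...then run a single-threaded runtime on it, so every
                // future driven here stays on this core.
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap()
                    .block_on(async {
                        tokio::time::sleep(Duration::from_millis(10)).await;
                        println!("running pinned to {:?}", core);
                    });
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}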

Single-threaded runtime is faster in some cases

In our project's benchmarks we found that some modules perform better on tokio's single-threaded runtime, so you can't take it for granted that multi-threaded beats single-threaded: the CPU pays a lot of overhead synchronizing data between cores. Whether to use the single-threaded or multi-threaded runtime has to be analyzed case by case.

(But the Future bounds of tokio::spawn are the same for both the single-threaded and multi-threaded runtimes… the single-threaded one doesn't even get to drop the Send constraint…)
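On that parenthetical: plain tokio::spawn indeed keeps the Send bound even on the current-thread runtime, but tokio::task::LocalSet and spawn_local do accept !Send futures there. A small sketch:

use std::rc::Rc;

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let local = tokio::task::LocalSet::new();
    local.block_on(&rt, async {
        // Rc is !Send, so this future can't go through tokio::spawn,
        // but spawn_local accepts it on the single-threaded runtime.
        let data = Rc::new(42);
        let handle = tokio::task::spawn_local(async move {
            println!("got {}", data);
        });
        handle.await.unwrap();
    });
}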

tokio’s discussion of cancel on drop

https://github.com/tokio-rs/tokio/issues/1830

Finally, I dare to imagine designing a runtime purely from a Linux-only perspective, without worrying about compatibility with macOS/Windows. That would let it use many Linux-only APIs to fully drain Linux's performance, and it might eventually turn out that Rust's own Future is not quite sufficient or not good enough, perhaps even warranting a runtime-specific Future (just as many C++ libraries each roll their own Future that isn't interchangeable).