A colleague asked how Rust's async/await is implemented: how does execution stop at an `.await` point and later resume from that point (on the current thread/coroutine)? Does it also use something like yield/generators?

I experimented briefly, and my guess is that it works roughly like this.

Take the following code.

async fn say_world() {
    println!("hello world");
}

#[tokio::main]
async fn main() {
    let op = say_world();

    op.await;
}

I ran it through a nightly `rustc` to see what the compiler turns it into.

The output is below. Only the code generated for the `main()` function is shown, with the formatting adjusted.

tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(#[lang = "from_generator"](|mut _task_context|
{
    let op = say_world();
    match op
        {
        mut pinned => loop {
            match unsafe
                  {
                      #[lang = "poll"](#[lang = "new_unchecked"](&mut pinned),
                                       #[lang = "get_context"](_task_context))
                  }
                {
                    #[lang = "Ready"] {
                    0: result
                    } =>
                    break result,
                    #[lang = "Pending"] { } =>
                    {
                    }
                }

            _task_context = (yield());
        },
    };
}))

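The generated loop polls the future. If the result is `Ready`, the loop breaks with the result; otherwise (the `Pending` state) the generator yields, which hands control back to the runtime so it can dispatch other tasks, and the future is polled again the next time the generator is resumed.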
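To make that loop concrete, here is a minimal sketch that drives a future by hand using only the standard library. `YieldOnce`, `NoopWake`, and `poll_until_ready` are hypothetical names made up for illustration; none of them come from tokio or from the compiler output above. The main difference from the generated code is that this loop simply polls again on `Pending`, whereas the generated code yields back to the runtime at that point.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: `Pending` on the first poll, `Ready` on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            Poll::Ready("hello world")
        } else {
            self.yielded = true;
            // Ask to be polled again; a real future would keep the waker
            // and call it once its I/O or timer is actually ready.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing; enough for a loop that polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` until it is ready and returns the output together with
/// the number of `Pending` results seen along the way.
fn poll_until_ready<F: Future>(fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut pinned = Box::pin(fut);
    let mut pending = 0;
    loop {
        match pinned.as_mut().poll(&mut cx) {
            Poll::Ready(result) => break (result, pending),
            // The compiler-generated code yields to the runtime here.
            Poll::Pending => pending += 1,
        }
    }
}

fn main() {
    let (out, pending) = poll_until_ready(YieldOnce { yielded: false });
    println!("{out} after {pending} pending poll(s)");
}
```

Busy-polling like this wastes CPU; it is only meant to show the `Ready`/`Pending` branches side by side.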

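A future becomes a task once it is handed to the tokio runtime (for example with `tokio::spawn`); `.await` on its own does not create a task, it only polls a child future inside the current one. The tokio runtime is responsible for scheduling tasks, which makes them somewhat like goroutines, but Rust itself does not come with a runtime implementation.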
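Since the language ships no runtime, something has to call `poll`. As a rough illustration of the smallest possible "runtime", here is a sketch of a single-future `block_on` built only on the standard library. `ThreadWaker` and this `block_on` are assumptions for illustration; tokio's real `block_on` is far more involved (I/O driver, timers, work stealing) and is not reproduced here.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread which is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Runs one future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    // Pin the future on the stack so it can be polled.
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Nothing to do until someone calls the waker.
            Poll::Pending => thread::park(),
        }
    }
}

async fn say_world() -> &'static str {
    "hello world"
}

fn main() {
    let msg = block_on(async { say_world().await });
    println!("{msg}");
}
```

A scheduler like tokio's keeps many such futures in queues and polls whichever one was woken, instead of parking the thread on a single future.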

The description of the `await!` macro here shows the same polling loop.

let mut future = IntoFuture::into_future($expression);
let mut pin = unsafe { Pin::new_unchecked(&mut future) };
loop {
    match Future::poll(Pin::borrow(&mut pin), &mut ctx) {
          Poll::Ready(item) => break item,
          Poll::Pending     => yield,
    }
}

and here

#[async]
fn print_lines() -> io::Result<()> {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let tcp = await!(TcpStream::connect(&addr))?;
    let io = BufReader::new(tcp);

    #[async]
    for line in io.lines() {
        println!("{}", line);
    }

    Ok(())
}

The above code, after “translation”, would look like this.

fn print_lines() -> impl Future<Item = (), Error = io::Error> {
    CoroutineToFuture(|| {
        let addr = "127.0.0.1:8080".parse().unwrap();
        let tcp = {
            let mut future = TcpStream::connect(&addr);
            loop {
                match future.poll() {
                    Ok(Async::Ready(e)) => break Ok(e),
                    Ok(Async::NotReady) => yield,
                    Err(e) => break Err(e),
                }
            }
        }?;

        let io = BufReader::new(tcp);

        let mut stream = io.lines();
        loop {
            let line = {
                match stream.poll()? {
                    Async::Ready(Some(e)) => e,
                    Async::Ready(None) => break,
                    Async::NotReady => {
                        yield;
                        continue
                    }
                }
            };
            println!("{}", line);
        }

        Ok(())
    })
}

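Note: the code above uses the old futures API, where `poll()` returns a `Result` wrapping `Async::Ready` or `Async::NotReady`; that document apparently was not updated in time. In the current `Future` trait, `poll` returns `Poll::Ready` or `Poll::Pending`.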

From the two descriptions above, we can roughly understand the generator mechanism: on `Ready` the result is returned and execution continues; on `Pending` the generator yields and control goes back to the scheduler.
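One way to picture how execution "resumes" after a yield is a hand-written state machine, where each `.await` point becomes an enum variant that stores what must survive across the suspension. The sketch below is only an approximation of the idea; `Countdown`, `Sum`, and `drive` are hypothetical names, and the code the compiler really generates differs in detail.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical leaf future: `Pending` for `remaining` polls, then `Ready(value)`.
/// It never registers a waker, so it only works with the polling loop in `drive`.
struct Countdown {
    remaining: u32,
    value: u32,
}

impl Future for Countdown {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            Poll::Ready(self.value)
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Hand-written stand-in for an async block that awaits two `Countdown`s
/// and adds their results. Each variant is one suspension point.
enum Sum {
    First(Countdown),
    Second(u32, Countdown), // result of the first await + the second future
    Done,
}

impl Future for Sum {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let this = self.get_mut(); // fine here because `Sum` is `Unpin`
        loop {
            match &mut *this {
                Sum::First(fut) => match Pin::new(fut).poll(cx) {
                    Poll::Ready(a) => {
                        // Move to the next state and keep going.
                        *this = Sum::Second(a, Countdown { remaining: 1, value: 2 });
                    }
                    Poll::Pending => return Poll::Pending, // "yield"
                },
                Sum::Second(a, fut) => match Pin::new(fut).poll(cx) {
                    Poll::Ready(b) => {
                        let total = *a + b;
                        *this = Sum::Done;
                        return Poll::Ready(total);
                    }
                    Poll::Pending => return Poll::Pending, // "yield"
                },
                Sum::Done => panic!("polled after completion"),
            }
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` to completion; returns the output and the total number of polls.
fn drive<F: Future>(fut: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}

fn main() {
    let (total, polls) = drive(Sum::First(Countdown { remaining: 1, value: 40 }));
    println!("total = {total}, polls = {polls}");
}
```

Every call to `poll` re-enters at the stored variant, which is what lets execution continue from the same `.await` point instead of starting over.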

Today I only searched around in general and raised the question. The next step is to read tokio's implementation and confirm how it actually works.