A coroutine, sometimes called a micro-thread, is a lightweight unit of execution that can pause and later resume where it left off. A coroutine cooperates with its caller: it consumes values the caller sends in and can hand values back. Its advantage over threads is that context switching is cheaper and is controlled by the program itself rather than by the operating system scheduler.

History of development

Coroutines in Python have gone through three main phases. They first appeared in Python 2.5, evolved from generators and driven by yield and send; Python 3.3 then introduced yield from, which allows a complex generator to be refactored into small nested generators; finally, Python 3.5 introduced the async/await syntax.

yield from is still valid Python syntax, but async/await has replaced it as the way to write coroutines, so this article focuses on how the yield/send and async/await keywords implement coroutines.

yield / send

Coroutines in action

When a generator uses the yield keyword as an expression, its caller can send data in with the .send(value) method, and that value becomes the result of the yield expression inside the generator function. In other words, yield is a pause point: the generator runs to yield, hands the value to the right of yield back to the caller, and pauses; the next value sent in becomes the value of the yield expression, and execution resumes. For example:

def count_num():
    r = 0
    print("Started.")
    while True:
        x = yield r
        print("Received x: {}".format(x))
        r = r + 1


if __name__ == "__main__":
    coroutine = count_num()
    next(coroutine)

    for i in "hello":
        t = coroutine.send(i)
        print("Coroutine times: {}".format(t))
    coroutine.close()

The results of the run are as follows.

Started.
Received x: h
Coroutine times: 1
Received x: e
Coroutine times: 2
Received x: l
Coroutine times: 3
Received x: l
Coroutine times: 4
Received x: o
Coroutine times: 5

This shows that the local variable r keeps its value while the coroutine is paused: a coroutine’s local variables live on between calls. This is one of the advantages of coroutines, since there is no need for attributes of a class instance or a closure to hold state across multiple calls.
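
As a further illustration of state kept in local variables, here is a minimal sketch of a running-average coroutine. The name running_average and the sample values are made up for this example; they are not part of the code above.

```python
def running_average():
    """Yield the average of all values sent in so far."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # pause here until the caller sends a value
        total += value          # total and count survive between calls
        count += 1
        average = total / count


averager = running_average()
next(averager)                  # run to the first yield
results = [averager.send(v) for v in (10, 20, 60)]
print(results)
averager.close()
```

No class or closure is involved: total and count are ordinary locals that stay alive for as long as the coroutine is suspended.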

Also note the call to next(coroutine): it starts the coroutine and runs it to the first yield, where it pauses in the suspended state. Only after that can send deliver a value. This step is called “priming” the coroutine.

A coroutine is in one of four states:

  • GEN_CREATED: waiting to start execution
  • GEN_RUNNING: currently being executed by the interpreter
  • GEN_SUSPENDED: paused at a yield expression
  • GEN_CLOSED: execution has finished
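
These states can be observed with inspect.getgeneratorstate from the standard library. The following is a small sketch; the echo generator and the states list are hypothetical helpers used only to record what is seen at each step.

```python
import inspect


def echo():
    while True:
        received = yield
        # This line runs while the generator is executing, so it sees GEN_RUNNING.
        states.append(inspect.getgeneratorstate(gen))


states = []
gen = echo()
states.append(inspect.getgeneratorstate(gen))   # created, not yet started
next(gen)                                       # prime: run to the first yield
states.append(inspect.getgeneratorstate(gen))   # paused at yield
gen.send("x")                                   # the body records its own state
gen.close()
states.append(inspect.getgeneratorstate(gen))   # finished
print(states)
```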

Besides next(), a coroutine can also be primed with .send(None), which has the same effect. If you comment out the priming call in the example above, running it raises an error:

Traceback (most recent call last):
  File "test.py", line 15, in <module>
    t = coroutine.send(i)
TypeError: can't send non-None value to a just-started generator

The traceback makes the cause clear: you cannot send a value other than None to a generator that has just been created and has not yet reached its first yield.
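
Both behaviours can be checked side by side. This sketch reuses a trimmed-down count_num (without the print calls); the variable names unprimed, primed and error_message are chosen for illustration.

```python
def count_num():
    r = 0
    while True:
        x = yield r
        r = r + 1


unprimed = count_num()
try:
    unprimed.send("h")          # not primed: no yield is waiting for a value
except TypeError as exc:
    error_message = str(exc)

primed = count_num()
first = primed.send(None)       # same effect as next(primed); returns the first r
second = primed.send("h")       # now a yield is waiting, so the value is accepted
print(error_message, first, second)
```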

Coroutine Exception Handling

If an unhandled exception occurs inside the coroutine, it propagates to the caller of next or send, and the coroutine terminates. Most of the time we want the coroutine to survive an error instead of exiting, so the coroutine should catch the exception itself; the caller can signal such an error with the throw method.
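
To see why an unhandled exception is a problem, consider this sketch. The reciprocal coroutine is a hypothetical example: it fails on a zero input and can no longer be resumed afterwards.

```python
import inspect


def reciprocal():
    result = None
    while True:
        x = yield result
        result = 1 / x          # raises ZeroDivisionError when x == 0


gen = reciprocal()
next(gen)
half = gen.send(2)

try:
    gen.send(0)                 # the exception propagates out of send()
except ZeroDivisionError:
    state_after_error = inspect.getgeneratorstate(gen)

try:
    gen.send(4)                 # the coroutine has terminated and cannot resume
    resumed = True
except StopIteration:
    resumed = False

print(half, state_after_error, resumed)
```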

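throw raises the specified exception inside the coroutine at the yield where it is paused. If the coroutine catches it, execution continues to the next yield, where the coroutine pauses again and throw returns the yielded value. Here is the example above with exception handling added: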

class Error(Exception):
    pass


def count_num():
    r = 0
    print("Started.")
    while True:
        try:
            x = yield r
            print("Received x: {}".format(x))
        except Error:
            print("Coroutine error.")
        r = r + 1


if __name__ == "__main__":
    coroutine = count_num()
    next(coroutine)

    n = 0
    for i in "hello":
        n = n + 1
        if n % 2 == 0:
            coroutine.throw(Error)
        else:
            t = coroutine.send(i)
            print("Coroutine times: {}".format(t))
    coroutine.close()

Running results.

Started.
Received x: h
Coroutine times: 1
Coroutine error.
Received x: l
Coroutine times: 3
Coroutine error.
Received x: o
Coroutine times: 5
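
The example discards the return value of throw, which is why “Coroutine times” skips 2 and 4. As a sketch of what throw hands back, here is the same coroutine without the print calls; the variable names after_send, after_throw and after_send_again are illustrative.

```python
class Error(Exception):
    pass


def count_num():
    r = 0
    while True:
        try:
            x = yield r
        except Error:
            pass                # handled: fall through and keep counting
        r = r + 1


coroutine = count_num()
next(coroutine)
after_send = coroutine.send("h")
after_throw = coroutine.throw(Error)    # returns the value of the next yield
after_send_again = coroutine.send("l")
coroutine.close()
print(after_send, after_throw, after_send_again)
```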

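Besides throw, the caller can also use send to pass in a special value that the coroutine treats as a signal, such as the commonly used None; this is called a sentinel value. The coroutine has to check for the sentinel explicitly, though: replacing coroutine.throw(Error) in the code above with coroutine.send(None) keeps the counter advancing in the same way, but prints “Received x: None” instead of “Coroutine error.”.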
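
A sentinel-based variant might look like the sketch below. It assumes None is never a legitimate input, and it collects messages in a list (the events parameter is a hypothetical addition) instead of printing them.

```python
def count_num(events):
    r = 0
    while True:
        x = yield r
        if x is None:                       # sentinel sent by the caller
            events.append("Coroutine error.")
        else:
            events.append("Received x: {}".format(x))
        r = r + 1


events = []
coroutine = count_num(events)
next(coroutine)
coroutine.send("h")
coroutine.send(None)                        # signals an error without throw()
t = coroutine.send("l")
coroutine.close()
print(events, t)
```

Compared with throw, the sentinel approach needs no exception class, but it only works when the sentinel can never be mistaken for real data.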

The code above finishes by calling the close method, which switches the coroutine’s state to GEN_CLOSED. close works by raising a GeneratorExit exception at the yield where the coroutine is paused. If the coroutine does not handle the exception, or finishes by raising StopIteration, close returns to the caller without an error and the state becomes GEN_CLOSED. If the coroutine catches GeneratorExit and yields another value, close raises RuntimeError.
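
The behaviour of close can be sketched with two hypothetical coroutines: worker, which uses GeneratorExit to run cleanup code, and stubborn, which wrongly yields again after being asked to exit.

```python
import inspect


def worker(log):
    try:
        while True:
            yield
    except GeneratorExit:
        log.append("cleanup")   # runs when close() is called
        raise                   # re-raise (or return), but do not yield again


def stubborn():
    try:
        yield
    except GeneratorExit:
        yield                   # yielding after GeneratorExit is an error


log = []
w = worker(log)
next(w)
w.close()
closed_state = inspect.getgeneratorstate(w)

s = stubborn()
next(s)
try:
    s.close()
except RuntimeError as exc:
    close_error = str(exc)

print(log, closed_state, close_error)
```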

async / await

Starting with Python 3.5, Python has a new way to define coroutines: async def. In short, async def defines a coroutine, and await suspends the coroutine while it waits for an asynchronous operation to finish. The way coroutines are run changed slightly in Python 3.7, so this section is split into two parts by Python version.

Python 3.5 - 3.6

If you read the official documentation on coroutines, you’ll see that a coroutine is not runnable by itself: its code only runs once it has been handed to an event loop. So what is an event loop? asyncio keeps track of the loop that is currently running with this definition in its source code:

# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):
    loop_pid = (None, None)


_running_loop = _RunningLoop()

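_RunningLoop inherits from threading.local, and _running_loop is a module-level thread-local object that records which event loop is running in the current thread; the event loop itself is a separate object that schedules and runs the work handed to it. A coroutine is pushed into this loop and is executed only while the loop is running.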

Executing a coroutine

To execute a coroutine, first wrap it in a future or task and hand that to the event loop; then call loop.run_until_complete, which runs the loop until that future or task is done.

Here a future is an object that represents the eventual result of an asynchronous operation; a task wraps a coroutine, drives its execution, and tracks its state. Task is a subclass of Future.
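
The relationship between the two can be sketched as follows. The loop is created with asyncio.new_event_loop() here so that the snippet does not depend on an existing loop; the returned values are placeholders chosen for illustration.

```python
import asyncio


async def count_num(num):
    return num * 2


loop = asyncio.new_event_loop()

# A bare Future: a placeholder that something else fills in later.
future = loop.create_future()
loop.call_soon(future.set_result, "done")
future_result = loop.run_until_complete(future)

# A Task: a Future that drives a coroutine and stores its return value.
task = loop.create_task(count_num(100))
pending_before = not task.done()        # nothing runs until the loop runs
task_result = loop.run_until_complete(task)

task_is_future = issubclass(asyncio.Task, asyncio.Future)
loop.close()
print(future_result, pending_before, task_result, task_is_future)
```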

There are two ways to wrap a coroutine: asyncio.ensure_future and loop.create_task. Both are essentially the same: given a coroutine, each returns a Task, which is itself a Future.

The following two implementations have the same effect. The first uses asyncio.ensure_future:

import asyncio


async def count_num(num):
    print("count num: {}".format(num))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    future = asyncio.ensure_future(count_num(100))  
    loop.run_until_complete(future)
    loop.close()

The second uses loop.create_task:

import asyncio


async def count_num(num):
    print("count num: {}".format(num))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    task = loop.create_task(count_num(100))
    loop.run_until_complete(task)
    loop.close()

Note that you can also pass the coroutine straight to loop.run_until_complete(coroutine); internally, the coroutine is still wrapped with ensure_future first.
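
The three variants can be compared in one short sketch. Two things differ from the listings above and are assumptions of this example: count_num returns its message instead of printing it, and the loop is created explicitly and passed to ensure_future through its loop argument.

```python
import asyncio


async def count_num(num):
    return "count num: {}".format(num)


loop = asyncio.new_event_loop()

future = asyncio.ensure_future(count_num(1), loop=loop)
task = loop.create_task(count_num(2))
both_are_tasks = isinstance(future, asyncio.Task) and isinstance(task, asyncio.Task)

results = [
    loop.run_until_complete(future),
    loop.run_until_complete(task),
    loop.run_until_complete(count_num(3)),  # bare coroutine, wrapped internally
]
loop.close()
print(both_are_tasks, results)
```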

Concurrency and blocking in coroutines

Since coroutines exist to run work asynchronously, concurrent execution is the main point. asyncio.gather() schedules several coroutines on the same event loop. Consider an example:

import asyncio
import time


async def count_num(num):
    print("Started coroutine #{} at".format(num), time.strftime('%X'))
    for i in range(num):
        time.sleep(1)
        print("[coroutine #{}] count: {} at".format(num, i), time.strftime('%X'), "...")
    print("Finish coroutine #{} at".format(num), time.strftime('%X'))


if __name__ == "__main__":
    print("Start.")
    loop = asyncio.get_event_loop()

    loop.run_until_complete(asyncio.gather(
        count_num(3),
        count_num(4),
    ))
    loop.close()

    print("Finish at", time.strftime('%X'))

The example runs two coroutines; each counts from 0 up to, but not including, the number passed in, and sleeps for 1s on each count. The result is as follows:

Start.
Started coroutine #3 at 11:51:10
[coroutine #3] count: 0 at 11:51:11...
[coroutine #3] count: 1 at 11:51:12...
[coroutine #3] count: 2 at 11:51:13...
Finish coroutine #3 at 11:51:13
Started coroutine #4 at 11:51:13
[coroutine #4] count: 0 at 11:51:14...
[coroutine #4] count: 1 at 11:51:15...
[coroutine #4] count: 2 at 11:51:16...
[coroutine #4] count: 3 at 11:51:17...
Finish coroutine #4 at 11:51:17
Finish at 11:51:17

As you can see from the results, #3 and #4 run one after the other, without the concurrency we want. The reason is that time.sleep blocks the whole thread, so the event loop never gets a chance to switch coroutines. This is where the await keyword comes into play: await suspends the coroutine that is waiting and lets the event loop run the other coroutines until they suspend or finish in turn. Let’s replace sleep in the example above with an awaitable one:

import asyncio
import time


async def count_num(num):
    print("Started coroutine #{} at".format(num), time.strftime('%X'))
    for i in range(num):
        await asyncio.sleep(1)
        print("[coroutine #{}] count: {} at".format(num, i), time.strftime('%X'), "...")
    print("Finish coroutine #{} at".format(num), time.strftime('%X'))


if __name__ == "__main__":
    print("Start.")
    loop = asyncio.get_event_loop()

    loop.run_until_complete(asyncio.gather(
        count_num(3),
        count_num(4),
    ))
    loop.close()

    print("Finish at", time.strftime('%X'))

Execution results.

Start.
Started coroutine #3 at 11:59:16
Started coroutine #4 at 11:59:16
[coroutine #3] count: 0 at 11:59:17...
[coroutine #4] count: 0 at 11:59:17...
[coroutine #3] count: 1 at 11:59:18...
[coroutine #4] count: 1 at 11:59:18...
[coroutine #3] count: 2 at 11:59:19...
Finish coroutine #3 at 11:59:19
[coroutine #4] count: 2 at 11:59:19...
[coroutine #4] count: 3 at 11:59:20...
Finish coroutine #4 at 11:59:20
Finish at 11:59:20
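
The total of 4 seconds, rather than 3 + 4 = 7, is the effect of concurrency: the run takes about as long as the slowest coroutine. A scaled-down sketch of the same measurement follows; wait_and_return and the delays are made up for this example, and the gather call is placed inside a main coroutine so that it runs on the loop created here.

```python
import asyncio
import time


async def wait_and_return(delay):
    await asyncio.sleep(delay)      # suspends only this coroutine
    return delay


async def main():
    # gather() returns the results in the order the coroutines were passed in.
    return await asyncio.gather(wait_and_return(0.3), wait_and_return(0.4))


loop = asyncio.new_event_loop()
start = time.monotonic()
results = loop.run_until_complete(main())
elapsed = time.monotonic() - start
loop.close()

print(results, "in about {:.1f}s".format(elapsed))
```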

Python 3.7

Python 3.7 adds a layer of encapsulation around running coroutines, making this functionality even more accessible. We only need to define the coroutine and pass it to asyncio.run(); when there are several coroutines, we can gather them behind a single entry point. For example:

import asyncio
import time


async def count_num(num):
    print("Started coroutine #{} at ".format(num), time.strftime('%X'))
    for i in range(num):
        await asyncio.sleep(1)
        print("[coroutine #{}] count: {} at ".format(num, i),
              time.strftime('%X'), "...")
    print("Finish coroutine #{} at ".format(num), time.strftime('%X'))


async def main():
    await asyncio.gather(
        count_num(3),
        count_num(4),
    )


if __name__ == "__main__":
    print("Start.")
    asyncio.run(main())
    print("Finish at ", time.strftime('%X'))

The result of this code matches the example above, and the call is much simpler because most of the logic (including managing the event loop) is wrapped up in the asyncio.run() function. Take a look at its source code:

def run(main, *, debug=False):
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()

One thing to note is that asyncio.run() cannot be called when an event loop is already running in the same thread. It always creates a new event loop and closes it once the coroutine passed in has finished.
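
Both points can be checked with a short sketch for Python 3.7 or later. The coroutines inner, outer and current_loop are hypothetical names used only for this demonstration.

```python
import asyncio


async def inner():
    return "inner result"


async def outer():
    coro = inner()
    try:
        asyncio.run(coro)       # refused: a loop is already running in this thread
    except RuntimeError as exc:
        message = str(exc)
    # Inside a running loop, await the coroutine instead.
    value = await coro
    return message, value


async def current_loop():
    return asyncio.get_running_loop()


message, value = asyncio.run(outer())
first_loop = asyncio.run(current_loop())
second_loop = asyncio.run(current_loop())
fresh_loop_each_time = first_loop is not second_loop
print(message, value, fresh_loop_each_time, first_loop.is_closed())
```

Inside a coroutine, the usual alternative to a nested asyncio.run() is simply to await the other coroutine, as outer does here.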