Asyncio Shield Main Coroutine From Cancellation

May 2, 2024 Python Asyncio

We can simulate shielding the main coroutine from cancellation by using a wrapper coroutine that consumes cancellation requests.

In this tutorial, you will discover how to shield the main coroutine from cancellation.

Let's get started.

Need to Shield The Main Coroutine From Cancellation

Asyncio tasks can be canceled at any time.

The main coroutine is the entry point into the asyncio program.

We may want to protect or shield the main coroutine from cancellation.

Shielding the main coroutine is useful when critical operations or cleanup must complete before the program terminates. Reasons to do so include:

  1. Cleanup Tasks: We may have critical cleanup tasks that need to be performed before the coroutine exits, such as releasing resources, closing connections, or saving state. By shielding the main coroutine from cancellation, we ensure that these cleanup tasks are executed regardless of whether the coroutine is canceled or not.
  2. Data Integrity: If the main coroutine is performing operations that could leave data in an inconsistent or corrupted state if abruptly terminated, shielding it from cancellation can help maintain data integrity. This is particularly important in applications where data consistency is crucial, such as databases or file systems.
  3. Graceful Shutdown: Shielding the main coroutine allows us to implement a graceful shutdown mechanism for our application. Instead of abruptly terminating the coroutine and potentially leaving resources in an inconsistent state, we can use the shielding mechanism to ensure that the application shuts down cleanly, completing any necessary cleanup tasks before exiting.

How can we shield the main coroutine from cancellation in asyncio?

How to Shield The Main Coroutine From Cancellation

Asyncio tasks can be shielded.

This can be achieved via the asyncio.shield() function, which wraps the target awaitable in a future that absorbs any request for cancellation, leaving the wrapped task running.

For example:

...
# create a shield
shield = asyncio.shield(task)
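
To make the behavior concrete, the following is a minimal sketch of canceling a shield while the inner task keeps running. The work() and demo() coroutines are hypothetical names used only for this illustration.

```python
import asyncio

# hypothetical worker used only for this illustration
async def work():
    await asyncio.sleep(0.1)
    return 'finished'

# hypothetical driver coroutine
async def demo():
    # create the inner task and wrap it in a shield
    task = asyncio.create_task(work())
    shielded = asyncio.shield(task)
    # let the inner task start running
    await asyncio.sleep(0)
    # cancel the shield, not the inner task
    shielded.cancel()
    try:
        await shielded
    except asyncio.CancelledError:
        print('Shield was canceled')
    # the inner task is unaffected and can still be awaited
    print(f'Inner task canceled: {task.cancelled()}')
    return await task

# run the demonstration
result = asyncio.run(demo())
print(f'Inner task result: {result}')
```

The request for cancellation lands on the shield, while the inner task runs to completion and its result remains available.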

The problem is, the asyncio.shield() function cannot be used to shield the main coroutine.

For example, if we call asyncio.run() to start our asyncio program and attempt to pass it a shielded coroutine, it fails with an exception.

...
# run the asyncio event loop
asyncio.run(asyncio.shield(main()))

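This fails for two reasons. The asyncio.shield() call is evaluated before asyncio.run() has started its event loop, so the task it creates for main() is never run by that loop. In addition, asyncio.shield() returns a future rather than a coroutine, and asyncio.run() rejected it with a ValueError in the Python version used for this tutorial.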
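
The type returned by asyncio.shield() can be checked from inside a running event loop. This is a small sketch; inspect_shield() is a name chosen for the illustration and main() is a stand-in.

```python
import asyncio

# stand-in for the real main coroutine (illustration only)
async def main():
    await asyncio.sleep(0)

# hypothetical helper that inspects what asyncio.shield() returns
async def inspect_shield():
    # inside a running event loop, shield() can schedule main() as a task
    shielded = asyncio.shield(main())
    # check whether the returned object is a coroutine or a future
    is_coro = asyncio.iscoroutine(shielded)
    is_future = asyncio.isfuture(shielded)
    # await the shield so that main() completes cleanly
    await shielded
    return is_coro, is_future

# run the check
is_coro, is_future = asyncio.run(inspect_shield())
print(f'coroutine: {is_coro}, future: {is_future}')
```

The object returned by asyncio.shield() is a future, not a coroutine object.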

One solution is to define a new wrapper coroutine that in turn creates a task for the main() coroutine and shields it. It can then specifically consume any cancellation requests, allowing the true main coroutine to continue running.

For example:

# wrapper coroutine that shields main()
async def shielded_main():
    try:
        # run the main coroutine as a shielded task
        await asyncio.shield(asyncio.create_task(main()))
    except asyncio.CancelledError:
        # get the current task
        current = asyncio.current_task()
        # wait for all other tasks to be done
        for task in asyncio.all_tasks():
            # ensure the task does not await itself
            if task is not current:
                # await the other task
                await task

This can then be used via asyncio.run().

...
# run the asyncio event loop
asyncio.run(shielded_main())

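This protects the main coroutine from a single cancellation request. Once the first request has been consumed, the wrapper awaits the remaining tasks directly, so a second request would be passed on to whichever task it is awaiting at the time.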

A more robust solution may protect the main coroutine from repeated cancellation requests by using a loop.

For example:

# wrapper coroutine that shields main() from repeated cancellation
async def shielded_main():
    # schedule the main coroutine as a task and keep a reference to it
    main_task = asyncio.create_task(main())
    # get the current task
    current = asyncio.current_task()
    # loop until no other tasks remain
    while True:
        try:
            # get all tasks
            all_tasks = asyncio.all_tasks()
            # remove the current task
            all_tasks.remove(current)
            # check for no other tasks
            if not all_tasks:
                break
            # wait for all other tasks to be done
            for task in all_tasks:
                # shield each await so canceling the wrapper does not cancel the task
                await asyncio.shield(task)
        except asyncio.CancelledError:
            # consume the cancellation request and keep waiting
            pass

This is perhaps overly aggressive, as the wrapper ignores every cancellation request and exits only once all other tasks are done.
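
One way to exercise the repeated-cancellation idea end to end is the sketch below. It is a variation, not the listing above: it waits with asyncio.wait(), since canceling a pending asyncio.wait() call leaves the awaited tasks running. The canceller() helper and the main(wrapper) signature are assumptions made for the demonstration.

```python
import asyncio

# hypothetical helper that cancels the wrapper twice
async def canceller(target):
    for _ in range(2):
        await asyncio.sleep(0.05)
        target.cancel()

# stand-in main coroutine that starts the canceller (illustration only)
async def main(wrapper):
    helper = asyncio.create_task(canceller(wrapper))
    await asyncio.sleep(0.3)
    return 'main done'

# wrapper that keeps waiting despite repeated cancellation requests
async def shielded_main():
    current = asyncio.current_task()
    main_task = asyncio.create_task(main(current))
    while True:
        # all tasks still running, excluding this wrapper
        others = asyncio.all_tasks() - {current}
        if not others:
            break
        try:
            # canceling asyncio.wait() does not cancel the tasks it waits on
            await asyncio.wait(others)
        except asyncio.CancelledError:
            # consume the request and go around again
            pass
    # main_task is done at this point, so report its result
    return main_task.result()

# run the asyncio event loop
result = asyncio.run(shielded_main())
print(result)
```

Both cancellation requests are consumed by the wrapper, and main() still runs to completion.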

Now that we know how to shield the main coroutine from cancellation, let's look at some worked examples.

Example of Canceling the Main Coroutine

Before we explore how to protect the main coroutine from cancellation, let's explore an example of canceling the main coroutine.

In this example, we will define a task that gets the set of all running tasks, then finds and cancels the task running the main coroutine, which in this case has the default name "Task-1".

# definition of other task that cancels the main task
async def other_task():
    # get all tasks
    tasks = asyncio.all_tasks()
    # cancel the main task
    for task in tasks:
        # check if it is the main task
        if task.get_name() == 'Task-1':
            # report a message
            print(f'Canceling task: {task.get_coro()}')
            # cancel the task
            result = task.cancel()
            # report success of cancellation
            print(f'Cancellation successful: {result}')
    # suspend a while
    await asyncio.sleep(1)
    # report a final message
    print('Other task done.')

The main coroutine will first create and schedule our other_task() in the event loop, then suspend for a moment to allow the new task to run. It will catch the CancelledError and report a message, to confirm that the main coroutine was canceled.

# main coroutine
async def main():
    try:
        # report a message
        print('Main is running')
        # schedule the other task
        task = asyncio.create_task(other_task())
        # suspend a while
        await asyncio.sleep(2)
        # report a message
        print('Main is done.')
    except asyncio.CancelledError:
        print('Main task cancelled')

Finally, we can start the asyncio event loop and run our main coroutine.

...
# run the asyncio event loop
asyncio.run(main())

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of canceling the main coroutine
import asyncio

# definition of other task that cancels the main task
async def other_task():
    # get all tasks
    tasks = asyncio.all_tasks()
    # cancel the main task
    for task in tasks:
        # check if it is the main task
        if task.get_name() == 'Task-1':
            # report a message
            print(f'Canceling task: {task.get_coro()}')
            # cancel the task
            result = task.cancel()
            # report success of cancellation
            print(f'Cancellation successful: {result}')
    # suspend a while
    await asyncio.sleep(1)
    # report a final message
    print('Other task done.')

# main coroutine
async def main():
    try:
        # report a message
        print('Main is running')
        # schedule the other task
        task = asyncio.create_task(other_task())
        # suspend a while
        await asyncio.sleep(2)
        # report a message
        print('Main is done.')
    except asyncio.CancelledError:
        print('Main task cancelled')

# run the asyncio event loop
asyncio.run(main())

Running the example first starts the asyncio event loop and runs our main coroutine.

The main coroutine runs, reports a message, and then creates and schedules a new task to run our other_task() coroutine before suspending with a sleep.

The other_task() task runs and gets a list of all running tasks. It then searches the list by task name for the main task and cancels it, reporting the details of the main task and that the cancellation request was successful.

The main task resumes and catches the raised CancelledError, confirming that it was canceled.

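This highlights how the main task may be canceled. Note that the "Other task done." message is never reported, because asyncio.run() cancels any remaining tasks once the main coroutine exits.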

Main is running
Canceling task: <coroutine object main at 0x10e2fc040>
Cancellation successful: True
Main task cancelled
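
Matching on the name "Task-1" depends on asyncio's default task naming, which only holds if the main task is the first task created and has not been renamed. As a hedged alternative sketch (not part of the original example), the main coroutine can hand its own task to other_task() as an argument. The target parameter and the returned strings are assumptions introduced here.

```python
import asyncio

# variation of other_task() that receives the task to cancel as an argument
async def other_task(target):
    # report a message
    print(f'Canceling task: {target.get_name()}')
    # cancel the task that was passed in
    result = target.cancel()
    # report success of cancellation
    print(f'Cancellation successful: {result}')

# main coroutine
async def main():
    try:
        # pass a reference to the task running main()
        task = asyncio.create_task(other_task(asyncio.current_task()))
        # suspend a while
        await asyncio.sleep(2)
        return 'finished'
    except asyncio.CancelledError:
        return 'cancelled'

# run the asyncio event loop
outcome = asyncio.run(main())
print(outcome)
```

This removes the dependence on task names while producing the same cancellation of the main task.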

Next, let's look at a naive approach to shielding the main coroutine.

Example of asyncio.shield() Failing For The Main Coroutine

We can explore a naive case of protecting the main coroutine from cancellation.

In this case, we can update the above example to wrap the main() coroutine in a call to asyncio.shield().

...
# run the asyncio event loop
asyncio.run(asyncio.shield(main()))

The hope is that the main coroutine would then be shielded from direct cancellation.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of shielding the main coroutine from cancellation
import asyncio

# definition of other task that cancels the main task
async def other_task():
    # get all tasks
    tasks = asyncio.all_tasks()
    # cancel the main task
    for task in tasks:
        # check if it is the main task
        if task.get_name() == 'Task-1':
            # report a message
            print(f'Canceling task: {task.get_coro()}')
            # cancel the task
            result = task.cancel()
            # report success of cancellation
            print(f'Cancellation successful: {result}')
    # suspend a while
    await asyncio.sleep(1)
    # report a final message
    print('Other task done.')

# main coroutine
async def main():
    try:
        # report a message
        print('Main is running')
        # schedule the other task
        task = asyncio.create_task(other_task())
        # suspend a while
        await asyncio.sleep(2)
        # report a message
        print('Main is done.')
    except asyncio.CancelledError:
        print('Main task cancelled')

# run the asyncio event loop
asyncio.run(asyncio.shield(main()))

Running the example fails with an exception.

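The program fails before the asyncio event loop can run the main coroutine.

It fails because asyncio.run() expects a coroutine, whereas asyncio.shield() returns a future. The asyncio.shield() call had already wrapped main() in an asyncio.Task before asyncio.run() started its event loop, which is why the output also reports that a pending task was destroyed and that main() was never awaited. The exact error may vary with the Python version.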

This highlights that the naive approach of shielding the main coroutine from cancellation with asyncio.shield() does not work.

ValueError: a coroutine was expected, got <Future pending cb=[shield.<locals>._outer_done_callback() at .../asyncio/tasks.py:908]>
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<main() running at ...> cb=[shield.<locals>._inner_done_callback() at .../asyncio/tasks.py:891]>
sys:1: RuntimeWarning: coroutine 'main' was never awaited

Next, let's look at an alternative approach.

Example of Shielded Main Coroutine

We can explore how to manually shield the main coroutine from cancellation.

This requires developing a main coroutine wrapper that consumes cancellation requests and allows the main coroutine to run.

This approach adds a level of indirection, meaning that our main() coroutine is no longer the entry point passed to asyncio.run(); the wrapper is.

# wrapper coroutine that shields main()
async def shielded_main():
    try:
        # run the main coroutine as a shielded task
        await asyncio.shield(asyncio.create_task(main()))
    except asyncio.CancelledError:
        # get the current task
        current = asyncio.current_task()
        # wait for all other tasks to be done
        for task in asyncio.all_tasks():
            # ensure the task does not await itself
            if task is not current:
                # await the other task
                await task

It then requires that the asyncio.run() call be executed with our shielded_main() coroutine instead of the main() coroutine.

...
# run the asyncio event loop
asyncio.run(shielded_main())

Canceling the shielded_main() coroutine will consume the CancelledError and allow the true main coroutine main() to continue running.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of shielding the main coroutine from cancellation
import asyncio

# definition of other task that cancels the main task
async def other_task():
    # cancel the main task
    for task in asyncio.all_tasks():
        # check if it is the main task
        if task.get_name() == 'Task-1':
            # report a message
            print(f'Canceling task: {task.get_coro()}')
            # cancel the task
            result = task.cancel()
            # report success of cancellation
            print(f'Cancellation successful: {result}')
    # suspend a while
    await asyncio.sleep(1)
    # report a final message
    print('Other task done.')

# main coroutine
async def main():
    # report a message
    print('Main is running')
    # schedule the other task
    task = asyncio.create_task(other_task())
    # suspend a while
    await asyncio.sleep(2)
    # report a message
    print('Main is done.')

# wrapper coroutine that shields main()
async def shielded_main():
    try:
        # run the main coroutine as a shielded task
        await asyncio.shield(asyncio.create_task(main()))
    except asyncio.CancelledError:
        # get the current task
        current = asyncio.current_task()
        # wait for all other tasks to be done
        for task in asyncio.all_tasks():
            # ensure the task does not await itself
            if task is not current:
                # await the other task
                await task

# run the asyncio event loop
asyncio.run(shielded_main())

Running the example first starts and runs the asyncio event loop with our shielded_main() coroutine.

The shielded_main() coroutine runs and creates, schedules, and awaits the main() coroutine as a shielded task.

The main() task runs and reports a message, then creates and schedules the other_task() coroutine before suspending for two seconds.

The other_task() coroutine runs and attempts to cancel the main coroutine. Because shielded_main() is now the entry point, its task is the one named "Task-1", so other_task() requests that it cancel, reports the details of the coroutine, and reports that the cancellation request was successful.

The shielded_main() coroutine resumes and catches a CancelledError. It then awaits all other running tasks.

The main() coroutine is not canceled and resumes before reporting a final message and terminating normally.

The shielded_main() coroutine resumes and terminates.

This highlights how we can shield the main coroutine using a wrapper coroutine.

Main is running
Canceling task: <coroutine object shielded_main at 0x10d9035b0>
Cancellation successful: True
Other task done.
Main is done.
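
The wrapper in this example discards whatever main() returns. As a hedged variation, the sketch below passes the return value of main() through the wrapper, still handling a single cancellation request. The cancel_once() helper and the returned string are assumptions used to keep the demonstration short.

```python
import asyncio

# stand-in for the real main coroutine (illustration only)
async def main():
    await asyncio.sleep(0.2)
    return 'main result'

# hypothetical helper that cancels the given task once
async def cancel_once(target):
    await asyncio.sleep(0.05)
    target.cancel()

# wrapper coroutine that shields main() and returns its result
async def shielded_main():
    # schedule main() and a task that cancels this wrapper
    main_task = asyncio.create_task(main())
    helper = asyncio.create_task(cancel_once(asyncio.current_task()))
    try:
        # await main() through a shield
        return await asyncio.shield(main_task)
    except asyncio.CancelledError:
        # the shield absorbed the request, so main_task is still running
        await asyncio.wait([main_task])
        return main_task.result()

# run the asyncio event loop
result = asyncio.run(shielded_main())
print(result)
```

Keeping a reference to the task for main() lets the wrapper retrieve its result after the cancellation request has been consumed.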

Takeaways

You now know how to shield the main coroutine from cancellation.


