Asyncio Cancel All Tasks If One Task Fails

November 27, 2023 Python Asyncio

We can cancel all asyncio tasks if one task fails.

It is common to group similar tasks together, then execute the group of tasks concurrently and wait for them to complete. If one task in the group fails with an unhandled exception, we can then cancel all other tasks in the group.

This can be achieved in many ways, although it is straightforward when using the asyncio.TaskGroup structure, and the asyncio.gather() and asyncio.wait() functions.

In this tutorial, you will discover how to cancel all tasks in asyncio if one task fails.

Specifically, you will learn:

Let's get started.

Need to Cancel All Tasks If One Task Fails

We typically group tasks in asyncio.

For example, we have a list of items and issue one task for each to complete concurrently.

When working with a group of tasks, one task may fail and we need to cancel all remaining tasks.

When we say that a task "fails" we mean that it raises an exception that propagates up and causes the task to be "done".

You can learn more about the asyncio task life-cycle in the tutorial:

We may need to cancel all tasks in a group for many reasons, such as:

  1. Preventing Resource Leakage: When one task in a group encounters an error or fails, there might be associated resources allocated or ongoing processes. Canceling all tasks ensures that these resources are released and prevents any potential resource leakage, maintaining system efficiency.
  2. Ensuring Consistent State: In scenarios where the tasks in a group are interdependent or contribute to a larger process, allowing one task to fail while others continue could lead to an inconsistent or invalid state. Canceling all tasks ensures the application remains in a consistent state by preventing further execution that might rely on the failed task.
  3. Error Propagation and Cleanup: By canceling all tasks, especially in cases where the failure might impact subsequent tasks or the overall application flow, it allows for appropriate error handling and cleanup. This ensures that any necessary cleanup actions or error-handling procedures are uniformly applied across all tasks in response to the failure.

Canceling all remaining tasks if one task fails is a common feature in many real-world scenarios.

For example:

Batch Processing: Imagine a scenario where a series of tasks represents batch jobs or data processing tasks. If one task fails due to invalid data or an error, canceling all other tasks prevents further processing, ensuring data consistency and integrity.

Parallel Requests: In applications dealing with concurrent HTTP requests or multiple API calls, if one request fails due to a server error or connectivity issue, canceling other pending requests prevents overloading the system or creating a cascade of subsequent failures.

Data Pipelines: Data processing pipelines or ETL (Extract, Transform, Load) processes often involve a sequence of tasks. Canceling all tasks if one fails prevents incomplete or inconsistent data from being propagated further down the pipeline, maintaining data quality.

Distributed Systems: In distributed systems where tasks may span across different nodes or services, canceling all tasks can prevent unnecessary resource consumption or avoid inconsistent states when a component failure occurs.

Service Orchestration: In microservices architectures where one service's failure might impact others, canceling downstream tasks upon a failure ensures the system's stability and avoids unnecessary processing.

In asyncio, how can we cancel all tasks if one task in a group fails?

What approaches are available?

How to Cancel All Tasks if One Task Fails

There are 3 ways that we can cancel all tasks in a group of tasks if one task in the group fails.

They are:

  1. Use asyncio.gather()
  2. Use asyncio.wait()
  3. Use TaskGroup

We will take a closer look at each in turn.

There are other approaches we could use, for example:

These less direct approaches are left as an exercise.

Cancel All Tasks If One Task Fails with asyncio.gather()

We can cancel all tasks if one task fails using the asyncio.gather() function.

Recall that the asyncio.gather() function takes one or more asyncio tasks, suspends until all tasks are done, and then returns an iterable of return values for all tasks.

For example:

...
# execute a group of tasks concurrently
results = await asyncio.gather(task1, task2, task3)

When used with a list of tasks, the list must be unpacked first using the star (*) operator, for example:

...
# execute a group of tasks concurrently
results = await asyncio.gather(*tasks)

You can learn more about the asyncio.gather() function in the tutorial:

We can use asyncio.gather() to cancel all tasks if one task fails.

This can be achieved by wrapping the call to asyncio.gather() in a try-except structure.

For example:

try:
	# execute a group of tasks concurrently
	results = await asyncio.gather(*tasks)
except Exception:
	...

If one task fails in the group, the exception will propagate up and cancel the task of awaiting the group.

We can catch and handle that exception.

Although the group task has terminated, the tasks in the group have not terminated.

We can iterate over all tasks and cancel each in turn.

...
# cancel all tasks
for task in tasks:
    task.cancel()

We can then wait for all canceled tasks to cancel, using the "cancel and await" idiom.

For example:

...
# wait for all tasks to cancel
for task in tasks:
    try:
        await task
    except Exception:
        pass
    except asyncio.CancelledError:
        pass

The whole asyncio.gather() approach would look as follows:

...
# handle the case that a task fails
try:
    # wait for the group of tasks to complete
    await asyncio.gather(*tasks)
except Exception as e:
    # report failure
    print(f'A task failed with: {e}, canceling all tasks')
    # cancel all tasks
    for task in tasks:
        task.cancel()
    # wait for all tasks to cancel
    for task in tasks:
        try:
            await task
        except Exception:
            pass
        except asyncio.CancelledError:
            pass

You can learn more about how to cancel all tasks if task in a gather fails in the tutorial:

Cancel All Tasks If One Task Fails with asyncio.wait()

We can cancel all tasks if one task fails using the asyncio.wait() function.

Recall that asyncio.wait() takes a collection of asyncio tasks and will suspend until a condition is met, such as all tasks are done.

For example:

...
# wait for all tasks to be done
_ = await asyncio.wait(tasks)

You can learn more about the asyncio.wait() function in the tutorial:

We can configure the asyncio.wait() function to wait for the first task to fail.

This can be achieved by setting the "return_when" argument to asyncio.FIRST_EXCEPTION.

For example:

...
# wait for all tasks
failed, rest = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

If at least one task fails, it will be returned in the "failed" set, whereas any tasks still running will be in the "rest" set.

If no tasks fail, then the "failed" set will have all tasks and the "rest" set will be empty.

Therefore, when asyncio.wait() returns, we can check if we have any tasks still running in the "rest" set and if so, cancel them all, then wait for them to be done.

...
# check for at least one failure
if len(rest) > 0:
    print(f'{len(failed)} task(s) failed, cancelling the rest')
    # cancel all tasks
    for task in rest:
        task.cancel()
    # wait for all tasks to cancel
    for task in tasks:
        try:
            await task
        except Exception:
            pass
        except asyncio.CancelledError:
            pass

Cancel All Tasks If One Task Fails with TaskGroup

We can cancel all tasks if one task fails in asyncio using an asyncio.TaskGroup.

Recall that the asyncio.TaskGroup can be used to create and manage a group of tasks in asyncio.

It is used via the context manager interface and can be used to wait for all tasks to complete.

For example:

...
# create task group
async with asyncio.TaskGroup() as group:
    # create many coroutines
    tasks = [group.create_task(task_coro(i)) for i in range(10)]
    # wait for tasks...

You can learn more about the asyncio.TaskGroup in the tutorial:

Helpfully, the asyncio.TaskGroup will automatically cancel all tasks in the group if one task fails with an unhandled exception.

This means that we can wrap the usage of the asyncio.TaskGroup in a try-except structure and be confident that all tasks in the group will be canceled on failure.

For example:

...
try:
    # create task group
    async with asyncio.TaskGroup() as group:
        # create many coroutines
        tasks = [group.create_task(task_coro(i)) for i in range(10)]
        # wait for tasks...
except Exception as e:
    # report a message
    print('At least one task failed, rest were cancelled')
    # confirm no other running tasks
    print(f'There are only {len(asyncio.all_tasks())} task(s) currently running')

Now that we know how to cancel all tasks if one task fails, let's look at some worked examples.

Example of Canceling All Tasks If One Task Fails With asyncio.gather()

We can explore an example of how to cancel all tasks if one task fails using asyncio.gather().

In this example, we can define a task that conditionally fails. We will then create and schedule 10 versions of the task and wait for them to complete concurrently using asyncio.gather(). If one task fails, we will handle the exception and cancel all tasks in the group.

Firstly, we can define the task that conditionally fails.

The task takes an integer task identifier as an argument. It reports its argument value, sleeps for one second, and if the argument value is equal to 5, then an exception is raised. Otherwise, the task sleeps again for one second and reports a final message.

The task_coro() coroutine below implements this.

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

Next, we can define the main coroutine.

Firstly, we will create and schedule 10 tasks with arguments from 0 to 9.

...
# create many coroutines
tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]

We then await all tasks in the group using asyncio.gather() and cancel all tasks if one task fails with an exception.

...
# handle the case that a task fails
try:
    # wait for the group of tasks to complete
    await asyncio.gather(*tasks)
except Exception as e:
    # report failure
    print(f'A task failed with: {e}, canceling all tasks')
    # cancel all tasks
    for task in tasks:
        task.cancel()
    # wait for all tasks to cancel
    for task in tasks:
        try:
            await task
        except Exception:
            pass
        except asyncio.CancelledError:
            pass

Tying this together, the complete main() coroutine is listed below.

# coroutine used for the entry point
async def main():
    # create many coroutines
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # handle the case that a task fails
    try:
        # wait for the group of tasks to complete
        await asyncio.gather(*tasks)
    except Exception as e:
        # report failure
        print(f'A task failed with: {e}, canceling all tasks')
        # cancel all tasks
        for task in tasks:
            task.cancel()
        # wait for all tasks to cancel
        for task in tasks:
            try:
                await task
            except Exception:
                pass
            except asyncio.CancelledError:
                pass

Finally, we can start the asyncio event loop and run our main() coroutine.

...
# start the asyncio program
asyncio.run(main())

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of canceling all tasks if one task fails with gather()
import asyncio

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

# coroutine used for the entry point
async def main():
    # create many coroutines
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # handle the case that a task fails
    try:
        # wait for the group of tasks to complete
        await asyncio.gather(*tasks)
    except Exception as e:
        # report failure
        print(f'A task failed with: {e}, canceling all tasks')
        # cancel all tasks
        for task in tasks:
            task.cancel()
        # wait for all tasks to cancel
        for task in tasks:
            try:
                await task
            except Exception:
                pass
            except asyncio.CancelledError:
                pass

# start the asyncio program
asyncio.run(main())

Running the example starts the asyncio event loop and runs our main() coroutine.

The main() coroutine runs and creates and schedules 10 tasks to run concurrently, with arguments from 0 to 9.

The main() coroutine then suspends and awaits all tasks via the asyncio.gather() function.

Each task runs, reporting a message and suspending with a sleep. The tasks then resume. The task with an argument of 5 then fails, other tasks suspend with an additional sleep.

The exception propagates up and terminates the task. It then propagates back to the main() coroutine where it is caught.

A message is reported and all tasks in the group are canceled.

The main() coroutine then awaits all tasks in the group, careful to handle any exceptions and CancelledError that may be re-raised.

This highlights how we can cancel all tasks in the group if one task fails using asyncio.gather().

>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
>task 5 failing
A task failed with: Something bad happened, canceling all tasks

Example of Canceling All Tasks If One Task Fails With asyncio.wait()

We can explore an example of how to cancel all tasks if one task fails using asyncio.wait().

In this case, we can update the above example to use asyncio.wait() to suspend the main() coroutine and wait for all tasks to complete, or for at least one task to fail with an exception.

...
# wait for all tasks
failed, rest = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

We can then check if asyncio.wait() returns early before all tasks are completed, and if so cancel all tasks and wait for the tasks to be done.

...
# check for at least one failure
if len(rest) > 0:
    print(f'{len(failed)} task(s) failed, cancelling the rest')
    # cancel all tasks
    for task in rest:
        task.cancel()
    # wait for all tasks to cancel
    for task in rest:
        try:
            await task
        except asyncio.CancelledError:
            pass
else:
    print('No tasks failed')

Tying this together, the updated main() coroutine with this change is listed below.

# coroutine used for the entry point
async def main():
    # create many coroutines
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # wait for all tasks
    failed, rest = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # check for at least one failure
    if len(rest) > 0:
        print(f'{len(failed)} task(s) failed, cancelling the rest')
        # cancel all tasks
        for task in rest:
            task.cancel()
        # wait for all tasks to cancel
        for task in rest:
            try:
                await task
            except asyncio.CancelledError:
                pass
    else:
        print('No tasks failed')

Tying all of this together, the complete example is listed below.

# SuperFastPython.com
# example of canceling all tasks if one task fails with wait()
import asyncio

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

# coroutine used for the entry point
async def main():
    # create many coroutines
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # wait for all tasks
    failed, rest = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # check for at least one failure
    if len(rest) > 0:
        print(f'{len(failed)} task(s) failed, cancelling the rest')
        # cancel all tasks
        for task in rest:
            task.cancel()
        # wait for all tasks to cancel
        for task in tasks:
            try:
                await task
            except Exception:
                pass
            except asyncio.CancelledError:
                pass
    else:
        print('No tasks failed')

# start the asyncio program
asyncio.run(main())

Running the example starts the asyncio event loop and runs our main() coroutine.

The main() coroutine runs and creates and schedules 10 tasks to run concurrently, with arguments from 0 to 9.

The main() coroutine then suspends and awaits all tasks via the asyncio.wait() function.

Each task runs, reporting a message and suspending with a sleep. The tasks then resume. The task with an argument of 5 then fails, other tasks suspend with an additional sleep.

The exception propagates up and terminates the task. It then propagates back to the asyncio.wait() coroutine that returns one set with the failed task and another set with all tasks that are still running.

The main() coroutine resumes and checks if there are any tasks in the "rest" set. There are, so the message is reported and all tasks in the group are canceled.

The main() coroutine then awaits all tasks in the group, careful to handle any exceptions and CancelledError that may be re-raised.

This highlights how we can cancel all tasks in the group if one task fails using asyncio.wait().

>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
>task 5 failing
1 task(s) failed, cancelling the rest

Example of Canceling All Tasks If One Task Fails With TaskGroup

We can explore an example of how to cancel all tasks if one task fails using asyncio.TaskGroup).

In this case, we can update the above example to create and use the asyncio.TaskGroup to create all tasks.

This can then be wrapped in a try-except structure to handle exceptions raised by tasks in the group.

The updated main() coroutine with this change is listed below.

# coroutine used for the entry point
async def main():
    try:
        # create task group
        async with asyncio.TaskGroup() as group:
            # create many coroutines
            tasks = [group.create_task(task_coro(i)) for i in range(10)]
            # wait for tasks...
    except Exception as e:
        # report a message
        print('At least one task failed, rest were cancelled')
        # confirm no other running tasks
        print(f'There are only {len(asyncio.all_tasks())} task(s) currently running')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of canceling all tasks if one task fails with TaskGroup
import asyncio

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

# coroutine used for the entry point
async def main():
    try:
        # create task group
        async with asyncio.TaskGroup() as group:
            # create many coroutines
            tasks = [group.create_task(task_coro(i)) for i in range(10)]
            # wait for tasks...
    except Exception as e:
        # report a message
        print('At least one task failed, rest were cancelled')
        # confirm no other running tasks
        print(f'There are only {len(asyncio.all_tasks())} task(s) currently running')

# start the asyncio program
asyncio.run(main())

Running the example starts the asyncio event loop and runs our main() coroutine.

The main() coroutine runs and creates a TaskGroup, then uses it to create and schedule 10 tasks to run concurrently, with arguments from 0 to 9.

The exit method of the TaskGroup runs and suspends, awaiting all tasks in the group.

Each task runs, reporting a message and suspending with a sleep. The tasks then resume. The task with an argument of 5 then fails, other tasks suspend with an additional sleep.

The exception propagates up and terminates the task. It then propagates back to the TaskGroup which cancels all tasks in the group.

The exception is handled and messages are reported, showing that only one task remains running, the current task.

This highlights how we can cancel all tasks in the group if one task fails using TaskGroup.

>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
>task 5 failing
At least one task failed, rest were cancelled
There are only 1 task(s) currently running

Takeaways

You now know how to cancel all tasks in asyncio if one task fails.