Asyncio gather() Add Done Callback Function

November 20, 2022 Python Asyncio

You can use a done callback function to automatically perform an operation or process the results from a group of tasks created by a call to the gather() function.

This can be achieved by first retrieving the Future object returned from gather(), then calling its add_done_callback() method to register a function that is called once all tasks in the group are done.

In this tutorial, you will discover how to add a done callback function to the Future returned from asyncio.gather() in Python.

Let's get started.

What is Asyncio gather()

The asyncio.gather() module function allows the caller to group multiple awaitables together.

Once grouped, the awaitables can be executed concurrently, awaited, and canceled.

Run awaitable objects in the aws sequence concurrently.

-- Coroutines and Tasks

It is a helpful utility function for both grouping and executing multiple coroutines or multiple tasks.

For example:

...
# run a collection of awaitables
results = await asyncio.gather(coro1(), asyncio.create_task(coro2()))
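For a version that can be run on its own, the sketch below fills in coro1() and coro2() with hypothetical bodies. The names come from the snippet above, but the bodies and return values are assumptions made only for illustration.

```python
import asyncio

# hypothetical coroutine bodies, assumed only for illustration
async def coro1():
    await asyncio.sleep(0.1)
    return 'one'

async def coro2():
    await asyncio.sleep(0.1)
    return 'two'

async def main():
    # group a bare coroutine and an already-scheduled task
    results = await asyncio.gather(coro1(), asyncio.create_task(coro2()))
    return results

# run the program and report the list of return values
results = asyncio.run(main())
print(results)
```

The returned list holds one value per awaitable, in the order the awaitables were passed to gather().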


The gather() function does not block.

Instead, it returns an asyncio.Future object that represents the group of awaitables.

For example:

...
# get a future that represents multiple awaitables
group = asyncio.gather(coro1(), coro2())
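A small sketch can make this visible. It assumes a hypothetical coro() helper that sleeps briefly, and checks the type and state of the object returned by gather() before and after awaiting it.

```python
import asyncio

# hypothetical helper coroutine, assumed only for illustration
async def coro(value):
    await asyncio.sleep(0.1)
    return value

async def main():
    # gather() returns right away, before any coroutine has finished
    group = asyncio.gather(coro(1), coro(2))
    is_future = isinstance(group, asyncio.Future)
    done_before = group.done()
    # suspend until every awaitable in the group is done
    results = await group
    done_after = group.done()
    return is_future, done_before, done_after, results

outcome = asyncio.run(main())
print(outcome)
```

The group is not done when gather() returns, and is done only after it has been awaited.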

Now that we know about the asyncio.gather() function for managing a group of awaitables, let's look at how we might perform an operation when all tasks finish.

How to Run a Function After All Tasks Finish

The asyncio.gather() function allows us to schedule and manage many asyncio tasks as one task.

We may wish to perform an action or execute a function only after all tasks in the group have been completed.

This may be for many reasons.

One approach to achieving this would be to explicitly await the Future returned from the gather() function, then call the custom function to perform the desired action.

For example:

...
# gather the tasks into a group
future = asyncio.gather(*tasks)
# wait for the tasks to complete
results = await future
# perform a custom action
handler(results)
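As a runnable sketch of this approach, the listing below assumes a hypothetical work() coroutine and a handler() that simply sums the results. Neither is prescribed by asyncio; they stand in for whatever tasks and processing your program needs.

```python
import asyncio

# hypothetical handler: here it just sums the results
def handler(results):
    return sum(results)

# hypothetical task coroutine, assumed only for illustration
async def work(value):
    await asyncio.sleep(0.1)
    return value * 10

async def main():
    # create and schedule the tasks
    tasks = [asyncio.create_task(work(i)) for i in range(3)]
    # gather the tasks into a group
    future = asyncio.gather(*tasks)
    # main() is suspended here until the whole group is done
    results = await future
    # perform the custom action
    return handler(results)

total = asyncio.run(main())
print(total)
```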

A downside of this approach is that it occupies the coroutine that created the group of tasks, requiring it to suspend until the group of tasks is complete.

This might be a problem if the caller needs to perform other actions while the group of tasks is executing.

Another approach is to use a done callback function.

Because the gather() function returns a Future object, we can directly add a done callback function to this future object that will only be called when the Future is done.

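Notably, the Future will only be done when all of the tasks it subsumes are done, assuming none of them fails. With the default return_exceptions=False, the Future is done as soon as one task raises an exception, even though the remaining tasks continue to run.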

For example:

...
# define a done callback function for a group of tasks
def handler(future):
    # get the results
    result = future.result()
    # ...

...
# add the done callback
future.add_done_callback(handler)

This allows the caller to continue on with other operations and have a custom function called automatically only after all tasks in the group have been completed.
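Because the callback runs with nobody awaiting the Future, calling future.result() inside it will re-raise any exception from the group. One possible way to guard against that is sketched below. The ok() and broken() coroutines and the outcomes list are hypothetical names used only for this illustration.

```python
import asyncio

# collected by the callback so the outcomes can be inspected later
outcomes = []

def handler(future):
    # check for cancellation first: exception() raises if the future was cancelled
    if future.cancelled():
        outcomes.append('cancelled')
    elif future.exception() is not None:
        outcomes.append(f'failed: {future.exception()!r}')
    else:
        outcomes.append(future.result())

# hypothetical coroutine that succeeds
async def ok(value):
    await asyncio.sleep(0.1)
    return value

# hypothetical coroutine that fails
async def broken():
    await asyncio.sleep(0.05)
    raise ValueError('boom')

async def main():
    # a group where every task succeeds
    good = asyncio.gather(ok(1), ok(2))
    good.add_done_callback(handler)
    # a group where one task raises an exception
    bad = asyncio.gather(ok(3), broken())
    bad.add_done_callback(handler)
    # continue on with other things while both groups run
    await asyncio.sleep(0.3)

asyncio.run(main())
print(outcomes)
```

The same handler is registered on both groups and records either the list of results or a description of the failure.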

Now that we know how to add a done callback function to the Future returned from gather(), let's look at a worked example.

Example of a Done Callback Function for gather()

We can explore how to automatically execute a custom function after all tasks provided in a call to gather() are done.

This allows the results of the tasks to be processed, or similar, while the calling coroutine can continue on with other parts of the program.

The example below defines a task coroutine that takes an integer argument, sleeps for a moment, then returns the argument multiplied by 10. This is an arbitrary task that takes an argument and returns a value.

A done callback function is defined that takes the Future object returned from the gather() function. It gets the list of results from all tasks in the call to gather() and processes them, in this case simply reporting their values.

The main coroutine first creates a list of coroutines, then calls the gather() function with the list. This returns a Future object that represents the group of tasks. The done callback function is added, and the main coroutine continues on with other tasks, in this case simply sleeping for a moment.

The complete example is listed below.

# SuperFastPython.com
# example of a done callback function for a future from gather()
import asyncio

# define a custom callback function
def handler(future):
    # get all results
    results = future.result()
    # process all results
    for result in results:
        print(f'>got {result}')

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(1)
    # return a custom value
    return value * 10

# coroutine used for the entry point
async def main():
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # execute all coroutines as a group
    future = asyncio.gather(*coros)
    # add the done callback function
    future.add_done_callback(handler)
    # continue on with other things...
    await asyncio.sleep(2)

# start the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine and uses it to start the asyncio program.

The main() coroutine then creates a list of 10 coroutines, each passing a separate integer value from 0 to 9.

The list of coroutines is then passed to the gather() module function, unpacked into separate positional arguments using the star operator. This is required because gather() takes its awaitables as individual arguments rather than as a single list.

This immediately schedules all of the coroutines as tasks in the asyncio event loop and returns a Future object.

The main() coroutine then adds the done callback function to the Future object that represents the group of tasks.

It then suspends, sleeping for a moment to simulate performing other operations while the tasks execute concurrently.

The tasks execute, each sleeping for one second and then returning its integer argument multiplied by 10.

Once all tasks are complete, the done callback function is called.

The callback function receives the Future as an argument, then retrieves the list of results. This list contains one return value for each task in the group.

It processes the results, reporting the value of each.

The main() coroutine resumes and then exits, terminating the program.

This highlights how we can add a done callback function to automatically handle the results from a group of tasks created by a call to asyncio.gather().

>got 0
>got 10
>got 20
>got 30
>got 40
>got 50
>got 60
>got 70
>got 80
>got 90
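The results above appear in argument order. As a hedged sketch of why that is not an accident of timing, the variation below gives each task a different delay so that they finish in reverse order. The delay parameter and the received list are assumptions added for this illustration and are not part of the example above.

```python
import asyncio

# filled in by the callback so the order can be inspected
received = []

def handler(future):
    # the result list follows the order of the arguments to gather()
    received.extend(future.result())

# variation of the task coroutine with an assumed delay parameter
async def task_coro(value, delay):
    await asyncio.sleep(delay)
    return value * 10

async def main():
    # the first coroutine finishes last and the last finishes first
    future = asyncio.gather(
        task_coro(0, 0.3),
        task_coro(1, 0.2),
        task_coro(2, 0.1),
    )
    future.add_done_callback(handler)
    # continue on with other things...
    await asyncio.sleep(0.5)

asyncio.run(main())
print(received)
```

The callback still receives the values in the order the coroutines were passed to gather(), not the order in which they completed.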

Takeaways

You now know how to add a done callback function to the Future returned from asyncio.gather() in Python.


