You can use a done callback function to automatically perform an operation or process the results from a group of tasks created by a call to the gather() function.
This can be achieved by first retrieving the Future object returned from gather(), then calling the add_done_callback() function on it to register a callback function to call once all tasks in the group are done.
In this tutorial, you will discover how to add a done callback function to the Future returned from asyncio.gather() in Python.
Let’s get started.
What is Asyncio gather()
The asyncio.gather() module function allows the caller to group multiple awaitables together.
Once grouped, the awaitables can be executed concurrently, awaited, and canceled.
Run awaitable objects in the aws sequence concurrently.
— Coroutines and Tasks
It is a helpful utility function for both grouping and executing multiple coroutines or multiple tasks.
For example:
1 2 3 |
... # run a collection of awaitables results = await asyncio.gather(coro1(), asyncio.create_task(coro2())) |
You can learn more about how to use the gather() function in the tutorial:
The gather() function does not block.
Instead, it returns an asyncio.Future object that represents the group of awaitables.
For example:
1 2 3 |
... # get a future that represents multiple awaitables group = asyncio.gather(coro1(), coro2()) |
Now that we know about the asyncio.gather() function for managing a group of awaitables, let’s look at how we might perform an operation when all tasks finish.
Run loops using all CPUs, download your FREE book to learn how.
How to Run a Function After All Tasks Finish
The asyncio.gather() function allows us to schedule and manage many asyncio tasks as one task.
We may wish to perform an action or execute a function only after all tasks in the group have been completed.
This may be for many reasons, such as
- Processing the results of the tasks in the group.
- Logging the event.
- Starting the next part of the program depends on the completion of the tasks.
One approach to achieving this would be to explicitly await the Future returned from the gather() function, then call the custom function to perform the desired action.
For example:
1 2 3 4 5 6 7 |
... # gather the tasks into a group future = asyncio.gather(*tasks) # wait for the tasks to complete results = await future # perform a custom action handler(results) |
A downside of this approach is that it occupies the coroutine that created the group of tasks, requiring it to suspend until the group of tasks is complete.
This might be a problem if the caller needs to perform other actions while the group of tasks is executing.
Another approach is to use a done callback function.
Because the gather() function returns a Future object, we can directly add a done callback function to this future object that will only be called when the Future is done.
Notably, the Future will only be done when all of the tasks it subsumes are done.
For example:
1 2 3 4 5 6 7 8 9 10 |
... # define a done callback function for a group of tasks def handler(future): # get the results result = future.result() # ... ... # add the done callback future.add_done_callback(handler) |
This allows the caller to continue on with other operations and have a custom function called automatically only after all tasks in the group have been completed.
Now that we know how to add a done callback function to the Future returned from gather(), let’s look at a worked example.
Example of a Done Callback Function for gather()
We can explore how to automatically execute a custom function after all tasks provided in a call to gather() are done.
This allows the results of the tasks to be processed, or similar, while the calling coroutine can continue on with other parts of the program.
The example below defines a task coroutine that takes an integer argument, blocks for a moment, then returns a multiple of the integer argument. This is an arbitrary task that takes an argument and returns a value.
A done callback function is defined that takes the Future object returned from the gather() function, it gets the list of results from all tasks in the call to gather() and processes them, in this case, simply reporting their values.
The main coroutine first creates a list of coroutines, then calls the gather() function with the list. This returns a Future object that represents the group of tasks. The done callback function is added, and the main coroutine continues on with other tasks, in this case simply sleeping for a moment.
The complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# SuperFastPython.com # example of a done callback function for a future from gather() import asyncio # define a custom callback function def handler(future): # get all results results = future.result() # process all results for result in results: print(f'>got {result}') # coroutine used for a task async def task_coro(value): # sleep for a moment await asyncio.sleep(1) # return a custom value return value * 10 # coroutine used for the entry point async def main(): # create many coroutines coros = [task_coro(i) for i in range(10)] # execute all coroutines as a group future = asyncio.gather(*coros) # add the done callback function future.add_done_callback(handler) # continue on with other things... await asyncio.sleep(2) # start the asyncio program asyncio.run(main()) |
Running the example first creates the main() coroutine and uses it to start the asyncio program.
The main() coroutine then creates a list of 10 coroutines, each passing a separate integer value from 0 to 9.
The list of coroutines is then passed to the gather() module function, unpacked into separate expressions using the star-operator, a requirement of the gather() function.
This immediately schedules all of the coroutines as tasks in the asyncio event loop and returns a Future object.
The main() coroutine then adds the done callback function to the Future object that represents the group of tasks.
It then suspends, sleeping for a moment to simulate performing other operations while the tasks execute concurrently.
The tasks execute, each blocking for a moment and returning a multiple of 10 of their integer argument.
All tasks are completed and the done callback function is called.
The callback function receives the Future as an argument, then retrieves the list of results. This list contains one return value for each task in the group.
It processes the results, reporting the value of each.
The main() coroutine resumes and then exits, terminating the program.
This highlights how we can add a done callback function to automatically handle the results from a group of tasks created by a call to asyncio.gather().
1 2 3 4 5 6 7 8 9 10 |
>got 0 >got 10 >got 20 >got 30 >got 40 >got 50 >got 60 >got 70 >got 80 >got 90 |
Free Python Asyncio Course
Download your FREE Asyncio PDF cheat sheet and get BONUS access to my free 7-day crash course on the Asyncio API.
Discover how to use the Python asyncio module including how to define, create, and run new coroutines and how to use non-blocking I/O.
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
Guides
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
References
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Takeaways
You now know how to add a done callback function to the Future returned from asyncio.gather() in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Ashley Byrd on Unsplash
Do you have any questions?