Last Updated on November 14, 2023
We often need to coordinate the behavior between multiple concurrent tasks.
For example, it is common for one or more tasks to need to wait for an activity to be completed by another task.
Rather than requiring the task to be completed and have other task wait upon it, we can use a concurrency primitive designed for this purpose called a monitor or a condition variable, provided in asyncio via the asyncio.Condition class class.
The condition variable implements the common wait/notify and wait/notify-all patterns for concurrent programming. Dependent tasks can wait on the condition variable and the target task can complete its work and notify all interested parties that a condition has been met and they can resume.
In this tutorial, you will discover how to use the condition variable in asyncio.
After completing this tutorial, you will know:
- What is a condition variable and the types of problems where it can be used.
- How to create and use a condition variable to signal between asyncio tasks and coroutines.
- How to develop examples that make use of the condition variable.
Let’s get started.
What is an Asyncio Condition Variable
In concurrency, a condition (also called a monitor) allows multiple threads to be notified about some result.
It combines both a mutual exclusion lock (mutex) and an event allowing exclusive access and notification.
In essence, a Condition object combines the functionality of an Event and a Lock.
— Asyncio Synchronization Primitives
A mutex can be used to protect a critical section, but it cannot be used to alert other threads that a condition has changed or been met
An event can be used to notify other threads, but it cannot be used to protect a critical section and enforce mutual exclusion.
A condition can be acquired by a thread (like a mutex) after which it can wait to be notified by another thread that something has changed like an event. While waiting, the thread is blocked and releases the lock for other threads to acquire.
Another thread can then acquire the condition, make a change, and notify one, all, or a subset of threads waiting on the condition that something has changed. The waiting thread can then wake up (be scheduled by the operating system), re-acquire the condition (mutex), perform checks on any changed state, and perform required actions.
The condition variable is typically described in terms of threads. Python provides a condition variable for threads via the threading.Condition class:
Nevertheless, a condition may also be used with other units of concurrency, such as processes via the multiprocessing.Condition class:
It may also be used with coroutines in asyncio via the asyncio.Condition class.
Now that we know what a condition is, let’s look at how we might use it in an asyncio program.
Run loops using all CPUs, download your FREE book to learn how.
How to Use an Asyncio Condition Variable
In this section, we will explore how to use the asyncio condition variable.
Create a Condition Variable
Python provides a condition for use with coroutines via the asyncio.Condition class.
To use a condition variable, we must create an instance of the class.
1 2 3 |
... # create a new condition condition = asyncio.Condition() |
The condition object may then be shared and used among multiple asyncio coroutines.
Acquire and Release Condition Variable
In order for a coroutine to make use of the condition, it must acquire it and release it, like a mutex lock.
You can learn more about acquiring and releasing mutex locks in asyncio programs in the tutorial:
This can be achieved manually with the acquire() and release() methods.
Acquiring the condition via acquire() returns a coroutine object that requires that the caller use an await expression.
For example, we can acquire the condition, do something, then release the condition as follows:
1 2 3 4 5 6 7 |
... # acquire the condition await condition.acquire() # do something # ... # release the condition condition.release() |
An alternative to calling the acquire() and release() methods directly is to use the async context manager, which will perform the acquire/release automatically for us, for example:
1 2 3 4 5 |
... # acquire the condition async with condition: # do something # ... |
You can learn more about the “async with” expression in the tutorial:
Wait to be Notified
Once the condition is acquired, we can wait on it.
This will suspend the calling coroutine until another coroutine notices it via the condition with the notify() function (seen later).
This can be achieved via the wait() method that will return a coroutine and must be awaited.
For example:
1 2 3 4 5 |
... # acquire the condition async with condition: # wait to be notified await condition.wait() |
Critically, the condition is relinquished while waiting. This allows other coroutines to acquire it in order to also wait or to notify waiting coroutines.
Wait For Expression
The asyncio.Condition class also provides a wait_for() method that can be used to only unlock the waiting coroutine if a condition is met, such as calling a function that returns a boolean value.
The name of the function that returns a boolean value can be provided to the wait_for() method directly.
Like the wait() method, it returns a coroutine object that must be awaited.
For example:
1 2 3 4 5 |
... # acquire the condition async with condition: # wait to be notified and a function to return true await condition.wait_for(all_data_collected) |
The condition is only checked when the coroutine is notified. If the condition is not met, the coroutine may be notified many times and will not resume until the condition is met.
Notify Waiting Coroutines
We also must acquire the condition in a coroutine if we wish to notify waiting coroutines. This too can be achieved directly with the acquire/release method calls or via the context manager.
We can notify a single waiting coroutine via the notify() method.
For example:
1 2 3 4 5 |
... # acquire the condition async with condition: # notify a waiting coroutine condition.notify() |
The notified coroutine will resume as soon as it can re-acquire the mutex within the condition. This will be attempted automatically as part of its call to wait(), you do not need to do anything extra.
If there are more than one coroutine waiting on the condition, we will not know which coroutine will be notified.
We can also notify a subset of waiting coroutines by setting the “n” argument to an integer number of coroutines to notify, for example:
1 2 3 4 5 |
... # acquire the condition async with condition: # notify 3 waiting coroutines condition.notify(n=3) |
Notify All Waiting Coroutines
Finally, we can notify all coroutines waiting on the condition via the notify_all() method.
1 2 3 4 5 |
... # acquire the condition async with condition: # notify all coroutines waiting on the condition condition.notify_all() |
A final reminder, a coroutine must acquire the condition before waiting on it or notifying waiting coroutines.
A failure to acquire the condition (the lock within the condition) before performing these actions will result in a RuntimeError.
Now that we know how to use the asyncio.Condition class, let’s look at some worked examples.
Example of Wait and Notify With an Asyncio Condition
In this section, we will explore using an asyncio.Condition to notify a waiting coroutine that something has happened.
We will use a task to prepare some data and notify a waiting coroutine, and in the main coroutine, we will create and schedule the new task and use the condition to wait for the work to be completed.
The complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# SuperFastPython.com # example of wait/notify with an asyncio condition import asyncio # task coroutine async def task(condition, work_list): # block for a moment await asyncio.sleep(1) # add data to the work list work_list.append(33) # notify a waiting coroutine that the work is done print('Task sending notification...') async with condition: condition.notify() # main coroutine async def main(): # create a condition condition = asyncio.Condition() # prepare the work list work_list = list() # wait to be notified that the data is ready print('Main waiting for data...') async with condition: # create and start the a task _ = asyncio.create_task(task(condition, work_list)) # wait to be notified await condition.wait() # we know the data is ready print(f'Got data: {work_list}') # run the asyncio program asyncio.run(main()) |
Running the example first creates the main() coroutine which is used as the entry point into the asyncio program.
The main() coroutine runs and creates the shared condition and the work list.
The main() coroutine then acquires the condition. A new task is created and scheduled, provided the shared condition and work list.
The main() coroutine then waits to be notified, suspending and calling the new scheduled task to run.
The task() coroutine runs. It first blocks for a moment to simulate effort, then adds work to the shared list. The condition is acquired and the waiting coroutine is notified, then releases the condition automatically. The task terminates.
The main() coroutine resumes and reports a final message, showing the updated content of the shared list.
This highlights how we can use a wait-notify pattern between coroutines using a condition variable.
1 2 3 |
Main waiting for data... Task sending notification... Got data: [33] |
Next, let’s look at how we might notify all waiting coroutines.
Free Python Asyncio Course
Download your FREE Asyncio PDF cheat sheet and get BONUS access to my free 7-day crash course on the Asyncio API.
Discover how to use the Python asyncio module including how to define, create, and run new coroutines and how to use non-blocking I/O.
Example of wait() and notify_all() With an Asyncio Condition
We can explore how to notify all coroutines waiting on a condition.
In this example, we will start a suite of tasks that will wait on the condition to be notified before performing their processing and reporting a result.
The main coroutine will block for a moment and then notify all waiting coroutines that they can begin processing.
The complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# SuperFastPython.com # example of wait/notify all with an asyncio condition from random import random import asyncio # task coroutine async def task(condition, number): # report a message print(f'Task {number} waiting...') # acquire the condition async with condition: # wait to be notified await condition.wait() # generate a random number between 0 and 1 value = random() # block for a moment await asyncio.sleep(value) # report a result print(f'Task {number} got {value}') # main coroutine async def main(): # create a condition condition = asyncio.Condition() # create and start many tasks tasks = [asyncio.create_task(task(condition, i)) for i in range(5)] # allow the tasks to run await asyncio.sleep(1) # acquire the condition async with condition: # notify all waiting tasks condition.notify_all() # wait for all tasks to complete _ = await asyncio.wait(tasks) # run the asyncio program asyncio.run(main()) |
Running the example first creates the main() coroutine which is used as the entry point into the asyncio program.
The main() coroutine runs and creates the shared condition.
It then creates and schedules five tasks, each providing the shared condition and a unique integer from 0 to 4 as arguments.
The main() coroutine then suspends, allowing the tasks to run.
The tasks run one by one. The tasks first report their message, then acquire the condition. Once acquired they wait to be notified.
The main() coroutine resumes. It acquires the condition and then notifies all coroutines waiting on the condition. It then releases the condition and waits for the issued tasks to terminate.
The tasks resume, one at a time. Each task first generates a random number, and blocks for a fraction of a second. Once resumed it reports a message and terminates.
This highlights how we may have many coroutines waiting to be notified and have them all notified.
1 2 3 4 5 6 7 8 9 10 |
Task 0 waiting... Task 1 waiting... Task 2 waiting... Task 3 waiting... Task 4 waiting... Task 4 got 0.11036200324308998 Task 3 got 0.25497519885869324 Task 1 got 0.36215401216779797 Task 0 got 0.4277021597975379 Task 2 got 0.7073867691766996 |
Next, let’s look at how we might wait for a specific result on the condition.
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Example of wait_for() With an Asyncio Condition
We can explore how to use the wait_for() function on the condition.
The wait_for() method takes a callable, such as a function with no arguments or a lambda expression. The coroutine calling the wait_for() method will block until notified and the callable passed in as an argument returns a True value.
This might mean that the coroutine is notified many times by different coroutines, but will only unblock and continue execution once the condition in the callable is met.
In this example, will create a suite of tasks, each of which will calculate a value and add it to a shared list and notify the waiting coroutine.
The main coroutine will wait on the condition and will use a lambda expression in the wait_for() function to not continue on until a work list populated by the worker coroutines is fully populated.
The complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# SuperFastPython.com # example of wait for with a condition from random import random import asyncio # task coroutine async def task(condition, work_list): # acquire the condition async with condition: # generate a random value between 0 and 1 value = random() # block for a moment await asyncio.sleep(value) # add work to the list work_list.append(value) print(f'Task added {value}') # notify the waiting coroutine condition.notify() # main coroutine async def main(): # create a condition condition = asyncio.Condition() # define work list work_list = list() # create and start many tasks _ = [asyncio.create_task(task(condition, work_list)) for _ in range(5)] # acquire the condition async with condition: # wait to be notified await condition.wait_for(lambda : len(work_list)==5) # report final message print(f'Done, got: {work_list}') # run the asyncio program asyncio.run(main()) |
Running the example first creates the main() coroutine which is used as the entry point into the asyncio program.
The main() coroutine runs and first creates the shared condition variable and work list.
The main() coroutine then creates and schedules five tasks, passing the condition and work list.
The main() coroutine then acquires the condition and waits to be notified and for the lambda expression to be true, which in this case requires that the work list has a length of 5.
The tasks then execute one by one. Each task first acquires the condition, generates a random value, and blocks. Once it resumes it adds a value to the shared work list, reports a message, and notifies the waiting main() coroutine.
The main() coroutine is notified many times but does not resume until the condition is met, after which the final message is reported.
Importantly, only one coroutine can hold or acquire the condition at a time, although recall that the condition is released when waiting to be notified. In this case, the tasks do not wait, although they are suspended. The lock is held during this block, making the block of the condition context manager coroutine-safe, e.g. mutually exclusive.
This highlights how we may use the wait-for and notify pattern with coroutines.
1 2 3 4 5 6 |
Task added 0.45089927950158515 Task added 0.8931671797432676 Task added 0.06484378382773981 Task added 0.7649095074042099 Task added 0.9954569956651376 Done, got: [0.45089927950158515, 0.8931671797432676, 0.06484378382773981, 0.7649095074042099, 0.9954569956651376] |
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
Guides
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
References
Takeaways
You now know how to use an asyncio condition variable in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Josh Arras on Unsplash
Jakub Schimer says
I LOVE Python as well (joy to code in, really), and this is Great website and tutorials, Super Fast indeed. This is most probably the best python concurrency resource on the internet. Thanks a lot for all your work, Mister! <3
Jason Brownlee says
Thank you for your kind words Jakub, I’m grateful!