Last Updated on September 12, 2022
You can use a condition variable with processes via the multiprocessing.Condition class.
In this tutorial you will discover how to use a process condition variable in Python.
Let’s get started.
Need for A Condition Variable
A process is a running instance of a computer program.
Every Python program is executed in a Process, which is a new instance of the Python interpreter. This process has the name MainProcess and has one thread used to execute the program instructions called the MainThread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create new child processes in our program in order to execute code concurrently.
Python provides the ability to create and manage new processes via the multiprocessing.Process class.
When writing concurrent programs we may need processes to wait for some condition within a critical section before continuing.
This could be achieved using a mutual exclusion lock to protect the critical section, but the processes waiting for the condition would have to spin (execute in a loop) repeatedly releasing/re-acquiring the mutex lock until the condition was met.
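To make the cost of spinning concrete, here is a minimal sketch of the polling approach, assuming a shared integer flag created with multiprocessing.Value. The names worker(), spin_until_ready() and the poll_interval parameter are hypothetical and chosen only for illustration.

```python
from multiprocessing import Process, Value
from time import sleep


def worker(ready):
    # simulate some work, then set the shared flag under its lock
    sleep(0.2)
    with ready.get_lock():
        ready.value = 1


def spin_until_ready(ready, poll_interval=0.01):
    # repeatedly acquire the lock, check the flag, release it, and sleep
    checks = 0
    while True:
        with ready.get_lock():
            checks += 1
            if ready.value:
                return checks
        sleep(poll_interval)


if __name__ == '__main__':
    # shared integer flag that carries its own lock
    ready = Value('i', 0)
    child = Process(target=worker, args=(ready,))
    child.start()
    checks = spin_until_ready(ready)
    child.join()
    print(f'Flag seen after {checks} lock acquisitions')
```

The waiting process acquires and releases the lock many times before the flag changes, which is the wasted effort a condition variable is intended to avoid.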
An alternative is to use a condition (also called a monitor) that builds upon a mutex and allows processes to wait and be notified.
What is a condition variable in multiprocessing and how can we use it in Python?
What is a Condition Variable
In concurrency, a condition (also called a monitor) allows multiple processes (or threads) to be notified about some result.
It combines a mutual exclusion lock (mutex) with a condition variable.
A mutex can be used to protect a critical section, but it cannot be used to alert other processes that a condition has changed or been met.
A condition can be acquired by a process (like a mutex) after which it can wait to be notified by another process that something has changed. While waiting, the process is blocked and releases the lock for other processes to acquire.
Another process can then acquire the condition, make a change, and notify one, all, or a subset of processes waiting on the condition that something has changed. The waiting process can then wake-up (be scheduled by the operating system), re-acquire the condition (mutex), perform checks on any changed state and perform required actions.
This highlights that a condition makes use of a mutex internally (to acquire/release the condition), but it also offers additional features such as allowing processes to wait on the condition and to allow processes to notify other processes waiting on the condition.
Now that we know what a condition is, let’s look at how we might use it in Python.
How to Use a Condition Variable
Python provides a condition variable via the multiprocessing.Condition class.
We can create a condition variable, and by default it will create a new reentrant mutex lock (multiprocessing.RLock class) to use internally.

```python
...
# create a new condition variable
condition = multiprocessing.Condition()
```
We may have a reentrant mutex or a non-reentrant mutex that we wish to reuse in the condition for some good reason, in which case we can provide it to the constructor.
I don’t recommend this unless you know your use case has this requirement. The chance of getting into trouble is high.
```python
...
# create a new condition with a custom lock
condition = multiprocessing.Condition(lock=my_lock)
```
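As a rough illustration of what sharing a lock means, the sketch below passes a non-reentrant multiprocessing.Lock to the condition and then probes that lock from the same process. The function name probe_shared_lock() is hypothetical; the point is only that acquiring the condition acquires the supplied lock.

```python
from multiprocessing import Condition, Lock


def probe_shared_lock():
    # a non-reentrant lock that other code could also use directly
    shared_lock = Lock()
    condition = Condition(lock=shared_lock)
    with condition:
        # the condition holds shared_lock, so a non-blocking acquire fails
        held_inside = not shared_lock.acquire(False)
    # the lock is free again, so a non-blocking acquire now succeeds
    free_after = shared_lock.acquire(False)
    if free_after:
        shared_lock.release()
    return held_inside, free_after


if __name__ == '__main__':
    print(probe_shared_lock())
```

Because the lock is shared, any code that holds it directly will also block processes trying to acquire the condition, which is one way to get into trouble.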
In order for a process to make use of the condition, it must acquire it and release it, like a mutex lock.
This can be achieved manually with the acquire() and release() functions.
For example, we can acquire the condition and then wait on the condition to be notified and finally release the condition as follows:
```python
...
# acquire the condition
condition.acquire()
# wait to be notified
condition.wait()
# release the condition
condition.release()
```
An alternative to calling the acquire() and release() functions directly is to use the context manager, which will perform the acquire/release automatically for us, for example:
```python
...
# acquire the condition
with condition:
    # wait to be notified
    condition.wait()
```
The wait() function will wait forever until notified by default. We can also pass a “timeout” argument which will allow the process to stop blocking after a time limit in seconds.
For example:
```python
...
# acquire the condition
with condition:
    # wait to be notified
    condition.wait(timeout=10)
```
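When a timeout is used, it helps to check the return value of wait(), which is True if the process was notified and False if the timeout expired first. A minimal sketch follows, where wait_briefly() is a hypothetical helper and nothing ever notifies the condition, so the timeout branch is expected.

```python
from multiprocessing import Condition


def wait_briefly(condition, timeout):
    with condition:
        # True if notified, False if the timeout expired first
        return condition.wait(timeout=timeout)


if __name__ == '__main__':
    condition = Condition()
    if wait_briefly(condition, 0.5):
        print('Notified')
    else:
        print('Timed out, nobody notified us')
```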
The multiprocessing.Condition class also provides a wait_for() function that unblocks the waiting process only once a condition is met, as determined by calling a function that returns a boolean value.
The name of the function that returns a boolean value can be provided to the wait_for() function directly, and the function also takes a “timeout” argument in seconds.
```python
...
# acquire the condition
with condition:
    # wait to be notified and a function to return true
    condition.wait_for(all_data_collected)
```
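A slightly fuller sketch of wait_for() is given below, assuming the data being collected is tracked with a shared counter from multiprocessing.Value. The names collector(), wait_for_all(), collected and total are hypothetical and only for illustration.

```python
from multiprocessing import Condition, Process, Value
from time import sleep


def collector(condition, collected, total):
    # collect one item at a time, notifying after each one
    for _ in range(total):
        sleep(0.1)
        with condition:
            collected.value += 1
            condition.notify()


def wait_for_all(condition, collected, total, timeout=None):
    with condition:
        # blocks until the predicate is true or the timeout expires
        return condition.wait_for(lambda: collected.value >= total,
                                  timeout=timeout)


if __name__ == '__main__':
    condition = Condition()
    collected = Value('i', 0)
    worker = Process(target=collector, args=(condition, collected, 3))
    worker.start()
    done = wait_for_all(condition, collected, 3, timeout=5)
    worker.join()
    print(f'All data collected: {done}')
```

The predicate is evaluated while the condition is held, and again after each notification, so the waiting process only continues once the counter reaches the target or the timeout expires.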
We also must acquire the condition in a process if we wish to notify waiting processes. This too can be achieved directly with the acquire/release function calls or via the context manager.
We can notify a single waiting process via the notify() function.
For example:
```python
...
# acquire the condition
with condition:
    # notify a waiting process
    condition.notify()
```

The notified process will stop blocking as soon as it can re-acquire the mutex within the condition. This is attempted automatically as part of its call to wait() or wait_for(); you do not need to do anything extra.
If more than one process is waiting on the condition, we will not know which process will be notified.
We can also notify a subset of waiting processes by setting the “n” argument to an integer number of processes to notify, for example:
```python
...
# acquire the condition
with condition:
    # notify 3 waiting processes
    condition.notify(n=3)
```
Finally, we can notify all processes waiting on the condition via the notify_all() function.
```python
...
# acquire the condition
with condition:
    # notify all processes waiting on the condition
    condition.notify_all()
```
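A final reminder: a process must acquire the condition before waiting on it or notifying waiting processes. Failing to acquire the condition (the lock within the condition) before performing these actions will result in an error. In current CPython versions, multiprocessing.Condition raises an AssertionError in this case, whereas threading.Condition raises a RuntimeError.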
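We can see this failure for ourselves with a small experiment. The sketch below calls notify() without acquiring the condition first and reports the type of exception raised. Because the exact exception type is an implementation detail that may differ between versions, it catches both candidates rather than assuming one; notify_without_acquiring() is a hypothetical helper name.

```python
from multiprocessing import Condition


def notify_without_acquiring():
    condition = Condition()
    try:
        # deliberately skip acquiring the condition first
        condition.notify()
    except (AssertionError, RuntimeError) as error:
        # report which kind of error was raised
        return type(error).__name__
    return None


if __name__ == '__main__':
    print(f'Raised: {notify_without_acquiring()}')
```

Note that if the check is implemented with an assert statement, running Python with the -O flag would disable it, so it is safer to treat acquiring the condition as a rule to follow than as an error to rely on.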
Condition variables can also be used with threads via the threading.Condition class.
Now that we know how to use the multiprocessing.Condition class, let’s look at some worked examples.
Example of wait() and notify() With a Condition
In this section we will explore using a multiprocessing.Condition to notify a waiting process that something has happened.
We will create a new child process to simulate performing some work that the main process is dependent upon. Once prepared, the child process will notify the waiting main process, then the main process will continue on.
First, we will define a target task function to execute in a new child process.
The function will take the condition variable. The function will block for a moment to simulate performing a task, then notify the waiting main process.
The complete target task() function is listed below.
```python
# target function to prepare some work
def task(condition):
    # block for a moment
    sleep(1)
    # notify a waiting process that the work is done
    print('Child process sending notification...', flush=True)
    with condition:
        condition.notify()
    # do something else...
    sleep(1)
```
In the main process, first we can create the shared condition variable.
```python
...
# create a condition
condition = Condition()
```
Next, we can acquire the condition variable, so that we can wait on it later.
```python
...
# wait to be notified that the data is ready
print('Main process waiting for data...')
with condition:
    # ...
```

Next, while still holding the condition, the main process can start a new child process that calls our target task function, then wait on the condition variable to be notified that the work is done.

```python
...
    # start a new process to perform some work
    worker = Process(target=task, args=(condition,))
    worker.start()
    # wait to be notified
    condition.wait()
```
Note, we must start the new process after we have acquired the mutex lock in the condition variable in this example.
If we did not acquire the lock first, it is possible that there would be a race condition. Specifically, if we started the new child process before acquiring the condition and waiting in the main process, then it is possible for the new process to execute and notify before the main process has had a chance to start waiting. In which case the main process would wait forever to be notified.
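One way to make the program insensitive to this ordering is to pair the condition with shared state and wait with wait_for(), which checks the state before blocking. The sketch below is an assumption-laden variation on the example, not the approach used in this tutorial: it adds a hypothetical shared flag named ready (a multiprocessing.Value) and a hypothetical helper wait_until_ready(), and it deliberately starts the child before the main process acquires the condition.

```python
from multiprocessing import Condition, Process, Value
from time import sleep


def task(condition, ready):
    # record that the work is done, then notify any waiter
    with condition:
        ready.value = 1
        condition.notify()


def wait_until_ready(condition, ready, timeout=None):
    with condition:
        # the flag is checked before blocking, so an early notify is not lost
        return condition.wait_for(lambda: ready.value == 1, timeout=timeout)


if __name__ == '__main__':
    condition = Condition()
    ready = Value('i', 0)
    # start the child first; it will likely notify before we begin waiting
    worker = Process(target=task, args=(condition, ready))
    worker.start()
    sleep(0.5)
    print(f'Ready: {wait_until_ready(condition, ready, timeout=5)}')
    worker.join()
```

Here the notification itself may be missed, but the flag records that it happened, so the main process does not wait forever.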
Finally, we can report that the main process is all done.
```python
...
# we know the data is ready
print('Main process all done')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of wait/notify with a condition for processes
from time import sleep
from multiprocessing import Process
from multiprocessing import Condition

# target function to prepare some work
def task(condition):
    # block for a moment
    sleep(1)
    # notify a waiting process that the work is done
    print('Child process sending notification...', flush=True)
    with condition:
        condition.notify()
    # do something else...
    sleep(1)

# entry point
if __name__ == '__main__':
    # create a condition
    condition = Condition()
    # wait to be notified that the data is ready
    print('Main process waiting for data...')
    with condition:
        # start a new process to perform some work
        worker = Process(target=task, args=(condition,))
        worker.start()
        # wait to be notified
        condition.wait()
    # we know the data is ready
    print('Main process all done')
```
Running the example first creates the condition variable.
The condition variable is acquired, then a new child process is created and started.
The child process blocks for a moment to simulate work, then notifies the waiting main process.
Meanwhile the main process waits to be notified by the child process, then once notified it continues on.
```
Main process waiting for data...
Child process sending notification...
Main process all done
```
Next, let’s look at how we might notify all waiting processes.
Example of wait() and notify_all() With a Condition
We can explore how to notify all processes waiting on a condition.
In this example we will start a suite of processes that will wait on the condition variable to be notified before performing their processing and reporting a result. The main process will block for a moment then notify all waiting processes that they can begin their work.
First, we can define a target task function that takes the condition variable as an argument along with a unique integer for identification.
The task function will acquire the condition variable and wait to be notified. Once notified it will generate a random value between 0 and 1, block for that fraction of a second then report the value.
The task function is listed below.
```python
# target function
def task(condition, number):
    # wait to be notified
    print(f'Process {number} waiting...', flush=True)
    with condition:
        condition.wait()
    # block for a moment
    value = random()
    sleep(value)
    # report a result
    print(f'Process {number} got {value}', flush=True)
```
The main process will first create the shared condition variable.
```python
...
# create a condition
condition = Condition()
```
Next, we can create five child processes that will execute the task() function, passing in the condition instance and a unique integer from 0 to 4 for each process instance.
This can be achieved in a list comprehension.
```python
...
# create all child processes
processes = [Process(target=task, args=(condition, i)) for i in range(5)]
```
We can then start all child processes.
```python
...
# start all child processes
for process in processes:
    process.start()
```

Next, the main process can block for a second, then notify all waiting processes to begin their simulated work.

```python
...
# block for a moment
sleep(1)
# notify all waiting processes that they can run
with condition:
    # wake up every process waiting on the condition
    condition.notify_all()
```
Finally, the main process can block until all child processes have finished.
```python
...
# wait for all child processes to terminate
for process in processes:
    process.join()
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of wait/notify all with a condition
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Condition

# target function
def task(condition, number):
    # wait to be notified
    print(f'Process {number} waiting...', flush=True)
    with condition:
        condition.wait()
    # block for a moment
    value = random()
    sleep(value)
    # report a result
    print(f'Process {number} got {value}', flush=True)

# entry point
if __name__ == '__main__':
    # create a condition
    condition = Condition()
    # create all child processes
    processes = [Process(target=task, args=(condition, i)) for i in range(5)]
    # start all child processes
    for process in processes:
        process.start()
    # block for a moment
    sleep(1)
    # notify all waiting processes that they can run
    with condition:
        # wake up every process waiting on the condition
        condition.notify_all()
    # wait for all child processes to terminate
    for process in processes:
        process.join()
```

Running the example first creates the condition variable, then five child processes are created and started.
The child processes run immediately, and each acquires the condition and blocks waiting to be notified.
The main process blocks for a moment then notifies all five waiting processes. The waiting processes wake up, acquire the lock in the condition one-at-a-time, perform their simulated work and report their result.
The program exits once all child processes finish their work.
Note, your specific results will differ given the use of random numbers in the example.
```
Process 2 waiting...
Process 0 waiting...
Process 1 waiting...
Process 3 waiting...
Process 4 waiting...
Process 0 got 0.3008159112259974
Process 1 got 0.6001846867782568
Process 3 got 0.8140761969493645
Process 4 got 0.8965445712089593
Process 2 got 0.9000804784430533
```
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to use a condition variable with processes in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Tushar pharale says
I had a question
question: when we want to use multiple conditional variables let’s say 5 conditions, is it possible to have all conditional variables under common object such as dictionaries or lists
I have tried to create multiple conditions inside Manager().dict() object but didn’t succeed, do you have any advice?
Jason Brownlee says
Great question.
You must nest the proxy objects for each managed condition variable.
This tutorial will show you how:
https://superfastpython.com/multiprocessing-manager-nested-proxy-objects/
So, you would create 5 managed condition variables, then add them to a managed dict.
Let me know how you go.