Last Updated on December 22, 2022
You can suffer race conditions when using process-based concurrency via the multiprocessing module in Python.
The types of race conditions we can expect may be different than those expected with threads, given that we are working with processes that do not have shared memory. Nevertheless, we must identify and protect critical sections from race conditions when using processes.
In this tutorial, you will discover how to identify and fix race conditions with processes in Python.
This tutorial was inspired by questions and discussions with Andrii K. Thank you deeply! If you have a question about Python concurrency, message me anytime.
Let’s get started.
What is a Race Condition
A race condition is a bug in concurrency programming.
Race condition: A flaw in a concurrent application in which the result is dependent on the timing or sequence of multiple threads’ execution.
— Page 271, The Art of Concurrency, 2009.
A race condition is a failure case where the behavior of the program is dependent upon the order of execution by two or more threads. This means that the behavior of the program will not be predictable, possibly changing each time it is run.
When threads or processes attempt to simultaneously access a shared resource, and the accesses can result in an error, we often say the program has a race condition, because the threads or processes are in a “race” to carry out an operation.
— Page 53, An Introduction to Parallel Programming, 2020.
There are many types of race conditions and they almost always have to do with an unexpected sequence of operations.
Two common classes of race conditions include:
- Race conditions caused by accessing shared data or state.
- Race conditions caused by timing.
Both of these race conditions can occur when using threads in Python.
Race Conditions with Threads
Race conditions are a real problem in Python when using threads, even in the presence of the global interpreter lock (GIL).
The refrain that there are no race conditions in Python because of the GIL is dangerously wrong.
Let’s look at two examples.
Race Condition With Shared Data
For example, one thread may be adding values to a variable, while another thread is subtracting values from the same variable.
Let’s call them an adder thread and a subtractor thread.
At some point, the operating system may context switch from the adding thread to the subtracting thread in the middle of updating the variable. Perhaps right at the point where it was about to write an updated value with an addition, say from the current value of 100 to the new value of 110.
...
# add to the variable
variable = variable + 10
You may recall that the operating system controls what threads execute and when, and that a context switch refers to pausing the execution of a thread and storing its state while unpausing another thread and restoring its state.
You may also notice that adding to or subtracting from the variable is composed of at least three steps:
- Read the current value of the variable.
- Calculate a new value for the variable.
- Write a new value for the variable.
A context switch between threads may occur at any point in this task.
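These separate steps are visible in the bytecode the interpreter actually executes. As a quick check, Python’s built-in dis module can disassemble a small function (the function name update here is just for illustration) and show the distinct load, add, and store instructions, between any of which a context switch may occur:

```python
import dis

# a hypothetical update of a shared global variable
def update():
    global value
    value = value + 10

# show the separate load, add, and store bytecode instructions
dis.dis(update)
```

The exact opcodes vary between Python versions, but the disassembly always shows a load of the variable, an arithmetic operation, and a separate store, confirming the operation is not atomic.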
Back to our threads. The subtracting thread runs and reads the current value as 100 and reduces the value from 100 to 90.
This subtraction is performed as per normal and the variable value is now 90.
The operating system context switches back to the adding thread and it picks up where it left off writing the value 110.
This means that in this case, one subtraction operation was lost and the shared balance variable has an inconsistent value. A race condition.
This type of race condition can be avoided by using a mutex lock.
Thread scheduling is inherently nondeterministic. Because of this, failure to use locks in threaded programs can result in randomly corrupted data and bizarre behavior known as a “race condition.” To avoid this, locks should always be used whenever shared mutable state is accessed by multiple threads.
— Page 497, Python Cookbook, 2013.
You can see an example of this type of race condition between threads in the tutorial:
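As a sketch of the fix for the adder/subtractor scenario above (function names here are illustrative, not taken from the linked tutorial), a single threading.Lock shared by both threads makes each read-modify-write atomic:

```python
from threading import Thread, Lock

value = 0
lock = Lock()

# make additions while holding the lock
def adder(repeats):
    global value
    for _ in range(repeats):
        # the lock makes the read-modify-write atomic
        with lock:
            value = value + 1

# make subtractions while holding the lock
def subtractor(repeats):
    global value
    for _ in range(repeats):
        with lock:
            value = value - 1

# run both threads to completion
threads = [Thread(target=adder, args=(100000,)),
           Thread(target=subtractor, args=(100000,))]
for t in threads:
    t.start()
for t in threads:
    t.join()
# balanced additions and subtractions: always zero with the lock
print(f'Value: {value}')
```

Without the lock, the same program can report a nonzero value, depending on how the operating system interleaves the two threads.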
Race Condition Due to Timing
Python threads may suffer a race condition due to a bug with timing.
For example, consider the case of the use of a threading.Condition used by two threads to coordinate their behavior.
If you are new to a condition object, you can learn more here:
One thread may wait on the threading.Condition to be notified of some state change within the application via the wait function.
...
# wait for a state change
with condition:
    condition.wait()
Recall that when using a threading.Condition, you must acquire the condition before you can call wait() or notify(), then release it once you are done. This is easily achieved using the context manager.
Another thread may perform some change within the application and alert the waiting thread via the condition with the notify() function.
...
# alert the waiting thread
with condition:
    condition.notify()
This is an example of coordinated behavior between two threads where one thread signals another thread.
For the behavior to work as intended, the notification from the second thread must be sent after the first thread has started waiting. If the first thread calls wait() after the second thread calls notify(), then it will not be notified and will wait forever.
This may happen if a context switch by the operating system allows the second thread, which calls notify(), to run before the first thread, which calls wait().
You can see an example of a thread race condition due to timing in the tutorial:
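A common defense against this timing race (a sketch, not the linked tutorial’s code) is to pair the condition with a shared state variable and wait on the state in a loop, so a notification sent before the waiter arrives is not lost:

```python
from threading import Thread, Condition

ready = False
condition = Condition()

# signal that the state has changed
def producer():
    global ready
    with condition:
        ready = True
        condition.notify()

# wait for the state change, re-checking the shared flag
def waiter(results):
    with condition:
        # waiting on the flag, not the notify, avoids the timing race
        while not ready:
            condition.wait()
        results.append('done')

results = []
producer_thread = Thread(target=producer)
waiter_thread = Thread(target=waiter, args=(results,))
# deliberately let the producer notify before the waiter starts
producer_thread.start()
producer_thread.join()
waiter_thread.start()
waiter_thread.join()
print(results)
```

Here the producer notifies before the waiter even starts, yet the waiter still completes, because it checks the shared flag rather than relying on catching the notification.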
Next, let’s consider race conditions with processes.
Race Conditions with Processes
Python provides process-based concurrency via the multiprocessing module.
You can get started with process-based concurrency with the guide:
Race conditions are possible in Python when using process-based concurrency.
Let’s consider the two types of race conditions considered above with threads:
- Race conditions caused by accessing shared data or state.
- Race conditions caused by timing.
Race Condition With Shared Data
Race conditions with shared data are harder to produce with processes.
The reason is that processes cannot share data directly.
Threads have shared memory (within one process), therefore a global variable can be accessed by different threads at the same time and left in an inconsistent state.
This cannot happen directly with processes.
A child process created using the “fork” start method will get a copy of all global variables from the parent process, but any changes to the global variable in the child will have no effect on the global variable in the parent.
You can learn more about this in the tutorial:
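A minimal sketch of this copy-on-fork behavior (assuming a platform where the “fork” start method is available, so not Windows; the variable names are illustrative):

```python
from multiprocessing import get_context

data = {'value': 0}

# mutate the (copied) global in the child process
def child():
    data['value'] = 99

if __name__ == '__main__':
    # the forked child starts as a copy of the parent
    ctx = get_context('fork')
    process = ctx.Process(target=child)
    process.start()
    process.join()
    # the parent's copy is unchanged
    print(data['value'])  # 0
```

The child’s assignment succeeds, but only in the child’s copy of the dictionary; the parent still sees the original value.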
There are two mechanisms for sharing Python objects and data between processes in a Python program, they are:
- Host the object on a server process and allow processes to interact with it indirectly via proxy objects. This can be achieved using a multiprocessing.Manager.
- Share primitive types (e.g. integers and floats) between processes using shared ctypes via the multiprocessing.Value and multiprocessing.Array classes.
Race conditions are possible when using either of these two mechanisms to share data between processes.
Additionally, it is possible for two processes to interact with other types of shared resources and suffer a race condition as a result.
A common example is a file.
If two or more processes attempt to write to the same file at the same time, the results may be unpredictable.
Processes will race to write to the file, overwrite the content of the file and leave the file in an unexpected state.
This is most commonly seen when logging to one file from multiple processes.
You can learn more about this in the tutorial:
Race Condition Due to Timing
Processes can suffer race conditions due to timing.
This is just like the race conditions seen with threads.
A synchronization primitive may be shared among processes and one or more processes may wait to be notified after another process has sent the notification message.
This type of race condition can occur with an assortment of primitives that involve waiting, such as events, condition variables, and barriers.
Process Safety
The solution is to ensure that multiprocessing code is process-safe.
Process-safe is the concept of thread safety applied to concurrency with processes.
This may involve making correct use of synchronization primitives, such as:
- multiprocessing.Lock
- multiprocessing.RLock
- multiprocessing.Semaphore
- multiprocessing.Event
- multiprocessing.Condition
- multiprocessing.Barrier
It may also involve separating out responsibilities for interacting with a resource into a controlling process and having other processes send messages to the controlling process via a multiprocessing queue.
You can learn more about multiprocessing queues in the tutorial:
You can learn more about process safety in the tutorial:
Now that we know that race conditions are possible with processes in Python, let’s explore some examples.
Starting with a negative example.
Multiprocessing Cannot Race with a Global Variable
We can explore the case where a global variable is shared with child processes and does not lead to a race condition.
This situation is important to demonstrate because the same solution when using thread-based concurrency will lead to a race condition.
In this example, we will define two tasks, “adder” and “subtractor“. The adder task will access a shared global variable named “value” and add one to it, with plenty of room for a context switch to occur during the operation. Similarly, the subtractor operation will access a global variable named “value” and subtract one from it with plenty of room for a context switch during the operation.
Specifically, we will unroll “value += 1” and “value -= 1” into their separate operations of: “copy value into tmp“, “change tmp“, and “copy tmp back to value“, and add sleep operations between each step. This signals to the operating system that the task may context switch at that point.
Each adder and subtractor task will be performed in a loop that iterates 1,000,000 times, offering many opportunities to race.
Each task will be executed in a separate child process and the child processes will be created using the “fork” start method that ensures that each child is a copy of the parent process.
Because the “value” global variable is defined in the parent process, it will be copied and made available to the child processes.
Note: The “fork” start method is not available on Windows. As such, this example may not run on the Windows platform. It will work fine on Linux, macOS, and other platforms.
The complete example is listed below.
# SuperFastPython.com
# example of an attempted race condition with a shared variable
from time import sleep
from multiprocessing import Process
from multiprocessing import set_start_method

# make additions into the global variable
def adder(amount, repeats):
    global value
    for _ in range(repeats):
        # copy the value
        tmp = value
        # suggest a context switch
        sleep(0)
        # change the copy
        tmp = tmp + amount
        # suggest a context switch
        sleep(0)
        # copy the value back
        value = tmp

# make subtractions from the global variable
def subtractor(amount, repeats):
    global value
    for _ in range(repeats):
        # copy the value
        tmp = value
        # suggest a context switch
        sleep(0)
        # change the copy
        tmp = tmp - amount
        # suggest a context switch
        sleep(0)
        # copy the value back
        value = tmp

if __name__ == '__main__':
    # set start method
    set_start_method('fork')
    # define the global variable
    value = 0
    # start a process making additions
    adder_process = Process(target=adder, args=(1, 1000000))
    adder_process.start()
    # start a process making subtractions
    subtractor_process = Process(target=subtractor, args=(1, 1000000))
    subtractor_process.start()
    # wait for both processes to finish
    print('Waiting for processes to finish...')
    adder_process.join()
    subtractor_process.join()
    # report the value
    print(f'Value: {value}')
Running the example first declares and defines the “value” global variable.
The adder and subtractor processes are then created and started.
The main process then waits for both processes to complete.
The adder and subtractor processes run, looping one million times each and modifying the global variable.
The child processes finish and the value of the global variable is reported.
And it has the expected value of zero.
Waiting for processes to finish...
Value: 0
There was no race condition because each process operated on a separate copy of the “value” global variable.
The main process had one copy that was never changed and remained at the value of zero until reported at the end.
Each child process had a copy of the global variable and either added to it or subtracted from it. Their values were changed and never interacted with each other or the copy in the main process.
We cannot get a race condition in this way with processes because we cannot share a global variable directly.
Next, let’s look at an example of sharing data between processes that is subject to a race condition.
Multiprocessing Race Condition with a Hosted Variable
One way to share data between processes is to use a multiprocessing.Manager.
A manager is a server process that hosts Python objects and returns proxy objects. Processes can then use the hosted object via the proxy objects, just like a shared global variable.
All of the inter-process communication required to read and write the shared variable is performed automatically behind the scenes via the proxy objects.
You can learn more about managers in the tutorial:
We can create a custom class that suffers race conditions and host it with a manager.
Why not create a custom class with a member variable and share this among child processes directly, without a Manager?
Excellent question! Because each child process would get and work with a copy of the shared object. Any changes to the shared object would be limited to the process that made those changes (e.g. we would be in the same situation as the example in the previous section).
You can see an example of this in the tutorial:
A manager is required to really share an object between child processes and have changes reflected in other processes.
Let’s explore this case and then how to fix it.
Example of Race Condition with a Manager
In this example we can define a custom class that is subject to race conditions, host it on the manager and then interact with it from multiple child processes, triggering a race condition.
Firstly, we can create a custom counter class for incrementing and decrementing a variable.
We can perform the changes to the variable in a way that strongly encourages the operating system to context switch in the middle of the operations, essentially forcing a race condition.
# custom counter class
class UnsafeCounter():
    def __init__(self, count):
        self._value = count

    # retrieve the variable
    def get(self):
        return self._value

    # add one to the variable
    def increment(self):
        # copy value
        tmp = self._value
        # allow a context switch
        sleep(0)
        # increment the value
        tmp = tmp + 1
        # allow a context switch
        sleep(0)
        # copy the updated value
        self._value = tmp

    # subtract one from the variable
    def decrement(self):
        # copy value
        tmp = self._value
        # allow a context switch
        sleep(0)
        # decrement the value
        tmp = tmp - 1
        # allow a context switch
        sleep(0)
        # copy the updated value
        self._value = tmp
We can host this custom class on a Manager.
This requires first defining a custom Manager class.
# custom manager to support custom classes
class CustomManager(BaseManager):
    # nothing
    pass
Then registering the custom class on the custom manager class, so it knows how to make it.
...
# register the custom class on the custom manager
CustomManager.register('UnsafeCounter', UnsafeCounter)
You can learn more about hosting custom classes on a manager in the tutorial:
We can define custom task functions to increment and decrement our shared counter.
Each task will perform 1,000 iterations and call increment() or decrement() methods on the shared counter.
# task executed in a child process
def adder_task(counter):
    for i in range(1000):
        counter.increment()

# task executed in a child process
def subtractor_task(counter):
    for i in range(1000):
        counter.decrement()
The main process can first create the custom Manager class and then create the shared counter using the manager.
This will host the counter on the manager server process and return proxy objects that can be shared with the child processes and used to interact with the hosted object.
...
# create manager
with CustomManager() as manager:
    # create the counter
    counter = manager.UnsafeCounter(0)
We can then start two child processes, one for adding the shared counter and one for subtracting, and wait for them to complete.
...
# create child processes
adder_process = Process(target=adder_task, args=(counter,))
subtractor_process = Process(target=subtractor_task, args=(counter,))
# start child processes
adder_process.start()
subtractor_process.start()
# wait for processes to complete
adder_process.join()
subtractor_process.join()
Finally, we can report the value of the shared counter.
...
# report the value
print(f'Value: {counter.get()}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a race condition between processes with shared memory
from time import sleep
from multiprocessing import Process
from multiprocessing.managers import BaseManager

# custom counter class
class UnsafeCounter():
    def __init__(self, count):
        self._value = count

    # retrieve the variable
    def get(self):
        return self._value

    # add one to the variable
    def increment(self):
        # copy value
        tmp = self._value
        # allow a context switch
        sleep(0)
        # increment the value
        tmp = tmp + 1
        # allow a context switch
        sleep(0)
        # copy the updated value
        self._value = tmp

    # subtract one from the variable
    def decrement(self):
        # copy value
        tmp = self._value
        # allow a context switch
        sleep(0)
        # decrement the value
        tmp = tmp - 1
        # allow a context switch
        sleep(0)
        # copy the updated value
        self._value = tmp

# custom manager to support custom classes
class CustomManager(BaseManager):
    # nothing
    pass

# task executed in a child process
def adder_task(counter):
    for i in range(1000):
        counter.increment()

# task executed in a child process
def subtractor_task(counter):
    for i in range(1000):
        counter.decrement()

# protect the entry point
if __name__ == '__main__':
    # register the custom class on the custom manager
    CustomManager.register('UnsafeCounter', UnsafeCounter)
    # create manager
    with CustomManager() as manager:
        # create the counter
        counter = manager.UnsafeCounter(0)
        # create child processes
        adder_process = Process(target=adder_task, args=(counter,))
        subtractor_process = Process(target=subtractor_task, args=(counter,))
        # start child processes
        adder_process.start()
        subtractor_process.start()
        # wait for processes to complete
        adder_process.join()
        subtractor_process.join()
        # report the value
        print(f'Value: {counter.get()}')
Running the example first registers the custom counter class on the custom manager.
An instance of the custom manager is created which creates a server process, then an instance of the custom counter class is created on the manager process, returning proxy objects.
The two child processes are created and provided access to the shared counter via proxy objects.
The main process waits until the tasks are completed.
Each task loops 1,000 times either incrementing or decrementing the shared counter via the proxy objects.
The tasks are completed and the value of the shared counter is reported.
We expect the value to be zero every time because we issued 1,000 increments and 1,000 decrements.
In this case, we see the value is not zero.
Value: 3
In fact, the program gives a different final value every time it is run.
Value: 5
We have a race condition.
Example of Fixing a Race Condition with a Manager
We can fix a race condition between processes using shared memory via a manager.
There are a few ways we could achieve this.
One way might be to share a multiprocessing.Lock object and use it in each task function to protect the usage of the counter.
For example:
# task executed in a child process
def adder_task(counter, lock):
    for i in range(1000):
        with lock:
            counter.increment()

# task executed in a child process
def subtractor_task(counter, lock):
    for i in range(1000):
        with lock:
            counter.decrement()
This will work.
The mutex lock can only be acquired and held by one child process at a time, ensuring only one child process can either increment or decrement the shared counter at a time.
You can learn more about multiprocessing mutex locks in the tutorial:
Another approach is to move the lock into the custom counter class and make it process-safe.
Each time a method in the class needs to operate on the internal variable, it must acquire the lock first. This ensures that all reads and writes to the shared variable are mutually exclusive, e.g. one at a time.
This might be a better approach as it hides the change, transforming our UnsafeCounter into a SafeCounter.
For example:
# custom counter class
class SafeCounter():
    def __init__(self, count):
        self._lock = Lock()
        self._value = count

    # retrieve the variable
    def get(self):
        with self._lock:
            return self._value

    # add one to the variable
    def increment(self):
        with self._lock:
            # copy value
            tmp = self._value
            # allow a context switch
            sleep(0)
            # increment the value
            tmp = tmp + 1
            # allow a context switch
            sleep(0)
            # copy the updated value
            self._value = tmp

    # subtract one from the variable
    def decrement(self):
        with self._lock:
            # copy value
            tmp = self._value
            # allow a context switch
            sleep(0)
            # decrement the value
            tmp = tmp - 1
            # allow a context switch
            sleep(0)
            # copy the updated value
            self._value = tmp
The updated version with this change is listed below.
# SuperFastPython.com
# example of fixing a race condition between processes with shared memory
from time import sleep
from multiprocessing import Process
from multiprocessing import Lock
from multiprocessing.managers import BaseManager

# custom counter class
class SafeCounter():
    def __init__(self, count):
        self._lock = Lock()
        self._value = count

    # retrieve the variable
    def get(self):
        with self._lock:
            return self._value

    # add one to the variable
    def increment(self):
        with self._lock:
            # copy value
            tmp = self._value
            # allow a context switch
            sleep(0)
            # increment the value
            tmp = tmp + 1
            # allow a context switch
            sleep(0)
            # copy the updated value
            self._value = tmp

    # subtract one from the variable
    def decrement(self):
        with self._lock:
            # copy value
            tmp = self._value
            # allow a context switch
            sleep(0)
            # decrement the value
            tmp = tmp - 1
            # allow a context switch
            sleep(0)
            # copy the updated value
            self._value = tmp

# custom manager to support custom classes
class CustomManager(BaseManager):
    # nothing
    pass

# task executed in a child process
def adder_task(counter):
    for i in range(1000):
        counter.increment()

# task executed in a child process
def subtractor_task(counter):
    for i in range(1000):
        counter.decrement()

# protect the entry point
if __name__ == '__main__':
    # register the custom class on the custom manager
    CustomManager.register('SafeCounter', SafeCounter)
    # create manager
    with CustomManager() as manager:
        # create the counter
        counter = manager.SafeCounter(0)
        # create child processes
        adder_process = Process(target=adder_task, args=(counter,))
        subtractor_process = Process(target=subtractor_task, args=(counter,))
        # start child processes
        adder_process.start()
        subtractor_process.start()
        # wait for processes to complete
        adder_process.join()
        subtractor_process.join()
        # report the value
        print(f'Value: {counter.get()}')
Running the example first registers the custom counter class on the custom manager.
An instance of the custom manager is created which creates a server process, then an instance of the custom counter class is created on the manager process, returning proxy objects.
Importantly, the counter creates its own internal lock when the object is initialized.
The two child processes are created and provided access to the shared counter via proxy objects.
The main process waits until the tasks are completed.
Each task loops 1,000 times either incrementing or decrementing the shared counter via the proxy objects.
The tasks are completed and the value of the shared counter is reported.
We expect the value to be zero every time because we issued 1,000 increments and 1,000 decrements.
In this case, we can see that this expectation is met. In fact, the value of the shared counter is zero every time the program is run.
The race condition has been fixed.
Value: 0
Next, let’s look at a race condition between processes using a shared ctype variable.
Multiprocessing Race Condition with a Shared ctype
Another way we can share a variable between processes is by using shared ctypes.
The ctypes module provides tools for working with C data types.
Python provides the capability to share ctypes between processes on one system.
This is primarily achieved via the following classes:
- multiprocessing.Value: manage a shared value.
- multiprocessing.Array: manage an array of shared values.
The multiprocessing.Value class will create a shared ctype with a specified data type and initial value.
The first argument defines the data type for the value. It may be a string type code or a Python ctype class. The second argument may be an initial value.
For example, we can define a signed integer type with the ‘i’ type code and an initial value of zero as follows:
...
# create an integer value
variable = multiprocessing.Value('i', 0)
The data within the multiprocessing.Value object can be accessed via the “value” attribute.
...
# get the data
data = variable.value
You can learn more about how to create and share ctypes between processes in the tutorial:
We can suffer race conditions when sharing ctypes between processes.
Let’s explore an example and then how to fix it.
Example of Race Condition with Shared ctype
In this example, we can define an integer shared ctype variable and share it among processes.
Each process will change the variable at the same time, triggering a race condition and leaving the variable in an unknown and inconsistent state.
We will define two tasks, an adder task and a subtractor task, as we did previously.
The adder task will loop 1,000 times and increment the variable each iteration, whereas the subtractor task will decrement the variable each iteration.
The final value of the shared ctype will be reported, which we would expect to be zero, given the balanced number of increments and decrements.
The complete example is listed below.
# SuperFastPython.com
# example of a race condition with a shared ctype
from multiprocessing import Process
from multiprocessing import Value

# make additions into the shared variable
def adder(variable):
    for _ in range(1000):
        # increment the variable
        variable.value += 1

# make subtractions from the shared variable
def subtractor(variable):
    for _ in range(1000):
        # decrement the variable
        variable.value -= 1

if __name__ == '__main__':
    # create a shared ctype integer
    variable = Value('i', 0)
    # start a process making additions
    adder_process = Process(target=adder, args=(variable,))
    adder_process.start()
    # start a process making subtractions
    subtractor_process = Process(target=subtractor, args=(variable,))
    subtractor_process.start()
    # wait for both processes to finish
    print('Waiting for processes to finish...')
    adder_process.join()
    subtractor_process.join()
    # report the value
    print(f'Value: {variable.value}')
Running the example first creates the shared ctype and initializes it to zero.
The adder process is then created, passed the ctype variable, and started. Similarly, the subtractor child process is created, passed the same ctype variable, and started.
The main process then waits for both processes to complete.
Each process loops 1,000 times and modifies the value within the shared ctype.
Because the variable is shared, both processes operate on the same variable.
Both child processes are completed and the final value of the shared ctype is reported.
In this case, the final value is not zero.
Waiting for processes to finish...
Value: 221
In fact, the final value is different each time the program is run, due to the race condition.
Waiting for processes to finish...
Value: -659
Next, let’s look at how we can fix this race condition.
Example of Fixing a Race Condition with Shared ctype
We can fix the race condition by using a shared mutex lock.
One approach would be to create a mutex lock in the main process and share it with each child process.
Each time the adder and subtractor tasks change the shared ctype, they can first acquire the lock. This would ensure that all changes to the shared ctype are serialized, that is only one child process can change the variable at a time.
For example:
# make additions into the shared variable
def adder(variable, lock):
    for _ in range(1000):
        # acquire the lock on the variable
        with lock:
            # increment the variable
            variable.value += 1

# make subtractions from the shared variable
def subtractor(variable, lock):
    for _ in range(1000):
        # acquire the lock on the variable
        with lock:
            # decrement the variable
            variable.value -= 1
In fact, this is not necessary.
By default, the shared ctype Value and Array objects will create and manage their own internal mutex lock.
This lock can be retrieved via the get_lock() method and can be used to protect direct modification of the shared ctype value.
For example:
# make additions into the shared variable
def adder(variable):
    for _ in range(1000):
        # acquire the lock on the variable
        with variable.get_lock():
            # increment the variable
            variable.value += 1

# make subtractions from the shared variable
def subtractor(variable):
    for _ in range(1000):
        # acquire the lock on the variable
        with variable.get_lock():
            # decrement the variable
            variable.value -= 1
This is the correct and preferred way to modify and access the values of a shared ctype to ensure it is process-safe.
The updated version of the program with these changes that fix the race condition is listed below.
```python
# SuperFastPython.com
# example of fixing a race condition with a shared ctype
from multiprocessing import Process
from multiprocessing import Value

# make additions into the shared variable
def adder(variable):
    for _ in range(1000):
        # acquire the lock on the variable
        with variable.get_lock():
            # increment the variable
            variable.value += 1

# make subtractions from the shared variable
def subtractor(variable):
    for _ in range(1000):
        # acquire the lock on the variable
        with variable.get_lock():
            # decrement the variable
            variable.value -= 1

# protect the entry point
if __name__ == '__main__':
    # create a shared ctype integer
    variable = Value('i', 0)
    # start a process making additions
    adder_process = Process(target=adder, args=(variable,))
    adder_process.start()
    # start a process making subtractions
    subtractor_process = Process(target=subtractor, args=(variable,))
    subtractor_process.start()
    # wait for both processes to finish
    print('Waiting for processes to finish...')
    adder_process.join()
    subtractor_process.join()
    # acquire the lock on the variable
    with variable.get_lock():
        # report the value
        print(f'Value: {variable.value}')
```
Running the example first creates the shared ctype and initializes it to zero.
The adder process is then created, passed the ctype variable, and started. Similarly, the subtractor child process is created, passed the same ctype variable, and started.
The main process then waits for both processes to complete.
Each process loops 1,000 times. Each iteration, the tasks first acquire the internal mutex lock for the shared ctype variable. Only once the lock is acquired will the task modify the value within the shared ctype.
Both child processes are completed and the final value of the shared ctype is reported.
In this case, the final value is always zero.
The race condition has been fixed.
```
Waiting for processes to finish...
Value: 0
```
Next, let’s look at sharing a file from multiple processes.
Multiprocessing Cannot Race with a Shared File
We can explore the case of multiple child processes writing to the same file at the same time.
When using threads, we can open a handle to the file, share the handle directly with many threads, and have them all write to the file at the same time. This leads to a race condition as the threads compete and overwrite each other's data, causing data corruption and loss.
For an example of this type of race condition with threads, see the tutorial:
This type of race condition is challenging to recreate using process-based concurrency.
Firstly, we cannot open a file handle and share it directly with child processes.
Doing so results in an error because the file handle cannot be pickled (serialized) and therefore cannot be sent to a child process as an argument.
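We can see why with the pickle module directly, which multiprocessing uses to serialize arguments sent to a child process (at least under the 'spawn' start method). A small sketch, using a temporary file name of my own choosing:

```python
import os
import pickle

# open a file handle, then attempt to serialize it, as multiprocessing
# must do when passing it to a child process as an argument
handle = open('tmp_demo.txt', 'w')
error = None
try:
    pickle.dumps(handle)
except TypeError as e:
    # file handles cannot be pickled
    error = e
# clean up the handle and the temporary file
handle.close()
os.remove('tmp_demo.txt')
print(f'Pickling failed: {error}')
```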
An alternate approach is to have each child process open the same file and attempt to write many lines to the file at the same time.
The example below explores this case.
We can define a task that takes the filename and loops 10,000 times. Each iteration, the file is opened and one line of text is written.
We can then perform this task in many child processes, 50 in this case. The number is kept modest because some operating systems, such as Windows, limit the number of child processes that can be created.
With 50 processes each writing 10K lines, we expect the file to have 500,000 lines of text.
The program waits for all child processes to finish before loading the file and reporting the total number of lines.
If there was a race condition leading to corruption and loss, we would expect fewer than 500,000 lines in the final file.
The complete example is listed below.
```python
# SuperFastPython.com
# example of an attempted race condition with a shared file
import os
from multiprocessing import Process

# task for writing data to a file
def task(filename, arg):
    # write data to the file
    for i in range(10000):
        # open the file
        with open(filename, 'a') as file:
            # write one line
            print(f'Data line {i} from task {arg}.', file=file)

# protect the entry point
if __name__ == '__main__':
    # shared filename
    filename = 'tmp.txt'
    # delete the file if it exists
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass
    # create processes to write to the same file
    processes = [Process(target=task, args=(filename, i)) for i in range(50)]
    # start the processes
    for process in processes:
        process.start()
    # wait for the processes to complete
    for process in processes:
        process.join()
    # read the file and report results
    with open(filename, 'r') as file:
        # read the entire file
        data = file.read()
    # split into lines
    lines = data.strip().split('\n')
    # report the total number of lines
    print(f'{filename} has {len(lines)} lines')
```
Running the example first deletes the shared file if it exists (e.g. if the program has been run before).
Next, 50 child processes are created and started and the main process waits for them to complete.
Each process loops 10,000 times; each iteration, it opens the file and writes one line.
The repeated opening and writing to the file is an attempt to force a race between child processes.
Once all tasks are complete the main process loads the text file and reports the total number of lines.
In this case, we can see that all 500,000 lines were written without incident.
No race condition is evident.
```
tmp.txt has 500000 lines
```
It seems that writing to the same file from multiple processes is safe on many modern operating systems, such as Linux and macOS. I am not sure about Windows.
We cannot easily force a race condition in this situation.
We can see some discussion of this issue on stack overflow, here:
Nevertheless, I would not rely on this finding in a production system.
I would strongly recommend creating a lock, sharing it among the child processes, and treating the file operations as a critical section.
For example:
```python
# task for writing data to a file
def task(filename, arg, lock):
    # write data to the file
    for i in range(10000):
        # acquire the lock
        with lock:
            # open the file
            with open(filename, 'a') as file:
                # write one line
                print(f'Data line {i} from task {arg}.', file=file)
```
This may add too much lock overhead in this specific case, in which case the task() function could be restructured so that the lock is acquired less often.
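One possible restructuring (a sketch of my own, with a hypothetical task_batched() name) is to build the lines in memory first, then acquire the lock once per task and append them in a single critical section:

```python
from multiprocessing import Lock

# hypothetical batched version of task(): prepare all lines first,
# then hold the lock for one append instead of 10,000 appends
def task_batched(filename, arg, lock):
    # prepare all lines outside the critical section
    lines = [f'Data line {i} from task {arg}.' for i in range(10000)]
    # acquire the lock once and write the whole batch
    with lock:
        with open(filename, 'a') as file:
            file.write('\n'.join(lines) + '\n')
```

This acquires the lock 50 times in total (once per process) rather than 500,000 times, at the cost of holding each task's lines in memory.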
Next, let’s explore a race condition between child processes due to timing.
Multiprocessing Race Condition with Timing
We can explore an example of a race condition between processes due to timing.
We can then explore how to fix the race condition, ensuring it does not happen.
Example of Multiprocessing Race Condition
In this example, we will create a multiprocessing.Condition, then start a new child process that waits on the condition to be notified and the main process that notifies the child process.
If you are new to condition variables, see the tutorial:
To force the race condition, we will add a delay between the new process starting and waiting on the condition. This will cause the new process to always miss the notification from the main process and wait forever, requiring the program to be killed manually rather than terminating normally.
Firstly, we can define a function named task() to be executed by a new process.
The function will first sleep for a fraction of a second to force the timing race condition, then acquire the condition and wait to be notified.
The complete function is listed below.
```python
# process waiting to be notified
def task(condition):
    # insert a delay to force the race condition
    sleep(1)
    # wait to be notified
    print('Process: Waiting to be notified...', flush=True)
    with condition:
        condition.wait()
    print('Process: Notified', flush=True)
```
Next, in the main process we can create the shared multiprocessing.Condition object.
```python
...
# create the shared condition variable
condition = Condition()
```
Next, we can create and start a new multiprocessing.Process configured to execute our task() function and pass in the condition as an argument to the function.
```python
...
# create the new process
process = Process(target=task, args=(condition,))
# start the new process
process.start()
```
Finally, the main process can acquire the multiprocessing.Condition and notify the child process.
```python
...
# allow the new process to start up, but not start waiting
sleep(0.5)
# notify the new process
print('Main: Notifying the process')
with condition:
    condition.notify()
# wait for the task to complete
process.join()
print('Main: Done')
```
Tying this together, the complete example of a race condition based on timing is listed below.
```python
# SuperFastPython.com
# example of a race condition with timing between two processes
from time import sleep
from multiprocessing import Process
from multiprocessing import Condition

# process waiting to be notified
def task(condition):
    # insert a delay to force the race condition
    sleep(1)
    # wait to be notified
    print('Process: Waiting to be notified...', flush=True)
    with condition:
        condition.wait()
    print('Process: Notified', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared condition variable
    condition = Condition()
    # create the new process
    process = Process(target=task, args=(condition,))
    # start the new process
    process.start()
    # allow the new process to start up, but not start waiting
    sleep(0.5)
    # notify the new process
    print('Main: Notifying the process')
    with condition:
        condition.notify()
    # wait for the task to complete
    process.join()
    print('Main: Done')
```
Running the example first creates and starts the child process.
The child process starts running and then sleeps for one second.
Meanwhile, the main process sleeps for half a second, then acquires the condition and calls notify().
The child process wakes up, acquires the condition, and waits to be notified. Because the notification has already been sent, it is missed and the child process waits forever.
This demonstrates a race condition based on timing.
We might also say that the program is deadlocked and unable to progress.
```
Main: Notifying the process
Process: Waiting to be notified...
```
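As an aside (my own suggestion, separate from the fix described next): a defensive guard against waiting forever is to pass a timeout to wait(). The call then returns False if the timeout expired without a notification arriving:

```python
from multiprocessing import Condition

# create a condition that will never be notified
condition = Condition()
with condition:
    # wait at most 0.2 seconds for a notification
    notified = condition.wait(timeout=0.2)
# False indicates the wait timed out without being notified
print(f'Notified: {notified}')
```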
Next, let’s look at how we might fix this race condition using an event.
Example of Fixing a Multiprocessing Race Condition
The race condition based on timing seen in the previous section can be fixed by having the notifying process wait until the waiting process is ready before acquiring the condition and calling notify().
One way to achieve this is by using a multiprocessing.Event, which is a process-safe boolean flag variable.
If you are new to events, see the tutorial:
The shared multiprocessing.Event can be passed to the task() function as an argument and then set by the child process while holding the multiprocessing.Condition, right before waiting on the condition.
It is important that the event is set while the condition is held: this blocks the main process from acquiring the condition and calling notify() until the child process releases the condition by calling wait().
The updated version of the task() function with this change is listed below.
```python
# process waiting to be notified
def task(condition, event):
    # insert a delay
    sleep(1)
    # wait to be notified
    print('Process: Waiting to be notified...', flush=True)
    with condition:
        # signal that we are ready
        print('Process: Ready', flush=True)
        event.set()
        # wait to be notified
        condition.wait()
    print('Process: Notified', flush=True)
```
The main process can then create the shared multiprocessing.Event object.
```python
...
# create the shared event
event = Event()
```
It can then be passed to a child process.
```python
...
# create the new process
process = Process(target=task, args=(condition, event))
# start the new process
process.start()
```
Finally, the main process can wait for the multiprocessing.Event to be set before progressing.
This can be achieved by calling the wait() method on the event.
```python
...
# wait for the task to signal that it is ready
event.wait()
```
Tying this together, the complete example of fixing a race condition based on timing is listed below.
```python
# SuperFastPython.com
# example of a fixed race condition due to timing between two processes
from time import sleep
from multiprocessing import Process
from multiprocessing import Condition
from multiprocessing import Event

# process waiting to be notified
def task(condition, event):
    # insert a delay
    sleep(1)
    # wait to be notified
    print('Process: Waiting to be notified...', flush=True)
    with condition:
        # signal that we are ready
        print('Process: Ready', flush=True)
        event.set()
        # wait to be notified
        condition.wait()
    print('Process: Notified', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared condition variable
    condition = Condition()
    # create the shared event
    event = Event()
    # create the new process
    process = Process(target=task, args=(condition, event))
    # start the new process
    process.start()
    # wait for the task to signal that it is ready
    event.wait()
    # notify the new process
    print('Main: Notifying the process')
    with condition:
        condition.notify()
    # wait for the task to complete
    process.join()
    print('Main: Done')
```
Running the example first creates the shared condition and event.
The child process is then created and started; it first sleeps for a moment.
Meanwhile, the main process waits to be notified that the child process is ready.
The child process wakes up, acquires the condition, sets the event, and then waits on the condition.
The main process notices that the event has been set, then acquires the condition and notifies the child process.
The program works as expected and the race condition no longer exists.
```
Process: Waiting to be notified...
Process: Ready
Main: Notifying the process
Process: Notified
Main: Done
```
If you are aware of other race conditions with multiprocessing, please let me know in the comments below.
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
APIs
References
Takeaways
You now know how to identify and fix race conditions with processes in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Cali Naughton on Unsplash