You can develop a process-safe counter class using a multiprocessing.Value and a mutex lock.
In this tutorial, you will discover how to develop a process-safe counter in Python.
Let’s get started.
Need a Process-Safe Counter
A counter is an object that maintains a private variable that changes via methods, e.g. incremented, and accessed.
We often need to share a custom counter object among many processes, such as when using process pools via the Pool or ProcessPoolExecutor class or child processes via the multiprocessing.Process class.
This is challenging because, by default, for two reasons:
- Instance variables of an object cannot be shared between processes.
- If the object is updated to use shareable variables, they may suffer race conditions.
You can learn more about how instance variables are not shared among processes in the tutorial:
You can learn more about race conditions among processes in the tutorial:
As such, creating a counter object and sharing and using it in multiple processes can be difficult.
This problem represents a general class of problems, namely safely sharing custom objects when using process-based concurrency in Python.
How can we share a custom class with private instance variables among processes in Python?
Note, developing a process-safe counter is very different to developing a thread-safe counter because of the lack of shared memory between processes. You can learn more about developing a thread-safe counter in the tutorial:
Next, let’s look at how we can develop a process-safe counter in Python.
Run loops using all CPUs, download your FREE book to learn how.
How to Develop a Process-Safe Counter
We can develop a process safe counter using two capabilities provided by the multiprocessing module, shared ctypes, and a mutex lock.
A shared ctype can be used as a primitive variable that can be shared among processes, where changes will be propagated and visible to all processes.
A mutex lock can be used to protect access and changes to the instance variables of the custom class to ensure changes to the variables are process-safe, e..g to avoid race conditions.
- Use a shared ctype for sharable instance variables.
- Use a mutex lock to protect instance variables against race conditions.
Let’s explore the steps to developing a process-safe counter
Step 1: Process Unsafe Counter (not shared)
First;y, let’s define an unsafe UnsafeCounter class that we might use in a single-process program.
The class constructor defines and initializes a private instance variable for the count that is to be maintained.
An increment() method adds one to the counter instance variable and a value() method retrieves the current value of the counter.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# define a counter class UnsafeCounter(): # constructor def __init__(self): # initialize counter self._counter = 0 # increment the counter def increment(self): self._counter += 1 # get the counter value def value(self): return self._counter |
Sharing an instance of this class among processes will not work as intended.
Changes to the instance variable in each process will not be propagated to other processes.
Step 2: Change Variable to Shared ctype (race)
We can change the private instance variable to a shared ctype.
This is a data type that can be a primitive value, like an integer or floating point value, and changes will be propagated between processes.
This can be achieved by using the multiprocessing.Value class and configuring it to hold an integer value initialized to zero.
For example:
1 2 3 |
... # initialize counter self._counter = Value('i', 0) |
You can learn more about shared ctypes in the tutorial:
The integer within the multiprocessing.Value object can be accessed and modified via the “value” attribute.
For example, the updated class with this change to use a multiprocessing.Value is listed below:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# define a counter class UnsafeCounter(): # constructor def __init__(self): # initialize counter self._counter = Value('i', 0) # increment the counter def increment(self): self._counter.value += 1 # get the counter value def value(self): return self._counter.value |
This is an improvement but still has a problem.
Changes to the instance variable are not process-safe, leading to race conditions.
Step 3: Protect Instance Variable With Mutex (process safe)
We can use a mutex lock to protect access and changes to the private instance variable in the counter object.
One approach would be to create an instance of a multiprocessing.Lock class and use it in methods whenever the private instance variable is being accessed or modified.
For example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# define a counter class SafeCounter(): # constructor def __init__(self): # initialize counter self._counter = Value('i', 0) # initialize lock self._lock = Lock() # increment the counter def increment(self): # get the lock with self._lock: self._counter.value += 1 # get the counter value def value(self): with self._lock: return self._counter.value |
You can learn more about mutex locks in the tutorial:
This approach would be effective if we had multiple instance variables to protect in the custom class.
In this case, we do not need to create and manage a new mutex lock.
The reason is the multiprocessing.Value already has a lock within it that we can use.
It is accessible via the get_lock() method.
For example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# define a counter class SafeCounter(): # constructor def __init__(self): # initialize counter self._counter = Value('i', 0) # increment the counter def increment(self): # get the lock with self._counter.get_lock(): self._counter.value += 1 # get the counter value def value(self): # get the lock with self._counter.get_lock(): return self._counter.value |
We probably don’t need to protect the access to the value of the counter using the lock as it likely uses the lock internally. Nevertheless, I’ve left it in for consistency (i.e. the default lock used by a multiprocessing.Value is a reentrant lock).
And that’s it.
We can now see how to develop a process-safe counter.
Next, let’s look at some worked examples of working with process-unsafe and process-safe counters in Python.
Example of Process-Unsafe Counter (Not Shared Correctly)
We can explore developing a process-unsafe counter where changes to the instance variables are not propagated between processes.
This can be achieved by defining a normal Python class with one instance variable, as we did above.
For example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# define a counter class UnsafeCounter(): # constructor def __init__(self): # initialize counter self._counter = 0 # increment the counter def increment(self): self._counter += 1 # get the counter value def value(self): return self._counter |
We can then define a target task function that takes the shared counter object as an argument and iterates it many times, incrementing the counter each iteration.
1 2 3 4 5 |
# task executed by processes def task(counter): # increment the counter for _ in range(100000): counter.increment() |
We can then start many processes that share the same counter object and execute the same task at the same time in parallel.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# protect the entry point if __name__ == '__main__': # create the shared counter counter = UnsafeCounter() # create 10 processes to increment the counter processes = [Process(target=task, args=(counter,)) for _ in range(10)] # start all processes for process in processes: process.start() # wait for all processes to finish for process in processes: process.join() # report the value of the counter print(counter.value()) |
The complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# SuperFastPython.com # example of a process unsafe counter that is not shared correctly from multiprocessing import Process # define a counter class UnsafeCounter(): # constructor def __init__(self): # initialize counter self._counter = 0 # increment the counter def increment(self): self._counter += 1 # get the counter value def value(self): return self._counter # task executed by processes def task(counter): # increment the counter for _ in range(100000): counter.increment() # protect the entry point if __name__ == '__main__': # create the shared counter counter = UnsafeCounter() # create 10 processes to increment the counter processes = [Process(target=task, args=(counter,)) for _ in range(10)] # start all processes for process in processes: process.start() # wait for all processes to finish for process in processes: process.join() # report the value of the counter print(counter.value()) |
Running the example first creates the shared UnsafeCounter object.
Then, 10 child processes are created and configured to execute our custom task() function and passed the shared counter object.
The main process then starts all processes and waits for them to complete.
Each process executes the custom task() function and increments the shared counter 100,000 times.
We would expect the final value of the counter to be 1,000,000, e.g. 10 processes multiplied by 100,000 increments.
The tasks are completed and the final value of the counter is reported.
In this case, the counter is zero.
1 |
The reason this example failed is that each child process receives a new copy of the UnsafeCounter object.
Each process does increment the counter, but locally. The changes are not shared among processes or with the main process.
The UnsafeCounter object in the main process is never incremented and therefore remains at the value of zero.
Next, let’s look at updating the counter to use instance variables whose changes are propagated among processes.
Free Python Multiprocessing Course
Download your FREE multiprocessing PDF cheat sheet and get BONUS access to my free 7-day crash course on the multiprocessing API.
Discover how to use the Python multiprocessing module including how to create and start child processes and how to use a mutex locks and semaphores.
Example of Process-Unsafe Counter (With Race Conditions)
We can update the custom counter object to use instance variables that are shared among multiple processes.
In this example, we will change the instance variable to be an instance of a multiprocessing.Value object.
Changes to the Value object are propagated to all processes that share the object. If the Value object is an instance variable within an UnsafeCounter object, then all processes that share the UnsafeCounter will share the same instance variable.
For example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# define a counter class UnsafeCounter(): # constructor def __init__(self): # initialize counter self._counter = Value('i', 0) # increment the counter def increment(self): self._counter.value += 1 # get the counter value def value(self): return self._counter.value |
The complete example with this change is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# SuperFastPython.com # example of a process unsafe counter with race conditions from multiprocessing import Process from multiprocessing import Value # define a counter class UnsafeCounter(): # constructor def __init__(self): # initialize counter self._counter = Value('i', 0) # increment the counter def increment(self): self._counter.value += 1 # get the counter value def value(self): return self._counter.value # task executed by processes def task(counter): # increment the counter for _ in range(100000): counter.increment() # protect the entry point if __name__ == '__main__': # create the shared counter counter = UnsafeCounter() # create 10 processes to increment the counter processes = [Process(target=task, args=(counter,)) for _ in range(10)] # start all processes for process in processes: process.start() # wait for all processes to finish for process in processes: process.join() # report the value of the counter print(counter.value()) |
Running the example first creates the shared UnsafeCounter object.
Then, 10 child processes are created and configured to execute our custom task() function and passed the shared counter object.
The main process then starts all processes and waits for them to complete.
Each process executes the custom task() function and increments the shared counter 100,000 times.
All child processes executing their tasks share the same shared Value instance variable. Changes are propagated among processes.
We would expect the final value of the counter to be 1,000,000, e.g. 10 processes multiplied by 100,000 increments.
The tasks are completed and the final value of the counter is reported.
In this case, the value is different every time the example is run and is not the expected value of one million.
1 |
165690 |
Running the program again produces a different final value of the count.
1 |
162723 |
The reason the program gives a count value that is different every time is because of a race condition.
The processes step on each other when updating the internal value of the counter.
Consider how the value of the counter is updated:
1 2 |
... self._counter.value += 1 |
Unrolled, this is performing at least 3 operations, they are:
- Read the current counter value into a copy.
- Add one to the copy of the current counter value.
- Replace the current counter value with the updated copy.
If these operations are interleaved among multiple processes, even just two processes, then the value of the counter will be corrupted.
For example, an updated copy of the value may overwrite a stale version or an already updated version of the variable.
For example:
- Process 1: Read the current counter value into a copy.
- Process 1: Add one to the copy of the current counter value.
- Process 2: Read the current counter value into a copy.
- Process 2: Add one to the copy of the current counter value.
- Process 2: Replace the current counter value with the updated copy.
- Process 1: Replace the current counter value with the updated copy.
This is called a race condition.
Next, let’s explore how we might make the counter process-safe using a mutex lock.
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Example of Process-Safe Counter
We can update the UnsafeCounter to be process-safe by protecting the critical section, e.g. changes to the instance variable, to be mutually exclusive.
This means that only one process can read or write the variable at a time, and other processes attempting the same action at the same time, must wait their turn.
This can be achieved by protecting reads and changes to the multiprocessing.Value variable with a mutex lock used internally within the Value object.
The updated counter with this change is listed below:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# define a counter class SafeCounter(): # constructor def __init__(self): # initialize counter self._counter = Value('i', 0) # increment the counter def increment(self): # get the lock with self._counter.get_lock(): self._counter.value += 1 # get the counter value def value(self): # get the lock with self._counter.get_lock(): return self._counter.value |
This SafeCounter class is process-safe.
Read and writes to the instance variable are protected with a mutex lock and changes to the variable itself are shared among processes, simulated shared memory.
The complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# SuperFastPython.com # example of a process safe counter from multiprocessing import Process from multiprocessing import Value # define a counter class SafeCounter(): # constructor def __init__(self): # initialize counter self._counter = Value('i', 0) # increment the counter def increment(self): # get the lock with self._counter.get_lock(): self._counter.value += 1 # get the counter value def value(self): # get the lock with self._counter.get_lock(): return self._counter.value # task executed by processes def task(counter): # increment the counter for _ in range(100000): counter.increment() # protect the entry point if __name__ == '__main__': # create the shared counter counter = SafeCounter() # create 10 processes to increment the counter processes = [Process(target=task, args=(counter,)) for _ in range(10)] # start all processes for process in processes: process.start() # wait for all processes to finish for process in processes: process.join() # report the value of the counter print(counter.value()) |
Running the example first creates the shared SafeCounter object.
Then, 10 child processes are created and configured to execute our custom task() function and passed the shared counter object.
The main process then starts all processes and waits for them to complete.
Each process executes the custom task() function and increments the shared counter 100,000 times.
All child processes executing their tasks share the same shared Value instance variable. Changes are propagated among processes.
All reads and writes to the instance variable on the SafeCounter are safe, ensuring only a single child process can access or change the variable at a time.
We would expect the final value of the counter to be 1,000,000, e.g. 10 processes multiplied by 100,000 increments.
The tasks are completed and the final value of the counter is reported.
As we expect, the final value of the counter is one million.
This highlights how to develop and use a process-safe counter in Python.
1 |
1000000 |
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
APIs
References
Takeaways
You now know how to develop a process-safe counter in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Eduardo Arcos on Unsplash
Do you have any questions?