Process-Safe Counter in Python
You can develop a process-safe counter class using a multiprocessing.Value and a mutex lock.
In this tutorial, you will discover how to develop a process-safe counter in Python.
Let's get started.
Need a Process-Safe Counter
A counter is an object that maintains a private variable that is changed and accessed via methods, e.g. incremented and read.
We often need to share a custom counter object among many processes, such as when using process pools via the Pool or ProcessPoolExecutor class or child processes via the multiprocessing.Process class.
This is challenging, by default, for two reasons:
- Instance variables of an object cannot be shared between processes.
- If the object is updated to use shareable variables, they may suffer race conditions.
You can learn more about how instance variables are not shared among processes in the tutorial:
You can learn more about race conditions among processes in the tutorial:
As such, creating a counter object and sharing and using it in multiple processes can be difficult.
This problem represents a general class of problems, namely safely sharing custom objects when using process-based concurrency in Python.
How can we share a custom class with private instance variables among processes in Python?
Note, developing a process-safe counter is very different from developing a thread-safe counter because of the lack of shared memory between processes. You can learn more about developing a thread-safe counter in the tutorial:
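For contrast, a minimal thread-safe counter might look like the following sketch. It uses a threading.Lock instead of shared ctypes, because threads already share memory within one process (this is an illustrative sketch, not code from the tutorial above):

```python
from threading import Thread, Lock

# a counter protected by a mutex lock, safe for threads only
class ThreadSafeCounter():
    def __init__(self):
        self._counter = 0
        self._lock = Lock()

    def increment(self):
        # only one thread may update the counter at a time
        with self._lock:
            self._counter += 1

    def value(self):
        with self._lock:
            return self._counter

# demonstrate with four threads incrementing the shared counter
counter = ThreadSafeCounter()
threads = [Thread(target=lambda: [counter.increment() for _ in range(10000)])
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value())  # 40000
```

Because threads share the same object in memory, no shared ctype is needed; the lock alone is sufficient. Processes do not share memory this way, which is why the rest of this tutorial takes a different approach.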
Next, let's look at how we can develop a process-safe counter in Python.
How to Develop a Process-Safe Counter
We can develop a process-safe counter using two capabilities provided by the multiprocessing module: shared ctypes and a mutex lock.
A shared ctype can be used as a primitive variable that can be shared among processes, where changes will be propagated and visible to all processes.
A mutex lock can be used to protect access and changes to the instance variables of the custom class to ensure changes to the variables are process-safe, e.g. to avoid race conditions.
- Use a shared ctype for sharable instance variables.
- Use a mutex lock to protect instance variables against race conditions.
Let's explore the steps to developing a process-safe counter.
Step 1: Process Unsafe Counter (not shared)
Firstly, let's define an UnsafeCounter class that we might use in a single-process program.
The class constructor defines and initializes a private instance variable for the count that is to be maintained.
An increment() method adds one to the counter instance variable and a value() method retrieves the current value of the counter.
# define a counter
class UnsafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = 0
    # increment the counter
    def increment(self):
        self._counter += 1
    # get the counter value
    def value(self):
        return self._counter
Sharing an instance of this class among processes will not work as intended.
Changes to the instance variable in each process will not be propagated to other processes.
Step 2: Change Variable to Shared ctype (race)
We can change the private instance variable to a shared ctype.
This is a data type that can be a primitive value, like an integer or floating point value, and changes will be propagated between processes.
This can be achieved by using the multiprocessing.Value class and configuring it to hold an integer value initialized to zero.
For example:
...
# initialize counter
self._counter = Value('i', 0)
You can learn more about shared ctypes in the tutorial:
The integer within the multiprocessing.Value object can be accessed and modified via the "value" attribute.
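As a brief sketch of the "value" attribute in isolation (the typecodes follow the array module's conventions; 'i' is a signed int and 'd' is a double):

```python
from multiprocessing import Value

# 'i' creates a shared C signed int, 'd' a shared C double
count = Value('i', 0)
total = Value('d', 0.0)

# the wrapped value is read and written via the .value attribute
count.value += 1
total.value += 2.5
print(count.value, total.value)  # 1 2.5
```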
For example, the updated class with this change to use a multiprocessing.Value is listed below:
# define a counter
class UnsafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = Value('i', 0)
    # increment the counter
    def increment(self):
        self._counter.value += 1
    # get the counter value
    def value(self):
        return self._counter.value
This is an improvement but still has a problem.
Changes to the instance variable are not process-safe, leading to race conditions.
Step 3: Protect Instance Variable With Mutex (process safe)
We can use a mutex lock to protect access and changes to the private instance variable in the counter object.
One approach would be to create an instance of a multiprocessing.Lock class and use it in methods whenever the private instance variable is being accessed or modified.
For example:
# define a counter
class SafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = Value('i', 0)
        # initialize lock
        self._lock = Lock()
    # increment the counter
    def increment(self):
        # get the lock
        with self._lock:
            self._counter.value += 1
    # get the counter value
    def value(self):
        with self._lock:
            return self._counter.value
You can learn more about mutex locks in the tutorial:
This approach would be appropriate if we had multiple instance variables to protect in the custom class.
In this case, however, we do not need to create and manage a separate mutex lock.
The reason is the multiprocessing.Value already has a lock within it that we can use.
It is accessible via the get_lock() method.
For example:
# define a counter
class SafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = Value('i', 0)
    # increment the counter
    def increment(self):
        # get the lock
        with self._counter.get_lock():
            self._counter.value += 1
    # get the counter value
    def value(self):
        # get the lock
        with self._counter.get_lock():
            return self._counter.value
We probably don't need to protect read access to the counter's value with the lock, as reading a single value is unlikely to be corrupted. Nevertheless, I've left it in for consistency, and it is harmless: the default lock used by a multiprocessing.Value is a reentrant lock, so acquiring it again in the same process is safe.
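We can demonstrate this reentrant behavior directly in a standalone sketch (not part of the counter class itself):

```python
from multiprocessing import Value

counter = Value('i', 0)
# get_lock() returns the lock created with the Value; by default
# this is a reentrant lock, so nested acquisition by the same
# process does not deadlock
with counter.get_lock():
    with counter.get_lock():
        counter.value += 1
print(counter.value)  # 1
```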
And that's it.
We can now see how to develop a process-safe counter.
Next, let's look at some worked examples of working with process-unsafe and process-safe counters in Python.
Example of Process-Unsafe Counter (Not Shared Correctly)
We can explore developing a process-unsafe counter where changes to the instance variables are not propagated between processes.
This can be achieved by defining a normal Python class with one instance variable, as we did above.
For example:
# define a counter
class UnsafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = 0
    # increment the counter
    def increment(self):
        self._counter += 1
    # get the counter value
    def value(self):
        return self._counter
We can then define a target task function that takes the shared counter object as an argument and iterates it many times, incrementing the counter each iteration.
# task executed by processes
def task(counter):
    # increment the counter
    for _ in range(100000):
        counter.increment()
We can then start many processes that share the same counter object and execute the same task at the same time in parallel.
# protect the entry point
if __name__ == '__main__':
    # create the shared counter
    counter = UnsafeCounter()
    # create 10 processes to increment the counter
    processes = [Process(target=task, args=(counter,)) for _ in range(10)]
    # start all processes
    for process in processes:
        process.start()
    # wait for all processes to finish
    for process in processes:
        process.join()
    # report the value of the counter
    print(counter.value())
The complete example is listed below.
# SuperFastPython.com
# example of a process unsafe counter that is not shared correctly
from multiprocessing import Process

# define a counter
class UnsafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = 0
    # increment the counter
    def increment(self):
        self._counter += 1
    # get the counter value
    def value(self):
        return self._counter

# task executed by processes
def task(counter):
    # increment the counter
    for _ in range(100000):
        counter.increment()

# protect the entry point
if __name__ == '__main__':
    # create the shared counter
    counter = UnsafeCounter()
    # create 10 processes to increment the counter
    processes = [Process(target=task, args=(counter,)) for _ in range(10)]
    # start all processes
    for process in processes:
        process.start()
    # wait for all processes to finish
    for process in processes:
        process.join()
    # report the value of the counter
    print(counter.value())
Running the example first creates the shared UnsafeCounter object.
Then, 10 child processes are created and configured to execute our custom task() function and passed the shared counter object.
The main process then starts all processes and waits for them to complete.
Each process executes the custom task() function and increments the shared counter 100,000 times.
We would expect the final value of the counter to be 1,000,000, e.g. 10 processes multiplied by 100,000 increments.
The tasks are completed and the final value of the counter is reported.
In this case, the counter is zero.
0
The reason this example failed is that each child process receives a new copy of the UnsafeCounter object.
Each process does increment the counter, but only locally; the changes are not shared with other processes or with the main process.
The UnsafeCounter object in the main process is never incremented and therefore remains at the value of zero.
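We can demonstrate this copy-per-process behavior with a single child process (a minimal sketch using the same UnsafeCounter class):

```python
from multiprocessing import Process

# the same process-unsafe counter as above
class UnsafeCounter():
    def __init__(self):
        self._counter = 0
    def increment(self):
        self._counter += 1
    def value(self):
        return self._counter

# task executed in the child process
def task(counter):
    counter.increment()
    # the child's private copy is incremented
    print(f'child sees: {counter.value()}')   # child sees: 1

# protect the entry point
if __name__ == '__main__':
    counter = UnsafeCounter()
    process = Process(target=task, args=(counter,))
    process.start()
    process.join()
    # the parent's copy was never touched
    print(f'parent sees: {counter.value()}')  # parent sees: 0
```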
Next, let's look at updating the counter to use instance variables whose changes are propagated among processes.
Example of Process-Unsafe Counter (With Race Conditions)
We can update the custom counter object to use instance variables that are shared among multiple processes.
In this example, we will change the instance variable to be an instance of a multiprocessing.Value object.
Changes to the Value object are propagated to all processes that share the object. If the Value object is an instance variable within an UnsafeCounter object, then all processes that share the UnsafeCounter will share the same instance variable.
For example:
# define a counter
class UnsafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = Value('i', 0)
    # increment the counter
    def increment(self):
        self._counter.value += 1
    # get the counter value
    def value(self):
        return self._counter.value
The complete example with this change is listed below.
# SuperFastPython.com
# example of a process unsafe counter with race conditions
from multiprocessing import Process
from multiprocessing import Value

# define a counter
class UnsafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = Value('i', 0)
    # increment the counter
    def increment(self):
        self._counter.value += 1
    # get the counter value
    def value(self):
        return self._counter.value

# task executed by processes
def task(counter):
    # increment the counter
    for _ in range(100000):
        counter.increment()

# protect the entry point
if __name__ == '__main__':
    # create the shared counter
    counter = UnsafeCounter()
    # create 10 processes to increment the counter
    processes = [Process(target=task, args=(counter,)) for _ in range(10)]
    # start all processes
    for process in processes:
        process.start()
    # wait for all processes to finish
    for process in processes:
        process.join()
    # report the value of the counter
    print(counter.value())
Running the example first creates the shared UnsafeCounter object.
Then, 10 child processes are created and configured to execute our custom task() function and passed the shared counter object.
The main process then starts all processes and waits for them to complete.
Each process executes the custom task() function and increments the shared counter 100,000 times.
All child processes executing their tasks share the same shared Value instance variable. Changes are propagated among processes.
We would expect the final value of the counter to be 1,000,000, e.g. 10 processes multiplied by 100,000 increments.
The tasks are completed and the final value of the counter is reported.
In this case, the value is different every time the example is run and is not the expected value of one million.
165690
Running the program again produces a different final value of the count.
162723
The reason the program gives a count value that is different every time is because of a race condition.
The processes step on each other when updating the internal value of the counter.
Consider how the value of the counter is updated:
...
self._counter.value += 1
Unrolled, this is performing at least three operations:
- Read the current counter value into a copy.
- Add one to the copy of the current counter value.
- Replace the current counter value with the updated copy.
If these operations are interleaved among multiple processes, even just two processes, then the value of the counter will be corrupted.
For example, an updated copy of the value may overwrite a stale version or an already updated version of the variable.
For example:
- Process 1: Read the current counter value into a copy.
- Process 1: Add one to the copy of the current counter value.
- Process 2: Read the current counter value into a copy.
- Process 2: Add one to the copy of the current counter value.
- Process 2: Replace the current counter value with the updated copy.
- Process 1: Replace the current counter value with the updated copy.
This is called a race condition.
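The three operations can be written out explicitly (a sketch; in the real program, a process switch can occur between any two of them, which is what corrupts the count):

```python
from multiprocessing import Value

counter = Value('i', 0)

# 'counter.value += 1' unrolled into its constituent operations
tmp = counter.value   # 1. read the current counter value into a copy
tmp = tmp + 1         # 2. add one to the copy
counter.value = tmp   # 3. replace the counter value with the copy
print(counter.value)  # 1
```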
Next, let's explore how we might make the counter process-safe using a mutex lock.
Example of Process-Safe Counter
We can update the UnsafeCounter to be process-safe by making the critical section, e.g. changes to the instance variable, mutually exclusive.
This means that only one process can read or write the variable at a time; other processes attempting the same action at the same time must wait their turn.
This can be achieved by protecting reads and changes to the multiprocessing.Value variable with a mutex lock used internally within the Value object.
The updated counter with this change is listed below:
# define a counter
class SafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = Value('i', 0)
    # increment the counter
    def increment(self):
        # get the lock
        with self._counter.get_lock():
            self._counter.value += 1
    # get the counter value
    def value(self):
        # get the lock
        with self._counter.get_lock():
            return self._counter.value
This SafeCounter class is process-safe.
Reads and writes to the instance variable are protected with a mutex lock, and changes to the variable itself are propagated among processes via shared memory.
The complete example is listed below.
# SuperFastPython.com
# example of a process safe counter
from multiprocessing import Process
from multiprocessing import Value

# define a counter
class SafeCounter():
    # constructor
    def __init__(self):
        # initialize counter
        self._counter = Value('i', 0)
    # increment the counter
    def increment(self):
        # get the lock
        with self._counter.get_lock():
            self._counter.value += 1
    # get the counter value
    def value(self):
        # get the lock
        with self._counter.get_lock():
            return self._counter.value

# task executed by processes
def task(counter):
    # increment the counter
    for _ in range(100000):
        counter.increment()

# protect the entry point
if __name__ == '__main__':
    # create the shared counter
    counter = SafeCounter()
    # create 10 processes to increment the counter
    processes = [Process(target=task, args=(counter,)) for _ in range(10)]
    # start all processes
    for process in processes:
        process.start()
    # wait for all processes to finish
    for process in processes:
        process.join()
    # report the value of the counter
    print(counter.value())
Running the example first creates the shared SafeCounter object.
Then, 10 child processes are created and configured to execute our custom task() function and passed the shared counter object.
The main process then starts all processes and waits for them to complete.
Each process executes the custom task() function and increments the shared counter 100,000 times.
All child processes executing their tasks share the same shared Value instance variable. Changes are propagated among processes.
All reads and writes to the instance variable on the SafeCounter are protected by the lock, ensuring only a single child process can access or change the variable at a time.
We would expect the final value of the counter to be 1,000,000, e.g. 10 processes multiplied by 100,000 increments.
The tasks are completed and the final value of the counter is reported.
As we expect, the final value of the counter is one million.
This highlights how to develop and use a process-safe counter in Python.
1000000
Takeaways
You now know how to develop a process-safe counter in Python.
If you enjoyed this tutorial, you will love my book: Python Multiprocessing Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.