Use a Lock in the Multiprocessing Pool
You can share a multiprocessing.Lock in child worker processes in the multiprocessing pool by using a multiprocessing.Manager.
In this tutorial you will discover how to use a lock in the process pool in Python.
Let's get started.
Need To Use a Lock In The Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
-- multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
When using the process pool, we may need to use a lock shared among all worker processes.
How can we use a mutex lock in the process pool?
What is a Mutex Lock
A mutual exclusion lock or mutex lock is a synchronization primitive intended to prevent a race condition.
A race condition is a concurrency failure case when two threads run the same code and access or update the same resource (e.g. data variables, stream, etc.) leaving the resource in an unknown and inconsistent state.
Race conditions often result in unexpected behavior of a program and/or corrupt data.
These sensitive parts of code that can be executed by multiple threads concurrently and may result in race conditions are called critical sections. A critical section may refer to a single block of code, but it also refers to multiple accesses to the same data variable or resource from multiple functions.
The most commonly used mechanism for ensuring mutual exclusion is a mutual exclusion lock or mutex, or simply lock. A mutex is a special type of object that has support in the underlying hardware. The basic idea is that each critical section is protected by a lock.
-- Page 53, An Introduction to Parallel Programming, 2020.
A mutex lock can be used to ensure that only one thread executes a critical section of code at a time, while all other threads trying to execute the same code must wait until the currently executing thread finishes the critical section and releases the lock.
Each thread must attempt to acquire the lock at the beginning of the critical section. If the lock is not already held, the thread will acquire it; any other threads must then wait until the thread holding the lock releases it.
If the lock has not been acquired, we might refer to it as being in the "unlocked" state, whereas if the lock has been acquired, we might refer to it as being in the "locked" state.
- Unlocked: The lock has not been acquired and can be acquired by the next thread that makes an attempt.
- Locked: The lock has been acquired by one thread and any thread that makes an attempt to acquire it must wait until it is released.
Locks are created in the unlocked state.
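We can demonstrate the two states with a tiny sketch (not part of the original tutorial) using threading.Lock and a non-blocking acquire; the same states apply to a process-based lock:

```python
from threading import Lock

# a new lock starts in the unlocked state
lock = Lock()
first_attempt = lock.acquire(blocking=False)   # succeeds: unlocked -> locked
second_attempt = lock.acquire(blocking=False)  # fails: already locked
lock.release()                                 # locked -> unlocked
print(first_attempt, second_attempt)  # True False
```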
Now that we know what a mutex lock is, let’s take a look at how we can use it in Python.
How Can We Use a Multiprocessing Lock
Python provides a process-safe mutual exclusion lock via the multiprocessing.Lock class.
An instance of the lock can be created and then acquired by processes before accessing a critical section, and released after the critical section.
For example:
...
# create a lock
lock = Lock()
# acquire the lock
lock.acquire()
# ...
# release the lock
lock.release()
Only one process can hold the lock at any time. A lock that has been acquired cannot be acquired again until it is released.
A process attempting to acquire the lock will block until the lock becomes available, for example when the process currently holding the lock releases it.
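We can see this blocking behavior with a small sketch (not from the tutorial): a second acquire of a held lock blocks, and because multiprocessing.Lock supports a timeout argument, we can make the attempt give up rather than wait forever:

```python
from multiprocessing import Lock
import time

lock = Lock()
lock.acquire()  # the lock is now held
start = time.time()
# this second attempt blocks for up to 0.5 seconds, then gives up
acquired = lock.acquire(timeout=0.5)
elapsed = time.time() - start
print(acquired, elapsed)
lock.release()
```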
We can also use the lock via the context manager protocol (the with statement). This makes the critical section a block within the usage of the lock and ensures the lock is released automatically once the block completes.
For example:
...
# create a lock
lock = Lock()
# acquire the lock
with lock:
    # ...
This is the preferred usage as it makes it clear where the protected code begins and ends, and ensures that the lock is always released, even if there is an exception or error within the critical section.
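We can confirm this release-on-exception behavior with a small sketch (not part of the original tutorial): after an error escapes the with block, a fresh non-blocking acquire still succeeds:

```python
from multiprocessing import Lock

lock = Lock()
try:
    # an error raised inside the critical section...
    with lock:
        raise ValueError('failure inside the critical section')
except ValueError:
    pass
# ...does not leave the lock held: a fresh acquire succeeds
reacquired = lock.acquire(block=False)
lock.release()
print(reacquired)  # True
```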
You can learn more about how to use process-based mutex locks in the tutorial:
Next, let's look at how we might use mutex locks in a process pool.
How to Use a Lock With the Process Pool
We can create a multiprocessing.Lock instance and share it with the child worker processes in the process pool.
There are perhaps three ways we can share a multiprocessing.Lock instance with worker processes:
- By passing it as an argument when initializing the worker processes.
- By passing it as an argument to tasks executed by the pool.
- By using the 'fork' start method, storing it in a global variable, then having child processes inherit the variable.
The third method, using the 'fork' start method, will work and provides an easy way to share a lock with child worker processes.
You can learn more about inheriting global variables by child processes in the tutorial:
The problem is, the 'fork' start method is not available on all platforms, e.g. it cannot be used on Windows.
Alternatively, if we naively pass a multiprocessing.Lock as an argument when initializing the process pool or in a task executed by the process pool, it will fail with an error, such as:
Lock objects should only be shared between processes through inheritance
Instead, we must use a multiprocessing.Manager.
A multiprocessing.Manager creates a server process that hosts a centralized version of an object. It then provides proxy objects that can be used in other processes and that stay up to date with the single centralized object.
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
-- multiprocessing — Process-based parallelism
As such, using a multiprocessing.Manager is a useful way to centralize a synchronization primitive like a multiprocessing.Lock shared among multiple worker processes.
We can first create a multiprocessing.Manager using the context manager interface.
For example:
...
# create the manager
with Manager() as manager:
    # ...
We can then create a shared multiprocessing.Lock object using the manager.
This will return a proxy object for the multiprocessing.Lock object in the manager process that we can share among child worker processes directly or indirectly.
For example:
...
# create a shared object via the manager
lock = manager.Lock()
The proxy for the multiprocessing.Lock can then be passed to a child worker initialization function or to a task function executed by worker processes.
Now that we know how to share a multiprocessing.Lock with child worker processes in the process pool, let's look at some worked examples.
Errors When Sharing a Lock With Child Worker Processes
Before we look at an example of how to successfully share a multiprocessing.Lock with child worker processes, let's look at some common failure cases.
Three common errors when attempting to share a multiprocessing.Lock with worker processes are:
- Asynchronous tasks fail silently.
- Asynchronous tasks fail by error callback.
- Synchronous tasks fail with an error.
Let's take a closer look at each failure case in turn.
Using a Lock in the Process Pool Fails Silently
The first common failure case involves issuing tasks asynchronously to the process pool and having them fail silently.
In this example, we define a task function that takes an integer identifier and a lock as an argument. It acquires the lock, generates a random number, blocks for a moment then reports the generated number. A lock is created that ensures that only one task can perform the chosen action at a time. A process pool is created and many tasks are issued asynchronously.
The asynchronous tasks issued to the process pool fail silently.
They fail because a multiprocessing.Lock is passed to the task, which raises an error when the task arguments are transmitted to the worker processes. The error is silent because the example never explicitly retrieves a result from the tasks.
Firstly, we can define the target task function executed in the process pool.
The function takes an integer identifier as an argument as well as a shared lock. The task acquires the lock using the context manager interface. It generates a random number between 0 and 1, blocks for a fraction of a second to simulate computational effort, then reports both its identifier and generated value.
The task() function below implements this.
# task executed in a worker process
def task(identifier, lock):
    # acquire the mutex lock
    with lock:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)
Next, in the main process, we can define a mutex lock.
...
# create the shared lock
lock = Lock()
We then define a process pool with 4 worker processes. In this case we use the context manager interface to ensure the process pool closes automatically once we are finished with it.
...
# create and configure the process pool
with Pool(4) as pool:
    # ...
You can learn more about the context manager interface in the tutorial:
We then define a list of task arguments, each item in the list representing a tuple of arguments for one call to the task() function, passing an integer and the shared lock.
...
# prepare task arguments
items = [(i, lock) for i in range(10)]
The items are then issued with calls to the task() function asynchronously via the starmap_async() function, which returns an AsyncResult object immediately.
You can learn more about issuing asynchronous tasks to the process pool with starmap_async() in the tutorial:
...
# issue tasks into the process pool
result = pool.starmap_async(task, items)
Finally, the main process blocks until all tasks are completed.
...
# wait for all tasks to finish
result.wait()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a lock in the process pool that fails silently
from random import random
from time import sleep
from multiprocessing import Lock
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, lock):
    # acquire the mutex lock
    with lock:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared lock
    lock = Lock()
    # create and configure the process pool
    with Pool(4) as pool:
        # prepare task arguments
        items = [(i, lock) for i in range(10)]
        # issue tasks into the process pool
        result = pool.starmap_async(task, items)
        # wait for all tasks to finish
        result.wait()
Running the example first creates the shared mutex lock.
The process pool is then created with 4 worker processes.
The task arguments are prepared and 10 tasks are issued to the process pool. The main process then blocks until the tasks are complete.
The tasks fail to execute and no messages are reported.
No error messages are reported and the tasks fail silently.
The main process continues on almost immediately.
The tasks failed because a multiprocessing.Lock was shared directly with the child worker processes. An error was raised by each call to the task() function. The error was stored in the returned AsyncResult object, but was not re-raised or checked for, making it appear that the tasks failed silently.
Next, let's look at how we might expose the error that is raised by issuing asynchronous tasks.
RuntimeError in Error Callback Using Lock in the Process Pool
Sharing a multiprocessing.Lock with tasks in the process pool via an argument to the task function will fail with a RuntimeError.
We can explore this error when issuing asynchronous tasks to the pool by using an error callback.
In this example we will update the previous example that fails silently to report the error caused by sharing a mutex lock with child processes in the process pool.
This can be achieved by defining a function that takes an error argument and reports the error message directly.
# error callback function
def custom_error_callback(error):
    print(error, flush=True)
Next, we can configure the starmap_async() function to use the error callback function when an exception is raised executing a task in the process pool.
...
# issue tasks into the process pool
result = pool.starmap_async(task, items, error_callback=custom_error_callback)
And that's it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a lock in the process pool that fails with an error
from random import random
from time import sleep
from multiprocessing import Lock
from multiprocessing.pool import Pool

# error callback function
def custom_error_callback(error):
    print(error, flush=True)

# task executed in a worker process
def task(identifier, lock):
    # acquire the mutex lock
    with lock:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared lock
    lock = Lock()
    # create and configure the process pool
    with Pool(4) as pool:
        # prepare task arguments
        items = [(i, lock) for i in range(10)]
        # issue tasks into the process pool
        result = pool.starmap_async(task, items, error_callback=custom_error_callback)
        # wait for all tasks to finish
        result.wait()
Running the example first creates the shared mutex lock.
The process pool is then created with 4 worker processes.
The task arguments are prepared and 10 tasks are issued to the process pool. The main process blocks until the tasks are complete.
Each issued task fails with a RuntimeError. The error callback is called with the first error raised and reports the error message directly.
This highlights that a multiprocessing.Lock cannot be passed directly as an argument to tasks executed by the process pool.
Lock objects should only be shared between processes through inheritance
Next, let's look at how we might expose the error that is raised by issuing synchronous tasks instead of asynchronous tasks.
RuntimeError Using Lock in the Process Pool
We can update the previous example so that tasks are issued synchronously instead of asynchronously.
Issuing tasks synchronously means that the main process will block until all tasks are complete, and the results of the tasks are returned directly. This is important because if at least one task raises an error while executing, the error is automatically re-raised in the caller, the main process.
This can be achieved by changing the call to starmap_async() which is asynchronous to starmap() which is blocking and synchronous.
...
# issue tasks into the process pool
pool.starmap(task, items)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a lock in the process pool that fails with an exception
from random import random
from time import sleep
from multiprocessing import Lock
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, lock):
    # acquire the lock
    with lock:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared lock
    lock = Lock()
    # create and configure the process pool
    with Pool(4) as pool:
        # prepare task arguments
        items = [(i, lock) for i in range(10)]
        # issue tasks into the process pool
        pool.starmap(task, items)
Running the example first creates the shared mutex lock.
The process pool is then created with 4 worker processes.
The task arguments are prepared and 10 tasks are issued to the process pool. The main process blocks until the tasks are complete.
Each issued task fails with a RuntimeError, the first of which is re-raised in the main process.
Traceback (most recent call last):
...
RuntimeError: Lock objects should only be shared between processes through inheritance
Now that we have confirmed that we cannot pass a normal multiprocessing.Lock to a task executed in the process pool, let's look at how we might use a multiprocessing.Manager to fix the problem.
Example of Using a Manager to Share a Lock with the Process Pool
We can explore how to use a multiprocessing.Manager to share a multiprocessing.Lock among child worker processes in the process pool.
This can be achieved by updating the first asynchronous example of issuing tasks that failed silently to create the multiprocessing.Lock using a multiprocessing.Manager.
First, a manager can be created using the context manager interface. This ensures that the manager is closed automatically once we are finished with it.
...
# create the manager
with Manager() as manager:
    # ...
Next, a multiprocessing.Lock can be created using the multiprocessing.Manager instance.
This will create and host a lock in the manager's server process and return a proxy object that can be shared among child worker processes and used to interact with the centralized lock instance.
...
# create the shared lock
lock = manager.Lock()
And that's it.
The complete example is listed below.
# SuperFastPython.com
# example of using a lock in the process pool that uses a manager
from random import random
from time import sleep
from multiprocessing import Manager
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, lock):
    # acquire the lock
    with lock:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared lock
        lock = manager.Lock()
        # create and configure the process pool
        with Pool(4) as pool:
            # prepare task arguments
            items = [(i, lock) for i in range(10)]
            # issue tasks into the process pool
            result = pool.starmap_async(task, items)
            # wait for all tasks to finish
            result.wait()
Running the example first creates the manager, then the shared mutex lock.
The process pool is then created with 4 worker processes.
The task arguments are prepared and 10 tasks are issued to the process pool. The main process then blocks until the tasks are complete.
Each issued task then executes in the process pool. Each attempts to acquire the shared lock, but only one task is able to acquire the lock and execute at a time.
The tasks generate random values between 0 and 1, block for a moment, then report a message with their integer identifier and their generated number.
Note, the random numbers generated will differ each time the code is run.
All tasks complete normally without error.
This highlights how we can share and use a lock among child worker processes in the process pool.
Task 1 completed with 0.08364371395125247
Task 0 completed with 0.9027476093622914
Task 2 completed with 0.18705202828063638
Task 3 completed with 0.10093696106119898
Task 4 completed with 0.7105243179762597
Task 5 completed with 0.32976092193735884
Task 6 completed with 0.28260540683410096
Task 7 completed with 0.5197292680407757
Task 8 completed with 0.6001511476458928
Task 9 completed with 0.9747283876015417
Example of Using a Global Variable to Share a Lock with the Process Pool
An alternate approach to sharing a mutex lock with workers in the process pool is to share the lock as a global variable.
This requires the use of the 'fork' start method for creating new processes in Python.
A limitation of this approach is that the 'fork' start method is not supported on all platforms. For example, 'fork' is not supported on Windows.
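As a quick check (not part of the original example), we can list the start methods supported on the current platform:

```python
import multiprocessing

# report the start methods supported on this platform
# ('fork' will be absent from the list on Windows)
methods = multiprocessing.get_all_start_methods()
print(methods)
```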
In this example, we will update the previous example to use the 'fork' start method and to share the multiprocessing.Lock with all workers in the process pool via a global variable.
Firstly, we update the task() function so that it does not take the lock as an argument, and instead assumes it is available via an inherited global variable.
The updated version of the task() function with this change is listed below.
# task executed in a worker process
def task(identifier):
    # acquire the mutex lock
    with lock:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)
Next, in the main process, we can configure the program to use the 'fork' start method when creating new child processes, such as those in the process pool.
...
# set the fork start method
set_start_method('fork')
You can learn more about setting the process start method in the tutorial:
Next, we can create the shared multiprocessing.Lock instance as per normal.
...
# create the shared lock instance
lock = Lock()
This will implicitly be a global variable.
A more explicit approach would be to declare "lock" as a global variable, then assign it a new multiprocessing.Lock instance.
For example:
...
# declare the global variable
global lock
# assign the global variable
lock = multiprocessing.Lock()
This might be easier for newer Python programmers to read.
We can then create the process pool as per normal, then issue 10 tasks to the process pool asynchronously via the map_async() function and wait for the tasks to complete.
...
# create and configure the process pool
with Pool(4) as pool:
    # issue tasks into the process pool
    result = pool.map_async(task, range(10))
    # wait for all tasks to finish
    result.wait()
We use the map_async() function instead of the starmap_async() function because our task() function only has one argument.
You can learn more about the map_async() function in the tutorial:
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of sharing a lock with worker processes using a global variable
from random import random
from time import sleep
from multiprocessing import set_start_method
from multiprocessing import Lock
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # acquire the mutex lock
    with lock:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # set the fork start method
    set_start_method('fork')
    # create the shared lock instance
    lock = Lock()
    # create and configure the process pool
    with Pool(4) as pool:
        # issue tasks into the process pool
        result = pool.map_async(task, range(10))
        # wait for all tasks to finish
        result.wait()
Running the example first sets the 'fork' start method, then creates the shared mutex lock.
The process pool is then created with 4 worker processes.
The task arguments are prepared and 10 tasks are issued to the process pool asynchronously. The main process then blocks until the tasks are complete.
Each issued task then executes in the process pool. Each attempts to acquire the shared lock, but only one task is able to acquire the lock and execute at a time.
The tasks generate random values between 0 and 1, block for a moment, then report a message with their integer identifier and their generated number.
The lock instance is accessed via the inherited global variable.
Note, the random numbers generated will differ each time the code is run.
All tasks complete normally without error.
This highlights an alternate way of how we can share and use a lock among child worker processes in the process pool.
Task 0 completed with 0.48489757524446275
Task 1 completed with 0.38729028104193575
Task 2 completed with 0.517681970727736
Task 3 completed with 0.9630723259905106
Task 4 completed with 0.5165313442006523
Task 5 completed with 0.7580011775594481
Task 6 completed with 0.9646278592699737
Task 7 completed with 0.8355688108531624
Task 8 completed with 0.15694978830179873
Task 9 completed with 0.8106436115472159
Takeaways
You now know how to use a lock in the process pool in Python.
If you enjoyed this tutorial, you will love my book: Python Multiprocessing Pool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.