Last Updated on September 12, 2022
You can use a manager to create hosted versions of concurrency primitives. The manager returns proxy objects that can be pickled and that provide process-safe access to the hosted primitives.
In this tutorial you will discover how to use a multiprocessing manager to share concurrency primitives in Python.
Let’s get started.
Need Manager to Share Concurrency Primitives
A manager in the multiprocessing module provides a way to create Python objects that can be shared easily between processes.
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
— multiprocessing — Process-based parallelism
A manager creates a server process that hosts a centralized version of objects that can be shared among multiple processes.
The objects are not shared directly. Instead, the manager creates a proxy object for each object that it manages and the proxy objects are shared among processes.
Proxy objects are used just like the original objects, except that each operation serializes data and coordinates with the centralized version of the object hosted in the manager's server process.
A proxy is an object which refers to a shared object which lives (presumably) in a different process. […] A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy).
— multiprocessing — Process-based parallelism
This makes managers a preferred way to share Python objects safely and seamlessly among processes.
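To make the proxy idea concrete, here is a minimal sketch (standard library only) that creates a manager-hosted Lock and inspects what the local process actually holds: not a Lock, but a proxy to the Lock living in the manager's server process.

```python
# sketch: inspect the proxy returned by a manager
from multiprocessing import Manager

if __name__ == '__main__':
    # create and start a manager server process
    with Manager() as manager:
        # the real Lock is hosted in the manager's server process
        lock = manager.Lock()
        # locally we hold a proxy object, not a Lock
        print(type(lock).__name__)
```

The exact proxy class name is an implementation detail of the standard library, but it will be a proxy type rather than a Lock.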
You can learn more about multiprocessing managers in the tutorial:
We may need to use a Manager to host concurrency primitives, like a mutex lock.
This is because concurrency primitives cannot be pickled, and we may have to pickle objects when sending them to a child process, such as via a Pipe or Queue, or as an argument to a task in a process pool.
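We can see the problem directly with a short sketch (standard library only): attempting to pickle a raw multiprocessing Lock raises a RuntimeError.

```python
# sketch: why a raw Lock cannot be sent to another process
import pickle
from multiprocessing import Lock

if __name__ == '__main__':
    lock = Lock()
    try:
        # raw primitives refuse to be pickled outside of process spawning
        pickle.dumps(lock)
    except RuntimeError as e:
        print(f'cannot pickle: {e}')
```

The error message explains that such objects should only be shared between processes through inheritance, which is exactly the limitation a manager works around.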
How can we use a multiprocessing manager to share concurrency primitives among child processes?
How to Use a Manager to Share Concurrency Primitives
Concurrency primitives can be created in a Manager directly and shared among child processes.
Managers are provided via the multiprocessing.Manager class, which creates and returns a multiprocessing.managers.SyncManager instance.
The SyncManager allows a suite of Python objects to be created and managed by default, including concurrency primitives such as:
- Lock
- Event
- Condition
- Semaphore
- BoundedSemaphore
- Barrier
Therefore, we can create a Manager instance and use it directly to create a concurrency primitive.
This will create a hosted version of the primitive in the Manager's server process and return proxy objects for the primitive that can be pickled and shared with child processes.
For example, we can create a shared mutex lock with a manager as follows:
# create the manager
with Manager() as manager:
    # create the shared mutex lock
    lock = manager.Lock()
    # ...
We might create a shared semaphore with a manager as follows:
# create the manager
with Manager() as manager:
    # create the shared semaphore
    semaphore = manager.Semaphore(10)
    # ...
Now that we know how to use a manager to create shared concurrency primitives, let’s look at some worked examples.
Example of Using a Manager for Shared Lock
We can use a manager to share a mutex lock safely with multiple processes.
In this example we will define a task that is constrained by a mutex lock so that only one instance of the task can run at any one time. We will then execute the target from many child processes in parallel. The lock will be shared with each process to ensure that the critical section within the task is protected.
Firstly, we can define a function to execute in child processes.
The function will take an integer number argument and the shared lock. It will then generate a random number between 0 and 1, block for a fraction of a second, then report the number that was generated.
This critical section is contrived and provides a template that you can adapt in your own projects.
The task() function below implements this.
# task executed in a new process
def task(number, shared_lock):
    # acquire the shared lock
    with shared_lock:
        # generate a number between 0 and 1
        value = random()
        # block for a fraction of a second
        sleep(value)
        # report the generated value
        print(f'{number} got {value}')
Note that we are acquiring the lock using the context manager interface that ensures that it is released once we are finished with it.
If you are new to using mutex locks in process-based concurrency, you can get started here:
Next, in the main process, we will create and start a Manager instance using the context manager interface.
...
# create the manager
with Manager() as manager:
    # ...
We can then use the manager to create a shared mutex lock.
The Lock object will be hosted in the manager's server process. The manager returns a proxy object that can be shared among child processes, pickled if needed, and used to interact with the Lock in a process-safe manner.
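Unlike the raw Lock, the proxy returned by the manager can be pickled. A minimal sketch (standard library only) that demonstrates this:

```python
# sketch: a manager Lock proxy can be pickled, unlike a raw Lock
import pickle
from multiprocessing import Manager

if __name__ == '__main__':
    with Manager() as manager:
        shared_lock = manager.Lock()
        # the proxy pickles to a small payload describing the hosted object
        data = pickle.dumps(shared_lock)
        print(f'pickled proxy to {len(data)} bytes')
```

This is what lets the proxy be passed as an argument to child processes and pool tasks.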
...
# create the shared mutex lock
shared_lock = manager.Lock()
Next, we can create many multiprocessing.Process instances configured to run our task() function and pass an integer and the shared Lock object as arguments.
This can be achieved in a list comprehension that will create a list of configured Process instances.
...
# create many child processes
processes = [Process(target=task, args=(i, shared_lock)) for i in range(10)]
The main process can then iterate through the processes and start each in turn.
...
# start all processes
for process in processes:
    process.start()
The main process can then wait until all the processes finish.
This can be achieved by traversing the list of running Process instances and calling the join() function on each. This will block until the Process instance finishes, and once all processes have been joined we know that they have all terminated.
...
# wait for all processes to complete
for process in processes:
    process.join()
If you are new to joining processes, you can learn more in this tutorial:
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a manager to share a mutex lock with child processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Manager

# task executed in a new process
def task(number, shared_lock):
    # acquire the shared lock
    with shared_lock:
        # generate a number between 0 and 1
        value = random()
        # block for a fraction of a second
        sleep(value)
        # report the generated value
        print(f'{number} got {value}')

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared mutex lock
        shared_lock = manager.Lock()
        # create many child processes
        processes = [Process(target=task, args=(i, shared_lock)) for i in range(10)]
        # start all processes
        for process in processes:
            process.start()
        # wait for all processes to complete
        for process in processes:
            process.join()
Running the example first creates the manager using the context manager interface.
Next, the manager is used to create the shared Lock object.
This creates a centralized version of the Lock on the manager’s server process and returns proxy objects for interacting with the Lock.
Next, 10 child processes are created and configured to call our custom task() function with the shared lock, then the processes are started. The main process then blocks until all child processes complete.
Each process first attempts to acquire the Lock via the Lock's proxy object. Only one child process can acquire the lock at a time, whilst all other processes must wait their turn.
Once the Lock is acquired, the process will generate a random number, block for a fraction of a second then report a message.
All tasks complete and the main process unblocks and the program terminates.
Note: Results will vary each time the program is run given the use of random numbers.
2 got 0.5864368625706725
0 got 0.8103738265597983
4 got 0.5730217503246685
5 got 0.2079962531879581
3 got 0.20031901361695348
1 got 0.2591675313220979
6 got 0.060164383845161096
8 got 0.3615848478753587
7 got 0.26580907843880375
9 got 0.7784274416480854
Next, let’s look at an example where we might share a semaphore with workers in the multiprocessing Pool.
Example of Using a Manager for Shared Semaphore
We can use a manager to share a semaphore among workers in the multiprocessing pool.
Like all concurrency primitives, the semaphore cannot be pickled.
This is a problem when using the multiprocessing.Pool class to execute an ad hoc function because all arguments to tasks in the pool must be pickled.
As such, passing a semaphore to the Pool will result in a RuntimeError, as follows:
RuntimeError: Semaphore objects should only be shared between processes through inheritance
Instead, we must use a Manager to create a hosted centralized version of the Semaphore and then share it with workers in the multiprocessing Pool.
You can learn more about sharing semaphores in the multiprocessing pool in the tutorial:
In this example we will define a task that is constrained by a semaphore so that only two instances of the task can run in parallel at any one time. The tasks will be executed by workers in the multiprocessing pool and a manager will be used to create a centralized semaphore that can be used safely in the child worker processes of the pool.
We can update the task() function from the previous section to take a shared semaphore as an argument, instead of a shared lock.
The semaphore can be acquired using the context manager interface, like the Lock class, so the changes are minimal.
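As a brief aside, the context manager form used in the task is shorthand for an explicit acquire/release wrapped in try/finally. A minimal sketch (standard library only) showing the equivalence:

```python
# sketch: the context manager form is shorthand for acquire/release
from multiprocessing import Manager

if __name__ == '__main__':
    with Manager() as manager:
        semaphore = manager.Semaphore(2)
        # equivalent to: with semaphore: ...
        semaphore.acquire()
        try:
            print('in critical section')
        finally:
            # release even if the critical section raises
            semaphore.release()
```

The context manager form is preferred because the release cannot be forgotten.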
The updated task() function with this change is listed below.
# task executed in a new process
def task(number, semaphore):
    # acquire the shared semaphore
    with semaphore:
        # generate a number between 0 and 1
        value = random()
        # block for a fraction of a second
        sleep(value)
        # report the generated value
        print(f'{number} got {value}')
Next, in the main process, we can use the Manager to create a shared semaphore instance.
This will create the Semaphore in the Manager's server process and return a proxy object that can be shared with processes and pickled.
This is important, as we will share it with workers in the multiprocessing pool, which requires that arguments can be pickled.
...
# create the shared semaphore
semaphore = manager.Semaphore(2)
Next, we can create a multiprocessing Pool instance. We will use the context manager interface to ensure that the Pool is closed once we are finished with it.
...
# create the shared pool
with Pool() as pool:
    # ...
You can learn more about the context manager for the multiprocessing pool in the tutorial:
Finally, we can issue tasks to the Pool to execute our task() function in parallel with the shared semaphore.
Because we have two arguments to the target function, we can issue tasks to the pool using the starmap() method.
This method requires that we specify the function to execute and provide an iterable of arguments. The iterable must contain one iterable for each call to the function with the arguments for the call, such as in a tuple.
These arguments can be prepared in a list comprehension with 10 items for the 10 calls to the task() function.
...
# prepare arguments for task
args = [(i, semaphore) for i in range(10)]
We can then issue the tasks with the starmap() method. This will block until all tasks are complete.
...
# issue many tasks to the process pool
pool.starmap(task, args)
You can learn more about issuing tasks to the multiprocessing pool with the starmap() method in the tutorial:
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of shared semaphore using a manager
from time import sleep
from random import random
from multiprocessing import Manager
from multiprocessing import Pool

# task executed in a new process
def task(number, semaphore):
    # acquire the shared semaphore
    with semaphore:
        # generate a number between 0 and 1
        value = random()
        # block for a fraction of a second
        sleep(value)
        # report the generated value
        print(f'{number} got {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared semaphore
        semaphore = manager.Semaphore(2)
        # create the shared pool
        with Pool() as pool:
            # prepare arguments for task
            args = [(i, semaphore) for i in range(10)]
            # issue many tasks to the process pool
            pool.starmap(task, args)
Running the example first creates the manager using the context manager interface.
Next, the manager is used to create the shared semaphore with two positions, returning a proxy object for the hosted object that can be shared among processes.
A multiprocessing pool is then created with the default number of workers. A list of tuples is then created as arguments for the task function, one tuple for each call to the task function.
The calls to the task() function with arguments are then issued to the multiprocessing pool using the starmap() function and the main process blocks until the tasks are completed.
Each task attempts to acquire the semaphore, but only two tasks are able to acquire it at a time. Tasks generate a random number, block for a fraction of a second then report their generated number.
All tasks complete and the multiprocessing pool is shut down automatically via the context manager interface, followed by the manager.
If the semaphore was not created using the manager, an error would be raised by the multiprocessing pool indicating that a semaphore cannot be passed directly to child worker processes.
Note: Results will vary each time the program is run given the use of random numbers.
4 got 0.6477163269260953
3 got 0.36257010063693895
5 got 0.5121752319871548
2 got 0.795369090090284
6 got 0.4784123199328061
7 got 0.480959275085588
1 got 0.6841684456946335
9 got 0.5539105856832549
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to use a multiprocessing manager to share concurrency primitives in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Austin Neill on Unsplash