Multiprocessing Manager With a Custom Class
You can use a manager with custom Python classes by creating a Manager class that extends the BaseManager class, registering the custom class with the manager, then using the manager to create a shared instance of the custom class.
In this tutorial you will discover how to use a multiprocessing manager with a custom class in Python.
Let's get started.
Need Manager to Share Custom Class
A manager in the multiprocessing module provides a way to create Python objects that can be shared easily between processes.
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
-- multiprocessing — Process-based parallelism
A manager creates a server process that hosts a centralized version of objects that can be shared among multiple processes.
The objects are not shared directly. Instead, the manager creates a proxy object for each object that it manages and the proxy objects are shared among processes.
The proxy objects are used and operate just like the original objects, except that they serialize data, synchronize and coordinate with the centralized version of the object hosted in the manager server process.
A proxy is an object which refers to a shared object which lives (presumably) in a different process. [...] A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy).
-- multiprocessing — Process-based parallelism
This makes managers a process-safe and preferred way to share simple data structures like lists and dicts among processes.
They are also a preferred way to share concurrency primitives among processes, specifically among workers in a multiprocessing pool.
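As a minimal sketch of the built-in case, the snippet below shares a list hosted by multiprocessing.Manager() among four child processes. The function name append_square is hypothetical and chosen only for illustration.

```python
# sketch: sharing a manager-hosted list among child processes
from multiprocessing import Manager, Process

# hypothetical task: append the square of n to the shared list
def append_square(shared, n):
    shared.append(n * n)

# protect the entry point
if __name__ == '__main__':
    # create and start the built-in manager
    with Manager() as manager:
        # create a list hosted in the manager's server process
        shared = manager.list()
        # configure one child process per value
        processes = [Process(target=append_square, args=(shared, n)) for n in range(4)]
        # start processes
        for process in processes:
            process.start()
        # wait for processes to finish
        for process in processes:
            process.join()
        # copy the hosted list into a local list and report it
        print(sorted(list(shared)))
```

Each child appends one value through its proxy, so the sorted copy should contain 0, 1, 4, and 9.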
We may need to use a Manager to host our own custom Python objects.
This may be because instances of our custom class cannot be pickled, and therefore cannot be passed directly to child processes, or because the class is challenging to make process-safe.
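To make the pickling problem concrete, here is a small sketch. The Recorder class and the can_pickle() helper are hypothetical names used only for illustration; the class holds a threading.Lock, which is one common reason an object cannot be pickled.

```python
# sketch: an object that holds a lock cannot be pickled
import pickle
from threading import Lock

# hypothetical class that holds a resource which cannot be pickled
class Recorder:
    def __init__(self):
        self.lock = Lock()
        self.items = []

# hypothetical helper: report whether an object can be pickled
def can_pickle(obj):
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False

# protect the entry point
if __name__ == '__main__':
    # a plain list pickles without a problem
    print(can_pickle([1, 2, 3]))
    # the lock inside Recorder makes pickling fail
    print(can_pickle(Recorder()))
```

An object like this cannot be sent to a child process as an argument, but it can be created inside a manager's server process and reached through a proxy.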
How can we use a multiprocessing manager to share our own custom Python objects among multiple processes?
How to Use a Manager to Create a Custom Class
We can use a manager to create and manage a centralized version of a custom class.
This requires a few steps:
- Defining a custom class.
- Defining a custom Manager class that extends BaseManager.
- Registering the custom class with the custom manager.
- Creating an instance of the custom manager.
- Creating an instance of the custom class from the manager instance.
Let's take a closer look at each step in turn.
1. Define a Custom Class
The first step is to define our custom class.
This is just a normal Python class. No special Manager-specific changes are required.
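Instances are created inside the manager's server process, so the instances themselves do not need to be picklable. The arguments passed to the constructor and methods, and the values the methods return, should be picklable because they are sent between processes.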
If we expect the class to be accessed and modified from multiple processes concurrently, then it is a good idea to protect any internal state using a mutex lock.
Otherwise, you run the risk of race conditions that may leave the object's internal state in an inconsistent and unknown condition.
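The sketch below shows one way to guard internal state with a mutex. The SafeCounter class and hammer() helper are hypothetical. It uses a threading.Lock on the assumption that the object will live in the manager's server process, which serves each connected process from its own thread; the demo uses plain threads to stand in for concurrent calls arriving via proxies.

```python
# sketch: protecting internal state with a mutex lock
from threading import Lock, Thread

# hypothetical class whose state is guarded by a lock
class SafeCounter:
    def __init__(self):
        self._lock = Lock()
        self._count = 0

    # increment the counter and return the new value
    def increment(self):
        with self._lock:
            self._count += 1
            return self._count

    # return the current value
    def value(self):
        with self._lock:
            return self._count

# hypothetical helper: call increment() n times
def hammer(counter, n):
    for _ in range(n):
        counter.increment()

# protect the entry point
if __name__ == '__main__':
    counter = SafeCounter()
    # four threads stand in for four processes calling via proxies
    threads = [Thread(target=hammer, args=(counter, 1000)) for _ in range(4)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # report the final count
    print(counter.value())
```

Because every increment happens while the lock is held, the final count is 4000 regardless of how the threads interleave.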
2. Define a Custom Manager
We must define a custom manager in order to use the manager to create and manage custom classes.
This involves defining a new class that extends the multiprocessing.managers.BaseManager class.
The custom manager class does not need to override the constructor or any methods, nor define any functionality.
For example:
# custom manager to support custom classes
class CustomManager(BaseManager):
    # nothing
    pass
3. Register the Custom Class
The next step involves registering the custom class with the custom manager.
This is so when the program requests a new instance of the custom class from the manager, it knows how to create an instance of that class.
This involves calling the register() class method on the custom manager class.
At a minimum, register() takes two arguments: a string type identifier that the program uses when requesting an instance, and the callable (here, the Python class) used to create it.
These will usually use the same name, but they do not have to.
For example:
...
# register the custom class on the custom manager
CustomManager.register('MyCustomClass', MyCustomClass)
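As a small sketch of what registration does, the snippet below checks that register() adds a creation method to the manager class under the registered name. The second name, SharedThing, is a hypothetical alias used only to show that the string does not have to match the class name.

```python
# sketch: register() adds a creation method named after the type identifier
from multiprocessing.managers import BaseManager

# stand-in custom class for illustration
class MyCustomClass:
    def __init__(self, data=None):
        self.data = data

# custom manager to support custom classes
class CustomManager(BaseManager):
    pass

# before registration the manager class has no such method
print(hasattr(CustomManager, 'MyCustomClass'))
# register the class under its own name
CustomManager.register('MyCustomClass', MyCustomClass)
# register the same class under a hypothetical alias
CustomManager.register('SharedThing', MyCustomClass)
# after registration both names are available on the manager class
print(hasattr(CustomManager, 'MyCustomClass'))
print(hasattr(CustomManager, 'SharedThing'))
```

No server process is started here; registration only configures the manager class.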
4. Create the Manager
The next step is to create an instance of the custom manager, and start it.
This is so that the program can request an instance of the custom class.
This can be achieved manually by instantiating the class and calling start, for example:
...
# create the custom manager
manager = CustomManager()
# start the custom manager
manager.start()
Later we must shut down the manager to terminate the server process.
...
# shut down the manager
manager.shutdown()
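If the manual approach is used, a try/finally block is one way to make sure the server process is stopped even when an error occurs. This is a minimal sketch that relies on BaseManager.shutdown(); the Greeter class is hypothetical.

```python
# sketch: manual manager lifecycle with try/finally
from multiprocessing.managers import BaseManager

# hypothetical class hosted by the manager
class Greeter:
    def greet(self, name):
        return f'Hello, {name}'

# custom manager to support custom classes
class CustomManager(BaseManager):
    pass

# register the custom class on the custom manager
CustomManager.register('Greeter', Greeter)

def main():
    # create and start the custom manager
    manager = CustomManager()
    manager.start()
    try:
        # create a hosted instance and call it via the proxy
        greeter = manager.Greeter()
        print(greeter.greet('world'))
    finally:
        # always stop the server process
        manager.shutdown()

# protect the entry point
if __name__ == '__main__':
    main()
```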
The preferred approach is to use the context manager interface and limit usage of the manager to the context manager block.
This will ensure the manager is shut down automatically when the block exits, even if an exception is raised.
For example:
...
# create and start the custom manager
with CustomManager() as manager:
    # ...
5. Create a Custom Class Instance
Finally, we can create an instance of our custom class using the manager.
This will create the object in the Manager's server process and return a proxy object.
The proxy object can then be shared among processes and used to interact with the centralized version of the custom class instance, with all data serialization and inter-process communication handled automatically under the covers.
For example:
...
# create a shared custom class instance
shared_custom = manager.MyCustomClass()
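The sketch below probes what the manager hands back. It assumes the default proxy type, which forwards public methods only; the get_data() method is a hypothetical addition for illustration.

```python
# sketch: the manager returns a proxy that forwards public methods
from multiprocessing.managers import BaseManager

# stand-in custom class with a hypothetical accessor method
class MyCustomClass:
    def __init__(self, data):
        self.data = data

    def get_data(self):
        return self.data

# custom manager to support custom classes
class CustomManager(BaseManager):
    pass

# register the custom class on the custom manager
CustomManager.register('MyCustomClass', MyCustomClass)

# protect the entry point
if __name__ == '__main__':
    with CustomManager() as manager:
        # create a shared custom class instance
        shared_custom = manager.MyCustomClass(10)
        # check whether we hold a real instance or a proxy
        print(isinstance(shared_custom, MyCustomClass))
        # call a public method through the proxy
        print(shared_custom.get_data())
        # check whether the instance attribute is visible on the proxy
        print(hasattr(shared_custom, 'data'))
```

With the default proxy, the object returned is not an instance of MyCustomClass and instance attributes such as data are not reachable on it, which is why hosted classes usually expose their state through methods.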
Now that we know how to create a custom class instance using a manager, let's look at a worked example.
Example of Manager With a Custom Class
We can explore an example of using a manager to create a custom class.
In this example we will define a custom class that has some data in an instance variable, a task function that uses the data and returns a value. We will also have the object record all values that are returned in an internal list so we can inspect it later. We will then use a manager to create a centralized version of the object and share it among multiple processes that will all interact with it.
This will be a good case study that can be adapted to host a centralized version of any arbitrary complex object in a manager and share it among multiple child processes.
Firstly, we can define our custom class.
There is nothing Manager-specific about this class, it's just a normal Python class.
We will call the class MyCustomClass and define a class constructor that takes a data argument and stores it, and also initializes an internal list for storing all return values it generates later.
# custom class
class MyCustomClass():
    # constructor
    def __init__(self, data):
        # store the data in the instance
        self.data = data
        self.storage = list()
In this class, we will define a task() method.
This method will generate a random value between 0 and 1, block for a moment, then calculate a new value that multiplies the generated value by the data instance variable. Finally, it stores a tuple of the instance variable, generated value, and calculated value and returns the calculated value.
The task() method below implements this.
    # do something with the data
    def task(self):
        # generate a random number
        value = random()
        # block for a moment
        sleep(value)
        # calculate a new value
        new_value = self.data * value
        # store everything
        self.storage.append((self.data, value, new_value))
        # return the new value
        return new_value
We will define one more method that provides access to the "storage" list instance variable so we can get a log of all returned values later.
    # get all stored values
    def get_storage(self):
        return self.storage
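One detail worth keeping in mind: when a method like get_storage() is called through a proxy, the returned list is pickled and sent back, so the caller receives a copy rather than the hosted list itself. The sketch below illustrates this with a reduced, hypothetical Log class.

```python
# sketch: values returned through a proxy are copies
from multiprocessing.managers import BaseManager

# hypothetical reduced class with an internal list
class Log:
    def __init__(self):
        self.storage = list()

    # add an item to the internal list
    def add(self, item):
        self.storage.append(item)

    # get all stored values
    def get_storage(self):
        return self.storage

# custom manager to support custom classes
class CustomManager(BaseManager):
    pass

# register the custom class on the custom manager
CustomManager.register('Log', Log)

# protect the entry point
if __name__ == '__main__':
    with CustomManager() as manager:
        log = manager.Log()
        log.add('first')
        # this is a local copy of the hosted list
        local = log.get_storage()
        local.append('local only')
        # the local copy has two items
        print(local)
        # the hosted list still has one item
        print(log.get_storage())
```

Changes to the hosted object should therefore go through its methods, such as add() here, not through a list that was returned earlier.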
Tying this together, the complete MyCustomClass class is defined below.
# custom class
class MyCustomClass():
    # constructor
    def __init__(self, data):
        # store the data in the instance
        self.data = data
        self.storage = list()

    # do something with the data
    def task(self):
        # generate a random number
        value = random()
        # block for a moment
        sleep(value)
        # calculate a new value
        new_value = self.data * value
        # store everything
        self.storage.append((self.data, value, new_value))
        # return the new value
        return new_value

    # get all stored values
    def get_storage(self):
        return self.storage
Next, we need a custom Manager class that will support our custom MyCustomClass class.
This can be achieved by defining a new Manager class that extends the BaseManager and does nothing else.
# custom manager to support custom classes
class CustomManager(BaseManager):
    # nothing
    pass
Next, we can define a custom function that we will execute in new child processes.
This function will take a shared MyCustomClass object, call the task() method and report the value that is returned.
The work() function is listed below.
# custom function to be executed in a child process
def work(shared_custom):
    # call the function on the shared custom instance
    value = shared_custom.task()
    # report the value
    print(f'>child got {value}')
Finally, in the main process, we will first register our MyCustomClass class on our custom Manager class.
This can be achieved by calling the register() class method on our CustomManager class and specifying the name of the class we are registering (which is arbitrary, but we will match the actual class name) and the actual Python class it maps to.
...
# register the custom class on the custom manager
CustomManager.register('MyCustomClass', MyCustomClass)
Next, we can create and start an instance of our custom manager.
In this case, we will use the context manager interface.
...
# create a new manager instance
with CustomManager() as manager:
    # ...
We will then use the manager instance to create a new instance of our custom class and initialize it with a value, in this case 10, which will be stored in an instance variable.
...
# create a shared custom class instance
shared_custom = manager.MyCustomClass(10)
We will share this object among child processes.
But first, let's confirm that it is working correctly by calling the task() method in the main process and reporting the generated value.
...
# call the function on the shared custom instance
value = shared_custom.task()
# report the value
print(f'>main got {value}')
Next, we will create and start 4 child processes configured to execute our work() function and pass the shared MyCustomClass instance as an argument.
...
# start some child processes
processes = [Process(target=work, args=(shared_custom,)) for i in range(4)]
# start processes
for process in processes:
    process.start()
The main process then blocks until all child processes have finished.
...
# wait for processes to finish
for process in processes:
    process.join()
The main process then reports a message, calls the get_storage() method on the shared MyCustomClass object to retrieve the "storage" list, and reports all tuples stored in the centralized object.
We expect there to be 5 items and they will match the values returned by calling the task() method in the main process and in each child process, confirming that we are working with a single centralized instance of the custom class.
...
# all done
print('Done')
# report all values stored in the central object
for t in shared_custom.get_storage():
    print(t)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a manager to create a custom class
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing.managers import BaseManager

# custom class
class MyCustomClass():
    # constructor
    def __init__(self, data):
        # store the data in the instance
        self.data = data
        self.storage = list()

    # do something with the data
    def task(self):
        # generate a random number
        value = random()
        # block for a moment
        sleep(value)
        # calculate a new value
        new_value = self.data * value
        # store everything
        self.storage.append((self.data, value, new_value))
        # return the new value
        return new_value

    # get all stored values
    def get_storage(self):
        return self.storage

# custom manager to support custom classes
class CustomManager(BaseManager):
    # nothing
    pass

# custom function to be executed in a child process
def work(shared_custom):
    # call the function on the shared custom instance
    value = shared_custom.task()
    # report the value
    print(f'>child got {value}')

# protect the entry point
if __name__ == '__main__':
    # register the custom class on the custom manager
    CustomManager.register('MyCustomClass', MyCustomClass)
    # create a new manager instance
    with CustomManager() as manager:
        # create a shared custom class instance
        shared_custom = manager.MyCustomClass(10)
        # call the function on the shared custom instance
        value = shared_custom.task()
        # report the value
        print(f'>main got {value}')
        # start some child processes
        processes = [Process(target=work, args=(shared_custom,)) for i in range(4)]
        # start processes
        for process in processes:
            process.start()
        # wait for processes to finish
        for process in processes:
            process.join()
        # all done
        print('Done')
        # report all values stored in the central object
        for t in shared_custom.get_storage():
            print(t)
Running the example first registers the custom class with the custom manager class via the register() class method.
Next, an instance of the custom manager class is created and started, starting a new server process for the manager.
A single centralized instance of our MyCustomClass class is then created using the manager and a proxy object for the instance is then returned.
The proxy object for the custom class is then used in the main process and a method on the object is executed, returning a value that is reported.
Four new child processes are then created, configured and started, executing a custom function that takes the proxy object for our custom class as an argument. Each calls a method on the centralized object via the proxy and reports the generated value.
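The proxy objects forward every method call to the manager's server process, so all calls operate on the same centralized object. Calls from different processes may be handled concurrently by the server, so compound updates to internal state should still be protected with a lock, as noted earlier.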
Each time the task() method is called on the centralized custom class, the generated and calculated values are stored in an internal list.
Finally, the main process retrieves the internal list from the custom class via the get_storage() method and reports the log of all values generated and reported.
These match the values reported by the main process and child processes, in the order they were reported. This log provides confirmation that the shared object is indeed centralized on the manager and that all processes interact with the same instance via proxy objects.
>main got 0.5875523348927492
>child got 1.262028562953147
>child got 2.5299182522080277
>child got 2.630220125408651
>child got 8.506195991560324
Done
(10, 0.058755233489274916, 0.5875523348927492)
(10, 0.1262028562953147, 1.262028562953147)
(10, 0.25299182522080277, 2.5299182522080277)
(10, 0.2630220125408651, 2.630220125408651)
(10, 0.8506195991560324, 8.506195991560324)
Takeaways
You now know how to use a manager with a custom class.