Last Updated on September 12, 2022
You can share a multiprocessing.Event in child worker processes in the multiprocessing pool by using a multiprocessing.Manager.
In this tutorial you will discover how to use an event in the process pool in Python.
Let’s get started.
Need an Event In The Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
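For example, a minimal sketch of both the synchronous and asynchronous styles might look like the following, where square() is a hypothetical task function used only for illustration:

# minimal sketch of issuing tasks to the process pool
# square() is a hypothetical task function for illustration
from multiprocessing.pool import Pool

# hypothetical task: square a number
def square(x):
    return x * x

# protect the entry point
if __name__ == '__main__':
    with Pool(4) as pool:
        # synchronous: blocks until all results are ready
        values = pool.map(square, range(10))
        # asynchronous: returns an AsyncResult object immediately
        result = pool.map_async(square, range(10))
        # retrieve the results later
        values = result.get()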
When using the process pool, we may need to use an Event shared among all worker processes.
How can we use an event in the process pool?
What is an Event
An event is a process-safe boolean variable.
Process-safe means that the event can be checked and changed from multiple processes concurrently without concern for a race condition, corruption or inconsistency.
An event may be shared among multiple processes and kept automatically up-to-date, meaning one process may set the event to True, and another process may check the status of the event and see that it has been set True, then take action.
It provides an easy way to encapsulate a boolean flag or state variable that can be shared and used with process-based concurrency.
How to Use an Event
Python provides an event object for processes via the multiprocessing.Event class.
An event is a simple concurrency primitive that allows communication between processes.
A multiprocessing.Event object wraps a boolean variable that can either be “set” (True) or “not set” (False). Processes sharing the event instance can check if the event is set, set the event, clear the event (make it not set), or wait for the event to be set.
The multiprocessing.Event provides an easy way to share a boolean variable between processes that can act as a trigger for an action.
First, an event object must be created and the event will be in the “not set” state.
...
# create an instance of an event
event = multiprocessing.Event()
Once created we can check if the event has been set via the is_set() function which will return True if the event is set, or False otherwise.
For example:
...
# check if the event is set
if event.is_set():
    # do something...
The event can be set via the set() function. Any processes waiting on the event to be set will be notified.
For example:
...
# set the event
event.set()
Finally, processes can wait for the event to be set via the wait() function. Calling this function will block until the event is marked as set (e.g. by another process calling the set() function). If the event is already set, the wait() function will return immediately.
...
# wait for the event to be set
event.wait()
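Note that wait() also accepts an optional timeout argument in seconds. If the timeout elapses before the event is set, wait() returns False instead of blocking forever. For example:

...
# wait up to 5 seconds for the event to be set
if event.wait(timeout=5):
    # the event was set
    print('Event is set')
else:
    # the timeout elapsed before the event was set
    print('Timed out waiting for the event')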
You can learn more about how to use an event in the tutorial:
How to Use an Event in the Process Pool
We can create a multiprocessing.Event instance and share it with the child worker processes in the process pool.
There are perhaps three ways we can share a multiprocessing.Event instance with worker processes:
- By passing it as an argument when initializing the worker processes.
- By passing it as an argument to tasks executed by the pool.
- By using the ‘fork’ start method, storing it in a global variable, then having child processes inherit the variable.
The third method, using the ‘fork’ start method, will work and provides an easy way to share an event with child worker processes.
You can learn more about inheriting global variables by child processes in the tutorial:
The problem is, the ‘fork’ start method is not available on all platforms, e.g. it cannot be used on Windows.
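If you are unsure which start methods your platform supports, you can check at runtime via the multiprocessing.get_all_start_methods() function. For example:

...
# report the start methods supported on the current platform
import multiprocessing
print(multiprocessing.get_all_start_methods())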
Alternatively, if we naively pass a multiprocessing.Event as an argument when initializing the process pool or to a task executed by the process pool, it will fail with an error such as:
Condition objects should only be shared between processes through inheritance
The error mentions a “Condition object” because internally, the multiprocessing.Event makes use of a multiprocessing.Condition object.
Instead, we must use a multiprocessing.Manager.
A multiprocessing.Manager creates a server process and is responsible for managing a centralized version of an object. It then provides proxy objects that can be used in other processes and that are kept up-to-date with the single centralized object.
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
— multiprocessing — Process-based parallelism
As such, using a multiprocessing.Manager is a useful way to centralize a synchronization primitive like a multiprocessing.Event shared among multiple worker processes.
We can first create a multiprocessing.Manager using the context manager interface.
For example:
...
# create the manager
with Manager() as manager:
    # ...
We can then create a shared multiprocessing.Event object using the manager.
This will return a proxy object for the multiprocessing.Event object in the manager process that we can share among child worker processes directly or indirectly.
For example:
...
# create a shared object via the manager
event = manager.Event()
The proxy for the multiprocessing.Event can then be passed to a child worker initialization function or to a task function executed by worker processes.
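For example, below is a minimal sketch of the first approach, sharing the event proxy via a worker initialization function. The init_worker() function is a hypothetical initializer that stores the proxy in a global variable in each child worker process:

# minimal sketch: share a manager event via a pool initializer
# init_worker() is a hypothetical initialization function
from multiprocessing import Manager
from multiprocessing.pool import Pool

# store the event proxy in a global variable in each worker
def init_worker(shared_event):
    global event
    event = shared_event

# task executed in a worker process, using the inherited global
def task(identifier):
    # wait for the shared event to be set
    event.wait()
    print(f'Task {identifier} running', flush=True)

# protect the entry point
if __name__ == '__main__':
    with Manager() as manager:
        event = manager.Event()
        # pass the event proxy to each worker as it is initialized
        with Pool(2, initializer=init_worker, initargs=(event,)) as pool:
            result = pool.map_async(task, range(2))
            event.set()
            result.wait()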
Now that we know how to share a multiprocessing.Event with child worker processes in the process pool, let’s look at some worked examples.
Errors Using an Event in the Process Pool
Before we look at an example of how to successfully share a multiprocessing.Event with child worker processes, let’s look at some common failure cases.
Three common errors when attempting to share a multiprocessing.Event with worker processes are:
- Asynchronous tasks fail silently.
- Asynchronous tasks fail by error callback.
- Synchronous tasks fail with an error.
Let’s take a closer look at each failure case in turn.
Using an Event in the Process Pool Fails Silently
The first common failure case involves issuing tasks asynchronously to the process pool and having them fail silently.
In this example, we will define a task that must wait for the event to be set before it can be started. It allows us to issue multiple concurrent tasks into the process pool all at once and have them wait until we are ready, then have all issued tasks start at the same time.
In this example, we define a task function that takes an integer identifier and an event as arguments. It waits on the event to be set before starting. Once started, it generates a random number, blocks for a moment, then reports the generated number. A process pool is created and many tasks are issued asynchronously. The tasks start, but wait for the event to be set before continuing. The main process sets the event, then waits for them to complete.
The asynchronous tasks issued to the process pool fail silently.
They fail because a multiprocessing.Event is passed to the task, which raises an error. The error is silent because the example does not explicitly get the result from the tasks, as there is no result to get.
Firstly, we can define the target task function executed in the process pool.
The function takes an integer identifier as an argument, as well as a shared event object. The task reports a message, then waits for the event to be set. Once the event is set, the task generates a random number between 0 and 1, blocks for a fraction of a second to simulate computational effort, then reports both its identifier and the generated value.
The task() function below implements this.
# task executed in a worker process
def task(identifier, event):
    # wait for the event to be set
    print(f'Task {identifier} waiting...', flush=True)
    event.wait()
    # generate a value
    value = random()
    # block for a moment
    sleep(value)
    # report a message
    print(f'Task {identifier} completed with {value}', flush=True)
Next, in the main process, we can define an event.
...
# create the shared event
event = Event()
We then define a process pool with 4 worker processes. In this case we use the context manager interface to ensure the process pool closes automatically once we are finished with it.
...
# create and configure the process pool
with Pool(4) as pool:
    # ...
You can learn more about the context manager interface in the tutorial:
We then define a list of task arguments, each item in the list representing a tuple of arguments for one call to the task() function, passing an integer and the shared event.
...
# prepare task arguments
items = [(i, event) for i in range(4)]
The items are then issued as calls to the task() function asynchronously via the starmap_async() function, which returns an AsyncResult object immediately.
You can learn more about issuing asynchronous tasks to the process pool with starmap_async() in the tutorial:
...
# issue tasks into the process pool
result = pool.starmap_async(task, items)
The main process then blocks for a moment to allow all issued tasks to start in the process pool and wait for the event to be set.
...
# wait a moment
sleep(1)
The main process then reports a message and sets the event, allowing all waiting tasks to start.
...
# start all issued tasks
print('Setting event.', flush=True)
event.set()
Finally, the main process blocks until all tasks are completed.
...
# wait for all tasks to finish
result.wait()
print('All done.', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using an event in the process pool that fails silently
from random import random
from time import sleep
from multiprocessing import Event
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, event):
    # wait for the event to be set
    print(f'Task {identifier} waiting...', flush=True)
    event.wait()
    # generate a value
    value = random()
    # block for a moment
    sleep(value)
    # report a message
    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared event
    event = Event()
    # create and configure the process pool
    with Pool(4) as pool:
        # prepare task arguments
        items = [(i, event) for i in range(4)]
        # issue tasks into the process pool
        result = pool.starmap_async(task, items)
        # wait a moment
        sleep(1)
        # start all issued tasks
        print('Setting event.', flush=True)
        event.set()
        # wait for all tasks to finish
        result.wait()
        print('All done.', flush=True)
Running the example first creates the shared event.
The process pool is then created with 4 worker processes.
The task arguments are prepared and 4 tasks are issued to the process pool. The main process then sleeps for a moment.
The tasks fail to execute and no messages are reported; the tasks fail silently.
The main process continues on. It sets the event, then waits for the issued tasks to complete. The tasks have already failed, therefore the main process finishes nearly immediately.
The tasks failed because a multiprocessing.Event was shared directly with the child worker processes. An error was raised by each call to the task() function. The error was stored in the returned AsyncResult object, but was not re-raised or checked for, making it appear that the tasks failed silently.
Setting event.
All done.
Next, let’s look at how we might expose the error that is raised by issuing asynchronous tasks.
RuntimeError in Error Callback Using Event in the Process Pool
Sharing a multiprocessing.Event with tasks in the process pool via an argument to the task function will fail with a RuntimeError.
We can explore this error when issuing asynchronous tasks to the pool by using an error callback.
In this example we will update the previous example that fails silently to report the error caused by sharing an event with child processes in the process pool.
This can be achieved by defining a function that takes an error argument and reports the error message directly.
# error callback function
def custom_error_callback(error):
    print(error, flush=True)
Next, we can configure the starmap_async() function to use the error callback function when an exception is raised executing a task in the process pool.
...
# issue tasks into the process pool
result = pool.starmap_async(task, items, error_callback=custom_error_callback)
And that’s it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using an event in the process pool that fails with an error callback
from random import random
from time import sleep
from multiprocessing import Event
from multiprocessing.pool import Pool

# error callback function
def custom_error_callback(error):
    print(error, flush=True)

# task executed in a worker process
def task(identifier, event):
    # wait for the event to be set
    print(f'Task {identifier} waiting...', flush=True)
    event.wait()
    # generate a value
    value = random()
    # block for a moment
    sleep(value)
    # report a message
    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared event
    event = Event()
    # create and configure the process pool
    with Pool(4) as pool:
        # prepare task arguments
        items = [(i, event) for i in range(4)]
        # issue tasks into the process pool
        result = pool.starmap_async(task, items, error_callback=custom_error_callback)
        # wait a moment
        sleep(1)
        # start all issued tasks
        print('Setting event.', flush=True)
        event.set()
        # wait for all tasks to finish
        result.wait()
        print('All done.', flush=True)
Running the example first creates the shared event.
The process pool is then created with 4 worker processes.
The task arguments are prepared and 4 tasks are issued to the process pool. The main process then blocks for a moment.
Each issued task fails with a RuntimeError. The first task to raise the error calls the error callback, which reports the error directly.
The main process unblocks, sets the event, waits for the tasks to finish and terminates the program.
This highlights how the process pool is configured to not allow events to be passed directly as arguments.
Note, the error message mentions a “Condition object” because the multiprocessing.Event makes use of a multiprocessing.Condition internally.
Condition objects should only be shared between processes through inheritance
Setting event.
All done.
Next, let’s look at how we might expose the error that is raised by issuing asynchronous tasks, then get the results of the issued tasks directly.
RuntimeError Using Event in the Process Pool
We can update the above example to get the results of the issued tasks.
Our task() function does not have a return value, therefore there are no results to get. Nevertheless, we can call the get() function on the returned AsyncResult object to retrieve the return value of each issued task. This will allow any failure that occurred while running a task, such as an error or exception, to be re-raised.
Recall that the get() function on the AsyncResult will block until the task function returns.
For example:
...
# get each result
for item in result.get():
    pass
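Note that get() also accepts an optional timeout argument in seconds; if the results are not ready within the timeout, a multiprocessing.TimeoutError is raised instead of blocking forever. For example:

...
# get results, waiting no more than 10 seconds
values = result.get(timeout=10)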
You can learn more about getting results from the AsyncResult object in the tutorial:
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using an event in the process pool that fails with an error
from random import random
from time import sleep
from multiprocessing import Event
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, event):
    # wait for the event to be set
    print(f'Task {identifier} waiting...', flush=True)
    event.wait()
    # generate a value
    value = random()
    # block for a moment
    sleep(value)
    # report a message
    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared event
    event = Event()
    # create and configure the process pool
    with Pool(4) as pool:
        # prepare task arguments
        items = [(i, event) for i in range(4)]
        # issue tasks into the process pool
        result = pool.starmap_async(task, items)
        # wait a moment
        sleep(1)
        # start all issued tasks
        print('Setting event.', flush=True)
        event.set()
        # get results for each task
        for value in result.get():
            pass
        print('All done.', flush=True)
Running the example first creates the shared event.
The process pool is then created with 4 worker processes.
The task arguments are prepared and 4 tasks are issued to the process pool. The main process then sleeps for a moment.
The tasks fail to execute and no messages are reported.
The main process unblocks, sets the event, then begins iterating through the return values of the issued tasks.
The first call to get() re-raises the RuntimeError raised in the task() function, terminating the program.
Setting event.
Traceback (most recent call last):
...
RuntimeError: Condition objects should only be shared between processes through inheritance
Now that we have confirmed that we cannot pass a normal multiprocessing.Event to a task executed in the process pool, let’s look at how we might use a multiprocessing.Manager to fix the problem.
Use a Manager to Share an Event in the Process Pool
We can explore how to use a multiprocessing.Manager to share a multiprocessing.Event among child worker processes in the process pool.
This can be achieved by updating the first asynchronous example of issuing tasks that failed silently to create the multiprocessing.Event using a multiprocessing.Manager.
First, a manager can be created using the context manager interface. This ensures that the manager is closed automatically once we are finished with it.
...
# create the manager
with Manager() as manager:
    # ...
Next, a multiprocessing.Event can be created using the multiprocessing.Manager instance.
This will create and host an event in a new server process and return a proxy object that can be shared among child worker processes and used to interface with the centralized event instance.
...
# create the shared event
event = manager.Event()
And that’s it.
The complete example is listed below.
# SuperFastPython.com
# example of using an event in the process pool that uses a manager
from random import random
from time import sleep
from multiprocessing import Manager
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, event):
    # wait for the event to be set
    print(f'Task {identifier} waiting...', flush=True)
    event.wait()
    # generate a value
    value = random()
    # block for a moment
    sleep(value)
    # report a message
    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared event
        event = manager.Event()
        # create and configure the process pool
        with Pool(4) as pool:
            # prepare task arguments
            items = [(i, event) for i in range(4)]
            # issue tasks into the process pool
            result = pool.starmap_async(task, items)
            # wait a moment
            sleep(1)
            # start all issued tasks
            print('Setting event.', flush=True)
            event.set()
            # wait for all tasks to finish
            result.wait()
            print('All done.', flush=True)
Running the example first creates the shared event.
The process pool is then created with 4 worker processes.
The task arguments are prepared and 4 tasks are issued to the process pool. The main process then sleeps for a moment.
Each task begins executing, reporting a message and waiting for the event to be set.
The main process continues on. It sets the event. It then waits for all issued tasks to complete.
The waiting tasks notice that the event is set and continue on. Each task generates a random value between 0 and 1, then blocks for a fraction of a second, then reports a message.
All tasks complete and the main process continues on, reports a final message and closes the application.
This highlights how we can share and use an event variable among child worker processes in the process pool.
Task 0 waiting...
Task 1 waiting...
Task 2 waiting...
Task 3 waiting...
Setting event.
Task 0 completed with 0.18413644309583843
Task 1 completed with 0.5040406238898768
Task 2 completed with 0.8470031509751197
Task 3 completed with 0.9937680687798223
All done.
Use a Global Variable to Share an Event in the Process Pool
An alternate approach to sharing an event with workers in the process pool is to share the event as a global variable.
This requires the use of the ‘fork’ start method for creating new processes in Python.
A limitation of this approach is that the ‘fork’ start method is not supported on all platforms. For example, ‘fork’ is not supported on Windows.
In this example, we will update the previous example to use the ‘fork’ start method and to share the multiprocessing.Event with all workers in the process pool via a global variable.
Firstly, we update the task() function so that it does not take the event as an argument, and instead assumes it is available via an inherited global variable.
The updated version of the task() function with this change is listed below.
# task executed in a worker process
def task(identifier):
    # wait for the event to be set
    print(f'Task {identifier} waiting...', flush=True)
    event.wait()
    # generate a value
    value = random()
    # block for a moment
    sleep(value)
    # report a message
    print(f'Task {identifier} completed with {value}', flush=True)
Next, in the main process, we can configure the program to use the ‘fork’ start method when creating new child processes, such as those in the process pool.
...
# set the fork start method
set_start_method('fork')
You can learn more about setting the process start method in the tutorial:
Next, we can create the shared multiprocessing.Event instance as per normal.
...
# create the shared event
event = Event()
This will implicitly be a global variable.
A more explicit approach would be to declare “event” as a global variable, then assign it a new multiprocessing.Event instance.
For example:
...
# declare the global variable
global event
# assign the global variable
event = multiprocessing.Event()
This might be easier to read for newer Python programmers.
We can then create the process pool as per normal, then issue 4 tasks to the process pool asynchronously via the map_async() function.
...
# create and configure the process pool
with Pool(4) as pool:
    # issue tasks into the process pool
    result = pool.map_async(task, range(4))
We use the map_async() function instead of the starmap_async() function because our task() function only has one argument.
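As a quick side-by-side, a minimal sketch of the two calls is below; the starmap_async() call mirrors the earlier examples that passed the event as an argument:

...
# map_async: the task function takes a single argument
result = pool.map_async(task, range(4))
# starmap_async: the task function takes multiple arguments as a tuple
result = pool.starmap_async(task, [(i, event) for i in range(4)])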
You can learn more about the map_async() function in the tutorial:
Finally, as before, the main process will block for a moment to allow all tasks to start and wait for the event to be set, then set the event and wait for the issued tasks to complete.
...
# wait a moment
sleep(1)
# start all issued tasks
print('Setting event.', flush=True)
event.set()
# wait for all tasks to finish
result.wait()
print('All done.', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using an event in the process pool via an inherited global variable
from random import random
from time import sleep
from multiprocessing import set_start_method
from multiprocessing import Event
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # wait for the event to be set
    print(f'Task {identifier} waiting...', flush=True)
    event.wait()
    # generate a value
    value = random()
    # block for a moment
    sleep(value)
    # report a message
    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # set the fork start method
    set_start_method('fork')
    # create the shared event
    event = Event()
    # create and configure the process pool
    with Pool(4) as pool:
        # issue tasks into the process pool
        result = pool.map_async(task, range(4))
        # wait a moment
        sleep(1)
        # start all issued tasks
        print('Setting event.', flush=True)
        event.set()
        # wait for all tasks to finish
        result.wait()
        print('All done.', flush=True)
Running the example first creates the shared event.
The process pool is then created with 4 worker processes.
Four tasks are then issued to the process pool via map_async(). The main process then sleeps for a moment.
Each task begins executing, reporting a message and waiting for the event to be set.
The main process continues on. It then sets the event and then waits for all issued tasks to complete.
The waiting tasks notice that the event is set and continue on. Each task generates a random value between 0 and 1, then blocks for a fraction of a second, then reports a message.
All tasks complete and the main process continues on, reports a final message and closes the application.
This highlights an alternate way that we can share and use an event among child worker processes in the process pool.
Task 0 waiting...
Task 1 waiting...
Task 2 waiting...
Task 3 waiting...
Setting event.
Task 2 completed with 0.34916782261872026
Task 0 completed with 0.5747714837403812
Task 3 completed with 0.7922442775112111
Task 1 completed with 0.9403023029276796
All done.
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to use an event in the process pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Stephen Coetsee on Unsplash