Last Updated on October 18, 2022
You can create a process pool using the multiprocessing.pool.Pool class.
In this tutorial you will discover the multiprocessing process pool in Python.
Let’s get started.
What is a Process
A process is an instance of a computer program that is being executed.
Every Python program is a process and has one thread called the main thread used to execute your program instructions. Each process is, in fact, one instance of the Python interpreter that executes Python instructions (Python byte-code), which is a slightly lower level than the code you type into your Python program.
Sometimes we may need to create new processes to run additional tasks concurrently.
Python provides real system-level processes via the multiprocessing.Process class.
The multiprocessing module and the multiprocessing.Process class provide process-based concurrency, as opposed to thread-based concurrency.
The underlying operating system controls how new processes are created. On some systems, that may require spawning a new process, and on others, it may require that the process is forked. The operating system-specific method used for creating new processes in Python is not something we need to worry about as it is managed by your installed Python interpreter.
A task can be run in a new process by creating an instance of the multiprocessing.Process class and specifying the function to run in the new process via the “target” argument.
...
# define a task to run in a new process
p = Process(target=task)
Once the process is created, it must be started by calling the start() function.
...
# start the task in a new process
p.start()
We can then wait around for the task to complete by joining the process; for example:
...
# wait for the task to complete
p.join()
Whenever we create new processes, we must protect the entry point of the program.
# entry point for the program
if __name__ == '__main__':
    # do things...
Tying this together, the complete example of creating a Process to run an ad hoc task function is listed below.
# SuperFastPython.com
# example of running a function in a new process
from multiprocessing import Process

# a task to execute in another process
def task():
    print('This is another process', flush=True)

# entry point for the program
if __name__ == '__main__':
    # define a task to run in a new process
    p = Process(target=task)
    # start the task in a new process
    p.start()
    # wait for the task to complete
    p.join()
This is useful for running one-off ad hoc tasks in a separate process, although it becomes cumbersome when you have many tasks to run.
Each process that is created requires the allocation of resources (e.g. an instance of the Python interpreter and memory for the process’s main thread’s stack space). The computational cost of setting up processes can become expensive if we are creating and destroying many processes over and over for ad hoc tasks.
Instead, we would prefer to keep worker processes around for reuse if we expect to run many ad hoc tasks throughout our program.
This can be achieved using a process pool.
What is a Process Pool
A process pool is a programming pattern for automatically managing a pool of worker processes.
The pool is responsible for a fixed number of processes.
- It controls when they are created, such as when they are needed.
- It also controls what they should do when they are not being used, such as making them wait without consuming computational resources.
The pool can provide a generic interface for executing ad hoc tasks with a variable number of arguments, much like the “target” argument on the Process object, but it does not require that we choose a process to run the task, start the process, or wait for the task to complete.
Python provides a process pool via the concurrent.futures.ProcessPoolExecutor class.
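For example, a minimal sketch of using the ProcessPoolExecutor (the task() function and worker count here are illustrative, not from a specific tutorial):

# minimal sketch of concurrent.futures.ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor

# a task to execute in a worker process (illustrative)
def task(value):
    return value * 2

# protect the entry point
if __name__ == '__main__':
    # create a pool with a fixed number of workers
    with ProcessPoolExecutor(max_workers=4) as executor:
        # issue tasks and iterate results in order
        for result in executor.map(task, range(10)):
            print(result)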
Python also provides a process pool via the multiprocessing.pool.Pool class.
What is the Process Pool Class
The multiprocessing.pool.Pool class provides a process pool in Python.
It allows tasks to be submitted as functions to the process pool to be executed concurrently.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
To use the process pool, we must first create and configure an instance of the class.
For example:
...
# create a process pool
pool = multiprocessing.pool.Pool(...)
Once configured, tasks can be submitted to the pool for execution using blocking and asynchronous versions of apply() and map().
For example:
...
# issue tasks for execution
results = pool.map(task, items)
Once we have finished with the process pool, it can be closed and resources used by the pool can be released.
For example:
...
# close the process pool
pool.close()
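Tying these steps together, a minimal end-to-end sketch might look like the following (the task() function is illustrative):

# minimal sketch of the process pool life-cycle
from multiprocessing.pool import Pool

# a task to execute in the process pool (illustrative)
def task(value):
    return value * 2

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks and collect results
    results = pool.map(task, range(10))
    print(results)
    # close the pool and wait for the workers to exit
    pool.close()
    pool.join()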
Next, let’s take a closer look at each of these aspects in turn.
How to Configure the Process Pool
The process pool is configured via the class constructor.
All arguments to the constructor are optional, therefore it is possible to create a process pool with all default configuration by providing no arguments.
For example:
...
# create a process pool with a default configuration
pool = multiprocessing.pool.Pool()
The first argument is “processes”, which specifies the number of worker processes to create and manage within the pool.
By default this equals the number of logical CPUs in your system.
processes is the number of worker processes to use. If processes is None then the number returned by os.cpu_count() is used.
— multiprocessing — Process-based parallelism
For example, if you had 4 physical CPU cores with hyperthreading, this would mean you would have 8 logical CPU cores and this would be the default number of workers in the process pool.
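You can check the number of logical CPU cores available, and hence the default pool size, with the multiprocessing.cpu_count() function; for example:

...
# report the number of logical CPU cores
import multiprocessing
print(multiprocessing.cpu_count())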
It is a good idea to set the number of worker processes to the number of logical or the number of physical CPU cores in your system. Experiment and discover what works best.
For example:
...
# create a process pool with a given number of workers
pool = multiprocessing.pool.Pool(processes=4)
Each worker process may need to prepare resources that are used when executing tasks within that process.
A good example might be access to a file, socket, or logging.
This can be achieved by configuring an initializer function to be executed by each worker process. The “initializer” argument specifies the function to call and the “initargs” argument specifies any arguments to the function.
For example:
# initialization function for worker processes
def init_worker(log_name):
    # ...

...
# create a process pool with initialized workers
pool = multiprocessing.pool.Pool(initializer=init_worker, initargs=['app'])
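A slightly fuller sketch, assuming each worker should configure its own logger (the function and argument names here are illustrative):

# sketch: initialize a per-process logger in each worker
import logging
from multiprocessing.pool import Pool

# executed once in each new worker process
def init_worker(log_name):
    # store the logger in a process-global variable
    global logger
    logger = logging.getLogger(log_name)

# a task that uses the per-process resource (illustrative)
def task(value):
    logger.info('processing %s', value)
    return value

# protect the entry point
if __name__ == '__main__':
    # create a pool whose workers run init_worker('app') on startup
    pool = Pool(initializer=init_worker, initargs=('app',))
    print(pool.map(task, range(5)))
    pool.close()
    pool.join()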
Each worker process in the pool is a separate child process.
It is possible for child processes to become unstable or accumulate resources without releasing them, such as if there are subtle bugs in the tasks that are being executed.
As such, it is a good practice to limit the number of tasks executed by each worker process and create a new replacement worker process once the limit on the number of tasks has been reached.
A frequent pattern found in other systems (such as Apache, mod_wsgi, etc) to free resources held by workers is to allow a worker within a pool to complete only a set amount of work before being exiting, being cleaned up and a new process spawned to replace the old one. The maxtasksperchild argument to the Pool exposes this ability to the end user.
— multiprocessing — Process-based parallelism
The number of tasks that may be executed by each worker process can be set via the “maxtasksperchild” argument, which defaults to no limit.
maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.
— multiprocessing — Process-based parallelism
For example:
...
# create a process pool limiting each worker to 10 tasks
pool = multiprocessing.pool.Pool(maxtasksperchild=10)
Finally, we can configure the process context used to create child processes.
Recall that a process context defines the start method used to create child processes, such as ‘fork’ or ‘spawn’, and provides an implementation of the multiprocessing API using the configured context.
We can configure and retrieve a multiprocessing context using the multiprocessing.get_context() function.
We can create a new process context with a given start method, then pass this to the process pool to create all child processes via the “context” argument. If not set, the current context is used by default.
For example:
...
# create a multiprocessing context with the spawn start method
spawn_context = multiprocessing.get_context('spawn')
# create a process pool using the new context
pool = multiprocessing.pool.Pool(context=spawn_context)
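Alternatively, because a context object shares the same API as the multiprocessing module, the pool can be created directly from the context; for example:

...
# create a process pool directly from the context
pool = spawn_context.Pool()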
Now that we know how to configure the process pool, let’s look at how we might submit tasks.
How to Submit Tasks to the Process Pool
There are four ways to issue tasks to the process pool.
They are:
- apply, via pool.apply()
- map, via pool.map()
- imap, via pool.imap()
- starmap, via pool.starmap()
The apply() function will submit a task to the process pool that executes a function with the given arguments.
It is used for one-off function calls to be executed by a worker process and specifies the function to execute and any arguments to the function.
For example:
...
# execute a function call by the process pool
result = pool.apply(task)
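If the function takes arguments, they can be passed via the “args” and “kwds” keywords; for example (assuming task() accepts one argument):

...
# execute a function call with an argument by the process pool
result = pool.apply(task, args=(1,))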
The map() function is a parallel version of the built-in map() function.
Recall that the built-in map() function will call a specified function for each item in a provided iterable.
The map() function on the process pool performs the same action, except that each function call on an item in an iterable is executed by a worker process.
A common idiom for using map() is to iterate the results returned from the function call.
For example:
...
# execute tasks in the process pool and process results
for result in pool.map(task, items):
    # ...
Unlike the built-in map() function, the pool version of map() only supports one iterable instead of multiple iterables, and all items are iterated and submitted as tasks immediately instead of being lazily evaluated.
The starmap() function provides a version of map() that supports functions with more than one argument, where each item in the iterable is a tuple of arguments that is unpacked for the call.
Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments. Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].
— multiprocessing — Process-based parallelism
For example:
...
# execute tasks in the process pool and process results
for result in pool.starmap(task, args):
    # ...
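Here, “args” is expected to be an iterable of argument tuples, where each tuple is unpacked as the arguments for one function call; for example (assuming task() accepts two arguments):

...
# each tuple is unpacked for one call, e.g. task(1, 2) then task(3, 4)
args = [(1, 2), (3, 4)]
results = pool.starmap(task, args)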
The imap() function provides a version of the map() function on the process pool that performs a lazy evaluation. That is, it does not dispatch tasks to the process pool until just-in-time, such as when requesting the next result on the returned iterator.
For example:
...
# execute tasks in the process pool and process results
for result in pool.imap(task, items):
    # ...
Each approach for issuing tasks to the process pool has both a blocking (synchronous) and a non-blocking (asynchronous) version.
Recall that a blocking function call will not return until the function has finished, whereas a non-blocking function call will return immediately and provide a callback mechanism for getting the result of the function call.
Of the functions for issuing tasks listed above, apply(), map(), and starmap() are blocking calls, whereas imap() returns its iterator immediately and blocks only as each result is requested.
The non-blocking or asynchronous versions of these functions are listed below; most have an _async suffix. For example:
- apply, via pool.apply_async()
- map, via pool.map_async()
- imap, via pool.imap_unordered()
- starmap, via pool.starmap_async()
The imap_unordered() function will return results out of order, i.e. in the order that tasks complete, unlike the imap() function, which yields results in the order the tasks were issued.
The apply_async() function allows callback and error handling functions to be specified and called automatically once the issued task is complete.
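For example, a sketch of issuing a task asynchronously with result and error callbacks (the handler functions here are illustrative):

...
# called automatically with the return value on success
def result_callback(result):
    print(result)

# called automatically with the exception on failure
def error_callback(error):
    print(error)

# issue the task asynchronously with callbacks
async_result = pool.apply_async(task, callback=result_callback, error_callback=error_callback)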
The apply_async(), map_async(), and starmap_async() functions each return a single instance of the AsyncResult class, which provides a mechanism for getting the results of the submitted tasks once they have completed.
Next, let’s take a closer look at the AsyncResult class.
What is the AsyncResult Class
The multiprocessing.pool.AsyncResult class represents a result from a task issued asynchronously to the process pool.
It provides a mechanism to check the status, wait for, and get the result for a task executed asynchronously in the process pool.
An instance of the multiprocessing.pool.AsyncResult class is returned by the apply_async(), map_async(), and starmap_async() functions.
For example, a call to map_async() for a function task() with an iterable of ten items will return a single AsyncResult object, and calling its get() function will return a list of the ten return values.
For example:
...
# submit tasks to the pool in a non-blocking manner
async_result = pool.map_async(task, items)
For a single task represented via a multiprocessing.pool.AsyncResult, we can check if the task is completed via the ready() function which returns True if the task is completed (successfully or with an error) or False otherwise.
For example:
...
# check if a task is done
if async_result.ready():
    # ...
A task may complete successfully or it may raise an exception. We can check if a task completed successfully via the successful() function, which raises a ValueError if the result is not yet ready.
For example:
...
# check if a task was completed successfully
if async_result.successful():
    # ...
We can wait for a task to complete via the wait() function.
If called with no arguments, the call will block until the task finishes. A “timeout” can be provided so that the call returns after a fixed number of seconds, even if the task has not completed.
For example:
...
# wait 10 seconds for the task to complete
async_result.wait(timeout=10)
Finally, we can get the result from the task via the get() function.
If the task is finished, then get() will return immediately. If the task is still running, a call to get() will not return until the task finishes and returns the result.
For example:
...
# get the result of a task
result = async_result.get()
If an exception was raised while the task was being executed, it is re-raised by the get() function in the parent process.
Finally, a “timeout” argument can be specified when getting the result. The call will return when the task is finished, or raise a multiprocessing.TimeoutError if the fixed number of seconds elapses first.
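For example, a sketch of getting a result with a timeout and handling the possible error (assuming the multiprocessing module has been imported):

...
# wait up to 10 seconds for the result
try:
    result = async_result.get(timeout=10)
except multiprocessing.TimeoutError:
    # the task did not complete within the timeout
    result = None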
Next, let’s look at how we might close down the process pool.
How to Close the Process Pool
The process pool can be closed once we have no further tasks to issue.
We can call the close() function to close down the process pool once all the currently issued tasks have completed.
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
— multiprocessing — Process-based parallelism
The close() function will return immediately and the pool will not take any further tasks.
For example:
...
# close the process pool
pool.close()
Alternately, we may want to forcefully terminate all child worker processes, regardless of whether they are executing tasks or not.
This can be achieved via the terminate() function.
For example:
...
# forcefully close all worker processes
pool.terminate()
We may then want to wait for all tasks in the pool to finish.
This can be achieved by calling the join() function on the pool. Note that join() can only be called after first calling close() or terminate().
For example:
...
# wait for all issued tasks to complete
pool.join()
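Note that the process pool also supports the context manager protocol, which calls terminate() automatically when the block is exited; a minimal sketch (task and items are illustrative):

...
# terminate() is called automatically on exiting the block
with multiprocessing.pool.Pool() as pool:
    results = pool.map(task, items)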
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to create and use a process pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Brett Jordan on Unsplash