You can limit the maximum tasks executed by child worker processes in the ProcessPoolExecutor via the max_tasks_per_child argument.
In this tutorial, you will discover how to limit the maximum tasks per child worker process in the ProcessPoolExecutor.
Let’s get started.
Need to Limit Maximum Tasks Per Child Process
The ProcessPoolExecutor provides a pool of reusable worker processes using the executor design pattern.
Tasks submitted to the pool are executed in parallel by the child processes, making the ProcessPoolExecutor appropriate for CPU-bound tasks in Python.
A ProcessPoolExecutor can be created directly or via the context manager interface, and tasks can be issued one by one via the submit() method or in batch via the map() method.
For example:
...
# create a process pool
with ProcessPoolExecutor() as ppe:
    # issue a task
    future = ppe.submit(task)
    # get the task result once the task is done
    result = future.result()
You can learn more about the ProcessPoolExecutor in the tutorial:
Each worker process in the pool is a separate child process that is either forked or spawned.
It is possible for child processes to become unstable or to accumulate resources without releasing them, for example, if there are subtle bugs in the tasks being executed.
As such, it is a good practice to limit the number of tasks executed by each child worker process and create a new replacement worker process once the limit on the number of tasks has been reached.
A frequent pattern found in other systems (such as Apache, mod_wsgi, etc) to free resources held by workers is to allow a worker within a pool to complete only a set amount of work before being exiting, being cleaned up and a new process spawned to replace the old one.
— multiprocessing — Process-based parallelism
Given that this is a good practice, how can we limit the maximum number of tasks completed by each child worker process in the ProcessPoolExecutor?
How To Limit Maximum Tasks Per Child Process
We can limit the number of tasks executed by each child worker process via the max_tasks_per_child argument when creating an instance of the ProcessPoolExecutor class.
max_tasks_per_child is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be replaced with a fresh worker process. By default max_tasks_per_child is None which means worker processes will live as long as the pool.
— concurrent.futures — Launching parallel tasks
The max_tasks_per_child argument was added to the ProcessPoolExecutor class in Python version 3.11 via the feature request “Feature request: maxtasksperchild for ProcessPoolExecutor“.
This argument is set to None by default, which allows each child worker process to execute an unlimited number of tasks.
The max_tasks_per_child argument can be set to a positive integer value that limits the number of tasks executed by a child process before the process is terminated and a replacement child process is created.
For example:
...
# create a process pool that limits the number of tasks to 5 per process
ppe = ProcessPoolExecutor(max_tasks_per_child=5)
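Because the argument only exists in Python 3.11 and later, programs that must also run on older Python versions can guard its use with a version check. The snippet below is a minimal sketch of this idea, assuming the same limit of 5 tasks per worker as above.

...
# pass max_tasks_per_child only on Python 3.11+, where it was added
import sys
if sys.version_info >= (3, 11):
    # limit each worker to 5 tasks before replacement
    ppe = ProcessPoolExecutor(max_tasks_per_child=5)
else:
    # older versions: worker processes live as long as the pool
    ppe = ProcessPoolExecutor()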
When using the max_tasks_per_child argument, the “spawn” start method must be used. This is the default start method on Windows and macOS.
If you are new to process start methods, see the tutorial:
The max_tasks_per_child argument cannot be used with the “fork” start method, the default on Linux. If the fork start method is the system default, the ProcessPoolExecutor will use the ‘spawn’ start method internally instead.
When a max is specified, the “spawn” multiprocessing start method will be used by default in absence of a mp_context parameter. This feature is incompatible with the “fork” start method.
— concurrent.futures — Launching parallel tasks
If the ‘fork‘ start method is forced when creating the ProcessPoolExecutor via the mp_context argument, then a ValueError exception is raised indicating that the fork start method is not compatible with the “max_tasks_per_child” argument.
- max_tasks_per_child is incompatible with the ‘fork’ multiprocessing start method; supply a different mp_context.
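One way to avoid this error is to explicitly supply a ‘spawn’ context via the mp_context argument, which is compatible with the max_tasks_per_child argument. A minimal sketch:

...
# explicitly use a 'spawn' context, which is compatible
# with the max_tasks_per_child argument
from multiprocessing import get_context
ppe = ProcessPoolExecutor(max_tasks_per_child=5, mp_context=get_context('spawn'))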
You can learn more about setting the mp_context argument in the tutorial:
Now that we know how to limit the maximum tasks per child process, let’s look at some worked examples.
Example of Maximum Tasks Per Child Process
We can explore an example of limiting the maximum number of tasks per child worker process in the ProcessPoolExecutor.
In this example, we will define a simple task that reports the name and pid of the worker process that is executing it, then blocks for a moment to simulate work. We will create a ProcessPoolExecutor with 1 worker process that is limited to 2 tasks before being replaced. Finally, we will issue batches of tasks to the pool and watch as the name and pid used to execute the task changes as child worker processes are replaced after they hit their task limits.
First, we can define a task function that gets the current process and reports the name and pid then sleeps for half a second.
We can get the current process via the multiprocessing.current_process() function.
You can see examples of using this function in the tutorial:
The task() function below implements this.
# task executed in the process pool
def task(task_id):
    # get the current process
    worker = current_process()
    # report details about the current process
    print(f'Task {task_id}, worker name={worker.name}, pid={worker.pid}', flush=True)
    # block for a moment
    sleep(0.5)
Next, in the main process, we can explicitly set the spawn start method. This can be achieved using the set_start_method() function. This is not required, as the ProcessPoolExecutor will use the ‘spawn‘ start method internally when limiting the max tasks per process, but we will set it explicitly to contrast this example with the subsequent examples below.
...
# force the start method to spawn
set_start_method('spawn')
Next, we can create an instance of the ProcessPoolExecutor using the context manager interface, configured with one worker process and a maximum of two tasks per child process.
...
# create the process pool
with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as ppe:
    # ...
If you are new to the context manager interface, you can learn more in the tutorial:
We will then issue two tasks to the pool, wait for them to complete, then issue two more.
In this case, we will issue tasks using the submit() method in a list comprehension and wait on the list of Future objects using the wait() method.
...
# issue tasks to the pool
futures = [ppe.submit(task, i) for i in range(2)]
_ = wait(futures)
futures = [ppe.submit(task, i) for i in range(2)]
_ = wait(futures)
And that’s it.
We expect that the first two tasks will be executed by one child process. This process will then be terminated and a second different child process will execute the second two tasks.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of limiting the number of tasks per process in the processpoolexecutor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from multiprocessing import current_process
from multiprocessing import set_start_method
from time import sleep

# task executed in the process pool
def task(task_id):
    # get the current process
    worker = current_process()
    # report details about the current process
    print(f'Task {task_id}, worker name={worker.name}, pid={worker.pid}', flush=True)
    # block for a moment
    sleep(0.5)

# protect the entry point
if __name__ == '__main__':
    # force the start method to spawn
    set_start_method('spawn')
    # create the process pool
    with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as ppe:
        # issue tasks to the pool
        futures = [ppe.submit(task, i) for i in range(2)]
        _ = wait(futures)
        futures = [ppe.submit(task, i) for i in range(2)]
        _ = wait(futures)
Running the example first sets the start method to spawn.
Next, a process pool is created with one worker process that is limited to a maximum of two tasks before being terminated and replaced.
We then issue two tasks to the pool and wait for them to complete. In this case, we can see that indeed the first two tasks are executed by the same process as we expect, a process with name=SpawnProcess-1, pid=23258.
Two additional tasks are issued to the process pool and as expected are executed by a second different child process, in this case, name=SpawnProcess-2, pid=23260.
This highlights that we can limit the maximum number of tasks executed by each worker process in the ProcessPoolExecutor via the max_tasks_per_child argument.
Task 0, worker name=SpawnProcess-1, pid=23258
Task 1, worker name=SpawnProcess-1, pid=23258
Task 0, worker name=SpawnProcess-2, pid=23260
Task 1, worker name=SpawnProcess-2, pid=23260
Example of Maximum Tasks Per Child Process with Fork Start Method
We can limit the maximum number of tasks per child worker process even when our program uses the fork start method.
The ProcessPoolExecutor will use the ‘spawn‘ start method internally and automatically when max_tasks_per_child is set, meaning that we can use this feature even when our system is configured to use the ‘fork’ start method elsewhere in our program.
We can explore this with some worked examples.
Note, these examples only work on systems that support the ‘fork’ start method, e.g. they may not run on Windows.
Force Fork Start Method and Limit Max Tasks Per Child
We can update the above example to use the ‘fork‘ start method for the main process (and child processes) and then set the max_tasks_per_child argument.
First, we can update the task() function to report the start method used in the worker process.
We expect this to be ‘spawn‘, regardless of what was set in the main process.
# task executed in the process pool
def task(task_id):
    # get the start method
    method = get_start_method()
    # get the current process
    worker = current_process()
    # report details about the current process
    print(f'Task {task_id}, method={method}, name={worker.name}, pid={worker.pid}', flush=True)
    # block for a moment
    sleep(0.5)
Next, we can update the main process to explicitly set the fork start method.
...
# force the start method to fork
set_start_method('fork')
And that’s it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of limiting tasks per process with fork start method
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from multiprocessing import current_process
from multiprocessing import set_start_method
from multiprocessing import get_start_method
from time import sleep

# task executed in the process pool
def task(task_id):
    # get the start method
    method = get_start_method()
    # get the current process
    worker = current_process()
    # report details about the current process
    print(f'Task {task_id}, method={method}, name={worker.name}, pid={worker.pid}', flush=True)
    # block for a moment
    sleep(0.5)

# protect the entry point
if __name__ == '__main__':
    # force the start method to fork
    set_start_method('fork')
    # create the process pool
    with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as ppe:
        # issue tasks to the pool
        futures = [ppe.submit(task, i) for i in range(2)]
        _ = wait(futures)
        futures = [ppe.submit(task, i) for i in range(2)]
        _ = wait(futures)
Running the example first sets the fork start method. This is applied to the main process and all child processes.
Next, the ProcessPoolExecutor is created and configured with one worker and a maximum of two tasks per worker.
Two tasks are issued and the main process blocks until they are complete.
We can see that the first two tasks are executed by one process created using the ‘spawn’ start method, as indicated by the reported start method and the process name (a forked process would have “Fork” in its default name).
Two more tasks are issued and waited on; they are executed by a second worker process with a different name and pid, also created using the spawn start method instead of the fork start method.
This highlights that the max_tasks_per_child argument can be used to limit the maximum tasks per child worker process even when the fork start method is the default in the program. This is because the ProcessPoolExecutor will automatically use the ‘spawn’ start method internally instead when the max_tasks_per_child argument is set.
Task 0, method=spawn, name=SpawnProcess-1, pid=23313
Task 1, method=spawn, name=SpawnProcess-1, pid=23313
Task 0, method=spawn, name=SpawnProcess-2, pid=23315
Task 1, method=spawn, name=SpawnProcess-2, pid=23315
Force Fork Start Method via Context and Limit Max Tasks Per Child
We can go one step further and explicitly force the ProcessPoolExecutor to use the ‘fork’ start method via the multiprocessing context.
This can be achieved by creating a fork multiprocessing context via the multiprocessing.get_context() function.
...
# get the multiprocessing context
context = get_context('fork')
Then pass this context to the ProcessPoolExecutor via the mp_context argument.
...
# create the process pool
with ProcessPoolExecutor(max_workers=1, mp_context=context, max_tasks_per_child=2) as ppe:
    # ...
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of limiting tasks per process with fork start method via context
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from multiprocessing import current_process
from multiprocessing import set_start_method
from multiprocessing import get_start_method
from multiprocessing import get_context
from time import sleep

# task executed in the process pool
def task(task_id):
    # get the start method
    method = get_start_method()
    # get the current process
    worker = current_process()
    # report details about the current process
    print(f'Task {task_id}, method={method}, name={worker.name}, pid={worker.pid}', flush=True)
    # block for a moment
    sleep(0.5)

# protect the entry point
if __name__ == '__main__':
    # force the start method to fork
    set_start_method('fork')
    # get the multiprocessing context
    context = get_context('fork')
    # create the process pool
    with ProcessPoolExecutor(max_workers=1, mp_context=context, max_tasks_per_child=2) as ppe:
        # issue tasks to the pool
        futures = [ppe.submit(task, i) for i in range(2)]
        _ = wait(futures)
        futures = [ppe.submit(task, i) for i in range(2)]
        _ = wait(futures)
Running the example first sets the default start method to fork.
A multiprocessing context with the fork start method is then created.
Next, the ProcessPoolExecutor is created with one worker process, a maximum of two tasks per child process and the fork multiprocessing context.
This fails with a ValueError exception and a message that the fork start method is not compatible with the max_tasks_per_child argument.
- max_tasks_per_child is incompatible with the ‘fork’ multiprocessing start method; supply a different mp_context.
This highlights that we cannot explicitly force the fork start method via the mp_context argument when the max_tasks_per_child argument is set.
Traceback (most recent call last):
  ...
    raise ValueError("max_tasks_per_child is incompatible with"
ValueError: max_tasks_per_child is incompatible with the 'fork' multiprocessing start method; supply a different mp_context.
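If your program receives a multiprocessing context from elsewhere and cannot guarantee its start method, one defensive option is to catch the ValueError and fall back to a ‘spawn’ context. This is a sketch of one possible approach, not behavior provided by the API itself.

...
# try the supplied context first, falling back to 'spawn' if it
# uses the fork start method, which max_tasks_per_child rejects
try:
    ppe = ProcessPoolExecutor(max_workers=1, mp_context=context, max_tasks_per_child=2)
except ValueError:
    ppe = ProcessPoolExecutor(max_workers=1, mp_context=get_context('spawn'), max_tasks_per_child=2)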
Unexpected Behavior With Maximum Tasks Per Child Process (BUG?)
What happens if we issue many tasks to the ProcessPoolExecutor, more tasks than a single process is able to execute given the maximum limit set?
We can explore this situation.
Consider the case where we have a process pool with one worker process that is limited to a maximum of 2 tasks before being replaced. If we issue 4 tasks to the pool at once, we would expect that the first two tasks would be executed by one child worker process and the next two tasks would be executed by a second child worker process.
We do not see this in practice. Instead, it seems that the remaining tasks are never executed by a replacement worker process and the main process hangs, waiting for the tasks to complete.
Is my expectation wrong somehow?
I think this is a bug. I am using Python 3.11.4.
The example below implements this case where each task is issued separately via the submit() method.
# SuperFastPython.com
# example of issuing more tasks than the maximum limit per child process
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from multiprocessing import current_process
from multiprocessing import set_start_method
from time import sleep

# task executed in the process pool
def task(task_id):
    # get the current process
    worker = current_process()
    # report details about the current process
    print(f'Task {task_id}, worker name={worker.name}, pid={worker.pid}', flush=True)
    # block for a moment
    sleep(0.5)

# protect the entry point
if __name__ == '__main__':
    # force the start method to spawn
    set_start_method('spawn')
    # create the process pool
    with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as ppe:
        # issue tasks to the pool
        futures = [ppe.submit(task, i) for i in range(4)]
        _ = wait(futures)
Running the program first sets the spawn start method.
The ProcessPoolExecutor is created with one worker and a max of two tasks per child.
All four tasks are issued to the pool using the submit() method.
The main process then blocks, waiting for all 4 tasks to complete.
The first two tasks execute sequentially using the first worker process, as we would expect.
Then the program hangs, waiting forever.
Do you see the same result?
Task 0, worker name=SpawnProcess-1, pid=23377
Task 1, worker name=SpawnProcess-1, pid=23377
We can see the same issue if we issue all tasks at once using the map() method.
For example:
...
# create the process pool
with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as ppe:
    # issue tasks to the pool
    for value in ppe.map(task, range(4)):
        pass
If we update the first example to add a busy loop instead of waiting explicitly, we can report on the internals of the ProcessPoolExecutor, such as the set of child worker processes.
...
# busy loop checking on internal processes in the pool
while True:
    # report the set of worker processes
    print(ppe._processes)
    # block for a moment
    sleep(1)
The complete example with this change is listed below.
# SuperFastPython.com
# example of issuing more tasks than the maximum limit per child process
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from multiprocessing import current_process
from multiprocessing import set_start_method
from time import sleep

# task executed in the process pool
def task(task_id):
    # get the current process
    worker = current_process()
    # report details about the current process
    print(f'Task {task_id}, worker name={worker.name}, pid={worker.pid}', flush=True)
    # block for a moment
    sleep(0.5)

# protect the entry point
if __name__ == '__main__':
    # force the start method to spawn
    set_start_method('spawn')
    # create the process pool
    with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as ppe:
        # issue tasks to the pool
        futures = [ppe.submit(task, i) for i in range(4)]
        # busy loop checking on internal processes in the pool
        while True:
            # report the set of worker processes
            print(ppe._processes)
            # block for a moment
            sleep(1)
Running the example results in the same problem as above.
In this case, we can see that the set of worker processes has one process in it initially as we expect.
The two tasks are executed and the process is terminated.
Then the set of processes is empty, suggesting that the worker process is not replaced.
{23417: <SpawnProcess name='SpawnProcess-1' pid=23417 parent=23415 started>}
Task 0, worker name=SpawnProcess-1, pid=23417
Task 1, worker name=SpawnProcess-1, pid=23417
{23417: <SpawnProcess name='SpawnProcess-1' pid=23417 parent=23415 started>}
{}
{}
{}
{}
{}
...
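Until this behavior is resolved or explained, one possible workaround, suggested by the first worked example above, is to issue tasks in batches no larger than the per-child limit and to wait for each batch to complete before issuing the next. The run_in_batches() helper below is a hypothetical sketch of this idea, not a standard library function.

...
# hypothetical helper: issue tasks in batches no larger than the
# max_tasks_per_child limit, waiting for each batch before the next
def run_in_batches(ppe, func, args, batch_size):
    results = []
    for i in range(0, len(args), batch_size):
        # issue one batch of tasks to the pool
        futures = [ppe.submit(func, arg) for arg in args[i:i+batch_size]]
        # wait for the whole batch to complete before issuing more
        wait(futures)
        # gather results in order
        results.extend(future.result() for future in futures)
    return results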
Further Reading
This section provides additional resources that you may find helpful.
Books
- ProcessPoolExecutor Jump-Start, Jason Brownlee (my book!)
- Concurrent Futures API Interview Questions
- ProcessPoolExecutor PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ProcessPoolExecutor: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python Pool: The Complete Guide
References
- Thread (computing), Wikipedia.
- Process (computing), Wikipedia.
- Thread Pool, Wikipedia.
- Futures and promises, Wikipedia.
Takeaways
You now know how to limit the maximum tasks per child worker process in the ProcessPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.