Multiprocessing Pool Max Tasks Per Child in Python

June 30, 2022 Python Multiprocessing Pool

You can limit the maximum tasks executed by child worker processes in the process pool by setting the "maxtasksperchild" argument in the multiprocessing.pool.Pool class constructor.

In this tutorial you will discover how to limit the maximum tasks per child process in Python process pools.

Let's get started.

Need to Limit Maximum Tasks Per Child

The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.

A process pool can be configured when it is created, which will prepare the child workers.

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

-- multiprocessing — Process-based parallelism

We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map(). Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().

Each worker process in the pool is a separate child process.

It is possible for child processes to become unstable or accumulate resources without releasing them, such as if there are subtle bugs in the tasks that are being executed.

As such, it is a good practice to limit the number of tasks executed by each worker process and create a new replacement worker process once the limit on the number of tasks has been reached.

A frequent pattern found in other systems (such as Apache, mod_wsgi, etc) to free resources held by workers is to allow a worker within a pool to complete only a set amount of work before being exiting, being cleaned up and a new process spawned to replace the old one.

-- multiprocessing — Process-based parallelism

How can we limit the maximum number of tasks completed by each worker child process in the process pool?

How To Limit Maximum Tasks Per Child

We can limit the maximum number of tasks completed by each child process in the process pool by setting the "maxtasksperchild" argument in the multiprocessing.pool.Pool class constructor when configuring a new process pool.

For example:

...
# create a process pool and limit the number of tasks in each worker
pool = multiprocessing.pool.Pool(maxtasksperchild=5)

The maxtasksperchild argument takes a positive integer: the number of tasks that may be completed by a child worker process, after which the process will be terminated and a new child worker process will be created to replace it.

maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed.

-- multiprocessing — Process-based parallelism

By default the maxtasksperchild argument is set to None, which means each child worker process will run for the lifetime of the process pool.

The default maxtasksperchild is None, which means worker processes will live as long as the pool.

-- multiprocessing — Process-based parallelism

Now that we know how to configure the maximum number of tasks per worker child process in the process pool, let's look at a worked example.

Example of Maximum Tasks Per Child

We can explore how to limit the maximum number of tasks per child worker process with a worked example.

In this example, we will define a simple task to execute in the process pool that reports the process name and blocks for a moment. We will then configure a process pool with two worker processes and limit the number of tasks per child process to three. Finally, we will issue ten tasks to the pool and watch as each child process executes three tasks before being replaced.

Firstly, we can define a function to be executed as a task by workers in the process pool.

The function will first get the current multiprocessing.Process instance that is executing the task via the multiprocessing.current_process() function. It then reports the name of the process executing the task along with an integer identifying the task, and blocks for one second.

The task() function below implements this.

# task executed in a worker process
def task(value):
    # get the current process
    process = current_process()
    # report a message
    print(f'Worker is {process.name} with {value}', flush=True)
    # block for a moment
    sleep(1)

Next, in the main process we can first create and configure a process pool.

We will use the context manager interface to create the process pool and configure the pool with two child worker processes and limit each worker to three tasks.

...
# create and configure the process pool
with Pool(2, maxtasksperchild=3) as pool:
	# ...

Next, we will issue ten tasks to the process pool. The tasks will be issued asynchronously via apply_async(), executing our task() function with a unique integer argument for each task, e.g. from 0 to 9.

...
# issue tasks to the process pool
for i in range(10):
    pool.apply_async(task, args=(i,))

Finally, we will explicitly close the process pool and wait for all tasks to complete.

...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of limiting the number of tasks per child in the process pool
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import current_process

# task executed in a worker process
def task(value):
    # get the current process
    process = current_process()
    # report a message
    print(f'Worker is {process.name} with {value}', flush=True)
    # block for a moment
    sleep(1)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(2, maxtasksperchild=3) as pool:
        # issue tasks to the process pool
        for i in range(10):
            pool.apply_async(task, args=(i,))
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()

Running the example first creates the process pool.

Ten tasks are then issued to the pool, then the main process closes the pool and waits for all issued tasks to complete.

Two worker child processes are created in the pool to execute tasks, SpawnPoolWorker-1 and SpawnPoolWorker-2.

Each worker then executes three tasks before terminating.

Two new child worker processes are then created and started, SpawnPoolWorker-3 and SpawnPoolWorker-4. They proceed to execute two tasks each before the process pool is closed.

This highlights how we can automatically replace worker processes in the process pool after they have executed a fixed number of tasks.

Worker is SpawnPoolWorker-1 with 0
Worker is SpawnPoolWorker-2 with 1
Worker is SpawnPoolWorker-1 with 2
Worker is SpawnPoolWorker-2 with 3
Worker is SpawnPoolWorker-2 with 4
Worker is SpawnPoolWorker-1 with 5
Worker is SpawnPoolWorker-3 with 6
Worker is SpawnPoolWorker-4 with 7
Worker is SpawnPoolWorker-4 with 8
Worker is SpawnPoolWorker-3 with 9

Takeaways

You now know how to limit the maximum number of tasks per child worker process in the process pool.



If you enjoyed this tutorial, you will love my book: Python Multiprocessing Pool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.