Last Updated on September 12, 2022
You can limit the number of waiting tasks in the ProcessPoolExecutor by using a Semaphore.
In this tutorial you will discover how to limit the number of waiting tasks in the Python process pool.
Let’s get started.
Need to Limit The Number of Waiting Tasks
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() function and passing in the name of the function you wish to execute on another process.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
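As a minimal sketch of the submit-then-collect pattern just described: the helper name submit_and_wait() is hypothetical, and the built-in pow() stands in for a real task because built-ins can be sent to worker processes without extra setup.

```python
from concurrent.futures import ProcessPoolExecutor


def submit_and_wait(executor, function, *args):
    """Submit one task and return (done flag, result) once it has finished."""
    future = executor.submit(function, *args)
    # result() blocks until the task completes and re-raises any task exception
    result = future.result()
    return future.done(), result


if __name__ == '__main__':
    # pow is a built-in, so it can be pickled and sent to a worker process
    with ProcessPoolExecutor(max_workers=2) as executor:
        print(submit_and_wait(executor, pow, 2, 10))
```

The helper accepts any executor object, so the same call pattern applies to a thread pool as well.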
We can limit the total number of active processes in the process pool by setting the max_workers argument when constructing the ProcessPoolExecutor.
Nevertheless, it is possible to continually add tasks to the process pool, which are queued up or scheduled for execution once worker processes become available.
This can be a problem if each task holds even a modest amount of memory and thousands or hundreds of thousands of tasks are queued up. There is a need to bound or limit the number of waiting tasks.
How can we limit the total number of queued or waiting tasks in the ProcessPoolExecutor?
How to Limit The Number of Waiting Tasks
The number of waiting tasks in the ProcessPoolExecutor can be limited by using multiprocessing.Semaphore.
You may recall that a Semaphore is a synchronization primitive that holds a fixed number of resources. Each call to acquire() takes one resource, and once all of them are taken, further calls block until a resource becomes available. The number of resources is set when constructing the Semaphore; we then call the acquire() function when a resource is required and release() once we are finished with it.
For example:
    ...
    # create a semaphore
    semaphore = Semaphore(10)
    # acquire a resource
    semaphore.acquire()
    # do something...
    # release the resource
    semaphore.release()
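To make the counting behaviour concrete, here is a small sketch that acquires more times than there are resources. It is an illustration only: it uses threading.Semaphore for simplicity (the acquire() and release() calls used here are also available on multiprocessing.Semaphore), and the function name demo_semaphore() is made up for this example. Passing False to acquire() asks for a non-blocking attempt, so the call reports failure instead of waiting.

```python
from threading import Semaphore


def demo_semaphore(slots=2):
    """Acquire more times than there are slots and record each outcome."""
    semaphore = Semaphore(slots)
    results = []
    # the first `slots` non-blocking acquires succeed
    for _ in range(slots):
        results.append(semaphore.acquire(False))
    # no slots left: a non-blocking acquire returns False instead of waiting
    results.append(semaphore.acquire(False))
    # releasing one slot makes the next acquire succeed again
    semaphore.release()
    results.append(semaphore.acquire(False))
    return results


if __name__ == '__main__':
    print(demo_semaphore())  # prints [True, True, False, True]
```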
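A multiprocessing.Semaphore cannot be passed to tasks in the process pool as an argument in the way a threading.Semaphore can be passed to threads, because it cannot be pickled. Instead, we can use a Manager to create the Semaphore object for us. The Manager hosts the semaphore in a server process and returns a proxy object that can be shared between processes.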
Managers are used to create and manage shared state between processes, like data and variables.
A Manager must be created and then closed once we are finished with it, which is handled for us when we use it as a context manager, for example:
    ...
    # create the manager used for sharing state between processes
    with Manager() as manager:
        # ...
Next, we can use the Manager to create the Semaphore:
    ...
    # create a semaphore to limit the number of pending tasks
    semaphore = manager.Semaphore()
We can use a semaphore to limit the total number of tasks submitted to the ProcessPoolExecutor.
This can be achieved by defining a proxy submit() function that we call to execute tasks instead of the submit() function on the ProcessPoolExecutor.
This proxy function would be responsible for acquiring a resource from the semaphore, submitting the task on the executor, then adding a done callback function that will release the Semaphore once the task is done.
Let’s look at each element in turn.
Firstly, a Semaphore object must be created that specifies the total number of tasks that can be submitted to the ProcessPoolExecutor. This includes the total number of tasks that may be queued and those that may be executing.
    ...
    # semaphore to limit the queue size to the pool
    semaphore = manager.Semaphore(100)
The proxy submit() function would take the name of the target task function and any arguments to that function, just like the submit() function on the Executor class (the parent class of the ProcessPoolExecutor).
    # proxy for submitting tasks that imposes a limit on the queue size
    def submit_proxy(function, *args, **kwargs):
        # ...
Next, we would acquire the resource via the Semaphore.
    ...
    # acquire the semaphore, blocks if occupied
    semaphore.acquire()
This will return immediately if a resource is available; otherwise, it will block until another task finishes and calls release(), making a resource available.
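The blocking behaviour can be observed directly with a short sketch. It is not part of the tutorial's example: it uses threading.Semaphore and a threading.Timer as a stand-in for a task that finishes later, and the function names wait_for_slot() and try_slot() are hypothetical. The second function shows the optional timeout argument to acquire(), which returns False rather than blocking forever if no resource becomes available in time.

```python
import threading
import time


def wait_for_slot(delay=0.2):
    """Return how long acquire() blocked while the only slot was taken."""
    semaphore = threading.Semaphore(1)
    semaphore.acquire()  # take the only slot, like a task that is still running
    # release the slot from a helper thread after `delay` seconds,
    # standing in for a done callback that fires when a task finishes
    timer = threading.Timer(delay, semaphore.release)
    timer.start()
    start = time.perf_counter()
    semaphore.acquire()  # blocks here until the helper thread calls release()
    waited = time.perf_counter() - start
    timer.join()
    return waited


def try_slot(timeout=0.05):
    """Return False because no slot is released within the timeout."""
    semaphore = threading.Semaphore(0)
    return semaphore.acquire(timeout=timeout)


if __name__ == '__main__':
    print(f'blocked for about {wait_for_slot():.2f} seconds')
    print(f'acquired with timeout: {try_slot()}')
```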
We can then submit the task as per normal on the ProcessPoolExecutor by calling the actual submit() function.
    ...
    # submit the task normally
    future = executor.submit(function, *args, **kwargs)
Finally, we would add a done callback function to the task’s Future object.
This can be achieved by first defining a callback function that takes a Future object and will release the semaphore.
    # callback for completed tasks
    def task_complete_callback(future):
        global semaphore
        # release the semaphore
        semaphore.release()
This function can then be registered on the Future object by calling the add_done_callback() function and passing the name of the function.
    ...
    # add the custom done callback
    future.add_done_callback(task_complete_callback)
And that’s it.
The function must return the Future object so that the caller can collect Futures if needed.
The complete proxy task function is listed below.
    # proxy for submitting tasks that imposes a limit on the queue size
    def submit_proxy(function, *args, **kwargs):
        global semaphore, executor
        # acquire the semaphore, blocks if occupied
        semaphore.acquire()
        # submit the task normally
        future = executor.submit(function, *args, **kwargs)
        # add the custom done callback
        future.add_done_callback(task_complete_callback)
        return future
This function will limit the total number of tasks that can be submitted to the ProcessPoolExecutor.
A downside of this approach is that the submit_proxy() function and the task_complete_callback() callback function both require access to global state: the Semaphore in both functions and the ProcessPoolExecutor in the proxy function.
This could be avoided if we wrapped this functionality into a class instead of standalone functions.
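One way such a class might look is sketched below. This is an illustration rather than a definitive implementation, and it makes a few assumptions that are not in the tutorial: the class name BoundedSubmitter is made up, a threading.BoundedSemaphore is swapped in for the Manager semaphore, and a try/except is added so the resource is returned if submit() itself fails. The swap relies on the documented behaviour that done callbacks are called in a thread belonging to the process that added them, so both acquire() and release() happen in the parent process.

```python
from concurrent.futures import Executor, ProcessPoolExecutor
from threading import BoundedSemaphore


class BoundedSubmitter:
    """Hypothetical helper that caps the number of unfinished tasks."""

    def __init__(self, executor: Executor, limit: int):
        self._executor = executor
        self._semaphore = BoundedSemaphore(limit)

    def submit(self, function, *args, **kwargs):
        # block until fewer than `limit` tasks are queued or running
        self._semaphore.acquire()
        try:
            future = self._executor.submit(function, *args, **kwargs)
        except BaseException:
            # submission failed, so give the slot back before re-raising
            self._semaphore.release()
            raise
        # free the slot when the task finishes, fails or is cancelled
        future.add_done_callback(lambda _: self._semaphore.release())
        return future


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        submitter = BoundedSubmitter(executor, limit=10)
        # pow is a built-in, so it can be pickled and sent to a worker process
        futures = [submitter.submit(pow, i, 2) for i in range(50)]
        print(sum(future.result() for future in futures))
```

Because the executor and the semaphore are stored on the instance, neither needs to be a global variable, and several pools can each have their own limit.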
Now that we know how to bound the number of tasks in the ProcessPoolExecutor, let’s look at a worked example.
Example of Limiting The Number of Waiting Tasks
Let’s explore an example of bounding the total number of tasks in the ProcessPoolExecutor.
First, let’s define a task that takes a unique identifier, like an integer, blocks for a fraction of a second and reports a message that the task is done.
    # task that sleeps for a moment
    def work(identifier):
        sleep(random())
        print(f'Done: {identifier}', flush=True)
        return identifier
We can then define the done callback function for releasing the Semaphore when the task is completed, prepared in the previous section.
    # callback for completed tasks
    def task_complete_callback(future):
        # release the semaphore
        semaphore.release()
Then the proxy submit function will acquire the Semaphore before submitting the task to the process pool, and will add the done callback function.
    # proxy for submitting tasks that imposes a limit on the queue size
    def submit_proxy(function, *args, **kwargs):
        # acquire the semaphore, blocks if occupied
        semaphore.acquire()
        # submit the task normally
        future = executor.submit(function, *args, **kwargs)
        # add the custom done callback
        future.add_done_callback(task_complete_callback)
        return future
All that is left is to submit tasks via the proxy.
First, let’s define the number of worker processes in the process pool and the maximum number of tasks allowed in the process pool at once.
We will use two workers and a limit of ten tasks (i.e. 2 executing and 8 waiting).
    ...
    # number of workers in the pool
    n_workers = 2
    # max number of tasks in the pool at once (executing plus waiting)
    n_queue = 10
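Because the semaphore counts executing tasks as well as waiting ones, its size is roughly the number of workers plus the number of waiting tasks we are willing to allow. A tiny sketch of that arithmetic follows; the helper name semaphore_size() is hypothetical and not part of the example.

```python
def semaphore_size(n_workers, n_waiting):
    """Slots needed so n_waiting tasks can wait behind n_workers running tasks."""
    if n_workers < 1 or n_waiting < 0:
        raise ValueError('need at least one worker and a non-negative wait limit')
    # every running task and every waiting task holds one slot
    return n_workers + n_waiting


if __name__ == '__main__':
    print(semaphore_size(2, 8))  # prints 10
```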
We can then define the Semaphore used to bound the total number of tasks in the ProcessPoolExecutor.
First, we create the Manager object used for creating synchronization primitive objects to be explicitly shared between processes.
    ...
    # create a manager for sharing the semaphore
    with Manager() as manager:
        # ...
We can then use the manager to create the Semaphore object.
    ...
    # semaphore to limit the queue size to the pool
    semaphore = manager.Semaphore(n_queue)
Finally, we can create the process pool and submit 50 tasks using the proxy submit function submit_proxy() specifying the name of our target task function and a unique integer argument.
    ...
    # create the process pool
    with ProcessPoolExecutor(n_workers) as executor:
        # submit many tasks
        futures = [submit_proxy(work, i) for i in range(50)]
        # wait for all tasks to complete
        print('All tasks are submitted, waiting...')
Tying this together, the complete example of bounding the total number of tasks in the ProcessPoolExecutor is listed below.
    # SuperFastPython.com
    # example of bounding the number of tasks submitted to the process pool
    from time import sleep
    from random import random
    from multiprocessing import Manager
    from concurrent.futures import ProcessPoolExecutor

    # mock task that sleeps for a moment
    def work(identifier):
        sleep(random())
        print(f'Done: {identifier}', flush=True)
        return identifier

    # callback for completed tasks
    def task_complete_callback(future):
        # release the semaphore
        semaphore.release()

    # proxy for submitting tasks that imposes a limit on the queue size
    def submit_proxy(function, *args, **kwargs):
        # acquire the semaphore, blocks if occupied
        semaphore.acquire()
        # submit the task normally
        future = executor.submit(function, *args, **kwargs)
        # add the custom done callback
        future.add_done_callback(task_complete_callback)
        return future

    # entry point
    if __name__ == '__main__':
        # number of workers in the pool
        n_workers = 2
        # max number of tasks in the pool at once (executing plus waiting)
        n_queue = 10
        # create a manager for sharing the semaphore
        with Manager() as manager:
            # semaphore to limit the queue size to the pool
            semaphore = manager.Semaphore(n_queue)
            # create the process pool
            with ProcessPoolExecutor(n_workers) as executor:
                # submit many tasks
                futures = [submit_proxy(work, i) for i in range(50)]
                # wait for all tasks to complete
                print('All tasks are submitted, waiting...')
Running the example first creates the semaphore and then the process pool and submits all tasks.
The main process blocks after ten tasks have been submitted, then adds new tasks one at a time as earlier tasks complete and release resources in the semaphore.
Eventually, all 50 tasks are submitted by the main process and the message “All tasks are submitted, waiting...” is reported with 10 tasks remaining. This shows that at most 10 tasks could be in the process pool at a time.
Finally, the last ten tasks complete, the semaphore returns to fully available, and the process pool is closed, releasing the workers.
    Done: 1
    Done: 2
    Done: 0
    Done: 4
    Done: 3
    Done: 5
    Done: 6
    Done: 7
    Done: 8
    Done: 9
    Done: 10
    Done: 11
    Done: 13
    Done: 12
    Done: 15
    Done: 14
    Done: 16
    Done: 17
    Done: 18
    Done: 19
    Done: 20
    Done: 21
    Done: 23
    Done: 22
    Done: 25
    Done: 24
    Done: 26
    Done: 28
    Done: 27
    Done: 30
    Done: 29
    Done: 32
    Done: 33
    Done: 31
    Done: 34
    Done: 35
    Done: 36
    Done: 38
    Done: 37
    Done: 39
    All tasks are submitted, waiting...
    Done: 41
    Done: 40
    Done: 42
    Done: 43
    Done: 45
    Done: 44
    Done: 46
    Done: 48
    Done: 49
    Done: 47
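The example stores the returned Future objects in a list but never reads them. If the return values are needed, one option is to gather them as tasks finish, as in the sketch below. The helper name collect_results() is hypothetical, and the built-in pow() stands in for the work() task.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def collect_results(futures):
    """Return the tasks' return values in the order the tasks finished."""
    # result() re-raises any exception that the task raised
    return [future.result() for future in as_completed(futures)]


if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        # pow is a built-in, so it can be pickled and sent to a worker process
        futures = [executor.submit(pow, i, 2) for i in range(5)]
        print(sorted(collect_results(futures)))
```

Results arrive in completion order rather than submission order, which is why the demo sorts them before printing.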
Takeaways
You now know how to limit the number of waiting tasks in the ProcessPoolExecutor.