Last Updated on September 12, 2022
You can limit the number of pending tasks in the ThreadPoolExecutor by using a Semaphore.
In this tutorial, you will discover how to limit the number of pending tasks in the Python thread pool.
Let’s get started.
Need to Limit the Number of Waiting Tasks
The ThreadPoolExecutor provides a flexible way to execute ad hoc tasks using a pool of worker threads.
You can submit tasks to the thread pool by calling the submit() function and passing in the function you wish to execute on another thread.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
We can limit the total number of active threads in the thread pool by setting the max_workers argument when constructing the ThreadPoolExecutor.
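For example, a minimal sketch of both ideas (the pool size of 4 and the doubling task below are illustrative choices, not part of the example developed later):

from concurrent.futures import ThreadPoolExecutor

# mock task used only for illustration
def task(value):
    return value * 2

# create a thread pool limited to 4 worker threads
with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns a Future object immediately
    future = executor.submit(task, 21)
    # block until the task completes, then retrieve its result
    print(future.result())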
Nevertheless, it is possible to continually add tasks to the thread pool, where they are queued up until worker threads become available to execute them.
This can be a problem if each task holds a modest amount of memory and thousands or hundreds of thousands of tasks are queued up. There is a need to bound or limit the number of waiting tasks.
How can we limit the total number of queued or waiting tasks in the ThreadPoolExecutor?
How to Limit the Number of Waiting Tasks
The number of waiting tasks in the ThreadPoolExecutor can be limited by using a threading.Semaphore.
You may recall that a Semaphore is a synchronization primitive that allows a fixed number of resources to be acquired, after which further attempts to acquire will block until a resource becomes available.
This is achieved by setting the number of resources when constructing the Semaphore, then calling the acquire() function when a resource is required and release() once the resource is finished with.
For example:
...
# create a semaphore
semaphore = Semaphore(10)
# acquire a resource
semaphore.acquire()
# do something...
# release the resource
semaphore.release()
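As an aside, the Semaphore class also supports the context manager protocol, so a resource can be acquired and released automatically via the with statement. For example:

...
# acquire a resource, then release it automatically
with semaphore:
    # do something...
    pass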
You can learn more about the Python Semaphore class in a separate tutorial.
We can use a semaphore to limit the total number of tasks submitted to the ThreadPoolExecutor.
This can be achieved by defining a proxy submit() function that we call to execute tasks instead of the submit() function on the ThreadPoolExecutor.
This proxy function would be responsible for acquiring a resource from the semaphore, submitting the task on the executor, then adding a done callback function that will release the semaphore once the task is done.
Let’s look at each element in turn.
Firstly, a Semaphore object must be created that specifies the total number of tasks that can be submitted to the ThreadPoolExecutor. This includes the total number of tasks that may be queued and those that may be executing.
...
# semaphore to limit the queue size to the pool
semaphore = Semaphore(100)
The proxy submit() function would take the target task function and any arguments to that function, just like the submit() function on the Executor class (the parent class of the ThreadPoolExecutor).
# proxy for submitting tasks that imposes a limit on the queue size
def submit_proxy(function, *args, **kwargs):
    # ...
Next, we would acquire the resource via the semaphore.
...
# acquire the semaphore, blocks if occupied
semaphore.acquire()
This will return immediately if a resource is available; otherwise, it will block until a resource becomes available via another task finishing and calling release().
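If blocking indefinitely is not desirable, the acquire() function also supports an optional timeout argument; for example (the 5-second timeout below is an illustrative choice):

...
# wait up to 5 seconds for a resource, returns False on timeout
if not semaphore.acquire(timeout=5):
    print('No resource became available in time')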
We can then submit the task as per normal on the ThreadPoolExecutor by calling the actual submit() function.
...
# submit the task normally
future = executor.submit(function, *args, **kwargs)
Finally, we would add a done callback function to the task’s Future object.
This can be achieved by first defining a callback function that takes a Future object and will release the semaphore.
# callback for completed tasks
def task_complete_callback(future):
    global semaphore
    # release the semaphore
    semaphore.release()
This function can then be registered on the Future object by calling the add_done_callback() function and passing in the callback function.
...
# add the custom done callback
future.add_done_callback(task_complete_callback)
And that’s it.
The function must return the Future object so that the caller can collect Future objects if needed.
The complete proxy task function is listed below.
# proxy for submitting tasks that imposes a limit on the queue size
def submit_proxy(function, *args, **kwargs):
    global semaphore, executor
    # acquire the semaphore, blocks if occupied
    semaphore.acquire()
    # submit the task normally
    future = executor.submit(function, *args, **kwargs)
    # add the custom done callback
    future.add_done_callback(task_complete_callback)
    return future
This function will limit the total number of tasks that can be submitted to the ThreadPoolExecutor.
A downside of this approach is that both the submit_proxy() function and the task_complete_callback() callback function require access to global state, such as the semaphore in both functions and the ThreadPoolExecutor in the proxy function.
This could be avoided if we wrapped this functionality into a class instead of standalone functions.
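For example, a minimal sketch of such a class might look like the following (the BoundedExecutor name and its interface are illustrative choices, not a standard API):

from threading import Semaphore
from concurrent.futures import ThreadPoolExecutor

# wrap a ThreadPoolExecutor and bound the number of in-flight tasks
class BoundedExecutor:
    # create the wrapped pool and the semaphore that bounds it
    def __init__(self, max_workers, max_tasks):
        self._executor = ThreadPoolExecutor(max_workers)
        self._semaphore = Semaphore(max_tasks)

    # acquire a slot, submit the task, release the slot once it is done
    def submit(self, function, *args, **kwargs):
        self._semaphore.acquire()
        try:
            future = self._executor.submit(function, *args, **kwargs)
        except Exception:
            # release the slot if the submission itself failed
            self._semaphore.release()
            raise
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    # shut down the wrapped thread pool
    def shutdown(self, wait=True):
        self._executor.shutdown(wait)

Because the semaphore and the executor are held as instance state, no global variables are required.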
Now that we know how to limit the number of tasks in the ThreadPoolExecutor, let’s look at a worked example.
Example of Limiting the Number of Waiting Tasks
Let’s explore an example of bounding the total number of tasks in the ThreadPoolExecutor.
First, let’s define a mock task that takes a unique identifier, like an integer, sleeps for a fraction of a second, and reports a message that the task is done.
# mock task that sleeps for a moment
def work(identifier):
    sleep(random())
    print(f'Done: {identifier}')
    return identifier
We can then define the done callback function for releasing the semaphore when a task completes, as prepared in the previous section.
# callback for completed tasks
def task_complete_callback(future):
    global semaphore
    # release the semaphore
    semaphore.release()
The proxy submit function will acquire the semaphore, then submit the task to the thread pool, and finally add the done callback function.
# proxy for submitting tasks that imposes a limit on the queue size
def submit_proxy(function, *args, **kwargs):
    global semaphore, executor
    # acquire the semaphore, blocks if occupied
    semaphore.acquire()
    # submit the task normally
    future = executor.submit(function, *args, **kwargs)
    # add the custom done callback
    future.add_done_callback(task_complete_callback)
    return future
All that is left is to submit tasks via the proxy.
First, let’s define the number of worker threads in the thread pool and the maximum number of tasks allowed in the thread pool at one time.
We will use two workers and a limit of ten tasks (i.e. 2 executing and 8 waiting).
...
# number of workers in the pool
n_workers = 2
# max number of queued tasks
n_queue = 10
We can then define the Semaphore used to bound the total number of tasks in the ThreadPoolExecutor.
...
# semaphore to limit the queue size to the pool
semaphore = Semaphore(n_queue)
Finally, we can create the thread pool and submit 50 tasks using the proxy submit function submit_proxy(), passing our target task function and a unique integer argument.
...
# create the thread pool
with ThreadPoolExecutor(n_workers) as executor:
    # submit many tasks
    futures = [submit_proxy(work, i) for i in range(50)]
    # wait for all tasks to complete
    print('All tasks are submitted, waiting...')
Tying this together, the complete example of bounding the total number of tasks in the ThreadPoolExecutor is listed below.
# SuperFastPython.com
# example of bounding the number of tasks submitted to the thread pool
from time import sleep
from random import random
from threading import Semaphore
from concurrent.futures import ThreadPoolExecutor

# mock task that sleeps for a moment
def work(identifier):
    sleep(random())
    print(f'Done: {identifier}')
    return identifier

# callback for completed tasks
def task_complete_callback(future):
    global semaphore
    # release the semaphore
    semaphore.release()

# proxy for submitting tasks that imposes a limit on the queue size
def submit_proxy(function, *args, **kwargs):
    global semaphore, executor
    # acquire the semaphore, blocks if occupied
    semaphore.acquire()
    # submit the task normally
    future = executor.submit(function, *args, **kwargs)
    # add the custom done callback
    future.add_done_callback(task_complete_callback)
    return future

# number of workers in the pool
n_workers = 2
# max number of queued tasks
n_queue = 10
# semaphore to limit the queue size to the pool
semaphore = Semaphore(n_queue)
# create the thread pool
with ThreadPoolExecutor(n_workers) as executor:
    # submit many tasks
    futures = [submit_proxy(work, i) for i in range(50)]
    # wait for all tasks to complete
    print('All tasks are submitted, waiting...')
Running the example first creates the semaphore, then creates the thread pool and begins submitting tasks.
The main thread blocks after the first ten tasks are submitted, then slowly adds new tasks as earlier tasks complete and release resources in the semaphore.
Eventually, all 50 tasks are submitted by the main thread and the message “All tasks are submitted, waiting…” is reported while ten tasks remain. This highlights that indeed only ten tasks at a time could be pushed into the thread pool.
Finally, the last ten tasks complete, the semaphore is returned to fully available, and the thread pool is closed, releasing the workers.
Done: 1
Done: 2
Done: 0
Done: 4
Done: 3
Done: 5
Done: 6
Done: 7
Done: 8
Done: 9
Done: 10
Done: 11
Done: 13
Done: 12
Done: 15
Done: 14
Done: 16
Done: 17
Done: 18
Done: 19
Done: 20
Done: 21
Done: 23
Done: 22
Done: 25
Done: 24
Done: 26
Done: 28
Done: 27
Done: 30
Done: 29
Done: 32
Done: 33
Done: 31
Done: 34
Done: 35
Done: 36
Done: 38
Done: 37
Done: 39
All tasks are submitted, waiting...
Done: 41
Done: 40
Done: 42
Done: 43
Done: 45
Done: 44
Done: 46
Done: 48
Done: 49
Done: 47
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to limit the number of waiting tasks in the ThreadPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.