You can set a timeout when waiting for the ThreadPoolExecutor.
This involves setting a “timeout” argument when processing task results via the ThreadPoolExecutor.map() method, Future.result() method, and Future.exception() method, as well as when waiting for tasks to complete via the concurrent.futures.wait() function and concurrent.futures.as_completed() function.
In this tutorial, you will discover how to use a timeout with the ThreadPoolExecutor.
Let’s get started.
Need ThreadPoolExecutor Timeouts
The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.
Tasks executed in new threads are executed concurrently in Python, making the ThreadPoolExecutor appropriate for I/O-bound tasks.
A ThreadPoolExecutor can be created directly or via the context manager interface and tasks can be issued one-by-one via the submit() method or in batch via the map() method.
For example:
    ...
    # create a thread pool
    with ThreadPoolExecutor() as tpe:
        # issue a task
        future = tpe.submit(task)
        # the task result once the task is done
        result = future.result()
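As a runnable variation on the snippet above, the sketch below issues one task with submit() and a small batch with map(). The task() function, its 0.1 second sleep, and the max_workers=4 setting are assumptions chosen for illustration only.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# hypothetical task used only for illustration
def task(arg):
    # simulate a short blocking call
    sleep(0.1)
    return arg * 10

# create the thread pool via the context manager interface
with ThreadPoolExecutor(max_workers=4) as tpe:
    # issue one task and block until its result is available
    future = tpe.submit(task, 1)
    single = future.result()
    # issue a batch of tasks and collect results in argument order
    batch = list(tpe.map(task, range(3)))

print(single)  # 10
print(batch)   # [0, 10, 20]
```

Both future.result() and iterating the results of map() block the caller here, which is where a timeout becomes useful.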
When using the thread pool to execute tasks, our program may have to wait.
That is, different methods and functions on the ThreadPoolExecutor may be blocking, such as waiting for a task to complete or waiting for a task result.
It is good practice to set a time limit or timeout when waiting or blocking in a multithreaded program. This ensures that the program does not wait forever in an error case.
How can we configure the ThreadPoolExecutor to wait using a timeout?
How to Use Timeouts with the ThreadPoolExecutor
The ThreadPoolExecutor supports timeouts in five places:
- ThreadPoolExecutor.map()
- Future.result()
- Future.exception()
- concurrent.futures.wait()
- concurrent.futures.as_completed()
Let’s take a closer look at each in turn.
ThreadPoolExecutor.map() Timeout
We can set a timeout while processing results from a batch of tasks.
The map() method allows one function to be called many times, each time with different arguments.
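A “timeout” argument can be specified, which causes the iterator returned by map() to raise a TimeoutError exception if a result is not available within the given number of seconds. The time is measured from the original call to map(), not from the request for each result.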
The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to Executor.map(). timeout can be an int or a float. If timeout is not specified or None, there is no limit to the wait time.
— concurrent.futures — Launching parallel tasks
For example:
    ...
    try:
        # issue tasks and process results
        for result in tpe.map(task, range(5), timeout=1):
            print(result)
    except TimeoutError:
        print('Time out waiting for map()')
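Because the timeout is measured from the original call to map(), it acts as a budget for the whole batch rather than for each result. The sketch below tries to make this visible. The single worker, the 0.4 second sleep, and the 0.6 second timeout are assumptions chosen so that the first result arrives in time and the second does not.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError
from time import sleep

# hypothetical task that blocks for a fixed interval
def task(arg):
    sleep(0.4)
    return arg * 10

received = []
timed_out = False
# a single worker forces the tasks to run one after another
with ThreadPoolExecutor(max_workers=1) as tpe:
    try:
        # the 0.6 second budget starts when map() is called
        for result in tpe.map(task, range(3), timeout=0.6):
            received.append(result)
    except TimeoutError:
        timed_out = True

print(received, timed_out)
```

With these timings the first result should be received after about 0.4 seconds, and the TimeoutError should be raised while waiting for the second result, even though each individual task takes less than the timeout.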
Future.result() Timeout
We can set a timeout while waiting for a result for a single task.
This can be achieved by setting the “timeout” argument on the result() method of the Future object.
If the result() method does not return within the given number of seconds, a TimeoutError exception is raised.
If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
— concurrent.futures — Launching parallel tasks
For example:
    ...
    try:
        # wait for result with timeout
        result = future.result(timeout=1)
        # report the result
        print(result)
    except TimeoutError:
        print('Time out waiting for result()')
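A timeout on result() only ends the wait; it does not stop the task. The sketch below illustrates this by waiting again after the first attempt times out. The task() function and the 0.5 and 0.1 second values are assumptions for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError
from time import sleep

# hypothetical task that takes longer than the first wait
def task(arg):
    sleep(0.5)
    return arg * 10

with ThreadPoolExecutor() as tpe:
    future = tpe.submit(task, 3)
    try:
        # first attempt: give up waiting after 0.1 seconds
        result = future.result(timeout=0.1)
        first_attempt_timed_out = False
    except TimeoutError:
        first_attempt_timed_out = True
        # the timeout ended our wait, not the task
        done_after_timeout = future.done()
        # second attempt: wait without a limit
        result = future.result()

print(first_attempt_timed_out, done_after_timeout, result)
```

In this sketch the first call should time out, the task should still be unfinished at that point, and the second call should return the value 30 once the task completes.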
Future.exception() Timeout
We can set a timeout while waiting for an exception for a single task.
This can be achieved by setting the “timeout” argument on the exception() method of the Future object.
If the exception() method does not return within the given number of seconds, a TimeoutError exception is raised.
If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
— concurrent.futures — Launching parallel tasks
For example:
    ...
    try:
        # wait for exception with timeout
        exp = future.exception(timeout=1)
        # report the exception
        print(exp)
    except TimeoutError:
        print('Time out waiting for exception()')
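When the task finishes within the timeout, exception() returns the exception raised by the task, or None if the task completed normally. The sketch below shows both cases. The works() and fails() functions and the 2 second timeout are hypothetical and chosen only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# hypothetical task that completes normally
def works(arg):
    sleep(0.1)
    return arg * 10

# hypothetical task that raises an exception
def fails(arg):
    sleep(0.1)
    raise ValueError(f'bad argument: {arg}')

with ThreadPoolExecutor() as tpe:
    ok_future = tpe.submit(works, 1)
    bad_future = tpe.submit(fails, 1)
    # both tasks finish well within the timeout
    ok_exc = ok_future.exception(timeout=2)
    bad_exc = bad_future.exception(timeout=2)

print(ok_exc)         # None
print(repr(bad_exc))  # ValueError('bad argument: 1')
```

Note that exception() returns the exception object rather than raising it, so the TimeoutError is the only exception we need to catch around the call in this situation.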
concurrent.futures.wait() Timeout
We can set a timeout while waiting for a collection of tasks to complete.
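This can be achieved via the wait() module function, which takes a “timeout” argument. The function returns two sets: the Future objects that are done and those that are not done. If the timeout elapses before the specified condition is met, the function returns anyway without raising an exception, and the done set contains only the tasks that finished in time, which may be none.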
timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
— concurrent.futures — Launching parallel tasks
For example:
    ...
    # wait for all tasks to complete
    done, notdone = wait(futures, timeout=1)
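Unlike the other functions, wait() does not raise a TimeoutError. Instead, we inspect the two returned sets. The sketch below uses tasks of different lengths so that some finish before the timeout and one does not. The durations, the 0.7 second timeout, and max_workers=3 are assumptions for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from time import sleep

# hypothetical task that blocks for the given number of seconds
def task(duration):
    sleep(duration)
    return duration

with ThreadPoolExecutor(max_workers=3) as tpe:
    # two short tasks and one long task, all running at once
    futures = [tpe.submit(task, d) for d in [0.1, 0.2, 1.5]]
    # give up waiting after 0.7 seconds
    done, not_done = wait(futures, timeout=0.7)
    # results of the tasks that finished in time
    finished = sorted(f.result() for f in done)
    pending_count = len(not_done)

print(finished, pending_count)
```

With these timings, the two short tasks should appear in the done set and the long task in the not-done set. The long task keeps running, and the context manager waits for it before the program exits.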
concurrent.futures.as_completed() Timeout
We can set a timeout while processing tasks in the order they are completed.
This can be achieved by setting a “timeout” argument to the as_completed() module function.
If the timeout, measured from the original call to as_completed(), elapses while some tasks are still unfinished, then a TimeoutError exception is raised when the next completed task is requested.
The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to as_completed(). timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
— concurrent.futures — Launching parallel tasks
For example:
    ...
    try:
        # process results as tasks are completed
        for future in as_completed(futures, timeout=1):
            # report result
            print(future.result())
    except TimeoutError:
        print('Time out waiting for as_completed()')
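Tasks that finish before the timeout are still yielded by as_completed(), so we can keep partial results. The sketch below collects results until the TimeoutError is raised and then checks which tasks are unfinished. The durations, the 0.8 second timeout, and max_workers=3 are assumptions for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import TimeoutError
from time import sleep

# hypothetical task that blocks for the given number of seconds
def task(duration):
    sleep(duration)
    return duration

completed = []
timed_out = False
with ThreadPoolExecutor(max_workers=3) as tpe:
    futures = [tpe.submit(task, d) for d in [0.1, 0.3, 1.5]]
    try:
        # results arrive in completion order until the budget runs out
        for future in as_completed(futures, timeout=0.8):
            completed.append(future.result())
    except TimeoutError:
        timed_out = True
    # count the tasks that were not finished at the time of the timeout
    unfinished_count = len([f for f in futures if not f.done()])

print(completed, timed_out, unfinished_count)
```

With these timings, the two short tasks should be reported before the exception is raised, leaving one unfinished task.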
Now that we know how to use timeouts with the ThreadPoolExecutor, let’s look at some worked examples.
Example of ThreadPoolExecutor.map() Timeout
We can explore an example of setting a timeout on the ThreadPoolExecutor map() method.
In this example, we will define a task that takes an argument, blocks for two seconds, and returns a value. We will then issue many of these tasks to the thread pool using map() and set a timeout of one second.
The expectation is that no results will be available within the timeout and the map() method will fail with a TimeoutError exception.
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of a ThreadPoolExecutor.map() timeout
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import TimeoutError
    from time import sleep

    # function run in the thread pool
    def task(arg):
        # block a moment
        sleep(2)
        # return a value
        return arg * 10

    # create the thread pool
    with ThreadPoolExecutor() as tpe:
        try:
            # issue tasks and process results
            for result in tpe.map(task, range(5), timeout=1):
                print(result)
        except TimeoutError:
            print('Time out waiting for map()')
Running the example first creates the ThreadPoolExecutor using the context manager interface with a default number of workers.
The five tasks are then issued to the pool using the map() method, and the main thread blocks, waiting for the tasks to complete.
The tasks begin executing.
The timeout elapses and the map() method raises a TimeoutError exception, which is caught and a message is reported.
This highlights how we can use the ThreadPoolExecutor map() method with a timeout.
    Time out waiting for map()
Example of Future.result() Timeout
We can explore an example of setting a timeout on the Future result() method.
In this example, we will define a task that takes an argument, blocks for two seconds, and returns a value. We will then issue one of these tasks to the thread pool and call the result() method on the Future object with a timeout, and wait one second for the result.
The expectation is that the result will not be available within the timeout and a TimeoutError exception will be raised.
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of a Future.result() timeout
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import TimeoutError
    from time import sleep

    # function run in the thread pool
    def task(arg):
        # block a moment
        sleep(2)
        # return a value
        return arg * 10

    # create the thread pool
    with ThreadPoolExecutor() as tpe:
        # issue a task
        future = tpe.submit(task, 0)
        try:
            # wait for result with timeout
            result = future.result(timeout=1)
            # report the result
            print(result)
        except TimeoutError:
            print('Time out waiting for result()')
Running the example first creates the ThreadPoolExecutor using the context manager interface with a default number of workers.
The single task is then issued to the pool using the submit() method and a Future object is returned. The main thread then calls the result() method with a timeout, and blocks waiting for the task return value.
The task begins executing.
The timeout elapses and the result() method raises a TimeoutError exception, which is caught and a message is reported.
This highlights how we can use the Future result() method with a timeout.
    Time out waiting for result()
Example of Future.exception() Timeout
We can explore an example of setting a timeout on the Future exception() method.
In this example, we will define a task that takes an argument, blocks for two seconds, and then raises an exception. We will then issue one of these tasks to the thread pool and call the exception() method on the Future object with a timeout, and wait one second for the exception.
The expectation is that the exception will not be available within the timeout and a TimeoutError exception will be raised.
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of a Future.exception() timeout
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import TimeoutError
    from time import sleep

    # function run in the thread pool
    def task(arg):
        # block a moment
        sleep(2)
        # raise an exception
        raise Exception('Something bad happened')
        # return a value
        return arg * 10

    # create the thread pool
    with ThreadPoolExecutor() as tpe:
        # issue a task
        future = tpe.submit(task, 0)
        try:
            # wait for exception with timeout
            exp = future.exception(timeout=1)
            # report the exception
            print(exp)
        except TimeoutError:
            print('Time out waiting for exception()')
Running the example first creates the ThreadPoolExecutor using the context manager interface with a default number of workers.
The single task is then issued to the pool using the submit() method and a Future object is returned. The main thread then calls the exception() method with a timeout, and blocks waiting for the task exception.
The task begins executing.
The timeout elapses and the exception() method raises a TimeoutError exception, which is caught and a message is reported.
This highlights how we can use the Future exception() method with a timeout.
    Time out waiting for exception()
Example of concurrent.futures.wait() Timeout
We can explore setting a timeout when waiting for a collection of Future objects via the wait() function.
In this example, we will define a task that takes an argument, blocks for two seconds, and returns a value. We will then issue a number of these tasks to the thread pool via the submit() method and collect the Future objects. Finally, we will wait for all tasks to complete with a timeout via the wait() function.
The expectation is that no tasks will complete within the timeout and an empty set of completed tasks will be reported.
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of a concurrent.futures.wait() timeout
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import wait
    from time import sleep

    # function run in the thread pool
    def task(arg):
        # block a moment
        sleep(2)
        # return a value
        return arg * 10

    # create the thread pool
    with ThreadPoolExecutor() as tpe:
        # issue many tasks
        futures = [tpe.submit(task, i) for i in range(5)]
        # wait for all tasks to complete
        done, notdone = wait(futures, timeout=1)
        # report the number of tasks done
        print(f'Tasks done: {len(done)}')
Running the example first creates the ThreadPoolExecutor using the context manager interface with a default number of workers.
Five tasks are then issued to the thread pool and the associated Future objects are collected.
The main thread then calls the wait() function with a timeout and blocks waiting for all of the tasks to complete.
The tasks run and begin sleeping.
The timeout elapses and the wait() function returns an empty set of completed tasks.
The main thread reports the empty set of completed tasks.
This highlights how we can set a timeout on the wait() function when waiting for tasks to complete.
    Tasks done: 0
Example of concurrent.futures.as_completed() Timeout
We can explore setting a timeout when processing Future objects in the order their tasks are complete via the as_completed() function.
In this example, we will define a task that takes an argument, blocks for two seconds, and returns a value. We will then issue a number of these tasks to the thread pool via the submit() method and collect the Future objects. Finally, we will attempt to process Future objects in the order their tasks are completed with a timeout.
The expectation is that the timeout will elapse before any tasks are completed and a TimeoutError exception will be raised.
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of a concurrent.futures.as_completed() timeout
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import TimeoutError
    from time import sleep

    # function run in the thread pool
    def task(arg):
        # block a moment
        sleep(2)
        # return a value
        return arg * 10

    # create the thread pool
    with ThreadPoolExecutor() as tpe:
        # issue many tasks
        futures = [tpe.submit(task, i) for i in range(5)]
        # handle timeout
        try:
            # process results as tasks are completed
            for future in as_completed(futures, timeout=1):
                # report result
                print(future.result())
        except TimeoutError:
            print('Time out waiting for as_completed()')
Running the example first creates the ThreadPoolExecutor using the context manager interface with a default number of workers.
Five tasks are then issued to the thread pool and the associated Future objects are collected.
The main thread then calls the as_completed() function with a timeout and blocks waiting for the first task to complete.
The tasks run and begin sleeping.
The timeout elapses and the as_completed() function raises a TimeoutError exception that is caught and a message is reported.
This highlights how we can set a timeout on the as_completed() function when processing Future objects in the order their tasks are completed.
    Time out waiting for as_completed()
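In each of the examples above, the timeout only ends the wait in the main thread; the tasks themselves keep running in the worker threads, and the context manager still waits for them on exit. If we also want the work to stop, one possible approach, sketched below, is to cancel tasks that have not started via Future.cancel() and to ask running tasks to stop via a shared threading.Event. The stop event, the step-wise task() function, and the timings are all hypothetical and chosen for illustration; this is not a feature of the ThreadPoolExecutor itself.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError
from threading import Event
from time import sleep

# hypothetical flag that tasks check to know when to give up
stop = Event()

# hypothetical task that works in small steps
def task(arg):
    for _ in range(50):
        # check the flag between steps and exit early if it is set
        if stop.is_set():
            return None
        sleep(0.1)
    return arg * 10

# a single worker means the second task waits in the queue
with ThreadPoolExecutor(max_workers=1) as tpe:
    running = tpe.submit(task, 1)
    queued = tpe.submit(task, 2)
    try:
        result = running.result(timeout=0.3)
    except TimeoutError:
        # a task that has not started yet can be cancelled outright
        queued_cancelled = queued.cancel()
        # a running task cannot be cancelled, so ask it to stop
        stop.set()
        result = running.result()

print(queued_cancelled, result)
```

This only works when the task can be written to check the flag regularly. A task blocked in a single long call cannot be interrupted this way.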
Takeaways
You now know how to use a timeout with the ThreadPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Brandon Erlinger-Ford on Unsplash
Marina says
What’s the realistic use case for this timeout? Although the methods can raise a TimeoutError, the workers will continue executing. This module doesn’t really offer any option to properly time out and shut down a long-running task, which would be a more realistic scenario.