Last Updated on September 12, 2022
The ThreadPoolExecutor Python API provides a short, clever, and dense example of how to use the class that may be confusing to beginners.
In this tutorial, you will discover how the ThreadPoolExecutor API example works line by line.
Let’s get started.
ThreadPoolExecutor API Example
The API for the ThreadPoolExecutor in Python provides a short example of how to use the class.
Here is the direct link:
Below is a copy of the example in its entirety.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import concurrent.futures import urllib.request URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] # Retrieve a single page and report the URL and contents def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data))) |
Running the example reports URLs and the number of bytes for each or an error message if the page could not be accessed.
1 2 3 4 5 |
'http://www.foxnews.com/' page is 282256 bytes 'http://www.cnn.com/' page is 1122228 bytes 'http://some-made-up-domain.com/' generated an exception: HTTP Error 400: Bad Request 'http://europe.wsj.com/' generated an exception: HTTP Error 403: Forbidden 'http://www.bbc.co.uk/' page is 364082 bytes |
It is a short and sweet example of how to use the ThreadPoolExecutor class for an IO-bound task.
Nevertheless, if you are new to Python or new to the ThreadPoolExecutor class, it can be confusing.
What is happening in this example?
Run loops using all CPUs, download your FREE book to learn how.
Explanation of the ThreadPoolExecutor API Example
The ThreadPoolExecutor API example is divided into three parts.
The first is the data defined in the URLs constant.
It lists five URLs, three of which can be accessed and two of which cannot.
1 2 3 4 5 6 |
... URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] |
The second is the target task function named load_url().
This function takes a URL and a timeout and attempts to open an HTTP connection to the remote server and download the contents of the file, which is then returned.
The connection is managed via the context manager, which will automatically call the close() function on the HTTP connection once the block is exited via an exception or a return statement.
An exception can be thrown for many reasons, such as if the URL is malformed or if a connection to the server cannot be established.
1 2 3 4 |
# Retrieve a single page and report the URL and contents def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() |
So far so good.
The third part of the example is where you may experience confusion.
First, a thread pool is created with worker threads.
This is achieved using the context manager for the thread pool, which will automatically call the shutdown() function on the thread pool to release the worker threads once the block is exited.
1 2 3 4 |
... # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # ... |
The next line is perhaps the most confusing.
It calls the submit() function on the thread pool to add one task for each URL in the URLs global constant defined previously. Each task calls the load_url() function defined previously and passes two arguments: one URL and a timeout.
1 2 |
... executor.submit(load_url, url, 60) |
The submit() function on the ThreadPoolExecutor returns a Future object that provides a handle on the asynchronous task, such as checking its status and getting the result once the task has completed.
Each URL is retrieved from the URLs global constant using a dictionary comprehension.
More specifically, the like creates one dictionary entry for each task using the Future object as the key in the dictionary and the URL as the value in the dictionary.
Recall that we have one unique Future object returned for each call to the submit() function for each load_url() task.
The dictionary provides a way to look up data for each asynchronous task via its Future object, in this case, the URL that was dispatched to the task. We will need to use this mapping in the next part of the example.
1 2 3 |
... # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} |
All of the tasks are submitted, providing a dictionary of Future objects to URL strings called future_to_url.
The final part of the example then enumerates Future objects in the dictionary using a call to the as_completed() module function.
1 2 3 |
... for future in concurrent.futures.as_completed(future_to_url): # ... |
The as_completed() function takes a collection of Future objects created by calling submit() on the thread pool and will return the same Future objects in the order that the tasks associated with each Future object complete.
This is different from the order in which the Future may have been submitted and different to the order in which the Future objects may exist in the collection.
Processing Future objects in the order in which they are completed means that the program can be more responsive to the asynchronously executed tasks, in this case URLs that are being downloaded.
For each Future object returned in the iteration, the URL associated with it is retrieved from the future_to_url dictionary.
1 2 |
... url = future_to_url[future] |
This explains why a dictionary of Future objects to URLs was created, so that data associated with each asynchronous task can be retrieved easily later, in this case, the URL that was downloaded.
Next, the result is retrieved from the Future object, which is the return value of the load_url() function defined previously, which is the content of that URL downloaded.
If the content of the URL is downloaded without incident, the length of the downloaded content is reported.
1 2 |
... print('%r page is %d bytes' % (url, len(data))) |
If the load_url() function raised an exception while downloading the content, e.g. because a connection could not be made to the server, then the exception is caught by the ThreadPoolExecutor and is re-raised automatically when calling the result() on the Future object when retrieving the result.
This explains why we have a try-except block around the call to get the result from the Future.
If an exception is raised, it is caught and a failure message is reported.
1 2 |
... print('%r generated an exception: %s' % (url, exc)) |
Now that we understand each line in the ThreadPoolExecutor API example, let’s consider how we might rewrite it.
Rewrite of the ThreadPoolExecutor API Example
The ThreadPoolExecutor API example is a good illustration of how to use thread pools in Python.
This is because of the following reasons:
- It is short.
- The target task function is IO-bound (e.g. downloading URLs).
- It uses the context manager to automatically shut down the thread pool.
- It explicitly specifies the number of worker threads.
- It shows how to handle exceptions raised by target task functions.
- It shows how to use as_completed() that allows programs to be responsive.
Perhaps the most confusing part of the example is the use of a dictionary comprehension to map Future objects to URLs.
We now understand why this was done. Specifically, so that the program can access data associated with each dispatched task.
If the program did not create this mapping, we would have no way to associate each blob of downloaded content with each URL that was downloaded.
This raises the question of how else we could solve this problem.
One idea is to have the load_url() function return a tuple of both the URL that was downloaded and the data that was downloaded.
For example, the updated function would look as follows:
1 2 3 4 |
# Retrieve a single page and report the URL and contents def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return (url, conn.read()) |
Retrieving the tuple from the call to result() function on the Future would look as follows:
1 2 |
... url, data = future.result() |
The submission of tasks to the thread pool could then be simplified from a dictionary comprehension to a list comprehension, containing only the Future objects returned from each call to submit().
1 2 3 |
... # Start the load operations and mark each future with its URL futures = [executor.submit(load_url, url, 60) for url in URLS] |
The downside is that when an exception is raised, we have no way to determine which URL raised the exception.
1 2 |
... print('a url generated an exception: %s' % (exc)) |
In addition to less functionality, this is perhaps a good starting point because now the load_url() function is doing two things: downloading and returning a remote file and the URL that was downloaded, which we might expect the caller to already know as it was given to load_url() as an argument.
What if we didn’t need the program to be responsive to the asynchronous tasks as they were completed?
If so, we could process the Future objects in the order that they were submitted. The index of the current Future object would match the index of URLs in the URLs global constant and we would have everything we need for the successful and exception URL download cases.
For example, the Future objects created by calling submit() can be collected using a list comprehension as we did previously.
1 2 3 |
... # Start the load operations and mark each future with its URL futures = [executor.submit(load_url, url, 60) for url in URLS] |
We can then enumerate the list of Future objects in the order they were submitted and get a list index for each item using the enumerate() function. This allows us to retrieve the URL for the task from the URLs global constant.
1 2 3 4 5 6 7 8 9 |
... for i, future in enumerate(futures): url = URLS[i] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data))) |
The downside of this approach is that the program is slightly less responsive, forced to report results in the order that tasks were submitted to the thread pool, which in turn was defined by the order or URLs in the URLs global constant.
Do you have more ideas on how the ThreadPoolExecutor API example could be rewritten?
Let me know in the comments below. I’d love to see what you come up with.
Free Python ThreadPoolExecutor Course
Download your FREE ThreadPoolExecutor PDF cheat sheet and get BONUS access to my free 7-day crash course on the ThreadPoolExecutor API.
Discover how to use the ThreadPoolExecutor class including how to configure the number of workers and how to execute tasks asynchronously.
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
APIs
References
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Takeaways
You now know how the ThreadPoolExecutor API example works.
Do you have any questions about the ThreadPoolExecutor API example?
Ask your question in the comments below and I will do my best to answer.
Do you have any questions?