Last Updated on September 12, 2022
The ThreadPoolExecutor is a flexible and powerful thread pool for executing ad hoc tasks in an asynchronous manner.
In this tutorial, you will discover a ThreadPoolExecutor example that you can use as a template for your own project.
Let’s get started.
ThreadPoolExecutor Example
Perhaps the most common use case for the ThreadPoolExecutor is to download files from the internet concurrently.
It’s a useful problem because there are many ways to approach it. We will use it as the basis to explore different patterns for downloading files concurrently with the ThreadPoolExecutor.
This example is divided into three parts; they are:
- Download Files Serially
- Download Files Concurrently With submit()
- Download Files Concurrently With submit() and as_completed()
First, let’s develop a serial (non-concurrent) version of the program.
Download Files Serially
Consider the situation where we might want to have a local copy of some of the Python API documentation on concurrency for later review.
Perhaps we are taking a flight, won’t have internet access, and will need to refer to the documentation in HTML format as it appears on the docs.python.org website. It’s a contrived scenario; Python is installed with docs and we also have the pydoc command, but go with me here.
We may want to download local copies of the following ten URLs that cover the extent of the Python concurrency APIs.
- https://docs.python.org/3/library/concurrency.html
- https://docs.python.org/3/library/concurrent.html
- https://docs.python.org/3/library/concurrent.futures.html
- https://docs.python.org/3/library/threading.html
- https://docs.python.org/3/library/multiprocessing.html
- https://docs.python.org/3/library/multiprocessing.shared_memory.html
- https://docs.python.org/3/library/subprocess.html
- https://docs.python.org/3/library/queue.html
- https://docs.python.org/3/library/sched.html
- https://docs.python.org/3/library/contextvars.html
We can define these URLs as a list of strings for processing in our program.
# python concurrency API docs
URLS = ['https://docs.python.org/3/library/concurrency.html',
        'https://docs.python.org/3/library/concurrent.html',
        'https://docs.python.org/3/library/concurrent.futures.html',
        'https://docs.python.org/3/library/threading.html',
        'https://docs.python.org/3/library/multiprocessing.html',
        'https://docs.python.org/3/library/multiprocessing.shared_memory.html',
        'https://docs.python.org/3/library/subprocess.html',
        'https://docs.python.org/3/library/queue.html',
        'https://docs.python.org/3/library/sched.html',
        'https://docs.python.org/3/library/contextvars.html']
URLs are reasonably easy to download in Python.
First, we can attempt to open a connection to the server using the urlopen() function in the urllib.request module, specifying the URL and a reasonable timeout in seconds.
This gives us a connection object, on which we can call the read() function to read the contents of the file. Using the context manager for the connection will ensure it is closed automatically, even if an exception is raised.
The download_url() function below implements this, taking a URL as a parameter and returning the contents of the file, or None if the file cannot be downloaded for whatever reason. We will set a timeout of 3 seconds in case our internet connection is flaky for some reason.
# download a url and return the raw data, or None on error
def download_url(url):
    try:
        # open a connection to the server
        with urlopen(url, timeout=3) as connection:
            # read the contents of the html doc
            return connection.read()
    except:
        # bad url, socket timeout, http forbidden, etc.
        return None
Once we have the data for a URL, we can save it as a local file.
First, we need to retrieve the filename of the file specified in the URL. There are a few ways to do this, but the basename() function from the os.path module is a common approach when working with paths. We can then use the join() function from the same module to construct an output path for saving the file, using a directory we specify and the filename.
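For example, a quick sketch using one of the URLs above (note that join() uses the platform’s separator, e.g. a backslash on Windows):

# sketch: extract the filename from a url and build a local output path
from os.path import basename, join
url = 'https://docs.python.org/3/library/sched.html'
# filename portion of the url, e.g. 'sched.html'
filename = basename(url)
# local path under the 'docs' directory, e.g. 'docs/sched.html'
print(join('docs', filename))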
We can then use the open() built-in function to open the file in write-binary mode and save the contents of the file, again using the context manager to ensure the file is closed once we are finished.
The save_file() function below implements this, taking the URL that was downloaded, the contents of the file that was downloaded, and the local output path where we wish to save downloaded files. It returns the output path that was used to save the file, in case we want to report progress to the user.
# save data to a local file
def save_file(url, data, path):
    # get the name of the file from the url
    filename = basename(url)
    # construct a local path for saving the file
    outpath = join(path, filename)
    # save to file
    with open(outpath, 'wb') as file:
        file.write(data)
    return outpath
Next, we can call the download_url() function for each URL in our list, then call save_file() to save each downloaded file.
The download_and_save() function below implements this, reporting progress along the way, and handling the case of URLs that cannot be downloaded.
# download and save a url as a local file
def download_and_save(url, path):
    # download the url
    data = download_url(url)
    # check for no data
    if data is None:
        print(f'>Error downloading {url}')
        return
    # save the data to a local file
    outpath = save_file(url, data, path)
    # report progress
    print(f'>Saved {url} to {outpath}')
Finally, we need a function to drive the process.
First, the local output location where we will be saving files needs to be created, if it does not exist. We can achieve this using the makedirs() function in the os module.
We can iterate over a list of URLs and call our download_and_save() function for each.
The download_docs() function below implements this.
# download a list of URLs to local files
def download_docs(urls, path):
    # create the local directory, if needed
    makedirs(path, exist_ok=True)
    # download each url and save as a local file
    for url in urls:
        download_and_save(url, path)
And that’s it.
We can then call our download_docs() with our list of URLs and an output directory. In this case, we will use a 'docs/' subdirectory of our current working directory (where the Python script is located) as the output directory.
Tying this together, the complete example of downloading files serially is listed below.
# SuperFastPython.com
# download document files and save to local files serially
from os import makedirs
from os.path import basename
from os.path import join
from urllib.request import urlopen

# download a url and return the raw data, or None on error
def download_url(url):
    try:
        # open a connection to the server
        with urlopen(url, timeout=3) as connection:
            # read the contents of the html doc
            return connection.read()
    except:
        # bad url, socket timeout, http forbidden, etc.
        return None

# save data to a local file
def save_file(url, data, path):
    # get the name of the file from the url
    filename = basename(url)
    # construct a local path for saving the file
    outpath = join(path, filename)
    # save to file
    with open(outpath, 'wb') as file:
        file.write(data)
    return outpath

# download and save a url as a local file
def download_and_save(url, path):
    # download the url
    data = download_url(url)
    # check for no data
    if data is None:
        print(f'>Error downloading {url}')
        return
    # save the data to a local file
    outpath = save_file(url, data, path)
    # report progress
    print(f'>Saved {url} to {outpath}')

# download a list of URLs to local files
def download_docs(urls, path):
    # create the local directory, if needed
    makedirs(path, exist_ok=True)
    # download each url and save as a local file
    for url in urls:
        download_and_save(url, path)

# python concurrency API docs
URLS = ['https://docs.python.org/3/library/concurrency.html',
        'https://docs.python.org/3/library/concurrent.html',
        'https://docs.python.org/3/library/concurrent.futures.html',
        'https://docs.python.org/3/library/threading.html',
        'https://docs.python.org/3/library/multiprocessing.html',
        'https://docs.python.org/3/library/multiprocessing.shared_memory.html',
        'https://docs.python.org/3/library/subprocess.html',
        'https://docs.python.org/3/library/queue.html',
        'https://docs.python.org/3/library/sched.html',
        'https://docs.python.org/3/library/contextvars.html']
# local path for saving the files
PATH = 'docs'
# download all docs
download_docs(URLS, PATH)
Running the example iterates over the list of URLs and downloads each in turn.
Each file is then saved to a local file in the specified directory.
The process takes about 700 milliseconds to about one second (1,000 milliseconds) on my system.
Try running it a few times; how long does it take on your system?
Let me know in the comments.
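If you want to measure the runtime yourself, a minimal sketch is to wrap the call to download_docs() at the bottom of the program with time.perf_counter(); for example:

# measure how long the serial download takes (assumes the complete program above)
from time import perf_counter
# record the start time
start = perf_counter()
# download all docs
download_docs(URLS, PATH)
# report the overall duration
print(f'Took {perf_counter() - start:.3f} seconds')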
>Saved https://docs.python.org/3/library/concurrency.html to docs/concurrency.html
>Saved https://docs.python.org/3/library/concurrent.html to docs/concurrent.html
>Saved https://docs.python.org/3/library/concurrent.futures.html to docs/concurrent.futures.html
>Saved https://docs.python.org/3/library/threading.html to docs/threading.html
>Saved https://docs.python.org/3/library/multiprocessing.html to docs/multiprocessing.html
>Saved https://docs.python.org/3/library/multiprocessing.shared_memory.html to docs/multiprocessing.shared_memory.html
>Saved https://docs.python.org/3/library/subprocess.html to docs/subprocess.html
>Saved https://docs.python.org/3/library/queue.html to docs/queue.html
>Saved https://docs.python.org/3/library/sched.html to docs/sched.html
>Saved https://docs.python.org/3/library/contextvars.html to docs/contextvars.html
Next, we can look at making the program concurrent using a thread pool.
Download Files Concurrently With submit()
Let’s look at updating our program to make use of the ThreadPoolExecutor to download files concurrently.
A first thought might be to use map() as we just want to make a for-loop concurrent.
Unfortunately, the download_and_save() function that we call each iteration of the loop takes two arguments, and only one of them (the URL) changes from call to call.
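As an aside, we could still use map() by binding the fixed path argument first, for example with functools.partial. A sketch of that alternative (not the approach used in the rest of this tutorial):

...
# bind the fixed path argument so each task only needs a url
from functools import partial
task = partial(download_and_save, path=path)
# create the thread pool
with ThreadPoolExecutor(len(urls)) as executor:
    # download and save each url concurrently
    executor.map(task, urls)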
An alternate approach is to use submit() to call download_and_save() in a separate thread for each URL in the provided list.
We can do this by first configuring a thread pool with the number of threads equal to the number of URLs in the list. We’ll use the context manager for the thread pool so that it will be closed automatically for us when we finish.
We can then call the submit() function for each URL using a list comprehension. We don’t even need the Future objects returned from calling submit(), as there is no result we’re waiting for.
...
# create the thread pool
n_threads = len(urls)
with ThreadPoolExecutor(n_threads) as executor:
    # download each url and save as a local file
    _ = [executor.submit(download_and_save, url, path) for url in urls]
Once all of the tasks have completed, the context manager will close the thread pool for us and we’re done.
We don’t even need to add an explicit call to wait, although we could if we wanted to make the code more readable; for example:
...
# create the thread pool
n_threads = len(urls)
with ThreadPoolExecutor(n_threads) as executor:
    # download each url and save as a local file
    futures = [executor.submit(download_and_save, url, path) for url in urls]
    # wait for all download tasks to complete
    _, _ = wait(futures)
But adding this wait is not needed because the context manager will call the shutdown() function automatically, which will block until all tasks are complete.
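For reference, using the context manager is roughly equivalent to calling shutdown() on the pool ourselves; a sketch:

...
# create the thread pool without a context manager
executor = ThreadPoolExecutor(n_threads)
# download each url and save as a local file
_ = [executor.submit(download_and_save, url, path) for url in urls]
# block until all tasks complete and release the worker threads
executor.shutdown(wait=True)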
The updated version of our download_docs() function that downloads and saves the files concurrently is listed below.
# download a list of URLs to local files
def download_docs(urls, path):
    # create the local directory, if needed
    makedirs(path, exist_ok=True)
    # create the thread pool
    n_threads = len(urls)
    with ThreadPoolExecutor(n_threads) as executor:
        # download each url and save as a local file
        _ = [executor.submit(download_and_save, url, path) for url in urls]
Tying this together, the complete example is listed below.
# SuperFastPython.com
# download document files and save to local files concurrently
from os import makedirs
from os.path import basename
from os.path import join
from urllib.request import urlopen
from concurrent.futures import ThreadPoolExecutor

# download a url and return the raw data, or None on error
def download_url(url):
    try:
        # open a connection to the server
        with urlopen(url, timeout=3) as connection:
            # read the contents of the html doc
            return connection.read()
    except:
        # bad url, socket timeout, http forbidden, etc.
        return None

# save data to a local file
def save_file(url, data, path):
    # get the name of the file from the url
    filename = basename(url)
    # construct a local path for saving the file
    outpath = join(path, filename)
    # save to file
    with open(outpath, 'wb') as file:
        file.write(data)
    return outpath

# download and save a url as a local file
def download_and_save(url, path):
    # download the url
    data = download_url(url)
    # check for no data
    if data is None:
        print(f'>Error downloading {url}')
        return
    # save the data to a local file
    outpath = save_file(url, data, path)
    # report progress
    print(f'>Saved {url} to {outpath}')

# download a list of URLs to local files
def download_docs(urls, path):
    # create the local directory, if needed
    makedirs(path, exist_ok=True)
    # create the thread pool
    n_threads = len(urls)
    with ThreadPoolExecutor(n_threads) as executor:
        # download each url and save as a local file
        _ = [executor.submit(download_and_save, url, path) for url in urls]

# python concurrency API docs
URLS = ['https://docs.python.org/3/library/concurrency.html',
        'https://docs.python.org/3/library/concurrent.html',
        'https://docs.python.org/3/library/concurrent.futures.html',
        'https://docs.python.org/3/library/threading.html',
        'https://docs.python.org/3/library/multiprocessing.html',
        'https://docs.python.org/3/library/multiprocessing.shared_memory.html',
        'https://docs.python.org/3/library/subprocess.html',
        'https://docs.python.org/3/library/queue.html',
        'https://docs.python.org/3/library/sched.html',
        'https://docs.python.org/3/library/contextvars.html']
# local path for saving the files
PATH = 'docs'
# download all docs
download_docs(URLS, PATH)
Running the example downloads and saves the files as before.
This time, the operation completes in a fraction of a second: about 300 milliseconds in my case, which is less than half the time it took to download all files serially in the previous example, i.e. about a 2.33x speedup.
How long did it take to download all files on your system?
Let me know in the comments below.
>Saved https://docs.python.org/3/library/concurrent.html to docs/concurrent.html
>Saved https://docs.python.org/3/library/multiprocessing.shared_memory.html to docs/multiprocessing.shared_memory.html
>Saved https://docs.python.org/3/library/concurrency.html to docs/concurrency.html
>Saved https://docs.python.org/3/library/sched.html to docs/sched.html
>Saved https://docs.python.org/3/library/contextvars.html to docs/contextvars.html
>Saved https://docs.python.org/3/library/queue.html to docs/queue.html
>Saved https://docs.python.org/3/library/concurrent.futures.html to docs/concurrent.futures.html
>Saved https://docs.python.org/3/library/threading.html to docs/threading.html
>Saved https://docs.python.org/3/library/subprocess.html to docs/subprocess.html
>Saved https://docs.python.org/3/library/multiprocessing.html to docs/multiprocessing.html
This is one approach to making the program concurrent, but let’s look at some alternatives.
Download Files Concurrently With submit() and as_completed()
Perhaps we want to report the progress of downloads as they are completed.
The thread pool allows us to do this by storing the Future objects returned from calls to submit() and then calling the as_completed() function on the collection of Future objects.
Also, consider that we are doing two things in each task. The first is downloading a file from a remote server, which is an IO-bound operation that we can perform concurrently. The second is saving the contents of the file to the local hard drive, which is another IO-bound operation that benefits little from concurrency, as most hard drives can only write one file at a time.
Therefore, perhaps a better design is to make only the file-downloading part of the program a concurrent task and keep the file-saving part serial.
This will require more changes to the program.
We can call the download_url() function for each URL and this can be our concurrent task submitted to the thread pool.
When we call result() on each Future object, it will give us the data that was downloaded, but not the URL it was downloaded from; the Future object does not know that.
Therefore, we can update the download_url() function to return both the data that was downloaded and the URL that was provided as an argument.
The updated version of the download_url() function that returns a tuple of data and the input URL is listed below.
# download a url and return the raw data, or None on error
def download_url(url):
    try:
        # open a connection to the server
        with urlopen(url, timeout=3) as connection:
            # read the contents of the html doc
            return (connection.read(), url)
    except:
        # bad url, socket timeout, http forbidden, etc.
        return (None, url)
We can then submit a call to the updated download_url() function for each URL to the thread pool, giving us one Future object per URL.
...
# download each url and save as a local file
futures = [executor.submit(download_url, url) for url in urls]
So far, so good.
Now, we want to save local files and report progress as the files are downloaded.
This requires that we cannibalize the download_and_save() function and move its logic into the download_docs() function that drives the program.
We can iterate over the futures via the as_completed() function that will return Future objects in the order that the downloads are completed, not the order that we dispatched them into the thread pool.
We can then retrieve the data and URL from the Future object.
...
# process each result as it is available
for future in as_completed(futures):
    # get the downloaded url data
    data, url = future.result()
We can check whether the download was unsuccessful and report an error; otherwise, we save the file and report progress as normal. This is a direct copy-paste from the download_and_save() function.
...
# check for no data
if data is None:
    print(f'>Error downloading {url}')
    continue
# save the data to a local file
outpath = save_file(url, data, path)
# report progress
print(f'>Saved {url} to {outpath}')
The updated version of our download_docs() function, which downloads the files concurrently and then saves them serially as they are downloaded, is listed below.
# download a list of URLs to local files
def download_docs(urls, path):
    # create the local directory, if needed
    makedirs(path, exist_ok=True)
    # create the thread pool
    n_threads = len(urls)
    with ThreadPoolExecutor(n_threads) as executor:
        # download each url and save as a local file
        futures = [executor.submit(download_url, url) for url in urls]
        # process each result as it is available
        for future in as_completed(futures):
            # get the downloaded url data
            data, url = future.result()
            # check for no data
            if data is None:
                print(f'>Error downloading {url}')
                continue
            # save the data to a local file
            outpath = save_file(url, data, path)
            # report progress
            print(f'>Saved {url} to {outpath}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# download document files concurrently and save the files locally serially
from os import makedirs
from os.path import basename
from os.path import join
from urllib.request import urlopen
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# download a url and return the raw data, or None on error
def download_url(url):
    try:
        # open a connection to the server
        with urlopen(url, timeout=3) as connection:
            # read the contents of the html doc
            return (connection.read(), url)
    except:
        # bad url, socket timeout, http forbidden, etc.
        return (None, url)

# save data to a local file
def save_file(url, data, path):
    # get the name of the file from the url
    filename = basename(url)
    # construct a local path for saving the file
    outpath = join(path, filename)
    # save to file
    with open(outpath, 'wb') as file:
        file.write(data)
    return outpath

# download a list of URLs to local files
def download_docs(urls, path):
    # create the local directory, if needed
    makedirs(path, exist_ok=True)
    # create the thread pool
    n_threads = len(urls)
    with ThreadPoolExecutor(n_threads) as executor:
        # download each url and save as a local file
        futures = [executor.submit(download_url, url) for url in urls]
        # process each result as it is available
        for future in as_completed(futures):
            # get the downloaded url data
            data, url = future.result()
            # check for no data
            if data is None:
                print(f'>Error downloading {url}')
                continue
            # save the data to a local file
            outpath = save_file(url, data, path)
            # report progress
            print(f'>Saved {url} to {outpath}')

# python concurrency API docs
URLS = ['https://docs.python.org/3/library/concurrency.html',
        'https://docs.python.org/3/library/concurrent.html',
        'https://docs.python.org/3/library/concurrent.futures.html',
        'https://docs.python.org/3/library/threading.html',
        'https://docs.python.org/3/library/multiprocessing.html',
        'https://docs.python.org/3/library/multiprocessing.shared_memory.html',
        'https://docs.python.org/3/library/subprocess.html',
        'https://docs.python.org/3/library/queue.html',
        'https://docs.python.org/3/library/sched.html',
        'https://docs.python.org/3/library/contextvars.html']
# local path for saving the files
PATH = 'docs'
# download all docs
download_docs(URLS, PATH)
Running the program, the files are downloaded and saved as before, perhaps a few milliseconds faster.
Looking at the output of the program, we can see that the order of saved files is different.
Smaller files like “sched.html” that were dispatched nearly last were downloaded sooner (fewer bytes to download) and, in turn, were saved to local files sooner.
This confirms that we are indeed processing downloads in their order of task completion and not the order in which the tasks were submitted.
>Saved https://docs.python.org/3/library/concurrent.html to docs/concurrent.html
>Saved https://docs.python.org/3/library/sched.html to docs/sched.html
>Saved https://docs.python.org/3/library/concurrency.html to docs/concurrency.html
>Saved https://docs.python.org/3/library/contextvars.html to docs/contextvars.html
>Saved https://docs.python.org/3/library/queue.html to docs/queue.html
>Saved https://docs.python.org/3/library/multiprocessing.shared_memory.html to docs/multiprocessing.shared_memory.html
>Saved https://docs.python.org/3/library/threading.html to docs/threading.html
>Saved https://docs.python.org/3/library/concurrent.futures.html to docs/concurrent.futures.html
>Saved https://docs.python.org/3/library/subprocess.html to docs/subprocess.html
>Saved https://docs.python.org/3/library/multiprocessing.html to docs/multiprocessing.html
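If you want to see this completion-order behavior in isolation, a tiny sketch with dummy tasks (using time.sleep() as a stand-in for downloads of different sizes):

# demonstrate that as_completed() yields futures in completion order
from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed

# dummy task that sleeps for the given number of seconds
def task(delay):
    sleep(delay)
    return delay

with ThreadPoolExecutor(3) as executor:
    # submit in the order 3, 2, 1 seconds
    futures = [executor.submit(task, d) for d in (3, 2, 1)]
    # results are reported in the order 1, 2, 3
    for future in as_completed(futures):
        print(future.result())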
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to download files concurrently with this ThreadPoolExecutor example.
Do you have any questions about this example?
Ask your question in the comments below and I will do my best to answer.