You can benchmark tasks executed in the ThreadPoolExecutor.
This can be achieved by benchmarking the overall execution time of the program that executes the tasks. We can also benchmark the execution time of individual tasks by adding timing code to the task functions manually or via a function decorator, then reporting summary statistics for task execution time, such as the minimum, maximum, and mean.
This can help us determine whether different configurations of the thread pool are faster overall and whether changes to the task functions themselves offer a speedup.
In this tutorial, you will discover how to benchmark tasks executed in the ThreadPoolExecutor.
Let’s get started.
Need to Benchmark Tasks in the ThreadPoolExecutor
The ThreadPoolExecutor allows us to execute ad hoc tasks concurrently in a thread pool with a simple API.
Given that it makes use of threads, it is appropriate for tasks that perform blocking I/O such as downloading data or reading and writing files.
We can use the ThreadPoolExecutor by first creating an instance and specifying the number of threads to create in the pool, then calling methods like the submit() method to issue tasks to the pool.
For example:
```python
...
# create a thread pool
with ThreadPoolExecutor(100) as exe:
    # issue a task
    future = exe.submit(task, data)
```
You can learn more about the ThreadPoolExecutor in the tutorial:
We typically intend to use the ThreadPoolExecutor to speed up our program.
That is, we wish to perform many blocking I/O-bound tasks concurrently so that the overall execution time of the program is reduced.
How can we know that the execution time is lower after updating our program to use the ThreadPoolExecutor?
We may also seek to optimize the execution time of each task in the thread pool. This might involve changing the task code so that each task executes faster.
In this case, we need to be able to benchmark the execution time of individual tasks to confirm that they are completed faster after a change to the code.
How can we benchmark tasks in the ThreadPoolExecutor?
How to Benchmark Tasks in the ThreadPoolExecutor
There are a few ways that we can benchmark tasks executed by the ThreadPoolExecutor.
Some approaches include:
- Benchmark the overall execution time of the program.
- Wrap target function in benchmark code.
- Use a benchmark function decorator.
Let’s take a closer look at each in turn.
Benchmark Execution Time of Program
The simplest approach is to benchmark the execution time of the entire program.
This can be achieved by recording the start time before all tasks are issued, then again after all tasks are done, and calculating the difference as the overall execution time.
For example:
```python
...
# record start time
time_start = perf_counter()
# execute the program
...
# record end time
time_end = perf_counter()
# calculate execution time
time_duration = time_end - time_start
# report execution time
print(f'Took: {time_duration:.3f} seconds')
```
We prefer to use the time.perf_counter() function for benchmarking as it uses a high-precision clock that cannot be adjusted, unlike time.time().
You can learn more about how to benchmark with the time.perf_counter() function in the tutorial:
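As an aside, this start/stop timing pattern can be packaged once as a small context manager so it does not have to be repeated at each call site. A minimal sketch (the Timer class below is illustrative and not part of the examples in this tutorial):

```python
from time import perf_counter

class Timer:
    # context manager that benchmarks a block via perf_counter()
    def __enter__(self):
        self.start = perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # record the duration, even if the block raised an exception
        self.duration = perf_counter() - self.start

# usage: time an arbitrary block of code
with Timer() as timer:
    total = sum(range(1_000_000))
print(f'Took: {timer.duration:.3f} seconds')
```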
Benchmark The Task Function Manually
We can benchmark the execution time of tasks manually.
This involves updating the task function itself and adding timing code.
The times can be stored in a global variable, such as a list, for reporting later.
For example:
```python
# task to execute in the thread pool
def task():
    # record start time
    time_start = perf_counter()
    # run the task
    ...
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # store time in global variable
    global task_times
    task_times.append(time_duration)
```
After all tasks are completed, the global variable of all task run times can be retrieved and used to report details about task execution times.
This might include the minimum, average, and maximum task run time.
For example:
```python
...
# reports task benchmark results
time_min, time_max = min(task_times), max(task_times)
time_mean = sum(task_times) / len(task_times)
print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
```
This is helpful to see if task execution time is faster or slower on average and at the limits after a code change.
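If richer summary statistics are needed, the standard library statistics module can compute the mean, median, and standard deviation of the recorded times. A sketch using illustrative hard-coded durations standing in for the task_times global:

```python
import statistics

# illustrative durations standing in for the task_times global
task_times = [5.1, 5.4, 5.9, 5.2, 5.7]

# compute summary statistics over the recorded durations
time_mean = statistics.mean(task_times)
time_median = statistics.median(task_times)
time_stdev = statistics.stdev(task_times)
print(f'mean={time_mean:.3f}, median={time_median:.3f}, stdev={time_stdev:.3f}')
```

The median is often more informative than the mean when a few stragglers skew the distribution of task times.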
Benchmark The Task Function With a Decorator
We can record the execution time of tasks automatically.
This can be achieved using a function decorator.
A decorator can be defined as a function that calls our target function, automatically records the start and end times, and stores the duration in a global variable.
For example, the benchmark_decorator() function below defines a function decorator.
```python
# define the benchmark decorator
def benchmark_decorator(func):
    # inner function that wraps the target function
    @wraps(func)
    def wrapper(*args, **kwargs):
        # record start time
        time_start = perf_counter()
        try:
            # call the custom function
            return func(*args, **kwargs)
        finally:
            # calculate the execution time
            time_duration = perf_counter() - time_start
            # store time in global variable
            global task_times
            task_times.append(time_duration)
    # return the inner function
    return wrapper
```
We can then add the decorator to our task function.
For example:
```python
# task to execute in the thread pool
@benchmark_decorator
def task():
    # ...
```
You can learn more about defining a benchmark decorator in the tutorial:
This has the benefit that we don’t have to change our task() function and add benchmarking code to it directly.
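Because the decorator is just a function that returns a wrapped callable, we can even benchmark a task without decorating its definition at all, by wrapping it at submission time. A sketch of this variation (not used in the examples below; the simple doubling task is illustrative):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from time import perf_counter

task_times = list()

# the same benchmark decorator as above
def benchmark_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        time_start = perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            task_times.append(perf_counter() - time_start)
    return wrapper

# an undecorated task function
def task(value):
    return value * 2

with ThreadPoolExecutor(2) as exe:
    # wrap each task at submission time instead of at definition time
    futures = [exe.submit(benchmark_decorator(task), i) for i in range(4)]
results = [future.result() for future in futures]
print(results)          # [0, 2, 4, 6]
print(len(task_times))  # one benchmark time per task
```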
Now that we know how to benchmark tasks in the ThreadPoolExecutor, let’s look at some worked examples.
Example of Tasks Executed with the ThreadPoolExecutor
Before we explore how to benchmark tasks in the ThreadPoolExecutor, let’s define a simple program that executes tasks concurrently.
In this example, we will define a task that takes about 5 seconds to complete and then report a message. We will create a thread pool with 25 worker threads and issue 100 tasks to the pool to complete. Once all tasks are done, a final message is reported.
With 100 tasks, each taking about 5 seconds, executed by 25 workers, we expect the program to complete in about 20 seconds, e.g. (100/25) * 5 seconds.
The complete example is listed below.
```python
# SuperFastPython.com
# example of executing tasks in the threadpoolexecutor
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from random import random

# task to execute in the thread pool
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')

# drive the program
def main():
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
    # wait for all tasks to complete
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # execute the program
    main()
```
Running the example first executes the main() function.
The main function runs and creates the ThreadPoolExecutor with 25 worker threads.
All 100 tasks are issued to the thread pool and the main thread blocks until all tasks are done.
Each task runs, executing the task() function, sleeping for between 5 and 6 seconds then reporting a message.
Once all tasks are done the thread pool is shut down automatically via the context manager interface and a final done message is reported.
```
...
>task done
>task done
>task done
>task done
>task done
Done
```
Next, let’s look at how we can benchmark the overall execution time of all tasks in the ThreadPoolExecutor.
Example of Benchmarking Overall Execution Time Of Tasks in ThreadPoolExecutor
We can explore how to benchmark the overall execution time of tasks in the ThreadPoolExecutor.
In this case, we will update the entry point to the program to record the start time before calling the main() function, and again after the main() function has returned, then reporting the calculated execution time.
```python
# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')
```
This will ensure that the overall execution time for the program is always reported.
The updated version of the program with this change is listed below.
```python
# SuperFastPython.com
# example of benchmarking overall execution time of tasks
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from time import perf_counter
from random import random

# task to execute in the thread pool
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')

# drive the program
def main():
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
    # wait for all tasks to complete
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')
```
Running the example first records the start time.
The main function is called and creates the ThreadPoolExecutor with 25 worker threads.
All 100 tasks are issued to the thread pool and the main thread blocks until all tasks are done.
Each task runs, executing the task() function, sleeping for between 5 and 6 seconds then reporting a message.
Once all tasks are done the thread pool is shut down automatically via the context manager interface and a final done message is reported.
Finally, the end time is recorded and the overall execution time is calculated and reported.
In this case, we can see that the example took about 23.446 seconds.
Note: your results may differ.
This highlights how we can benchmark the overall execution time of tasks in the ThreadPoolExecutor.
```
...
>task done
>task done
>task done
>task done
>task done
Done
Took: 23.446 seconds
```
Next, let’s look at how we might manually benchmark tasks in the ThreadPoolExecutor.
Example of Benchmarking Tasks Manually
We can explore how to manually benchmark the execution time of tasks run in the ThreadPoolExecutor.
In this case, we can update the task() function to record the start time at the beginning of the function and the end time at the end of the function. The execution time can then be calculated and stored in a global list variable.
```python
...
# record start time
time_start = perf_counter()
...
# record end time
time_end = perf_counter()
# calculate execution time
time_duration = time_end - time_start
# store time in global variable
global task_times
task_times.append(time_duration)
```
The updated task() function with this change is listed below.
```python
# task to execute in the thread pool
def task():
    # record start time
    time_start = perf_counter()
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # store time in global variable
    global task_times
    task_times.append(time_duration)
```
We can then update the main() function to declare and define a global variable to store the benchmark times.
```python
...
# declare and define global variable for benchmarking
global task_times
task_times = list()
```
After all tasks are done, the main() function can calculate min, max, and mean statistics from the benchmark scores and report the results.
```python
...
# reports task benchmark results
time_min, time_max = min(task_times), max(task_times)
time_mean = sum(task_times) / len(task_times)
print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
```
The updated main() function with these changes is listed below.
```python
# drive the program
def main():
    # declare and define global variable for benchmarking
    global task_times
    task_times = list()
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
    # wait for all tasks to complete
    # reports task benchmark results
    time_min, time_max = min(task_times), max(task_times)
    time_mean = sum(task_times) / len(task_times)
    print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
    print('Done')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of benchmarking task execution time manually
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from time import perf_counter
from random import random

# task to execute in the thread pool
def task():
    # record start time
    time_start = perf_counter()
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # store time in global variable
    global task_times
    task_times.append(time_duration)

# drive the program
def main():
    # declare and define global variable for benchmarking
    global task_times
    task_times = list()
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
    # wait for all tasks to complete
    # reports task benchmark results
    time_min, time_max = min(task_times), max(task_times)
    time_mean = sum(task_times) / len(task_times)
    print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')
```
Running the example first records the start time.
The main function is called and creates the ThreadPoolExecutor with 25 worker threads.
All 100 tasks are issued to the thread pool and the main thread blocks until all tasks are done.
Each task runs, executing the task() function.
The task() function records the start time, calculates a time to sleep, blocks, and reports a final message.
The task() function then records an end time, calculates the execution time for the task, and stores the benchmark result in the global variable.
Once all tasks are done the thread pool is shut down automatically via the context manager interface.
The minimum, maximum, and average task benchmark times are calculated and reported, and then a final done message is reported.
Finally, the end time is recorded and the overall execution time is calculated and reported.
In this case, we can see that the minimum execution time for tasks was about 5.013 seconds, whereas the maximum was closer to 6 seconds at 5.973 seconds. We also see that the example took about 23.300 seconds.
Note: your results may differ.
This highlights how we can report summary statistics from benchmarking the execution time of individual tasks executed within the ThreadPoolExecutor.
```
...
>task done
>task done
>task done
>task done
>task done
Tasks Times: min=5.013, mean=5.488, max=5.973
Done
Took: 23.300 seconds
```
Next, let’s look at how we can benchmark task functions run in the thread pool without changing the content of the tasks themselves.
Example of Benchmarking Tasks With Function Decorator
We can explore how to benchmark tasks executed in the ThreadPoolExecutor without changing the body of the tasks themselves.
This can be achieved using a function decorator that records the time before and after the task function is executed and stores the benchmark time in the global variable, as we did before.
The updated task() function with the added function decorator is listed below.
```python
# task to execute in the thread pool
@benchmark_decorator
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of benchmarking task execution time with decorator
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from time import perf_counter
from random import random
from functools import wraps

# define the benchmark decorator
def benchmark_decorator(func):
    # inner function that wraps the target function
    @wraps(func)
    def wrapper(*args, **kwargs):
        # record start time
        time_start = perf_counter()
        try:
            # call the custom function
            return func(*args, **kwargs)
        finally:
            # calculate the execution time
            time_duration = perf_counter() - time_start
            # store time in global variable
            global task_times
            task_times.append(time_duration)
    # return the inner function
    return wrapper

# task to execute in the thread pool
@benchmark_decorator
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')

# drive the program
def main():
    # declare and define global variable for benchmarking
    global task_times
    task_times = list()
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
    # wait for all tasks to complete
    # reports task benchmark results
    time_min, time_max = min(task_times), max(task_times)
    time_mean = sum(task_times) / len(task_times)
    print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')
```
Running the example first records the start time.
The main function is called and creates the ThreadPoolExecutor with 25 worker threads.
All 100 tasks are issued to the thread pool and the main thread blocks until all tasks are done.
Each task runs, executing the task() function.
The function decorator runs and records the start time, then calls the task function proper.
The task() function runs, sleeping for between 5 and 6 seconds then reporting a message.
The function decorator resumes and then records an end time, calculates the execution time for the task, and stores the benchmark result in the global variable.
Once all tasks are done the thread pool is shut down automatically via the context manager interface.
The minimum, maximum, and average task benchmark times are calculated and reported, and then a final done message is reported.
Finally, the end time is recorded and the overall execution time is calculated and reported.
In this case, we can see that the average execution time of tasks was about 5.549 seconds with a minimum and maximum close to 5 and 6 seconds respectively.
We also see that the example took about 22.988 seconds.
Note: your results may differ.
This highlights how we can benchmark the execution time of tasks executed in the ThreadPoolExecutor without manually changing the body of the task function.
```
...
>task done
>task done
>task done
>task done
>task done
Tasks Times: min=5.023, mean=5.549, max=6.000
Done
Took: 22.988 seconds
```
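A final caveat on collecting times from many worker threads: in CPython, appending to a list is effectively atomic because of the global interpreter lock, so the examples above are safe. If you want the shared list to be explicitly protected (for example, for portability to other Python runtimes), a threading.Lock can be used. A minimal sketch (the record_duration() helper is illustrative, not part of the examples above):

```python
from threading import Lock, Thread

task_times = list()
times_lock = Lock()  # explicitly guards the shared list of times

def record_duration(duration):
    # serialize appends from worker threads
    with times_lock:
        task_times.append(duration)

# simulate 100 workers each recording one duration
threads = [Thread(target=record_duration, args=(0.1,)) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(task_times))  # 100 recorded durations
```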
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python Benchmarking, Jason Brownlee (my book!)
Also, the following Python books have chapters on benchmarking that may be helpful:
- Python Cookbook, 2013. (sections 9.1, 9.10, 9.22, 13.13, and 14.13)
- High Performance Python, 2020. (chapter 2)
Guides
- 4 Ways to Benchmark Python Code
- 5 Ways to Measure Execution Time in Python
- Python Benchmark Comparison Metrics
Benchmarking APIs
- time — Time access and conversions
- timeit — Measure execution time of small code snippets
- The Python Profilers
Takeaways
You now know how to benchmark tasks in the ThreadPoolExecutor.
Did I make a mistake? See a typo?
I’m a simple humble human. Correct me, please!
Do you have any additional tips?
I’d love to hear about them!
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Vincent Ghilione on Unsplash