How to Benchmark Tasks in the ThreadPoolExecutor

November 24, 2023 Python Benchmarking

You can benchmark tasks executed in the ThreadPoolExecutor.

This can be achieved by benchmarking the overall execution time of the program that executes tasks. We can also benchmark the execution time of individual tasks themselves by manually adding benchmark code to the task functions or using a function decorator and reporting summary statistics for task execution time, such as the minimum, maximum, and mean execution time.

This can help us determine whether different configurations of the thread pool are faster overall and whether changes to the task functions themselves offer a speedup.

In this tutorial, you will discover how to benchmark tasks executed in the ThreadPoolExecutor.

Let's get started.

Need to Benchmark Tasks in the ThreadPoolExecutor

The ThreadPoolExecutor allows us to execute ad hoc tasks concurrently in a thread pool with a simple API.

Given that it makes use of threads, it is appropriate for tasks that perform blocking I/O such as downloading data or reading and writing files.

We can use the ThreadPoolExecutor by first creating an instance and specifying the number of threads to create in the pool, then calling methods like the submit() method to issue tasks to the pool.

For example:

...
# create a thread pool
with ThreadPoolExecutor(100) as exe:
    # issue a task
    future = exe.submit(task, data)
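
For reference, a minimal self-contained version of this snippet might look as follows. The task() function, its argument, and the pool size of 4 are hypothetical placeholders chosen for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# hypothetical task: block briefly, then return a value derived from the argument
def task(data):
    sleep(0.01)
    return data * 2

# create a thread pool, issue one task, and wait for its result
with ThreadPoolExecutor(4) as exe:
    future = exe.submit(task, 21)
    # result() blocks until the task has finished
    result = future.result()

print(result)
```

The submit() method returns a Future object immediately, and calling result() on it blocks until the task is done.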

We typically intend to use the ThreadPoolExecutor to speed up our program.

That is, we run many blocking I/O-bound tasks concurrently so that the overall execution time of the program is reduced.

How can we know that the execution time is lower after updating our program to use the ThreadPoolExecutor?

We may also want to optimize the execution time of each task in the thread pool. This might involve changing the code so that each task itself executes faster.

In this case, we need to be able to benchmark the execution time of individual tasks to confirm that they are completed faster after a change to the code.

How can we benchmark tasks in the ThreadPoolExecutor?

How to Benchmark Tasks in the ThreadPoolExecutor

There are a few ways that we can benchmark tasks executed by the ThreadPoolExecutor.

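Three approaches are covered in this tutorial: benchmarking the execution time of the whole program, benchmarking the task function manually, and benchmarking the task function with a decorator.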

Let's take a closer look at each in turn.

Benchmark Execution Time of Program

The simplest approach is to benchmark the execution time of the entire program.

This can be achieved by recording the start time before all tasks are issued, then again after all tasks are done, and calculating the difference as the overall execution time.

For example:

...
# record start time
time_start = perf_counter()
# execute the program
...
# record end time
time_end = perf_counter()
# calculate execution time
time_duration = time_end - time_start
# report execution time
print(f'Took: {time_duration:.3f} seconds')

We prefer the time.perf_counter() function for benchmarking because it uses the highest-resolution clock available and is monotonic, so it is not affected by adjustments to the system clock, unlike time.time().
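
As a quick check of this claim, the sketch below queries the properties of both clocks using time.get_clock_info(). The exact resolution values reported depend on the platform, so none are shown here.

```python
from time import get_clock_info

# query the properties of the clocks behind perf_counter() and time()
perf_info = get_clock_info('perf_counter')
wall_info = get_clock_info('time')

# report whether each clock can be adjusted and whether it is monotonic
print(f'perf_counter: adjustable={perf_info.adjustable}, '
      f'monotonic={perf_info.monotonic}, resolution={perf_info.resolution}')
print(f'time: adjustable={wall_info.adjustable}, '
      f'monotonic={wall_info.monotonic}, resolution={wall_info.resolution}')
```

A clock that is adjustable may jump forward or backward during a benchmark, which is why it is a poor choice for measuring durations.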

Benchmark The Task Function Manually

We can benchmark the execution time of tasks manually.

This involves updating the task function itself and adding timing code.

The times can be stored in a global variable, such as a list, for reporting later.

For example:

# task to execute in the thread pool
def task():
    # record start time
    time_start = perf_counter()
    # run the task
    ...
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # store time in global variable
    global task_times
    task_times.append(time_duration)

After all tasks are completed, the global variable of all task run times can be retrieved and used to report details about task execution times.

This might include the minimum, average, and maximum task run time.

For example:

...
# reports task benchmark results
time_min, time_max = min(task_times), max(task_times)
time_mean = sum(task_times) / len(task_times)
print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')

This helps show whether tasks are faster or slower on average, and at the extremes, after a code change.
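
If more detail is wanted, the standard library statistics module can report the median and standard deviation as well. The sketch below is only an illustration, and the task durations in it are made-up values, not measurements.

```python
from statistics import mean, median, stdev

# hypothetical task durations in seconds, made up for illustration
task_times = [5.1, 5.4, 5.9, 5.2, 5.6]

# calculate summary statistics
time_mean = mean(task_times)
time_median = median(task_times)
time_stdev = stdev(task_times)

# report summary statistics
print(f'Tasks Times: mean={time_mean:.3f}, median={time_median:.3f}, stdev={time_stdev:.3f}')
```

The standard deviation gives a sense of how much task times vary from run to run, which the minimum and maximum alone do not show.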

Benchmark The Task Function With a Decorator

We can record the execution time of tasks automatically.

This can be achieved using a function decorator.

A decorator can be defined as a function that wraps our target function, automatically records the start and end times, and stores the duration in a global variable.

For example, the benchmark_decorator() function below defines a function decorator.

# define the benchmark decorator
def benchmark_decorator(func):
    # inner function that wraps the target function
    @wraps(func)
    def wrapper(*args, **kwargs):
        # record start time
        time_start = perf_counter()
        try:
            # call the custom function
            return func(*args, **kwargs)
        finally:
            # calculate the execution time
            time_duration = perf_counter() - time_start
            # store time in global variable
            global task_times
            task_times.append(time_duration)
    # return the inner function
    return wrapper

We can then add the decorator to our task function.

For example:

# task to execute in the thread pool
@benchmark_decorator
def task():
    # body of the task goes here
    ...

This has the benefit that we don't have to change the body of our task() function or add benchmarking code to it directly.
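
One possible variation, sketched below, avoids the global variable by passing the results list to the decorator. The benchmark_into() name and the short sleep are assumptions made for this illustration and are not part of the tutorial's examples.

```python
from functools import wraps
from time import perf_counter, sleep

# hypothetical decorator factory: durations are stored in the list passed in
def benchmark_into(results):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # record start time
            time_start = perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # store the duration, even if the task raised an exception
                results.append(perf_counter() - time_start)
        return wrapper
    return decorator

# list that collects the durations for this task function
task_times = []

# hypothetical short task used for illustration
@benchmark_into(task_times)
def task():
    sleep(0.01)
    return 'ok'

value = task()
print(value, len(task_times))
```

This design lets different task functions record into different lists, at the cost of one more level of nesting in the decorator.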

Now that we know how to benchmark tasks in the ThreadPoolExecutor, let's look at some worked examples.

Example of Tasks Executed with the ThreadPoolExecutor

Before we explore how to benchmark tasks in the ThreadPoolExecutor, let's define a simple program that executes tasks concurrently.

In this example, we will define a task that takes about 5 seconds to complete and then report a message. We will create a thread pool with 25 worker threads and issue 100 tasks to the pool to complete. Once all tasks are done, a final message is reported.

With 100 tasks executed by 25 workers, each worker handles about 4 tasks in sequence. Each task takes between 5 and 6 seconds, so we expect the program to take at least 20 seconds, e.g. (100/25) * 5 seconds, and closer to 22 seconds on average.
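
The estimate can be worked through in a few lines of code. This is a rough sketch that assumes tasks divide evenly among the workers and that the random extra delay averages 0.5 seconds, since random() is uniform on [0, 1).

```python
from math import ceil

n_tasks = 100
n_workers = 25
base_sleep = 5.0   # fixed part of each task's sleep, in seconds
mean_extra = 0.5   # assumed average of the random() delay, in seconds

# number of tasks each worker runs one after another, assuming an even split
rounds = ceil(n_tasks / n_workers)
# lower bound that ignores the random extra delay
lower_bound = rounds * base_sleep
# rough estimate that includes the average extra delay
rough_estimate = rounds * (base_sleep + mean_extra)
print(f'rounds={rounds}, lower bound={lower_bound:.1f}s, rough estimate={rough_estimate:.1f}s')
```

The overall time is set by the slowest worker rather than the average one, so measured run times tend to sit a little above the rough estimate.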

The complete example is listed below.

# SuperFastPython.com
# example of executing tasks in the threadpoolexecutor
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from random import random

# task to execute in the thread pool
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')

# drive the program
def main():
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
    # all tasks are complete once the with block exits
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # execute the program
    main()

Running the example first executes the main() function.

The main function runs and creates the ThreadPoolExecutor with 25 worker threads.

All 100 tasks are issued to the thread pool and the main thread blocks until all tasks are done.

Each task runs, executing the task() function, sleeping for between 5 and 6 seconds then reporting a message.

Once all tasks are done the thread pool is shut down automatically via the context manager interface and a final done message is reported.

...
>task done
>task done
>task done
>task done
>task done
Done

Next, let's look at how we can benchmark the overall execution time of all tasks in the ThreadPoolExecutor.

Example of Benchmarking Overall Execution Time Of Tasks in ThreadPoolExecutor

We can explore how to benchmark the overall execution time of tasks in the ThreadPoolExecutor.

In this case, we will update the entry point to the program to record the start time before calling the main() function and the end time after the main() function has returned, and then report the calculated execution time.

# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')

This will ensure that the overall execution time for the program is always reported.

The updated version of the program with this change is listed below.

# SuperFastPython.com
# example of benchmarking overall execution time of tasks
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from time import perf_counter
from random import random

# task to execute in the thread pool
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')

# drive the program
def main():
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
    # all tasks are complete once the with block exits
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')

Running the example first records the start time.

The main function is called and creates the ThreadPoolExecutor with 25 worker threads.

All 100 tasks are issued to the thread pool and the main thread blocks until all tasks are done.

Each task runs, executing the task() function, sleeping for between 5 and 6 seconds then reporting a message.

Once all tasks are done the thread pool is shut down automatically via the context manager interface and a final done message is reported.

Finally, the end time is recorded and the overall execution time is calculated and reported.

In this case, we can see that the example took about 23.446 seconds.

Note that your results may differ.

This highlights how we can benchmark the overall execution time of tasks in the ThreadPoolExecutor.

...
>task done
>task done
>task done
>task done
>task done
Done
Took: 23.446 seconds
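
The same overall timing can be used to compare thread pool configurations. The sketch below is one way this might be done, assuming a hypothetical short task and a benchmark_pool() helper function that are not part of the example above.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep

# hypothetical short task so that the comparison finishes quickly
def task():
    sleep(0.02)

# run all tasks in a pool with n_workers threads and return the overall duration
def benchmark_pool(n_workers, n_tasks=20):
    # record start time
    time_start = perf_counter()
    with ThreadPoolExecutor(n_workers) as exe:
        for _ in range(n_tasks):
            exe.submit(task)
    # leaving the with block waits for all tasks to complete
    return perf_counter() - time_start

# benchmark a few pool sizes
results = {n: benchmark_pool(n) for n in (1, 5, 20)}
for n_workers, duration in results.items():
    print(f'workers={n_workers}: {duration:.3f} seconds')
```

For tasks that only sleep, more workers should reduce the overall time until there are as many workers as tasks. Real tasks may behave differently, which is why the configuration should be benchmarked.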

Next, let's look at how we might manually benchmark tasks in the ThreadPoolExecutor.

Example of Benchmarking Tasks Manually

We can explore how to manually benchmark the execution time of tasks run in the ThreadPoolExecutor.

In this case, we can update the task() function to record the start time at the beginning of the function and the end time at the end of the function. The execution time can then be calculated and stored in a global list variable.

...
# record start time
time_start = perf_counter()
...
# record end time
time_end = perf_counter()
# calculate execution time
time_duration = time_end - time_start
# store time in global variable
global task_times
task_times.append(time_duration)

The updated task() function with this change is listed below.

# task to execute in the thread pool
def task():
    # record start time
    time_start = perf_counter()
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # store time in global variable
    global task_times
    task_times.append(time_duration)

We can then update the main() function to declare and define a global variable to store the benchmark times.

...
# declare and define global variable for benchmarking
global task_times
task_times = list()

After all tasks are done, the main() function can calculate min, max, and mean statistics from the recorded task times and report the results.

...
# reports task benchmark results
time_min, time_max = min(task_times), max(task_times)
time_mean = sum(task_times) / len(task_times)
print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')

The updated main() function with these changes is listed below.

# drive the program
def main():
    # declare and define global variable for benchmarking
    global task_times
    task_times = list()
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
        # wait for all tasks to complete
    # reports task benchmark results
    time_min, time_max = min(task_times), max(task_times)
    time_mean = sum(task_times) / len(task_times)
    print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
    print('Done')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of benchmarking task execution time manually
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from time import perf_counter
from random import random

# task to execute in the thread pool
def task():
    # record start time
    time_start = perf_counter()
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # store time in global variable
    global task_times
    task_times.append(time_duration)

# drive the program
def main():
    # declare and define global variable for benchmarking
    global task_times
    task_times = list()
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
        # wait for all tasks to complete
    # reports task benchmark results
    time_min, time_max = min(task_times), max(task_times)
    time_mean = sum(task_times) / len(task_times)
    print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')

Running the example first records the start time.

The main function is called and creates the ThreadPoolExecutor with 25 worker threads.

All 100 tasks are issued to the thread pool and the main thread blocks until all tasks are done.

Each task runs, executing the task() function.

The task() function records the start time, calculates a time to sleep, blocks, and reports a final message.

The task() function then records an end time, calculates the execution time for the task, and stores the benchmark result in the global variable.

Once all tasks are done the thread pool is shut down automatically via the context manager interface.

The minimum, maximum, and average task benchmark times are calculated and reported, and then a final done message is reported.

Finally, the end time is recorded and the overall execution time is calculated and reported.

In this case, we can see that the minimum execution time for tasks was about 5.013 seconds, whereas the maximum was closer to 6 seconds at 5.973 seconds. We also see that the example took about 23.300 seconds.

Note that your results may differ.

This highlights how we can report summary statistics from benchmarking the execution time of individual tasks executed within the ThreadPoolExecutor.

...
>task done
>task done
>task done
>task done
>task done
Tasks Times: min=5.013, mean=5.488, max=5.973
Done
Took: 23.300 seconds
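
As an alternative to the global variable, each task could return its own duration and the times could be gathered from the Future objects. The sketch below illustrates the idea with a hypothetical short task and a small pool. It is a variation on the example above, not a replacement for it.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep

# hypothetical short task that times itself and returns the duration
def task():
    # record start time
    time_start = perf_counter()
    # sleep to simulate blocking work
    sleep(0.01)
    # return the execution time
    return perf_counter() - time_start

# create the thread pool
with ThreadPoolExecutor(4) as exe:
    # issue all tasks
    futures = [exe.submit(task) for _ in range(8)]
    # gather the durations, result() blocks until each task is done
    task_times = [future.result() for future in futures]

# reports task benchmark results
time_min, time_max = min(task_times), max(task_times)
time_mean = sum(task_times) / len(task_times)
print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
```

This keeps the timing data local to the code that issued the tasks, but it only suits tasks whose return value is not needed for something else.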

Next, let's look at how we can benchmark task functions run in the thread pool without changing the content of the tasks themselves.

Example of Benchmarking Tasks With Function Decorator

We can explore how to benchmark tasks executed in the ThreadPoolExecutor without changing the body of the tasks themselves.

This can be achieved using a function decorator that records the time before and after the task function is executed and stores the benchmark time in the global variable, as we did before.

The updated task() function with the added function decorator is listed below.

# task to execute in the thread pool
@benchmark_decorator
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of benchmarking task execution time with decorator
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from time import perf_counter
from random import random
from functools import wraps

# define the benchmark decorator
def benchmark_decorator(func):
    # inner function that wraps the target function
    @wraps(func)
    def wrapper(*args, **kwargs):
        # record start time
        time_start = perf_counter()
        try:
            # call the custom function
            return func(*args, **kwargs)
        finally:
            # calculate the execution time
            time_duration = perf_counter() - time_start
            # store time in global variable
            global task_times
            task_times.append(time_duration)
    # return the inner function
    return wrapper

# task to execute in the thread pool
@benchmark_decorator
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done')

# drive the program
def main():
    # declare and define global variable for benchmarking
    global task_times
    task_times = list()
    # number of tasks to run
    n_tasks = 100
    # create the thread pool
    with ThreadPoolExecutor(25) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
        # wait for all tasks to complete
    # reports task benchmark results
    time_min, time_max = min(task_times), max(task_times)
    time_mean = sum(task_times) / len(task_times)
    print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')

Running the example first records the start time.

The main function is called and creates the ThreadPoolExecutor with 25 worker threads.

All 100 tasks are issued to the thread pool and the main thread blocks until all tasks are done.

Each task runs, executing the task() function.

The function decorator runs and records the start time, then calls the task function proper.

The task() function runs, sleeping for between 5 and 6 seconds then reporting a message.

The function decorator resumes and then records an end time, calculates the execution time for the task, and stores the benchmark result in the global variable.

Once all tasks are done the thread pool is shut down automatically via the context manager interface.

The minimum, maximum, and average task benchmark times are calculated and reported, and then a final done message is reported.

Finally, the end time is recorded and the overall execution time is calculated and reported.

In this case, we can see that the average execution time of tasks was about 5.549 seconds with a minimum and maximum close to 5 and 6 seconds respectively.

We also see that the example took about 22.988 seconds.

Note that your results may differ.

This highlights how we can benchmark the execution time of tasks executed in the ThreadPoolExecutor without manually changing the body of the task function.

...
>task done
>task done
>task done
>task done
>task done
Tasks Times: min=5.023, mean=5.549, max=6.000
Done
Took: 22.988 seconds
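
Because the decorator stores the duration in a finally block, a time is recorded even when a task raises an exception. The sketch below illustrates this with a hypothetical task that fails for odd arguments. The task and pool size are assumptions made for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from time import perf_counter, sleep

# global list of task durations
task_times = []

# define the benchmark decorator
def benchmark_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # record start time
        time_start = perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # runs whether the task returned or raised
            task_times.append(perf_counter() - time_start)
    return wrapper

# hypothetical task that fails for odd arguments
@benchmark_decorator
def task(number):
    sleep(0.01)
    if number % 2:
        raise ValueError(f'task {number} failed')
    return number

# issue all tasks and wait for them to complete
with ThreadPoolExecutor(4) as exe:
    futures = [exe.submit(task, i) for i in range(6)]

# exception() returns None for tasks that completed normally
n_failed = sum(1 for future in futures if future.exception() is not None)
print(f'recorded={len(task_times)}, failed={n_failed}')
```

Failed tasks often finish much faster or slower than successful ones, so it may be worth checking for exceptions before interpreting the summary statistics.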

Takeaways

You now know how to benchmark tasks in the ThreadPoolExecutor.