You can benchmark tasks executed in the ProcessPoolExecutor.
This can be achieved by benchmarking the overall execution time of the program that executes the tasks. We can also benchmark the execution time of individual tasks, either by manually adding benchmark code to the task functions or by using a function decorator, and then reporting summary statistics such as the minimum, maximum, and mean task execution time.
This can help us determine whether different configurations of the process pool are faster overall and whether changes to the task functions themselves offer a speedup.
In this tutorial, you will discover how to benchmark tasks executed in the ProcessPoolExecutor.
Let’s get started.
Need to Benchmark Tasks in the ProcessPoolExecutor
The ProcessPoolExecutor allows us to execute ad hoc tasks concurrently in a process pool with a simple API.
Given that it makes use of processes, it is well suited to blocking, CPU-bound tasks such as parsing files, performing mathematical calculations, and fitting models.
We can use the ProcessPoolExecutor by first creating an instance and specifying the number of worker processes to create in the pool, then calling methods like the submit() method to issue tasks to the pool.
For example:
...
# create a process pool
with ProcessPoolExecutor(4) as exe:
    # issue a task
    future = exe.submit(task, data)
You can learn more about the ProcessPoolExecutor in the tutorial:
We typically intend to use the ProcessPoolExecutor to speed up our program.
That is, we wish to perform many CPU-bound tasks in parallel so that the overall execution time of the program is reduced.
How can we know that the execution time is lower after updating our program to use the ProcessPoolExecutor?
We may also seek to optimize the execution time of each task in the process pool. This might involve changing the task code so that each task itself executes faster.
In this case, we need to be able to benchmark the execution time of individual tasks to confirm that they are completed faster after a change to the code.
How can we benchmark tasks in the ProcessPoolExecutor?
How to Benchmark Tasks in the ProcessPoolExecutor
There are a few ways that we can benchmark tasks executed by the ProcessPoolExecutor.
Some approaches include:
- Benchmark the overall execution time of the program.
- Wrap the target function in benchmark code.
Let’s take a closer look at each in turn.
Benchmark Execution Time of Program
The simplest approach is to benchmark the execution time of the entire program.
This can be achieved by recording the start time before all tasks are issued, then again after all tasks are done, and calculating the difference as the overall execution time.
For example:
...
# record start time
time_start = perf_counter()
# execute the program
...
# record end time
time_end = perf_counter()
# calculate execution time
time_duration = time_end - time_start
# report execution time
print(f'Took: {time_duration:.3f} seconds')
We prefer to use the time.perf_counter() function for benchmarking as it uses a high-precision clock that cannot be adjusted, unlike time.time().
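As a quick check of this property, we can inspect both clocks with the standard time.get_clock_info() function. A minimal sketch is below; the exact values reported vary by platform, but the perf_counter clock is typically reported as not adjustable, whereas the clock behind time.time() is.

import time
# inspect the clock used by time.perf_counter()
print(time.get_clock_info('perf_counter'))
# inspect the clock used by time.time()
print(time.get_clock_info('time'))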
You can learn more about how to benchmark with the time.perf_counter() function in the tutorial:
Benchmark The Task Function Manually
We can benchmark the execution time of tasks manually.
This involves updating the task function itself and adding timing code.
The times can be stored in a global variable, such as a list, for reporting later. Unfortunately, we cannot share a global variable between processes, but we can simulate one using a multiprocessing.Manager and proxy objects.
This will require that we create and start a Manager and use it to create a list hosted in the server process.
...
# create the manager
with Manager() as manager:
    # create the shared list
    task_times = manager.list()
It also requires that we pass the proxy for the shared list as an argument to our task function.
For example:
# task to execute in the process pool
def task(task_times):
    # record start time
    time_start = perf_counter()
    # execute task
    ...
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # store time in the shared list
    task_times.append(time_duration)
If you are new to the multiprocessing Manager, you can learn more in the tutorial:
After all tasks are completed, the list of all task run times can be retrieved and used to report details about task execution times.
This might include the minimum, average, and maximum task run time.
For example:
...
# report task benchmark results
time_min, time_max = min(task_times), max(task_times)
time_mean = sum(task_times) / len(task_times)
print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
This is helpful to see if task execution time is faster or slower on average and at the limits after a code change.
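If richer summary statistics are desired, such as the median or the standard deviation of task times, the standard library statistics module can compute them from the same list. A minimal sketch is below, assuming task_times has been populated as above.

from statistics import median, stdev
...
# report additional task benchmark statistics
time_median = median(task_times)
# stdev requires at least two recorded times
time_stdev = stdev(task_times)
print(f'Tasks Times: median={time_median:.3f}, stdev={time_stdev:.3f}')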
The downside of this approach is that we have to change our task function.
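One way to keep the timing code out of the task body is to move it into a function decorator, an approach mentioned above. The sketch below shows one possible implementation; the benchmark() decorator name and structure are illustrative, not from any library, and the proxy for the shared list is still passed as the first argument so that the wrapper can record the result.

from functools import wraps
from time import perf_counter

# decorator that adds timing code around a task function
def benchmark(func):
    @wraps(func)
    def wrapper(task_times, *args, **kwargs):
        # record start time
        time_start = perf_counter()
        # execute the wrapped task
        result = func(*args, **kwargs)
        # calculate execution time and store it in the shared list
        task_times.append(perf_counter() - time_start)
        return result
    return wrapper

# the task function itself is now free of timing code
@benchmark
def task():
    # execute task
    ...

Tasks would then be issued as before, e.g. exe.submit(task, task_times). The decorator must be applied at module level so that the decorated function can still be pickled and sent to the worker processes.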
Now that we know how to benchmark tasks in the ProcessPoolExecutor, let’s look at some worked examples.
Example of Tasks Executed with the ProcessPoolExecutor
Before we explore how to benchmark tasks in the ProcessPoolExecutor, let’s define a simple program that executes tasks in parallel.
In this example, we will define a task that takes about 5 seconds to complete and then report a message. We will create a process pool with 5 worker processes and issue 20 tasks to the pool to complete. Once all tasks are done, a final message is reported.
With 20 tasks, each taking about 5 seconds, executed by 5 workers, we expect the program to complete in about 20 seconds, e.g. (20/5) * 5 seconds.
The complete example is listed below.
# SuperFastPython.com
# example of executing tasks in the processpoolexecutor
from concurrent.futures import ProcessPoolExecutor
from time import sleep
from random import random

# task to execute in the process pool
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done', flush=True)

# drive the program
def main():
    # number of tasks to run
    n_tasks = 20
    # create the process pool
    with ProcessPoolExecutor(5) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
        # wait for all tasks to complete
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # execute the program
    main()
Running the example first executes the main() function.
The main function runs and creates the ProcessPoolExecutor with 5 worker processes.
All 20 tasks are issued to the process pool and the main process blocks until all tasks are done.
Each task runs, executing the task() function, sleeping for between 5 and 6 seconds then reporting a message.
Once all tasks are done the process pool is shut down automatically via the context manager interface and a final done message is reported.
...
>task done
>task done
>task done
>task done
>task done
Done
Next, let’s look at how we can benchmark the overall execution time of all tasks in the ProcessPoolExecutor.
Example of Benchmarking Overall Execution Time Of Tasks in ProcessPoolExecutor
We can explore how to benchmark the overall execution time of tasks in the ProcessPoolExecutor.
In this case, we will update the entry point of the program to record the start time before calling the main() function, record the end time after the main() function has returned, then calculate and report the overall execution time.
# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')
This will ensure that the overall execution time for the program is always reported.
The updated version of the program with this change is listed below.
# SuperFastPython.com
# example of benchmarking overall execution time of tasks
from concurrent.futures import ProcessPoolExecutor
from time import sleep
from time import perf_counter
from random import random

# task to execute in the process pool
def task():
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done', flush=True)

# drive the program
def main():
    # number of tasks to run
    n_tasks = 20
    # create the process pool
    with ProcessPoolExecutor(5) as exe:
        # issue all tasks
        _ = [exe.submit(task) for _ in range(n_tasks)]
        # wait for all tasks to complete
    print('Done')

# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')
Running the example first records the start time.
The main function is called and creates the ProcessPoolExecutor with 5 worker processes.
All 20 tasks are issued to the process pool and the main process blocks until all tasks are done.
Each task runs, executing the task() function, sleeping for between 5 and 6 seconds then reporting a message.
Once all tasks are done the process pool is shut down automatically via the context manager interface and a final done message is reported.
Finally, the end time is recorded and the overall execution time is calculated and reported.
In this case, we can see that the example took about 22.749 seconds.
Note that your results may differ.
This highlights how we can benchmark the overall execution time of tasks in the ProcessPoolExecutor.
...
>task done
>task done
>task done
>task done
>task done
Done
Took: 22.749 seconds
Next, let’s look at how we might manually benchmark tasks in the ProcessPoolExecutor.
Example of Benchmarking Tasks Manually
We can explore how to manually benchmark the execution time of tasks run in the ProcessPoolExecutor.
In this case, we can update the task() function to record the start time at the beginning of the function and the end time at the end of the function.
The execution time can then be calculated and appended, via the proxy object, to the list hosted in the manager's server process.
...
# record start time
time_start = perf_counter()
...
# record end time
time_end = perf_counter()
# calculate execution time
time_duration = time_end - time_start
# store time in the shared list
task_times.append(time_duration)
The updated task() function with this change is listed below.
# task to execute in the process pool
def task(task_times):
    # record start time
    time_start = perf_counter()
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done', flush=True)
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # store time in the shared list
    task_times.append(time_duration)
We can then update the main() function to create the manager, create the hosted list, and share the proxy object for accessing it with each task.
...
# create the manager
with Manager() as manager:
    # create the shared list
    task_times = manager.list()
    # number of tasks to run
    n_tasks = 20
    # create the process pool
    with ProcessPoolExecutor(5) as exe:
        # issue all tasks
        _ = [exe.submit(task, task_times) for _ in range(n_tasks)]
After all tasks are done, the main() function can calculate min, max, and mean statistics from the benchmark scores and report the results.
...
# report task benchmark results
time_min, time_max = min(task_times), max(task_times)
time_mean = sum(task_times) / len(task_times)
print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
The updated main() function with these changes is listed below.
# drive the program
def main():
    # create the manager
    with Manager() as manager:
        # create the shared list
        task_times = manager.list()
        # number of tasks to run
        n_tasks = 20
        # create the process pool
        with ProcessPoolExecutor(5) as exe:
            # issue all tasks
            _ = [exe.submit(task, task_times) for _ in range(n_tasks)]
            # wait for all tasks to complete
        # report task benchmark results
        time_min, time_max = min(task_times), max(task_times)
        time_mean = sum(task_times) / len(task_times)
        print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
        print('Done')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of benchmarking task execution time manually
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from time import sleep
from time import perf_counter
from random import random

# task to execute in the process pool
def task(task_times):
    # record start time
    time_start = perf_counter()
    # determine work time with some statistical noise
    value = 5 + random()
    # sleep to simulate blocking work
    sleep(value)
    # report a message to show progress
    print('>task done', flush=True)
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # store time in the shared list
    task_times.append(time_duration)

# drive the program
def main():
    # create the manager
    with Manager() as manager:
        # create the shared list
        task_times = manager.list()
        # number of tasks to run
        n_tasks = 20
        # create the process pool
        with ProcessPoolExecutor(5) as exe:
            # issue all tasks
            _ = [exe.submit(task, task_times) for _ in range(n_tasks)]
            # wait for all tasks to complete
        # report task benchmark results
        time_min, time_max = min(task_times), max(task_times)
        time_mean = sum(task_times) / len(task_times)
        print(f'Tasks Times: min={time_min:.3f}, mean={time_mean:.3f}, max={time_max:.3f}')
        print('Done')

# protect the entry point
if __name__ == '__main__':
    # record start time
    time_start = perf_counter()
    # execute the program
    main()
    # record end time
    time_end = perf_counter()
    # calculate execution time
    time_duration = time_end - time_start
    # report execution time
    print(f'Took: {time_duration:.3f} seconds')
Running the example first records the start time.
The main function is called and creates the Manager server process. A list is created and hosted in the manager process and a proxy object for accessing the hosted list is provided.
The ProcessPoolExecutor is then created with 5 worker processes. All 20 tasks are issued to the process pool and the main process blocks until all tasks are done.
Each task runs, executing the task() function.
The task() function records the start time, calculates a time to sleep, blocks, and reports a final message.
The task() function then records an end time, calculates the execution time for the task, and stores the benchmark result in the hosted list via the proxy object.
Once all tasks are done the process pool is shut down automatically via the context manager interface.
The minimum, maximum, and average task benchmark times are calculated and reported, and then a final done message is reported.
The manager is then shut down once its context manager is exited.
Finally, the end time is recorded and the overall execution time is calculated and reported.
In this case, we can see that the minimum execution time for tasks was about 5.017 seconds, whereas the maximum was closer to 6 seconds at 5.979 seconds. We also see that the example took about 22.718 seconds.
Note that your results may differ.
This highlights how we can report summary statistics from benchmarking the execution time of individual tasks executed within the ProcessPoolExecutor.
The downside is that we had to change our task function to accommodate the benchmarking.
...
>task done
>task done
>task done
>task done
>task done
Tasks Times: min=5.017, mean=5.484, max=5.979
Done
Took: 22.718 seconds
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python Benchmarking, Jason Brownlee (my book!)
Also, the following Python books have chapters on benchmarking that may be helpful:
- Python Cookbook, 2013. (sections 9.1, 9.10, 9.22, 13.13, and 14.13)
- High Performance Python, 2020. (chapter 2)
Guides
- 4 Ways to Benchmark Python Code
- 5 Ways to Measure Execution Time in Python
- Python Benchmark Comparison Metrics
Benchmarking APIs
- time — Time access and conversions
- timeit — Measure execution time of small code snippets
- The Python Profilers
Takeaways
You now know how to benchmark tasks in the ProcessPoolExecutor.
Did I make a mistake? See a typo?
I’m a simple humble human. Correct me, please!
Do you have any additional tips?
I’d love to hear about them!
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.