How to Show Progress With The ProcessPoolExecutor In Python

February 2, 2022 Python ProcessPoolExecutor

You can show progress of tasks in the ProcessPoolExecutor by using a callback function.

In this tutorial you will discover how to show progress of tasks in a Python process pool.

Let's get started.

Need to Show Progress of Concurrent Tasks

The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.

You can submit tasks to the process pool by calling the submit() function and passing in the name of the function you wish to execute on another process.

Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
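
As a minimal sketch of the submit() and Future workflow described above, the following submits one task and reads its result. The square() and run_one() functions are hypothetical names chosen only for illustration.

```python
# minimal sketch: submit one task and inspect its Future
from concurrent.futures import ProcessPoolExecutor

# hypothetical task function, used only for illustration
def square(value):
    return value * value

# submit a single task and return its result and status
def run_one(value):
    with ProcessPoolExecutor(1) as executor:
        # submit() returns a Future immediately
        future = executor.submit(square, value)
        # result() blocks until the task has completed
        result = future.result()
        # done() reports whether the task has completed
        return result, future.done()

# entry point
if __name__ == '__main__':
    print(run_one(3))
```

The entry point guard matters here because child processes may import the main module when they start.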

You may need to show the progress of all or a subset of the tasks submitted to the process pool with the submit() function.

This may be for many reasons, such as reporting progress to the user or internally keeping track of how much time or work remains to be completed.

How can we show progress of completed tasks in the ProcessPoolExecutor in a standardized way?

How to Show Progress of Concurrent Tasks

You can add a callback function to provide a standard way to track progress for completed tasks.

This can be achieved by calling the add_done_callback() function on the Future object for each task and specifying a custom function to track or report progress.

...
# add a callback function to a task
future.add_done_callback(custom_callback)

Recall that we get a Future object when we submit a task to the ProcessPoolExecutor via the submit() function.

The callback function must have a single argument which is the Future object on which it was called.

# task callback function
def custom_callback(future):
	# do something...
	pass

Using a callback is not the only way to show progress.

For example, you can enumerate tasks as they are completed in the parent process via the as_completed() function.

...
# report progress as tasks are completed
for future in as_completed(futures):
	print('Another task has completed!')

The problem with this approach is that it requires that you mix both result processing and task progress indication code together, which may be less clean than using a callback function.
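
For comparison, a minimal runnable sketch of the as_completed() approach might look as follows. The task() function and the progress_message() helper are assumptions made for illustration. Note how the progress report and the result handling end up in the same loop.

```python
# minimal sketch: report progress with as_completed() in the parent process
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# hypothetical task that blocks briefly and returns its argument
def task(name):
    sleep(random() * 0.1)
    return name

# format a progress message (hypothetical helper)
def progress_message(completed, total):
    return f'{completed}/{total} tasks completed'

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        futures = [executor.submit(task, i) for i in range(10)]
        # progress reporting and result handling share one loop
        for count, future in enumerate(as_completed(futures), start=1):
            result = future.result()
            print(f'{progress_message(count, len(futures))} (latest result: {result})')
```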

The callback function will only be called once the task is completed, i.e. "done".

A task may be completed in one of three ways: it finished successfully, it raised an exception, or it was cancelled.

As such, you may want to check the status of the task in the callback function, to see if it was cancelled or if it raised an exception, before reporting on the progress.

# task callback function
def custom_callback(future):
	# check if task was cancelled
	if future.cancelled():
		# the task was cancelled
		print('Task was cancelled')
	elif future.exception():
		# the task raised an exception
		print('Task raised an exception')
	else:
		# the task finished successfully
		print('Task finished successfully')
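
To see the three outcomes in practice, here is a minimal sketch under some assumptions: the task() function that fails for odd arguments and the describe_outcome() helper are hypothetical, and whether the final cancel() call succeeds depends on whether that task is still pending when it is made.

```python
# minimal sketch: report how each task ended from inside a done callback
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# hypothetical task that fails for odd arguments
def task(value):
    sleep(0.1)
    if value % 2 == 1:
        raise ValueError(f'task {value} failed')
    return value

# classify a completed Future (hypothetical helper)
def describe_outcome(future):
    # check cancellation first: exception() raises CancelledError on a cancelled Future
    if future.cancelled():
        return 'cancelled'
    elif future.exception() is not None:
        return 'failed'
    else:
        return 'succeeded'

# task callback function
def custom_callback(future):
    print(f'Task {describe_outcome(future)}', flush=True)

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(1) as executor:
        futures = [executor.submit(task, i) for i in range(6)]
        for future in futures:
            future.add_done_callback(custom_callback)
        # try to cancel the last task; this only works while it is still pending
        futures[-1].cancel()
```

Checking cancelled() before exception() is a deliberate ordering, since asking a cancelled Future for its exception raises a CancelledError.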

There are many ways to report progress in the callback function.

For example, you can print a character, one for each task that is completed.

...
# show progress for one task
print('.', end='', flush=True)

Now that we know how to show the progress of tasks in a standard way, let's look at a worked example.

Example Of Showing Progress of Concurrent Tasks

Let's look at how we might show progress of tasks completed in the ProcessPoolExecutor.

First, let's define a target task function that blocks for a moment.

# mock task that works for a moment
def task(name):
    sleep(random())

Next, we can define our callback function that will take a Future object and report progress by printing one character for each task that completes.

This is a generic and scalable way of showing progress for tasks, although it does not provide an indication of how many tasks remain to be executed.

# simple progress indicator callback function
def progress_indicator(future):
    print('.', end='', flush=True)
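
If an indication of the remaining work is needed, one possible variation is a callback that counts completed tasks against a known total. This is a sketch, not part of the worked example: the make_progress_indicator() helper is a hypothetical name, and it assumes the total number of tasks is known up front. Done callbacks are called in a thread of the process that added them, so a thread lock is used to protect the counter.

```python
# minimal sketch: progress callback that reports completed/total counts
from time import sleep
from random import random
from threading import Lock
from concurrent.futures import ProcessPoolExecutor

# hypothetical task that blocks for a moment
def task(name):
    sleep(random() * 0.1)

# build a callback that counts completed tasks (hypothetical helper)
def make_progress_indicator(total):
    lock = Lock()
    completed = 0
    def progress_indicator(future):
        nonlocal completed
        # callbacks run in the parent process, so a thread lock is enough
        with lock:
            completed += 1
            print(f'{completed}/{total} tasks completed', flush=True)
    return progress_indicator

# entry point
if __name__ == '__main__':
    total = 20
    progress_indicator = make_progress_indicator(total)
    with ProcessPoolExecutor(2) as executor:
        futures = [executor.submit(task, i) for i in range(total)]
        for future in futures:
            future.add_done_callback(progress_indicator)
    print('Done!')
```

The rest of this tutorial continues with the simpler dot-printing callback.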

Next, we can create a process pool with two processes and submit many tasks.

...
# start the process pool
with ProcessPoolExecutor(2) as executor:
    # send in the tasks
    futures = [executor.submit(task, i) for i in range(20)]

We can then register the callback with each task; it will be called once that task has completed.

...
# register the progress indicator callback
for future in futures:
    future.add_done_callback(progress_indicator)
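
Because the callbacks are registered after the tasks are submitted, a task could in principle finish before its callback is added. That is not a problem: a callback added to a Future that is already done is called immediately. The sketch below illustrates this with a Future created by hand, which stands in for a finished task. Future objects are normally created by the executor, and the seen list is a hypothetical addition used only to count calls.

```python
# minimal sketch: a callback added after completion is called immediately
from concurrent.futures import Future

# record of futures seen by the callback (hypothetical, for illustration)
seen = []

# simple progress indicator callback function
def progress_indicator(future):
    seen.append(future)
    print('.', end='', flush=True)

# create a Future by hand and mark it completed, standing in for a finished task
future = Future()
future.set_result(None)
# the Future is already done, so the callback is called right away
future.add_done_callback(progress_indicator)
print(f'\ncallbacks called: {len(seen)}')
```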

That's it.

Tying this together, the complete example of showing a progress indicator of completed tasks is listed below.

# SuperFastPython.com
# example of a simple progress indicator for tasks in a process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# simple progress indicator callback function
def progress_indicator(future):
    print('.', end='', flush=True)

# mock task that works for a moment
def task(name):
    sleep(random())

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(2) as executor:
        # send in the tasks
        futures = [executor.submit(task, i) for i in range(20)]
        # register the progress indicator callback
        for future in futures:
            future.add_done_callback(progress_indicator)
        # exiting the with block waits for all tasks to complete
    print('\nDone!')

Running the example creates the process pool and submits twenty tasks for execution.

As each task completes, the callback prints a dot to report progress.

Once all 20 tasks have completed with all 20 dots printed, we can carry on with our program.

....................
Done!

Takeaways

You now know how to show progress of tasks in the ProcessPoolExecutor.


