How to Wait For a Task to Finish in the ProcessPoolExecutor

January 11, 2022 Python ProcessPoolExecutor

You can wait for a task to finish in a ProcessPoolExecutor by calling the wait() function.

In this tutorial you will discover how to wait for tasks to finish in a Python process pool.

Let's get started.

Need To Wait for Submitted Tasks to Finish

The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.

You can submit tasks to the process pool by calling the submit() function and passing in the name of the function you wish to execute on another process. Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.

You can also submit tasks to the process pool by calling the map() function and pass in the name of the function and the iterable of items that your function will be applied to asynchronously.

After submitting tasks to the process pool, you may want to wait for the tasks to complete before continuing.

There are many reasons for this to be the case, for example:

How can you wait for tasks submitted to the ProcessPoolExecutor to complete?

How to Wait For Tasks To Complete

There are a number of ways that you can wait for tasks to complete in the ProcessPoolExecutor.

The most common approach is to call the wait() module function and pass in a collection of Future objects created when calling submit().

The wait function gives you fine grained control over the specific tasks that you wish to wait to complete and also allows you to specify a timeout for how long you are willing to wait in seconds.

...
# wait for a collection of tasks to complete
wait(futures)

Alternatively, you can enumerate the list of Future objects and attempt to get the result from each. This iteration will complete when all results are available meaning that all tasks were completed.

...
# wait for all tasks to complete by getting all results
for future in futures:
	result = future.result()
	# do something with the result...
# all tasks are complete

Perhaps you don't have Future objects for your tasks because you submitted them to the process pool by calling map(), or perhaps you wish to wait for all tasks in the process pool to complete rather than a subset.

In this case, you can wait for tasks to complete while shutting down the process pool.

You can shutdown the process pool by calling the shutdown() function. We can set the "wait" argument to True so that the call will not return until all running tasks complete and set "cancel_futures" to True which will cancel all scheduled tasks.

...
# shutdown the pool, cancels scheduled tasks, returns when running tasks complete
executor.shutdown(wait=True, cancel_futures=True)

You can also shutdown the pool and not cancel the scheduled tasks, yet still wait for all tasks to complete.

This will ensure all running and scheduled tasks are completed before the function returns. This is the default behavior of the shutdown function, but is a good idea to specify explicitly.

...
# shutdown the pool, returns after all scheduled and running tasks complete
executor.shutdown(wait=True, cancel_futures=False)

Now that we have seen a few ways to wait for tasks to complete in the ProcessPoolExecutor, let's look at a worked example.

Example of Waiting For Tasks to Complete

Let's demonstrate how to wait for tasks to complete in the ProcessPoolExecutor.

First, let's define a simple task that will block for a fraction of a second and report when it is completed.

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {name}')

Next, we can create a process pool with 2 worker processes and issue ten tasks to the pool for execution by calling the submit() function for each task.

Each call to submit will return a Future object which we will collect into a list.

...
# start the process pool
with ProcessPoolExecutor(2) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]

Next, we can call the wait() module function and pass in the list of the ten Future objects we collected when calling submit().

...
# wait for all tasks to complete
print('Waiting for tasks to complete...')
wait(futures)

This call will return once all tasks associated with the Future objects in the collection have completed.

We can then report that all tasks are completed.

...
print('All tasks are done!')

Tying this together, the complete example of starting tasks and waiting for them all to complete before continuing on is listed below.

# SuperFastPython.com
# example of waiting for tasks to complete in the process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {name}')

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(2) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
        print('Waiting for tasks to complete...')
        wait(futures)
        print('All tasks are done!')

Running the example first submits all tasks into the process pool.

We then start waiting for the tasks to complete, reporting a message.

The tasks complete, reporting a message as they do.

Finally, all tasks are completed and we are free to carry on.

Waiting for tasks to complete...
Done: 0
Done: 1
Done: 2
Done: 3
Done: 4
Done: 6
Done: 7
Done: 5
Done: 8
Done: 9
All tasks are done!

Example of Waiting For All Tasks to Complete via shutdown()

Let's look at an alternative approach for waiting for all tasks to complete.

Perhaps we have many tasks in the process pool and we don't have Future objects for them.

This might happen if we add tasks to the pool using the map() function and choose not to enumerate the results for the tasks.

...
# submit tasks to the process pool
map(task, range(10))

We can wait for all of the scheduled and running tasks to complete by calling the shutdown() function explicitly and setting wait=True and cancel_futures=False, the default arguments.

...
# shutdown the process pool and wait for all tasks to complete
executor.shutdown()

Because we are using the context manager, the process pool will be closed automatically using the default parameters.

Therefore, we don't need to add any extra code in order to wait for all tasks in the process pool to complete before continuing on.

We can demonstrate this with a worked example.

# SuperFastPython.com
# example of waiting for tasks to complete via a pool shutdown
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {name}')

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(2) as executor:
        # submit tasks
        executor.map(task, range(10))
        # wait for all tasks to complete
        print('Waiting for tasks to complete...')
    print('All tasks are done!')

Running the example, we see that all tasks are submitted to the pool using the map() function and we do not have Future objects associated with each task.

We then wait for the tasks to complete at the end of the context manager. Tasks report their progress as they complete.

Finally, all tasks are completed, the context manager block ends closing the process pool and we continue on.

Waiting for tasks to complete...
Done: 0
Done: 1
Done: 2
Done: 4
Done: 3
Done: 6
Done: 7
Done: 5
Done: 8
Done: 9
All tasks are done!

Takeaways

You now know how to wait for tasks to complete in the ProcessPoolExecutor in Python.



If you enjoyed this tutorial, you will love my book: Python ProcessPoolExecutor Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.