What is the ProcessPoolExecutor

February 15, 2022 Python ProcessPoolExecutor

The ProcessPoolExecutor implements the Executor abstract class and provides a process pool in Python.

In this tutorial, you will discover the ProcessPoolExecutor class.

Let's get started.

What Are Python Processes

A process refers to an instance of a computer program being executed.

Every Python program is a process and has one thread called the main thread used to execute your program instructions. Each process is, in fact, one instance of the Python interpreter that executes Python instructions (Python byte-code), which is a slightly lower level than the code you type into your Python program.

Sometimes we may need to create new processes to run additional tasks concurrently.

Python provides real system-level processes via the multiprocessing.Process class.


The underlying operating system controls how new processes are created. On some systems, that may require spawning a new process, and on others, it may require that the process is forked. The operating system-specific method used for creating new processes in Python is not something we need to worry about as it is managed by your installed Python interpreter.
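Although we do not need to manage the start method ourselves, we can inspect it at runtime. A minimal sketch:

```python
# SuperFastPython.com
# report how this platform creates new processes
from multiprocessing import get_start_method

# entry point for the program
if __name__ == '__main__':
    # typically 'fork' on Unix, 'spawn' on Windows and macOS
    print(get_start_method())
```
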

A task can be run in a new process by creating an instance of the multiprocessing.Process class and specifying the function to run in the new process via the "target" argument.

...
# define a task to run in a new process
p = Process(target=task)

Once the process is created, it must be started by calling the start() function.

...
# start the task in a new process
p.start()

We can then wait around for the task to complete by joining the process; for example:

...
# wait for the task to complete
p.join()

Whenever we create new processes, we must protect the entry point of the program.

# entry point for the program
if __name__ == '__main__':
	# do things...

Tying this together, the complete example of creating a Process to run an ad hoc task function is listed below.

# SuperFastPython.com
# example of running a function in a new process
from multiprocessing import Process

# a task to execute in another process
def task():
    print('This is another process', flush=True)

# entry point for the program
if __name__ == '__main__':
    # define a task to run in a new process
    p = Process(target=task)
    # start the task in a new process
    p.start()
    # wait for the task to complete
    p.join()

This is useful for running one-off ad hoc tasks in a separate process, although it becomes cumbersome when you have many tasks to run.

Each process that is created requires the allocation of resources (e.g. an instance of the Python interpreter and memory for the process's main thread's stack space). The computational cost of setting up processes can become expensive if we are creating and destroying many processes over and over for ad hoc tasks.

Instead, we would prefer to keep worker processes around for reuse if we expect to run many ad hoc tasks throughout our program.

This can be achieved using a process pool.

What Is a Process Pool

A process pool is a programming pattern for automatically managing a pool of worker processes.

The pool is responsible for a fixed number of processes.

The pool can provide a generic interface for executing ad hoc tasks with a variable number of arguments, much like the "target" argument on the Process constructor, but does not require that we choose a process to run the task, start the process, or wait for the task to complete.

Python provides a process pool via the ProcessPoolExecutor class.

What is the ProcessPoolExecutor

The ProcessPoolExecutor extends the Executor class and returns Future objects when tasks are submitted to it.

Let's take a closer look at Executors, Futures, and the life-cycle of using the ProcessPoolExecutor class.

What Are Executors

The ProcessPoolExecutor class extends the abstract Executor class.

The Executor class defines three methods used to control our process pool; they are: submit(), map(), and shutdown().

The Executor is started when the class is created and must be shut down explicitly by calling shutdown(), which will release any resources held by the Executor. It can also be shut down automatically by using the Executor as a context manager via the "with" keyword.
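As a sketch of the explicit form (the task function here is illustrative):

```python
# SuperFastPython.com
# sketch of creating and explicitly shutting down an executor
from concurrent.futures import ProcessPoolExecutor

# an illustrative task that returns a value
def work():
    return 42

# entry point for the program
if __name__ == '__main__':
    # create the pool of worker processes
    executor = ProcessPoolExecutor()
    # execute the task in a worker process
    future = executor.submit(work)
    # report the result
    print(future.result())
    # release the worker processes explicitly
    executor.shutdown()
```
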

The submit() and map() functions are used to submit tasks to the Executor for asynchronous execution.

The map() function operates just like the built-in map() function and is used to apply a function to each element in an iterable object, like a list. Unlike the built-in map() function, each application of the function to an element will happen asynchronously instead of sequentially.
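As a sketch, assuming a simple square() function defined for illustration:

```python
# SuperFastPython.com
# sketch of applying a function to each item with Executor.map()
from concurrent.futures import ProcessPoolExecutor

# an illustrative function to apply to each element
def square(x):
    return x * x

# entry point for the program
if __name__ == '__main__':
    # create the pool of worker processes
    with ProcessPoolExecutor() as executor:
        # results are yielded in the order of the input iterable
        results = list(executor.map(square, [1, 2, 3, 4]))
    # report the results
    print(results)
```
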

The submit() function takes a function as well as any arguments and will execute it asynchronously, although the call returns immediately and provides a Future object.

We will take a closer look at each of these three functions in a moment. Firstly, what is a Future?

What Are Futures

A future is an object that represents a delayed result for an asynchronous task.

It is also sometimes called a promise or a delay. It provides a context for the result of a task that may or may not be executing and a way of getting a result once it is available.

In Python, the Future object is returned from an Executor, such as a ProcessPoolExecutor, when calling the submit() function to dispatch a task to be executed asynchronously.

In general, we do not create Future objects; we only receive them and we may need to call functions on them.

There is always one Future object for each task sent into the ProcessPoolExecutor via a call to submit().

The Future object provides a number of helpful functions for inspecting the status of the task such as: cancelled(), running(), and done() to determine if the task was cancelled, is currently running, or has finished execution.

A running task cannot be cancelled, and a done task may have been cancelled before it started running.
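A minimal sketch of inspecting task status (the task function is illustrative):

```python
# SuperFastPython.com
# sketch of inspecting task status via the Future object
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# an illustrative task that takes a moment
def slow_task():
    # block for a moment
    sleep(0.5)
    return 'finished'

# entry point for the program
if __name__ == '__main__':
    # create the pool of worker processes
    with ProcessPoolExecutor() as executor:
        # execute the task in a worker process
        future = executor.submit(slow_task)
        # block until the result is available
        future.result()
        # the task is now done and was not cancelled
        print(future.done())
        print(future.cancelled())
```
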

A Future object also provides access to the result of the task via the result() function. If an exception was raised while executing the task, it will be re-raised when calling the result() function, or can be accessed via the exception() function.

Both the result() and exception() functions allow a timeout to be specified as an argument, which is the number of seconds to wait for a return value if the task is not yet complete. If the timeout expires, then a TimeoutError will be raised.
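A sketch of using a timeout, assuming an illustrative long-running task:

```python
# SuperFastPython.com
# sketch of waiting for a result with a timeout
from time import sleep
from concurrent.futures import ProcessPoolExecutor, TimeoutError

# an illustrative task that runs longer than we want to wait
def long_task():
    # block for a while
    sleep(2)
    return 'done'

# entry point for the program
if __name__ == '__main__':
    # create the pool of worker processes
    with ProcessPoolExecutor() as executor:
        # execute the task in a worker process
        future = executor.submit(long_task)
        try:
            # wait at most half a second for the result
            result = future.result(timeout=0.5)
        except TimeoutError:
            print('Gave up waiting, task is still running')
```
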

Finally, we may want to have the process pool automatically call a function once the task is completed.

This can be achieved by attaching a callback to the Future object for the task via the add_done_callback() function.

We can add more than one callback to each task, and they will be executed in the order they were added. If the task has already completed before we add the callback, then the callback is executed immediately.

Any exceptions raised in the callback function will not impact the task or process pool.
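A sketch of attaching a done callback (the task and callback names are illustrative):

```python
# SuperFastPython.com
# sketch of attaching a callback to a Future object
from concurrent.futures import ProcessPoolExecutor

# an illustrative task that returns a value
def task():
    return 'All done'

# called automatically when the task completes; receives the Future
def report_result(future):
    print(f'Callback got: {future.result()}')

# entry point for the program
if __name__ == '__main__':
    # create the pool of worker processes
    with ProcessPoolExecutor() as executor:
        # execute the task in a worker process
        future = executor.submit(task)
        # attach the callback to the task
        future.add_done_callback(report_result)
```
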

We will take a closer look at the Future object in a later section.

Now that we are familiar with the Executor and Future classes, let's take a closer look at the ProcessPoolExecutor.

What is the ProcessPoolExecutor

The ProcessPoolExecutor provides a pool of generic worker processes.

It was designed to be easy and straightforward to use.

If multiprocessing was like the transmission for changing gears in a car, then using multiprocessing.Process is a manual transmission (i.e. hard to learn and use) whereas concurrent.futures.ProcessPoolExecutor is an automatic transmission (i.e. easy to learn and use).

Executing functions using worker processes in the ProcessPoolExecutor involves first creating the process pool, then submitting the task into the pool. If the task returns a value, we can then retrieve it when the task is completed.

We can demonstrate this with a small example.

First, let's define a task that blocks for a moment, then reports a message and returns a value.

# a simple task that blocks for a moment and prints a message
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('Task running in a worker process', flush=True)
    # return a message
    return 'All done'

Next, we can create the process pool by calling the constructor of the ProcessPoolExecutor class that will create a number of worker processes for us to use.

We will use a context manager so that the pool will be shutdown automatically once we are finished with it.

...
# create the pool of worker processes
with ProcessPoolExecutor() as executor:
    # ...

We can submit a task into the process pool by calling the submit() function and specifying the name of the function to run in a worker process. The task will begin executing as soon as a worker is available, in this case, nearly immediately.

This returns a Future object that provides a handle on the task that is executing concurrently.

...
# execute a task in another process
future = executor.submit(task)

We can then report a message that we are waiting for the task to complete.

...
# display a message
print('Waiting for the new process to finish...', flush=True)

We can then retrieve the return value from this function by calling the result() function on the Future object.

...
# wait for the task to finish and get the result
result = future.result()
# report the result
print(result)

Tying this together, a complete example of using the Python process pool class ProcessPoolExecutor is listed below.

# SuperFastPython.com
# demonstration with the process pool life-cycle
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# a simple task that blocks for a moment and prints a message
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('Task running in a worker process', flush=True)
    # return a message
    return 'All done'

# entry point
if __name__ == '__main__':
    # create the pool of worker processes
    with ProcessPoolExecutor() as executor:
        # execute a task in another process
        future = executor.submit(task)
        # display a message
        print('Waiting for the new process to finish...')
        # wait for the task to finish and get the result
        result = future.result()
        # report the result
        print(result)
    # shutdown the process pool automatically

Running the example creates the process pool, submits the task, and waits for the result in the main thread.

The process pool is created with the default number of worker processes. The submitted task is consumed by a worker process in the pool and begins executing, while the main thread waits for the result.

The task blocks for a moment, reports a message, and returns a value.

Finally, the main thread retrieves the result of the task and closes the process pool.

Waiting for the new process to finish...
Task running in a worker process
All done

We can see that the process pool is indeed very easy to use once created. We could keep calling submit() and issuing tasks to be executed concurrently until we have no further tasks to execute.
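A sketch of this pattern, submitting several tasks and collecting results as they finish via the as_completed() helper (the task function is illustrative):

```python
# SuperFastPython.com
# sketch of submitting many tasks to one process pool
from concurrent.futures import ProcessPoolExecutor, as_completed

# an illustrative task that takes an argument
def task(value):
    return value * 10

# entry point for the program
if __name__ == '__main__':
    # create the pool of worker processes
    with ProcessPoolExecutor() as executor:
        # submit one task per value, keeping the Future objects
        futures = [executor.submit(task, i) for i in range(5)]
        # report each result in completion order
        for future in as_completed(futures):
            print(future.result())
```
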

Takeaways

You now know what the ProcessPoolExecutor is, how it relates to the Executor and Future classes, and how to use it to run ad hoc tasks asynchronously in a pool of worker processes.



If you enjoyed this tutorial, you will love my book: Python ProcessPoolExecutor Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.