Use ThreadPoolExecutor Within ProcessPoolExecutor in Python

January 8, 2022 Python ProcessPoolExecutor

You can create ThreadPoolExecutor thread pools within each worker process in the ProcessPoolExecutor.

In this tutorial you will discover how to create and use thread pools within process workers in Python.

Let's get started.

Need Threads Within Each Worker Process

The ProcessPoolExecutor provides an easy way to execute tasks concurrently and in parallel using child processes.

The process pool can be defined which provides reusable workers that can execute ad hoc tasks. We can submit tasks for execution to the pool using the submit() or map() function and wait for the tasks to complete and even collect results from each task.

The sweet spot for the ProcessPoolExecutor are tasks that are CPU-bound, meaning that they can run as fast as the CPU, and that have low overhead in terms of data sent to and received from the target task function.

In our application, we may have a hierarchy of tasks to execute.

That is, each high-level task executed by a process worker may be required to perform a number of subtasks that could be completed concurrently. If these subtasks are IO-bound, then we may want to make use of threads within each worker process.

There are many examples of why we may need to use threads within worker processes, such as:

How can we use threads within each worker process?

ThreadPoolExecutor Within Each Worker Process

We can use threads within each worker process by creating a thread pool.

Specifically, each process worker executes a target function. Within the target function, we may create an instance of a ThreadPoolExecutor and submit IO-bound tasks.

For example, a target task function for a process worker may look as follows:

# task for process workers
def process_work():
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # submit tasks...

Where each call to the process_work() function is executed by a process worker in a ProcessPoolExecutor, for example:

# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
    	# execute a task with a process worker
    	future = exe.submit(process_work)

This means each worker process in the ProcessPoolExecutor will have a ThreadPoolExecutor.

It allows us to both harness multiple CPU cores by using processes and have perhaps tens or even hundreds of threads within each process or CPU core.

Now that we know how to create and use thread pools within process workers, let's look at some worked examples.

Example of ThreadPoolExecutor Within ProcessPoolExecutor

We can demonstrate how to create and use a ThreadPoolExecutor within each worker process.

First, let's define a target task function for the worker threads.

We will assign a unique identifier integer to each process task and each thread task. These unique numbers can then be reported by the process tasks and worker tasks once they finish their computation.

In the case of the thread tasks, we will block for a moment using a call to the sleep() function then report a unique message.

The thread_work() function below implements this.

# task for thread workers within each process worker
def thread_work(process_id, thread_id):
    # block for a moment
    sleep(random())
    # report a unique message
    print(f'Process {process_id}, thread worker {thread_id}', flush=True)

Next, we can define a target task function for the process tasks.

This function must create a ThreadPoolExecutor and submit a number of calls to the thread_work() function.

First we create the thread pool with the default number of threads, then submit five separate tasks as calls to the thread_work() function, each with the unique process task identifier and a unique integer for each call to the thread task function.

We will use the submit() function to issue the tasks which returns a Future object for each call.

We then explicitly wait on the tasks to complete with a call to the wait() module function passing in all Future objects. This is technically not required as we could just wait for the thread pool to shutdown as part of the context manager which in turn will wait for all tasks to complete.

...
# create the thread pool
with ThreadPoolExecutor() as exe:
    # submit tasks
    futures = [exe.submit(thread_work, identifier, i) for i in range(5)]
    # wait for tasks to complete
    _ = wait(futures)

The process_work() function implements this, providing a target task function for worker processes.

# task for process workers
def process_work(identifier):
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # submit tasks
        futures = [exe.submit(thread_work, identifier, i) for i in range(5)]
        # wait for tasks to complete
        _ = wait(futures)
    print(f'Process {identifier} Done', flush=True)

Finally, we can define the entry point of the program.

First, the process pool is created with the default number of worker processes.

...
# create a process pool
with ProcessPoolExecutor() as exe:
	# ...

We then submit five tasks to the process pool as calls to the process_work() function with a unique integer for each call.

We use submit() to issue calls to the process pool and then explicitly wait on the tasks to complete with a call to the wait() module function, passing in the list of Future objects for our process tasks.

...
# submit tasks
futures = [exe.submit(process_work, i) for i in range(5)]
# wait for tasks to complete...
_ = wait(futures)

We issue five process tasks and each process task will start a thread pool and issue five thread tasks, giving a total of 25 thread tasks in all.

And that's it, the entry point for the program is defined below.

# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
        # submit tasks
        futures = [exe.submit(process_work, i) for i in range(5)]
        # wait for tasks to complete...
        _ = wait(futures)
    print('Main Done', flush=True)

Tying this together, the complete example of creating a thread pool within each process worker is listed below.

# SuperFastPython.com
# example of using thread pools within worker processes
from random import random
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

# task for thread workers within each process worker
def thread_work(process_id, thread_id):
    # block for a moment
    sleep(random())
    # report a unique message
    print(f'Process {process_id}, thread worker {thread_id}', flush=True)

# task for process workers
def process_work(identifier):
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # submit tasks
        futures = [exe.submit(thread_work, identifier, i) for i in range(5)]
        # wait for tasks to complete
        _ = wait(futures)
    print(f'Process {identifier} Done', flush=True)

# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
        # submit tasks
        futures = [exe.submit(process_work, i) for i in range(5)]
        # wait for tasks to complete...
        _ = wait(futures)
    print('Main Done', flush=True)

Running the example first creates the process pool then submits tasks into the pool.

Each task in the process pool then creates a thread pool and submits tasks to the worker threads.

Finally, the thread worker tasks finish and report a message, the process worker tasks complete and report a message and finally the main thread reports that all tasks are complete.

...
Process 4, thread worker 1
Process 4, thread worker 3
Process 4 Done
Process 0, thread worker 1
Process 0, thread worker 3
Process 0, thread worker 2
Process 0 Done
Process 3, thread worker 1
Process 3 Done
Main Done

Next, let's look at how we might collate data from worker threads into worker processes and finally into the main thread.

Collate Results From Worker Threads and Worker Processes

We may want to gather results from worker threads within each worker process and even push the subtotals back to the main thread for a final collation.

This can easily be achieved by having both the target function for worker threads and worker processes return values.

We can demonstrate this by adapting the example from the previous section to sum values at each level. This provides a straightforward code template that you can adapt for your own project.

Firstly, we can update the thread_work() function to generate a pseudo random value between 0 and 1, report this value and then return it to the caller.

# task for thread workers within each process worker
def thread_work(process_id, thread_id):
    # generate a unique value for this worker thread
    value = random()
    # block for a moment
    sleep(value)
    # report a unique message
    print(f'Process {process_id}, thread worker {thread_id}, value={value}', flush=True)
    return value

Next, we can update the process_work() function to collate the values returned from the calls to thread_work() issued to the thread pool.

One approach would be to issue each call using the submit() function on the thread pool, then iterate the Future objects returned and call the result() function.

For example:

...
# submit tasks
futures = [exe.submit(thread_work, identifier, i) for i in range(4)]
# sum the values
total = 0
for future in futures:
	total += future.result()

A simpler approach is to use the map() function to issue tasks on the thread pool and sum the return values directly, for example:

...
# sum the value from threads
total = sum(exe.map(thread_work, *5, range(5)))

The updated version of the process_work() function that returns the sum of the values calculated by the worker threads is listed below.

# task for process workers
def process_work(identifier):
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # sum the value from threads
        total = sum(exe.map(thread_work, *5, range(5)))
    print(f'Process {identifier} Done, total={total}', flush=True)
    return total

Finally, we can update the main thread to collate the values from each worker process.

Again, we could use the submit() function, get Future objects, then call the result() function on each.

Instead, in this case, it is simpler to use the map() function again and sum the iterable returned, for example:

...
# sum result from processes
total = sum(exe.map(process_work, range(5)))

The updated entry point that collates values from worker processes is listed below.

# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
        # sum result from processes
        total = sum(exe.map(process_work, range(5)))
    print(f'Main Done, total={total}', flush=True)

Tying this together, the complete example of using thread pools and a process pool to collate results is listed below.

# SuperFastPython.com
# example of using thread pools within worker processes to collate results
from random import random
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

# task for thread workers within each process worker
def thread_work(process_id, thread_id):
    # generate a unique value for this worker thread
    value = random()
    # block for a moment
    sleep(value)
    # report a unique message
    print(f'Process {process_id}, thread worker {thread_id}, value={value}', flush=True)
    return value

# task for process workers
def process_work(identifier):
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # sum the value from threads
        total = sum(exe.map(thread_work, *5, range(5)))
    print(f'Process {identifier} Done, total={total}', flush=True)
    return total

# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
        # sum result from processes
        total = sum(exe.map(process_work, range(5)))
    print(f'Main Done, total={total}', flush=True)

Running the example first creates the process pool and submits five tasks.

Each process task creates a thread pool and submits five tasks. The thread tasks complete and the results returned from each are summed together and the total is reported by each process task.

Finally, the process tasks return their values which are summed by the main thread and the overall total is reported.

...
Process 1, thread worker 4, value=0.5784439574691612
Process 1 Done, total=1.8513536135254558
Process 2, thread worker 3, value=0.9798105837814127
Process 0, thread worker 3, value=0.7251034762402645
Process 3, thread worker 4, value=0.8473968578985317
Process 3 Done, total=2.19615754102612
Process 4, thread worker 3, value=0.48889645146676286
Process 2, thread worker 4, value=0.285544567351118
Process 2 Done, total=2.385693049839012
Process 4, thread worker 4, value=0.17721274068245063
Process 4 Done, total=2.420825508785872
Process 0, thread worker 4, value=0.3492667693766406
Process 0 Done, total=2.4611316259705975
Main Done, total=11.315161339147057

Takeaways

You now know how to create and use ThreadPoolExecutor thread pools within ProcessPoolExecutor worker processes.



If you enjoyed this tutorial, you will love my book: Python ProcessPoolExecutor Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.