Last Updated on September 12, 2022
You can create ThreadPoolExecutor thread pools within each worker process in the ProcessPoolExecutor.
In this tutorial you will discover how to create and use thread pools within process workers in Python.
Let’s get started.
Need Threads Within Each Worker Process
The ProcessPoolExecutor provides an easy way to execute tasks concurrently and in parallel using child processes.
A process pool provides reusable workers that can execute ad hoc tasks. We can submit tasks to the pool using the submit() or map() functions, wait for the tasks to complete, and even collect the result from each task.
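For example, a minimal sketch of both approaches might look as follows (the task() function is a made-up placeholder, not part of this tutorial's examples):

# minimal sketch of the ProcessPoolExecutor API; task() is a placeholder
from concurrent.futures import ProcessPoolExecutor

# placeholder CPU-bound task
def task(value):
    return value * value

# entry point
if __name__ == '__main__':
    # create the process pool
    with ProcessPoolExecutor() as exe:
        # submit() issues one task and returns a Future
        future = exe.submit(task, 3)
        print(future.result())
        # map() issues many tasks and yields results in task order
        print(list(exe.map(task, range(5))))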
The sweet spot for the ProcessPoolExecutor is tasks that are CPU-bound, meaning their speed is limited by the CPU rather than by IO, and that have low overhead in terms of data sent to and received from the target task function.
In our application, we may have a hierarchy of tasks to execute.
That is, each high-level task executed by a process worker may be required to perform a number of subtasks that could be completed concurrently. If these subtasks are IO-bound, then we may want to make use of threads within each worker process.
There are many reasons we may need to use threads within worker processes, such as the following (a sketch of the download case appears after the list):
- Worker processes need to download data from the network or internet.
- Worker processes need to save or load data to or from file.
- Worker processes need to store or retrieve data from a database.
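For example, consider the download case. IO-bound subtasks suit threads because each thread spends most of its time waiting on the network or disk rather than using the CPU. Here is a minimal sketch, assuming a hypothetical list of URLs:

# sketch: threads for IO-bound download subtasks; the URLs are hypothetical
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

# IO-bound subtask: download one URL and return the number of bytes
def download(url):
    with urlopen(url) as connection:
        return len(connection.read())

# entry point
if __name__ == '__main__':
    # hypothetical URLs to retrieve concurrently
    urls = ['https://example.com/'] * 3
    with ThreadPoolExecutor() as exe:
        # threads overlap the time spent waiting on each response
        results = list(exe.map(download, urls))
    print(f'downloaded {sum(results)} bytes')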
How can we use threads within each worker process?
ThreadPoolExecutor Within Each Worker Process
We can use threads within each worker process by creating a thread pool.
Specifically, each process worker executes a target function. Within the target function, we may create an instance of a ThreadPoolExecutor and submit IO-bound tasks.
For example, a target task function for a process worker may look as follows:
# task for process workers
def process_work():
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # submit tasks...
Each call to the process_work() function is then executed by a process worker in a ProcessPoolExecutor, for example:
# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
        # execute a task with a process worker
        future = exe.submit(process_work)
This means each worker process in the ProcessPoolExecutor will have a ThreadPoolExecutor.
This allows us both to harness multiple CPU cores by using processes and to run perhaps tens or even hundreds of threads within each of those processes.
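For illustration, the two levels can be configured explicitly. The worker counts in this sketch are assumptions chosen for the example, not recommendations:

# sketch: processes for CPU cores, many threads per process
# the worker counts are illustrative assumptions, not recommendations
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

# placeholder task for thread workers
def thread_work(i):
    return i

# task for process workers: fan out to one hundred threads
def process_work():
    with ThreadPoolExecutor(max_workers=100) as exe:
        return sum(exe.map(thread_work, range(100)))

# entry point
if __name__ == '__main__':
    # e.g. four worker processes, perhaps one per physical CPU core
    with ProcessPoolExecutor(max_workers=4) as exe:
        futures = [exe.submit(process_work) for _ in range(4)]
        print(sum(future.result() for future in futures))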
Now that we know how to create and use thread pools within process workers, let’s look at some worked examples.
Example of ThreadPoolExecutor Within ProcessPoolExecutor
We can demonstrate how to create and use a ThreadPoolExecutor within each worker process.
First, let’s define a target task function for the worker threads.
We will assign a unique integer identifier to each process task and each thread task. These identifiers can then be reported by the process tasks and thread tasks once they finish.
In the case of the thread tasks, we will block for a moment using a call to the sleep() function, then report a unique message.
The thread_work() function below implements this.
# task for thread workers within each process worker
def thread_work(process_id, thread_id):
    # block for a moment
    sleep(random())
    # report a unique message
    print(f'Process {process_id}, thread worker {thread_id}', flush=True)
Next, we can define a target task function for the process tasks.
This function must create a ThreadPoolExecutor and submit a number of calls to the thread_work() function.
First, we create the thread pool with the default number of threads (min(32, os.cpu_count() + 4) in Python 3.8 and later), then submit five separate tasks as calls to the thread_work() function, each given the process task's identifier and a unique integer for the thread task.
We will use the submit() function to issue the tasks, which returns a Future object for each call.
We then explicitly wait for the tasks to complete with a call to the wait() module function, passing in all Future objects. This is technically not required, as we could simply let the thread pool shut down as part of the context manager, which in turn waits for all tasks to complete.
...
# create the thread pool
with ThreadPoolExecutor() as exe:
    # submit tasks
    futures = [exe.submit(thread_work, identifier, i) for i in range(5)]
    # wait for tasks to complete
    _ = wait(futures)
The process_work() function implements this, providing a target task function for worker processes.
# task for process workers
def process_work(identifier):
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # submit tasks
        futures = [exe.submit(thread_work, identifier, i) for i in range(5)]
        # wait for tasks to complete
        _ = wait(futures)
    print(f'Process {identifier} Done', flush=True)
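As an aside, the explicit wait() is optional here: exiting the with-block shuts down the thread pool, which itself blocks until all submitted tasks are done. A minimal variant of process_work() with the same behavior might look as follows:

# task for process workers, relying on the context manager to wait
def process_work(identifier):
    # the with-block does not exit until all thread tasks complete
    with ThreadPoolExecutor() as exe:
        # submit tasks, ignoring the returned Future objects
        for i in range(5):
            exe.submit(thread_work, identifier, i)
    print(f'Process {identifier} Done', flush=True)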
Finally, we can define the entry point of the program.
First, the process pool is created with the default number of worker processes (one per logical CPU, as reported by os.cpu_count()).
...
# create a process pool
with ProcessPoolExecutor() as exe:
    # ...
We then submit five tasks to the process pool as calls to the process_work() function with a unique integer for each call.
We use submit() to issue calls to the process pool and then explicitly wait on the tasks to complete with a call to the wait() module function, passing in the list of Future objects for our process tasks.
...
# submit tasks
futures = [exe.submit(process_work, i) for i in range(5)]
# wait for tasks to complete...
_ = wait(futures)
We issue five process tasks, and each process task will start a thread pool and issue five thread tasks, giving 25 thread tasks in all.
And that’s it; the entry point for the program is defined below.
# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
        # submit tasks
        futures = [exe.submit(process_work, i) for i in range(5)]
        # wait for tasks to complete...
        _ = wait(futures)
    print('Main Done', flush=True)
Tying this together, the complete example of creating a thread pool within each process worker is listed below.
# SuperFastPython.com
# example of using thread pools within worker processes
from random import random
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

# task for thread workers within each process worker
def thread_work(process_id, thread_id):
    # block for a moment
    sleep(random())
    # report a unique message
    print(f'Process {process_id}, thread worker {thread_id}', flush=True)

# task for process workers
def process_work(identifier):
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # submit tasks
        futures = [exe.submit(thread_work, identifier, i) for i in range(5)]
        # wait for tasks to complete
        _ = wait(futures)
    print(f'Process {identifier} Done', flush=True)

# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
        # submit tasks
        futures = [exe.submit(process_work, i) for i in range(5)]
        # wait for tasks to complete...
        _ = wait(futures)
    print('Main Done', flush=True)
Running the example first creates the process pool then submits tasks into the pool.
Each task in the process pool then creates a thread pool and submits tasks to the worker threads.
Finally, the thread tasks finish and report their messages, the process tasks complete and report theirs, and the main thread reports that all tasks are complete.
...
Process 4, thread worker 1
Process 4, thread worker 3
Process 4 Done
Process 0, thread worker 1
Process 0, thread worker 3
Process 0, thread worker 2
Process 0 Done
Process 3, thread worker 1
Process 3 Done
Main Done
Next, let’s look at how we might collate data from worker threads into worker processes and finally into the main thread.
Collate Results From Worker Threads and Worker Processes
We may want to gather results from worker threads within each worker process and even push the subtotals back to the main thread for a final collation.
This can easily be achieved by having both the target function for worker threads and worker processes return values.
We can demonstrate this by adapting the example from the previous section to sum values at each level. This provides a straightforward code template that you can adapt for your own project.
Firstly, we can update the thread_work() function to generate a pseudorandom value between 0 and 1, report this value, and then return it to the caller.
# task for thread workers within each process worker
def thread_work(process_id, thread_id):
    # generate a unique value for this worker thread
    value = random()
    # block for a moment
    sleep(value)
    # report a unique message
    print(f'Process {process_id}, thread worker {thread_id}, value={value}', flush=True)
    return value
Next, we can update the process_work() function to collate the values returned from the calls to thread_work() issued to the thread pool.
One approach would be to issue each call using the submit() function on the thread pool, then iterate the Future objects returned and call the result() function.
For example:
...
# submit tasks
futures = [exe.submit(thread_work, identifier, i) for i in range(5)]
# sum the values
total = 0
for future in futures:
    total += future.result()
A simpler approach is to use the map() function on the thread pool, which yields the return values in task order, and sum them directly, for example:
...
# sum the values from threads
total = sum(exe.map(thread_work, [identifier]*5, range(5)))
The updated version of the process_work() function that returns the sum of the values calculated by the worker threads is listed below.
# task for process workers
def process_work(identifier):
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # sum the values from threads
        total = sum(exe.map(thread_work, [identifier]*5, range(5)))
    print(f'Process {identifier} Done, total={total}', flush=True)
    return total
Finally, we can update the main thread to collate the values from each worker process.
Again, we could use the submit() function, get Future objects, then call the result() function on each.
Instead, in this case, it is simpler to use the map() function again and sum the iterable returned, for example:
...
# sum result from processes
total = sum(exe.map(process_work, range(5)))
The updated entry point that collates values from worker processes is listed below.
# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
        # sum result from processes
        total = sum(exe.map(process_work, range(5)))
    print(f'Main Done, total={total}', flush=True)
Tying this together, the complete example of using thread pools and a process pool to collate results is listed below.
# SuperFastPython.com
# example of using thread pools within worker processes to collate results
from random import random
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

# task for thread workers within each process worker
def thread_work(process_id, thread_id):
    # generate a unique value for this worker thread
    value = random()
    # block for a moment
    sleep(value)
    # report a unique message
    print(f'Process {process_id}, thread worker {thread_id}, value={value}', flush=True)
    return value

# task for process workers
def process_work(identifier):
    # create the thread pool
    with ThreadPoolExecutor() as exe:
        # sum the values from threads
        total = sum(exe.map(thread_work, [identifier]*5, range(5)))
    print(f'Process {identifier} Done, total={total}', flush=True)
    return total

# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as exe:
        # sum result from processes
        total = sum(exe.map(process_work, range(5)))
    print(f'Main Done, total={total}', flush=True)
Running the example first creates the process pool and submits five tasks.
Each process task creates a thread pool and issues five thread tasks. The thread tasks complete, their return values are summed, and each process task reports its subtotal.
Finally, the process tasks return their values which are summed by the main thread and the overall total is reported.
...
Process 1, thread worker 4, value=0.5784439574691612
Process 1 Done, total=1.8513536135254558
Process 2, thread worker 3, value=0.9798105837814127
Process 0, thread worker 3, value=0.7251034762402645
Process 3, thread worker 4, value=0.8473968578985317
Process 3 Done, total=2.19615754102612
Process 4, thread worker 3, value=0.48889645146676286
Process 2, thread worker 4, value=0.285544567351118
Process 2 Done, total=2.385693049839012
Process 4, thread worker 4, value=0.17721274068245063
Process 4 Done, total=2.420825508785872
Process 0, thread worker 4, value=0.3492667693766406
Process 0 Done, total=2.4611316259705975
Main Done, total=11.315161339147057
Further Reading
This section provides additional resources that you may find helpful.
Books
- ProcessPoolExecutor Jump-Start, Jason Brownlee (my book!)
- Concurrent Futures API Interview Questions
- ProcessPoolExecutor PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ProcessPoolExecutor: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python Pool: The Complete Guide
References
- Thread (computing), Wikipedia.
- Process (computing), Wikipedia.
- Thread Pool, Wikipedia.
- Futures and promises, Wikipedia.
Takeaways
You now know how to create and use ThreadPoolExecutor thread pools within ProcessPoolExecutor worker processes.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Mark says
We don’t need to use wait() for the thread pool if we use it within the “with” scope, isn’t it? Many thanks for your post!! Very informative.
Jason Brownlee says
Correct, although I like adding explicit waits to make the code easier to follow.