You can convert nested for-loops to execute concurrently or in parallel in Python using thread pools or process pools, depending on the types of tasks that are being executed.
In this tutorial, you will discover how to change a nested for-loop to be concurrent or parallel in Python with a suite of worked examples.
This tutorial was triggered by questions and discussions with Robert L. Thanks again. If you have questions or want to chat through a technical issue in Python concurrency, message me any time.
Let’s get started.
Nested For-Loop in Python
A nested for-loop is a loop within a loop.
For example, we may need to loop over a number of tasks, and each task has subtasks.
Each task requires effort, e.g. I/O (read or write data) or CPU compute (calculate something), and each subtask also requires some effort.
Importantly, the number and nature of subtasks for each task are a function of the task and may not be known beforehand. The tasks must be computed in order to determine and then issue the subtasks.
For example:
    ...
    # loop over tasks
    for task in tasks:
        # exec task and generate subtasks
        subtasks = do_task(task)
        # execute subtasks
        for subtask in subtasks:
            # execute subtask
            result = do_subtask(subtask)
Often the tasks are independent of one another, and the subtasks of each task are also independent of one another.
Importantly, subtasks are dependent upon tasks. As such, we cannot define the full set of function calls beforehand and issue them all in one batch. Instead, we need to navigate the tree or hierarchy of tasks and subtasks.
This raises the question, can we perform the tasks and subtasks concurrently or in parallel?
If so, the concurrent execution of tasks and subtasks can offer a dramatic speed-up.
How can we execute a nested for-loop in parallel in Python?
How to Execute a Parallel Nested For-Loop
A nested for-loop can be converted to run in parallel.
More specifically, we can make it concurrent if the tasks are independent and if the subtasks are independent.
I/O bound tasks like reading and writing from files and sockets can be executed at the same time concurrently using threads. CPU-bound tasks like parsing a document in memory or calculating something can be performed in parallel using process-based concurrency.
Therefore, if we have I/O bound tasks or subtasks, we can use a thread pool to make the loops concurrent via the concurrent.futures.ThreadPoolExecutor class or the multiprocessing.pool.ThreadPool class.
Concurrent for-loops (not nested) are straightforward: each iteration of the loop is issued to the thread pool as a task, such as via the map() method.
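As a minimal sketch of such a flat loop, assume a hypothetical I/O-bound function named fetch(), simulated here with a short sleep. The function names and the worker count are illustrative assumptions, not part of any library.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# hypothetical I/O-bound task, simulated with a short sleep
def fetch(item):
    time.sleep(0.1)
    return item * 2

# sequential version: one call per loop iteration
def run_sequential(items):
    return [fetch(item) for item in items]

# concurrent version: each loop iteration becomes a task in the thread pool
def run_concurrent(items, workers=5):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # map() yields results in the same order as the input items
        return list(executor.map(fetch, items))

# protect the entry point
if __name__ == '__main__':
    print(run_concurrent(range(5)))
```

Both functions return the same list. The concurrent version overlaps the sleeps, so it should finish in roughly the time of one call rather than five.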
More work is required for concurrent nested for-loops.
If we have CPU-bound tasks or subtasks, we can use a process pool to make loops parallel via the concurrent.futures.ProcessPoolExecutor class or the multiprocessing.Pool class.
Parallel for-loops (not nested) are straightforward: each iteration of the loop is issued to the process pool as a task, such as via the map() method.
More work is required for parallel nested for-loops.
There are two main approaches we can use to make a nested for-loop concurrent.
They are:
- Create a pool of workers at each level in the hierarchy.
- Share a pool of workers across the hierarchy.
Let’s take a closer look at each approach.
Approach 1: One Pool of Workers Per Level
Each level in a nested for-loop can have its own pool of workers.
That is, each task runs, does its work, creates a pool of workers, and issues the subtasks to the pool. If there is another level of subsubtasks, each of these would create its own pool of workers and issue its own tasks.
This is suited to nested for-loops that have a large number of tasks to execute at a given level.
The downside is the redundancy of having many pools of workers competing with each other. This is not a problem with thread pools, as we may have many thousands of concurrent threads, but process pools are typically limited to one worker per CPU core.
As such, some tuning of the number of workers per pool may be required.
Another downside of this approach is that when using process pools, child worker processes are typically daemonic and are unable to create their own child processes. This means that if a task executing in a child process tries to create its own pool of workers, it will fail with an error.
As such, this approach may only be viable when working with thread pools, and even then, perhaps only in a nested loop with tasks and subtasks with many subtasks per task.
Approach 2: Shared Pool of Workers Across Levels
Another approach is to create one pool of workers and issue all tasks, subtasks, and subsubtasks to this pool.
When using thread pools in one process, the pool can be shared with tasks and subtasks as a shared global variable, allowing tasks to be issued directly.
When using process pools, things are more tricky. A centralized pool of workers can be created in a server process using a multiprocessing.Manager and the proxy objects for using the centralized server can be shared among all tasks and subtasks.
An alternate design might be to use a shared queue. All tasks and subtasks may be placed onto the queue and a single consumer of tasks can retrieve items from the queue and issue them to the pool of workers.
This is functionally the same, although it separates the concern of issuing tasks from how they are executed, potentially allowing the consumer to decide to use a thread pool or process pool based on the types of tasks issued to the queue.
Now that we have considered some designs on how to convert a nested for-loop to run concurrently, let’s look at some worked examples.
Example of a Nested For-Loop in Python (slow version)
Firstly let’s develop a nested for-loop that does not run concurrently.
In this example, we will design a loop with 3 levels.
That is, tasks that generate subtasks, that themselves generate subsubtasks.
Each task will simulate effort with a sleep of one second and report a message.
The complete example of a nested for-loop is listed below.
    # SuperFastPython.com
    # example of a nested for-loop
    import time

    # third level task
    def task3(arg):
        # report message
        print(f'\t\t>>>task3 {arg}', flush=True)
        # do work
        time.sleep(1)

    # second level task
    def task2(arg):
        # report message
        print(f'\t>>task2 {arg}', flush=True)
        # do work
        time.sleep(1)
        # issue third level tasks
        for i in range(2):
            task3(i)

    # top level task
    def task1(arg):
        # report message
        print(f'>task1 {arg}', flush=True)
        # do work
        time.sleep(1)
        # issue second level tasks
        for i in range(3):
            task2(i)

    # protect the entry point
    if __name__ == '__main__':
        # issue top level tasks one at a time
        for i in range(5):
            task1(i)
Running the example first executes 5 tasks.
Each task simulates work by reporting a message and taking a one-second sleep and executing 3 subtasks.
Each subtask simulates work by reporting a message and taking a one-second sleep and executing 2 subsubtasks.
Finally, each subsubtask simulates work by reporting a message and taking a one-second sleep.
There are 50 tasks in total. Given that each task involves a one-second sleep, the nested for-loop takes 50 seconds to complete.
The output of the program is listed below.
    >task1 0
        >>task2 0
            >>>task3 0
            >>>task3 1
        >>task2 1
            >>>task3 0
            >>>task3 1
        >>task2 2
            >>>task3 0
            >>>task3 1
    >task1 1
        >>task2 0
            >>>task3 0
            >>>task3 1
        >>task2 1
            >>>task3 0
            >>>task3 1
        >>task2 2
            >>>task3 0
            >>>task3 1
    >task1 2
        >>task2 0
            >>>task3 0
            >>>task3 1
        >>task2 1
            >>>task3 0
            >>>task3 1
        >>task2 2
            >>>task3 0
            >>>task3 1
    >task1 3
        >>task2 0
            >>>task3 0
            >>>task3 1
        >>task2 1
            >>>task3 0
            >>>task3 1
        >>task2 2
            >>>task3 0
            >>>task3 1
    >task1 4
        >>task2 0
            >>>task3 0
            >>>task3 1
        >>task2 1
            >>>task3 0
            >>>task3 1
        >>task2 2
            >>>task3 0
            >>>task3 1
Next, let’s explore how we might make this nested for-loop concurrent using thread pools.
Examples of Concurrent Nested For-Loops
The nested for-loop from the previous example can be updated to be concurrent using thread pools.
Recall, thread pools are appropriate for tasks that are I/O bound like reading and writing from files, sockets, and devices.
In this case, we will use the multiprocessing.pool.ThreadPool class, but the examples could just as easily use the concurrent.futures.ThreadPoolExecutor.
The ThreadPool provides a concurrent version of the built-in map() function via its map() method. It calls a function for each item in an iterable, issues each function call to the pool as a concurrent task, and returns once all tasks are complete.
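A small sketch of the map() method on the ThreadPool follows. The task() function and its sleep times are made up for illustration: earlier items deliberately sleep longer so they finish last, which shows that map() still returns results in the order of the input.

```python
import time
from multiprocessing.pool import ThreadPool

# hypothetical task: earlier items sleep longer, so they finish last
def task(arg):
    time.sleep(0.05 * (4 - arg))
    return arg * 10

def run():
    # one worker per item so that all calls run at the same time
    with ThreadPool(5) as pool:
        # blocks until all five calls are done, results keep input order
        return pool.map(task, range(5))

# protect the entry point
if __name__ == '__main__':
    print(run())
```

Because map() blocks until every call has returned, the caller does not need any extra waiting logic for a single flat loop.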
Next, let’s explore different designs for making the nested for-loop concurrent.
Single Shared ThreadPool
We can update the nested for-loop to use a single shared thread pool.
This can be achieved by creating a thread pool as a global variable, then having each task and subtask function access the global variable and issue tasks to the same shared thread pool directly.
It requires that the pool have enough capacity to execute all tasks, in this case at least 50 worker threads.
We need a way for the program to wait for all tasks to be completed.
One approach is to have each task wait for its subtasks. This is a straightforward solution, although it has the downside of occupying a worker thread while it waits for all subtasks to complete. This could turn disastrous, resulting in a deadlock if more tasks/subtasks/subsubtasks are issued to the thread pool than there are worker threads, because every worker may end up waiting on subtasks that can never be scheduled.
The example below demonstrates this approach of using a shared thread pool and having each task wait for its subtasks.
This approach can only be used if the total number of overall tasks/subtasks/etc is known and as many or more workers than tasks can be specified.
    # SuperFastPython.com
    # example of a nested for-loop to use a single shared thread pool
    import time
    import multiprocessing.pool

    # third level task
    def task3(arg):
        # report message
        print(f'\t\t>>>task3 {arg}')
        # do work
        time.sleep(1)

    # second level task
    def task2(arg):
        # declare the global pool
        global pool
        # report message
        print(f'\t>>task2 {arg}')
        # do work
        time.sleep(1)
        # issue third level tasks and wait
        pool.map(task3, range(2))

    # top level task
    def task1(arg):
        # declare the global pool
        global pool
        # report message
        print(f'>task1 {arg}')
        # do work
        time.sleep(1)
        # issue second level tasks and wait
        pool.map(task2, range(3))

    # protect the entry point
    if __name__ == '__main__':
        # declare global pool
        global pool
        # assign the global pool
        pool = multiprocessing.pool.ThreadPool(100)
        # issue top level tasks to pool and wait
        pool.map(task1, range(5))
        # close the pool
        pool.close()
Running the example first creates the shared thread pool as a global variable.
It then issues 5 tasks to the pool and waits for them to complete.
Each task accesses the shared thread pool, reports a message, and sleeps, then issues 3 subtasks and waits for them to complete.
Each subtask accesses the same shared thread pool, reports a message, and sleeps, then issues 2 subsubtasks and waits for them to complete.
The subsubtasks report a message and sleep.
Critically, the thread pool is created with 100 workers, double the number needed to complete all 50 tasks.
In this case, all tasks are completed in about 3 seconds.
This makes sense as at most it takes 3 seconds to complete all three levels in the hierarchy, and all of the hierarchy is executed concurrently.
    >task1 0
    >task1 1
    >task1 2
    >task1 3
    >task1 4
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
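The risk of waiting for subtasks from inside a worker can be reproduced safely on a small scale. The sketch below is an illustration, not part of the example above: the outer() and inner() functions are hypothetical, and a timeout is used so that the program reports the starvation instead of hanging forever.

```python
import multiprocessing
from multiprocessing.pool import ThreadPool

# hypothetical subtask
def inner():
    return 'inner done'

# hypothetical task that issues a subtask to the same pool and waits for it
def outer(pool):
    result = pool.apply_async(inner)
    try:
        # without the timeout, this wait would never end with one worker
        return result.get(timeout=0.5)
    except multiprocessing.TimeoutError:
        return 'starved'

def run(workers):
    pool = ThreadPool(workers)
    try:
        # issue the outer task and wait for its return value
        return pool.apply_async(outer, (pool,)).get()
    finally:
        pool.close()
        pool.join()

# protect the entry point
if __name__ == '__main__':
    print(run(1))
    print(run(2))
```

With one worker, the outer task holds the only thread, so the inner task cannot start until the outer task gives up. With two workers, the inner task runs straight away on the spare thread.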
Next, let’s explore how we might update this example to use a queue and release workers immediately after they issue their subtasks.
Single ThreadPool With a Shared Queue
We can make a nested for-loop concurrent using a single thread pool and a shared queue.
You may recall that Python provides a thread-safe queue via the queue.Queue class.
Each task can push its subtasks into the queue and finish, allowing workers in the thread pool to be released, rather than wait for the subtasks to be completed.
Another thread, or the main thread, can consume task requests from the queue and issue them asynchronously to the thread pool, such as via the apply_async() method.
This is functionally the same as sharing a ThreadPool object via a global variable and having tasks issue their subtasks directly.
The important difference is that because workers are released immediately after subtasks are issued, we do not need to have one worker per task in the ThreadPool. We can have far fewer workers than tasks, without the risk of a deadlock.
The downside, still, is that we must know how many tasks overall will be issued.
The reason is that the thread responsible for issuing tasks to the thread pool needs to know when to stop consuming items from the queue and wait for all issued tasks to complete before terminating the program.
We can update the nested for-loop program to use a single ThreadPool and a queue.
The queue can be defined as a global variable and shared with all tasks at all levels. Tasks issue their subtasks to the queue as a tuple containing the function name and any arguments.
The main thread creates the thread pool and then consumes a fixed number of tasks from the queue before stopping and waiting for all issued tasks to the pool to finish.
This is a better design than the previous example.
The complete example is listed below.
    # SuperFastPython.com
    # example of a nested for-loop to use a single shared thread pool with a queue
    import time
    import multiprocessing.pool
    import queue

    # third level task
    def task3(arg):
        # report message
        print(f'\t\t>>>task3 {arg}')
        # do work
        time.sleep(1)

    # second level task
    def task2(arg):
        # declare the global queue
        global task_queue
        # report message
        print(f'\t>>task2 {arg}')
        # do work
        time.sleep(1)
        # issue third level tasks
        for i in range(2):
            task_queue.put((task3, (i,)))

    # top level task
    def task1(arg):
        # declare the global queue
        global task_queue
        # report message
        print(f'>task1 {arg}')
        # do work
        time.sleep(1)
        # issue second level tasks
        for i in range(3):
            task_queue.put((task2, (i,)))

    # protect the entry point
    if __name__ == '__main__':
        # declare the global queue
        global task_queue
        # create the shared queue (named so it does not shadow the queue module)
        task_queue = queue.Queue()
        # issue all top-level tasks
        for i in range(5):
            task_queue.put((task1, (i,)))
        # assume we know the total number of tasks
        total_tasks = 50
        # create a thread pool
        with multiprocessing.pool.ThreadPool(total_tasks) as pool:
            # loop over all known tasks
            for _ in range(total_tasks):
                # consume a task
                task, args = task_queue.get()
                # issue task to the thread pool
                pool.apply_async(task, args)
            # close the pool
            pool.close()
            # wait for all tasks to be processed
            pool.join()
Running the example first creates the shared queue as a global variable.
It then issues 5 tasks to the queue.
The overall total number of tasks is known and stated. It is used as the number of workers in the pool and in a loop over the number of tasks to retrieve from the queue.
Importantly, we could have far fewer workers in the thread pool than there are total tasks without fear of a deadlock because workers are released as soon as their subtasks are issued to the queue.
The main thread consumes items from the queue and issues them to the thread pool one by one.
Tasks run, report a message, sleep, then issue their subtasks to the queue before exiting.
Subtasks run, report their message, sleep and issue their subsubtasks to the queue before exiting.
Subsubtasks run, report their message, sleep, and exit.
In this case, all tasks are completed in about 3 seconds as before.
    >task1 0
    >task1 1
    >task1 2
    >task1 3
    >task1 4
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
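The claim that the queue design works with far fewer workers than tasks can be checked with a scaled-down sketch. The branch() and leaf() functions, the two-level hierarchy, and the short sleeps are assumptions made to keep the run quick. The queue is passed as an argument here rather than shared as a global variable.

```python
import queue
import time
from multiprocessing.pool import ThreadPool

# hypothetical leaf task: does some work and issues nothing
def leaf(arg, task_queue):
    time.sleep(0.01)

# hypothetical parent task: queues two leaf tasks, then returns immediately
def branch(arg, task_queue):
    time.sleep(0.01)
    for i in range(2):
        task_queue.put((leaf, (i, task_queue)))

def run(workers, n_branches=5):
    task_queue = queue.Queue()
    # issue the top-level tasks
    for i in range(n_branches):
        task_queue.put((branch, (i, task_queue)))
    # the total is assumed to be known: each branch issues two leaves
    total_tasks = n_branches + n_branches * 2
    results = []
    with ThreadPool(workers) as pool:
        for _ in range(total_tasks):
            # consume a task and issue it to the pool
            func, args = task_queue.get()
            results.append(pool.apply_async(func, args))
        # wait for every issued task, re-raising any task error
        for result in results:
            result.get()
    return len(results)

# protect the entry point
if __name__ == '__main__':
    print(run(workers=2))
```

Here 15 tasks complete on a pool of only 2 workers, because no worker ever blocks waiting for a subtask.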
Next, let’s look at how we might update this example further so we don’t need to know the total number of tasks in the hierarchy beforehand.
Single ThreadPool With a Shared Queue (Unbounded)
We can update the single thread pool with the shared queue so that the total number of tasks does not need to be known.
This can be achieved by maintaining a shared thread-safe counter that is incremented for each task/subtask/subsubtask that is issued and is only decremented once a given task is completed and known not to issue a subtask.
Once the counter is zero, we know there are no further tasks running and no further tasks to be issued, meaning we can stop consuming items from the queue and close the thread pool.
The downside is that both the queue and the counter must be shared with each task. This could be abstracted away with a more clever design, for example by passing them as arguments to a worker initialization function, storing them in thread-local or global variables, and then having all tasks issue subtasks through a single helper function.
We can develop our own thread-safe counter, but for brevity, we can use a threading.Semaphore class.
The initial value of the Semaphore will be 5, for the known 5 tasks issued from the main thread.
For example:
    ...
    # shared counter for total tasks
    global counter
    counter = threading.Semaphore(5)
The Semaphore can be incremented by calling the release() method and specifying the “n” argument as the number of subtasks by which to increment the counter.
The Semaphore can be decremented by calling the acquire() method.
We can check if the counter is zero by acquiring the internal condition variable and checking the internal counter value.
Therefore, our loop in the main thread can loop forever until we explicitly break the loop when the counter is zero.
For example:
    ...
    # loop over all known tasks
    while True:
        # check for no further tasks
        with counter._cond:
            if not counter._value:
                break
This is fragile because we are accessing private members in the stdlib. A custom counter class is preferred if you are implementing this for a production system.
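A custom counter of this kind might look like the sketch below. The TaskCounter class and its method names are hypothetical. It protects a plain integer with a threading.Lock, so no private members of the standard library are touched.

```python
import threading

# hypothetical thread-safe counter of outstanding tasks
class TaskCounter:
    def __init__(self, initial=0):
        self._value = initial
        self._lock = threading.Lock()

    # add n newly issued tasks
    def increment(self, n=1):
        with self._lock:
            self._value += n

    # mark n tasks as completed
    def decrement(self, n=1):
        with self._lock:
            self._value -= n

    # report whether there are no outstanding tasks
    def is_zero(self):
        with self._lock:
            return self._value == 0

# protect the entry point
if __name__ == '__main__':
    counter = TaskCounter(5)
    counter.increment(3)
    # decrement from several threads at once
    threads = [threading.Thread(target=counter.decrement) for _ in range(8)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(counter.is_zero())
```

The main loop could then call counter.is_zero() in place of reading counter._value, and the callback could call counter.decrement() in place of acquire().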
It is possible that we may try to get a task from the queue when there are no further tasks to consume, such as when the final task is running but not yet done. The queue is empty in this case, but the counter is non-zero.
To overcome this case we can get items from the queue with a small timeout, allowing us to give up and check the counter again.
For example:
    ...
    # consume a task
    try:
        task, args = task_queue.get(timeout=0.5)
    except queue.Empty:
        continue
We can decrement the counter automatically after every task is completed using a callback function on the asynchronous task in the ThreadPool.
We can define a callback function and have it decrement the counter.
For example:
    # callback on all tasks in the thread pool
    def callback(arg):
        global counter
        # decrement the counter
        counter.acquire()
The callback can be attached to each task issued to the thread pool via the apply_async() method.
For example:
    ...
    # issue task to the thread pool
    async_result = pool.apply_async(task, args, callback=callback)
Each task can be updated to declare the global counter (semaphore) variable as well as the queue.
For example:
    ...
    # declare the global queue and counter
    global task_queue, counter
Then after subtasks and subsubtasks are issued to the queue, we can increment the counter.
For example:
    ...
    # increment the counter by the number of subtasks
    counter.release(n=3)
That’s about it.
We have a way of using a single thread pool, issuing tasks using a shared queue, allowing the thread pool to have an arbitrary number of workers, and the program to have an unknown number of total tasks.
This is an off-the-cuff program design (design by the seat of my pants!). If you have a tighter design, please let me know in the comments below.
The complete example is listed below.
    # SuperFastPython.com
    # example of a nested for-loop to use a single shared thread pool with a queue unbounded
    import time
    import threading
    import multiprocessing.pool
    import queue

    # third level task
    def task3(arg):
        # report message
        print(f'\t\t>>>task3 {arg}')
        # do work
        time.sleep(1)

    # second level task
    def task2(arg):
        # declare the global queue and counter
        global task_queue, counter
        # report message
        print(f'\t>>task2 {arg}')
        # do work
        time.sleep(1)
        # issue third level tasks
        for i in range(2):
            task_queue.put((task3, (i,)))
        # increment the counter by the number of subtasks
        counter.release(n=2)

    # top level task
    def task1(arg):
        # declare the global queue and counter
        global task_queue, counter
        # report message
        print(f'>task1 {arg}')
        # do work
        time.sleep(1)
        # issue second level tasks
        for i in range(3):
            task_queue.put((task2, (i,)))
        # increment the counter by the number of subtasks
        counter.release(n=3)

    # callback on all tasks in the thread pool
    def callback(arg):
        global counter
        # decrement the counter
        counter.acquire()

    # protect the entry point
    if __name__ == '__main__':
        # declare the global queue
        global task_queue
        # create the shared queue
        task_queue = queue.Queue()
        # issue all top-level tasks
        for i in range(5):
            task_queue.put((task1, (i,)))
        # shared counter for total tasks
        global counter
        counter = threading.Semaphore(5)
        # create a thread pool
        with multiprocessing.pool.ThreadPool(30) as pool:
            # loop over all known tasks
            while True:
                # check for no further tasks
                with counter._cond:
                    if not counter._value:
                        break
                # consume a task
                try:
                    task, args = task_queue.get(timeout=0.5)
                except queue.Empty:
                    continue
                # issue task to the thread pool
                async_result = pool.apply_async(task, args, callback=callback)
            # close the pool
            pool.close()
            # wait for all tasks to be processed
            pool.join()
Running the example first creates the shared queue as a global variable.
It then issues 5 tasks to the queue and initializes the shared counter to 5.
The thread pool is created with 30 workers, an arbitrary number and fewer than the 50 concurrent tasks we know we will be executing.
The main thread loops forever. It first checks if the counter is zero and if not it then tries to get an item from the queue. If there is an item in the queue within half a second (chosen arbitrarily), it is retrieved and issued as a task with a callback function. If not, the loop starts again.
Tasks run, report a message, sleep, then issue their subtasks to the queue before incrementing the shared counter and exiting. After each task exits, the counter is decremented by one by the callback function.
Subtasks run, report their message, sleep and issue their subsubtasks to the queue before incrementing the shared counter and exiting. After each subtask exits, the counter is decremented by one by the callback function.
Subsubtasks run, report their message, sleep, and exit. After each subsubtask exits, the counter is decremented by one by the callback function.
The counter reaches zero and the main thread exits the loop and closes the pool.
In this case, all tasks are completed in about 3 seconds as before.
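I don’t think there’s a race condition with these one-second tasks, but be warned, I have not unit-tested or stress-tested this design. One ordering to watch: each task puts its subtasks on the queue before incrementing the counter, so a very fast subtask could in principle finish and decrement the counter to zero before its parent increments it, ending the loop early. Incrementing the counter before putting subtasks on the queue would close that window.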
    >task1 0
    >task1 1
    >task1 2
    >task1 3
    >task1 4
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 2
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
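One possible alternative, offered as a sketch rather than as the design used above, is to let the queue do the counting. The queue.Queue class tracks unfinished items itself: each put() adds one, each task_done() removes one, and join() blocks until none remain. The helper names below (record, finished, consumer, run) and the short sleeps are assumptions for illustration.

```python
import queue
import threading
import time
from multiprocessing.pool import ThreadPool

task_queue = queue.Queue()
completed = []                    # names of finished tasks, for illustration
completed_lock = threading.Lock()

def record(name):
    with completed_lock:
        completed.append(name)

def task3(arg):
    time.sleep(0.01)
    record(f'task3 {arg}')

def task2(arg):
    time.sleep(0.01)
    for i in range(2):
        task_queue.put((task3, (i,)))
    record(f'task2 {arg}')

def task1(arg):
    time.sleep(0.01)
    for i in range(3):
        task_queue.put((task2, (i,)))
    record(f'task1 {arg}')

# called after a task succeeds or fails, marks its queue item as done
def finished(_):
    task_queue.task_done()

# consume task requests and issue them to the pool until a None sentinel
def consumer(pool):
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        func, args = item
        pool.apply_async(func, args, callback=finished, error_callback=finished)

def run(workers=8):
    completed.clear()
    for i in range(5):
        task_queue.put((task1, (i,)))
    with ThreadPool(workers) as pool:
        thread = threading.Thread(target=consumer, args=(pool,))
        thread.start()
        # blocks until every put() has a matching task_done()
        task_queue.join()
        # stop the consumer, then shut down the pool
        task_queue.put(None)
        thread.join()
        pool.close()
        pool.join()
    return len(completed)

# protect the entry point
if __name__ == '__main__':
    print(run())
```

A parent always puts its subtasks on the queue before it returns, and its own task_done() is only called after it returns, so the unfinished count should not reach zero while work remains. This sketch has not been stress-tested either.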
Next, let’s explore the case of creating a separate pool of workers at each level.
Separate Thread Pools
An alternate solution to making a nested for-loop concurrent is to create a separate pool of workers at each level of the hierarchy of tasks.
This is easily done with thread pools.
The example below shows the nested for-loop updated to create a thread pool per level, where each thread pool is configured with one worker per subtask required.
    # SuperFastPython.com
    # example of a nested for-loop with separate thread pools
    import time
    import multiprocessing.pool

    # third level task
    def task3(arg):
        # report message
        print(f'\t\t>>>task3 {arg}')
        # do work
        time.sleep(1)

    # second level task
    def task2(arg):
        # report message
        print(f'\t>>task2 {arg}')
        # do work
        time.sleep(1)
        # create pool with one worker per third level task
        with multiprocessing.pool.ThreadPool(2) as pool:
            # issue third level tasks and wait
            pool.map(task3, range(2))

    # top level task
    def task1(arg):
        # report message
        print(f'>task1 {arg}')
        # do work
        time.sleep(1)
        # create pool with one worker per second level task
        with multiprocessing.pool.ThreadPool(3) as pool:
            # issue second level tasks and wait
            pool.map(task2, range(3))

    # protect the entry point
    if __name__ == '__main__':
        # create pool with one worker per top level task
        with multiprocessing.pool.ThreadPool(5) as pool:
            # issue top level tasks to pool and wait
            pool.map(task1, range(5))
Running the example first creates the top-level thread pool in the main thread.
The main thread then issues 5 tasks to the pool and waits for them to complete.
Each task reports a message and sleeps, then creates its own thread pool and issues 3 subtasks, and waits for them to complete.
We now have 6 thread pools running.
Each subtask reports a message and sleeps, then creates its own thread pool and issues 2 subsubtasks, and waits for them to complete.
We now have 21 thread pools running: one in the main thread, 5 created by the tasks, and 15 created by the subtasks. Ouch. We have 5 workers at level 1, 3 * 5 workers at level 2, and 3 * 5 * 2 workers at level 3. That is 50 workers, one for each task in the hierarchy.
The subsubtasks report a message and sleep.
In this case, all tasks are completed in about 3 seconds again.
This makes sense as at most it takes 3 seconds to complete all three levels in the hierarchy, and all of the hierarchy is executed concurrently.
This approach overcomes the problem of needing to know the total number of tasks in the hierarchy as we did in the first approach but means we have many more concurrent thread pools. Importantly, we have exactly enough concurrent workers to complete all tasks.
This approach is fine, as long as the total number of concurrent tasks does not exceed the capability of the system. This may not be the case if a level in the hierarchy balloons.
A modern system may only be able to support a maximum of a few thousand concurrent threads before running out of main memory and/or taxing the underlying operating system too much with context switching required between threads.
    >task1 0
    >task1 1
    >task1 2
    >task1 3
    >task1 4
        >>task2 0
        >>task2 1
        >>task2 2
        >>task2 0
        >>task2 1
        >>task2 0
        >>task2 0
        >>task2 2
        >>task2 1
        >>task2 0
        >>task2 1
        >>task2 1
        >>task2 2
        >>task2 2
        >>task2 2
            >>>task3 0
            >>>task3 0
            >>>task3 0
            >>>task3 0
            >>>task3 1
            >>>task3 1
            >>>task3 1
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 0
            >>>task3 1
            >>>task3 1
            >>>task3 0
            >>>task3 0
            >>>task3 1
            >>>task3 1
            >>>task3 0
            >>>task3 1
            >>>task3 0
            >>>task3 1
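If a level in the hierarchy might balloon, one option is to size each pool from the number of subtasks but cap it. The sketch below assumes a hypothetical run_level() helper and an arbitrary cap of 8 workers per pool. The task() and subtask() functions are placeholders in which the number of subtasks depends on the task.

```python
from multiprocessing.pool import ThreadPool

# assumed cap on workers per pool, tune for your system
MAX_WORKERS_PER_POOL = 8

# hypothetical helper: run one level of the hierarchy in its own pool
def run_level(func, items, max_workers=MAX_WORKERS_PER_POOL):
    items = list(items)
    # nothing to do, and a pool needs at least one worker
    if not items:
        return []
    # one worker per item, but never more than the cap
    workers = min(len(items), max_workers)
    with ThreadPool(workers) as pool:
        return pool.map(func, items)

# placeholder subtask
def subtask(arg):
    return arg * 2

# placeholder task: the number of subtasks depends on the task itself
def task(arg):
    return sum(run_level(subtask, range(arg)))

# protect the entry point
if __name__ == '__main__':
    print(run_level(task, range(4)))
```

The cap applies to each pool separately, so the total number of threads can still multiply across levels. It only limits how wide any single level can get.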
Let’s leave thread pools for now and take a look at process pools for executing tasks in parallel.
Examples of Parallel Nested For-Loops
Running nested loops in parallel using process pools is more challenging than running them concurrently using thread pools.
The reason is twofold.
Firstly, workers in a process pool are daemon processes and are unable to create their own child processes.
This means that tasks executed by workers in a process pool cannot create their own process pool to execute subtasks. Doing so will result in an error, for example:
    AssertionError: daemonic processes are not allowed to have children
I know, I tried.
Secondly, processes do not have shared memory.
Instead, at best they have a copy of the global variables from the parent process, depending on the start method.
Therefore, in order to share a process pool across all tasks/subtasks/etc in the hierarchy we must simulate a shared memory, such as by using a centralized process pool in a server process via a multiprocessing.Manager object.
We can explore this with a multiprocessing.Pool, although a concurrent.futures.ProcessPoolExecutor would be very similar.
Let’s dive in.
Shared Process Pool
We can update the nested for-loop to use a shared process pool accessed from each task.
Firstly, we can create a manager, which will create a server process for hosting a centralized process pool.
We can then create a hosted process pool and get a proxy object for the pool that we can share around with tasks in other processes.
    ...
    # create manager
    with multiprocessing.Manager() as manager:
        # create the shared pool
        pool = manager.Pool(100)
If we just tried to create a process pool and share it with workers, we would get an error indicating that the Pool object cannot be pickled.
We can then pass the “pool” proxy object as an argument to our tasks.
This can be achieved by changing our task/subtask/etc functions to take the pool as an argument and by using the starmap() method, which supports multiple arguments, instead of the map() method.
The updated version of the nested for-loop that executes all tasks in parallel using a shared process pool is listed below.
```python
# SuperFastPython.com
# example of a nested for-loop with a shared process pool
import time
import multiprocessing

# third level task
def task3(arg):
    # report message
    print(f'\t\t>>>task3 {arg}', flush=True)
    # do work
    time.sleep(1)

# second level task
def task2(arg, pool):
    # report message
    print(f'\t>>task2 {arg}', flush=True)
    # do work
    time.sleep(1)
    # issue third level tasks and wait
    pool.map(task3, range(2))

# top level task
def task1(arg, pool):
    # report message
    print(f'>task1 {arg}', flush=True)
    # do work
    time.sleep(1)
    # prepare task arguments
    args = [(i, pool) for i in range(3)]
    # issue second level tasks and wait
    pool.starmap(task2, args)

# protect the entry point
if __name__ == '__main__':
    # create manager
    with multiprocessing.Manager() as manager:
        # create the shared pool
        with manager.Pool(100) as pool:
            # prepare arguments to tasks
            args = [(i, pool) for i in range(5)]
            # execute top level tasks and wait
            pool.starmap(task1, args)
```
Running the example first creates the manager, which starts a server process.
The manager is then used to create a process pool. This returns a proxy object for the pool that is safe to share with child processes, like those in the pool itself via the tasks and subtasks we wish to execute.
The main thread then issues 5 tasks to the shared pool and waits for them to complete.
Each task receives the proxy object for the shared pool as an argument. Each task reports a message and sleeps, then issues 3 subtasks to the shared pool and waits for them to complete.
Each subtask receives access to the same shared pool via the provided arguments, reports a message, sleeps, then issues 2 subsubtasks and waits for them to complete.
The subsubtasks report a message and sleep.
As with the shared thread pool example above, the process pool is created with 100 worker processes, double the number needed to complete all 50 tasks.
This approach requires prior knowledge of the total number of tasks that will be issued to the pool in the hierarchy, and the configuration of a pool large enough to support all of those tasks concurrently.
This is bad news because we very likely have more CPU-bound tasks than CPU cores available and would prefer to queue up tasks and have them executed one per CPU core.
As such, this design is fragile.
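A back-of-envelope check makes the sizing problem concrete. The sketch below assumes the fan-out used in this example (5 tasks, 3 subtasks each, 2 subsubtasks each) and assumes that a task waiting in map() or starmap() holds its worker for the whole wait. The helper name `tasks_per_level` is illustrative.

```python
from math import prod

# number of tasks issued at each level for a given fan-out per level
def tasks_per_level(fan_out):
    return [prod(fan_out[:i + 1]) for i in range(len(fan_out))]

fan_out = [5, 3, 2]
levels = tasks_per_level(fan_out)
# total number of tasks across the hierarchy
total = sum(levels)
# tasks at every level except the last block while waiting on their children
blocked_parents = sum(levels[:-1])
# at least one more worker is needed so that a leaf task can still run
min_workers = blocked_parents + 1
print(levels, total, blocked_parents, min_workers)
```

Under these assumptions, a pool with 20 or fewer workers could end up with every worker occupied by a waiting parent task and no worker free to run the leaf tasks, which is why the pool must be sized from knowledge of the whole hierarchy.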
In this case, all tasks are completed in about 4 seconds, slightly longer than the thread pool example because of the overhead in starting child processes and transmitting data between them.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
>task1 0 >task1 3 >task1 2 >task1 4 >task1 1 >>task2 0 >>task2 1 >>task2 1 >>task2 2 >>task2 0 >>task2 2 >>task2 2 >>task2 0 >>task2 0 >>task2 1 >>task2 1 >>task2 2 >>task2 1 >>task2 0 >>task2 2 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 |
Next, let’s look at how we might adapt the queue-based ThreadPool example above, which handles an unknown number of tasks, for use with the Pool class.
Single Process Pool and Shared Queue (unbounded)
In a prior section, we developed an example that used the ThreadPool with a shared queue and a shared counter to handle an unknown number of tasks.
We can update that example to use the Pool class.
It requires some changes.
Firstly, we must develop a custom counter class to be shared among child processes that has an internal integer and a lock to protect it.
We need a custom counter class because when we pass around a shared semaphore via a proxy object, we cannot easily get access to the internals, like the condition variable and integer count value.
For example, a custom process-safe counter may look as follows:
```python
# custom counter class
class SafeCounter():
    def __init__(self, count):
        self._lock = multiprocessing.Lock()
        self._value = count

    def is_zero(self):
        with self._lock:
            return self._value == 0

    def increment(self, value=1):
        with self._lock:
            self._value += value

    def decrement(self, value=1):
        with self._lock:
            self._value -= value
```
We then must use a custom Manager that allows us to host a centralized version of our SafeCounter.
```python
# custom manager to support custom classes
class CustomManager(multiprocessing.managers.BaseManager):
    # nothing
    pass
```
We can create the custom manager and use it to host our centralized queue and counter.
For example:
```python
...
# register the custom class on the custom manager
CustomManager.register('SafeCounter', SafeCounter)
CustomManager.register('Queue', multiprocessing.Queue)
# create manager
with CustomManager() as manager:
    # create the shared queue
    task_queue = manager.Queue()
    # create the shared counter
    counter = manager.SafeCounter(5)
```
The tasks cannot access the shared queue and counter via a global variable.
In this case, we will work around that by passing them directly as arguments.
Finally, the callback function cannot access the shared counter as a global variable.
In this case, we can solve this by having each task decrement the counter directly.
I don’t like the tasks having to know so much about how to issue and manage subtasks. This can all be abstracted away with some custom functions/classes so the tasks can remain pure.
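One possible shape for such an abstraction is sketched below. It is only an illustration under stated assumptions: the wrapper `run_task`, the stand-in `LocalCounter`, and the serial `drain` loop are hypothetical names that do not appear elsewhere in this tutorial, and the sketch assumes that tasks return their subtasks as a list of (function, argument) pairs. The demo runs in a single process with a plain queue.Queue so that the bookkeeping can be checked without a pool.

```python
import queue

# in-process stand-in for the shared counter (hypothetical, for the demo only)
class LocalCounter:
    def __init__(self, count):
        self.value = count

    def is_zero(self):
        return self.value == 0

    def increment(self, value=1):
        self.value += value

    def decrement(self, value=1):
        self.value -= value

# wrapper that owns all queue and counter bookkeeping
def run_task(func, arg, task_queue, counter):
    # a pure task returns a list of (function, argument) subtasks
    subtasks = func(arg)
    # issue the subtasks in the same (task, args) format read by the consumer
    for sub_func, sub_arg in subtasks:
        task_queue.put((run_task, (sub_func, sub_arg, task_queue, counter)))
    # account for the new subtasks, then for this task finishing
    counter.increment(len(subtasks))
    counter.decrement()

# pure tasks: no knowledge of queues, counters or pools
def task3(arg):
    return []

def task2(arg):
    return [(task3, i) for i in range(2)]

def task1(arg):
    return [(task2, i) for i in range(3)]

# serial consumer loop used to exercise the bookkeeping without a pool
def drain(task_queue, counter):
    executed = 0
    while not counter.is_zero():
        task, args = task_queue.get()
        task(*args)
        executed += 1
    return executed

# protect the entry point
if __name__ == '__main__':
    task_queue = queue.Queue()
    counter = LocalCounter(5)
    # issue all top-level tasks wrapped in run_task()
    for i in range(5):
        task_queue.put((run_task, (task1, i, task_queue, counter)))
    print(f'executed {drain(task_queue, counter)} tasks')
```

In a process-based version, the same wrapper could in principle be issued with apply_async() and given the manager-hosted queue and counter proxies in place of the local objects, although that combination has not been tested here.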
And that’s it.
The complete example is listed below.
```python
# SuperFastPython.com
# example of a nested for-loop to use a single shared process pool with a queue unbounded
import time
import queue
import multiprocessing
import multiprocessing.managers

# custom counter class
class SafeCounter():
    def __init__(self, count):
        self._lock = multiprocessing.Lock()
        self._value = count

    def is_zero(self):
        with self._lock:
            return self._value == 0

    def increment(self, value=1):
        with self._lock:
            self._value += value

    def decrement(self, value=1):
        with self._lock:
            self._value -= value

# custom manager to support custom classes
class CustomManager(multiprocessing.managers.BaseManager):
    # nothing
    pass

# third level task
def task3(arg, task_queue, counter):
    # report message
    print(f'\t\t>>>task3 {arg}', flush=True)
    # do work
    time.sleep(1)
    # decrement the counter
    counter.decrement()

# second level task
def task2(arg, task_queue, counter):
    # report message
    print(f'\t>>task2 {arg}', flush=True)
    # do work
    time.sleep(1)
    # issue third level tasks
    for i in range(2):
        task_queue.put((task3, (i, task_queue, counter)))
    # increment the counter by the number of subtasks
    counter.increment(2)
    # decrement the counter
    counter.decrement()

# top level task
def task1(arg, task_queue, counter):
    # report message
    print(f'>task1 {arg}', flush=True)
    # do work
    time.sleep(1)
    # issue second level tasks
    for i in range(3):
        task_queue.put((task2, (i, task_queue, counter)))
    # increment the counter by the number of subtasks
    counter.increment(3)
    # decrement the counter
    counter.decrement()

# protect the entry point
if __name__ == '__main__':
    # register the custom class on the custom manager
    CustomManager.register('SafeCounter', SafeCounter)
    CustomManager.register('Queue', multiprocessing.Queue)
    # create manager
    with CustomManager() as manager:
        # create the shared queue
        task_queue = manager.Queue()
        # create the shared counter
        counter = manager.SafeCounter(5)
        # issue all top-level tasks
        for i in range(5):
            task_queue.put((task1, (i, task_queue, counter)))
        # create a process pool, local to the main thread/process
        with multiprocessing.Pool(30) as pool:
            # loop over all known tasks
            while True:
                # check for no further tasks
                if counter.is_zero():
                    break
                # consume a task
                try:
                    task, args = task_queue.get(timeout=0.5)
                except queue.Empty:
                    continue
                # issue task to the process pool
                async_result = pool.apply_async(task, args)
            # close the pool
            pool.close()
            # wait for all tasks to be processed
            pool.join()
```
Running the example first registers the classes on the custom manager.
The custom manager is then created and used to create the shared queue and counter.
It then issues 5 tasks to the queue.
The process pool is created with 30 workers, an arbitrary number and fewer than the 50 parallel tasks we know we will be executing.
The main thread/process loops until the counter reaches zero. On each iteration, it first checks whether the counter is zero and, if not, tries to get an item from the queue. If an item arrives within half a second (chosen arbitrarily), it is retrieved and issued as a task. If not, the loop starts again.
Tasks run, report a message, sleep, then issue their subtasks to the queue, increment the shared counter by the number of subtasks issued, and decrement the counter once for themselves.
Subtasks run, report their message, sleep, then issue their subsubtasks to the queue, increment the shared counter by the number of subsubtasks issued, and decrement the counter once for themselves.
Subsubtasks run, report their message, sleep, and decrement the counter for themselves.
The counter reaches zero and the main thread exits the loop and closes the pool and the manager.
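The counter arithmetic can be traced with a small idealised sketch. It assumes that all tasks at one level finish together before the next level runs, which is a simplification of the real interleaving, and the helper name `trace_counter` is illustrative.

```python
# trace the shared counter level by level for a given fan-out
def trace_counter(top_level, fan_out):
    # initial value: one count per top-level task
    counter = top_level
    # number of tasks executing at the current level
    running = top_level
    history = [counter]
    # the trailing 0 models the leaf tasks, which issue no subtasks
    for children in fan_out + [0]:
        issued = running * children
        # each task adds a count for every subtask it issues
        counter += issued
        # each task then removes its own count
        counter -= running
        history.append(counter)
        running = issued
    return history

print(trace_counter(5, [3, 2]))
```

For the fan-out in this example, the trace is 5, 15, 30, 0. The counter returns to zero only once the last subsubtask has decremented it, which is the condition the main loop checks for.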
In this case, all tasks are completed in about 4 seconds as in the previous example.
The benefit of this approach is that the number of workers in the process pool can be limited to the number of CPU cores in the system and we do not need to know the total number of tasks beforehand.
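For instance, the fixed size of 30 used above could be swapped for the number of logical CPUs reported by the system. This is a minimal sketch and the helper name `default_pool_size` is illustrative.

```python
import multiprocessing

# number of logical CPUs reported by the system
def default_pool_size():
    return multiprocessing.cpu_count()

# protect the entry point
if __name__ == '__main__':
    n_workers = default_pool_size()
    # size the pool to the CPU count instead of an arbitrary number
    with multiprocessing.Pool(n_workers) as pool:
        print(f'pool created with {n_workers} workers')
```

Because tasks in the queue-based design never block waiting on their subtasks, a pool of this size should still make progress when there are far more tasks than workers.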
I don’t think there’s a race condition in this version but be warned, I have not unit-tested or stress-tested this design.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
>task1 0 >task1 1 >task1 2 >task1 3 >task1 4 >>task2 0 >>task2 1 >>task2 2 >>task2 0 >>task2 1 >>task2 2 >>task2 0 >>task2 1 >>task2 2 >>task2 0 >>task2 1 >>task2 2 >>task2 0 >>task2 1 >>task2 2 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 1 >>>task3 0 >>>task3 0 >>>task3 1 >>>task3 1 |
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
  - See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
  - See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
  - See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to change a nested for-loop to be concurrent or parallel using thread and process pools.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Cali Naughton on Unsplash
Jay says
Hi Jason,
I am trying to add multiprocessing to my nested loop calculation, but I am struggling. Can you please take a look at my question?
https://stackoverflow.com/questions/75080453/how-to-add-multiprocessing-to-loops
Teresa says
Dear Jason,
Thanks a lot for this very clear and comprehensive tutorial! It is very helpful!
One question that remains a little unclear to me, though, is how to store/retrieve the results. That is, I would like to save the output of task3 together with the arguments that were passed to task1, task2 and task3.
Usually I’m defining an “output array” which is step by step filled in each loop iteration. In the concurrent world, I guess I should avoid different processes trying to access the same array simultaneously… but how?
Thanks a lot
Teresa
Jason Brownlee says
Good question.
There are many ways to get the result back to the main process.
For example, you could push the result into a queue or a pipe. See this tutorial:
https://superfastpython.com/multiprocessing-return-value-from-process/
Another approach is to use a shared object, such as a list, dict or array hosted in a Manager. For example:
https://superfastpython.com/multiprocessing-share-object-with-processes/
Does that help?
Nikita says
Can we use two managers if we have 2 nested for-loops in a single Python program?
Jason Brownlee says
You can use two managers within one program.
It may or may not help with a nested loop. Try it and see. Let me know how you go.