You can order parallel tasks by having the tasks coordinate themselves or by splitting each task into parallel and sequential subtasks.
In this tutorial, you will discover how to order tasks that are executed in parallel in Python.
This tutorial was inspired by questions and discussions with Casey M. Thank you deeply! If you have a question about Python concurrency, message me anytime.
Let’s get started.
Need to Order Parallel Tasks
Consider this situation.
You have many tasks to execute and they are semi-independent, so they can be executed concurrently, perhaps in parallel.
However, the tasks are not completely independent.
There is a need to order the tasks at some point.
For example:
- Perhaps the initial portion of each task must be executed in order.
- Perhaps the middle portion of the task must be ordered.
- Perhaps the results of the tasks must be processed or reported in order.
By order, we specifically mean that the tasks are executed sequentially in a mutually exclusive manner.
The order may be assigned to each task when created. Alternatively, the ordering of the tasks may be determined dynamically through initial processing within each task.
This raises the question: how can we execute tasks independently, yet order them in some way as needed?
How to Order Parallel Tasks
There are many ways to order concurrent and parallel tasks.
We must be careful to avoid ordering the complete tasks.
If concurrent or parallel tasks are ordered in their entirety, it means the tasks should just be executed sequentially.
Attempting to order concurrent tasks in their entirety is an anti-pattern in concurrent programming: it counters the concurrency provided by threading or multiprocessing by imposing order, defeating the point of making the tasks concurrent.
Nevertheless, a partial ordering of the task, such as the beginning, middle, or end, can be achieved.
There are perhaps two main approaches that could be explored to solve this problem:
- Use synchronization primitives to coordinate task order.
- Split tasks into concurrent and sequential subtasks.
Let’s explore both approaches.
Use Synchronization Primitives to Coordinate Tasks
Perhaps a naive approach is to modify the tasks so that they coordinate at the point that ordering is required.
This might be preferred for existing code or complex tasks that cannot be easily split into subtasks.
Each task may be assigned an order, or may determine its own order, as an integer between 1 and n (the total number of tasks).
A shared counter can be used to keep track of which task may execute next. A condition variable can then be used by tasks to wait for their turn to execute, and by the executing task to notify waiting tasks that they should check whether it is their turn.
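As a hedged illustration of this idea, here is a minimal thread-based sketch. The task() function, the list-based counter, and the use of orders 1 to n are assumptions for illustration; the worked example later in this tutorial applies the same structure with processes.

from threading import Thread
from threading import Condition
from time import sleep
from random import random

# shared condition variable and counter recording the order of the last finished task
condition = Condition()
counter = [0]

# hypothetical task function, assigned an order from 1 to n
def task(number):
    # ...the concurrent portion of the task would run here...
    # wait for this task's turn in the ordered section
    with condition:
        while counter[0] != number - 1:
            condition.wait()
    # ...the ordered (sequential) portion of the task runs here...
    print(f'Task {number} running its ordered section')
    sleep(random())
    # record completion and wake all waiting tasks
    with condition:
        counter[0] = number
        condition.notify_all()

# create, start, and wait for the threads
threads = [Thread(target=task, args=(i,)) for i in range(1, 6)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()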
Split Tasks into Concurrent and Sequential Subtasks
Ideally, we would split each task into the concurrent portion and the sequential portion.
The concurrent portion can be completed with coroutines, threads, and processes, and the sequential portion can be completed in one thread.
Queues can be used to create producer-consumer tasks or a pipeline of tasks.
A priority queue may even be used to automatically order the sequential tasks, where each task item is assigned a priority order and the consumer of those tasks consumes them in the assigned order by default and executes them one by one.
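For instance, a minimal thread-based sketch of this pattern might look like the following. The task() function and result format are assumptions for illustration, and note that the consumer only sees a fully ordered stream here because it runs after all producers have been joined.

from queue import PriorityQueue
from threading import Thread
from time import sleep
from random import random

# shared priority queue of (order, result) items
queue = PriorityQueue()

# hypothetical task function, assigned an order
def task(order):
    # ...the concurrent portion of the task runs here, finishing at any time...
    sleep(random())
    # enqueue the result tagged with the task's assigned order
    queue.put((order, f'result of task {order}'))

# create, start, and wait for the concurrent tasks
threads = [Thread(target=task, args=(i,)) for i in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# consume and process the results one by one, lowest order first
while not queue.empty():
    order, result = queue.get()
    print(f'> processing {result}')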
This approach is appropriate for tasks that are simple enough to split, and may be best suited to cases where only the initial processing, or the collation and reporting of results, needs to be ordered.
Now that we have explored some approaches to solving this problem, let’s look at some worked examples.
Example of Using Synchronization Primitives to Coordinate Tasks
We can explore the case of complex tasks that cannot easily be split into subtasks.
In this case, we will use synchronization primitives to have the tasks coordinate themselves automatically.
Each task has an order, in this case, assigned beforehand.
We will execute tasks in parallel using process-based concurrency, suited for CPU-bound tasks. Processes do not have shared memory, so we will use a multiprocessing.Manager to host a shared counter and a shared condition variable.
...
# create the manager
with Manager() as manager:
    # ...
The shared counter will be created with the manager's Value() method, a managed version of a multiprocessing.Value shared ctype, configured as a signed integer ('i').
For example:
...
# create the shared value
value = manager.Value('i', -1)
We will also use the manager to create the shared condition variable object.
...
# create a shared condition
condition = manager.Condition()
Each task will be executed in a new multiprocessing.Process instance and the main process will create, start and wait for all processes to complete.
For example:
...
# create the processes
processes = [Process(target=task, args=(i, condition, value)) for i in range(5)]
# start the processes
for process in processes:
    process.start()
# wait on all processes
for process in processes:
    process.join()
We can define a task() function to perform our operation.
It will take its assigned task order as an argument, as well as the shared condition variable and counter.
Tasks coordinate themselves.
This is achieved using a spin lock structure.
Each task loops forever. Each iteration, it acquires the shared condition and checks the shared counter to see whether it is its turn to execute. If it is, the loop is exited; otherwise, the task waits on the condition to be notified by another task.
...
# loop until the task can execute
while True:
    # attempt to acquire the condition
    with shared_condition:
        # check the state to see if it is my turn
        if shared_value.value == number - 1:
            break
        else:
            # wait to be notified to re-check the state
            shared_condition.wait()
This is an efficient way of waiting, as tasks only wake and check the counter when another task finishes.
Once a task is permitted to run, it performs its activity.
In this case, it will sleep for a random interval.
...
# it is my turn
print(f'Task {number} is running!', flush=True)
# block to simulate work
sleep(random())
Once it has finished its sequential activity, it can increment the shared counter and notify the next task.
This involves first acquiring the shared condition variable, changing the counter value, then notifying all waiting tasks.
...
# acquire the condition so we can update state
with shared_condition:
    # update the shared value
    shared_value.value = number
    print(f'Task {number} done, shared number is {shared_value.value}!', flush=True)
    # notify everyone so the next task can run
    shared_condition.notify_all()
Tying this together, the complete task() function is listed below.
# task executed in each child process
def task(number, shared_condition, shared_value):
    # loop until the task can execute
    while True:
        # attempt to acquire the condition
        with shared_condition:
            # check the state to see if it is my turn
            if shared_value.value == number - 1:
                break
            else:
                # wait to be notified to re-check the state
                shared_condition.wait()
    # it is my turn
    print(f'Task {number} is running!', flush=True)
    # block to simulate work
    sleep(random())
    # acquire the condition so we can update state
    with shared_condition:
        # update the shared value
        shared_value.value = number
        print(f'Task {number} done, shared number is {shared_value.value}!', flush=True)
        # notify everyone so the next task can run
        shared_condition.notify_all()
The task() function only includes the sequential aspects of the complex task.
The parallel activities of each task may be performed before entering the spinlock and after the condition variable is released at the end of the sequential portion.
The spinlock and notification mechanisms are quite standard structures and could be abstracted into private methods of a base class that each task overrides.
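For example, a sketch of such an abstraction might look like the following; the OrderedTask class and its method names are assumptions for illustration, not part of the example above.

# hypothetical base class that hides the coordination mechanics
class OrderedTask:
    def __init__(self, number, shared_condition, shared_value):
        self.number = number
        self.shared_condition = shared_condition
        self.shared_value = shared_value

    def _wait_turn(self):
        # block until it is this task's turn to execute
        while True:
            with self.shared_condition:
                if self.shared_value.value == self.number - 1:
                    return
                # wait to be notified to re-check the state
                self.shared_condition.wait()

    def _notify_done(self):
        # record this task as done and wake all waiting tasks
        with self.shared_condition:
            self.shared_value.value = self.number
            self.shared_condition.notify_all()

    def work(self):
        # overridden by each task with its sequential portion
        raise NotImplementedError

    def run(self):
        self._wait_turn()
        self.work()
        self._notify_done()

A concrete task would then subclass OrderedTask, implement work(), and have its run() method executed in a new process.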
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of ordering parallel tasks with a condition variable and counter
from time import sleep
from random import random
from multiprocessing import Manager
from multiprocessing import Process

# task executed in each child process
def task(number, shared_condition, shared_value):
    # loop until the task can execute
    while True:
        # attempt to acquire the condition
        with shared_condition:
            # check the state to see if it is my turn
            if shared_value.value == number - 1:
                break
            else:
                # wait to be notified to re-check the state
                shared_condition.wait()
    # it is my turn
    print(f'Task {number} is running!', flush=True)
    # block to simulate work
    sleep(random())
    # acquire the condition so we can update state
    with shared_condition:
        # update the shared value
        shared_value.value = number
        print(f'Task {number} done, shared number is {shared_value.value}!', flush=True)
        # notify everyone so the next task can run
        shared_condition.notify_all()

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create a shared condition
        condition = manager.Condition()
        # create the shared value
        value = manager.Value('i', -1)
        print(f'Shared value is {value.value}')
        # create the processes
        processes = [Process(target=task, args=(i, condition, value)) for i in range(5)]
        # start the processes
        for process in processes:
            process.start()
        # wait on all processes
        for process in processes:
            process.join()
        print('Done')
Running the example first creates the manager using the context manager interface.
Next, the manager is used to create the shared condition variable and counter.
Five child processes are then configured and passed a unique task order and the shared counter and condition variables as arguments.
The child processes are then started and the main process waits for the tasks to be completed.
Each task runs and executes its own spinlock.
The initial counter has a value of -1 and the first task is assigned the value of 0. Each task checks if the current counter value equals one minus its assigned order.
This means the counter values that gate execution will run from -1 to 3, whereas task orders will run from 0 to 4. This could be cleaner.
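One way it could be made cleaner (an assumed variant, not part of the example above) is to start the counter at zero and assign task orders from 1 to n, so the counter simply counts completed tasks. The same task() function works unchanged:

...
# assumed variant: counter starts at zero and task orders run from 1 to n
value = manager.Value('i', 0)
# create the processes with orders 1 through 5
processes = [Process(target=task, args=(i, condition, value)) for i in range(1, 6)]

The rest of this walkthrough continues with the original scheme.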
The task assigned the order value of zero breaks out of the spinlock, reports a message, sleeps, then sets the counter to zero and notifies all waiting tasks. It is done.
All waiting tasks resume and check the counter value. Task 1 breaks out of its spinlock, while all other tasks resume waiting.
This process continues until all tasks execute in their assigned order.
This example highlights how many concurrent or parallel tasks started at an arbitrary time can be coordinated to execute in a specific order at some part during their execution.
Shared value is -1
Task 0 is running!
Task 0 done, shared number is 0!
Task 1 is running!
Task 1 done, shared number is 1!
Task 2 is running!
Task 2 done, shared number is 2!
Task 3 is running!
Task 3 done, shared number is 3!
Task 4 is running!
Task 4 done, shared number is 4!
Done
Next, let’s look at an alternate approach to ordering concurrent tasks.
Example of Processing Results in Order
We can explore an example of using a queue to handle, process, or report task results in order.
This is a preferred approach for handling tasks that have some ordering requirements. It involves splitting each task into subtasks that can be made concurrent or parallel, and subtasks, such as processing results, that must be performed sequentially.
It avoids the complication of having tasks coordinate their order amongst themselves.
In this example, each task will generate a random number between 0 and 5 and block for that many seconds. The generated value, paired with the task's number, will be the result of the task, which is then placed on a shared queue.
For example:
# task executed in each child process
def task(number, queue):
    # report a message
    print(f'Task {number} is running!', flush=True)
    # generate a random number between 0 and 5
    value = random() * 5
    # block to simulate work
    sleep(value)
    # construct result
    result = (number, value)
    # store result
    queue.put(result)
The main process will create the shared queue and then configure, start, and wait for all processes to complete.
...
# create the shared queue
queue = Queue()
# create the processes
n_tasks = 5
processes = [Process(target=task, args=(i, queue)) for i in range(n_tasks)]
# start the processes
for process in processes:
    process.start()
# wait on all processes
for process in processes:
    process.join()
The main process will then consume all results from the shared queue, order them by the generated value, and report them in that dynamically determined order.
...
# consume all results
results = [queue.get() for _ in range(n_tasks)]
# sort results by generated value
results.sort(key=lambda value: value[1])
# process results in order
for result in results:
    print(f'> {result}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of executing tasks and processing results in order
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Queue

# task executed in each child process
def task(number, queue):
    # report a message
    print(f'Task {number} is running!', flush=True)
    # generate a random number between 0 and 5
    value = random() * 5
    # block to simulate work
    sleep(value)
    # construct result
    result = (number, value)
    # store result
    queue.put(result)

# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # create the processes
    n_tasks = 5
    processes = [Process(target=task, args=(i, queue)) for i in range(n_tasks)]
    # start the processes
    for process in processes:
        process.start()
    # wait on all processes
    for process in processes:
        process.join()
    # consume all results
    results = [queue.get() for _ in range(n_tasks)]
    # sort results by generated value
    results.sort(key=lambda value: value[1])
    # process results in order
    for result in results:
        print(f'> {result}')
Running the example first creates the shared queue.
The 5 child processes are then configured, started, and waited on by the main process.
Each task runs, reporting a message, generating a random value, sleeping, then placing a tuple of its unique task number and generated value on the queue.
Once all tasks finish, the main process resumes.
It first consumes all results from the shared queue. It then orders the task results by the generated values, in ascending order, then reports the results.
We can see from the results that although the tasks were assigned unique orders and finished at different times, the results were reported according to the dynamically generated data.
This highlights an alternate approach of separating out the sequential subtask from the parallel subtask when tasks require ordering.
Task 0 is running!
Task 1 is running!
Task 2 is running!
Task 3 is running!
Task 4 is running!
> (2, 1.6107475428847495)
> (0, 2.437362032261128)
> (1, 3.402050163398573)
> (4, 3.8032861495856984)
> (3, 4.688215621941093)
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to order tasks that are executed concurrently or in parallel in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Robin van Geenen on Unsplash