How to Order Parallel Tasks in Python

February 12, 2023 Python Multiprocessing

You can order parallel tasks by having the tasks coordinate themselves or by splitting each task into parallel and sequential subtasks.

In this tutorial, you will discover how to order tasks that are executed in parallel in Python.

This tutorial was inspired by questions and discussions with Casey M. Thank you deeply! If you have a question about Python concurrency, message me anytime.

Let's get started.

Need to Order Parallel Tasks

Consider this situation.

You have many tasks to execute and they are semi-independent, so they can be executed concurrently, perhaps in parallel.

However, the tasks are not completely independent.

There is a need to order the tasks at some point.

By order, we specifically mean that the tasks are executed sequentially in a mutually exclusive manner.

The order may be assigned to each task when created. Alternatively, the ordering of the tasks may be determined dynamically through initial processing within each task.

This raises the question, how can we execute tasks independently, yet order them in some way as needed?

How to Order Parallel Tasks

There are many ways to order concurrent and parallel tasks.

We must be careful to avoid ordering the complete tasks.

If concurrent or parallel tasks are ordered in their entirety, it means the tasks should just be executed sequentially.

Ordering concurrent tasks in their entirety is an anti-pattern in concurrent programming. Imposing a total order counters the concurrency provided by threading or multiprocessing and defeats the purpose of using it.

Nevertheless, a partial ordering can be achieved, where only one portion of each task, such as the beginning, middle, or end, is ordered.

There are perhaps two main approaches to solving this problem:

  1. Use synchronization primitives to coordinate task order.
  2. Split tasks into concurrent and sequential subtasks.

Let's explore both approaches.

Use Synchronization Primitives to Coordinate Tasks

Perhaps a naive approach is to modify the tasks so that they coordinate at the point that ordering is required.

This might be preferred for existing code or complex tasks that cannot be easily split into subtasks.

Each task is given an order as an integer between 1 and n (the total number of tasks), either assigned when the task is created or determined by the task itself.

A shared counter keeps track of which task may execute. A condition variable lets tasks wait for their turn, and lets a task that has finished notify the waiting tasks so that they re-check whether it is their turn.
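The counter and condition variable described above can be sketched in a few lines. This is a minimal illustration that uses threads for brevity, and the names state, executed, and task() are assumptions made for the sketch rather than part of the worked example later in this tutorial.

```python
# sketch: tasks take turns using a shared counter and a condition variable
# (threads are used for brevity; all names here are illustrative)
from threading import Thread, Condition

condition = Condition()
state = {'turn': 1}   # order of the task that is allowed to run next
executed = []         # records the order in which tasks ran

def task(order):
    with condition:
        # block until the shared counter matches this task's order
        condition.wait_for(lambda: state['turn'] == order)
        # ordered section: only one task is ever in here at a time
        executed.append(order)
        # hand over to the next task and wake all waiting tasks
        state['turn'] = order + 1
        condition.notify_all()

# start the tasks in reverse order to show that start order does not matter
threads = [Thread(target=task, args=(i,)) for i in (5, 4, 3, 2, 1)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(executed)  # [1, 2, 3, 4, 5]
```

Here the ordered section runs while the condition is held, which keeps the sketch short. For longer ordered sections it may be preferable to release the condition while working and re-acquire it only to update the counter.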

Split Tasks into Concurrent and Sequential Subtasks

Ideally, we would split each task into the concurrent portion and the sequential portion.

The concurrent portion can be completed with coroutines, threads, or processes, and the sequential portion can be completed in one thread.

Queues can be used to create producer-consumer tasks or a pipeline of tasks.

A priority queue may even be used to automatically order the sequential tasks, where each task item is assigned a priority order and the consumer of those tasks consumes them in the assigned order by default and executes them one by one.

This approach is appropriate for those tasks that are simple enough to split and may be suited for those tasks where only the initial processing or collation and reporting of results need to be ordered.
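The priority queue idea can be sketched as follows. This is a minimal illustration using threads and queue.PriorityQueue, and the names work, concurrent_part(), and processed are hypothetical. It assumes the consumer only starts once all producers have finished, because a priority queue can only order the items that are in it at the time of each get().

```python
# sketch: concurrent subtasks feed a priority queue, one consumer runs in order
from queue import PriorityQueue
from threading import Thread

work = PriorityQueue()

def concurrent_part(order):
    # the concurrent portion of the task would run here
    payload = f'result-{order}'
    # hand the sequential portion to the consumer, tagged with its order
    work.put((order, payload))

# producers may start and finish in any order
producers = [Thread(target=concurrent_part, args=(i,)) for i in (3, 1, 2, 0)]
for thread in producers:
    thread.start()
for thread in producers:
    thread.join()

# sequential portion: items come out lowest order first, one by one
processed = []
while not work.empty():
    order, payload = work.get()
    processed.append(payload)
print(processed)  # ['result-0', 'result-1', 'result-2', 'result-3']
```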

Now that we have explored some approaches to solving this problem, let's look at some worked examples.

Example of Using Synchronization Primitives to Coordinate Tasks

We can explore the case of complex tasks that cannot easily be split into subtasks.

In this case, we will use synchronization primitives to have the tasks coordinate themselves automatically.

Each task has an order, in this case, assigned beforehand.

We will execute tasks in parallel using process-based concurrency, which suits CPU-bound tasks. Processes do not share memory by default, so we will use a multiprocessing.Manager to host a shared counter and a shared condition variable.

...
# create the manager
with Manager() as manager:
    # ...

The shared counter will be created with manager.Value() and configured with the 'i' type code to hold an integer. The manager hosts the value and gives each process a proxy object for reading and updating it.

For example:

...
# create the shared value
value = manager.Value('i', -1)

We will also use the manager to create the shared condition variable object.

...
# create a shared condition
condition = manager.Condition()

Each task will be executed in a new multiprocessing.Process instance and the main process will create, start and wait for all processes to complete.

For example:

...
# create the processes
processes = [Process(target=task, args=(i, condition, value)) for i in range(5)]
# start the processes
for process in processes:
    process.start()
# wait on all processes
for process in processes:
    process.join()

We can define a task() function to perform our operation.

It will take its assigned task order as an argument, as well as the shared condition variable and counter.

Tasks coordinate themselves.

This is achieved using a loop structured like a spin lock.

Each task loops forever. Each iteration, it acquires the shared condition and checks the shared counter to see if it is its turn to execute. If it is, the task exits the loop. Otherwise, it waits on the condition until it is notified by another task, which means it blocks rather than busy-waits.

...
# loop until the task can execute
while True:
    # attempt to acquire the condition
    with shared_condition:
        # check state to see if it is my turn
        if shared_value.value == number - 1:
            break
        else:
            # wait to be notified to re-check the state
            shared_condition.wait()

This is an efficient way of waiting as tasks only execute and check the status of the counter when another task finishes.

Once a task is permitted to run, it performs its activity.

In this case, it will sleep for a random interval.

...
# it is my turn
print(f'Task {number} is running!', flush=True)
# block to simulate work
sleep(random())

Once it has finished its sequential activity, it can increment the shared counter and notify the next task.

This involves first acquiring the shared condition variable, changing the counter value, then notifying all waiting tasks.

...
# acquire the condition so we can update state
with shared_condition:
    # update the shared value
    shared_value.value = number
    print(f'Task {number} done, shared number is {shared_value.value}!', flush=True)
    # notify everyone so the next task can run
    shared_condition.notify_all()

Tying this together, the complete task() function is listed below.

# task executed in a process simultaneously
def task(number, shared_condition, shared_value):
    # loop until the task can execute
    while True:
        # attempt to acquire the condition
        with shared_condition:
            # check state to see if it is my turn
            if shared_value.value == number - 1:
                break
            else:
                # wait to be notified to re-check the state
                shared_condition.wait()
    # it is my turn
    print(f'Task {number} is running!', flush=True)
    # block to simulate work
    sleep(random())
    # acquire the condition so we can update state
    with shared_condition:
        # update the shared value
        shared_value.value = number
        print(f'Task {number} done, shared number is {shared_value.value}!', flush=True)
        # notify everyone so the next task can run
        shared_condition.notify_all()

The task() function only includes the sequential aspects of the complex task.

The parallel activities performed by each task may be performed before the spin lock and after releasing the condition variable when completing their sequential portion.

The spin lock and notification mechanisms are quite standard structures and could be moved into private methods of a base class that each task extends.
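One way this abstraction might look is sketched below. The OrderedTask and ReportTask classes, their before(), ordered(), and after() hooks, and the shared state dictionary are all hypothetical names introduced for illustration, and threads are used instead of processes to keep the sketch short.

```python
# sketch: the wait and notify logic moved into a reusable base class
from threading import Condition, Thread

class OrderedTask:
    """Hypothetical base class; subclasses override before(), ordered(), after()."""

    def __init__(self, order, condition, state):
        self.order = order
        self._condition = condition
        self._state = state  # shared dict holding the order allowed to run next

    def _wait_for_turn(self):
        # block until the shared counter matches this task's order
        with self._condition:
            self._condition.wait_for(lambda: self._state['turn'] == self.order)

    def _finish_turn(self):
        # advance the shared counter and wake all waiting tasks
        with self._condition:
            self._state['turn'] = self.order + 1
            self._condition.notify_all()

    def run(self):
        self.before()            # parallel work before the ordered portion
        self._wait_for_turn()
        try:
            self.ordered()       # sequential, ordered portion
        finally:
            self._finish_turn()  # always hand over, even if ordered() fails
        self.after()             # parallel work after the ordered portion

    def before(self): pass
    def ordered(self): pass
    def after(self): pass

class ReportTask(OrderedTask):
    def __init__(self, order, condition, state, log):
        super().__init__(order, condition, state)
        self.log = log

    def ordered(self):
        self.log.append(self.order)

condition = Condition()
state = {'turn': 0}
log = []
tasks = [ReportTask(i, condition, state, log) for i in (2, 0, 1)]
threads = [Thread(target=t.run) for t in tasks]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(log)  # [0, 1, 2]
```

Handing over the turn in a finally block is a design choice made for this sketch so that one failing task does not leave every later task waiting forever.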

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of executing tasks in order sequentially
from time import sleep
from random import random
from multiprocessing import Manager
from multiprocessing import Process

# task executed in a process simultaneously
def task(number, shared_condition, shared_value):
    # loop until the task can execute
    while True:
        # attempt to acquire the condition
        with shared_condition:
            # check state to see if it is my turn
            if shared_value.value == number - 1:
                break
            else:
                # wait to be notified to re-check the state
                shared_condition.wait()
    # it is my turn
    print(f'Task {number} is running!', flush=True)
    # block to simulate work
    sleep(random())
    # acquire the condition so we can update state
    with shared_condition:
        # update the shared value
        shared_value.value = number
        print(f'Task {number} done, shared number is {shared_value.value}!', flush=True)
        # notify everyone so the next task can run
        shared_condition.notify_all()

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create a shared condition
        condition = manager.Condition()
        # create the shared value
        value = manager.Value('i', -1)
        print(f'Shared value is {value.value}')
        # create the processes
        processes = [Process(target=task, args=(i, condition, value)) for i in range(5)]
        # start the processes
        for process in processes:
            process.start()
        # wait on all processes
        for process in processes:
            process.join()
        print('Done')

Running the example first creates the manager using the context manager interface.

Next, the manager is used to create the shared condition variable and counter.

Five child processes are then configured and passed a unique task order and the shared counter and condition variables as arguments.

The child processes are then started and the main process waits for the tasks to be completed.

Each task runs and executes its own spinlock.

The counter has an initial value of -1 and the first task is assigned the order 0. Each task checks if the current counter value equals its assigned order minus one.

This means a task runs when the counter holds a value from -1 to 3, whereas task orders run from 0 to 4. This could be cleaner.

The task assigned the order value of zero exits the spin lock, reports a message, sleeps, then changes the counter value to zero and notifies all waiting tasks. It is done.

All waiting tasks resume and check the counter value. Task 1 exits the spin lock and all other tasks wait again.

This process continues until all tasks execute in their assigned order.

This example highlights how many concurrent or parallel tasks started at an arbitrary time can be coordinated to execute in a specific order at some part during their execution.

Shared value is -1
Task 0 is running!
Task 0 done, shared number is 0!
Task 1 is running!
Task 1 done, shared number is 1!
Task 2 is running!
Task 2 done, shared number is 2!
Task 3 is running!
Task 3 done, shared number is 3!
Task 4 is running!
Task 4 done, shared number is 4!
Done
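As noted above, the offset between the counter and the task orders could be cleaner. One possible variation, sketched below, starts the counter at 0 so that it always holds the order of the task that may run next, and each finishing task sets it to its own order plus one. The sketch uses threads, and a SimpleNamespace stands in for the manager value so that it stays short. The log argument is an addition for illustration. The same task() function is expected to work with manager.Condition() and manager.Value('i', 0), although that is not shown here.

```python
# sketch: the counter holds the order of the next task, so no offset is needed
from threading import Thread, Condition
from types import SimpleNamespace

def task(number, shared_condition, shared_value, log):
    with shared_condition:
        # wait until the counter matches my order
        while shared_value.value != number:
            shared_condition.wait()
    # it is my turn
    log.append(number)
    with shared_condition:
        # advance the counter to the next order and wake waiting tasks
        shared_value.value = number + 1
        shared_condition.notify_all()

condition = Condition()
value = SimpleNamespace(value=0)  # stand-in for manager.Value('i', 0)
log = []
threads = [Thread(target=task, args=(i, condition, value, log)) for i in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(log, value.value)  # [0, 1, 2, 3, 4] 5
```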

Next, let's look at an alternate approach to ordering concurrent tasks.

Example of Processing Results in Order

We can explore an example of using a queue to handle, process, or report task results in order.

This is a preferred approach to handling tasks that have some ordering requirements. It involves splitting each task into subtasks that can be made concurrent or parallel, and subtasks, like processing results, that can be performed sequentially.

It avoids the complication of having to have tasks coordinate their order amongst themselves.

In this example, each task will generate a random number between 0 and 5 and block for that many seconds. This number is the result of the task, which is then placed on a shared queue along with the task number.

For example:

# task executed in a process simultaneously
def task(number, queue):
    # report a message
    print(f'Task {number} is running!', flush=True)
    # generate a random number between 0 and 5
    value = random() * 5
    # block to simulate work
    sleep(value)
    # construct result
    result = (number, value)
    # store result
    queue.put(result)

The main process will create the shared queue and then configure, start, and wait for all processes to complete.

...
# create the shared queue
queue = Queue()
# create the processes
n_tasks = 5
processes = [Process(target=task, args=(i, queue)) for i in range(n_tasks)]
# start the processes
for process in processes:
    process.start()
# wait on all processes
for process in processes:
    process.join()

The main process will then consume all results from the shared queue, order them by the generated value, and report results in the specific dynamic ordering.

...
# consume all results
results = [queue.get() for _ in range(n_tasks)]
# sort results by the generated value
results.sort(key=lambda value: value[1])
# process results in order
for result in results:
    print(f'> {result}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of executing tasks and processing results in order
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Queue

# task executed in a process simultaneously
def task(number, queue):
    # report a message
    print(f'Task {number} is running!', flush=True)
    # generate a random number between 0 and 5
    value = random() * 5
    # block to simulate work
    sleep(value)
    # construct result
    result = (number, value)
    # store result
    queue.put(result)

# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # create the processes
    n_tasks = 5
    processes = [Process(target=task, args=(i, queue)) for i in range(n_tasks)]
    # start the processes
    for process in processes:
        process.start()
    # wait on all processes
    for process in processes:
        process.join()
    # consume all results
    results = [queue.get() for _ in range(n_tasks)]
    # sort results by the generated value
    results.sort(key=lambda value: value[1])
    # process results in order
    for result in results:
        print(f'> {result}')

Running the example first creates the shared queue.

The main process then configures and starts the five child processes and waits for them to finish.

Each task runs, reporting a message, generating a random value, sleeping, then placing the result and its unique task number on the queue.

Once all tasks finish, the main process resumes.

It first consumes all results from the shared queue. It then orders the task results by the generated values, in ascending order, then reports the results.

We can see from the results that although the tasks were assigned unique task numbers and finished at different times, the results were reported according to the dynamically generated data.

This highlights an alternate approach of separating out the sequential subtask from the parallel subtask when tasks require ordering.

Task 0 is running!
Task 1 is running!
Task 2 is running!
Task 3 is running!
Task 4 is running!
> (2, 1.6107475428847495)
> (0, 2.437362032261128)
> (1, 3.402050163398573)
> (4, 3.8032861495856984)
> (3, 4.688215621941093)
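The example above waits for every task to finish before sorting. If results instead need to be handled in their assigned task order as soon as each one becomes available, a small reorder buffer is one option. The sketch below is an illustration under that assumption, using threads, fixed delays, and a heap. The names pending, next_number, and handled are hypothetical.

```python
# sketch: handle results in task-number order as they arrive
import heapq
from queue import Queue
from threading import Thread
from time import sleep

def task(number, delay, queue):
    # block to simulate work that finishes out of order
    sleep(delay)
    queue.put((number, f'result-{number}'))

queue = Queue()
delays = [0.03, 0.01, 0.02]
threads = [Thread(target=task, args=(i, d, queue)) for i, d in enumerate(delays)]
for thread in threads:
    thread.start()

pending = []      # min-heap of results that arrived early
next_number = 0   # task number whose result must be handled next
handled = []
for _ in range(len(threads)):
    # buffer each result as it arrives
    heapq.heappush(pending, queue.get())
    # release every buffered result that is now next in line
    while pending and pending[0][0] == next_number:
        number, result = heapq.heappop(pending)
        handled.append(result)
        next_number += 1
for thread in threads:
    thread.join()
print(handled)  # ['result-0', 'result-1', 'result-2']
```

Because the consumer reads from the queue before joining the workers, results are handled while later tasks are still running.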

Takeaways

You now know how to order tasks that are executed concurrently or in parallel in Python.


