Multiprocessing Pool.imap_unordered() in Python

July 12, 2022 Python Multiprocessing Pool

You can issue tasks to the process pool one-by-one, execute them in parallel, and get results in the order that tasks are completed via the imap_unordered() function.

In this tutorial you will discover how to use the imap_unordered() function to issue tasks to the process pool in Python.

Let's get started.

Problem with imap()

The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.

A process pool can be configured when it is created, which will prepare the child workers.

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

-- multiprocessing — Process-based parallelism

The built-in map() function allows you to apply a function to each item in an iterable.

It yields one result returned from the given target function called with one item from a given iterable. It is common to call map() and iterate the results in a for-loop.
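The one-result-at-a-time behavior of the built-in map() can be seen in a minimal sketch. The names double() and calls are illustrative only and are not part of the tutorial's examples.

```python
# record which items the function has been applied to so far
calls = []

def double(x):
    calls.append(x)
    return x * 2

# map() returns an iterator immediately; double() has not been called yet
results = map(double, [1, 2, 3])
print(calls)  # []

# each step of the loop calls double() for exactly one more item
for value in results:
    print(value, calls)
```

Nothing is computed until the for-loop asks for the next value.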

The process pool provides a version of the map() function where the target function is called for each item in the provided iterable in parallel. The problem with the Pool.map() function is that it converts the provided iterable into a list and issues a task for each item all at once. This can be a problem if the iterable contains many hundreds or thousands of items as it may use a lot of main memory.

As an alternative, the process pool provides the imap() function, which is a lazy version of map(). Items are taken from the provided iterable one at a time instead of all at once, and results are yielded in order as they are completed rather than after all tasks are completed.

A problem with imap() is that results are returned in the order that tasks were issued, not the order that tasks are completed.

If there are some tasks that are slower than the others to complete, they will hold up the stream of results.

A version of the imap() function is needed that will allow return values to be iterated as fast as tasks are completed. That is, to iterate results in the order that tasks are completed, not the order that tasks were issued.

The imap_unordered() function provides this capability.
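The difference can be sketched with one deliberately slow task. This is a minimal illustration, not part of the worked examples that follow: the task() function, the two-worker pool, and the helper names run_ordered() and run_unordered() are all assumptions made for the demonstration.

```python
from time import sleep
from multiprocessing.pool import Pool

# hypothetical task: item 0 is deliberately slow, the rest return at once
def task(item):
    if item == 0:
        sleep(0.5)
    return item

# collect results with imap(), which preserves the order of the input
def run_ordered(items):
    with Pool(2) as pool:
        return list(pool.imap(task, items))

# collect results with imap_unordered(), which yields in completion order
def run_unordered(items):
    with Pool(2) as pool:
        return list(pool.imap_unordered(task, items))

# protect the entry point
if __name__ == '__main__':
    items = [0, 1, 2, 3]
    # the slow first task holds up every result behind it
    print('imap:', run_ordered(items))
    # the order here is arbitrary, but fast tasks need not wait for item 0
    print('imap_unordered:', run_unordered(items))
```

With imap() the list always matches the input order. With imap_unordered() the same values come back, but their order may change from run to run.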

How to Use Pool.imap_unordered()

The process pool provides an unordered version of the imap() function via Pool.imap_unordered().

The imap_unordered() function takes the name of a function to apply and an iterable.

It will then iterate items in the iterable one at a time and issue a task to the process pool that calls the specified function on each item.

The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be “correct”.)

-- multiprocessing — Process-based parallelism

The imap_unordered() function then returns an iterable of return values. The return values are yielded in the order that tasks are completed, not the order that the tasks were issued to the process pool.

For example:

...
# apply function to each item in the iterable in parallel
for result in pool.imap_unordered(task, items):
	# ...

Each item in the iterable is taken as a separate task in the process pool.

Unlike the built-in map() function, the Pool.imap_unordered() function only takes one iterable as an argument. This means that the target function executed in the process can only take a single argument.
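One common way around the single-argument limit is to pack several values into one tuple and unpack it inside the task. The sketch below assumes a hypothetical task() that raises a base to an exponent; the names and values are for illustration only.

```python
from multiprocessing.pool import Pool

# hypothetical task that needs two values; it receives them as one tuple
def task(args):
    base, exponent = args
    return base ** exponent

# issue one task per (base, exponent) pair and gather the results
def run():
    pairs = [(2, 3), (3, 2), (10, 0)]
    with Pool(2) as pool:
        # sort because the completion order is arbitrary
        return sorted(pool.imap_unordered(task, pairs))

# protect the entry point
if __name__ == '__main__':
    print(run())
```

Each tuple is still a single item in the iterable, so the pool sees one argument per task.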

Unlike the Pool.map() function, the Pool.imap_unordered() function will iterate the provided iterable one item at a time and issue tasks to the process pool. It will also yield return values as tasks are completed rather than all at once after all tasks are completed.

Unlike the Pool.imap() function, the Pool.imap_unordered() function will yield return values in the order that tasks are completed, not the order that tasks were issued to the process pool.

Although the Pool.imap_unordered() function is lazy, we can issue tasks in chunks to the process pool. That is, we can yield a fixed number of items from the input iterable and issue them as one task to be executed by a child worker process.

This can make completing a large number of tasks in a very long iterable more efficient as arguments and return values from the target task function can be transmitted in batches with less computational overhead.

This can be achieved via the "chunksize" argument to imap_unordered().

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.

-- multiprocessing — Process-based parallelism

For example:

...
# iterate results from imap_unordered() with chunksize
for result in pool.imap_unordered(task, items, chunksize=10):
	# ...

Next, let's take a closer look at how the imap_unordered() function compares to other functions on the process pool.

Difference Between imap_unordered() and imap()

How does the Pool.imap_unordered() function compare to the Pool.imap() for issuing tasks to the process pool?

The imap_unordered() and imap() functions have a lot in common, such as:

  1. Both the imap_unordered() and imap() may be used to issue tasks that call a function for all items in an iterable via the process pool.
  2. Both the imap_unordered() and imap() are lazy versions of the map() function.
  3. Both the imap_unordered() and imap() functions return an iterable over the return values immediately.

Nevertheless, there is one key difference between the two functions:

  1. The iterable returned from imap_unordered() yields results in the arbitrary order that tasks are completed, whereas the imap() function yields return values in the order that tasks were submitted.

The imap_unordered() function should be used when the caller needs to iterate return values as tasks are completed, i.e. in an arbitrary order rather than the order in which they were submitted.

The imap() function should be used when the caller needs to iterate return values in the order that tasks were submitted, with each result becoming available once its task and all earlier tasks have completed.
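Because imap_unordered() gives up ordering, a caller that still needs to know which input produced which result can have the task return its input alongside the value. This is a sketch of that pattern under assumed names (task(), run()), not something the Pool API requires.

```python
from multiprocessing.pool import Pool

# hypothetical task: return the input together with the computed value
def task(identifier):
    return identifier, identifier * identifier

# gather (identifier, value) pairs into a dict keyed by identifier
def run(n):
    with Pool(2) as pool:
        return dict(pool.imap_unordered(task, range(n)))

# protect the entry point
if __name__ == '__main__':
    squares = run(5)
    # results arrived in arbitrary order but can be looked up by input
    for identifier in sorted(squares):
        print(identifier, squares[identifier])
```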

Now that we know how to use the imap_unordered() function to execute tasks in the process pool, let's look at some worked examples.

Example of Pool.imap_unordered()

We can explore how to use the imap_unordered() on the process pool.

In this example, we can define a target task function that takes an integer as an argument, generates a random number, reports the value then returns the value that was generated. We can then call this function for each integer between 0 and 49 using the process pool imap_unordered().

This will apply the function to each integer in parallel using as many cores as are available in the system.

Firstly, we can define the target task function.

The function takes an argument, generates a random number between 0 and 1, reports the integer and generated number. It then blocks for a fraction of a second to simulate computational effort, then returns the number that was generated.

The task() function below implements this.

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

We can then create and configure a process pool.

We will use the context manager interface to ensure the pool is shutdown automatically once we are finished with it.

...
# create and configure the process pool
with Pool() as pool:
	# ...

We can then call the imap_unordered() function on the process pool to apply our task() function to each value in a range between 0 and 49.

This returns an iterator over return values that will yield results in the order that the tasks are completed.

We can then report the return values directly.

This can all be achieved in a for-loop.

...
# execute tasks in order, process results out of order
for result in pool.imap_unordered(task, range(50)):
    print(f'Got result: {result}', flush=True)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of parallel imap_unordered() with the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # execute tasks in order, process results out of order
        for result in pool.imap_unordered(task, range(50)):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically

Running the example first creates the process pool with a default configuration.

It will have one child worker process for each logical CPU in your system.

The imap_unordered() function is then called for the range.

This issues 50 calls to the task() function, one for each integer between 0 and 49. An iterator is returned with the result for each function call in an arbitrary order.

Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.

The main process iterates over the values returned from the calls to the task() function as tasks are completed and reports the generated values. The reported values match those in the worker child processes.

Importantly, tasks are issued to the process pool one-by-one, as space in the pool becomes available.

As importantly, results in the main process are reported as tasks are completed, although not in the order that tasks were issued.

A truncated listing of results is provided below. We can see that tasks are running and reporting their generated results while the main process is receiving and reporting return values.

This is unlike the map() function that must wait for all tasks to complete before reporting return values.

...
Got result: 0.7382378240622508
Task 44 executing with 0.5268535697675194
Got result: 0.5539452700247147
Task 45 executing with 0.5580038966445552
Got result: 0.15597031847231024
Task 46 executing with 0.6994459495257512
Got result: 0.6956160587702503
Task 47 executing with 0.14472451723383273
Got result: 0.9140941300966938
Task 48 executing with 0.1917787016368715
Got result: 0.14472451723383273
Task 49 executing with 0.2954713222884531
Got result: 0.41907675578630865
Got result: 0.8898523941289793
Got result: 0.7605836122006914
Got result: 0.1917787016368715
Got result: 0.5268535697675194
Got result: 0.8365549573778605
Got result: 0.2954713222884531
Got result: 0.5580038966445552
Got result: 0.6994459495257512

Next, let's look at an example of issuing tasks that do not have a return value.

Example of Pool.imap_unordered() with No Return Value

We can explore using the imap_unordered() function to call a function for each item in an iterable that does not have a return value.

This means that we are not interested in the iterable of results returned by the call to imap_unordered() and instead are only interested that all issued tasks get executed.

This can be achieved by updating the previous example so that the task() function does not return a value.

The updated task() function with this change is listed below.

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

Then, in the main process, we can call imap_unordered() with our task() function and the range, and not iterate the results.

...
# issue tasks to the process pool
pool.imap_unordered(task, range(50))

The call to imap_unordered() will return immediately.

Therefore, we must explicitly wait for all tasks in the process pool to complete. Otherwise, the context manager for the process pool would exit and forcefully terminate the process pool and all running tasks in the pool.

This can be achieved by first closing the process pool so no further tasks can be submitted to the pool.

...
# shutdown the process pool
pool.close()

We can then join the pool to wait for all tasks to complete and all worker processes to close.

...
# wait for all issued tasks to complete
pool.join()

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of parallel imap_unordered() with the process pool and a task that does not return a value
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        pool.imap_unordered(task, range(50))
        # shutdown the process pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()

Running the example first creates the process pool with a default configuration.

The imap_unordered() function is then called for the range. This issues 50 calls to the task() function, one for each integer between 0 and 49. An iterator is returned immediately with the result for each function call, but is ignored in this case.

The main process carries on, first closing the process pool then joining it to wait for all tasks to complete.

Each call to the task function generates a random number between 0 and 1, reports a message, then blocks.

The tasks in the pool finish then the process pool is closed.

This example again highlights that the call to imap_unordered() does not block; the caller only blocks when it loops over the returned iterator of return values.
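As an alternative to calling close() and join(), the main process can wait for the tasks by looping over the returned iterator and discarding the values. The sketch below assumes a shortened task() and a hypothetical run() helper that counts completed tasks; it is one possible approach, not a replacement for the example above.

```python
from time import sleep
from multiprocessing.pool import Pool

# hypothetical task with no return value
def task(identifier):
    # simulate a little work
    sleep(0.05)

# issue n tasks and block until every one of them has finished
def run(n):
    completed = 0
    with Pool(4) as pool:
        # each step of the loop blocks until another task completes
        for _ in pool.imap_unordered(task, range(n)):
            completed += 1
    return completed

# protect the entry point
if __name__ == '__main__':
    print(f'{run(20)} tasks completed')
```

The loop only ends once every task has reported back, so the context manager does not terminate the pool early.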

Below is a truncated listing of results.

...
Task 39 executing with 0.6374724645330898
Task 40 executing with 0.9291781480641043
Task 41 executing with 0.32250814804915273
Task 42 executing with 0.23407953827532524
Task 43 executing with 0.9240386212648637
Task 44 executing with 0.8424015566602372
Task 45 executing with 0.18304021833320383
Task 46 executing with 0.16448168623627968
Task 47 executing with 0.18176198404249644
Task 48 executing with 0.5619166318793606
Task 49 executing with 0.820731157392554

Next, let's explore the chunksize argument to the imap_unordered() function.

Example of Pool.imap_unordered() with chunksize

The imap_unordered() function will apply a function to each item in an iterable one-by-one.

If the iterable has a large number of items, it may be inefficient as each task must retrieve the input from the provided iterable and be serialized to be sent to and executed by a child process.

A more efficient approach for very large iterables might be to divide the items in the iterable into chunks and issue chunks of items to each worker process to which the target function can be applied.

This can be achieved with the "chunksize" argument to the imap_unordered() function.

The example below updates the previous example to first configure the process pool with 4 child worker processes, then to issue 40 tasks in chunks of 2 tasks each. That is, there will be 20 chunks sent into the process pool, where each chunk involves two function calls to the target task function with two return values sent back.
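The arithmetic of 40 items in chunks of 2 can be checked with a small sketch. The split_into_chunks() helper is hypothetical and only illustrates the grouping idea; it is not the process pool's own code.

```python
from itertools import islice

# illustration only: group items into tuples of at most chunksize items
def split_into_chunks(iterable, chunksize):
    iterator = iter(iterable)
    while True:
        chunk = tuple(islice(iterator, chunksize))
        # an empty tuple means the iterable is exhausted
        if not chunk:
            return
        yield chunk

chunks = list(split_into_chunks(range(40), 2))
print(len(chunks))   # 20
print(chunks[:3])    # [(0, 1), (2, 3), (4, 5)]
```

Each tuple here corresponds to one unit of work sent to a child worker process.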

Firstly, we can configure the process pool.

...
# create and configure the process pool
with Pool(4) as pool:
	# ...

Next, we can issue 40 tasks, grouped into chunks of 2 via the "chunksize" argument.

Return values in the returned iterator are still yielded one at a time, although behind the scenes two results are available for each chunk that returns.

...
# issue tasks to the process pool
for result in pool.imap_unordered(task, range(40), chunksize=2):
    print(f'Got result: {result}', flush=True)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of parallel imap_unordered() with the process pool with a larger iterable and chunksize
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(1)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(4) as pool:
        # issue tasks to the process pool
        for result in pool.imap_unordered(task, range(40), chunksize=2):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically

Running the example first creates the process pool with 4 child process workers.

The imap_unordered() function is then called for the range, issuing 20 tasks to the process pool, each with two function calls to the target task function.

Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.

The main process iterates over the values returned from the calls to the task() function as tasks are completed and reports the generated values. The reported values match those in the worker child processes.

Importantly, tasks are issued to the process pool two at a time, as space in the pool becomes available.

As importantly, results in the main process are reported as tasks are completed, although not in the order that tasks were issued.

A truncated listing of results is provided below.

...
Got result: 0.43310375354927966
Got result: 0.5975296871991002
Got result: 0.4380293092145111
Got result: 0.6246733538805757
Got result: 0.9493643619714428
Got result: 0.4192055352460351
Got result: 0.8528829643092762
Got result: 0.9788750703082292
Task 33 executing with 0.6841895492251734
Task 37 executing with 0.614327329633788
Task 35 executing with 0.08964755006855385
Task 39 executing with 0.9044652703771386
Got result: 0.7885755857787172
Got result: 0.614327329633788
Got result: 0.35434735386371075
Got result: 0.08964755006855385
Got result: 0.5439014436560977
Got result: 0.6841895492251734
Got result: 0.6329368345481452
Got result: 0.9044652703771386

Takeaways

You now know how to use the imap_unordered() function to issue tasks to the process pool in Python.


