Multiprocessing Pool.starmap() in Python

July 13, 2022 Python Multiprocessing Pool

You can map a function that takes multiple arguments to tasks in the process pool via the Pool starmap() method.

In this tutorial you will discover how to issue tasks to the process pool that take multiple arguments in Python.

Let's get started.

Problem with Pool.map()

The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.

A process pool can be configured when it is created, which will prepare the child workers.

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

-- multiprocessing — Process-based parallelism

The built-in map() function allows you to apply a function to each item in an iterable. It yields one result at a time, returned from the given target function called with one item from the iterable. It is common to call map() and iterate the results in a for-loop.

The process pool provides a version of the map() function via Pool.map(), where the target function is called for each item in the provided iterable in parallel. Unlike the built-in map(), Pool.map() converts the provided iterable of items into a list, submits all items as tasks to the process pool, then blocks until all tasks are complete.

The problem with the Pool.map() function is that it only takes one iterable of items, allowing only a single argument to the target task function. This is different from the built-in map() function.

Supporting task functions that take only one argument is a severe limitation.

The Pool.starmap() function provides a way to work around this limitation.
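The limitation can be seen in a short sketch (the function names here are for illustration only). With Pool.map(), each tuple from the iterable is passed as a single argument, so a two-argument function fails; the only workaround without starmap() is to unpack the tuple inside the function:

```python
from multiprocessing.pool import Pool

# a task that expects two separate arguments
def task(arg1, arg2):
    return arg1 + arg2

# a one-argument task that unpacks the tuple itself
def task_packed(args):
    arg1, arg2 = args
    return arg1 + arg2

if __name__ == '__main__':
    items = [(1, 2), (3, 4)]
    with Pool() as pool:
        # map() passes each tuple as one argument, so task() fails
        try:
            pool.map(task, items)
        except TypeError as error:
            print(f'map() failed: {error}')
        # workaround: a single-argument wrapper that unpacks manually
        print(pool.map(task_packed, items))  # [3, 7]
```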

How to Use Pool.starmap()

The process pool provides a version of map() that permits multiple arguments to the target task function via the Pool.starmap() function.

The starmap() function takes a function to apply and an iterable.

It will then convert the provided iterable into a list and issue one task for each item in the iterable.

Importantly, each item in the iterable provided to the starmap() function may itself be an iterable containing the arguments to provide to the target task function.

Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments. Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].

-- multiprocessing — Process-based parallelism

This allows the target task function to receive multiple arguments.
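This unpacking behavior mirrors the built-in itertools.starmap() function, after which the pool method is named. A quick sketch of the same unpacking, without any processes:

```python
from itertools import starmap

def add(a, b):
    return a + b

# each tuple is unpacked into the positional arguments of add()
results = list(starmap(add, [(1, 2), (3, 4)]))
print(results)  # [3, 7]
```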

For example, we may have an iterable in which each item is itself an iterable of arguments for one function call.

We might have a target task function that takes two arguments.

# target task function
def task(arg1, arg2):
	# ...

We may then define an iterable that contains 3 items and will result in 3 calls to the target task function.

...
# define an iterable
items = [(1,2), (3,4), (5,6)]

Each item in the iterable is a tuple that contains two items, for the two arguments to the target task function.

We can issue this to the process pool using the starmap() function and traverse the iterable of return values.

...
# issue tasks and iterate the results from the issued tasks
for result in pool.starmap(task, items):
	# ...

This will result in three tasks in the process pool, each calling the target task() function with two arguments.

Like the Pool.map() function, Pool.starmap() allows us to issue tasks in chunks to the process pool. That is, we can group a fixed number of items from the input iterable and issue them as one task to be executed by a child worker process.

This can make completing a large number of tasks in a very long iterable more efficient as arguments and return values from the target task function can be transmitted in batches with less computational overhead.

This can be achieved via the "chunksize" argument to starmap().

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.

-- multiprocessing — Process-based parallelism

For example:

...
# issue tasks in chunks and iterate the results
for result in pool.starmap(task, items, chunksize=10):
	# ...

Next, let's take a closer look at how the starmap() function compares to other functions on the process pool.

Difference Between starmap() and map()

How does the Pool.starmap() function compare to the Pool.map() for issuing tasks to the process pool?

Both starmap() and map() may be used to issue tasks that call a function for each item in an iterable via the process pool.

The key difference is that starmap() supports target functions with more than one argument, whereas map() supports target functions with only one argument.
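The difference can be sketched side by side (the task functions here are illustrative):

```python
from multiprocessing.pool import Pool

# single-argument task, suitable for map()
def square(value):
    return value * value

# two-argument task, suitable for starmap()
def power(base, exponent):
    return base ** exponent

if __name__ == '__main__':
    with Pool() as pool:
        # map(): one iterable, one argument per call
        print(pool.map(square, [1, 2, 3]))            # [1, 4, 9]
        # starmap(): iterable of tuples, one tuple unpacked per call
        print(pool.starmap(power, [(2, 3), (3, 2)]))  # [8, 9]
```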

Difference Between starmap() and starmap_async()

How does the Pool.starmap() function compare to the Pool.starmap_async() for issuing tasks to the process pool?

Both starmap() and starmap_async() may be used to issue tasks that call a function with more than one argument in the process pool.

The following summarizes the key differences between these two functions:

  1. The starmap() function blocks, whereas the starmap_async() function does not block.
  2. The starmap() function returns an iterable of return values from the target function, whereas the starmap_async() function returns an AsyncResult.
  3. The starmap() function does not support callback functions, whereas the starmap_async() function can execute callback functions on return values and errors.

The starmap() function should be used for issuing target task functions to the process pool where the caller can or must block until all function calls are complete.

The starmap_async() function should be used for issuing target task functions to the process pool where the caller cannot or must not block while the task is executing.
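These differences can be seen in a brief sketch of starmap_async() (the task and callback functions here are illustrative):

```python
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    return identifier * value

# callback executed in the main process with the list of return values
def on_done(results):
    print(f'Callback got: {results}')

if __name__ == '__main__':
    with Pool() as pool:
        # starmap_async() returns an AsyncResult immediately, without blocking
        async_result = pool.starmap_async(task, [(1, 2), (3, 4)], callback=on_done)
        # the main process is free to do other work here...
        # block until the results are ready, then retrieve them
        results = async_result.get()
    print(results)  # [2, 12]
```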

Now that we know how to use the starmap() function to execute tasks in the process pool, let's look at some worked examples.

Example of Pool.starmap()

We can explore how to use the starmap() function with the process pool.

In this example, we will define a target task function that takes two arguments, reports the values, then returns the values that were passed in. We will then call this function once for each integer between 0 and 9, paired with a random floating point value, using the process pool starmap().

This will execute the function calls in parallel, using as many cores as are available in the system.

Firstly, we can define the target task function.

The function takes an integer identifier and a floating point value. It reports the values, then blocks for a fraction of a second to simulate computational effort, then returns the values that were provided as arguments.

The task() function below implements this.

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

We can then create and configure a process pool.

We will use the context manager interface to ensure the pool is shut down automatically once we are finished with it.

...
# create and configure the process pool
with Pool() as pool:
	# ...

We can then define an iterable that provides the arguments to the task() function. The iterable will be a list of tuples, where each tuple will contain an integer value and randomly generated floating point value between 0 and 1.

This can be prepared in a list comprehension.

...
# prepare arguments
items = [(i, random()) for i in range(10)]

We can then call the starmap() function on the process pool to apply our task() function to each tuple of arguments in the prepared list.

This returns a list of results from the task() function, in the order that the function calls were issued, not the order they are completed. We will iterate over the results and report each in turn.

This can all be achieved in a for-loop.

...
# execute tasks and process results in order
for result in pool.starmap(task, items):
    print(f'Got result: {result}', flush=True)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of using starmap() with the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # execute tasks and process results in order
        for result in pool.starmap(task, items):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically

Running the example first creates the process pool with a default configuration.

It will have one child worker process for each logical CPU in your system.

The list of function arguments is then prepared, and the starmap() function is called with the target function and the list of arguments.

This issues ten calls to the task() function, one for each tuple of arguments. A list of results is returned, one value per function call, in order.

Each call to the task function reports the arguments as a message, blocks, then returns a tuple of the arguments.

The main process iterates over the values returned from the calls to the task() function and reports the values, matching those reported in each child process.

Importantly, all task() function calls are issued and executed before the list of results is returned. The caller cannot iterate over results as they are completed.

Task 0 executing with 0.7713147455455241
Task 1 executing with 0.1964734654902801
Task 2 executing with 0.6086237770389655
Task 3 executing with 0.8567952900043411
Task 4 executing with 0.17564858314614085
Task 5 executing with 0.761031383838424
Task 6 executing with 0.8954732687092991
Task 7 executing with 0.8908427671343335
Task 8 executing with 0.4626459009945638
Task 9 executing with 0.873074346713649
Got result: (0, 0.7713147455455241)
Got result: (1, 0.1964734654902801)
Got result: (2, 0.6086237770389655)
Got result: (3, 0.8567952900043411)
Got result: (4, 0.17564858314614085)
Got result: (5, 0.761031383838424)
Got result: (6, 0.8954732687092991)
Got result: (7, 0.8908427671343335)
Got result: (8, 0.4626459009945638)
Got result: (9, 0.873074346713649)
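If you do need results as they are completed, the pool also offers the imap_unordered() method, which yields results in completion order but only supports one-argument functions. A sketch that unpacks the argument tuple manually inside the task (illustrative only):

```python
from multiprocessing.pool import Pool
from time import sleep

# one-argument task that unpacks its tuple of arguments itself
def task(args):
    identifier, value = args
    sleep(value)
    return identifier

if __name__ == '__main__':
    items = [(0, 0.3), (1, 0.1), (2, 0.2)]
    with Pool() as pool:
        # imap_unordered() yields each result as soon as it completes
        for result in pool.imap_unordered(task, items):
            print(f'Got result: {result}')
```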

Next, let's look at an example where we might call a starmap() for a function with no return value.

Example of Pool.starmap() with No Return Value

We can explore using the starmap() function to call a target task function that does not have a return value.

This means that we are not interested in the iterable of results returned by the call to starmap() and instead are only interested that all issued tasks get executed.

This can be achieved by updating the previous example so that the task() function does not return a value.

The updated task() function with this change is listed below.

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

Then, in the main process, we can call starmap() with our task() function and the list of arguments, and not iterate the results.

...
# prepare arguments
items = [(i, random()) for i in range(10)]
# issue tasks to the process pool and wait for tasks to complete
pool.starmap(task, items)

Importantly, the call to starmap() on the process pool will block the main process until all issued tasks are completed.

Once completed, the call will return and the process pool will be closed by the context manager.

This is a helpful pattern to issue many tasks to the process pool with a single function call.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of using starmap() in the process pool with no return values
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool and wait for tasks to complete
        pool.starmap(task, items)
    # process pool is closed automatically

Running the example first creates the process pool with a default configuration.

The list of arguments is prepared, then the starmap() function is called to apply the target function to each tuple in the list. This issues ten tasks into the process pool. A list of results is returned, one per function call, but it is ignored in this case.

Each call to the task() function reports a message, then blocks.

The main process blocks until the starmap() function returns.

The tasks finish, starmap() returns, then the process pool is closed.

This example again highlights that the call to starmap() blocks until all issued tasks are completed.

Task 0 executing with 0.38123755597856135
Task 1 executing with 0.9844199135059981
Task 2 executing with 0.053600569179194535
Task 3 executing with 0.45685585285522523
Task 4 executing with 0.6808662719854731
Task 5 executing with 0.47334579051992776
Task 6 executing with 0.22903377728790164
Task 7 executing with 0.3971896553074561
Task 8 executing with 0.37187226790578587
Task 9 executing with 0.891073904474667

Next, let's explore the chunksize argument to the starmap() function.

Example of Pool.starmap() with chunksize

The starmap() function will apply a function to each item in an iterable.

If the iterable has a large number of items, it may be inefficient to issue function calls to the target function for each item.

This is for two reasons:

  1. One task is created for each item in the iterable to be passed as an argument to the target function.
  2. The argument data for each task must be transmitted to a child process and the return value transmitted back to the parent process.

A more efficient approach is to divide the items in the iterable into chunks and issue each chunk to a worker process, where the target function is applied to every item in the chunk.

This can be achieved with the "chunksize" argument to the starmap() function.

The example below updates the example to use a chunksize of 2 when issuing tasks into the process pool. With 10 function calls, this results in 5 tasks, each composed of two calls to the target task function.

...
# execute tasks and process results in order
for result in pool.starmap(task, items, chunksize=2):
    print(f'Got result: {result}', flush=True)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of using starmap() in the process pool with chunksize
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # execute tasks and process results in order
        for result in pool.starmap(task, items, chunksize=2):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically

Running the example first creates the process pool of child process workers.

The starmap() function is then called with an iterable of 10 items and a chunksize of 2.

This issues 5 units of work to the process pool, each unit of work composed of 2 calls to the task() function.

Each call to the task function reports a message, blocks, then returns a tuple value.

The main process iterates over the values returned from the calls to the task() function and reports the values, matching those reported in each child process.

Task 0 executing with 0.5098929421833336
Task 2 executing with 0.6780121197069486
Task 4 executing with 0.627066315440481
Task 6 executing with 0.5677011542683361
Task 8 executing with 0.6175545306082948
Task 1 executing with 0.49330278222648494
Task 7 executing with 0.7563975246943004
Task 9 executing with 0.191260636374323
Task 5 executing with 0.15345573717211092
Task 3 executing with 0.841694729779332
Got result: (0, 0.5098929421833336)
Got result: (1, 0.49330278222648494)
Got result: (2, 0.6780121197069486)
Got result: (3, 0.841694729779332)
Got result: (4, 0.627066315440481)
Got result: (5, 0.15345573717211092)
Got result: (6, 0.5677011542683361)
Got result: (7, 0.7563975246943004)
Got result: (8, 0.6175545306082948)
Got result: (9, 0.191260636374323)

Takeaways

You now know how to issue tasks to the process pool that take multiple arguments in Python.



If you enjoyed this tutorial, you will love my book: Python Multiprocessing Pool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.