Last Updated on September 12, 2022
You can map a function that takes multiple arguments to tasks in the process pool asynchronously via the Pool.starmap_async() function.
In this tutorial, you will discover how to asynchronously issue tasks that take multiple arguments to the process pool in Python.
Let’s get started.
Problem with Pool.starmap()
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
The built-in map() function allows you to apply a function to each item in an iterable. It yields one result at a time, returned from the given target function called with one item from the iterable, and it is common to call map() and iterate the results in a for-loop.
The process pool provides a version of map() called Pool.starmap() that calls a target function for each item in the provided iterable, executing the calls in parallel across the worker processes. Unlike Pool.map(), it supports multiple arguments for the target function: each item in the provided iterable is itself an iterable of arguments that is unpacked into one function call.
A limitation of this approach is that the provided iterable of items is converted into a list and all items are submitted as tasks to the process pool before the call returns.
The problem with the Pool.starmap() function is that it blocks until all tasks are completed. The caller must wait until all of the issued function calls on the provided iterable return.
The Pool.starmap_async() function provides a way to work around this limitation.
How to Use Pool.starmap_async()
The process pool provides an asynchronous version of the starmap() function via the Pool.starmap_async() function.
The starmap_async() function does not block while the function is applied to each item in the iterable. Instead, it returns an AsyncResult object from which the results may be accessed.
A combination of starmap() and map_async() that iterates over iterables and calls func with the iterables unpacked. Returns a result object.
— multiprocessing — Process-based parallelism
Like the starmap() function, the Pool.starmap_async() function allows the target task function to receive multiple arguments.
For example, we may have an iterable in which each item is itself an iterable of arguments for one function call.
We might have a target task function that takes two arguments.
# target task function
def task(arg1, arg2):
    # ...
We may then define an iterable that contains 3 items and will result in 3 calls to the target task function.
...
# define an iterable
items = [(1,2), (3,4), (5,6)]
Each item in the iterable is a tuple that contains two items, for the two arguments to the target task function.
We can issue this to the process pool using the starmap_async() function.
For example:
...
# issue tasks to the process pool
result = pool.starmap_async(task, items)
This will result in three tasks in the process pool, each calling the target task() function with two arguments:
- task(1,2)
- task(3,4)
- task(5,6)
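To make the unpacking concrete, these three calls are logically equivalent to the following sequential loop (a sketch; in the pool the calls run in parallel across worker processes):

```python
# a stand-in task function that just returns its arguments
def task(arg1, arg2):
    return (arg1, arg2)

items = [(1, 2), (3, 4), (5, 6)]

# starmap-style unpacking, expressed as a sequential loop:
# each tuple is unpacked into the function's positional arguments
results = [task(*item) for item in items]
print(results)  # [(1, 2), (3, 4), (5, 6)]
```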
Like the Pool.map() function, Pool.starmap_async() allows us to issue tasks in chunks to the process pool. That is, we can group a fixed number of items from the input iterable and issue them as one unit of work to be executed by a child worker process.
This can make completing a large number of tasks in a very long iterable more efficient as arguments and return values from the target task function can be transmitted in batches with less computational overhead.
This can be achieved via the “chunksize” argument to starmap_async().
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
— multiprocessing — Process-based parallelism
For example:
...
# issue tasks to the process pool
result = pool.starmap_async(task, items, chunksize=10)
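As a rough illustration of what chunking means, a chunksize of 10 over 25 items would group the arguments roughly as shown below. The exact grouping is an internal detail of the pool; this sketch only shows the idea:

```python
# illustrative only: how a chunksize of 10 might group 25 items
# (the real grouping is an internal detail of the pool)
items = list(range(25))
chunksize = 10
chunks = [items[i:i + chunksize] for i in range(0, len(items), chunksize)]
print([len(chunk) for chunk in chunks])  # [10, 10, 5]
```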
Because starmap_async() does not block, it allows the caller to continue and retrieve the result when needed.
The result is an iterable of return values which can be accessed via the get() function on the AsyncResult object.
For example:
...
# iterate over return values
for value in result.get():
    # ...
A callback function can be called automatically if the task was successful, that is, if no error or exception was raised.
The callback function must take one argument, which is the iterable of return values from the target task function.
The function is specified via the “callback” argument to the starmap_async() function.
For example:
# callback function
def custom_callback(result_iterable):
    # iterate results
    for result in result_iterable:
        print(f'Got result: {result}')

...
# issue tasks asynchronously to the process pool with a callback
result = pool.starmap_async(task, items, callback=custom_callback)
Similarly, an error callback function can be specified via the “error_callback” argument that is called only when an unexpected error or exception is raised.
The error callback function must take one argument, which is the instance of the error or exception that was raised.
For example:
# error callback function
def custom_error_callback(error):
    print(f'Got error: {error}')

...
# issue tasks asynchronously to the process pool with an error callback
result = pool.starmap_async(task, items, error_callback=custom_error_callback)
Next, let’s take a closer look at how the starmap_async() function compares to other functions on the process pool.
Difference Between starmap_async() and starmap()
How does the Pool.starmap_async() function compare to the Pool.starmap() for issuing tasks to the process pool?
Both starmap_async() and starmap() may be used to issue tasks that call a function in the process pool with more than one argument.
The following summarizes the key differences between these two functions:
- The starmap_async() function does not block, whereas the starmap() function blocks.
- The starmap_async() function returns an AsyncResult, whereas the starmap() function returns an iterable of return values from the target function.
- The starmap_async() function can execute callback functions on return values and errors, whereas the starmap() function does not support callback functions.
The starmap_async() function should be used for issuing target task functions to the process pool where the caller cannot or must not block while the task is executing.
The starmap() function should be used for issuing target task functions to the process pool where the caller can or must block until all function calls are complete.
Now that we know how to use the starmap_async() function to execute tasks in the process pool, let’s look at some worked examples.
Example of starmap_async()
We can explore how to use the starmap_async() function with the process pool.
In this example, we will define a target task function that takes two arguments, reports the values, then returns the values that were passed in. We will then call this function for each integer between 0 and 9 using the process pool’s starmap_async() function.
This will apply the function to each integer in parallel using as many cores as are available in the system.
Firstly, we can define the target task function.
The function takes an integer identifier and a floating point value. It reports the values, blocks for a fraction of a second to simulate computational effort, then returns the values that were provided as arguments.
The task() function below implements this.
# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)
We can then create and configure a process pool.
We will use the context manager interface to ensure the pool is shutdown automatically once we are finished with it.
...
# create and configure the process pool
with Pool() as pool:
    # ...
We can then define an iterable that provides the arguments to the task() function. The iterable will be a list of tuples, where each tuple will contain an integer value and randomly generated floating point value between 0 and 1.
This can be prepared in a list comprehension.
...
# prepare arguments
items = [(i, random()) for i in range(10)]
We can then call the starmap_async() function on the process pool to apply our task() function to each tuple of arguments in the prepared list.
This will return immediately with an AsyncResult object.
...
# issue tasks to the process pool
result = pool.starmap_async(task, items)
We will then iterate over the iterable of results accessible via the get() function on the AsyncResult object.
This can be achieved via a for-loop.
...
# iterate results
for value in result.get():
    print(f'Got result: {value}', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of parallel starmap_async() with the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items)
        # iterate results
        for value in result.get():
            print(f'Got result: {value}', flush=True)
    # process pool is closed automatically
Running the example first creates the process pool with a default configuration.
It will have one child worker process for each logical CPU in your system.
The list of function arguments is then prepared, and the starmap_async() function is called with the target function and the list of arguments.
This issues 10 calls to the task() function, one for each tuple of arguments. An AsyncResult object is returned immediately, and the main process is free to continue on.
Each call to the task function reports the arguments as a message, blocks, then returns a tuple of the arguments.
The main process then accesses the iterable of return values via the AsyncResult. It iterates over the values returned from the calls to the task() function and reports the generated values, matching those generated in each child process.
Importantly, all task() function calls are issued and executed before the iterable of results is made available via the AsyncResult instance. The caller cannot iterate over results as they complete.
Task 0 executing with 0.04858687961908725
Task 1 executing with 0.8660633005744954
Task 2 executing with 0.29229441676119794
Task 3 executing with 0.30829421674756163
Task 4 executing with 0.04643637303763071
Task 5 executing with 0.5717794864804547
Task 6 executing with 0.8129779908301857
Task 7 executing with 0.5574933281142
Task 8 executing with 0.5505255197961183
Task 9 executing with 0.8470605835868059
Got result: (0, 0.04858687961908725)
Got result: (1, 0.8660633005744954)
Got result: (2, 0.29229441676119794)
Got result: (3, 0.30829421674756163)
Got result: (4, 0.04643637303763071)
Got result: (5, 0.5717794864804547)
Got result: (6, 0.8129779908301857)
Got result: (7, 0.5574933281142)
Got result: (8, 0.5505255197961183)
Got result: (9, 0.8470605835868059)
Next, let’s look at an example where we might call starmap_async() for a function with no return value.
Example of starmap_async() with No Return Value
We can explore using the starmap_async() function to call a function for each tuple in an iterable that does not have a return value.
This means that we are not interested in the iterable of results returned by the call to starmap_async() and instead are only interested that all issued tasks get executed.
This can be achieved by updating the previous example so that the task() function does not return a value.
The updated task() function with this change is listed below.
# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
Then, in the main process, we can call starmap_async() with our task() function and the list of tuples, as before.
...
# prepare arguments
items = [(i, random()) for i in range(10)]
# issue tasks to the process pool
result = pool.starmap_async(task, items)
Importantly, because the call to starmap_async() does not block, we need to wait for the issued tasks to complete.
If we do not wait for the tasks to complete, we will exit the context manager for the process pool which will terminate the worker processes and stop the tasks.
This can be achieved by calling the wait() function on the AsyncResult object.
...
# wait for tasks to complete
result.wait()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and no return values
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items)
        # wait for tasks to complete
        result.wait()
    # process pool is closed automatically
Running the example first creates the process pool with a default configuration.
The starmap_async() function is then called with the target function and the list of tuples. This issues 10 calls to the task() function.
An AsyncResult object is returned immediately, and the main process is free to continue on.
Each call to the task function reports a message, then blocks for a moment.
The main process waits on the AsyncResult object, blocking until all calls to the task() function are complete.
The tasks finish, the wait() function returns, and the main process is free to carry on.
Task 0 executing with 0.006606991782238114
Task 1 executing with 0.6719397377154739
Task 2 executing with 0.6406317070314566
Task 3 executing with 0.696648214806934
Task 4 executing with 0.06816605478594484
Task 5 executing with 0.37397804024123604
Task 6 executing with 0.42344652631037527
Task 7 executing with 0.007119523670944039
Task 8 executing with 0.8025552512742259
Task 9 executing with 0.7773457061021367
Next, let’s look at issuing many tasks over multiple calls, then wait for all tasks in the process pool to complete.
Example of starmap_async() and Waiting For All Tasks To Complete
We can explore how to issue many tasks to the process pool via starmap_async(), and wait for all issued tasks to complete.
This could be achieved by calling starmap_async() multiple times, and calling the wait() function on the AsyncResult object after each call.
An alternate approach is to call starmap_async() multiple times, then wait on the process pool itself for all issued tasks to complete.
In this example, we can update the previous example to call the starmap_async() function twice and ignore the AsyncResult objects that are returned.
...
# prepare arguments
items1 = [(i, random()) for i in range(10)]
# issue tasks to the process pool
_ = pool.starmap_async(task, items1)
# prepare arguments
items2 = [(i, random()) for i in range(11, 20)]
# issue tasks to the process pool
_ = pool.starmap_async(task, items2)
We can then explicitly close the process pool to prevent additional tasks being submitted to the pool, then call the join() function to wait for all issued tasks to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete and processes to close
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and wait for tasks to complete
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items1 = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        _ = pool.starmap_async(task, items1)
        # prepare arguments
        items2 = [(i, random()) for i in range(11, 20)]
        # issue tasks to the process pool
        _ = pool.starmap_async(task, items2)
        # close the process pool
        pool.close()
        # wait for all tasks to complete and processes to close
        pool.join()
    # process pool is closed automatically
Running the example first creates the process pool with a default configuration.
The starmap_async() function is then called for the first list of tuples. This issues 10 calls to the task() function, one for each tuple in the list.
The starmap_async() function is then called again for a different list of tuple arguments, issuing 9 more calls to the task() function (the second list covers the identifiers 11 through 19).
Both calls return immediately, and the AsyncResult objects returned are ignored.
Each call to the task function reports a message, then blocks for a moment.
The main process then closes the process pool, preventing any additional tasks from being issued. It then joins the process pool, blocking until all issued tasks are completed.
The tasks finish, the join() function returns, and the main process is free to carry on.
Task 0 executing with 0.7775901642489408
Task 1 executing with 0.4611633388999593
Task 2 executing with 0.8437736627316834
Task 3 executing with 0.8854986246277919
Task 4 executing with 0.028456490572653426
Task 5 executing with 0.9359308610071692
Task 6 executing with 0.014212311146263312
Task 7 executing with 0.7942811715252199
Task 8 executing with 0.8411986243212594
Task 9 executing with 0.7271159611159674
Task 11 executing with 0.04125757747179182
Task 12 executing with 0.47726164472299637
Task 13 executing with 0.6968279555266779
Task 14 executing with 0.04007928715583686
Task 15 executing with 0.7486045325597867
Task 16 executing with 0.6202094282124178
Task 17 executing with 0.2718389522903977
Task 18 executing with 0.20806899624428443
Task 19 executing with 0.9747310063333213
Next, let’s look at issuing tasks and handle each return value with a callback function.
Example of starmap_async() with a Callback Function
We can issue tasks to the process pool that return a value and specify a callback function to handle each returned value.
This can be achieved via the “callback” argument to the starmap_async() function.
In this example, we can update the above example so that the task() function returns a tuple of the arguments that were provided. We can then define a function to handle the return value, in this case to simply report the values.
Firstly, we can define the function to call to handle the value returned from the function.
The custom_callback() below implements this, taking one argument which is the value returned from the target task function.
# custom callback function
def custom_callback(result):
    print(f'Callback got values: {result}')
Next, we can update the task function to return a tuple containing the arguments.
The updated task() function with these changes is listed below.
# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)
Finally, we can update the call to starmap_async() to specify the callback function via the “callback” argument, giving the name of our custom function.
...
# prepare arguments
items = [(i, random()) for i in range(10)]
# issue tasks to the process pool
_ = pool.starmap_async(task, items, callback=custom_callback)
The main process will then close the process pool and wait for all issued tasks to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete and processes to close
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and a callback function
from random import random
from time import sleep
from multiprocessing.pool import Pool

# custom callback function
def custom_callback(result):
    print(f'Callback got values: {result}')

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        _ = pool.starmap_async(task, items, callback=custom_callback)
        # close the process pool
        pool.close()
        # wait for all tasks to complete and processes to close
        pool.join()
Running the example first creates and configures the process pool.
The starmap_async() function is then called with the list of tuples and a callback function. This issues ten calls to the task() function, one for each tuple in the prepared list.
The main process then closes the process pool and blocks until all tasks complete and all processes in the process pool close.
Each call to the task function reports a message, blocks for a moment, then returns a tuple of values.
When all task() functions are finished, the callback function is called, provided with the iterable of return values.
The iterable is printed directly, showing all return values at once.
Finally, the worker processes are closed and the main process continues on.
Task 0 executing with 0.41262763526462787
Task 1 executing with 0.5647592159687416
Task 2 executing with 0.8327049315109151
Task 3 executing with 0.9140524318291349
Task 4 executing with 0.8699088278461099
Task 5 executing with 0.9899772955673786
Task 6 executing with 0.27086990809541145
Task 7 executing with 0.6572422519829662
Task 8 executing with 0.06981878272942554
Task 9 executing with 0.5817534685800275
Callback got values: [(0, 0.41262763526462787), (1, 0.5647592159687416), (2, 0.8327049315109151), (3, 0.9140524318291349), (4, 0.8699088278461099), (5, 0.9899772955673786), (6, 0.27086990809541145), (7, 0.6572422519829662), (8, 0.06981878272942554), (9, 0.5817534685800275)]
Next, let’s look at an example of issuing tasks to the pool with an error callback function.
Example of starmap_async() with an Error Callback Function
We can issue tasks to the process pool that may raise an unhandled exception and specify an error callback function to handle the exception.
This can be achieved via the “error_callback” argument.
In this example, we can update the above example so that the task() function raises an exception for one specific argument. We can then define a function to handle the raised exception, in this case to simply report the details of the exception.
Firstly, we can define the function to call to handle an exception raised by the function.
The custom_error_callback() below implements this, taking one argument which is the exception raised by the target task function.
# custom error callback function
def custom_error_callback(error):
    print(f'Got an error: {error}')
Next, we can update the task() function so that it raises an exception conditionally for one argument, e.g. an identifier of 5.
# task executed in a worker process
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)
Finally, we can update the call to starmap_async() to specify the error callback function via the “error_callback” argument, giving the name of our custom function.
...
# prepare arguments
items = [(i, random()) for i in range(10)]
# issue tasks to the process pool
_ = pool.starmap_async(task, items, error_callback=custom_error_callback)
The main process then closes the process pool and waits for all tasks to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete and processes to close
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and an error callback function
from random import random
from time import sleep
from multiprocessing.pool import Pool

# custom error callback function
def custom_error_callback(error):
    print(f'Got an error: {error}')

# task executed in a worker process
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        _ = pool.starmap_async(task, items, error_callback=custom_error_callback)
        # close the process pool
        pool.close()
        # wait for all tasks to complete and processes to close
        pool.join()
Running the example first creates and configures the process pool.
The starmap_async() function is then called with the list of tuples and an error callback function. This issues ten calls to the task() function, one for each tuple in the list.
An AsyncResult object is returned immediately and is ignored, and the main process is free to continue on.
The main process then closes the process pool and blocks waiting for all issued tasks to complete.
Each call to the task function reports a message, then blocks for a moment. The task that receives the identifier value of 5 as an argument raises an exception.
All tasks finish executing and the error callback function is called with the raised exception, reporting a message.
All worker processes are closed and the main process is free to continue on.
Task 0 executing with 0.32137576941737844
Task 1 executing with 0.2703458381403999
Task 2 executing with 0.2363588917185917
Task 3 executing with 0.8623534534517294
Task 4 executing with 0.7409736574146876
Task 6 executing with 0.5914949142738242
Task 7 executing with 0.49987113535632166
Task 8 executing with 0.6536687540199568
Task 9 executing with 0.2965858295185979
Got an error: Something bad happened
Next, let’s look at an example of issuing tasks to the pool and accessing the results, where one task may raise an exception.
Example of starmap_async() with an Exception
As we saw in the previous example, it is possible for the target function to raise an exception.
If this occurs in just one call to the target function, it will prevent all return values from being accessed.
We can demonstrate this with a worked example.
Firstly, we can update the task() function to conditionally raise an exception, in this case if the identifier argument to the function is equal to five, e.g. the same as in the previous example.
...
# conditionally raise an error
if identifier == 5:
    raise Exception('Something bad happened')
The updated task() function with this change is listed below.
# task executed in a worker process
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)
We can then call starmap_async() to call the task() function for each tuple in the list.
...
# prepare arguments
items = [(i, random()) for i in range(10)]
# issue tasks to the process pool
result = pool.starmap_async(task, items)
We can then get the iterable of return values.
...
values = result.get()
This may raise an exception if one of the calls failed, so we must protect the call with a try-except pattern.
...
# get the return values
try:
    values = result.get()
except Exception as e:
    print(f'Failed with: {e}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and handle an exception
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items)
        # get the return values
        try:
            values = result.get()
        except Exception as e:
            print(f'Failed with: {e}')
Running the example first creates and configures the process pool.
The starmap_async() function is then called with the target function and the list of tuples. This issues 10 calls to the task() function.
An AsyncResult object is returned immediately, and the main process is free to continue on.
The main process then blocks, attempting to get the iterable of return values from the AsyncResult object.
Each call to the task function reports a message, then blocks for a moment, then returns a tuple. The task that receives the value of 5 as an argument raises an exception.
All tasks complete.
The exception is then re-raised in the main process when get() is called, where it is handled and a message is reported. The iterable of return values cannot be accessed and no results are reported.
The process pool is then closed automatically by the context manager.
This highlights that a failure in one task can prevent the results from all tasks from being accessed.
Task 0 executing with 0.5221601941516373
Task 1 executing with 0.31370263368668594
Task 2 executing with 0.6319152637731589
Task 3 executing with 0.5996534898921915
Task 4 executing with 0.30235628455388386
Task 6 executing with 0.5992044450848245
Task 7 executing with 0.21668081008564
Task 8 executing with 0.2881390911951681
Task 9 executing with 0.6274888670358355
Failed with: Something bad happened
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to issue tasks asynchronously to the process pool that take multiple arguments in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Hannu says
I am sending multiple parameters to the worker in a starmap_async call. I would like to use a timeout to get as many of the results to the user as possible, even though some of the worker tasks might time out.
What is the way in multiprocessing (from starmap_async) to get the results of the successful (non-timed-out) tasks when there is a timeout in some of them?
Let’s say there are 20 tasks that normally execute successfully; in some situations 1 or 2 or even more of them might time out. I would like to get the results of the ones that did not time out, nor end in an error.
Jason Brownlee says
Great question!
Off the cuff, I’d think about sending results back to the caller using a queue, have the caller control how long it wants to wait, and stop retrieving results from the queue once the timeout has elapsed – manually.
For example:
https://superfastpython.com/multiprocessing-queue-in-python/
And:
https://superfastpython.com/multiprocessing-manager-share-queue/
And:
https://superfastpython.com/multiprocessing-pool-first-result/
This would allow tasks to run/finish as fast as possible with no complexity, and house all complexity in the caller.
Does that help?