Last Updated on October 29, 2022
You can map a function that takes multiple arguments to tasks in the ThreadPool asynchronously via the starmap_async() method.
In this tutorial you will discover how to issue tasks asynchronously to the ThreadPool that take multiple arguments in Python.
Let’s get started.
Problem with ThreadPool starmap()
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.
A ThreadPool can be configured when it is created, which will prepare the new threads.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
The built-in map() function allows you to apply a function to each item in an iterable. It yields one result returned from the given target function called with one item from a given iterable. It is common to call map and iterate the results in a for-loop.
The ThreadPool provides a version of map() called starmap() that uses worker threads to call a target function for each item in the provided iterable. Like the built-in map() function, it supports target functions that take multiple arguments.
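As a small illustration of the difference in calling style, the sketch below compares the built-in map() with the ThreadPool starmap() method. The add() function and the sample values are made up for this illustration. The built-in map() takes one iterable per argument, whereas starmap() takes a single iterable of argument tuples, which zip() can build for us.

```python
# sketch: built-in map() with two iterables vs. ThreadPool starmap() with tuples
from multiprocessing.pool import ThreadPool

# hypothetical two-argument function, used only for illustration
def add(a, b):
    return a + b

def demo():
    xs = [1, 3, 5]
    ys = [2, 4, 6]
    # the built-in map() takes one iterable per argument
    via_map = list(map(add, xs, ys))
    # starmap() takes one iterable of argument tuples
    items = list(zip(xs, ys))
    with ThreadPool(2) as pool:
        via_starmap = pool.starmap(add, items)
    return via_map, items, via_starmap

if __name__ == '__main__':
    print(demo())
```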
The problem with the starmap() method is that it blocks until all tasks are completed. The caller must wait until all of the issued function calls on the provided iterable return.
The starmap_async() method provides a way to work around this limitation.
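To make the blocking behavior concrete, here is a minimal sketch that times both calls. The task() body, the 0.2 second delay and the time_calls() helper are assumptions chosen for the illustration. We would expect the starmap() call to take at least as long as one task, and the starmap_async() call to return almost immediately.

```python
# sketch: time how long starmap() and starmap_async() hold up the caller
from time import sleep, perf_counter
from multiprocessing.pool import ThreadPool

# hypothetical task that blocks for a moment
def task(identifier, delay):
    sleep(delay)
    return identifier

def time_calls():
    items = [(i, 0.2) for i in range(4)]
    with ThreadPool(4) as pool:
        # starmap() returns only after all tasks are done
        start = perf_counter()
        pool.starmap(task, items)
        blocking = perf_counter() - start
        # starmap_async() returns as soon as the tasks are issued
        start = perf_counter()
        result = pool.starmap_async(task, items)
        non_blocking = perf_counter() - start
        # wait for the issued tasks before the pool is closed
        result.wait()
    return blocking, non_blocking

if __name__ == '__main__':
    blocking, non_blocking = time_calls()
    print(f'starmap() took {blocking:.3f}s, starmap_async() took {non_blocking:.3f}s')
```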
How to Use ThreadPool starmap_async()
The ThreadPool provides an asynchronous version of the starmap() method via the starmap_async() method.
The starmap_async() method does not block while the function is applied to each item in the iterable. Instead, it returns an AsyncResult object from which the results may be accessed.
A combination of starmap() and map_async() that iterates over iterables and calls func with the iterables unpacked. Returns a result object.
— multiprocessing — Process-based parallelism
Like the starmap() method, the starmap_async() method allows the target task function to receive multiple arguments.
For example, we may have an iterable in which each item in the iterable is an iterable of arguments for each function call.
We might have a target task function that takes two arguments.
```python
# target task function
def task(arg1, arg2):
    # ...
```
We may then define an iterable that contains 3 items and will result in 3 calls to the target task function.
```python
...
# define an iterable
items = [(1,2), (3,4), (5,6)]
```
Each item in the iterable is a tuple that contains two items, for the two arguments to the target task function.
We can issue this to the ThreadPool using the starmap_async() method.
For example:
```python
...
# issue tasks to the thread pool
result = pool.starmap_async(task, items)
```
This will result in three tasks in the ThreadPool, each calling the target task() function with two arguments:
- task(1,2)
- task(3,4)
- task(5,6)
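Putting the fragments above together, a minimal runnable sketch might look as follows. The body of task() (multiplying its two arguments) and the run() helper are assumptions added so that the example produces something we can inspect.

```python
# sketch: three calls to a two-argument function via starmap_async()
from multiprocessing.pool import ThreadPool

# target task function (the body is an assumption for illustration)
def task(arg1, arg2):
    return arg1 * arg2

def run():
    # each tuple holds the two arguments for one call
    items = [(1, 2), (3, 4), (5, 6)]
    with ThreadPool() as pool:
        # issue the tasks, then block until the return values are available
        result = pool.starmap_async(task, items)
        return result.get()

if __name__ == '__main__':
    print(run())  # [2, 12, 30]
```

The return values are in the same order as the argument tuples, regardless of which task finishes first.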
Like the map() method, the starmap_async() method allows us to issue tasks in chunks to the ThreadPool. That is, we can group a fixed number of items from the input iterable and issue them as one task to be executed by a worker thread.
This can make completing a large number of tasks in a very long iterable more efficient as arguments and return values from the target task function can be transmitted in batches with less computational overhead.
This can be achieved via the “chunksize” argument to starmap_async().
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
— multiprocessing — Process-based parallelism
For example:
```python
...
# issue tasks to the thread pool
result = pool.starmap_async(task, items, chunksize=10)
```
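As a quick check that chunking only changes how the work is grouped, the sketch below issues the same 1,000 calls with two different chunksize values. The task() body, the pool size of 4 and the run() helper are assumptions for the illustration. The return values should be identical either way.

```python
# sketch: the chunksize argument changes batching, not the results
from multiprocessing.pool import ThreadPool

# hypothetical task that adds its two arguments
def task(arg1, arg2):
    return arg1 + arg2

def run(chunksize):
    items = [(i, i) for i in range(1000)]
    with ThreadPool(4) as pool:
        # each worker receives groups of `chunksize` argument tuples
        result = pool.starmap_async(task, items, chunksize=chunksize)
        return result.get()

if __name__ == '__main__':
    print(run(1) == run(100))
```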
Because starmap_async() does not block, it allows the caller to continue and retrieve the result when needed.
The result is an iterable of return values which can be accessed via the get() method on the AsyncResult object.
For example:
```python
...
# iterate over return values
for value in result.get():
    # ...
```
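Besides get(), the AsyncResult object also provides ready() and wait(), and get() accepts an optional timeout. The sketch below shows how these might be combined. The task() body, the delays and the demo() helper are assumptions chosen so that the tasks are still running when we first check on them.

```python
# sketch: checking on an AsyncResult without blocking forever
from time import sleep
from multiprocessing import TimeoutError
from multiprocessing.pool import ThreadPool

# hypothetical task that blocks for a moment
def task(identifier, delay):
    sleep(delay)
    return identifier

def demo():
    items = [(i, 0.3) for i in range(3)]
    with ThreadPool(3) as pool:
        result = pool.starmap_async(task, items)
        # ready() reports whether all tasks are done, without blocking
        ready_at_start = result.ready()
        # get() with a timeout raises multiprocessing.TimeoutError if not done in time
        try:
            result.get(timeout=0.05)
            timed_out = False
        except TimeoutError:
            timed_out = True
        # wait() blocks until all tasks are done
        result.wait()
        ready_after_wait = result.ready()
        values = result.get()
    return ready_at_start, timed_out, ready_after_wait, values

if __name__ == '__main__':
    print(demo())
```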
A callback function can be called automatically if all tasks were successful, i.e. no error or exception was raised.
The callback function must take one argument, which is the iterable of return values from the target task function.
The function is specified via the “callback” argument to the starmap_async() method.
For example:
```python
# callback function
def custom_callback(result_iterable):
    # iterate results
    for result in result_iterable:
        print(f'Got result: {result}')

...
# issue a task asynchronously to the thread pool with a callback
result = pool.starmap_async(task, items, callback=custom_callback)
```
Similarly, an error callback function can be specified via the “error_callback” argument that is called only when an unexpected error or exception is raised.
The error callback function must take one argument, which is the instance of the error or exception that was raised.
For example:
```python
# error callback function
def custom_error_callback(error):
    print(f'Got error: {error}')

...
# issue a task asynchronously to the thread pool with an error callback
result = pool.starmap_async(task, items, error_callback=custom_error_callback)
```
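Both callbacks can be supplied in the same call, and only one of them is expected to fire for a given call to starmap_async(). The sketch below records which one was called. The division task, the lambda callbacks and the run() helper are assumptions for the illustration.

```python
# sketch: supplying both a callback and an error callback
from multiprocessing.pool import ThreadPool

# hypothetical task that fails when arg2 is zero
def task(arg1, arg2):
    return arg1 / arg2

def run(items):
    events = []
    with ThreadPool(2) as pool:
        result = pool.starmap_async(
            task, items,
            # called once with the list of return values if every call succeeds
            callback=lambda values: events.append(('ok', values)),
            # called once with the raised exception if a call fails
            error_callback=lambda error: events.append(('error', type(error).__name__)))
        # wait for the tasks (and the callback) to finish
        result.wait()
    return events

if __name__ == '__main__':
    print(run([(1, 2), (3, 4)]))
    print(run([(1, 0), (3, 4)]))
```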
Next, let’s take a closer look at how the starmap_async() method compares to the starmap() method on the ThreadPool.
Difference Between starmap_async() and starmap()
How does the starmap_async() method compare to the starmap() for issuing tasks to the ThreadPool?
Both the starmap_async() and starmap() may be used to issue tasks that call a function in the ThreadPool with more than one argument.
The following summarizes the key differences between these two methods:
- The starmap_async() method does not block, whereas the starmap() method blocks.
- The starmap_async() method returns an AsyncResult, whereas the starmap() method returns an iterable of return values from the target function.
- The starmap_async() method can execute callback functions on return values and errors, whereas the starmap() method does not support callback functions.
The starmap_async() method should be used for issuing target task functions to the ThreadPool where the caller cannot or must not block while the task is executing.
The starmap() method should be used for issuing target task functions to the ThreadPool where the caller can or must block until all function calls are complete.
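The difference in return types can be seen in a short sketch. The task() body and the compare() helper are assumptions for the illustration. The starmap() call hands back the return values directly, whereas the starmap_async() call hands back an AsyncResult on which get() must be called.

```python
# sketch: return types of starmap() and starmap_async()
from multiprocessing.pool import ThreadPool, AsyncResult

# hypothetical task that adds its two arguments
def task(arg1, arg2):
    return arg1 + arg2

def compare():
    items = [(1, 2), (3, 4)]
    with ThreadPool(2) as pool:
        # blocks and returns the return values directly
        blocking = pool.starmap(task, items)
        # returns immediately with a handle on the pending results
        pending = pool.starmap_async(task, items)
        return blocking, isinstance(pending, AsyncResult), pending.get()

if __name__ == '__main__':
    print(compare())
```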
Now that we know how to use the starmap_async() method to execute tasks in the ThreadPool, let’s look at some worked examples.
Example of ThreadPool starmap_async()
We can explore how to use the starmap_async() method with the ThreadPool.
In this example, we can define a target task function that takes two arguments, reports the values then returns the values that were passed in. We can then call this function for each integer between 0 and 9 using the ThreadPool starmap_async().
This will apply the function to each tuple of arguments concurrently using the worker threads in the pool, which by default number one per logical CPU in the system.
Firstly, we can define the target task function.
The function takes an integer identifier and floating point value. It reports the values, then blocks for a fraction of a second to simulate computational effort, then returns the values that were provided as arguments.
The task() function below implements this.
```python
# task executed in a worker thread
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)
```
We can then create and configure a ThreadPool.
We will use the context manager interface to ensure the pool is shutdown automatically once we are finished with it.
```python
...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...
```
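As an aside, the number of worker threads can be set explicitly via the "processes" argument to the ThreadPool constructor instead of relying on the default. The sketch below counts how many distinct threads end up executing tasks. The task() body, the pool size of 4 and the count_workers() helper are assumptions for the illustration.

```python
# sketch: a ThreadPool with a fixed number of worker threads
import threading
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that reports which thread executed it
def task(identifier, delay):
    sleep(delay)
    return threading.get_ident()

def count_workers(n_workers):
    items = [(i, 0.05) for i in range(20)]
    # 'processes' sets the number of worker threads in a ThreadPool
    with ThreadPool(processes=n_workers) as pool:
        idents = pool.starmap_async(task, items).get()
    # number of distinct threads that executed at least one task
    return len(set(idents))

if __name__ == '__main__':
    print(count_workers(4))
```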
We can then define an iterable that provides the arguments to the task() function. The iterable will be a list of tuples, where each tuple will contain an integer value and randomly generated floating point value between 0 and 1.
This can be prepared in a list comprehension.
```python
...
# prepare arguments
items = [(i, random()) for i in range(10)]
```
We can then call the starmap_async() method on the ThreadPool to apply our task() function to each tuple of arguments in the prepared list.
This will return immediately with an AsyncResult object.
```python
...
# issues tasks to thread pool
result = pool.starmap_async(task, items)
```
We will then iterate over the iterable of results accessible via the get() method on the AsyncResult object.
This can be achieved via a for-loop.
```python
...
# iterate results
for value in result.get():
    print(f'Got result: {value}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel starmap_async() with the thread pool
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issues tasks to thread pool
        result = pool.starmap_async(task, items)
        # iterate results
        for value in result.get():
            print(f'Got result: {value}')
    # thread pool is closed automatically
```
Running the example first creates the ThreadPool with a default configuration.
It will have one worker thread for each logical CPU in your system.
The list of function arguments is prepared, then the starmap_async() method is called with the target function and the list of arguments.
This issues 10 calls to the task() function, one for each tuple of arguments. An AsyncResult object is returned immediately, and the main thread is free to continue on.
Each call to the task function reports the arguments as a message, blocks, then returns a tuple of the arguments.
The main thread then accesses the iterable of return values via the AsyncResult. It iterates over the values returned from the calls to the task() function and reports the generated values, matching those generated in each worker thread.
Importantly, all task() function calls are issued and executed before the return values are made available via the AsyncResult instance. The caller cannot iterate over the results as the tasks are completed.
```
Task 0 executing with 0.20612725219300176
Task 1 executing with 0.8005394102944386
Task 2 executing with 0.26033448784767166
Task 3 executing with 0.9694928298278335
Task 4 executing with 0.49358329324964934
Task 5 executing with 0.7148496837384375
Task 6 executing with 0.054516499852573896
Task 7 executing with 0.44353922256450407
Task 8 executing with 0.020708025577314326
Task 9 executing with 0.4062042427218545
Got result: (0, 0.20612725219300176)
Got result: (1, 0.8005394102944386)
Got result: (2, 0.26033448784767166)
Got result: (3, 0.9694928298278335)
Got result: (4, 0.49358329324964934)
Got result: (5, 0.7148496837384375)
Got result: (6, 0.054516499852573896)
Got result: (7, 0.44353922256450407)
Got result: (8, 0.020708025577314326)
Got result: (9, 0.4062042427218545)
```
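If we do need to handle results as tasks complete, one possible alternative (not the approach used in this tutorial) is the imap_unordered() method, which also exists on the ThreadPool. It passes a single argument to the target function, so the sketch below assumes a small helper, call_task(), that unpacks each tuple. The fixed delays and the in_completion_order() helper are also assumptions for the illustration.

```python
# sketch: handling results in completion order with imap_unordered()
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier, value):
    sleep(value)
    return (identifier, value)

# hypothetical helper that unpacks one tuple of arguments
def call_task(args):
    return task(*args)

def in_completion_order():
    items = [(0, 0.3), (1, 0.1), (2, 0.2)]
    completed = []
    with ThreadPool(3) as pool:
        # results are yielded as each task finishes, not in issue order
        for identifier, _ in pool.imap_unordered(call_task, items):
            completed.append(identifier)
    return completed

if __name__ == '__main__':
    print(in_completion_order())
```

With these delays we would typically expect the shortest task to be reported first, although the exact order depends on thread scheduling.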
Next, let’s look at an example where we might call starmap_async() for a function with no return value.
Example of starmap_async with No Return Value
We can explore using the starmap_async() method to call a function for each tuple in an iterable that does not have a return value.
This means that we are not interested in the iterable of results returned by the call to starmap_async() and instead are only interested that all issued tasks get executed.
This can be achieved by updating the previous example so that the task() function does not return a value.
The updated task() function with this change is listed below.
```python
# task executed in a worker thread
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
```
Then, in the main thread, we can call starmap_async() with our task() function and the list of tuples, as before.
```python
...
# prepare arguments
items = [(i, random()) for i in range(10)]
# issues tasks to thread pool
result = pool.starmap_async(task, items)
```
Importantly, because the call to starmap_async() does not block, we need to wait for the issued tasks to complete.
If we do not wait for the tasks to complete, we will exit the context manager for the ThreadPool which will terminate the worker threads and stop the tasks.
This can be achieved by calling the wait() method on the AsyncResult object.
```python
...
# wait for tasks to complete
result.wait()
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel starmap_async() with the thread pool and no return values
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issues tasks to thread pool
        result = pool.starmap_async(task, items)
        # wait for tasks to complete
        result.wait()
    # thread pool is closed automatically
```
Running the example first creates the ThreadPool with a default configuration.
The starmap_async() method is then called for each tuple of arguments in the list. This issues 10 calls to the task() function.
An AsyncResult object is returned immediately, and the main thread is free to continue on.
Each call to the task function reports a message, then blocks for a moment.
The main thread waits on the AsyncResult object, blocking until all calls to the task() function are complete.
The tasks finish, the wait() method returns, and the main thread is free to carry on.
```
Task 0 executing with 0.011812413258115662
Task 1 executing with 0.27590013060088836
Task 2 executing with 0.5033227105717938
Task 3 executing with 0.9915439194365574
Task 4 executing with 0.44286843859901526
Task 5 executing with 0.3036038108154744
Task 6 executing with 0.8557015800583464
Task 7 executing with 0.804522945506136
Task 8 executing with 0.005663114808695857
Task 9 executing with 0.6094483700965514
```
Next, let’s look at issuing many tasks over multiple calls, then waiting for all tasks in the ThreadPool to complete.
Example of starmap_async And Wait For All Tasks To Complete
We can explore how to issue many tasks to the ThreadPool via starmap_async(), and wait for all issued tasks to complete.
This could be achieved by calling starmap_async() multiple times, and calling the wait() method on the AsyncResult object returned from each call.
An alternate approach is to call starmap_async() multiple times, then wait on the ThreadPool itself for all issued tasks to complete.
In this example, we can update the previous example to call the starmap_async() method twice and ignore the AsyncResult objects that are returned.
```python
...
# prepare arguments
items1 = [(i, random()) for i in range(10)]
# issues tasks to thread pool
_ = pool.starmap_async(task, items1)
# prepare arguments
items2 = [(i, random()) for i in range(11, 20)]
# issues tasks to thread pool
_ = pool.starmap_async(task, items2)
```
We can then explicitly close the ThreadPool to prevent additional tasks being submitted to the pool, then call the join() method to wait for all issued tasks to complete.
```python
...
# close the thread pool
pool.close()
# wait for all tasks to complete and threads to close
pool.join()
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel starmap_async() with the thread pool and wait for tasks to complete
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # prepare arguments
        items1 = [(i, random()) for i in range(10)]
        # issues tasks to thread pool
        _ = pool.starmap_async(task, items1)
        # prepare arguments
        items2 = [(i, random()) for i in range(11, 20)]
        # issues tasks to thread pool
        _ = pool.starmap_async(task, items2)
        # close the thread pool
        pool.close()
        # wait for all tasks to complete and threads to close
        pool.join()
    # thread pool is closed automatically
```
Running the example first creates the ThreadPool with a default configuration.
The starmap_async() method is then called for the first list of tuples. This issues 10 calls to the task() function, one for each tuple in the list.
The starmap_async() method is then called again for a different list of tuple arguments, issuing 9 more calls to the task() function (identifiers 11 to 19).
Both calls return immediately, and the AsyncResult objects returned are ignored.
Each call to the task function reports a message, then blocks for a moment.
The main thread then closes the ThreadPool, preventing any additional tasks from being issued. It then joins the ThreadPool, blocking until all issued tasks are completed.
The tasks finish, the join() method returns, and the main thread is free to carry on.
```
Task 0 executing with 0.863256559555241
Task 1 executing with 0.22677574910303255
Task 2 executing with 0.7597887486908026
Task 3 executing with 0.33611363563926866
Task 4 executing with 0.06894922331270847
Task 5 executing with 0.10488678205589752
Task 6 executing with 0.4269556786982138
Task 7 executing with 0.25239820561317183
Task 8 executing with 0.5510070212775743
Task 9 executing with 0.08671432464073048
Task 11 executing with 0.5193289985108285
Task 12 executing with 0.5978682114312743
Task 13 executing with 0.3483993422321091
Task 14 executing with 0.48059182015948554
Task 15 executing with 0.31914713567970954
Task 16 executing with 0.6053953231214081
Task 17 executing with 0.4923293888102128
Task 18 executing with 0.8545168246962658
Task 19 executing with 0.15297202065923643
```
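For comparison, the first approach mentioned at the start of this section, keeping each AsyncResult and waiting on it, might be sketched as follows. The fixed delays, the two batches and the run_batches() helper are assumptions for the illustration. Unlike close() and join(), this leaves the pool open for further tasks.

```python
# sketch: wait on each AsyncResult instead of joining the pool
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier, value):
    sleep(value)

def run_batches():
    batches = [
        [(i, 0.05) for i in range(5)],
        [(i, 0.05) for i in range(5, 10)],
    ]
    with ThreadPool() as pool:
        # issue every batch without blocking and keep the handles
        results = [pool.starmap_async(task, batch) for batch in batches]
        # block until each batch is done
        for result in results:
            result.wait()
        # the pool is still open here and could accept more tasks
        return [result.ready() for result in results]

if __name__ == '__main__':
    print(run_batches())  # [True, True]
```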
Next, let’s look at issuing tasks and handling the return values with a callback function.
Example of starmap_async() with a Callback Function
We can issue tasks to the ThreadPool that return a value and specify a callback function to handle the returned values.
This can be achieved via the “callback” argument to the starmap_async() method.
In this example, we can update the above example so that the task() function returns a tuple of the arguments that were provided. We can then define a function to handle the return values, in this case to simply report them.
Firstly, we can define the function to call to handle the values returned from the tasks.
The custom_callback() function below implements this, taking one argument, which is the iterable of values returned from all calls to the target task function.
```python
# custom callback function
def custom_callback(result):
    print(f'Callback got values: {result}')
```
Next, we can update the task function to return a tuple containing the arguments.
The updated task() function with these changes is listed below.
```python
# task executed in a worker thread
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)
```
Finally, we can update the call to starmap_async() to specify the callback function via the “callback” argument, giving the name of our custom function.
```python
...
# prepare arguments
items = [(i, random()) for i in range(10)]
# issues tasks to thread pool
_ = pool.starmap_async(task, items, callback=custom_callback)
```
The main thread will then close the ThreadPool and wait for all issued tasks to complete.
```python
...
# close the thread pool
pool.close()
# wait for all tasks to complete and threads to close
pool.join()
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel starmap_async() with the thread pool and a callback function
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# custom callback function
def custom_callback(result):
    print(f'Callback got values: {result}')

# task executed in a worker thread
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issues tasks to thread pool
        _ = pool.starmap_async(task, items, callback=custom_callback)
        # close the thread pool
        pool.close()
        # wait for all tasks to complete and threads to close
        pool.join()
```
Running the example first creates and configures the ThreadPool.
The starmap_async() method is then called for the list of tuples and a callback function. This issues ten calls to the task() function, one for each tuple in the prepared list.
The main thread then closes the ThreadPool and blocks until all tasks complete and all threads in the ThreadPool close.
Each call to the task function reports a message, blocks for a moment, then returns a tuple of values.
When all task() functions are finished, the callback function is called, provided with the iterable of return values.
The iterable is printed directly, showing all return values at once.
Finally, the worker threads are closed and the main thread continues on.
```
Task 0 executing with 0.5406472925162025
Task 1 executing with 0.5420424053994809
Task 2 executing with 0.39305810337917957
Task 3 executing with 0.6658513262866549
Task 4 executing with 0.5646449587734699
Task 5 executing with 0.37607422469743823
Task 6 executing with 0.6573841290472451
Task 7 executing with 0.1683366712687585
Task 8 executing with 0.45097894963243157
Task 9 executing with 0.40005589810161
Callback got values: [(0, 0.5406472925162025), (1, 0.5420424053994809), (2, 0.39305810337917957), (3, 0.6658513262866549), (4, 0.5646449587734699), (5, 0.37607422469743823), (6, 0.6573841290472451), (7, 0.1683366712687585), (8, 0.45097894963243157), (9, 0.40005589810161)]
```
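Two details of the callback are worth checking for ourselves: it is called once for the whole call to starmap_async() rather than once per task, and it does not run in the thread that issued the tasks. The sketch below records both. The module-level calls list, the argument values and the run() helper are assumptions for the illustration.

```python
# sketch: how often the callback is called, and in which thread
import threading
from multiprocessing.pool import ThreadPool

# records (number of return values, thread identifier) for each callback call
calls = []

# custom callback function
def custom_callback(result):
    calls.append((len(result), threading.get_ident()))

# task executed in a worker thread
def task(identifier, value):
    return (identifier, value)

def run():
    calls.clear()
    caller = threading.get_ident()
    items = [(i, i * 0.5) for i in range(10)]
    with ThreadPool() as pool:
        pool.starmap_async(task, items, callback=custom_callback)
        # close the pool and wait for all tasks to complete
        pool.close()
        pool.join()
    n_values, callback_thread = calls[0]
    return len(calls), n_values, callback_thread == caller

if __name__ == '__main__':
    print(run())  # (1, 10, False)
```

Because the callback runs in a helper thread inside the pool, it is probably best kept short so that it does not delay the handling of other results.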
Next, let’s look at an example of issuing tasks to the pool with an error callback function.
Example of starmap_async() with an Error Callback Function
We can issue tasks to the ThreadPool that may raise an unhandled exception and specify an error callback function to handle the exception.
This can be achieved via the “error_callback” argument.
In this example, we can update the above example so that the task() function raises an exception for one specific argument. We can then define a function to handle the raised exception, in this case to simply report the details of the exception.
Firstly, we can define the function to call to handle an exception raised by the function.
The custom_error_callback() below implements this, taking one argument which is the exception raised by the target task function.
```python
# custom error callback function
def custom_error_callback(error):
    print(f'Got an error: {error}')
```
Next, we can update the task() function so that it raises an exception conditionally for one argument, e.g. an identifier of 5.
```python
# task executed in a worker thread
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)
```
Finally, we can update the call to starmap_async() to specify the error callback function via the “error_callback” argument, giving the name of our custom function.
```python
...
# prepare arguments
items = [(i, random()) for i in range(10)]
# issues tasks to thread pool
_ = pool.starmap_async(task, items, error_callback=custom_error_callback)
```
The main thread then closes the ThreadPool and waits for all tasks to complete.
```python
...
# close the thread pool
pool.close()
# wait for all tasks to complete and threads to close
pool.join()
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel starmap_async() with the thread pool and an error callback function
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# custom error callback function
def custom_error_callback(error):
    print(f'Got an error: {error}')

# task executed in a worker thread
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issues tasks to thread pool
        _ = pool.starmap_async(task, items, error_callback=custom_error_callback)
        # close the thread pool
        pool.close()
        # wait for all tasks to complete and threads to close
        pool.join()
```
Running the example first creates and configures the ThreadPool.
The starmap_async() method is then called for the list of tuples and an error callback function. This issues ten calls to the task() function, one for each tuple in the list.
An AsyncResult object is returned immediately and is ignored, and the main thread is free to continue on.
The main thread then closes the ThreadPool and blocks waiting for all issued tasks to complete.
Each call to the task function reports a message, then blocks for a moment. The task that receives the identifier value of 5 as an argument raises an exception.
All tasks finish executing and the exception callback function is called with the raised exception, reporting a message.
All worker threads are closed and the main thread is free to continue on.
```
Task 0 executing with 0.6366938649029978
Task 1 executing with 0.09712339475762755
Task 2 executing with 0.5857090116161293
Task 3 executing with 0.5231443454111913
Task 4 executing with 0.8838321018195835
Task 6 executing with 0.8033055587541071
Task 7 executing with 0.5581325851768317
Task 8 executing with 0.8372816966266331
Task 9 executing with 0.6512660897160109
Got an error: Something bad happened
```
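If we keep the AsyncResult rather than ignoring it, we can also ask it afterwards whether the tasks succeeded via its successful() method, which may only be called once the tasks are done. The sketch below combines this with an error callback that stores the exception. The errors list, the argument values and the run() helper are assumptions for the illustration.

```python
# sketch: checking successful() after an error callback has fired
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    return (identifier, value)

def run():
    errors = []
    items = [(i, i / 10) for i in range(10)]
    with ThreadPool() as pool:
        # store the raised exception instead of printing it
        result = pool.starmap_async(task, items, error_callback=errors.append)
        # wait for all tasks to finish before asking about success
        result.wait()
        return result.successful(), [str(error) for error in errors]

if __name__ == '__main__':
    print(run())  # (False, ['Something bad happened'])
```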
Next, let’s look at an example of issuing tasks to the pool and accessing the results, where one task may raise an exception.
Example of starmap_async() with an Exception
As we saw in the previous example, it is possible for the target function to raise an exception.
If this occurs in just one call to the target function, it will prevent all return values from being accessed.
We can demonstrate this with a worked example.
Firstly, we can update the task() function to conditionally raise an exception, in this case if the identifier argument to the function is equal to five, e.g. the same as in the previous example.
```python
...
# conditionally raise an error
if identifier == 5:
    raise Exception('Something bad happened')
```
The updated task() function with this change is listed below.
```python
# task executed in a worker thread
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)
```
We can then call starmap_async() to call the task() function for each tuple in the list.
```python
...
# prepare arguments
items = [(i, random()) for i in range(10)]
# issues tasks to thread pool
result = pool.starmap_async(task, items)
```
We can then get the iterable of return values.
```python
...
values = result.get()
```
This may raise an exception if one of the calls failed, so we must protect the call with a try-except pattern.
```python
...
# get the return values
try:
    values = result.get()
except Exception as e:
    print(f'Failed with: {e}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel starmap_async() with the thread pool and handle an exception
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issues tasks to thread pool
        result = pool.starmap_async(task, items)
        # get the return values
        try:
            values = result.get()
        except Exception as e:
            print(f'Failed with: {e}')
```
Running the example first creates and configures the ThreadPool.
The starmap_async() method is then called for each tuple in the list. This issues 10 calls to the task() function.
An AsyncResult object is returned immediately, and the main thread is free to continue on.
The main thread then blocks, attempting to get the iterable of return values from the AsyncResult object.
Each call to the task function reports a message, then blocks for a moment, then returns a tuple. The task that receives the identifier value of 5 as an argument raises an exception.
All tasks complete.
The exception is then re-raised in the main thread, where it is handled and a message is reported. The return values cannot be iterated and none are reported.
The ThreadPool is then closed automatically by the context manager.
This highlights that a failure in one task can prevent the results from all tasks from being accessed.
```
Task 0 executing with 0.9289974360593543
Task 1 executing with 0.6024272054114548
Task 2 executing with 0.9704708709296377
Task 3 executing with 0.5135459054931496
Task 4 executing with 0.24824251718111556
Task 6 executing with 0.33115364877408504
Task 7 executing with 0.06259782452746365
Task 8 executing with 0.6044700712307999
Task 9 executing with 0.740956607932948
Failed with: Something bad happened
```
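If losing every return value because of one failure is not acceptable, one possible workaround (an assumption on our part, not something the ThreadPool provides) is to wrap the task so that an exception is returned rather than raised. The safe_task() wrapper, the argument values and the run() helper in the sketch below are hypothetical names introduced for the illustration.

```python
# sketch: keep the successful results when one task fails
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    return (identifier, value)

# hypothetical wrapper that returns the exception instead of raising it
def safe_task(identifier, value):
    try:
        return task(identifier, value)
    except Exception as e:
        return e

def run():
    items = [(i, i / 10) for i in range(10)]
    with ThreadPool() as pool:
        values = pool.starmap_async(safe_task, items).get()
    # separate the successful return values from the failures
    ok = [v for v in values if not isinstance(v, Exception)]
    failed = [v for v in values if isinstance(v, Exception)]
    return ok, failed

if __name__ == '__main__':
    ok, failed = run()
    print(f'{len(ok)} succeeded, {len(failed)} failed: {failed}')
```

The trade-off is that the caller must now check each return value, since get() will no longer raise the exception.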
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python ThreadPool Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- ThreadPool PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ThreadPool: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
Takeaways
You now know how to issue tasks asynchronously to the ThreadPool that take multiple arguments in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.