Last Updated on October 29, 2022
You can issue follow-up tasks with the ThreadPool either manually by checking the results of tasks or automatically using a result callback function.
In this tutorial, you will discover how to issue follow-up tasks to the ThreadPool in Python.
Let’s get started.
Need to Issue Follow-Up Tasks to the ThreadPool
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module, it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading from or writing to sockets or files.
A ThreadPool can be configured when it is created, for example by setting the number of worker threads via the processes argument. The worker threads are created and started at that time.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
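To make the difference between the blocking and asynchronous methods concrete, here is a minimal sketch. The square() task and the demo_sync_and_async() helper are hypothetical names used only for illustration; the pool methods themselves are the standard ThreadPool API.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task executed in a worker thread
def square(x):
    return x * x

def demo_sync_and_async():
    with ThreadPool(processes=2) as pool:
        # apply() blocks until the single task has completed
        one = pool.apply(square, args=(3,))
        # map() blocks until the function has been applied to every item
        many = pool.map(square, range(5))
        # apply_async() and map_async() return AsyncResult objects immediately
        async_one = pool.apply_async(square, args=(4,))
        async_many = pool.map_async(square, range(5))
        # get() blocks until each asynchronous result is available
        return one, many, async_one.get(), async_many.get()

if __name__ == '__main__':
    print(demo_sync_and_async())
```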
When using the ThreadPool, we may need to issue follow-up tasks. There may be many reasons for this, such as needing to act on the results of tasks already issued to the ThreadPool.
How can we execute follow-up tasks in the ThreadPool?
How to Issue Follow-Up Tasks to the ThreadPool
We can execute follow-up tasks in the ThreadPool.
There are two main approaches we can use:
- Manually issue follow-up tasks.
- Automatically issue follow-up tasks via a callback.
Let’s take a closer look at each approach to executing a follow-up task.
Manually Execute a Follow-Up Task
We can manually issue a follow-up task based on the results of a first-round task.
For example, we may issue a task asynchronously using the apply_async() function that returns an AsyncResult.
```python
...
# issue a task
result = pool.apply_async(...)
```
We can then get the result of the issued task, once available, and conditionally issue a follow-up task.
For example:
```python
...
# check the result of an issued task
if result.get() > 1.0:
    # issue a follow-up task
    pool.apply_async(...)
```
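The fragments above elide the task function and its arguments. A minimal runnable sketch of the same manual pattern might look like the following, assuming hypothetical first_round() and follow_up() task functions and reusing the 1.0 threshold from the fragment.

```python
from multiprocessing.pool import ThreadPool

# hypothetical first-round task
def first_round(x):
    return x / 2

# hypothetical follow-up task
def follow_up(value):
    return value * 10

def run_manual(x):
    with ThreadPool() as pool:
        # issue a task; an AsyncResult is returned immediately
        result = pool.apply_async(first_round, args=(x,))
        # get() blocks until the first-round result is available
        value = result.get()
        if value > 1.0:
            # conditionally issue a follow-up task and wait for its result
            return pool.apply_async(follow_up, args=(value,)).get()
        # no follow-up task was needed
        return None

if __name__ == '__main__':
    print(run_manual(3))
    print(run_manual(1))
```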
Automatically Execute a Follow-Up Task with a Callback
We can automatically issue follow-up tasks to the ThreadPool.
This can be achieved by configuring issued tasks to have a result callback function.
The callback function is a custom function that takes the result of the issued tasks as its only argument, e.g. a single return value for apply_async(), or a list of return values for map_async() when multiple tasks are issued.
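The callback function is not executed in a worker thread or in the main thread. Instead, it is executed by a helper thread that the ThreadPool uses to handle task results, within the same process as the main thread. It should return quickly, because no other results can be handled while it runs.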
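A quick way to check which thread runs a result callback is to record the current thread's name inside the task and inside the callback. This is a small sketch; task() and find_callback_thread() are hypothetical names, and the exact thread names printed vary between Python versions.

```python
import threading
from multiprocessing.pool import ThreadPool

# task that reports the name of the worker thread running it
def task():
    return threading.current_thread().name

def find_callback_thread():
    seen = {}

    # result callback that records which thread executes it
    def callback(result):
        seen['callback'] = threading.current_thread().name
        seen['task'] = result

    with ThreadPool(processes=1) as pool:
        async_result = pool.apply_async(task, callback=callback)
        # the callback has been called by the time wait() returns
        async_result.wait()
    return seen

if __name__ == '__main__':
    print(find_callback_thread())
    print('main thread:', threading.main_thread().name)
```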
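Because all threads in a process share the same memory, if the ThreadPool is assigned to a global variable (for example, when it is created under the if __name__ == '__main__': guard of the main module), then the result callback function can access it directly.
As such, we can issue follow-up tasks directly from the callback function.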
For example:
```python
# result callback function
def result_callback(result):
    # check the result of an issued task (the return value itself, not an AsyncResult)
    if result > 1.0:
        # issue a follow-up task
        pool.apply_async(...)
```
The callback function can be specified when issuing tasks in the main thread via the “callback” argument.
For example:
```python
...
# issue a task with a result callback
result = pool.apply_async(..., callback=result_callback)
```
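Putting the two callback fragments together, a minimal runnable sketch might look like the following. It assumes the same hypothetical first_round() and follow_up() tasks and the 1.0 threshold, and binds the pool to a global name so the callback can reach it. The follow_ups list is also a hypothetical addition, used to collect the AsyncResult objects of the follow-up tasks.

```python
from multiprocessing.pool import ThreadPool

# hypothetical module-level list collecting AsyncResults for follow-up tasks
follow_ups = []

# hypothetical first-round task
def first_round(x):
    return x / 2

# hypothetical follow-up task
def follow_up(value):
    return value * 10

# result callback: receives the return value of first_round(), not an AsyncResult
def result_callback(result):
    if result > 1.0:
        # the pool is reachable here because it is bound to a global name
        follow_ups.append(pool.apply_async(follow_up, args=(result,)))

def run_with_callback(x):
    global pool
    follow_ups.clear()
    with ThreadPool() as pool:
        # issue a task with a result callback
        first = pool.apply_async(first_round, args=(x,), callback=result_callback)
        # the callback has already run by the time wait() returns
        first.wait()
        # any follow-up task has been issued, so close the pool and wait for it
        pool.close()
        pool.join()
        return [r.get() for r in follow_ups]

if __name__ == '__main__':
    print(run_with_callback(3))
```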
Now that we know how to issue follow-up tasks to the ThreadPool, let’s look at some worked examples.
Example of Manually Issuing Follow-Up Tasks to the ThreadPool
We can explore how to manually issue follow-up tasks to the ThreadPool.
In this example, we will define a task that takes an integer argument, generates a random number, blocks for a moment, reports the value, then returns the integer argument and the generated value. The main thread will issue many of these tasks asynchronously and then check each result. If the result is above a threshold, a follow-up task will be issued that does much the same as the first task: it generates a random number, blocks for a moment, and reports the values.
Firstly, we can define the function to execute for the follow-up or second-round task.
The function takes an integer identifier for the task and a floating point value generated by the first round task.
The function then generates a random number between 0 and 1 and then blocks for this many seconds to simulate computational effort. It then reports the value and returns a tuple of the two arguments as well as the generated value.
The task2() function below implements this.
```python
# task executed in a worker thread
def task2(identifier, result):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>>{identifier} with {result}, generated {value}')
    # return result
    return (identifier, result, value)
```
Next, we can define the function to execute first, the first-round task.
This function takes an integer identifier for the task. It then generates a random value between 0 and 1 and blocks for a fraction of a second. It then reports the value and returns a tuple of the integer argument and the generated value.
The task1() function below implements this.
```python
# task executed in a worker thread
def task1(identifier):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>{identifier} generated {value}')
    # return result
    return (identifier, value)
```
We then define a ThreadPool with the default number of worker threads. In this case, we use the context manager interface to ensure the ThreadPool closes automatically once we are finished with it.
```python
...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...
```
We can then issue 10 calls to task1() with integer values from 0 to 9. We will issue the tasks using the imap_unordered() function, because it yields results in the order that tasks complete rather than the order in which they were issued.
We will iterate the results of task1() directly in a for-loop.
```python
...
# issue tasks to the thread pool and handle results
for i,v in pool.imap_unordered(task1, range(10)):
    # ...
```
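The difference between imap() and imap_unordered() can be seen with tasks that sleep for different durations. In this sketch, task() and compare_orders() are hypothetical names; imap() always yields results in issue order, while the order from imap_unordered() depends on timing, so it typically, but not necessarily, comes out shortest delay first.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# task that blocks for the given number of seconds, then returns it
def task(delay):
    sleep(delay)
    return delay

def compare_orders():
    delays = [0.3, 0.1, 0.2]
    with ThreadPool(processes=3) as pool:
        # results in the order the tasks were issued
        ordered = list(pool.imap(task, delays))
        # results in the order the tasks completed
        unordered = list(pool.imap_unordered(task, delays))
    return ordered, unordered

if __name__ == '__main__':
    print(compare_orders())
```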
For each result from the first-round task, we will check if the generated value is above 0.5, and if so, we will issue a single follow-up task using the apply_async() function to call task2().
```python
...
# issue a follow-up task
_ = pool.apply_async(task2, args=(i, v))
```
Once the results from all first-round tasks have been handled and any follow-up tasks issued, we will close the ThreadPool to prevent any further tasks from being issued, then join the pool to wait for all issued tasks to complete.
```python
...
# close the pool
pool.close()
# wait for all issued tasks to complete
pool.join()
```
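The order of these two calls matters: join() may only be called after close() (or terminate()). A small sketch, with join_without_close() as a hypothetical helper, shows that calling join() on a pool that is still running raises a ValueError.

```python
from multiprocessing.pool import ThreadPool

def join_without_close():
    pool = ThreadPool(processes=1)
    try:
        # join() requires close() or terminate() to be called first
        pool.join()
    except ValueError as e:
        message = str(e)
    else:
        message = None
    # shut down correctly: close first, then join
    pool.close()
    pool.join()
    return message

if __name__ == '__main__':
    print(join_without_close())
```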
Finally, an “all done” message is reported once all worker threads are closed.
```python
...
# all done
print('All done.')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of manually issuing follow-up tasks to the thread pool
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task2(identifier, result):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>>{identifier} with {result}, generated {value}')
    # return result
    return (identifier, result, value)

# task executed in a worker thread
def task1(identifier):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>{identifier} generated {value}')
    # return result
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool and handle results
        for i,v in pool.imap_unordered(task1, range(10)):
            # check result
            if v > 0.5:
                # issue a follow-up task
                _ = pool.apply_async(task2, args=(i, v))
        # close the pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
    # all done
    print('All done.')
```
Running the example first creates and starts the ThreadPool with a default configuration.
Next, 10 calls to task1() are issued as tasks to the ThreadPool, and the results of the tasks are handled in the order that the tasks are completed.
Each first-round task generates a random number, blocks, reports a message, and returns a tuple.
The main thread receives results from first-round tasks as they are completed and checks if the generated number was above 0.5. If so, it issues a follow-up task to call task2() asynchronously.
The second-round tasks generate another random number, block for a moment, report the values, and return a tuple. The return values from these tasks are ignored.
Once the results from all first-round tasks are handled, the main thread closes the pool and waits for all remaining tasks to complete, e.g. any remaining task2() tasks.
All tasks are completed, and the main thread unblocks and continues on, terminating the application.
Note that the results will differ each time the program is run, given the use of random numbers. Try running the example a few times.
This highlights how we can manually issue follow-up tasks to the ThreadPool from the main thread.
```
>6 generated 0.02968337811151256
>8 generated 0.2074241065409741
>5 generated 0.266737147995011
>7 generated 0.37003972461307033
>0 generated 0.4234776827040716
>4 generated 0.5309317967257969
>9 generated 0.45219036344603725
>1 generated 0.8073731082312345
>3 generated 0.868294249111384
>2 generated 0.9181818442726075
>>4 with 0.5309317967257969, generated 0.49640855564496544
>>2 with 0.9181818442726075, generated 0.55859560713572
>>1 with 0.8073731082312345, generated 0.8500396311572717
>>3 with 0.868294249111384, generated 0.9677659607712451
All done.
```
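The example above discards the AsyncResult objects for the task2() calls, so their return values are never used. If they are needed, one option is to keep each AsyncResult and call get() on it. The sketch below assumes deterministic stand-ins for task1() and task2() (no random values or sleeps) so that the selected identifiers are predictable; run_and_collect() is a hypothetical helper.

```python
from multiprocessing.pool import ThreadPool

# deterministic stand-in for the tutorial's first-round task
def task1(identifier):
    return (identifier, identifier / 10)

# stand-in for the tutorial's follow-up task
def task2(identifier, result):
    return (identifier, result, result * 2)

def run_and_collect():
    follow_ups = []
    with ThreadPool() as pool:
        # handle first-round results in completion order
        for i, v in pool.imap_unordered(task1, range(10)):
            if v > 0.5:
                # keep the AsyncResult so the follow-up result can be retrieved
                follow_ups.append(pool.apply_async(task2, args=(i, v)))
        # get() blocks until each follow-up task has completed
        results = [r.get() for r in follow_ups]
    return sorted(results)

if __name__ == '__main__':
    for item in run_and_collect():
        print(item)
```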
Next, let’s look at how we might issue follow-up tasks automatically using a callback function.
Example of Issuing Follow-Up Tasks From a Callback
We can explore how to issue follow-up tasks automatically using a callback function.
This can be achieved by updating the previous example to define a callback function that handles the results from first-round tasks and issues follow-up tasks to the ThreadPool. The main thread is only responsible for issuing the first-round tasks.
This makes the code much cleaner to read and manage.
Importantly, this is only possible because the callback function is executed by a thread within the same process as the main thread, which means it can access the ThreadPool directly as a global variable.
Firstly, we can define a callback function. This function is called only once all first-round tasks have completed, and it is passed a list of all results from the first-round tasks.
This is a downside of using a callback with map_async(). Specifically, the callback is not called until all tasks have completed, whereas the previous approach issued follow-up tasks manually as each first-round task completed.
The callback function iterates over the return values from each first-round task, checks the floating point value, and issues a follow-up task call to task2() as needed.
The result_callback() function below implements this.
```python
# handle results of the task (in the pool's result-handler thread)
def result_callback(result_iterator):
    # unpack result (a list of (identifier, value) tuples)
    for i,v in result_iterator:
        # check result
        if v > 0.5:
            # issue a follow-up task
            _ = pool.apply_async(task2, args=(i, v))
```
Next, in the main thread, we can issue all 10 first-round tasks asynchronously using the map_async() function and specify the result callback function.
This returns an AsyncResult object.
```python
...
# issue tasks asynchronously to the thread pool
result = pool.map_async(task1, range(10), callback=result_callback)
```
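To see exactly what the callback receives from map_async(), a callback can simply store its argument. In this sketch, task() and capture_callback_argument() are hypothetical names; the callback is called once, with a single list holding every return value in issue order.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task executed in a worker thread
def task(x):
    return x * 10

def capture_callback_argument():
    received = []
    with ThreadPool(processes=2) as pool:
        # the callback stores whatever it is passed
        result = pool.map_async(task, range(3), callback=received.append)
        # the callback has been called by the time wait() returns
        result.wait()
    return received

if __name__ == '__main__':
    print(capture_callback_argument())
```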
We can then wait for all first-round tasks to be completed by waiting on the AsyncResult object.
```python
...
# wait for issued tasks to complete
result.wait()
```
After we know that all first-round tasks are completed and any follow-up tasks have been issued, we can then close the pool and wait for all second-round tasks to be completed.
```python
...
# close the pool
pool.close()
# wait for all issued tasks to complete
pool.join()
```
If we did not wait for the first-round tasks to complete and instead closed the pool immediately, the callback would not be able to issue any follow-up tasks, because calling apply_async() on a closed pool raises a ValueError.
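This behavior is easy to confirm directly. In the sketch below, task() and issue_after_close() are hypothetical names; issuing a task after close() raises a ValueError whose message is returned.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task
def task(x):
    return x

def issue_after_close():
    pool = ThreadPool(processes=1)
    # no new tasks may be issued after this
    pool.close()
    try:
        pool.apply_async(task, args=(1,))
    except ValueError as e:
        return str(e)
    finally:
        # the pool is closed, so it is safe to join it
        pool.join()
    return None

if __name__ == '__main__':
    print(issue_after_close())
```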
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of issuing a follow-up task automatically with a result callback
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# handle results of the task (in the pool's result-handler thread)
def result_callback(result_iterator):
    # unpack result (a list of (identifier, value) tuples)
    for i,v in result_iterator:
        # check result
        if v > 0.5:
            # issue a follow-up task
            _ = pool.apply_async(task2, args=(i, v))

# task executed in a worker thread
def task2(identifier, result):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>>{identifier} with {result}, generated {value}')
    # return result
    return (identifier, result, value)

# task executed in a worker thread
def task1(identifier):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>{identifier} generated {value}')
    # return result
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks asynchronously to the thread pool
        result = pool.map_async(task1, range(10), callback=result_callback)
        # wait for issued tasks to complete
        result.wait()
        # close the pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
    # all done
    print('All done.')
```
Running the example first creates and starts the ThreadPool with a default configuration.
Next, 10 calls to task1() are issued as tasks to the ThreadPool, and an AsyncResult is returned. The main thread then blocks waiting for all first-round tasks to be completed.
Each first-round task generates a random number, blocks, reports a message, and returns a tuple.
Once all first-round tasks finish, the result callback is called with a list of the return values from the first-round tasks. The list is traversed, and any second-round tasks are issued to the ThreadPool.
The main thread carries on, closing the pool and blocking until all second-round tasks are completed.
The second-round tasks generate another random number, block for a moment, report the values, and return a tuple. The return values from these tasks are ignored.
All second-round tasks are completed, and the main thread unblocks and continues on, terminating the application.
Note that the results will differ each time the program is run, given the use of random numbers. Try running the example a few times.
This highlights how we can automatically issue follow-up tasks to the ThreadPool from a result callback function.
```
>3 generated 0.2387321541129851
>6 generated 0.2905473999983814
>1 generated 0.4747636145164348
>2 generated 0.5917820472473575
>4 generated 0.8272344029491345
>5 generated 0.8663920475565895
>8 generated 0.655281648334691
>0 generated 0.9258262409168762
>7 generated 0.9521778472629924
>9 generated 0.8818478510866756
>>0 with 0.9258262409168762, generated 0.012142433405473163
>>9 with 0.8818478510866756, generated 0.2185535145295543
>>7 with 0.9521778472629924, generated 0.28010278928179344
>>2 with 0.5917820472473575, generated 0.4547263678109016
>>4 with 0.8272344029491345, generated 0.48682374727972644
>>5 with 0.8663920475565895, generated 0.7814307314216037
>>8 with 0.655281648334691, generated 0.823282476450547
All done.
```
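As noted earlier, the map_async() callback only runs once every first-round task has completed. If follow-up tasks should instead start as soon as each first-round task finishes, one alternative is to issue each first-round task with its own apply_async() call and callback. This is a sketch under assumptions: task1() and task2() are deterministic stand-ins for the tutorial's tasks, and issued and run_per_task_callbacks() are hypothetical names. Because a task's callback runs before its AsyncResult is marked ready, waiting on every first-round result guarantees that all follow-up tasks have been issued before the pool is closed.

```python
from multiprocessing.pool import ThreadPool

# hypothetical module-level list of AsyncResults for the follow-up tasks
issued = []

# deterministic stand-in for the tutorial's first-round task
def task1(identifier):
    return (identifier, identifier / 10)

# stand-in for the tutorial's follow-up task
def task2(identifier, result):
    return (identifier, result)

# called once per first-round task, with that task's return value
def result_callback(result):
    i, v = result
    if v > 0.5:
        issued.append(pool.apply_async(task2, args=(i, v)))

def run_per_task_callbacks():
    global pool
    issued.clear()
    with ThreadPool() as pool:
        # one apply_async() call per first-round task, each with a callback
        first_round = [pool.apply_async(task1, args=(i,), callback=result_callback)
                       for i in range(10)]
        # each task's callback has run by the time its result is ready
        for r in first_round:
            r.wait()
        # all follow-up tasks have now been issued, so it is safe to close
        pool.close()
        pool.join()
        return sorted(r.get() for r in issued)

if __name__ == '__main__':
    print(run_per_task_callbacks())
```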
Takeaways
You now know how to issue follow-up tasks to the ThreadPool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.