Last Updated on September 12, 2022
You can submit a follow-up task to a ThreadPoolExecutor by calling the submit() function.
In this tutorial, you will discover how to submit follow-up tasks to a thread pool in Python.
Let’s get started.
Need to Submit a Follow-Up Task to a ThreadPoolExecutor
The ThreadPoolExecutor in Python provides a pool of reusable threads for executing ad hoc tasks.
You can submit tasks to the thread pool by calling the submit() function and passing in the name of the function you wish to execute on another thread.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
You can also submit tasks by calling the map() function and specifying the name of the function to execute and the iterable of items to which your function will be applied.
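As a minimal sketch of these two ways of issuing tasks (the square() function is a hypothetical stand-in used only for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used only for illustration
def square(x):
    return x * x

with ThreadPoolExecutor(2) as executor:
    # submit() issues a single call and returns a Future
    future = executor.submit(square, 3)
    # result() blocks until the task has completed
    submit_result = future.result()
    # map() applies the function to each item and yields results in input order
    map_results = list(executor.map(square, [1, 2, 3]))

print(submit_result, map_results)
```

Here submit() gives a handle on one task, whereas map() hides the Future objects and only yields return values.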
Some tasks require that a second task be executed that makes use of the result from the first task in some way.
We might call this the need to execute a follow-up task for each task that is submitted, which might be conditional on the result in some way.
How can you submit a follow-up task to a ThreadPoolExecutor?
How to Submit Follow-Up Tasks
You can submit a follow-up task to a thread pool in Python by calling submit() on the thread pool while processing the results of your first-round tasks.
The need for a subsequent or follow-up task may be determined within another task.
You may be tempted to submit a child or follow-up task directly from within the first-round task. This would require the task to have access to the thread pool in order to submit a new task.
Although possible, this would break encapsulation, requiring a component (task) to have knowledge of the executor of tasks (thread pool).
An alternative approach is to return values from tasks that indicate whether a follow-up task is required, and if so, any details required for the follow-up.
This allows the thread that is processing the outcomes from tasks to interpret the results of each task and trigger a follow-up task as needed.
As such, follow-up tasks are submitted to the ThreadPoolExecutor just like first-round tasks, specifically by calling the submit() function.
This could be achieved when processing the results from calling map(); for example:
```python
...
# process the results from first-round tasks
for result in executor.map(work, items):
    # check if the task requires a follow-up by calling a custom function
    if requires_followup(result):
        # submit a follow-up task
        future = executor.submit(work)
        # ...
```
This could also be achieved when processing Future objects directly returned from calling submit(); for example:
```python
...
# process the results from first-round tasks
for future in as_completed(futures):
    # get the result
    result = future.result()
    # check if the task requires a follow-up by calling a custom function
    if requires_followup(result):
        # submit a follow-up task
        future2 = executor.submit(work)
        # ...
```
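The snippets above leave work() and requires_followup() undefined. A self-contained sketch of the same pattern might look like the following, where work(), requires_followup(), and followup() are all hypothetical definitions, and the first-round task is assumed to return a flag plus the details needed for the follow-up:

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical first-round task: reports whether a follow-up is needed
def work(item):
    needs_followup = item % 2 == 0
    return (needs_followup, item)

# hypothetical check used by the thread that processes results
def requires_followup(result):
    needs_followup, _ = result
    return needs_followup

# hypothetical follow-up task that uses the first-round details
def followup(item):
    return item * 10

with ThreadPoolExecutor(4) as executor:
    followup_futures = []
    # process the results from first-round tasks in input order
    for result in executor.map(work, range(6)):
        if requires_followup(result):
            _, item = result
            # the processing thread, not the task, submits the follow-up
            followup_futures.append(executor.submit(followup, item))
    # gather the follow-up results in submission order
    followup_results = [f.result() for f in followup_futures]

print(followup_results)
```

The tasks themselves never touch the executor here; only the thread that interprets the results does, which is the encapsulation argument made earlier.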
Now that we know how to submit a follow-up task to a ThreadPoolExecutor, let’s look at a worked example.
Example of Submitting Follow-Up Tasks
Let’s develop an example of submitting a follow-up task to a ThreadPoolExecutor.
First, we need to define a mock task that sleeps for a moment and returns a value.
Our task will generate a random value between 0 and 1, sleep for that fraction of a second, then return the value for interpretation to see if a subsequent task is required.
```python
# mock task that works for a moment
def task1():
    value = random()
    sleep(value)
    print(f'Task 1: {value}')
    return value
```
Next, we can define a follow-up task to be called under some condition based on the result of the first-round task.
This task takes the value returned from the first-round task, generates a second random value, and sleeps for that fraction of a second before reporting both values and returning the newly generated value.
```python
# mock task that works for a moment
def task2(value1):
    value2 = random()
    sleep(value2)
    print(f'Task 2: value1={value1}, value2={value2}')
    return value2
```
Next, we can start a thread pool with five worker threads using the context manager, then call submit() ten times to issue ten calls to task1().
```python
...
# start the thread pool
with ThreadPoolExecutor(5) as executor:
    # send in the first tasks
    futures1 = [executor.submit(task1) for _ in range(10)]
```
Recall that each call to submit() returns a Future object.
We store all Future objects in a list so that we can get the results later.
We will use the as_completed() module function to process the tasks, returning each Future as its task is completed.
```python
...
    # process results in the order they are completed
    for future1 in as_completed(futures1):
        # ...
```
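To illustrate what completion order means for as_completed(), here is a small sketch with a hypothetical nap() task and fixed sleep times chosen only for illustration. The tasks are submitted longest-first, yet the shorter tasks would typically be yielded first:

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed

# hypothetical task: sleep for a fixed time and return it
def nap(seconds):
    sleep(seconds)
    return seconds

with ThreadPoolExecutor(3) as executor:
    # submit the longest task first
    futures = [executor.submit(nap, s) for s in (0.6, 0.2, 0.4)]
    # collect results in the order the tasks finish, not the order submitted
    order = [f.result() for f in as_completed(futures)]

print(order)
```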
For each future, we will retrieve the result and check the return value from the task.
If the value is above 0.5 (recall that task1() will return random values between 0 and 1), then we will submit a second task.
This is achieved by calling submit() with the name of the second task function, which is task2(), and passing in the value returned from the first task.
```python
...
        # get the result
        result = future1.result()
        # check if we should trigger a follow-up task
        if result > 0.5:
            _ = executor.submit(task2, result)
```
We can then let the context manager close the thread pool automatically for us, which will block until all second-round tasks in the thread pool have completed.
Tying this together, the complete example of submitting follow-up tasks to the ThreadPoolExecutor is listed below.
```python
# SuperFastPython.com
# example of submitting follow-up tasks to the thread pool
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# mock task that works for a moment
def task1():
    value = random()
    sleep(value)
    print(f'Task 1: {value}')
    return value

# mock task that works for a moment
def task2(value1):
    value2 = random()
    sleep(value2)
    print(f'Task 2: value1={value1}, value2={value2}')
    return value2

# start the thread pool
with ThreadPoolExecutor(5) as executor:
    # send in the first tasks
    futures1 = [executor.submit(task1) for _ in range(10)]
    # process results in the order they are completed
    for future1 in as_completed(futures1):
        # get the result
        result = future1.result()
        # check if we should trigger a follow-up task
        if result > 0.5:
            _ = executor.submit(task2, result)
    # wait for all follow-up tasks to complete
```
Running the example creates the thread pool and submits ten calls to task1() to the thread pool.
We then process results as they become available and submit follow-up calls to task2().
In this case, we can see that five of the ten calls to task1() returned a value above 0.5, and each of these resulted in a follow-up call to task2() being submitted to the thread pool.
Your results may vary given the use of random numbers.
```
Task 1: 0.09934212493527206
Task 1: 0.1338109803875105
Task 1: 0.4524962415643826
Task 1: 0.46384185278262724
Task 1: 0.07711961886475038
Task 1: 0.6396901127845064
Task 1: 0.6526351959831038
Task 2: value1=0.6526351959831038, value2=0.20307292716627035
Task 1: 0.9883198729630449
Task 2: value1=0.6396901127845064, value2=0.5545983932302203
Task 1: 0.8226047780912188
Task 1: 0.9248040494130205
Task 2: value1=0.8226047780912188, value2=0.22356055875124803
Task 2: value1=0.9883198729630449, value2=0.44057244287230257
Task 2: value1=0.9248040494130205, value2=0.5921666776161257
```
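The example discards the Future returned for each follow-up task. If the follow-up results are needed, one option is to keep those Future objects and read them once the first round has been processed. The sketch below is a variation under stated assumptions, not the example above: task1() and task2() are simplified stand-ins that take fixed integer values (with a threshold of 5) so that the outcome is repeatable.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed

# stand-in for task1(): takes a fixed value instead of generating a random one
def task1(value):
    sleep(value / 100)
    return value

# stand-in for task2(): returns the first-round value and a derived value
def task2(value1):
    value2 = value1 * 2
    sleep(value2 / 100)
    return (value1, value2)

# fixed inputs chosen only for illustration
values = [3, 8, 5, 9, 1, 6]

with ThreadPoolExecutor(5) as executor:
    futures1 = [executor.submit(task1, v) for v in values]
    # keep the follow-up futures instead of discarding them
    futures2 = []
    for future1 in as_completed(futures1):
        result = future1.result()
        # check if we should trigger a follow-up task
        if result > 5:
            futures2.append(executor.submit(task2, result))
    # gather the follow-up results (sorted, as completion order may vary)
    followup_results = sorted(f.result() for f in futures2)

print(followup_results)
```

Keeping the second list of Future objects also makes it possible to check each follow-up task for exceptions, which are otherwise silently dropped when the Future is ignored.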
Takeaways
You now know how to submit follow-up tasks to a ThreadPoolExecutor.
Do you have any questions about how to submit follow-up tasks?
Ask your question in the comments below and I will do my best to answer.
Photo by John Gough on Unsplash
Iain Elder says
Thanks for this example. How would you handle the case where the follow-up tasks can also have follow-up tasks and so on? Say I’m querying a tree structure one node at a time and listing the children of each node. Each child would be a follow-up task. The example you give could be adapted to a tree two levels deep. But how would I handle a tree three, four, five levels deep? Is it something that would be better handled by a queue and a thread subclass?
Jason Brownlee says
Good question.
One approach would be to process tasks in batches, e.g. submit tasks and gather results, then process the results as a batch, submitting and gathering again as needed. This happens in a loop and continues until no further tasks are required. This will give you ideas:
https://superfastpython.com/threadpoolexecutor-retry-tasks/
Another approach might be a pipeline where results from the first step may produce tasks for the second step and so on. This may help:
https://superfastpython.com/thread-pipeline/
I’d recommend experimenting a little and see what works well/best for your specific use case.
Let me know how you go.