How to Submit a Follow-up Task to a ThreadPoolExecutor in Python
You can submit a follow-up task to a ThreadPoolExecutor by calling the submit() function.
In this tutorial, you will discover how to submit follow-up tasks to a thread pool in Python.
Let's get started.
Need to Submit a Follow-Up Task to a ThreadPoolExecutor
The ThreadPoolExecutor in Python provides a pool of reusable threads for executing ad hoc tasks.
You can submit tasks to the thread pool by calling the submit() function and passing in the name of the function you wish to execute on another thread.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
You can also submit tasks by calling the map() function and specifying the name of the function to execute and the iterable of items to which your function will be applied.
Some tasks require that a second task be executed that makes use of the result from the first task in some way.
We might call this the need to execute a follow-up task for each task that is submitted, which might be conditional on the result in some way.
How can you submit a follow-up task to a ThreadPoolExecutor?
How to Submit Follow-Up Tasks
You can submit a follow-up task to a thread pool in Python by calling submit() on the thread pool while processing the results of your first-round tasks.
The need for a subsequent or follow-up task may be determined within another task.
You may be tempted to submit a child or follow-up task directly from the first round task. This would require that the task needs access to the thread pool to be able to submit a new task.
Although possible, this would break encapsulation, requiring a component (task) to have knowledge of the executor of tasks (thread pool).
An alternative approach is to return values from tasks that indicate whether a follow-up task is required, and if so, any details required for the follow-up.
This allows the thread that is processing the outcomes from tasks to interpret the results of each task and trigger a follow-up task as needed.
As such, follow-up tasks are submitted to the ThreadPoolExecutor just like first-round tasks, specifically by calling the submit() function.
This could be achieved when processing the results from calling map(); for example:
...
# process the results from first-round tasks
for result in executor.map(work, items):
# check if the task requires a follow-up by calling a custom function
if requires_followup(result):
# submit a follow-up task
future = executor.submit(work)
# ...
This could also be achieved when processing Future objects directly returned from calling submit(); for example:
...
# process the results from first-round tasks
for future in as_completed(futures):
# get the result
result = future.result()
# check if the task requires a follow-up by calling a custom function
if requires_followup(result):
# submit a follow-up task
future2 = executor.submit(work)
# ...
Now that we know how to submit a follow-up task to a ThreadPoolExecutor, let's look at a worked example.
Example of Submitting Follow-Up Tasks
Let's develop an example of submitting a follow-up task to a ThreadPoolExecutor.
First, we need to define a mock task that sleeps for a moment and returns a value.
Our task will generate a random value between 0 and 1, sleep for that fraction of a second, then return the value for interpretation to see if a subsequent task is required.
# mock test that works for moment
def task1():
value = random()
sleep(value)
print(f'Task 1: {value}')
return value
Next, we can define a follow-up task to be called under some condition based on the result of the first round task.
This task takes the value returned from the first round task, generates a second random value, and sleeps for that fraction of a second before reporting both values and returning the newly generated value.
# mock test that works for moment
def task2(value1):
value2 = random()
sleep(value2)
print(f'Task 2: value1={value1}, value2={value2}')
return value2
Next, we can start a thread pool with five worker threads using the context manager, then call submit() with ten function calls to task1().
...
# start the thread pool
with ThreadPoolExecutor(5) as executor:
# send in the first tasks
futures1 = [executor.submit(task1) for _ in range(10)]
Recall that each call to submit() returns a Future object.
We store all Future objects in a list so that we can get the results later.
We will use the as_complete() module function to process the tasks, returning each Future as its task is completed.
...
# process results in the order they are completed
for future1 in as_completed(futures1):
# ...
For each future, we will retrieve the result and check the return value from the task.
If the value is above 0.5 (recall that task1() will return random values between 0 and 1), then we will submit a second task.
This is achieved by calling submit() with the name of the second task function, which is task2(), and passing in the value returned from the first task.
...
# get the result
result = future1.result()
# check if we should trigger a follow-up task
if result > 0.5:
_ = executor.submit(task2, result)
We can then let the context manager close the thread pool automatically for us, which will wait for all second round tasks in the thread pool to complete.
Tying this together, the complete example of submitting follow-up tasks to the ThreadPoolExecutor is listed below.
# SuperFastPython.com
# example of submitting follow-up tasks to the thread pool
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
# mock test that works for moment
def task1():
value = random()
sleep(value)
print(f'Task 1: {value}')
return value
# mock test that works for moment
def task2(value1):
value2 = random()
sleep(value2)
print(f'Task 2: value1={value1}, value2={value2}')
return value2
# start the thread pool
with ThreadPoolExecutor(5) as executor:
# send in the first tasks
futures1 = [executor.submit(task1) for _ in range(10)]
# process results in the order they are completed
for future1 in as_completed(futures1):
# get the result
result = future1.result()
# check if we should trigger a follow-up task
if result > 0.5:
_ = executor.submit(task2, result)
# wait for all follow-up tasks to complete
Running the example creates the thread pool and submits ten calls to task1() to the thread pool.
We then process results as they become available and submit follow-up calls to task2().
In this case, we can see that five of the ten calls to task1() resulted in a required follow-up call to task2() to be submitted into the thread pool.
Your results may vary given the use of random numbers.
Task 1: 0.09934212493527206
Task 1: 0.1338109803875105
Task 1: 0.4524962415643826
Task 1: 0.46384185278262724
Task 1: 0.07711961886475038
Task 1: 0.6396901127845064
Task 1: 0.6526351959831038
Task 2: value1=0.6526351959831038, value2=0.20307292716627035
Task 1: 0.9883198729630449
Task 2: value1=0.6396901127845064, value2=0.5545983932302203
Task 1: 0.8226047780912188
Task 1: 0.9248040494130205
Task 2: value1=0.8226047780912188, value2=0.22356055875124803
Task 2: value1=0.9883198729630449, value2=0.44057244287230257
Task 2: value1=0.9248040494130205, value2=0.5921666776161257
Takeaways
You now know how to submit follow-up tasks to a ThreadPoolExecutor.
If you enjoyed this tutorial, you will love my book: Python ThreadPoolExecutor Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.