You can submit new tasks from tasks running in the ThreadPoolExecutor by passing the thread pool as an argument to the task or having the task access the thread pool via a global variable.
In this tutorial, you will discover how to submit new tasks from tasks running in the ThreadPoolExecutor.
Let’s get started.
Need Tasks To Issue New Tasks
The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.
Tasks executed by the pool's worker threads run concurrently, making the ThreadPoolExecutor appropriate for I/O-bound tasks.
A ThreadPoolExecutor can be created directly or via the context manager interface, and tasks can be issued one-by-one via the submit() method or in batch via the map() method.
For example:
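```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task, defined here only so the snippet runs
def task():
    return 'result'

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task
    future = tpe.submit(task)
    # get the task result once the task is done
    result = future.result()
```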
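The snippet above issues a single task with submit(). For comparison, here is a minimal sketch of issuing tasks in batch with map(); the square() function is a hypothetical stand-in for real work.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used only for illustration
def square(value):
    return value * value

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue one task per input value; map() yields results in input order
    results = list(tpe.map(square, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```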
There are many cases where we may need to issue a new task to the ThreadPoolExecutor from within a task that is already being executed by the ThreadPoolExecutor.
This may be a follow-up task that naturally arises out of the first task and can be executed asynchronously.
It may also be a retry of the current task, if the current task determines that it has failed.
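As a sketch of the retry case, the task below resubmits itself when its work fails and returns immediately without waiting on the retry. It assumes the pool is passed to the task as an argument (one of the approaches discussed below), and do_work(), MAX_ATTEMPTS, and the 5-second wait are hypothetical choices made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event

MAX_ATTEMPTS = 3       # assumed retry limit
succeeded = Event()    # set once an attempt succeeds
retry_results = []     # successful results, for demonstration only

# hypothetical unit of work: fails on the first two attempts
def do_work(attempt):
    if attempt < 2:
        raise OSError(f'attempt {attempt} failed')
    return f'succeeded on attempt {attempt}'

# task that resubmits itself on failure without waiting for the retry
def task(tpe, attempt=0):
    try:
        result = do_work(attempt)
    except OSError as e:
        print(e)
        if attempt + 1 < MAX_ATTEMPTS:
            # issue the retry and return immediately
            tpe.submit(task, tpe, attempt + 1)
        return
    print(result)
    retry_results.append(result)
    succeeded.set()

with ThreadPoolExecutor() as tpe:
    tpe.submit(task, tpe)
    # keep the pool open until a retry succeeds (or give up after 5 seconds)
    succeeded.wait(timeout=5)
```

The main thread waits on an Event rather than on a future, because the retry chain is issued by the tasks themselves and the pool must stay open until the last retry has been submitted.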
Both of these cases can be solved by a separate thread that manages the ThreadPoolExecutor, issues tasks, and checks results to determine whether follow-up or retry tasks are required.
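A minimal sketch of that managing design might look like the following, with the main thread playing the manager. It uses concurrent.futures.wait() with FIRST_COMPLETED to react to each finished task; fetch() and process() are hypothetical first-stage and follow-up tasks.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

# hypothetical first-stage task: returns a value that needs follow-up work
def fetch(item):
    return item * 10

# hypothetical follow-up task
def process(value):
    return value + 1

follow_up_results = []
with ThreadPoolExecutor() as tpe:
    # map each pending future to the stage it belongs to
    pending = {tpe.submit(fetch, i): 'fetch' for i in range(3)}
    while pending:
        done, _ = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            stage = pending.pop(future)
            if stage == 'fetch':
                # the manager, not the task, issues the follow-up task
                pending[tpe.submit(process, future.result())] = 'process'
            else:
                follow_up_results.append(future.result())

print(sorted(follow_up_results))  # [1, 11, 21]
```

A retry could be handled in the same loop by checking future.exception() and resubmitting the failed stage.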
This is a good design, but sometimes we may prefer to have tasks issue new tasks themselves.
How can we issue new tasks to the ThreadPoolExecutor from tasks that are currently executing within the ThreadPoolExecutor?
How Tasks Can Submit New Tasks to the ThreadPoolExecutor
There are many ways that tasks can issue their own tasks to the ThreadPoolExecutor.
Some common approaches include:
- Tasks access the ThreadPoolExecutor directly via an argument.
- Tasks access the ThreadPoolExecutor directly via a global variable.
- Tasks send requests for new tasks to another thread, which issues the new tasks on their behalf.
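The third approach is not demonstrated in the worked examples, so here is a minimal sketch under stated assumptions: tasks put requests on a queue.Queue, and a dedicated dispatcher thread is the only code that calls submit(). The request format (a function and one argument), the None sentinel, and the task names are hypothetical choices.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Thread

requests = Queue()  # tasks put requests for new tasks here
outputs = Queue()   # collected outputs, for demonstration only

# hypothetical follow-up task
def task2(value):
    outputs.put(value * 2)

# first-level task: asks the dispatcher to issue task2 instead of submitting it
def task1(value):
    requests.put((task2, value))

# dispatcher thread: the only code that submits follow-up tasks
def dispatcher(tpe):
    while True:
        request = requests.get()
        if request is None:  # sentinel: stop dispatching
            break
        func, arg = request
        tpe.submit(func, arg)

with ThreadPoolExecutor() as tpe:
    thread = Thread(target=dispatcher, args=(tpe,))
    thread.start()
    # issue the first-level tasks and wait for them to finish
    for future in [tpe.submit(task1, i) for i in range(3)]:
        future.result()
    # every request is queued by now, so the sentinel lands after them
    requests.put(None)
    thread.join()

collected = sorted(outputs.get() for _ in range(3))
print(collected)  # [0, 2, 4]
```

Joining the dispatcher before leaving the with block ensures every follow-up task is submitted before the pool shuts down.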
The major risk when submitting new tasks to the ThreadPoolExecutor from tasks executing within the same ThreadPoolExecutor is a deadlock.
A deadlock is a concurrency failure mode where a thread waits on a condition that will never occur, such as another thread that will never finish. When tasks issue new tasks, a deadlock can occur if the issuing task waits for the new tasks to complete but the new tasks can never start because all worker threads are occupied, for example by the very tasks waiting on them.
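The smallest case is a pool with a single worker. In the sketch below, task1() occupies that worker and then waits on the task2() it just submitted, which cannot start. A timeout on result() is added only so the demonstration terminates instead of hanging; without it, the call would block forever.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# follow-up task
def task2():
    return 'task 2 result'

# first-level task that (wrongly) waits on the task it submits
def task1(tpe):
    future = tpe.submit(task2)
    try:
        # the only worker is busy running task1, so task2 cannot start
        return future.result(timeout=1)
    except TimeoutError:
        return 'timed out waiting for task2'

with ThreadPoolExecutor(max_workers=1) as tpe:
    outcome = tpe.submit(task1, tpe).result()

print(outcome)  # timed out waiting for task2
```

With more workers the same pattern can still deadlock once enough waiting tasks fill every worker thread.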
As such, it is generally recommended that tasks do not submit new tasks to the thread pool.
If tasks must submit new tasks to the thread pool, they must not wait for those tasks to complete or block on their results.
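If a task still needs to act on the result of a task it submits, one option is to register a callback with Future.add_done_callback() rather than blocking. The sketch below is an illustration, not part of the original examples; it uses a single worker to show that no deadlock occurs even when the pool has no spare threads.

```python
from concurrent.futures import ThreadPoolExecutor

callback_results = []

# follow-up task
def task2():
    return 'task 2 result'

# called with the completed future once task2 finishes
def on_task2_done(future):
    # calling result() here does not block because the future is already done
    callback_results.append(future.result())

# first-level task: registers a callback instead of waiting on task2
def task1(tpe):
    future = tpe.submit(task2)
    future.add_done_callback(on_task2_done)
    return 'task 1 done'

with ThreadPoolExecutor(max_workers=1) as tpe:
    print(tpe.submit(task1, tpe).result())

# leaving the with block waited for task2 and its callback
print(callback_results)  # ['task 2 result']
```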
Now that we know how to safely submit new tasks from tasks in the thread pool, let’s look at some worked examples.
Example of Task Submitting a Task via an Argument
We can explore an example of submitting a new task to the ThreadPoolExecutor from a running task by passing in the ThreadPoolExecutor to the running task as an argument.
In this example, we will define a task that takes a ThreadPoolExecutor as an argument, runs for a moment, then submits a second task before terminating. The second task runs as normal.
Firstly, we can define the second task.
This task reports a message, sleeps for a moment to simulate computational effort, then reports a final message.
The task2() function below implements this.
```python
# task executed in the thread pool
def task2():
    # report a message
    print('>>Task 2 is running')
    # block for a moment to simulate work
    sleep(1)
    # report a message
    print('>>Task 2 is done.')
```
Next, we can define the first-level task.
This task function takes the ThreadPoolExecutor as an argument. It first reports a message, then blocks for a moment to simulate effort. Finally, it submits a new task to the ThreadPoolExecutor, in this case, the task2() function, before reporting a final message.
This task does not wait on the result of the submitted task, to avoid a deadlock.
The task1() function below implements this.
```python
# task executed in the thread pool
def task1(tpe):
    # report a message
    print('>Task 1 is running')
    # block for a moment to simulate work
    sleep(1)
    # issue a new task
    _ = tpe.submit(task2)
    # report a message
    print('>Task 1 is done.')
```
Finally, in the main thread, we will create and start a new ThreadPoolExecutor with the default number of workers using the context manager interface.
We then issue the task1() function to the thread pool and wait for it to complete.
```python
...
# create the thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task to the thread pool
    future = tpe.submit(task1, tpe)
    # wait for the task to complete
    _ = future.result()
```
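Waiting on task1() in the main thread matters here. Leaving the with block calls shutdown(), and submit() raises a RuntimeError once the pool has been shut down. The variation below, a hypothetical counter-example rather than the tutorial's code, skips the wait so that task1() tries to submit task2() too late.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# follow-up task (never runs in this counter-example)
def task2():
    print('>>Task 2 is running')

# first-level task that submits task2 after a delay
def task1(tpe):
    # simulate work so the main thread leaves the with block first
    sleep(0.5)
    try:
        tpe.submit(task2)
        return 'task 2 submitted'
    except RuntimeError as e:
        # the pool was already shut down when submit() was called
        return f'submit failed: {e}'

with ThreadPoolExecutor() as tpe:
    future = tpe.submit(task1, tpe)
    # no future.result() here: exiting the block calls shutdown()

print(future.result())
```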
And that’s it.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of a task submitting a task with the thread pool shared as an argument
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# task executed in the thread pool
def task2():
    # report a message
    print('>>Task 2 is running')
    # block for a moment to simulate work
    sleep(1)
    # report a message
    print('>>Task 2 is done.')

# task executed in the thread pool
def task1(tpe):
    # report a message
    print('>Task 1 is running')
    # block for a moment to simulate work
    sleep(1)
    # issue a new task
    _ = tpe.submit(task2)
    # report a message
    print('>Task 1 is done.')

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPoolExecutor() as tpe:
        # issue a task to the thread pool
        future = tpe.submit(task1, tpe)
        # wait for the task to complete
        _ = future.result()
```
Running the example first creates and starts the thread pool.
Next, the task1() function is submitted to the thread pool for execution with the ThreadPoolExecutor as an argument and the main thread blocks until the task is complete.
The task1() function executes, reports a message, and sleeps. The task resumes and submits task2() to the thread pool and reports a final message before terminating.
The task2() function is executed by the thread pool. It reports a message and sleeps for a moment. The task then resumes and reports a final message.
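This highlights how one task running in the ThreadPoolExecutor can submit a subsequent task to the ThreadPoolExecutor by passing the thread pool as an argument to the running task. Because task1() and task2() run in different worker threads, the order of the middle two lines of output may vary between runs.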
```
>Task 1 is running
>>Task 2 is running
>Task 1 is done.
>>Task 2 is done.
```
Example of Task Submitting a Task via a Global Variable
We can explore the case of submitting a new task to the ThreadPoolExecutor from a running task by accessing the ThreadPoolExecutor via a global variable.
In this example, we will update the above example so that task1() no longer receives the ThreadPoolExecutor as an argument and instead declares the thread pool as a global variable and accesses it directly.
This can be achieved via the global statement, which tells the Python interpreter that the name we are accessing, in this case “tpe”, refers to a global variable.
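Strictly, Python only requires the global statement when a function assigns to a global name. A function that only reads tpe finds it through normal name lookup, so the statement mainly serves to make the dependency explicit. The sketch below, a variation on the tutorial's code, omits it to show this.

```python
from concurrent.futures import ThreadPoolExecutor

# follow-up task
def task2():
    return 'task 2 result'

# reads the module-level tpe; no global statement is needed just to read it
def task1():
    return tpe.submit(task2)

tpe = ThreadPoolExecutor()
inner_future = tpe.submit(task1).result()
# waiting here is fine: this is the main thread, not a pool worker
print(inner_future.result())  # task 2 result
tpe.shutdown()
```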
```python
...
# declare the thread pool as a global variable
global tpe
# issue a new task
_ = tpe.submit(task2)
```
The updated version of the task1() function with this change is listed below.
```python
# task executed in the thread pool
def task1():
    # report a message
    print('>Task 1 is running')
    # block for a moment to simulate work
    sleep(1)
    # declare the thread pool as a global variable
    global tpe
    # issue a new task
    _ = tpe.submit(task2)
    # report a message
    print('>Task 1 is done.')
```
And that’s it.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of a task submitting a task with the thread pool shared as a global variable
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# task executed in the thread pool
def task2():
    # report a message
    print('>>Task 2 is running')
    # block for a moment to simulate work
    sleep(1)
    # report a message
    print('>>Task 2 is done.')

# task executed in the thread pool
def task1():
    # report a message
    print('>Task 1 is running')
    # block for a moment to simulate work
    sleep(1)
    # declare the thread pool as a global variable
    global tpe
    # issue a new task
    _ = tpe.submit(task2)
    # report a message
    print('>Task 1 is done.')

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    tpe = ThreadPoolExecutor()
    # issue a task to the thread pool
    future = tpe.submit(task1)
    # wait for the task to complete
    _ = future.result()
    # shutdown the thread pool
    tpe.shutdown()
```
Running the example first creates and starts the thread pool.
Next, the task1() function is submitted to the thread pool for execution and the main thread blocks until the task is complete.
The task1() function executes, reports a message, and sleeps.
The task resumes and declares the ThreadPoolExecutor as a global variable and uses it to submit task2() to the thread pool before reporting a final message and terminating.
The task2() function is executed by the thread pool. It reports a message and sleeps for a moment. The task then resumes and reports a final message.
This highlights how one task running in the ThreadPoolExecutor can submit a subsequent task to the ThreadPoolExecutor by accessing it as a global variable from within a running task.
```
>Task 1 is running
>>Task 2 is running
>Task 1 is done.
>>Task 2 is done.
```
Takeaways
You now know how to submit new tasks from tasks running in the ThreadPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.