Simple usage of the ThreadPoolExecutor in Python can result in unexpected behavior.
Issuing a task to the thread pool that in turn issues a subsequent task can fail unexpectedly. This is not a forbidden activity, and at first glance it is not clear why it fails.
In fact, this is a common “gotcha” when using the ThreadPoolExecutor: the pool is shut down while the task is running, but before the second task has been issued, resulting in an exception:
- cannot schedule new futures after shutdown
In this tutorial, you will discover the problem of unintentionally issuing tasks to the ThreadPoolExecutor after it has been shut down.
Let’s get started.
ThreadPoolExecutor Dies While Task is Running
I came across an interesting situation recently that is worth exploring.
I was developing a program that made use of the ThreadPoolExecutor. A task was issued that in turn issued many tasks to the same instance of the ThreadPoolExecutor.
The task would start and then the program would abruptly end. Subtasks were not submitted to the ThreadPoolExecutor.
This should not be possible because shutting down the ThreadPoolExecutor normally blocks until all issued and running tasks are done.
Therefore an investigation was needed!
Example of ThreadPoolExecutor Dies While Task is Running
We can develop the simplest possible example that makes use of the ThreadPoolExecutor and fails in the way described above, namely, the task appears to die while processing.
A ThreadPoolExecutor is created using the context manager interface.
You can learn more about this interface in the tutorial:
Next, a task is issued to the ThreadPoolExecutor, then this task in turn issues at least one task to the ThreadPoolExecutor.
The complete example is listed below.
```python
# SuperFastPython.com
# example of the ThreadPoolExecutor with context manager not waiting for task to complete
from concurrent.futures import ThreadPoolExecutor

# task to execute in the thread pool
def task(tpe):
    # report a message
    print('>Hello from task')
    # issue a task to the same thread pool
    _ = tpe.submit(print, '>>Hello from subtask')
    # report a final message
    print('>Task is done.')

# protect the entry point
if __name__ == '__main__':
    # report a message
    print('Starting program...')
    # create the thread pool using the context manager interface
    with ThreadPoolExecutor() as tpe:
        # issue task
        _ = tpe.submit(task, tpe)
        # block and wait for all tasks to complete...
    # report a final message
    print('All done')
```
Running the program first reports a message from the main thread. This works as expected.
Next, a ThreadPoolExecutor is created with the default number of workers using the context manager interface.
The task() function is then issued to the ThreadPoolExecutor.
The main thread then exits the context manager for the ThreadPoolExecutor, which calls shutdown() with default arguments. This causes the main thread to block until all issued tasks are complete (e.g. the task() function); it does not cancel tasks that have been issued but are not yet executing.
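As an aside, these default shutdown semantics can be sketched directly (a minimal illustration, not part of the program above): exiting the context manager is roughly equivalent to calling shutdown(wait=True) in a finally block.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# a task that takes a moment to finish
def slow_task():
    time.sleep(0.1)
    print('task finished')

# roughly what exiting the context manager does: shutdown(wait=True)
tpe = ThreadPoolExecutor()
try:
    future = tpe.submit(slow_task)
finally:
    # blocks until the running task completes; does not cancel queued tasks
    tpe.shutdown(wait=True)
print('pool is shut down')
```

Because shutdown(wait=True) joins the worker threads, the “task finished” message is always reported before “pool is shut down”.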
The task() function executes, first reporting a message. It then attempts to issue a subtask to the same ThreadPoolExecutor, in this case, a call to print() with a string argument.
This message is not seen.
Instead, it seems the task() dies abruptly.
The program ends with an “All done” message reported by the main thread.
```
Starting program...
>Hello from task
All done
```
What is happening here?
Debugging the Example And Quick Fix
Initially, I was stumped.
My first thought was that because I was not saving a reference to the Future object, the garbage collector was destroying the task, something akin to un-referenced tasks being garbage collected in asyncio.
This was far-fetched, of course. Nevertheless, I saved references to my Future objects.
For example:
```python
...
# issue task
future1 = tpe.submit(task, tpe)
```
And:
```python
...
# issue a task to the same thread pool
future2 = tpe.submit(print, '>>Hello from subtask')
```
This had no effect on the fault.
I then thought it was something to do with the context manager interface.
I swapped it out for manually creating and shutting down the ThreadPoolExecutor.
For example:
```python
...
# create the thread pool manually
tpe = ThreadPoolExecutor()
# issue task
_ = tpe.submit(task, tpe)
# shutdown the pool
tpe.shutdown()
```
No effect.
I also tried removing the explicit call to shutdown(), just to see what would happen.
This too had no effect.
I had to get on with things, so I explicitly waited on the top-level task within the context manager via the concurrent.futures.wait() module function.
For example:
```python
...
# block on the top-level task
_ = wait([future1])
```
You can learn more about waiting for all tasks to complete in the tutorial:
The complete example with this change is listed below.
```python
# SuperFastPython.com
# example of the ThreadPoolExecutor with explicit wait
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

# task to execute in the thread pool
def task(tpe):
    # report a message
    print('>Hello from task')
    # issue a task to the same thread pool
    future2 = tpe.submit(print, '>>Hello from subtask')
    # report a final message
    print('>Task is done.')

# protect the entry point
if __name__ == '__main__':
    # report a message
    print('Starting program...')
    # create the thread pool using the context manager interface
    with ThreadPoolExecutor() as tpe:
        # issue task
        future1 = tpe.submit(task, tpe)
        # block on the top-level task
        _ = wait([future1])
        # block and wait for all tasks to complete...
    # report a final message
    print('All done')
```
This had the desired effect.
The subtask was issued, and reported its message as intended.
The task then reported its final message and the program ended normally.
```
Starting program...
>Hello from task
>>Hello from subtask
>Task is done.
All done
```
I did not think anymore about it until this morning when it began to bug me.
Identifying the Cause of the Fault
Why is the top-level task dying in the ThreadPoolExecutor?
I tried removing the interaction with the ThreadPoolExecutor from within the task. I replaced it with all manner of other activities, such as loops, sleeps, etc. to no effect. The program always worked as intended.
The cause was the interaction with the ThreadPoolExecutor from within an executing task.
I remember seeing something like this before. My guess was that issuing a task from the task was raising an exception somehow.
I decided to wrap the entire content of the task() function with exception handling and reported any exceptions raised.
This is the approach I recommend in the tutorial:
For example:
```python
# task to execute in the thread pool
def task(tpe):
    try:
        # report a message
        print('>Hello from task')
        # issue a task to the same thread pool
        _ = tpe.submit(print, '>>Hello from subtask')
        # report a final message
        print('>Task is done.')
    except Exception as e:
        print(e)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of the ThreadPoolExecutor with context manager with exception handling
from concurrent.futures import ThreadPoolExecutor

# task to execute in the thread pool
def task(tpe):
    try:
        # report a message
        print('>Hello from task')
        # issue a task to the same thread pool
        _ = tpe.submit(print, '>>Hello from subtask')
        # report a final message
        print('>Task is done.')
    except Exception as e:
        print(e)

# protect the entry point
if __name__ == '__main__':
    # report a message
    print('Starting program...')
    # create the thread pool using the context manager interface
    with ThreadPoolExecutor() as tpe:
        # issue task
        _ = tpe.submit(task, tpe)
        # block and wait for all tasks to complete...
    # report a final message
    print('All done')
```
Running the example reported the message from the main thread and the task as per normal.
As soon as the subtask is issued from task(), an exception is raised and caught, and its message is reported: cannot schedule new futures after shutdown.
The exception suggests that we cannot issue a task to the ThreadPoolExecutor after it has been shutdown.
Of course!
```
Starting program...
>Hello from task
cannot schedule new futures after shutdown
All done
```
This is a classic gotcha.
I have seen it before when attempting to reissue tasks to the ThreadPoolExecutor that have failed, in the tutorial:
Why Submitting a Subtask from a Task to the ThreadPoolExecutor Fails
In the examples above, submitting a subtask to the ThreadPoolExecutor from a task running in the ThreadPoolExecutor fails with an exception.
The reason is that the ThreadPoolExecutor is shut down while the task() function is running. Once a ThreadPoolExecutor has been shut down, it cannot accept new tasks via the submit() or map() methods. Instead, it raises a RuntimeError with the message:
- cannot schedule new futures after shutdown
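The behavior is easy to reproduce deterministically outside the task-in-task scenario. Submitting after an explicit shutdown() raises the same RuntimeError (a minimal sketch, not from the program above):

```python
from concurrent.futures import ThreadPoolExecutor

# minimal reproduction: submit after an explicit shutdown
tpe = ThreadPoolExecutor()
tpe.shutdown()
try:
    tpe.submit(print, 'too late')
except RuntimeError as e:
    # report the exception message
    message = str(e)
    print(message)
```

In the task-in-task examples, the same check fails because the main thread has already begun the shutdown by the time the task calls submit().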
This is also why the quick fix described above worked.
It caused the main thread to block on the top-level task until it was complete, before closing the ThreadPoolExecutor by exiting the context manager. This allowed the subtask to be submitted to the ThreadPoolExecutor before it was shut down.
Recommended Solution
The general solution is to not call shutdown() on the ThreadPoolExecutor, directly or indirectly via the context manager interface, until after all tasks have been issued via calls to submit() or map().
If issued tasks themselves issue tasks, then the tasks must form an acyclic graph (e.g. a tree) in order to avoid a deadlock.
As such, waiting on the root task or tasks in the tree of issued tasks is sufficient to avoid the fault.
This was the solution we saw above, where the main thread waited on the primary task directly via the wait() module function.
My recommendation to avoid the problem entirely is to ensure tasks are only submitted to the ThreadPoolExecutor from one context, e.g. the point in the program where the ThreadPoolExecutor is created and shutdown.
This might mean using a queue or similar data structure to connect task functions or done callback functions to the main thread to signal what and when subtasks should be issued.
In fact, a more robust design might be to have a separate daemon thread whose job is to manage the ThreadPoolExecutor and spend its time reading from the work queue and issuing tasks into the ThreadPoolExecutor. This would be a good point to add task logging and a way to ensure tasks are only issued from one point.
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to avoid the problem where we cannot schedule new futures after shutdown in the ThreadPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Alejandro Lopez on Unsplash