ThreadPool Does Not Support terminate() in Python
You cannot terminate worker threads in the ThreadPool class.
In this tutorial you will discover the effect of the terminate() method on the ThreadPool in Python.
Let's get started.
What is the ThreadPool
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
-- multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module, it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading from or writing to sockets or files.
A ThreadPool is configured when it is created (for example, with the number of worker threads), and its worker threads are started at that point.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
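As a rough illustration of these methods, the sketch below issues tasks with apply(), map(), apply_async(), and map_async(). The square() task and the pool size of 4 are arbitrary choices made for this demo.

```python
from multiprocessing.pool import ThreadPool

def square(x):
    """Toy task used only for illustration."""
    return x * x

with ThreadPool(processes=4) as pool:
    # one-off task; blocks until the result is ready
    one = pool.apply(square, (3,))
    # same function applied to every item; blocks until all results are ready
    many = pool.map(square, range(5))
    # asynchronous versions return AsyncResult objects immediately
    async_one = pool.apply_async(square, (4,))
    async_many = pool.map_async(square, range(3))
    # retrieve the results later; get() blocks until each result is ready
    later_one = async_one.get()
    later_many = async_many.get()

print(one, many, later_one, later_many)  # 9 [0, 1, 4, 9, 16] 16 [0, 1, 4]
```

Note that the asynchronous results are retrieved inside the with block, because leaving the block shuts the pool down.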
What is terminate()
Processes in Python can be terminated.
This can be achieved via the terminate() method on the multiprocessing.Process class.
For example:
...
# terminate a process
process.terminate()
On Unix, calling terminate() sends a SIGTERM signal to the target process, which, if not handled by the process, will result in the process terminating (nearly) immediately.
terminate(): Terminate the process. On Unix this is done using the SIGTERM signal; on Windows TerminateProcess() is used. Note that exit handlers and finally clauses, etc., will not be executed.
-- multiprocessing — Process-based parallelism
Similarly, the multiprocessing pool of worker processes can be terminated by calling the terminate() method on the multiprocessing.Pool class.
For example:
...
# terminate the process pool
pool.terminate()
This will call the terminate() method on the worker processes in the pool, even if they are currently executing a task.
terminate(): Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.
-- multiprocessing — Process-based parallelism
The terminate() method on the Pool class is also called automatically when using the Pool via the context manager interface.
For example:
...
# create a pool via the context manager
with Pool() as pool:
    # issue tasks to the pool
    ...
# terminate() is called automatically
The multiprocessing.pool.ThreadPool class extends the Pool class and has a terminate() method that can be called.
For example:
...
# terminate the thread pool
pool.terminate()
There's just one problem: it does not actually terminate the worker threads.
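To see the problem in action, here is a minimal sketch that uses the context manager interface, which calls terminate() when the block exits. The names blocking_task, started, release, and finished are hypothetical; the threading.Event objects make the demo deterministic instead of relying on sleeps.

```python
import threading
from multiprocessing.pool import ThreadPool

started = threading.Event()   # set once the task is running
release = threading.Event()   # set by the main thread to let the task finish
finished = threading.Event()  # set only if the task runs to completion

def blocking_task():
    started.set()
    release.wait()            # block until the main thread says go
    finished.set()            # only reached if the thread was NOT killed

with ThreadPool(2) as pool:
    pool.apply_async(blocking_task)
    started.wait()            # make sure the task is already running
# leaving the with block has now called pool.terminate()

still_running = not finished.is_set()
release.set()                 # let the "terminated" task carry on
completed = finished.wait(timeout=5)
print(still_running, completed)  # True True
```

If terminate() really stopped the worker thread, the task could never reach finished.set(), and completed would be False.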
Threads Cannot Be Terminated Directly
Threads cannot be terminated.
The threading.Thread class does not have a terminate() method.
As such, we cannot directly terminate or stop a thread in Python.
Instead, we must use indirect methods, such as having the thread check a variable to choose to stop.
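A common way to do this is to share a threading.Event that the thread checks on each iteration. The sketch below is one such approach; the worker() function, the ticks list, and the timings are hypothetical choices for the demo.

```python
import threading
import time

def worker(stop_event, ticks):
    # keep working until another thread sets the stop flag
    while not stop_event.is_set():
        ticks.append(1)          # stand-in for one unit of real work
        stop_event.wait(0.05)    # pause, but wake early if a stop is requested

stop_event = threading.Event()
ticks = []
thread = threading.Thread(target=worker, args=(stop_event, ticks))
thread.start()
time.sleep(0.3)
stop_event.set()                 # ask the thread to stop; it exits on its own
thread.join(timeout=5)
print(thread.is_alive(), len(ticks) > 0)  # False True
```

The thread stops only because its own code checks the flag; nothing forces it to exit.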
This limitation directly affects what happens when we call the terminate() method on a ThreadPool.
ThreadPool Does Not Support Terminate
Because threads do not have a terminate() method, the ThreadPool terminate() method does not work as expected, at least at the time of writing, in Python 3.10 and earlier.
As mentioned, the terminate method exists and can be called.
We can learn more if we check the source code for the Pool class, which the ThreadPool class extends.
For example, the snippet below, from the internal Pool._terminate_pool() method, is executed when terminate() is called, either directly or indirectly via the context manager interface:
...
# Terminate workers which haven't already finished.
if pool and hasattr(pool[0], 'terminate'):
    util.debug('terminating workers')
    for p in pool:
        if p.exitcode is None:
            p.terminate()
It checks whether the worker objects in the pool (processes for a Pool, threads for a ThreadPool) have a terminate() method and, only if they do, calls terminate() on each worker that has not yet exited.
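We can check the same condition ourselves. In the sketch below, DummyProcess from multiprocessing.dummy is the thread class that ThreadPool workers are created from (it also appears in the example output later in this tutorial); worker_type() is a hypothetical helper that reports the class of the thread running it.

```python
import multiprocessing
import threading
from multiprocessing.dummy import DummyProcess
from multiprocessing.pool import ThreadPool

# does each kind of worker object offer a terminate() method?
process_has = hasattr(multiprocessing.Process, 'terminate')
thread_has = hasattr(threading.Thread, 'terminate')
dummy_has = hasattr(DummyProcess, 'terminate')
print(process_has, thread_has, dummy_has)  # True False False

def worker_type():
    # report the class of the pool thread executing this task
    return type(threading.current_thread()).__name__

with ThreadPool(1) as pool:
    worker_cls = pool.apply(worker_type)
print(worker_cls)  # DummyProcess
```

Since hasattr(pool[0], 'terminate') is False for ThreadPool workers, the loop that would terminate them never runs.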
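Therefore, when we call terminate() on a ThreadPool object, it does not terminate the worker threads, because they have no terminate() method to call.
Instead, idle worker threads and the pool's internal helper threads are shut down, while any worker thread that is executing a task is left running until that task completes.
As far as running tasks are concerned, calling terminate() on the ThreadPool has the same effect as calling the close() method. The one difference is that terminate() also discards any tasks still waiting in the queue, whereas close() lets them run.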
close(): Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
-- multiprocessing — Process-based parallelism
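One way to compare terminate() and close() on a ThreadPool is to queue more tasks than there are workers and see which ones actually run. The sketch below assumes a single worker and three tasks; run_three_tasks() and its timings are hypothetical, chosen so that task 0 is still running when the pool is stopped. Based on my reading of the CPython source, terminate() drains the pool's internal task queue, so only the task already running should complete.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

def run_three_tasks(stop):
    """Queue three tasks on a one-worker pool, call stop(pool), report which ran."""
    ran = []
    started = threading.Event()

    def task(i):
        started.set()      # signal that the worker has picked up a task
        time.sleep(0.5)    # keep the single worker busy
        ran.append(i)      # record that this task ran to completion

    pool = ThreadPool(1)
    for i in range(3):
        pool.apply_async(task, (i,))
    started.wait()         # task 0 is running; tasks 1 and 2 are still queued
    time.sleep(0.1)        # short pause so the queued tasks reach the pool's internal queue
    stop(pool)             # either ThreadPool.terminate or ThreadPool.close
    pool.join()            # wait for the worker thread to exit
    return ran

after_terminate = run_three_tasks(ThreadPool.terminate)
after_close = run_three_tasks(ThreadPool.close)
print(after_terminate)  # [0]
print(after_close)      # [0, 1, 2]
```

In both cases the running task 0 completes; only close() goes on to run the queued tasks.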
This is not a bug per se; it is just an undocumented feature.
In fact, the API documentation for the ThreadPool class is, at the time of writing, practically non-existent, which is why I am writing all of these tutorials.
We can demonstrate this with a worked example.
Example of ThreadPool Not Terminating
In this example we can demonstrate that calling terminate() on a ThreadPool does not terminate tasks that are running.
In this example we will first create a ThreadPool. We will then issue a task that blocks for a long time.
The main thread will then report all the threads running in the process, including the pool's worker threads and helper threads.
It will then terminate the ThreadPool by calling the terminate() method, and report all the running threads in the process again, to confirm that idle threads were closed while the busy worker thread is still running.
This will highlight that terminate() does not have the desired effect and operates like calling close().
Firstly, let's define a task that blocks for 10 seconds then reports a message when it is done.
This is helpful because if the task is allowed to finish rather than being terminated, we will see the message, whereas if the task is truly terminated, we will not see the message.
The task() function below implements this.
# task function executed in a new thread
def task():
    # block for a moment
    sleep(10)
    # display a message
    print('This is coming from another thread')
Next, in the main thread, we will create a ThreadPool.
...
# create a thread pool
pool = ThreadPool()
We will then issue a single task asynchronously into the pool, in this case via the apply_async() method.
...
# execute the task asynchronously
_ = pool.apply_async(task)
The main thread will then report all of the threads running in the process.
This is for reference after we call terminate() so we can see what threads were stopped and what threads remain running.
...
# report all threads
print('main reporting all threads:')
for thread in threading.enumerate():
    print(f'\t{thread}')
Next, the main thread will wait a moment, then terminate the ThreadPool by calling the terminate() method.
...
print('Main is waiting a moment...')
sleep(2)
# terminate the pool
print('Main is terminating the pool')
pool.terminate()
The main thread then waits a moment more, then reports all of the threads still running in the process, after terminate() was called.
We expect that most worker threads and the helper threads have closed, and that the single worker executing our task remains running.
...
# wait a moment
print('Main is waiting a moment...')
sleep(1)
# check if the pool is terminated
print('Main reporting all threads:')
for thread in threading.enumerate():
    print(f'\t{thread}')
Finally, the main thread joins the ThreadPool and waits for it to close completely. That is, the main thread blocks until all workers have closed.
...
# main joining the thread pool
print('Main joining the pool...')
pool.join()
print('Main done.')
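One detail worth knowing about join() is that it may only be called after close() or terminate(); calling it on a pool that is still running raises a ValueError. A minimal sketch, with an arbitrary pool size of 2:

```python
from multiprocessing.pool import ThreadPool

pool = ThreadPool(2)
try:
    pool.join()          # not allowed while the pool is still running
except ValueError as exc:
    error = exc
else:
    error = None
print('join() before close():', error)

pool.close()             # or pool.terminate()
pool.join()              # now blocks until every worker thread has exited
print('join() after close() returned')
```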
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of thread workers not being terminated
from time import sleep
from multiprocessing.pool import ThreadPool
import threading
# task function executed in a new thread
def task():
    # block for a moment
    sleep(10)
    # display a message
    print('This is coming from another thread')
# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    pool = ThreadPool()
    # execute the task asynchronously
    _ = pool.apply_async(task)
    # report all threads
    print('main reporting all threads:')
    for thread in threading.enumerate():
        print(f'\t{thread}')
    # wait a moment
    print('Main is waiting a moment...')
    sleep(2)
    # terminate the pool
    print('Main is terminating the pool')
    pool.terminate()
    # wait a moment
    print('Main is waiting a moment...')
    sleep(1)
    # check if the pool is terminated
    print('Main reporting all threads:')
    for thread in threading.enumerate():
        print(f'\t{thread}')
    # main joining the thread pool
    print('Main joining the pool...')
    pool.join()
    print('Main done.')
Running the example first starts the ThreadPool and issues the task.
The task starts running in a worker within the ThreadPool and blocks for 10 seconds.
The main thread reports all threads running in the process.
In this case, we can see the main thread, 8 worker threads, and 3 helper threads.
Note, the number of worker threads will differ depending on the number of logical CPU cores you have in your system.
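If you want to check the default worker count on your own machine, the sketch below counts the pool's worker threads right after creating a ThreadPool with no arguments. It assumes, per my understanding, that Python 3.13 and later size the pool with os.process_cpu_count() while older versions use os.cpu_count(), so it compares against whichever function exists.

```python
import os
import threading
from multiprocessing.dummy import DummyProcess
from multiprocessing.pool import ThreadPool

# assumption: 3.13+ uses os.process_cpu_count(); older versions use os.cpu_count()
cpu_count = getattr(os, 'process_cpu_count', os.cpu_count)
expected = cpu_count() or 1

pool = ThreadPool()
# ThreadPool workers are DummyProcess threads; helper threads are plain Threads
workers = [t for t in threading.enumerate() if isinstance(t, DummyProcess)]
print(len(workers), expected)
pool.close()
pool.join()
```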
The main thread then waits a moment, then terminates the ThreadPool and waits a moment more.
It then reports all running threads in the process.
This time we can only see the main thread and one worker thread. This is the worker thread executing our task.
This result highlights that the terminate() method indeed does not terminate worker threads that are running and instead only closes workers and helper threads that are not running.
Finally, the main thread joins the ThreadPool, and the running task completes normally and reports its message.
main reporting all threads:
	<_MainThread(MainThread, started 4460346880)>
	<DummyProcess(Thread-1, started daemon 123145354792960)>
	<DummyProcess(Thread-2, started daemon 123145371582464)>
	<DummyProcess(Thread-3, started daemon 123145388371968)>
	<DummyProcess(Thread-4, started daemon 123145405161472)>
	<DummyProcess(Thread-5, started daemon 123145421950976)>
	<DummyProcess(Thread-6, started daemon 123145438740480)>
	<DummyProcess(Thread-7, started daemon 123145455529984)>
	<DummyProcess(Thread-8, started daemon 123145472319488)>
	<Thread(Thread-9, started daemon 123145489108992)>
	<Thread(Thread-10, started daemon 123145505898496)>
	<Thread(Thread-11, started daemon 123145522688000)>
Main is waiting a moment...
Main is terminating the pool
Main is waiting a moment...
Main reporting all threads:
	<_MainThread(MainThread, started 4460346880)>
	<DummyProcess(Thread-1, started daemon 123145354792960)>
Main joining the pool...
This is coming from another thread
Main done.
Takeaways
You now know that terminate() does not terminate workers in the ThreadPool in Python.