Thread Details in the Multiprocessing Pool

July 21, 2022 Python Multiprocessing Pool

You can get the details of the thread used to initialize child worker processes and to execute tasks in the process pool.

Additionally, the process pool uses three helper threads internally to manage the pool. These threads run in the main process that creates the process pool, where we can get their details.

In this tutorial you will discover how to get the details of threads used by the process pool in Python.

Let's get started.

Need Details of Threads in the Process Pool

The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.

A process pool can be configured when it is created, which will prepare the child workers.

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

-- multiprocessing — Process-based parallelism

We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
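Here is a minimal sketch of these four ways of issuing tasks. It assumes a hypothetical square() task and a pool of two workers, and the 30-second timeouts are arbitrary choices:

```python
# sketch: the four common ways to issue tasks to a process pool
from multiprocessing.pool import Pool

# hypothetical task used only for illustration
def square(value):
    return value * value

def issue_tasks():
    with Pool(processes=2) as pool:
        # apply() blocks until the single task has finished
        one = pool.apply(square, (3,))
        # map() blocks until every item in the iterable has been processed
        many = pool.map(square, range(4))
        # the *_async() versions return an AsyncResult immediately
        pending_one = pool.apply_async(square, (4,))
        pending_many = pool.map_async(square, range(4))
        # get() blocks until the result is ready; the timeout avoids hanging forever
        return one, many, pending_one.get(timeout=30), pending_many.get(timeout=30)

# protect the entry point
if __name__ == '__main__':
    print(issue_tasks())  # prints (9, [0, 1, 4, 9], 16, [0, 1, 4, 9])
```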

When using the process pool, we may need the details of the threads used to initialize worker processes and to execute tasks in them.

There are many reasons we might need this, such as designing the thread-safe handling of data or program state in worker processes, or debugging.

How can we get the details of the threads used by the process pool in Python?

How to Get Threads Used in Process Pool

We can get the current thread via the threading.current_thread() function.

This will return a threading.Thread instance that can be used to get the details of the current thread in situations such as:

  1. A worker process in the process pool executing a task.
  2. Initializing a worker process in the process pool.

For example, we may have questions such as:

  1. Do worker processes execute tasks in the main thread or in a worker thread?
  2. Are worker processes initialized using the main thread or in a worker thread?
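One way to answer both questions in code is to have the initializer and the task check whether they are running in their own process's main thread. This sketch assumes that comparing threading.current_thread() against threading.main_thread() is a sufficient test. The in_main_thread() helper and the pool size of two are illustrative choices, not part of the multiprocessing API.

```python
# sketch: answer both questions by comparing against threading.main_thread()
from multiprocessing.pool import Pool
from threading import current_thread, main_thread

# True if the caller is running in its own process's main thread
def in_main_thread():
    return current_thread() is main_thread()

# initialize the worker process and report which thread is doing it
def init_worker():
    print(f'Initializer in main thread: {in_main_thread()}', flush=True)

# task executed in a worker process; the answer is returned to the caller
def task(identifier):
    return in_main_thread()

# protect the entry point
if __name__ == '__main__':
    with Pool(processes=2, initializer=init_worker) as pool:
        print(f'Tasks in main thread: {pool.map(task, range(4))}')
```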

Additionally, the multiprocessing.pool.Pool itself uses helper threads in the main process. We can get the details of these threads to help distinguish them from any other threads running in the main process.

Let's take a closer look at each one of these aspects in turn.

Thread Used to Execute Task

We can get the details of the current thread executing a task in the process pool.

This can be achieved by calling the threading.current_thread() function to get the threading.Thread instance for the thread executing the task in the child worker process.

...
# get the current thread
thread = current_thread()

For example, we can call this function in a custom function executed by the process pool:

# task executed in a worker process
def task(identifier):
    # get the current thread
    thread = current_thread()
    print(thread)
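Output printed from several workers can interleave, so a variation worth considering is to return the thread details to the main process instead. This sketch assumes that a dict of a few threading.Thread attributes is all we need. The native_id attribute requires Python 3.8 or later. Thread identifiers may also be recycled, and identifiers from different processes can coincide, so they are most meaningful alongside a process identifier.

```python
# sketch: return the thread's details to the main process instead of printing
from multiprocessing.pool import Pool
from threading import current_thread

# task executed in a worker process
def task(identifier):
    # get the current thread
    thread = current_thread()
    # collect a few details of the thread in a picklable dict
    return {
        'identifier': identifier,
        'name': thread.name,
        'ident': thread.ident,          # Python-level thread identifier
        'native_id': thread.native_id,  # OS-level thread ID (Python 3.8+)
        'daemon': thread.daemon,
    }

# protect the entry point
if __name__ == '__main__':
    with Pool(processes=2) as pool:
        for details in pool.map(task, range(3)):
            print(details)
```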

Thread Used to Initialize Worker

We can get the details of the thread used to initialize each worker process in the process pool.

This can be achieved by calling the threading.current_thread() function to get the threading.Thread instance for the thread executing the initialization function in the child worker process.

...
# get the current thread
thread = current_thread()

For example, we can call this function in a custom function used to initialize workers in the process pool:

# initialize the worker process
def init_worker():
    # get the current thread
    thread = current_thread()
    print(thread)

We can then configure the process pool to call the custom function when initializing worker processes.

...
# configure the process pool to initialize worker processes
pool = Pool(initializer=init_worker)
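To check that a worker process uses the same thread for initialization and for executing tasks, we could record the thread identifier in the initializer and compare it inside each task. This is a minimal sketch. The module-level init_thread_ident variable is a hypothetical name, and the approach relies on each worker process having its own copy of the module's globals.

```python
# sketch: check that tasks run in the same thread that ran the initializer
from multiprocessing.pool import Pool
from threading import get_ident

# hypothetical per-process global, set once by the initializer
init_thread_ident = None

# initialize the worker process: remember which thread ran the initializer
def init_worker():
    global init_thread_ident
    init_thread_ident = get_ident()

# task executed in a worker process: compare against the remembered thread
def task(identifier):
    return get_ident() == init_thread_ident

# protect the entry point
if __name__ == '__main__':
    # configure the process pool to initialize worker processes
    with Pool(processes=2, initializer=init_worker) as pool:
        print(pool.map(task, range(4)))
```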

Threads Used to Manage Process Pool

The multiprocessing.pool.Pool class uses worker threads internally to help manage the process pool.

Looking at the source code for the process pool class, we can see that three helper threads are created, with responsibilities such as:

  1. Handling the creation of child worker processes.
  2. Handling the dispatching of tasks to worker processes.
  3. Handling the task results from worker processes.
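To see these helper threads for ourselves, one approach is to compare the result of threading.enumerate() before and after creating a pool. This is a sketch built on an assumption, not a documented API. The pool_helper_threads() helper is hypothetical. In CPython, the pool's helper threads are daemon threads that are started in the Pool constructor, so no tasks need to be issued.

```python
# sketch: spot the pool's helper threads by comparing thread snapshots
import threading
from multiprocessing.pool import Pool

def pool_helper_threads(processes=2):
    # threads alive before the pool exists
    before = set(threading.enumerate())
    with Pool(processes=processes):
        # assume any thread that appeared since then belongs to the pool
        new_threads = [t for t in threading.enumerate() if t not in before]
        return [(t.name, t.daemon) for t in new_threads]

# protect the entry point
if __name__ == '__main__':
    # no tasks are issued: in CPython the helper threads start with the pool
    for name, daemon in pool_helper_threads():
        print(f'Helper thread: {name}, daemon={daemon}')
```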

In the main process that creates the process pool, we can get a list of all running threads and inspect the details of the threads associated with the process pool.

We can call the threading.enumerate() function to get a list of all actively running threads. This will return a list of threading.Thread instances, one for each running thread.

For example:

...
# get a list of all running threads
threads = threading.enumerate()

We can get this list of threads any time after the process pool has been created, as CPython starts the pool's helper threads when the pool is constructed.

Now that we know how to get the details of threads associated with the process pool, let's look at some worked examples.

Example of Getting Thread Used to Execute Task

We can explore how to get the details of the thread used to execute tasks in the process pool.

In this example we will define a custom function to be executed in the process pool. The task will get the current thread and report its details. The main process will create the process pool and issue the task into the process pool.

Firstly, we can define a custom function to execute as a task in the process pool.

The function will take an argument, get the threading.Thread instance for the current thread, and report the thread name. We could report other details of the thread as well.

The task() function below implements this.

# task executed in a worker process
def task(identifier):
    # get the current thread
    thread = current_thread()
    # report the name of the current thread
    print(f'Worker thread: {thread.name}', flush=True)

Next, in the main process we will create the process pool with the default configuration.

We will use the context manager interface to ensure that the process pool is closed automatically once we are finished with it.

...
# create and configure the process pool
with Pool() as pool:
    # ...

We will issue a single call to the custom function as a task in the process pool.

This can be achieved via the apply_async() function that will issue the task asynchronously and return an AsyncResult as a handle on the task.

...
# issues task to the process pool
result = pool.apply_async(task, (0,))

The main process will then wait for the task to finish via the returned AsyncResult object.

...
# wait for tasks to complete
result.wait()
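The wait() method only blocks. It neither returns the task's value nor reports whether the task failed. The sketch below compares wait() with get() and successful() on the returned AsyncResult objects. It assumes a hypothetical task that raises ValueError for negative input, and note that successful() may only be called once the result is ready.

```python
# sketch: wait() only blocks, while get() also returns the value or re-raises
from multiprocessing.pool import Pool
from threading import current_thread

# task executed in a worker process; fails for negative identifiers
def task(identifier):
    if identifier < 0:
        raise ValueError('negative identifier')
    return current_thread().name

def compare_wait_and_get():
    with Pool(processes=2) as pool:
        good = pool.apply_async(task, (0,))
        bad = pool.apply_async(task, (-1,))
        # wait() returns None even though the second task raised an exception
        good.wait(timeout=30)
        bad.wait(timeout=30)
        # successful() may only be called once the result is ready
        bad_succeeded = bad.successful()
        try:
            bad.get(timeout=30)
            error_message = None
        except ValueError as error:
            # get() re-raises the exception from the worker process
            error_message = str(error)
        return good.get(timeout=30), bad_succeeded, error_message

# protect the entry point
if __name__ == '__main__':
    print(compare_wait_and_get())
```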

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting the details of the thread used to execute the task
from time import sleep
from multiprocessing.pool import Pool
from threading import current_thread

# task executed in a worker process
def task(identifier):
    # get the current thread
    thread = current_thread()
    # report the name of the current thread
    print(f'Worker thread: {thread.name}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issues task to the process pool
        result = pool.apply_async(task, (0,))
        # wait for tasks to complete
        result.wait()
    # process pool is closed automatically

Running the example first creates the process pool.

The main process then issues a task to the process pool and blocks until the task completes.

The custom function runs in the process pool. It gets the current thread and reports its name.

In this case, we can see, as we might expect, that the main thread of the worker process is responsible for executing the custom function in the child worker process.

Worker thread: MainThread
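Because every worker process executes tasks in a thread named MainThread, the thread name alone cannot tell workers apart. A minimal sketch, assuming the process name and PID are acceptable identifiers, is to return them alongside the thread name. Worker process names depend on the start method, so the exact names printed will vary.

```python
# sketch: every worker runs tasks in a thread named MainThread, so pair the
# thread name with the worker process name and PID to tell workers apart
import os
from multiprocessing import current_process
from multiprocessing.pool import Pool
from threading import current_thread

# task executed in a worker process
def task(identifier):
    return current_process().name, os.getpid(), current_thread().name

# protect the entry point
if __name__ == '__main__':
    with Pool(processes=2) as pool:
        for process_name, pid, thread_name in pool.map(task, range(4)):
            print(f'{process_name} (pid={pid}) ran a task in {thread_name}')
```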

Next, let's explore how we might check which thread is used to initialize the child worker processes.

Example of Getting Thread Used to Initialize Worker

We can explore how to get the details of the thread used to initialize the child worker processes.

In this example we will define a custom function to initialize the worker processes. The initialization function will get the current thread and report its details. To ensure that the worker processes are initialized, we will issue one task into the process pool that will just block for a fraction of a second.

Firstly, we can define the custom function that will initialize the child worker processes.

The function does not take any arguments or return any values. It will get the threading.Thread instance for the current thread, then report its name.

The init_worker() function below implements this.

# initialize the worker process
def init_worker():
    # get the current thread
    thread = current_thread()
    # report the name of the current thread
    print(f'Initialize worker thread: {thread.name}', flush=True)

Next, we can define a custom function to execute a task in the process pool.

The task will take an argument and block for a fraction of a second.

The task() function below implements this.

# task executed in a worker process
def task(identifier):
    # block for a moment
    sleep(0.5)

Finally, in the main process, we can create the process pool with the default configuration using the context manager interface.

We can issue the task using the apply_async() function, get an AsyncResult in return immediately, then wait on the object for the task to complete.

...
# create and configure the process pool
with Pool(initializer=init_worker) as pool:
    # issues task to the process pool
    result = pool.apply_async(task, (0,))
    # wait for tasks to complete
    result.wait()

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting the details of the thread used to initialize worker process
from time import sleep
from multiprocessing.pool import Pool
from threading import current_thread

# initialize the worker process
def init_worker():
    # get the current thread
    thread = current_thread()
    # report the name of the current thread
    print(f'Initialize worker thread: {thread.name}', flush=True)

# task executed in a worker process
def task(identifier):
    # block for a moment
    sleep(0.5)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(initializer=init_worker) as pool:
        # issues task to the process pool
        result = pool.apply_async(task, (0,))
        # wait for tasks to complete
        result.wait()
    # process pool is closed automatically

Running the example first creates the process pool.

The main process then issues the task into the process pool and waits for it to complete.

The process pool is created and starts a default number of worker processes, eight in this case, although the number of processes will differ depending on the number of CPU cores in your system.

Each worker process is then initialized using the custom initialization function that gets the current thread and reports its name.

In this case, we can see that each worker process is initialized using the main thread of the child worker process.

This is not surprising.

Initialize worker thread: MainThread
Initialize worker thread: MainThread
Initialize worker thread: MainThread
Initialize worker thread: MainThread
Initialize worker thread: MainThread
Initialize worker thread: MainThread
Initialize worker thread: MainThread
Initialize worker thread: MainThread
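We can also confirm that the initializer runs once per worker process by counting its calls. The sketch below does this by sharing a multiprocessing.Value through the initargs argument. The count_initializations() helper is hypothetical. It closes and joins the pool rather than using the context manager, because terminate() could stop a worker before that worker has been initialized.

```python
# sketch: count initializer calls across workers with a shared counter
from multiprocessing import Value
from multiprocessing.pool import Pool

# initialize the worker process, incrementing a counter shared with the parent
def init_worker(counter):
    with counter.get_lock():
        counter.value += 1

# task executed in a worker process
def task(identifier):
    return identifier

def count_initializations(processes=2):
    # shared integer, passed to each worker through initargs
    counter = Value('i', 0)
    pool = Pool(processes=processes, initializer=init_worker,
                initargs=(counter,))
    pool.map(task, range(10))
    # close() and join() (unlike terminate()) let every worker start up fully,
    # so each one runs the initializer exactly once before exiting
    pool.close()
    pool.join()
    return counter.value

# protect the entry point
if __name__ == '__main__':
    print(f'Initializer calls: {count_initializations()}')
```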

Next, let's investigate the worker threads created to help manage the process pool.

Example of Getting Threads Used To Manage Pool

We can explore the threads created automatically by the process pool.

Looking at the source code for the multiprocessing.pool.Pool class, we can see that helper threads are created by the process pool.

A brief review of the code suggests that the roles of these helper threads might be to:

  1. Help to create worker child processes.
  2. Help with dispatching work to worker processes.
  3. Help with handling results sent back from worker processes.

In this example we will create a process pool and issue tasks and wait for them to complete. We will then get a list of all active threads in the main process and report their details.

Firstly, we can define a custom function to execute in the process pool.

The task does not need to do anything other than ensure the process pool has fully started up.

In this case, the task will take an argument and block for a fraction of a second.

The task() function below implements this.

# task executed in a worker process
def task(identifier):
    # block for a moment
    sleep(0.5)

Next in the main process, we will create the process pool using the default configuration. We will use the context manager interface to ensure the pool is closed automatically once we are finished with it.

...
# create and configure the process pool
with Pool() as pool:
    # ...

Next, we will issue 10 tasks to the process pool, calling our custom task() function with different integer values from 0 to 9.

This can be achieved with the map_async() function that will issue the tasks asynchronously and return immediately with an AsyncResult object. We can then wait on the object until the issued tasks are completed.

...
# issues tasks to process pool
result = pool.map_async(task, range(10))
# wait for tasks to complete
result.wait()

Next, we can get a list of all active threads in the main process and report the name of each.

...
# report all threads in this process
for thread in threading.enumerate():
    print(f'Thread: {thread.name}')
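Reporting only thread names makes the helper threads hard to tell apart. A small extension reports each thread's daemon flag and identifier as well. This is a sketch built on an assumption, not the tutorial's own method: in CPython, the process pool marks its helper threads as daemon threads, while the main thread is not a daemon.

```python
# sketch: report more than each thread's name
import threading
from multiprocessing.pool import Pool

# one (name, daemon, ident) tuple per thread alive in this process
def describe_threads():
    return [(t.name, t.daemon, t.ident) for t in threading.enumerate()]

# protect the entry point
if __name__ == '__main__':
    with Pool(processes=2):
        for name, daemon, ident in describe_threads():
            print(f'Thread: {name}, daemon={daemon}, ident={ident}')
```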

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting the details of threads used to maintain the process pool
from time import sleep
from multiprocessing.pool import Pool
from threading import current_thread
import threading

# task executed in a worker process
def task(identifier):
    # block for a moment
    sleep(0.5)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issues tasks to process pool
        result = pool.map_async(task, range(10))
        # wait for tasks to complete
        result.wait()
        # report all threads in this process
        for thread in threading.enumerate():
            print(f'Thread: {thread.name}')
    # process pool is closed automatically

Running the example first creates the process pool.

The 10 tasks are then issued to the process pool and the main process blocks until they are complete.

Each task runs in the process pool, blocks for a moment, then returns.

The main process continues on and gets a list of all active threads and reports their names.

We can see that in addition to the main thread, there are three additional threads. These are the helper threads used internally by the process pool.

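Unfortunately, they have ambiguous names, e.g. "Thread-1" to "Thread-3", that do not indicate their association with the process pool. From Python 3.10, a default thread name also includes the name of the thread's target function, so on newer versions of Python you may instead see names such as "Thread-1 (_handle_workers)".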

Thread: MainThread
Thread: Thread-1
Thread: Thread-2
Thread: Thread-3
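One place where these helper threads show up directly is in callbacks. The multiprocessing documentation notes that callbacks are run by the thread that handles results. A callback passed to map_async() therefore runs in the main process, but not in its main thread. The sketch below checks this. The callback_thread_name() helper is hypothetical. It also relies on a CPython detail: the callback finishes before wait() returns.

```python
# sketch: a callback runs in one of the pool's helper threads, not MainThread
import threading
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    return identifier * 2

def callback_thread_name():
    names = []

    # called in the main process once all results are available
    def callback(results):
        names.append(threading.current_thread().name)

    with Pool(processes=2) as pool:
        result = pool.map_async(task, range(4), callback=callback)
        # in CPython the callback has finished before wait() returns
        result.wait(timeout=30)
    return names[0]

# protect the entry point
if __name__ == '__main__':
    print(f'Callback ran in: {callback_thread_name()}')
```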

Takeaways

You now know how to get the details of threads used by the process pool.


