ThreadPoolExecutor Share Queue With Worker Threads

July 25, 2023 Python ThreadPoolExecutor

You can share a thread-safe queue with workers in the ThreadPoolExecutor via a function argument, via a global variable, or via a global variable defined in the worker thread initialization function.

In this tutorial, you will discover how to share a queue with tasks executed in the ThreadPoolExecutor.

Let's get started.

Need to Share a Queue with All Workers in the ThreadPoolExecutor

The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.

Tasks executed in separate threads run concurrently in Python, making the ThreadPoolExecutor appropriate for I/O-bound tasks.

A ThreadPoolExecutor can be created directly or via the context manager interface, and tasks can be issued one by one via the submit() method or in batch via the map() method.

For example:

...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task
    future = tpe.submit(task)
    # get the task result once the task is done
    result = future.result()

A queue.Queue is a thread-safe data structure. It can be used to share data between threads, such as having one thread put data on the queue and another thread get data from the queue.
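
As a minimal sketch of this pattern (separate from the thread pool examples below), one thread can put items on a shared queue while the main thread gets them:

```python
# minimal sketch: a producer thread puts items on a shared queue,
# the main thread gets them in FIFO order
from threading import Thread
from queue import Queue

# task that puts three items on the shared queue
def producer(queue):
    for i in range(3):
        queue.put(i)

# create the shared thread-safe queue
queue = Queue()
# start the producer thread
thread = Thread(target=producer, args=(queue,))
thread.start()
# consume the three items in the main thread
items = [queue.get() for _ in range(3)]
print(items)  # [0, 1, 2]
# wait for the producer thread to finish
thread.join()
```

Note that Queue.get() blocks until an item is available, which is what allows the main thread to wait safely for results.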

It is common to need to share a queue from the main thread with tasks executed by workers in the ThreadPoolExecutor.

For example, the main thread may create a queue, issue tasks that put data on the queue, and then read results from the queue.

How can we share a thread-safe queue in the main thread with each task executed in the ThreadPoolExecutor?

How to Share a Queue with All Workers in the ThreadPoolExecutor

There are three main ways to share data from the main thread with tasks in the ThreadPoolExecutor:

  1. Via an argument to the task function.
  2. Via a global variable defined in the main thread.
  3. Via a global variable defined in the worker initializer function.

If the task function can be modified, then the queue can be added as an argument passed to the function directly.

This is not always possible, although it may be possible to change the body of the task function or of a function it calls. In that case, a global variable defined in the main thread can be accessed by that custom code.

A similar approach is to have the worker threads declare and define a global variable themselves when they are created in the ThreadPoolExecutor. This mechanism can be used to provide access to the queue to all tasks executed by worker threads.

Next, let's look at worked examples of sharing a queue with tasks executed in the ThreadPoolExecutor.

Example of Sharing a Queue As A Function Argument

We can share a queue with tasks executed by workers in the ThreadPoolExecutor by passing it as an argument to the task.

In this example, we will create a queue in the main thread and pass it as an argument to tasks in the thread pool. Each task will generate data and put it in the queue. The main thread will consume all data put in the queue.

This provides an example of how to easily share data between tasks and the main thread.

Firstly, we can define a task function that takes a queue as an argument. The task generates a random value between 0 and 1, blocks for a fraction of a second to simulate work, then puts the generated value on the queue before terminating.

The task() function below implements this.

# task executed in the thread pool
def task(queue):
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # push data into the queue
    queue.put(value)

Next, in the main thread, we can define the shared queue.

...
# define the shared queue
queue = Queue()

We can then create a thread pool with a default number of workers and issue 10 tasks into the pool, passing the shared queue to each as an argument.

...
# create the thread pool
n_tasks = 10
with ThreadPoolExecutor() as tpe:
    # issue tasks to the thread pool
    _ = [tpe.submit(task, queue) for _ in range(n_tasks)]

The thread pool is created using the context manager interface, ensuring it is shut down automatically for us once we are finished with it.
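
For reference, the context manager is shorthand for creating the pool and calling shutdown() when done; a sketch of the manual equivalent:

```python
# manual equivalent of the context manager interface
from concurrent.futures import ThreadPoolExecutor

# create the thread pool directly
tpe = ThreadPoolExecutor()
# issue a task that returns a value
future = tpe.submit(lambda: 42)
# get the task result once the task is done
result = future.result()
print(result)  # 42
# shut down the pool, waiting for all running tasks to complete
tpe.shutdown(wait=True)
```

The context manager form is usually preferred because the pool is shut down even if an exception is raised while issuing tasks.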

The main thread will then consume the expected number of results from the queue and report each in turn.

...
# consume results from the queue
for _ in range(n_tasks):
    # get data from the queue
    data = queue.get()
    # report data from the queue
    print(f'Got: {data}')

And that's it.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of sharing a queue with all workers in the thread pool
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from time import sleep
from random import random

# task executed in the thread pool
def task(queue):
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # push data into the queue
    queue.put(value)

# protect the entry point
if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the thread pool
    n_tasks = 10
    with ThreadPoolExecutor() as tpe:
        # issue tasks to the thread pool
        _ = [tpe.submit(task, queue) for _ in range(n_tasks)]
        # consume results from the queue
        for _ in range(n_tasks):
            # get data from the queue
            data = queue.get()
            # report data from the queue
            print(f'Got: {data}')

Running the example first creates the shared queue.

The thread pool is then created and all 10 tasks are issued, passing the queue as an argument.

The main thread then blocks, waiting for items to be retrieved from the shared queue.

The tasks are executed by the thread pool. Each task generates a random number, blocks for a fraction of a second, then puts the generated value on the queue.

The main thread retrieves values from the queue and reports their values as fast as it is able.

Once all tasks are completed and all values are retrieved from the queue, the thread pool is shut down.

This highlights how we can pass a shared queue to tasks executed in the ThreadPoolExecutor using a function argument.

Got: 0.22343788239973406
Got: 0.3037455207220955
Got: 0.3255019247040706
Got: 0.3592485706592823
Got: 0.471827190852797
Got: 0.5000176812566458
Got: 0.5748713783348164
Got: 0.6230219935301552
Got: 0.7209176687021336
Got: 0.9569111780373742
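
One caveat with this pattern: queue.get() blocks forever if a task fails before putting its result on the queue. A defensive sketch (the timeout value is illustrative) uses get() with a timeout:

```python
# defensive sketch: consume with a timeout so a failed task cannot hang the main thread
from queue import Queue, Empty

# shared queue, simulating a run where only 2 of 3 expected results arrived
queue = Queue()
queue.put('a')
queue.put('b')
# consume the expected number of results, but never wait too long for any one item
results = []
for _ in range(3):
    try:
        # wait at most half a second for each item
        results.append(queue.get(timeout=0.5))
    except Empty:
        # a result never arrived, stop waiting
        print('missing result')
        break
print(results)  # ['a', 'b']
```

Checking the Future objects returned by submit() is another way to surface task exceptions rather than waiting on the queue indefinitely.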

Example of Sharing a Queue As a Global Variable

We can explore how to share a queue with tasks executed in the thread pool via a global variable.

All threads in a Python process can share the same memory, such as global variables.
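
A tiny sketch of this sharing, independent of queues: a worker thread can update a global variable defined in the main thread:

```python
# sketch: threads in one process share global variables
from threading import Thread

# global variable defined in the main thread
data = []

# task that mutates the shared global from another thread
def worker():
    data.append('from worker')

# run the worker thread to completion
thread = Thread(target=worker)
thread.start()
thread.join()
print(data)  # ['from worker']
```

Strictly, Python only requires the global keyword when a function assigns to the variable; declaring it anyway, as in the example below, makes the intent explicit.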

In this example, we will define the queue as a global variable, then access it from within tasks executed by the thread pool.

Firstly, we can define the task function to be executed in the thread pool. The task function does not take any arguments. It first generates a random value, blocks for a fraction of a second, then declares the queue global variable and puts the generated data on the queue.

The task() function below implements this.

# task executed in the thread pool
def task():
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # declare the global variable
    global queue
    # push data into the queue
    queue.put(value)

Next, the queue is created as a global variable, then the thread pool is created, all tasks are issued and the main thread consumes all results from the queue. This is identical to the example above.

# protect the entry point
if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the thread pool
    n_tasks = 10
    with ThreadPoolExecutor() as tpe:
        # issue tasks to the thread pool
        _ = [tpe.submit(task) for _ in range(n_tasks)]
        # consume results from the queue
        for _ in range(n_tasks):
            # get data from the queue
            data = queue.get()
            # report data from the queue
            print(f'Got: {data}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of sharing a queue with all workers in the thread pool via a global variable
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from time import sleep
from random import random

# task executed in the thread pool
def task():
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # declare the global variable
    global queue
    # push data into the queue
    queue.put(value)

# protect the entry point
if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the thread pool
    n_tasks = 10
    with ThreadPoolExecutor() as tpe:
        # issue tasks to the thread pool
        _ = [tpe.submit(task) for _ in range(n_tasks)]
        # consume results from the queue
        for _ in range(n_tasks):
            # get data from the queue
            data = queue.get()
            # report data from the queue
            print(f'Got: {data}')

Running the example first creates the shared queue.

The thread pool is then created and all 10 tasks are issued.

The main thread then blocks, waiting for items to be retrieved from the shared queue.

The tasks are executed by the thread pool. Each task generates a random number, blocks for a fraction of a second, then declares the queue as a global variable and puts the generated value on the queue.

The main thread retrieves values from the queue and reports their values as fast as it is able.

Once all tasks are completed and all values are retrieved from the queue, the thread pool is shut down.

This highlights how tasks executed in the ThreadPoolExecutor are able to access a global variable defined in another thread, in this case, in the main thread.

Got: 0.3953209941257878
Got: 0.417335789582704
Got: 0.43046855896474334
Got: 0.48521935549599005
Got: 0.5342764937924521
Got: 0.5349478798500471
Got: 0.5489116895705005
Got: 0.6573965347346484
Got: 0.7276962760365066
Got: 0.7985033779944768

Example of Sharing a Queue Via Worker Initialization

We can explore how to share a queue with tasks executed by workers in the ThreadPoolExecutor using the initializer function.

In this example, we will define a custom worker thread initialization function. This function will take the shared queue as an argument, declare a new global variable, and store the queue in the global variable. Each worker thread will then be able to access the queue via this global variable.

The worker_init() function below implements this.

# worker initialization function
def worker_init(queue):
    # declare a global variable for the worker
    global worker_queue
    # define the worker global variable
    worker_queue = queue

The task() function is able to access the global variable directly and put the result on the queue.

# task executed in the thread pool
def task():
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # push data into the queue
    worker_queue.put(value)

The main thread defines the queue as before.

When the ThreadPoolExecutor is created, it is configured so that new worker threads are initialized with the custom worker_init() initialization function, passing in the shared queue as an argument to this function.

...
# create the thread pool
n_tasks = 10
with ThreadPoolExecutor(initializer=worker_init, initargs=(queue,)) as tpe:
	# ...

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of sharing a queue with all workers in the thread pool via worker init
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from time import sleep
from random import random

# worker initialization function
def worker_init(queue):
    # declare a global variable for the worker
    global worker_queue
    # define the worker global variable
    worker_queue = queue

# task executed in the thread pool
def task():
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # push data into the queue
    worker_queue.put(value)

# protect the entry point
if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the thread pool
    n_tasks = 10
    with ThreadPoolExecutor(initializer=worker_init, initargs=(queue,)) as tpe:
        # issue tasks to the thread pool
        _ = [tpe.submit(task) for _ in range(n_tasks)]
        # consume results from the queue
        for _ in range(n_tasks):
            # get data from the queue
            data = queue.get()
            # report data from the queue
            print(f'Got: {data}')

Running the example first creates the shared queue.

The thread pool is then created. Each thread in the pool is configured to call the custom initialization function with the queue as an argument.

The 10 tasks are issued with no arguments. New threads in the pool are created on-demand to service the tasks. Each thread declares a global variable and stores the queue in the variable.

The main thread then blocks, waiting for items to be retrieved from the shared queue.

The tasks are executed by the thread pool. Each task generates a random number, blocks for a fraction of a second, then puts the generated value on the queue, accessed directly via the global variable.

The main thread retrieves values from the queue and reports their values as fast as it is able.

Once all tasks are completed and all values are retrieved from the queue, the thread pool is shut down.

This highlights how worker threads are able to declare and define global variables that can be accessed by tasks executed in the ThreadPoolExecutor.

Got: 0.19156964353381334
Got: 0.2508924728141374
Got: 0.4911395898698
Got: 0.5230069652486774
Got: 0.5737375485195993
Got: 0.5872976604093911
Got: 0.6462470850341827
Got: 0.7030385133937166
Got: 0.7084200901876665
Got: 0.7619349071440448

Takeaways

You now know how to share a queue with tasks executed in the ThreadPoolExecutor.



If you enjoyed this tutorial, you will love my book: Python ThreadPoolExecutor Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.