You can share a thread-safe queue with workers in the ThreadPoolExecutor via a function argument, via a global variable, or via a global variable defined in the worker thread initialization function.
In this tutorial, you will discover how to share a queue with tasks executed in the ThreadPoolExecutor.
Let’s get started.
Need to Share a Queue with All Workers in the ThreadPoolExecutor
The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.
Tasks executed in new threads are executed concurrently in Python, making the ThreadPoolExecutor appropriate for I/O-bound tasks.
A ThreadPoolExecutor can be created directly or via the context manager interface and tasks can be issued one-by-one via the submit() method or in batch via the map() method.
For example:
...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task
    future = tpe.submit(task)
    # get the task result once the task is done
    result = future.result()
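Tasks can also be issued in batch via map(), which applies the function to each item and yields results in task order. A minimal sketch, using a hypothetical doubling task for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task function for illustration: doubles its argument
def task(value):
    return value * 2

# map() issues one task per item and yields results in task order
with ThreadPoolExecutor() as tpe:
    results = list(tpe.map(task, range(5)))
print(results)  # [0, 2, 4, 6, 8]
```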
You can learn more about the ThreadPoolExecutor in the tutorial:
A queue.Queue is a thread-safe data structure. It can be used to share data between threads, such as having one thread put data on the queue and another thread get data from the queue.
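For example, a minimal sketch of one thread putting data on a queue while the main thread gets it (the producer function here is illustrative only):

```python
from queue import Queue
from threading import Thread

# shared thread-safe queue
queue = Queue()

# producer thread puts items on the queue
def producer(q):
    for i in range(3):
        q.put(i)

# start the producer and wait for it to finish
t = Thread(target=producer, args=(queue,))
t.start()
t.join()

# main thread gets items from the same queue
items = [queue.get() for _ in range(3)]
print(items)  # [0, 1, 2]
```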
You can learn more about thread-safe queues in the tutorial:
It is common to need to share a queue from the main thread with tasks executed by workers in the ThreadPoolExecutor.
For example, the main thread may create a queue, issue tasks that put data on the queue, and then read results from the queue.
How can we share a thread-safe queue in the main thread with each task executed in the ThreadPoolExecutor?
How to Share a Queue with All Workers in the ThreadPoolExecutor
There are 3 main ways to share data from the main thread with tasks in the ThreadPoolExecutor. They are:
- Via an argument to the task function.
- Via a global variable defined in main.
- Via a global variable defined in the worker initializer function.
If the task function can be modified, then the queue can be added as an argument passed to the function directly.
This is not always possible. However, it may still be possible to change the body of the task function, or of a function it calls. In that case, a global variable can be defined in the main thread and accessed directly by that custom code.
A similar approach is to have the worker threads declare and define a global variable themselves when they are created in the ThreadPoolExecutor. This mechanism can be used to provide access to the queue to all tasks executed by worker threads.
You can learn more about how to share data with tasks executed by workers in the ThreadPoolExecutor in the tutorial:
Next, let’s look at worked examples of sharing a queue with tasks executed in the ThreadPoolExecutor.
Example of Sharing a Queue As A Function Argument
We can share a queue with tasks executed by workers in the ThreadPoolExecutor by passing it as an argument to the task.
In this example, we will create a queue in the main thread and pass it as an argument to tasks in the thread pool. Each task will generate data and put it in the queue. The main thread will consume all data put in the queue.
This provides an example of how to easily share data between tasks and the main thread.
Firstly, we can define a task function that takes a queue as an argument. The task generates a random value between 0 and 1, blocks for a fraction of a second to simulate work, then puts the generated value on the queue before terminating.
The task() function below implements this.
# task executed in the thread pool
def task(queue):
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # push data into the queue
    queue.put(value)
Next, in the main thread, we can define the shared queue.
...
# define the shared queue
queue = Queue()
We can then create a thread pool with a default number of workers and issue 10 tasks into the pool, passing the shared queue to each as an argument.
...
# create the thread pool
n_tasks = 10
with ThreadPoolExecutor() as tpe:
    # issue tasks to the thread pool
    _ = [tpe.submit(task, queue) for _ in range(n_tasks)]
The thread pool is created using the context manager interface, ensuring it is shut down automatically for us once we are finished with it.
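The context manager is equivalent to creating the pool directly and shutting it down explicitly. A minimal sketch, using a hypothetical task for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task for illustration
def task():
    return 42

# create the pool directly instead of via the context manager
tpe = ThreadPoolExecutor()
future = tpe.submit(task)
result = future.result()
# shut down explicitly; the context manager calls this on exit
tpe.shutdown(wait=True)
print(result)  # 42
```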
You can learn more about the context manager interface for the ThreadPoolExecutor in the tutorial:
The main thread will then consume the expected number of results from the queue and report each in turn.
...
# consume results from the queue
for _ in range(n_tasks):
    # get data from the queue
    data = queue.get()
    # report data from the queue
    print(f'Got: {data}')
And that’s it.
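One practical caveat: queue.get() blocks indefinitely if a task fails before putting its result. A hedged variant, not part of the original example, uses a timeout so the main thread cannot hang forever (the pre-loaded value here is illustrative):

```python
from queue import Queue, Empty

# a queue with one pre-loaded item, standing in for the shared queue
queue = Queue()
queue.put(0.5)

try:
    # wait at most 5 seconds instead of blocking forever
    data = queue.get(timeout=5)
    print(f'Got: {data}')
except Empty:
    print('No result arrived within the timeout')
```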
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of sharing a queue with all workers in the thread pool
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from time import sleep
from random import random

# task executed in the thread pool
def task(queue):
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # push data into the queue
    queue.put(value)

# protect the entry point
if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the thread pool
    n_tasks = 10
    with ThreadPoolExecutor() as tpe:
        # issue tasks to the thread pool
        _ = [tpe.submit(task, queue) for _ in range(n_tasks)]
        # consume results from the queue
        for _ in range(n_tasks):
            # get data from the queue
            data = queue.get()
            # report data from the queue
            print(f'Got: {data}')
Running the example first creates the shared queue.
The thread pool is then created and all 10 tasks are issued, passing the queue as an argument.
The main thread then blocks, waiting for items to be retrieved from the shared queue.
The tasks are executed by the thread pool. Each task generates a random number, blocks for a fraction of a second, then puts the generated value on the queue.
The main thread retrieves values from the queue and reports their values as fast as it is able.
Once all tasks are completed and all values are retrieved from the queue, the thread pool is shut down.
This highlights how we can pass a shared queue to tasks executed in the ThreadPoolExecutor using a function argument.
Got: 0.22343788239973406
Got: 0.3037455207220955
Got: 0.3255019247040706
Got: 0.3592485706592823
Got: 0.471827190852797
Got: 0.5000176812566458
Got: 0.5748713783348164
Got: 0.6230219935301552
Got: 0.7209176687021336
Got: 0.9569111780373742
Example of Sharing a Queue As a Global Variable
We can explore how to share a queue with tasks executed in the thread pool via a global variable.
All threads in a Python process can share the same memory, such as global variables.
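A minimal sketch of this, showing a worker thread mutating a global list that the main thread then reads (the names here are illustrative):

```python
from threading import Thread

# global variable shared by all threads in the process
shared = []

def worker():
    # the worker thread sees and mutates the same list object
    shared.append('from worker')

# run the worker and wait for it to finish
t = Thread(target=worker)
t.start()
t.join()

# the main thread observes the worker's change
print(shared)  # ['from worker']
```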
You can learn more about sharing global variables between threads in the tutorial:
In this example, we will define the queue as a global variable, then access it from within tasks executed by the thread pool.
Firstly, we can define the task function to be executed in the thread pool. The task function does not take any arguments. It first generates a random value, blocks for a fraction of a second, then declares the queue global variable and puts the generated data on the queue.
The task() function below implements this.
# task executed in the thread pool
def task():
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # declare the global variable
    global queue
    # push data into the queue
    queue.put(value)
Next, the queue is created as a global variable, then the thread pool is created, all tasks are issued and the main thread consumes all results from the queue. This is identical to the example above.
# protect the entry point
if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the thread pool
    n_tasks = 10
    with ThreadPoolExecutor() as tpe:
        # issue tasks to the thread pool
        _ = [tpe.submit(task) for _ in range(n_tasks)]
        # consume results from the queue
        for _ in range(n_tasks):
            # get data from the queue
            data = queue.get()
            # report data from the queue
            print(f'Got: {data}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of sharing a queue with all workers in the thread pool via a global variable
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from time import sleep
from random import random

# task executed in the thread pool
def task():
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # declare the global variable
    global queue
    # push data into the queue
    queue.put(value)

# protect the entry point
if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the thread pool
    n_tasks = 10
    with ThreadPoolExecutor() as tpe:
        # issue tasks to the thread pool
        _ = [tpe.submit(task) for _ in range(n_tasks)]
        # consume results from the queue
        for _ in range(n_tasks):
            # get data from the queue
            data = queue.get()
            # report data from the queue
            print(f'Got: {data}')
Running the example first creates the shared queue.
The thread pool is then created and all 10 tasks are issued.
The main thread then blocks, waiting for items to be retrieved from the shared queue.
The tasks are executed by the thread pool. Each task generates a random number, blocks for a fraction of a second, then declares the queue as a global variable and puts the generated value on the queue.
The main thread retrieves values from the queue and reports their values as fast as it is able.
Once all tasks are completed and all values are retrieved from the queue, the thread pool is shut down.
This highlights how tasks executed in the ThreadPoolExecutor are able to access a global variable defined in another thread, in this case, in the main thread.
Got: 0.3953209941257878
Got: 0.417335789582704
Got: 0.43046855896474334
Got: 0.48521935549599005
Got: 0.5342764937924521
Got: 0.5349478798500471
Got: 0.5489116895705005
Got: 0.6573965347346484
Got: 0.7276962760365066
Got: 0.7985033779944768
Example of Sharing a Queue Via Worker Initialization
We can explore how to share a queue with tasks executed by workers in the ThreadPoolExecutor using the initializer function.
In this example, we will define a custom worker thread initialization function. This function will take the shared queue as an argument, declare a new global variable, and store the queue in the global variable. Each worker thread will then be able to access this global variable.
The worker_init() function below implements this.
# worker initialization function
def worker_init(queue):
    # declare a global variable for the worker
    global worker_queue
    # define the worker global variable
    worker_queue = queue
You can learn more about worker thread initialization functions in the tutorial:
The task() function is able to access the global variable directly and put the result on the queue.
# task executed in the thread pool
def task():
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # push data into the queue
    worker_queue.put(value)
The main thread defines the queue as before.
When the ThreadPoolExecutor is created, it is configured so that new worker threads are initialized with the custom worker_init() initialization function, passing in the shared queue as an argument to this function.
...
# create the thread pool
n_tasks = 10
with ThreadPoolExecutor(initializer=worker_init, initargs=(queue,)) as tpe:
    # ...
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of sharing a queue with all workers in the thread pool via worker init
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from time import sleep
from random import random

# worker initialization function
def worker_init(queue):
    # declare a global variable for the worker
    global worker_queue
    # define the worker global variable
    worker_queue = queue

# task executed in the thread pool
def task():
    # generate a random value
    value = random()
    # block for a moment to simulate work
    sleep(value)
    # push data into the queue
    worker_queue.put(value)

# protect the entry point
if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the thread pool
    n_tasks = 10
    with ThreadPoolExecutor(initializer=worker_init, initargs=(queue,)) as tpe:
        # issue tasks to the thread pool
        _ = [tpe.submit(task) for _ in range(n_tasks)]
        # consume results from the queue
        for _ in range(n_tasks):
            # get data from the queue
            data = queue.get()
            # report data from the queue
            print(f'Got: {data}')
Running the example first creates the shared queue.
The thread pool is then created. Each thread in the pool is configured to call the custom initialization function with the queue as an argument.
The 10 tasks are issued with no arguments. New threads in the pool are created on-demand to service the tasks. Each thread declares a global variable and stores the queue in the variable.
The main thread then blocks, waiting for items to be retrieved from the shared queue.
The tasks are executed by the thread pool. Each task generates a random number, blocks for a fraction of a second, then puts the generated value on the queue, accessed directly via the global variable.
The main thread retrieves values from the queue and reports their values as fast as it is able.
Once all tasks are completed and all values are retrieved from the queue, the thread pool is shut down.
This highlights how worker threads are able to declare and define global variables that can be accessed by tasks executed in the ThreadPoolExecutor.
Got: 0.19156964353381334
Got: 0.2508924728141374
Got: 0.4911395898698
Got: 0.5230069652486774
Got: 0.5737375485195993
Got: 0.5872976604093911
Got: 0.6462470850341827
Got: 0.7030385133937166
Got: 0.7084200901876665
Got: 0.7619349071440448
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
APIs
References
Takeaways
You now know how to share a queue with tasks executed in the ThreadPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Mac Blades on Unsplash