You can create a producer thread pool and a consumer thread pool connected by a shared queue.
This allows many producer tasks to run concurrently as well as many consumer tasks to run concurrently, allowing the producer-consumer pattern to scale with the amount of work or capabilities of the system.
In this tutorial, you will discover how to create producer and consumer thread pools connected by a queue.
This tutorial was inspired by questions and discussions with Bart B. Thank you deeply! If you have a question about Python concurrency, message me anytime.
Let’s get started.
Need to Share Tasks Between Thread Pools
The Producer-Consumer pattern is a common concurrency programming design pattern.
It involves producer tasks, consumer tasks, and a shared buffer or queue that connects these two types of tasks.
- Producer Tasks: Generate work that is added to the shared buffer.
- Consumer Tasks: Remove work from the shared buffer and process it in some way.
- Shared Buffer: A thread-safe data structure, which may be bounded in size, that connects producers and consumers.
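A bounded shared buffer can be sketched with the standard library `queue.Queue` and its `maxsize` argument. In this minimal sketch, `buffer` is a hypothetical name. A producer calling `put()` on a full queue would block until a consumer removes an item, so the example uses `put_nowait()`, which raises `queue.Full` instead, to make the limit visible.

```python
from queue import Queue, Full

# a bounded shared buffer that holds at most two items
buffer = Queue(maxsize=2)
buffer.put(1)
buffer.put(2)
try:
    # put_nowait() raises queue.Full instead of blocking when the buffer is full
    buffer.put_nowait(3)
except Full:
    print('buffer is full')
# a consumer removing an item (FIFO order) frees space for another put
print(buffer.get())  # 1
buffer.put_nowait(3)
print(buffer.qsize())  # 2
```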
We may have a problem that requires many concurrent producers and many concurrent consumers connected using a queue.
For example, producer tasks may be reading data from a file or downloading data from the internet. Consumer tasks may be saving data into a database or presenting it to the user.
The tasks themselves can be executed concurrently in thread pools, such as the multiprocessing.pool.ThreadPool class or the concurrent.futures.ThreadPoolExecutor class.
Note, a producer-consumer pattern with one producer, one consumer, and a shared queue is a much simpler design. You can see an example of this in a separate tutorial.
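For comparison, the simpler one-producer, one-consumer design can be sketched with two plain threads and a queue. This is a minimal sketch, not the separate tutorial's code; `producer`, `consumer`, and `results` are hypothetical names, and `None` is used as the "no more work" signal, as in the rest of this tutorial.

```python
from queue import Queue
from threading import Thread

# producer: put a fixed number of items, then a None signal
def producer(queue):
    for i in range(5):
        queue.put(i)
    queue.put(None)

# consumer: process items until the None signal arrives
def consumer(queue, results):
    while True:
        item = queue.get()
        if item is None:
            break
        results.append(item * 2)

queue = Queue()
results = []
consumer_thread = Thread(target=consumer, args=(queue, results))
producer_thread = Thread(target=producer, args=(queue,))
consumer_thread.start()
producer_thread.start()
producer_thread.join()
consumer_thread.join()
print(results)  # [0, 2, 4, 6, 8]
```

With a single consumer, items are processed in FIFO order, so the output order is deterministic.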
How can we develop a producer-consumer pattern using thread pools in Python?
How to Develop Producer and Consumer Thread Pools
We can develop a producer-consumer pattern with concurrent producers and concurrent consumers using thread pools and a queue.
Producer Thread Pool
A producer manager function can be created to manage the producer tasks.
This would involve creating and managing the thread pool and issuing all required producer tasks into the producer thread pool.
The number of workers in the thread pool limits the number of producer tasks that can run concurrently.
For example:
```python
from multiprocessing.pool import ThreadPool

# producer manager task
def producer_manager(queue):
    # create thread pool
    with ThreadPool() as pool:
        # issue producer tasks into the thread pool
        # ...
        # wait for all tasks to complete
        pool.close()
        pool.join()
```
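The claim that the worker count caps concurrency can be checked with a small experiment. This sketch assumes a hypothetical `task()` function that records how many tasks are running at once; with `ThreadPool(3)` and 10 tasks, no more than 3 should ever run at the same time.

```python
from multiprocessing.pool import ThreadPool
from threading import Lock
from time import sleep

lock = Lock()
active = 0      # number of tasks running right now
max_active = 0  # highest number of tasks observed running at once

# hypothetical task that records how many tasks are running concurrently
def task(_):
    global active, max_active
    with lock:
        active += 1
        max_active = max(max_active, active)
    sleep(0.05)  # simulate work
    with lock:
        active -= 1

# a pool of 3 workers runs 10 tasks, at most 3 at any moment
with ThreadPool(3) as pool:
    pool.map(task, range(10))
print(max_active)
```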
The producer task would be defined as a separate function from the producer manager task.
Each producer task will require access to the shared queue on which the outcome of the producer task will be placed.
For example:
```python
# producer task
def producer_task(queue):
    # ... do some work to produce a value
    # push data into queue
    queue.put(value)
```
Once all producer tasks have been completed, the producer manager can post a message on the queue to indicate that no further tasks are to be expected.
For example:
```python
...
# put a signal to expect no further tasks
queue.put(None)
```
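This tutorial uses `None` as the signal. If `None` could also be legitimate data, one alternative (not the approach used in this tutorial) is a dedicated sentinel object that is compared by identity. A minimal sketch, where `DONE` is a hypothetical name:

```python
from queue import Queue

# a unique object that can never be confused with real data
DONE = object()

queue = Queue()
# real data may include None and other falsy values such as 0.0
for value in [0.0, None, 3.5]:
    queue.put(value)
queue.put(DONE)

received = []
while True:
    item = queue.get()
    # identity check: only the sentinel object itself stops the loop
    if item is DONE:
        break
    received.append(item)
print(received)  # [0.0, None, 3.5]
```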
Consumer Thread Pool
The consumer thread pool can be managed by a consumer manager task.
This task is responsible for creating and managing the thread pool and issuing tasks into the thread pool that consume work from the shared queue.
The number of workers in the thread pool limits the number of consumer tasks that can run concurrently.
For example:
```python
from multiprocessing.pool import ThreadPool

# consumer manager
def consumer_manager(queue):
    # create thread pool
    with ThreadPool() as pool:
        # start consumer tasks
        # ...
        # wait for all tasks to complete
        pool.close()
        pool.join()
```
Each consumer task can be defined as a function that takes the shared queue as an argument.
The consumer tasks may run once or may run in a loop, consuming items from the queue until no further items are available.
If the latter and a loop is used, then when the message that no further items are to be expected is received, the task can re-add it to the queue for the other consumer tasks and then exit.
For example:
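```python
# consumer task
def consumer_task(queue):
    # run until there is no more work
    while True:
        # retrieve one item from the queue
        value = queue.get()
        # check for signal of no more work
        if value is None:
            # put back on the queue for other consumers
            queue.put(value)
            # shutdown
            return
        # ... process the value
```

Checking for the signal with `value is None` rather than `not value` avoids mistaking a legitimate falsy value, such as 0.0, for the signal.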
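Re-adding the signal is what lets a single `None` stop every consumer. This sketch runs three plain consumer threads against one shutdown signal; `processed` and `lock` are hypothetical names used to record the work done.

```python
from queue import Queue
from threading import Thread, Lock

def consumer_task(queue, processed, lock):
    while True:
        value = queue.get()
        if value is None:
            # put the signal back so the other consumers also see it
            queue.put(value)
            return
        with lock:
            processed.append(value)

queue = Queue()
for i in range(10):
    queue.put(i)
# a single shutdown signal, shared by every consumer
queue.put(None)

processed, lock = [], Lock()
consumers = [Thread(target=consumer_task, args=(queue, processed, lock)) for _ in range(3)]
for t in consumers:
    t.start()
for t in consumers:
    t.join()  # returns for all three, even though only one None was put
print(sorted(processed))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# the signal is left on the queue after the last consumer exits
leftover = queue.get_nowait()
print(leftover)  # None
```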
Coordination
Not a lot of coordination is then required.
The main thread can create the shared queue.
It can also start a separate thread for the producer manager and one for the consumer manager.
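The main thread can wait for the producer manager to finish first and then for the consumer manager. Waiting on the producer manager may be redundant, because the consumer manager cannot finish until the producer manager has posted the signal that no further tasks are to be expected.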
For example:
```python
from queue import Queue
from threading import Thread

...
# create the shared queue
queue = Queue()
# run the consumer
consumer = Thread(target=consumer_manager, args=(queue,))
consumer.start()
# run the producer
producer = Thread(target=producer_manager, args=(queue,))
producer.start()
# wait for the producer to finish
producer.join()
# wait for the consumer to finish
consumer.join()
```
And that’s it.
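The same coordination could also be sketched with `concurrent.futures.ThreadPoolExecutor`, the other pool class mentioned earlier. This is a sketch under assumptions, not part of the original tutorial: sleeps are shortened so it runs quickly, and the `results` list is a hypothetical addition for collecting consumed values. Leaving a `ThreadPoolExecutor` with-block waits for all submitted tasks, which plays the role of `close()` plus `join()`.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from random import random
from threading import Thread
from time import sleep

# producer task: sleeps only briefly so the sketch runs quickly
def producer_task(queue):
    value = random()
    sleep(value / 100)
    queue.put(value)

# producer manager: submit producer tasks, then post the shutdown signal
def producer_manager(queue):
    with ThreadPoolExecutor(20) as executor:
        futures = [executor.submit(producer_task, queue) for _ in range(20)]
    # leaving the with-block has waited for every submitted task
    try:
        for future in futures:
            future.result()  # re-raises any exception from a producer task
    finally:
        # always signal the consumers so they do not wait forever
        queue.put(None)

# consumer task: same loop as the ThreadPool version, but collects values
def consumer_task(queue, results):
    while True:
        value = queue.get()
        if value is None:
            queue.put(value)
            return
        results.append(value)  # list.append is thread-safe in CPython

# consumer manager: five long-running consumer tasks
def consumer_manager(queue, results):
    with ThreadPoolExecutor(5) as executor:
        for _ in range(5):
            executor.submit(consumer_task, queue, results)

queue = Queue()
results = []
consumer = Thread(target=consumer_manager, args=(queue, results))
consumer.start()
producer = Thread(target=producer_manager, args=(queue,))
producer.start()
producer.join()
consumer.join()
print(len(results))  # 20
```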
Next, let’s look at a worked example of developing a producer-consumer pattern using thread pools.
Example of Producer and Consumer Thread Pools
We can explore an example of developing producer and consumer thread pools connected by a shared queue.
The producer tasks will put items on the shared queue and the consumer tasks will remove items from the shared queue.
Because the tasks run in thread pools, the number of producer and consumer workers can be scaled to the amount of work required or the capacity of the system.
In this example, we will define a simple producer task that generates a random number between 0 and 10, blocks for that many seconds, then places the generated value on the shared queue.
For example:
```python
from random import random
from time import sleep

# producer task
def producer_task(queue):
    # generate a random number between 0 and 10
    value = random() * 10
    # block for a moment to simulate work
    sleep(value)
    # push data into queue
    queue.put(value)
```
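We will then define a producer manager that creates a thread pool with 20 workers and issues 20 producer tasks into the pool to complete as fast as they are able. Once all tasks are done, it posts the signal that no further tasks are to be expected.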
For example:
```python
from multiprocessing.pool import ThreadPool

# producer manager task
def producer_manager(queue):
    # create thread pool
    with ThreadPool(20) as pool:
        # use threads to generate items and put into the queue
        _ = [pool.apply_async(producer_task, args=(queue,)) for _ in range(20)]
        # wait for all tasks to complete
        pool.close()
        pool.join()
    # put a signal to expect no further tasks
    queue.put(None)
    # report a message
    print('>producer_manager done.')
```
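The producer manager discards the `AsyncResult` objects returned by `apply_async()`, so an exception raised inside a producer task would be stored and never reported. One way to surface such errors is to keep the results and call `get()` on each after `join()`. This sketch assumes a hypothetical producer that deliberately fails for one input, and puts the signal on the queue regardless, so consumers are never left waiting.

```python
from multiprocessing.pool import ThreadPool
from queue import Queue

# hypothetical producer that fails for one input
def producer_task(queue, i):
    if i == 3:
        raise ValueError(f'producer {i} failed')
    queue.put(i)

def producer_manager(queue):
    errors = []
    with ThreadPool(4) as pool:
        # keep the AsyncResult objects instead of discarding them
        results = [pool.apply_async(producer_task, args=(queue, i)) for i in range(5)]
        pool.close()
        pool.join()
        for result in results:
            try:
                # get() re-raises any exception raised inside the task
                result.get()
            except ValueError as e:
                errors.append(str(e))
    # always signal the consumers, even if some producers failed
    queue.put(None)
    return errors

queue = Queue()
errors = producer_manager(queue)
print(errors)  # ['producer 3 failed']
```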
The consumer task will run in a loop until explicitly stopped.
In each iteration, the consumer task will retrieve an item from the shared queue. If the item is the message that indicates no further work, it will re-add the message to the queue for other consumers to process and exit. Otherwise, it will block for the number of seconds given by the value to simulate work, report the value, and repeat the process.
For example:
```python
# consumer task
def consumer_task(queue):
    # run until there is no more work
    while True:
        # retrieve one item from the queue
        value = queue.get()
        # check for signal of no more work
        if value is None:
            # put back on the queue for other consumers
            queue.put(value)
            # shutdown
            return
        # block for a moment to simulate work
        sleep(value)
        # report the value
        print(f'Consumer got: {value}')
```
The consumer manager will create and manage a pool of 5 consumer tasks which will run for as long as there is work on the shared queue to complete.
```python
# consumer manager
def consumer_manager(queue):
    # create thread pool
    with ThreadPool(5) as pool:
        # start consumer tasks
        _ = [pool.apply_async(consumer_task, args=(queue,)) for _ in range(5)]
        # wait for all tasks to complete
        pool.close()
        pool.join()
    print('>consumer_manager done.')
```
Finally, the main thread will start a separate thread for each of the producer and consumer managers and wait for all work to be completed.
```python
# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # run the consumer
    consumer = Thread(target=consumer_manager, args=(queue,))
    consumer.start()
    # run the producer
    producer = Thread(target=producer_manager, args=(queue,))
    producer.start()
    # wait for the producer to finish
    producer.join()
    # wait for the consumer to finish
    consumer.join()
    # report a final message
    print('>main done.')
```
Tying this together, the complete example of producer and consumer thread pools connected via a shared queue is listed below.
```python
# SuperFastPython.com
# example of producer and consumer thread pools
from random import random
from time import sleep
from threading import Thread
from multiprocessing.pool import ThreadPool
from queue import Queue

# producer task
def producer_task(queue):
    # generate a random number between 0 and 10
    value = random() * 10
    # block for a moment to simulate work
    sleep(value)
    # push data into queue
    queue.put(value)

# producer manager task
def producer_manager(queue):
    # create thread pool
    with ThreadPool(20) as pool:
        # use threads to generate items and put into the queue
        _ = [pool.apply_async(producer_task, args=(queue,)) for _ in range(20)]
        # wait for all tasks to complete
        pool.close()
        pool.join()
    # put a signal to expect no further tasks
    queue.put(None)
    # report a message
    print('>producer_manager done.')

# consumer task
def consumer_task(queue):
    # run until there is no more work
    while True:
        # retrieve one item from the queue
        value = queue.get()
        # check for signal of no more work
        if value is None:
            # put back on the queue for other consumers
            queue.put(value)
            # shutdown
            return
        # block for a moment to simulate work
        sleep(value)
        # report the value
        print(f'Consumer got: {value}')

# consumer manager
def consumer_manager(queue):
    # create thread pool
    with ThreadPool(5) as pool:
        # start consumer tasks
        _ = [pool.apply_async(consumer_task, args=(queue,)) for _ in range(5)]
        # wait for all tasks to complete
        pool.close()
        pool.join()
    print('>consumer_manager done.')

# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # run the consumer
    consumer = Thread(target=consumer_manager, args=(queue,))
    consumer.start()
    # run the producer
    producer = Thread(target=producer_manager, args=(queue,))
    producer.start()
    # wait for the producer to finish
    producer.join()
    # wait for the consumer to finish
    consumer.join()
    # report a final message
    print('>main done.')
```
Running the example first creates the shared queue.
Next, the consumer manager thread is started.
This in turn starts the consumer thread pool and all five consumer workers. The workers run and await items to appear on the shared queue.
Next, a new thread is created and started for the producer manager. This starts a thread pool with 20 workers and issues 20 tasks to the pool, each producing one item onto the shared queue as fast as they are able.
The main thread then waits for both manager threads to complete.
Each producer task generates a random number, sleeps, and places the value on the shared queue before exiting.
Consumers read a value from the shared queue, sleep, then report the value before repeating the process.
All producers finish and the producer manager places a signal to expect no further tasks on the shared queue, then shuts down.
The consumer tasks continue to retrieve items from the queue until the shutdown message is read. Each consumer then shuts down in turn and finally, the consumer manager shuts down.
All items have been produced and consumed and the main thread then exits.
This highlights how to develop a producer and consumer thread pool connected by a shared queue.
Example output is listed below. Your specific results will differ given the use of random numbers.

```
Consumer got: 0.5610484753890665
Consumer got: 0.6569748244844098
Consumer got: 1.2038347369908609
Consumer got: 2.4707941999782745
Consumer got: 2.9249689060927073
Consumer got: 2.9535460714993613
Consumer got: 3.128798374575743
Consumer got: 3.487363956384005
>producer_manager done.
Consumer got: 3.992346728553197
Consumer got: 4.459087650477308
Consumer got: 4.499602838944
Consumer got: 4.538986711148029
Consumer got: 5.13572403183367
Consumer got: 5.345232979458765
Consumer got: 5.815908080748752
Consumer got: 5.921058518398532
Consumer got: 6.092388941145181
Consumer got: 6.307448212661738
Consumer got: 7.2477538938409864
Consumer got: 7.634070399288417
>consumer_manager done.
>main done.
```
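One way to check that every produced item is consumed, without waiting up to 10 seconds per task, is a scaled-down copy of the complete example. This sketch assumes a hypothetical `SCALE` factor that shrinks every sleep, plus hypothetical `produced` and `consumed` lists, guarded by a lock, for bookkeeping. Because the `None` signal is placed on the FIFO queue only after all 20 values, no consumer can see it while values remain.

```python
from random import random
from time import sleep
from threading import Thread, Lock
from multiprocessing.pool import ThreadPool
from queue import Queue

SCALE = 0.01  # assumption: shrink every sleep 100x so the check runs quickly

def producer_task(queue, produced, lock):
    value = random() * 10
    sleep(value * SCALE)
    with lock:
        produced.append(value)
    queue.put(value)

def producer_manager(queue, produced, lock):
    with ThreadPool(20) as pool:
        _ = [pool.apply_async(producer_task, args=(queue, produced, lock)) for _ in range(20)]
        pool.close()
        pool.join()
    queue.put(None)

def consumer_task(queue, consumed, lock):
    while True:
        value = queue.get()
        if value is None:
            queue.put(value)
            return
        sleep(value * SCALE)
        with lock:
            consumed.append(value)

def consumer_manager(queue, consumed, lock):
    with ThreadPool(5) as pool:
        _ = [pool.apply_async(consumer_task, args=(queue, consumed, lock)) for _ in range(5)]
        pool.close()
        pool.join()

queue, produced, consumed, lock = Queue(), [], [], Lock()
consumer = Thread(target=consumer_manager, args=(queue, consumed, lock))
producer = Thread(target=producer_manager, args=(queue, produced, lock))
consumer.start()
producer.start()
producer.join()
consumer.join()
print(len(consumed), sorted(consumed) == sorted(produced))  # 20 True
```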
Takeaways
You now know how to create producer and consumer thread pools connected by a queue.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Remy Lovesy on Unsplash