ThreadPool Producer-Consumer Pattern in Python
You can create a producer thread pool and a consumer thread pool connected by a shared queue.
This allows many producer tasks and many consumer tasks to run concurrently, so the producer-consumer pattern can scale with the amount of work or the capabilities of the system.
In this tutorial, you will discover how to create producer and consumer thread pools connected by a queue.
This tutorial was inspired by questions and discussions with Bart B. Thank you deeply! If you have a question about Python concurrency, message me anytime.
Let's get started.
Need to Share Tasks Between Thread Pools
The Producer-Consumer pattern is a common concurrency programming design pattern.
It involves producer tasks, consumer tasks, and a shared buffer or queue that connects these two types of tasks.
- Producer Tasks: Generate work that is added to the shared buffer.
- Consumer Tasks: Remove work from the shared buffer and process it in some way.
- Shared Buffer: Thread-safe data structure, possibly bounded, that connects producers and consumers.
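To illustrate a bounded shared buffer, here is a minimal sketch using queue.Queue with a maxsize. The size of 2, the item values, and the helper name consume_one are assumptions chosen only for illustration.

```python
from queue import Queue, Full
from threading import Thread

# bounded buffer: holds at most 2 items (arbitrary size for illustration)
buffer = Queue(maxsize=2)
buffer.put('a')
buffer.put('b')

# the buffer is now full, so a non-blocking put raises queue.Full
try:
    buffer.put_nowait('c')
    overflowed = False
except Full:
    overflowed = True

# hypothetical consumer that removes a single item
def consume_one(q, out):
    out.append(q.get())

received = []
consumer = Thread(target=consume_one, args=(buffer, received))
consumer.start()
# a blocking put waits until the consumer has freed a slot
buffer.put('c')
consumer.join()

print(overflowed, received, buffer.qsize())
```

A bounded queue applies back-pressure: producers that get ahead of the consumers are made to wait rather than filling memory.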
We may have a problem that requires many concurrent producers and many concurrent consumers connected using a queue.
For example, producer tasks may be reading data from a file or downloading data from the internet. Consumer tasks may be saving data into a database or presenting it to the user.
The tasks themselves can be executed concurrently in thread pools, such as the multiprocessing.pool.ThreadPool class or the concurrent.futures.ThreadPoolExecutor class.
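As a rough comparison of the two pool classes, the sketch below issues the same task to each. The task function work() is hypothetical and stands in for real producer or consumer work.

```python
from multiprocessing.pool import ThreadPool
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used only for illustration
def work(x):
    return x * x

# multiprocessing.pool.ThreadPool: apply_async() returns an AsyncResult
with ThreadPool(4) as pool:
    async_results = [pool.apply_async(work, args=(i,)) for i in range(5)]
    pool_values = [r.get() for r in async_results]

# concurrent.futures.ThreadPoolExecutor: submit() returns a Future
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(work, i) for i in range(5)]
    executor_values = [f.result() for f in futures]

print(pool_values)
print(executor_values)
```

Both print [0, 1, 4, 9, 16]. The rest of this tutorial uses ThreadPool.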
Note, a producer-consumer pattern with one producer and one consumer and a shared queue is a much simpler design. You can see an example of this in the tutorial:
How can we develop a producer-consumer pattern using thread pools in Python?
How to Develop Producer and Consumer Thread Pools
We can develop a producer-consumer pattern with concurrent producers and concurrent consumers using thread pools and a queue.
Producer Thread Pool
A producer manager function can be created to manage the producer's tasks.
This would involve creating and managing the thread pool and issuing all required producer tasks into the producer thread pool.
The number of workers in the thread pool limits the number of producer tasks that can run concurrently.
For example:
# producer manager task
def producer_manager(queue):
    # create thread pool
    with ThreadPool() as pool:
        # issue producer tasks into the thread pool
        # ...
        # wait for all tasks to complete
        pool.close()
        pool.join()
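The effect of the pool size on concurrency can be observed directly. The following is a minimal sketch, assuming a hypothetical tracked_task() that counts how many tasks are running at once; the pool size of 3 and the 12 tasks are arbitrary.

```python
from multiprocessing.pool import ThreadPool
from threading import Lock
from time import sleep

lock = Lock()
# shared counters: tasks running right now, and the highest value seen
state = {'running': 0, 'peak': 0}

# hypothetical task that records how many tasks run at the same time
def tracked_task(_):
    with lock:
        state['running'] += 1
        state['peak'] = max(state['peak'], state['running'])
    # simulate some work while holding a worker thread
    sleep(0.05)
    with lock:
        state['running'] -= 1

# 3 workers, 12 tasks: no more than 3 tasks can run at any moment
with ThreadPool(3) as pool:
    pool.map(tracked_task, range(12))

print(f"peak concurrency: {state['peak']}")
```

The reported peak never exceeds the number of workers, no matter how many tasks are issued.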
The producer task would be defined as a separate function from the producer manager task.
Each producer task will require access to the shared queue on which the outcome of the producer task will be placed.
For example:
# producer task
def producer_task(queue):
    # ...
    # push data into queue
    queue.put(value)
Once all producer tasks have been completed, the producer manager can post a message on the queue to indicate that no further tasks are to be expected.
For example:
...
# put a signal to expect no further tasks
queue.put(None)
Consumer Thread Pool
The consumer thread pool can be managed by a consumer manager task.
This task is responsible for creating and managing the thread pool and issuing tasks into the thread pool that consume work from the shared queue.
The number of workers in the thread pool limits the number of consumer tasks that can run concurrently.
For example:
# consumer manager
def consumer_manager(queue):
    # create thread pool
    with ThreadPool() as pool:
        # start consumer tasks
        # ...
        # wait for all tasks to complete
        pool.close()
        pool.join()
Each consumer task can be defined as a function that takes the shared queue as an argument.
The consumer tasks may run once or may run in a loop, consuming items from the queue until no further items are available.
If a loop is used, then when the message that no further items are to be expected is received, the task can re-add it to the queue for the other consumer tasks and exit.
For example:
# consumer task
def consumer_task(queue):
    # run until there is no more work
    while True:
        # retrieve one item from the queue
        value = queue.get()
        # check for signal of no more work (None, not merely a falsy value such as 0)
        if value is None:
            # put back on the queue for other consumers
            queue.put(value)
            # shutdown
            return
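The shutdown check can be exercised in isolation. The sketch below assumes None is reserved as the shutdown signal and tests for it with "is None", so that legitimate falsy items such as 0 are processed rather than mistaken for the signal. The results list, the lock, and the item values are hypothetical additions for illustration.

```python
from queue import Queue
from multiprocessing.pool import ThreadPool
from threading import Lock

# consumer that stops on a None sentinel and records everything else
def consumer_task(queue, results, lock):
    while True:
        value = queue.get()
        # identity check: 0 is a valid item, only None means stop
        if value is None:
            # put the sentinel back so the other consumers also stop
            queue.put(value)
            return
        with lock:
            results.append(value)

queue = Queue()
results = []
lock = Lock()
# the items include a falsy value on purpose
for item in [0, 2, 3, 1.5]:
    queue.put(item)
# a single sentinel is enough because each consumer re-adds it
queue.put(None)

with ThreadPool(3) as pool:
    tasks = [pool.apply_async(consumer_task, args=(queue, results, lock)) for _ in range(3)]
    # wait for every consumer to exit
    for task in tasks:
        task.get()

print(sorted(results))
print(queue.qsize())
```

All four items are consumed, including 0, and one sentinel remains on the queue after the last consumer exits.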
Coordination
Not a lot of coordination is then required.
The main thread can create the shared queue.
It can also start a separate thread for the producer manager and one for the consumer manager.
The main thread can wait for the producer to finish first, then for the consumer, although the consumer manager will not finish until the producer is finished so this may be redundant.
For example:
...
# create the shared queue
queue = Queue()
# run the consumer
consumer = Thread(target=consumer_manager, args=(queue,))
consumer.start()
# run the producer
producer = Thread(target=producer_manager, args=(queue,))
producer.start()
# wait for the producer to finish
producer.join()
# wait for the consumer to finish
consumer.join()
And that's it.
Next, let's look at a worked example of developing a producer-consumer pattern using thread pools.
Example of Producer and Consumer Thread Pools
We can explore an example of developing producer and consumer thread pools connected by a shared queue.
The producer tasks will put items on the shared queue and the consumer tasks will remove items from the shared queue.
The number of producer and consumer workers can scale based on the amount of work required or the capacity of the system using thread pools.
In this example, we will define a simple producer task that generates a random number between 0 and 10, blocks for that many seconds, then places the generated value on the shared queue.
For example:
# producer task
def producer_task(queue):
    # generate a random number between 0 and 10
    value = random() * 10
    # block for a moment to simulate work
    sleep(value)
    # push data into queue
    queue.put(value)
We will then define a thread pool with 20 workers and issue 20 producers into the pool to complete as fast as they are able.
For example:
# producer manager task
def producer_manager(queue):
    # create thread pool
    with ThreadPool(20) as pool:
        # use threads to generate items and put into the queue
        _ = [pool.apply_async(producer_task, args=(queue,)) for _ in range(20)]
        # wait for all tasks to complete
        pool.close()
        pool.join()
    # put a signal to expect no further tasks
    queue.put(None)
    # report a message
    print('>producer_manager done.')
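One detail worth knowing when results of apply_async() are discarded: an exception raised inside a task is stored on the returned AsyncResult and only surfaces when get() is called. The following is a small sketch of that behavior, using a hypothetical failing_task() that is not part of the example.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that fails, used only to show error propagation
def failing_task():
    raise ValueError('producer failed')

with ThreadPool(2) as pool:
    result = pool.apply_async(failing_task)
    # wait() blocks until the task finishes, without raising
    result.wait()
    finished_ok = result.successful()
    try:
        # get() re-raises the exception from the task in the caller
        result.get()
        error = None
    except ValueError as e:
        error = str(e)

print(finished_ok, error)
```

This prints "False producer failed". If producer tasks can fail, keeping the AsyncResult objects and calling get() on each is one way to notice.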
The consumer task will run in a loop until explicitly stopped.
In each iteration, the consumer task will retrieve an item from the shared queue. If the item is a message that indicates no further work, it will re-add the message to the queue for other consumers to process and exit. Otherwise, it will retrieve the value, block for that number of seconds to simulate work, and repeat the process.
For example:
# consumer task
def consumer_task(queue):
    # run until there is no more work
    while True:
        # retrieve one item from the queue
        value = queue.get()
        # check for signal of no more work
        if value is None:
            # put back on the queue for other consumers
            queue.put(value)
            # shutdown
            return
        # block for a moment to simulate work
        sleep(value)
        # report the value
        print(f'Consumer got: {value}')
The consumer manager will create and manage a pool of 5 consumer tasks which will run for as long as there is work on the shared queue to complete.
# consumer manager
def consumer_manager(queue):
    # create thread pool
    with ThreadPool(5) as pool:
        # start consumer tasks
        _ = [pool.apply_async(consumer_task, args=(queue,)) for _ in range(5)]
        # wait for all tasks to complete
        pool.close()
        pool.join()
    print('>consumer_manager done.')
Finally, the main thread will start a separate thread for each of the producer and consumer managers and wait for all work to be completed.
# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # run the consumer
    consumer = Thread(target=consumer_manager, args=(queue,))
    consumer.start()
    # run the producer
    producer = Thread(target=producer_manager, args=(queue,))
    producer.start()
    # wait for the producer to finish
    producer.join()
    # wait for the consumer to finish
    consumer.join()
    # report a final message
    print('>main done.')
Tying this together, the complete example of producer and consumer thread pools connected via a shared queue is listed below.
# SuperFastPython.com
# example of producer and consumer thread pools
from random import random
from time import sleep
from threading import Thread
from multiprocessing.pool import ThreadPool
from queue import Queue
# producer task
def producer_task(queue):
    # generate a random number between 0 and 10
    value = random() * 10
    # block for a moment to simulate work
    sleep(value)
    # push data into queue
    queue.put(value)
# producer manager task
def producer_manager(queue):
    # create thread pool
    with ThreadPool(20) as pool:
        # use threads to generate items and put into the queue
        _ = [pool.apply_async(producer_task, args=(queue,)) for _ in range(20)]
        # wait for all tasks to complete
        pool.close()
        pool.join()
    # put a signal to expect no further tasks
    queue.put(None)
    # report a message
    print('>producer_manager done.')
# consumer task
def consumer_task(queue):
    # run until there is no more work
    while True:
        # retrieve one item from the queue
        value = queue.get()
        # check for signal of no more work (None, not merely a falsy value such as 0)
        if value is None:
            # put back on the queue for other consumers
            queue.put(value)
            # shutdown
            return
        # block for a moment to simulate work
        sleep(value)
        # report the value
        print(f'Consumer got: {value}')
# consumer manager
def consumer_manager(queue):
    # create thread pool
    with ThreadPool(5) as pool:
        # start consumer tasks
        _ = [pool.apply_async(consumer_task, args=(queue,)) for _ in range(5)]
        # wait for all tasks to complete
        pool.close()
        pool.join()
    print('>consumer_manager done.')
# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # run the consumer
    consumer = Thread(target=consumer_manager, args=(queue,))
    consumer.start()
    # run the producer
    producer = Thread(target=producer_manager, args=(queue,))
    producer.start()
    # wait for the producer to finish
    producer.join()
    # wait for the consumer to finish
    consumer.join()
    # report a final message
    print('>main done.')
Running the example first creates the shared queue.
Next, the consumer manager thread is started.
This in turn starts the consumer thread pool and all five consumer workers. The workers run and await items to appear on the shared queue.
Next, a new thread is created and started for the producer manager. This starts a thread pool with 20 workers and issues 20 tasks to the pool, each producing one item onto the shared queue as fast as they are able.
The main thread then waits for the threads to complete.
Each producer task generates a random number, sleeps, and places the value on the shared queue before exiting.
Consumers read a value from the shared queue, sleep, then report the message before repeating the process.
All producers finish and the producer manager places a signal to expect no further tasks on the shared queue, then shuts down.
The consumer tasks continue to retrieve items from the queue until the shutdown message is read. Each consumer then shuts down in turn and finally, the consumer manager shuts down.
All items have been produced and consumed and the main thread then exits.
This highlights how to develop a producer and consumer thread pool connected by a shared queue.
Consumer got: 0.5610484753890665
Consumer got: 0.6569748244844098
Consumer got: 1.2038347369908609
Consumer got: 2.4707941999782745
Consumer got: 2.9249689060927073
Consumer got: 2.9535460714993613
Consumer got: 3.128798374575743
Consumer got: 3.487363956384005
>producer_manager done.
Consumer got: 3.992346728553197
Consumer got: 4.459087650477308
Consumer got: 4.499602838944
Consumer got: 4.538986711148029
Consumer got: 5.13572403183367
Consumer got: 5.345232979458765
Consumer got: 5.815908080748752
Consumer got: 5.921058518398532
Consumer got: 6.092388941145181
Consumer got: 6.307448212661738
Consumer got: 7.2477538938409864
Consumer got: 7.634070399288417
>consumer_manager done.
>main done.
Takeaways
You now know how to create producer and consumer thread pools connected by a queue.
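As a possible extension, the same structure can be sketched with concurrent.futures.ThreadPoolExecutor instead of ThreadPool. This is a variant under stated assumptions, not the listing from this tutorial: the sleeps are scaled down to fractions of a second, consumed items are collected rather than printed, and the names run_pipeline, n_tasks, n_workers, and consumed are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from random import random
from threading import Thread
from time import sleep

# producer task: sleep briefly, then put the generated value on the queue
def producer_task(queue):
    value = random() * 0.1
    sleep(value)
    queue.put(value)

# producer manager: run all producers, then signal no further work
def producer_manager(queue, n_tasks):
    # leaving the with-block waits for all submitted tasks to finish
    with ThreadPoolExecutor(max_workers=n_tasks) as executor:
        futures = [executor.submit(producer_task, queue) for _ in range(n_tasks)]
    try:
        # result() re-raises any exception from a producer task
        for future in futures:
            future.result()
    finally:
        # always send the sentinel so the consumers can shut down
        queue.put(None)

# consumer task: collect items until the None sentinel is seen
def consumer_task(queue):
    consumed = []
    while True:
        value = queue.get()
        if value is None:
            # put back on the queue for other consumers
            queue.put(value)
            return consumed
        sleep(value)
        consumed.append(value)

# consumer manager: run the consumers and gather what they consumed
def consumer_manager(queue, n_workers, consumed):
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = [executor.submit(consumer_task, queue) for _ in range(n_workers)]
    for future in futures:
        consumed.extend(future.result())

# hypothetical helper that wires the two managers together
def run_pipeline(n_producers=20, n_consumers=5):
    queue = Queue()
    consumed = []
    consumer = Thread(target=consumer_manager, args=(queue, n_consumers, consumed))
    producer = Thread(target=producer_manager, args=(queue, n_producers))
    consumer.start()
    producer.start()
    producer.join()
    consumer.join()
    return consumed

if __name__ == '__main__':
    items = run_pipeline()
    print(f'consumed {len(items)} items')
```

Returning the consumed items makes it easy to check that every produced item was processed, which is harder to verify from printed output alone.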