Thread Pipeline in Python
You can develop a pipeline using threading.Thread and queue.Queue classes.
In this tutorial you will discover how to develop a multithreaded pipeline in Python.
Let's get started.
What is a Thread Pipeline
A pipeline is a linear series of tasks that build on each other.
Each task or step in the pipeline executes concurrently, reading and processing units of work from the previous step and writing the result to the next step.
We might consider a series of producer-consumer tasks, each executed by one or more threads and connected by queues.
Recall that the Producer-Consumer pattern is a common concurrency programming design pattern. It involves producer tasks, consumer tasks and a shared buffer or queue that connects these two types of tasks.
In a pipeline, each task may be both a producer and a consumer.
That is, each step in the pipeline may consume an item from the previous step in the pipeline, perform some processing, then produce an item for the next step in the pipeline.
- Pipeline Task: Consume items from the previous step and produce an item for the next step.
The exceptions to this are the first and last steps in the pipeline. The first step of the pipeline may only produce work items without consuming anything. Similarly, the last step of the pipeline may consume and process work items without producing anything.
Like the producer-consumer pattern, each step in the pipeline may be executed by one or more threads concurrently. This allows the steps of the pipeline to be scaled independently, perhaps based on the speed of processing of each step.
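To make this concrete, the sketch below scales a single step to three worker threads sharing the same pair of queues. The square_step() function is made up for this illustration (it simply squares each number it consumes), and a None sentinel, re-added by each worker, shuts the step down; this is a minimal sketch, not part of the worked example later in this tutorial.

```python
from queue import Queue
from threading import Thread

# hypothetical step: square each consumed number
def square_step(queue_in, queue_out):
    while True:
        item = queue_in.get()
        # sentinel: re-add it so sibling workers also stop
        if item is None:
            queue_in.put(None)
            break
        queue_out.put(item * item)

queue_in, queue_out = Queue(), Queue()
# scale this one step to three worker threads
workers = [Thread(target=square_step, args=(queue_in, queue_out)) for _ in range(3)]
for worker in workers:
    worker.start()
# feed five items of work, then signal that no more are coming
for i in range(5):
    queue_in.put(i)
queue_in.put(None)
# wait for all workers to finish
for worker in workers:
    worker.join()
```

Any of the three workers may process any given item, so the outputs may arrive on the output queue in any order.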
Now that we are familiar with a thread pipeline, let's consider why we might want to use one.
Why Use a Thread Pipeline
A pipeline of tasks executed by threads can be a useful pattern in concurrent programming.
For example, we might imagine a series of steps to search a series of webpages for a keyword.
The first step may read a list of URLs from a file and produce each URL as a task. The next step may be responsible for downloading the URL into memory. The next step may be responsible for saving the downloaded content to file. The next step may be responsible for parsing the HTML from the downloaded file and the final step may be responsible for searching the text content for a keyword.
For example:
- Step 1: Load a file of URLs and produce each URL as a task.
- Step 2: Download a URL from the internet into memory.
- Step 3: Save content from memory to file.
- Step 4: Read a file and parse the HTML.
- Step 5: Search text content of document for keyword.
- Step 6: Collate and report results.
The decomposition of the application into a pipeline offers a number of benefits, for example:
- Clear definition of subtasks with specific inputs and outputs, e.g. better design
- Concurrent execution of subtasks, e.g. faster overall execution time.
- Configurable concurrency of subtasks, e.g. some tasks will benefit more than others.
- Responsive application, e.g. results may start appearing sooner than processing each step in a batch.
The pipeline pattern allows each aspect of the program to be well defined with an interface for input and output.
Some tasks may be slower than others. For example, downloading URLs and saving documents to file are slow tasks, involving blocking calls for socket and file I/O. As such, these steps could benefit from more concurrency with threads. Parsing HTML documents and searching text are CPU-bound tasks and may benefit from less concurrency with threads.
The precise amount of concurrency to use at each step for maximum application performance is hard to know beforehand. As such, some testing of different configurations would be required.
Next, let's look at how we might implement a thread pipeline in Python.
How to Develop a Thread Pipeline
A thread pipeline may be implemented using threads and queues in Python.
Task Threads
Each step of the pipeline may be defined using a custom function that is executed by one or more new threads via the threading.Thread class.
A new threading.Thread instance may be created and configured to execute a step in the pipeline by specifying the name of the function via the "target" argument and any arguments to the function via the "args" argument.
For example:
...
# create a new thread to execute a task
thread = threading.Thread(target=task1, args=(queue_in, queue_out))
Multiple threads can be created and configured to execute the same task, such as in a list comprehension.
For example:
...
# create many threads for one task
threads = [threading.Thread(target=task1, args=(queue_in, queue_out)) for i in range(10)]
Once created and configured, the new thread can be started by calling the start() function which will begin executing the new thread and return immediately to the current thread.
For example:
...
# start the new thread
thread.start()
The calling thread may then wait for the new thread to finish by calling the join() function.
For example:
...
# wait for the new thread to finish
thread.join()
Connect Tasks With Queues
Threads may share data or items of work using queues.
A queue is a data structure that allows items to be added to the end via calls to put() and retrieved from the start by calls to get().
Python provides thread-safe queues via the queue module, such as the queue.Queue class.
Thread-safe means that multiple threads may get and put items on the queue concurrently without a race condition or corruption of the queue's internal data structure.
For example:
...
# create a new queue
queue = Queue()
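We can check this thread-safety informally. In the sketch below, ten threads each put one thousand items on a shared queue concurrently and no items are lost; the counts are arbitrary, chosen just for the demonstration.

```python
from queue import Queue
from threading import Thread

# shared thread-safe queue
queue = Queue()

def producer(n):
    # put n items on the shared queue
    for i in range(n):
        queue.put(i)

# ten threads adding items concurrently
threads = [Thread(target=producer, args=(1000,)) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
# all 10,000 items arrived intact
print(queue.qsize())  # 10000
```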
Importantly, if the queue is empty, the call to get() by a consumer will block until a unit of work becomes available.
The size of the queue may be limited via the "maxsize" argument, which can help to limit the number of tasks kept in memory at any one time. As such, if the queue is full, then calls to put() by producer threads will block until space on the queue becomes available.
For example:
...
# create a new queue with a size limit
queue = Queue(maxsize=100)
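The behavior of a full queue can be seen with a non-blocking put(), which raises a queue.Full exception instead of waiting. A minimal sketch, using an arbitrary size limit of two:

```python
from queue import Full, Queue

# a queue that holds at most two items
queue = Queue(maxsize=2)
queue.put('a')
queue.put('b')
# the queue is now full: a blocking put() would wait for space,
# whereas a non-blocking put() raises queue.Full immediately
was_full = False
try:
    queue.put('c', block=False)
except Full:
    was_full = True
print(was_full)  # True
```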
Each task in the pipeline may have one queue used as the source of items of work to process and another queue as the destination of processed work items.
Tasks may then be connected together using the queues such that each task consumes items from the input queue connected to the previous task and produces items to the output queue that sends work to the next task.
Each task may loop forever, consuming, processing and producing items.
For example:
# task in a pipeline
def task(queue_in, queue_out):
    # loop forever
    while True:
        # consume an item
        item = queue_in.get()
        # process item
        processed_item = ...
        # produce an item
        queue_out.put(processed_item)
Stop with Sentinel Values
The pipeline may be stopped once there are no further items to process.
This may be determined by the first step in the process that only produces work items. Once it has determined that no further work items may be produced, it may signal to subsequent steps in the pipeline that no more work items are to be expected.
This signal may be a special value put on the queue that connects the tasks. This is called a sentinel value.
Once consumed by the subsequent task, the task loop may be exited allowing the thread executing the function to terminate.
The value None may be used as a sentinel value. Once read, the task may re-add the value so that other threads executing the same task also see it, and it may pass the value on to the next task so that it too may shut down.
For example:
...
# check for stop
if item is None:
    # re-add the message for other threads on this task
    queue_in.put(item)
    # pass on message to the next step
    queue_out.put(item)
    break
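Putting these pieces together, the sketch below chains two hypothetical steps, double_step() and collect_step() (made-up functions for this illustration), and shuts them down cleanly with a None sentinel:

```python
from queue import Queue
from threading import Thread

# middle step: double each item, forward the sentinel when done
def double_step(queue_in, queue_out):
    while True:
        item = queue_in.get()
        if item is None:
            # re-add for any sibling threads on this step
            queue_in.put(item)
            # pass the sentinel on to the next step
            queue_out.put(item)
            break
        queue_out.put(item * 2)

# final step: gather items until the sentinel arrives
def collect_step(queue_in, results):
    while True:
        item = queue_in.get()
        if item is None:
            break
        results.append(item)

queue1, queue2 = Queue(), Queue()
results = []
thread1 = Thread(target=double_step, args=(queue1, queue2))
thread2 = Thread(target=collect_step, args=(queue2, results))
thread1.start()
thread2.start()
# feed three items, then signal that no more work is coming
for i in range(3):
    queue1.put(i)
queue1.put(None)
thread1.join()
thread2.join()
print(results)  # [0, 2, 4]
```

With one thread per step, the FIFO queues preserve the order of the items end-to-end.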
Now that we know how to develop a pipeline using threads and queues, let's look at a worked example.
Example of a Thread Pipeline
We can explore a thread pipeline with a worked example.
In this example, we will have three tasks.
The first task will be responsible for generating a fixed number of random values between 0 and 1 via random.random() and blocking for this fraction of a second to simulate computational effort. It will create a list containing the task number and the generated value and pass it on to the next task.
The next task will read the item, generate another random value, block, and append it to the list, then pass the result on. Finally, the third task will retrieve values and report them. Each will be a list with an integer task number, and two randomly generated values.
- Task 1: Generate a random value, block, output list with task number and random value.
- Task 2: Generate a random value, block, output list with a new number appended.
- Task 3: Read lists and report their contents.
We will use one thread for each task.
Task 1 and Task 2 will be connected by one queue: Task 1 will put items on this queue and Task 2 will consume them. Task 2 will also be connected to Task 3 by a different queue, on which it will put values and from which Task 3 will get them.
- Queue 1: Connects Task 1 and Task 2.
- Queue 2: Connects Task 2 and Task 3.
Using print() statements from many threads can get messy, because the print() statement is not thread-safe.
The logging module is thread-safe and can be used to report messages from multiple threads without race conditions.
As such, we will use the logging module and report all messages from each thread to the command prompt (standard out) as INFO-level messages.
Task 1 Function
The first task will generate a random value, block and combine the value with the task id in a list and put it on the output queue.
This task will not consume items from a previous task, it will only produce values. Therefore it only needs one queue as an argument to the task function which will be shared with Task 2.
# first task in the pipeline
def task1(queue_out):
    logging.info(f'Started')
    # ...
The task will loop for a fixed number of iterations, generating items and putting them on the output queue.
In this case, we will limit it to just ten items.
...
# add items
for i in range(10):
    # ...
Each iteration, the task will generate a random value between 0 and 1 via random.random(), block for this many seconds via time.sleep(), add the value to a list and then put the list in the output queue.
...
# generate a value
value = random()
# block to simulate work
sleep(value)
# create item
item = [i, value]
# add to the queue
queue_out.put(item)
# report progress
logging.info(f'Generated {item}')
Once there are no further items to generate, the task will put a sentinel value to signal to the rest of the pipeline that no further work items are to be expected, e.g. to shutdown.
...
# add signal that we are done
queue_out.put(None)
logging.info(f'Done')
Tying this together, the task1() function implements this.
# first task in the pipeline
def task1(queue_out):
    logging.info(f'Started')
    # add items
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        sleep(value)
        # create item
        item = [i, value]
        # add to the queue
        queue_out.put(item)
        # report progress
        logging.info(f'Generated {item}')
    # add signal that we are done
    queue_out.put(None)
    logging.info(f'Done')
Task 2 Function
The second task will consume list items from Task 1, generate a new random value, block, then create a new list from the consumed list and add the generated value which is put on the output queue and sent to Task 3.
This means that this task requires both an input queue connected to Task 1 and an output queue connected to Task 3 as arguments to the task function.
# second task in the pipeline
def task2(queue_in, queue_out):
    logging.info(f'Started')
    # ...
The task will loop forever.
# process values
while True:
    # ...
Each iteration, a value from the input queue is consumed. If the input queue is empty, the call will block until an item is added.
...
# retrieve from queue
item = queue_in.get()
If the item is a sentinel value, the sentinel message is first passed on to Task 3 then the task loop is exited. This will cause the function to return and terminate the thread.
...
# check for stop
if item is None:
    # pass on message
    queue_out.put(item)
    break
Otherwise, a list was received from the previous task.
This task will then generate a new random number between 0 and 1 using random.random(), block for a fraction of a second via time.sleep(), then create a new list from the consumed list and add on the new random value. This list is then put on the output queue for Task 3 to deal with.
...
# generate a value
value = random()
# block to simulate work
sleep(value)
# add to item
new_item = item + [value]
# pass it on
queue_out.put(new_item)
# report progress
logging.info(f'Got {item} generated {new_item}')
Tying this together, the task2() function below implements this.
# second task in the pipeline
def task2(queue_in, queue_out):
    logging.info(f'Started')
    # process values
    while True:
        # retrieve from queue
        item = queue_in.get()
        # check for stop
        if item is None:
            # pass on message
            queue_out.put(item)
            break
        # generate a value
        value = random()
        # block to simulate work
        sleep(value)
        # add to item
        new_item = item + [value]
        # pass it on
        queue_out.put(new_item)
        # report progress
        logging.info(f'Got {item} generated {new_item}')
    logging.info(f'Done')
Task 3 Function
The third task will read items from Task 2 and report them. Not a lot to this task.
This means that the task will need an input queue connected to Task 2, and will not require an output queue as it does not produce any items.
...
# third task in the pipeline
def task3(queue_in):
    logging.info(f'Started')
    # ...
Like Task 2, Task 3 will loop forever until the task loop is explicitly exited.
# process values
while True:
    # ...
Each iteration, an item is retrieved from the input queue. This call will block until an item becomes available.
...
# retrieve from queue
item = queue_in.get()
If the item is a sentinel value, the task loop is exited, causing the task function to return and terminate the new thread.
Because this is the final task in the pipeline, the sentinel value does not need to be passed on.
...
# check for stop
if item is None:
    break
Otherwise, the consumed item is reported directly.
...
logging.info(f'Got {item}')
Tying this together, the task3() function below implements this.
# third task in the pipeline
def task3(queue_in):
    logging.info(f'Started')
    # process values
    while True:
        # retrieve from queue
        item = queue_in.get()
        # check for stop
        if item is None:
            break
        logging.info(f'Got {item}')
    logging.info(f'Done')
Create Queues and Threads
Finally, we can configure the logging infrastructure and create all of the queues and threads.
Firstly, we can create a new logging.StreamHandler configured to output all messages to standard out (also called the console, terminal or command prompt).
We will use a custom formatter for log messages that shows the name of the thread along with the message. Later, we will give meaningful names to our threads so that these log messages are readable.
...
# configure the log handler
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter('[%(levelname)s] [%(threadName)s] %(message)s'))
We can then register our new log handler and configure the logging infrastructure to log all messages at INFO level or higher.
...
# add the log handler
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
Next, we can create the queue that will connect Task 1 and Task 2.
...
# create queue between first two tasks
queue1_2 = Queue()
We can then create a thread for Task 1 and configure it to execute the task1() function and take the connecting queue as an argument. We will give this thread the name "Task1".
...
# create thread for first task
thread1 = Thread(target=task1, args=(queue1_2,), name='Task1')
thread1.start()
Next, we can create the queue that will connect Task 2 and Task 3.
...
# create queue between second and third tasks
queue2_3 = Queue()
We can then create the new thread for Task 2, configured to execute our task2() function and take the input and output queues as arguments. The thread is given the meaningful name of 'Task2'.
...
# create thread for second task
thread2 = Thread(target=task2, args=(queue1_2, queue2_3), name='Task2')
thread2.start()
We can then create the third new thread to execute our task3() function. It takes the second queue as an input queue and is given the meaningful name 'Task3'.
...
# create thread for third task
thread3 = Thread(target=task3, args=(queue2_3,), name='Task3')
thread3.start()
The main thread then blocks until all three task threads terminate.
...
# wait for all threads to finish
thread1.join()
thread2.join()
thread3.join()
Complete Example
Tying this together, the complete example of a multithreaded pipeline is listed below.
# SuperFastPython.com
# example of a pipeline with threads and queues
from random import random
from time import sleep
from threading import Thread
from queue import Queue
import logging
import sys

# first task in the pipeline
def task1(queue_out):
    logging.info(f'Started')
    # add items
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        sleep(value)
        # create item
        item = [i, value]
        # add to the queue
        queue_out.put(item)
        # report progress
        logging.info(f'Generated {item}')
    # add signal that we are done
    queue_out.put(None)
    logging.info(f'Done')

# second task in the pipeline
def task2(queue_in, queue_out):
    logging.info(f'Started')
    # process values
    while True:
        # retrieve from queue
        item = queue_in.get()
        # check for stop
        if item is None:
            # pass on message
            queue_out.put(item)
            break
        # generate a value
        value = random()
        # block to simulate work
        sleep(value)
        # add to item
        new_item = item + [value]
        # pass it on
        queue_out.put(new_item)
        # report progress
        logging.info(f'Got {item} generated {new_item}')
    logging.info(f'Done')

# third task in the pipeline
def task3(queue_in):
    logging.info(f'Started')
    # process values
    while True:
        # retrieve from queue
        item = queue_in.get()
        # check for stop
        if item is None:
            break
        logging.info(f'Got {item}')
    logging.info(f'Done')

# configure the log handler
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter('[%(levelname)s] [%(threadName)s] %(message)s'))
# add the log handler
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# create queue between first two tasks
queue1_2 = Queue()
# create thread for first task
thread1 = Thread(target=task1, args=(queue1_2,), name='Task1')
thread1.start()
# create queue between second and third tasks
queue2_3 = Queue()
# create thread for second task
thread2 = Thread(target=task2, args=(queue1_2, queue2_3), name='Task2')
thread2.start()
# create thread for third task
thread3 = Thread(target=task3, args=(queue2_3,), name='Task3')
thread3.start()
# wait for all threads to finish
thread1.join()
thread2.join()
thread3.join()
Running the example first configures the logging infrastructure.
Next, the queues and new threads are created, configured and started. The main thread then blocks until the new threads terminate.
Task 1 generates random numbers and blocks for random intervals before adding items to its output queue. Tasks 2 and 3 block, waiting for items to appear on their input queues.
Once an item arrives for Task 2 via its input queue, it is consumed, a new random number is generated, the thread blocks for a random interval and then pushes an item out on its output queue.
Items arrive at the third task via its input queue and the items are reported.
Items flow through the pipeline.
Once Task 1 has generated all ten items, it sends a sentinel value and its thread terminates. The sentinel is read by Task 2, which passes it on to Task 3, then exits its task loop and terminates.
Finally, Task 3 receives the sentinel signal, exits its task loop and its thread terminates.
The main thread notices that each task thread has terminated, then exits, closing the Python process.
A sample output of the program is provided below. Note, your specific output will differ given the use of random numbers.
We can see that each message is stamped with the task thread that generated it. We can clearly see items pass through the pipeline, for example "[0, 0.5116626851136895]" generated in Task 1, updated to "[0, 0.5116626851136895, 0.5696002102034979]" in Task 2 then reported in Task 3.
This highlights how to construct a simple thread pipeline from scratch in Python using threads and queues.
[INFO] [Task1] Started
[INFO] [Task2] Started
[INFO] [Task3] Started
[INFO] [Task1] Generated [0, 0.5116626851136895]
[INFO] [Task1] Generated [1, 0.5523684174445173]
[INFO] [Task2] Got [0, 0.5116626851136895] generated [0, 0.5116626851136895, 0.5696002102034979]
[INFO] [Task3] Got [0, 0.5116626851136895, 0.5696002102034979]
[INFO] [Task2] Got [1, 0.5523684174445173] generated [1, 0.5523684174445173, 0.28842284262569606]
[INFO] [Task3] Got [1, 0.5523684174445173, 0.28842284262569606]
[INFO] [Task1] Generated [2, 0.41714627059159837]
[INFO] [Task2] Got [2, 0.41714627059159837] generated [2, 0.41714627059159837, 0.1564381452294611]
[INFO] [Task3] Got [2, 0.41714627059159837, 0.1564381452294611]
[INFO] [Task1] Generated [3, 0.9144089531830464]
[INFO] [Task2] Got [3, 0.9144089531830464] generated [3, 0.9144089531830464, 0.14746762562123983]
[INFO] [Task3] Got [3, 0.9144089531830464, 0.14746762562123983]
[INFO] [Task1] Generated [4, 0.8717635792924607]
[INFO] [Task2] Got [4, 0.8717635792924607] generated [4, 0.8717635792924607, 0.527362769034913]
[INFO] [Task3] Got [4, 0.8717635792924607, 0.527362769034913]
[INFO] [Task1] Generated [5, 0.8741510481937913]
[INFO] [Task2] Got [5, 0.8741510481937913] generated [5, 0.8741510481937913, 0.07298378345376944]
[INFO] [Task3] Got [5, 0.8741510481937913, 0.07298378345376944]
[INFO] [Task1] Generated [6, 0.586299430129957]
[INFO] [Task2] Got [6, 0.586299430129957] generated [6, 0.586299430129957, 0.13169401113038903]
[INFO] [Task3] Got [6, 0.586299430129957, 0.13169401113038903]
[INFO] [Task1] Generated [7, 0.2246208900956096]
[INFO] [Task1] Generated [8, 0.3993720593556219]
[INFO] [Task2] Got [7, 0.2246208900956096] generated [7, 0.2246208900956096, 0.7043715599303149]
[INFO] [Task3] Got [7, 0.2246208900956096, 0.7043715599303149]
[INFO] [Task1] Generated [9, 0.7687134926632286]
[INFO] [Task1] Done
[INFO] [Task2] Got [8, 0.3993720593556219] generated [8, 0.3993720593556219, 0.7352103423297296]
[INFO] [Task3] Got [8, 0.3993720593556219, 0.7352103423297296]
[INFO] [Task2] Got [9, 0.7687134926632286] generated [9, 0.7687134926632286, 0.7462437765726843]
[INFO] [Task2] Done
[INFO] [Task3] Got [9, 0.7687134926632286, 0.7462437765726843]
[INFO] [Task3] Done
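As an aside, the queue module also supports an alternative shutdown approach: workers call task_done() after processing each item and the producer calls join() on the queue to wait until everything put on it has been processed. A minimal sketch, using a made-up worker that multiplies items by ten:

```python
from queue import Queue
from threading import Thread

queue = Queue()
results = []

def worker():
    # process items forever; task_done() marks each unit of work complete
    while True:
        item = queue.get()
        results.append(item * 10)
        queue.task_done()

# a daemon thread will not keep the process alive after the main thread exits
Thread(target=worker, daemon=True).start()
for i in range(5):
    queue.put(i)
# block until every item put on the queue has been processed
queue.join()
print(results)  # [0, 10, 20, 30, 40]
```

This avoids sentinel values entirely, at the cost of using daemon threads that are abandoned rather than shut down cleanly.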
Takeaways
You now know how to develop a thread pipeline in Python.
If you enjoyed this tutorial, you will love my book: Python Threading Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.