You can develop a pipeline using the threading.Thread and queue.Queue classes.
In this tutorial you will discover how to develop a multithreaded pipeline in Python.
Let’s get started.
What is a Thread Pipeline
A pipeline is a linear series of tasks that build on each other.
Each task or step in the pipeline executes concurrently, reading and processing units of work from the previous step and writing the result to the next step.
We might consider a series of producer-consumer tasks, each executed by one or more threads and connected by queues.
Recall that the Producer-Consumer pattern is a common concurrency programming design pattern. It involves producer tasks, consumer tasks and a shared buffer or queue that connects these two types of tasks.
In a pipeline, each task may be both a producer and a consumer.
That is, each step in the pipeline may consume an item from the previous step in the pipeline, perform some processing, then produce an item for the next step in the pipeline.
- Pipeline Task: Consume items from the previous step and produce an item for the next step.
The exceptions are the first and last steps in the pipeline. The first step may only produce work items without consuming anything. Similarly, the last step may consume and process work items without producing anything.
Like the producer-consumer pattern, each step in the pipeline may be executed by one or more threads concurrently. This allows the steps of the pipeline to be scaled independently, perhaps based on the speed of processing of each step.
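To make the shape of these steps concrete, the three kinds of steps might be outlined as follows. This is a minimal sketch; the function names are illustrative, not from any library.

```python
# shape of the three kinds of steps in a pipeline (illustrative names)
def first_step(queue_out):
    # produces work items only, e.g. reads tasks from a file
    ...

def middle_step(queue_in, queue_out):
    # consumes items from the previous step, produces items for the next
    ...

def last_step(queue_in):
    # consumes and processes items only, e.g. reports results
    ...
```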
Now that we are familiar with a thread pipeline, let’s consider why we might want to use one.
Why Use a Thread Pipeline
A pipeline of tasks executed by threads can be a useful pattern in concurrent programming.
For example, we might imagine a series of steps to search a series of webpages for a keyword.
The first step may read a list of URLs from a file and produce each URL as a task. The next step may be responsible for downloading the URL into memory. The next step may be responsible for saving the downloaded content to file. The next step may be responsible for parsing the HTML from the downloaded file and the final step may be responsible for searching the text content for a keyword.
For example:
- Step 1: Load a file of URLs and produce each URL as a task.
- Step 2: Download a URL from the internet into memory.
- Step 3: Save content from memory to file.
- Step 4: Read a file and parse the HTML.
- Step 5: Search text content of document for keyword.
- Step 6: Collate and report results.
The decomposition of the application into a pipeline offers a number of benefits, for example:
- Clear definition of subtasks with specific inputs and outputs, e.g. better design.
- Concurrent execution of subtasks, e.g. faster overall execution time.
- Configurable concurrency of subtasks, e.g. some tasks will benefit more than others.
- Responsive application, e.g. results may start appearing sooner than if each step were processed as a batch.
The pipeline pattern allows each aspect of the program to be well defined with an interface for input and output.
Some tasks may be slower than others. For example, downloading URLs and saving documents to file are slow tasks, involving blocking calls for socket and file I/O. As such, these steps could benefit from more concurrency with threads. Parsing HTML documents and searching text are CPU-bound tasks and may benefit less from thread-based concurrency.
The precise amount of concurrency to use at each step for maximum application performance is hard to know beforehand. As such, some testing of different configurations would be required.
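For example, a sketch of how the per-step concurrency might be made configurable is below. This is one possible approach, not a prescribed design; the step functions are empty stubs standing in for the real download, save, parse, and search logic described above, and the thread counts are guesses to be tuned by experimentation.

```python
from threading import Thread
from queue import Queue

# hypothetical step functions; each would consume from its input queue
# and produce to its output queue, as described above
def download(queue_in, queue_out): ...
def save(queue_in, queue_out): ...
def parse(queue_in, queue_out): ...
def search(queue_in, queue_out): ...

# assumed thread counts: more for slow I/O-bound steps, fewer for CPU-bound steps
steps = [(download, 8), (save, 4), (parse, 2), (search, 2)]
# one queue on either side of each step
queues = [Queue() for _ in range(len(steps) + 1)]
# create the configured number of threads for each step
threads = [Thread(target=func, args=(queues[i], queues[i + 1]))
           for i, (func, count) in enumerate(steps)
           for _ in range(count)]
```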
Next, let’s look at how we might implement a thread pipeline in Python.
How to Develop a Thread Pipeline
A thread pipeline may be implemented using threads and queues in Python.
Task Threads
Each step of the pipeline may be defined using a custom function that is executed by one or more new threads via the threading.Thread class.
A new threading.Thread instance may be created and configured to execute a step in the pipeline by specifying the name of the function via the “target” argument and any arguments to the function via the “args” argument.
For example:
```python
...
# create a new thread to execute a task
thread = threading.Thread(target=task1, args=(queue_in, queue_out))
```
Multiple threads can be created and configured to execute the same task, such as in a list comprehension.
For example:
```python
...
# create many threads for one task
threads = [threading.Thread(target=task1, args=(queue_in, queue_out)) for i in range(10)]
```
Once created and configured, the new thread can be started by calling the start() function, which will begin executing the target function in the new thread and return immediately to the current thread.
For example:
```python
...
# start the new thread
thread.start()
```
The calling thread may then wait for the new thread to finish by calling the join() function.
For example:
```python
...
# wait for the new thread to finish
thread.join()
```
Connect Tasks With Queues
Threads may share data or items of work using queues.
A queue is a data structure that allows items to be added to the end via calls to put() and retrieved from the start by calls to get().
Python provides thread-safe queues via the queue module, such as the queue.Queue class.
Thread-safe means that multiple threads may get and put items on the queue concurrently without a race condition or corruption of the queue’s internal data structure.
For example:
```python
...
# create a new queue
queue = Queue()
```
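Items may then be added and retrieved via calls to put() and get(), where the item is any Python object to be shared. For example:

```python
...
# add an item to the end of the queue
queue.put(item)
# retrieve an item from the front of the queue
item = queue.get()
```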
Importantly, if the queue is empty, the call to get() by a consumer will block until a unit of work becomes available.
The size of the queue may be limited via the “maxsize” argument, which can help to limit the number of tasks kept in memory at any one time. As such, if the queue is full, then calls to put() by producer threads will block until space on the queue becomes available.
For example:
```python
...
# create a new queue with a size limit
queue = Queue(maxsize=100)
```
Each task in the pipeline may have one queue used as the source of items of work to process and another queue as the destination of processed work items.
Tasks may then be connected together using the queues such that each task consumes items from the input queue connected to the previous task and produces items to the output queue that sends work to the next task.
Each task may loop forever, consuming, processing, and producing items.
For example:
```python
# task in a pipeline
def task(queue_in, queue_out):
    # loop forever
    while True:
        # consume an item
        item = queue_in.get()
        # process item
        processed_item = ...
        # produce an item
        queue_out.put(processed_item)
```
Stop with Sentinel Values
The pipeline may be stopped once there are no further items to process.
This may be determined by the first step in the process that only produces work items. Once it has determined that no further work items may be produced, it may signal to subsequent steps in the pipeline that no more work items are to be expected.
This signal may be a special value put on the queue that connects the tasks. This is called a sentinel value.
Once consumed by the subsequent task, the task loop may be exited allowing the thread executing the function to terminate.
The value None may be used as a sentinel value. Once read, a task may re-add the value to its input queue so that any other threads executing the same task also see it. It may also pass the value on to the next task so that it too may shut down.
For example:
```python
...
# check for stop
if item is None:
    # re-add the message for other threads on this task
    queue_in.put(item)
    # pass on message to the next step
    queue_out.put(item)
    break
```
Now that we know how to develop a pipeline using threads and queues, let’s look at a worked example.
Example of a Thread Pipeline
We can explore a thread pipeline with a worked example.
In this example, we will have three tasks.
The first task will be responsible for generating a fixed number of random values between 0 and 1 via random.random() and block for this fraction of a second to simulate computational effort. It will create a list, add the task number and the generated number and pass it on to the next task.
The next task will read the item, generate another random value, block, append it to the list, then pass the result on. Finally, the third task will retrieve the items and report them. Each item will be a list with an integer task number and two randomly generated values.
- Task 1: Generate a random value, block, output list with task number and random value.
- Task 2: Generate a random value, block, output list with a new number appended.
- Task 3: Read lists and report their contents.
We will use one thread for each task.
Task 1 and Task 2 will be connected by one queue: Task 1 will put items on it and Task 2 will consume them. Task 2 and Task 3 will be connected by a second queue: Task 2 will put items on it and Task 3 will get them.
- Queue 1: Connects Task 1 and Task 2.
- Queue 2: Connects Task 2 and Task 3.
Using print() statements from many threads can get messy, because the print() function is not thread-safe.
The logging module is thread-safe and can be used to report messages from multiple threads without race conditions.
As such, we will use the logging module and report all messages from each thread to the command prompt (standard out) using information messages.
Task 1 Function
The first task will generate a random value, block and combine the value with the task id in a list and put it on the output queue.
This task will not consume items from a previous task; it will only produce values. Therefore, it only needs one queue as an argument to the task function, which will be shared with Task 2.
```python
# first task in the pipeline
def task1(queue_out):
    logging.info('Started')
    # ...
```
The task will loop for a fixed number of iterations, generating items and putting them on the output queue.
In this case, we will limit it to just ten items.
```python
...
# add items
for i in range(10):
    # ...
```
Each iteration, the task will generate a random value between 0 and 1 via random.random(), block for this many seconds via time.sleep(), add the value to a list and then put the list in the output queue.
```python
...
# generate a value
value = random()
# block to simulate work
sleep(value)
# create item
item = [i, value]
# add to the queue
queue_out.put(item)
# report progress
logging.info(f'Generated {item}')
```
Once there are no further items to generate, the task will put a sentinel value to signal to the rest of the pipeline that no further work items are to be expected, e.g. to shutdown.
```python
...
# add signal that we are done
queue_out.put(None)
logging.info('Done')
```
Tying this together, the task1() function below implements this.
```python
# first task in the pipeline
def task1(queue_out):
    logging.info('Started')
    # add items
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        sleep(value)
        # create item
        item = [i, value]
        # add to the queue
        queue_out.put(item)
        # report progress
        logging.info(f'Generated {item}')
    # add signal that we are done
    queue_out.put(None)
    logging.info('Done')
```
Task 2 Function
The second task will consume list items from Task 1, generate a new random value, block, then create a new list from the consumed list with the generated value appended, which is put on the output queue and sent to Task 3.
This means that this task requires both an input queue connected to Task 1 and an output queue connected to Task 3 as arguments to the task function.
```python
# second task in the pipeline
def task2(queue_in, queue_out):
    logging.info('Started')
    # ...
```
The task will loop forever.
```python
# process values
while True:
    # ...
```
Each iteration, a value from the input queue is consumed. If the input queue is empty, the call will block until an item is added.
```python
...
# retrieve from queue
item = queue_in.get()
```
If the item is a sentinel value, the sentinel message is first passed on to Task 3 then the task loop is exited. This will cause the function to return and terminate the thread.
```python
...
# check for stop
if item is None:
    # pass on message
    queue_out.put(item)
    break
```
Otherwise, a list was received from the previous task.
This task will then generate a new random number between 0 and 1 using random.random(), block for a fraction of a second via time.sleep(), then create a new list from the consumed list and add on the new random value. This list is then put on the output queue for Task 3 to deal with.
```python
...
# generate a value
value = random()
# block to simulate work
sleep(value)
# add to item
new_item = item + [value]
# pass it on
queue_out.put(new_item)
# report progress
logging.info(f'Got {item} generated {new_item}')
```
Tying this together, the task2() function below implements this.
```python
# second task in the pipeline
def task2(queue_in, queue_out):
    logging.info('Started')
    # process values
    while True:
        # retrieve from queue
        item = queue_in.get()
        # check for stop
        if item is None:
            # pass on message
            queue_out.put(item)
            break
        # generate a value
        value = random()
        # block to simulate work
        sleep(value)
        # add to item
        new_item = item + [value]
        # pass it on
        queue_out.put(new_item)
        # report progress
        logging.info(f'Got {item} generated {new_item}')
    logging.info('Done')
```
Task 3 Function
The third task will read items from Task 2 and report them. Not a lot to this task.
This means that the task will need an input queue that is connected from Task 2, and will not require an output queue as it does not produce any items.
```python
# third task in the pipeline
def task3(queue_in):
    logging.info('Started')
    # ...
```
Like Task 2, Task 3 will loop forever until the task loop is explicitly exited.
```python
# process values
while True:
    # ...
```
Each iteration, an item is retrieved from the input queue. This call will block until an item becomes available.
```python
...
# retrieve from queue
item = queue_in.get()
```
If the item is a sentinel value, the task loop is exited, causing the task function to return and terminate the new thread.
Because this is the final task in the pipeline, the sentinel value does not need to be passed on.
```python
...
# check for stop
if item is None:
    break
```
Otherwise, the consumed item is reported directly.
```python
...
logging.info(f'Got {item}')
```
Tying this together, the task3() function below implements this.
```python
# third task in the pipeline
def task3(queue_in):
    logging.info('Started')
    # process values
    while True:
        # retrieve from queue
        item = queue_in.get()
        # check for stop
        if item is None:
            break
        logging.info(f'Got {item}')
    logging.info('Done')
```
Create Queues and Threads
Finally, we can configure the logging infrastructure and create all of the queues and threads.
Firstly, we can create a new logging.StreamHandler configured to output all messages to standard out (also called the console, terminal or command prompt).
We will use a custom formatter for log messages that shows the name of the thread along with the message. Later, we will give meaningful names to our threads so that these log messages are readable.
```python
...
# configure the log handler
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter('[%(levelname)s] [%(threadName)s] %(message)s'))
```
We can then register our new log handler and configure the logging infrastructure to log all messages at INFO level or higher.
```python
...
# add the log handler
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
```
Next, we can create the queue that will connect Task 1 and Task 2.
```python
...
# create queue between first two tasks
queue1_2 = Queue()
```
We can then create a thread for Task 1 and configure it to execute the task1() function and take the connecting queue as an argument. We will give this thread the name 'Task1'.
```python
...
# create thread for first task
thread1 = Thread(target=task1, args=(queue1_2,), name='Task1')
thread1.start()
```
Next, we can create the queue that will connect Task 2 and Task 3.
```python
...
# create queue between second and third tasks
queue2_3 = Queue()
```
We can then create the new thread for Task 2, configured to execute our task2() function and take the input and output queues as arguments. The thread is given the meaningful name of 'Task2'.
```python
...
# create thread for second task
thread2 = Thread(target=task2, args=(queue1_2, queue2_3), name='Task2')
thread2.start()
```
We can then create the third new thread to execute our task3() function. It takes the second queue as an input queue and is given the meaningful name 'Task3'.
```python
...
# create thread for third task
thread3 = Thread(target=task3, args=(queue2_3,), name='Task3')
thread3.start()
```
The main thread then blocks until all three task threads terminate.
```python
...
# wait for all threads to finish
thread1.join()
thread2.join()
thread3.join()
```
Complete Example
Tying this together, the complete example of a multithreaded pipeline is listed below.
```python
# SuperFastPython.com
# example of a pipeline with threads and queues
from random import random
from time import sleep
from threading import Thread
from queue import Queue
import logging
import sys

# first task in the pipeline
def task1(queue_out):
    logging.info('Started')
    # add items
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        sleep(value)
        # create item
        item = [i, value]
        # add to the queue
        queue_out.put(item)
        # report progress
        logging.info(f'Generated {item}')
    # add signal that we are done
    queue_out.put(None)
    logging.info('Done')

# second task in the pipeline
def task2(queue_in, queue_out):
    logging.info('Started')
    # process values
    while True:
        # retrieve from queue
        item = queue_in.get()
        # check for stop
        if item is None:
            # pass on message
            queue_out.put(item)
            break
        # generate a value
        value = random()
        # block to simulate work
        sleep(value)
        # add to item
        new_item = item + [value]
        # pass it on
        queue_out.put(new_item)
        # report progress
        logging.info(f'Got {item} generated {new_item}')
    logging.info('Done')

# third task in the pipeline
def task3(queue_in):
    logging.info('Started')
    # process values
    while True:
        # retrieve from queue
        item = queue_in.get()
        # check for stop
        if item is None:
            break
        logging.info(f'Got {item}')
    logging.info('Done')

# configure the log handler
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter('[%(levelname)s] [%(threadName)s] %(message)s'))
# add the log handler
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# create queue between first two tasks
queue1_2 = Queue()
# create thread for first task
thread1 = Thread(target=task1, args=(queue1_2,), name='Task1')
thread1.start()
# create queue between second and third tasks
queue2_3 = Queue()
# create thread for second task
thread2 = Thread(target=task2, args=(queue1_2, queue2_3), name='Task2')
thread2.start()
# create thread for third task
thread3 = Thread(target=task3, args=(queue2_3,), name='Task3')
thread3.start()
# wait for all threads to finish
thread1.join()
thread2.join()
thread3.join()
```
Running the example first configures the logging infrastructure.
Next, the queues and new threads are created, configured and started. The main thread then blocks until the new threads terminate.
Task 1 generates random numbers and blocks for random intervals before adding items to its output queue. Tasks 2 and 3 block, waiting for items to appear on their input queues.
Once an item arrives for Task 2 via its input queue, it is consumed, a new random number is generated, the thread blocks for a random interval and then pushes an item out on its output queue.
Items arrive at the third task via its input queue and the items are reported.
Items flow through the pipeline.
Once Task 1 has generated all ten items, it exits its task loop, sends a sentinel value, and its thread terminates. The sentinel is read by Task 2, which passes it on to Task 3, then exits its task loop and terminates.
Finally, Task 3 receives the sentinel signal, exits its task loop and its thread terminates.
The main thread notices that each task thread has terminated then exits, closing the Python process.
A sample output of the program is provided below. Note, your specific output will differ given the use of random numbers.

```
[INFO] [Task1] Started
[INFO] [Task2] Started
[INFO] [Task3] Started
[INFO] [Task1] Generated [0, 0.5116626851136895]
[INFO] [Task1] Generated [1, 0.5523684174445173]
[INFO] [Task2] Got [0, 0.5116626851136895] generated [0, 0.5116626851136895, 0.5696002102034979]
[INFO] [Task3] Got [0, 0.5116626851136895, 0.5696002102034979]
[INFO] [Task2] Got [1, 0.5523684174445173] generated [1, 0.5523684174445173, 0.28842284262569606]
[INFO] [Task3] Got [1, 0.5523684174445173, 0.28842284262569606]
[INFO] [Task1] Generated [2, 0.41714627059159837]
[INFO] [Task2] Got [2, 0.41714627059159837] generated [2, 0.41714627059159837, 0.1564381452294611]
[INFO] [Task3] Got [2, 0.41714627059159837, 0.1564381452294611]
[INFO] [Task1] Generated [3, 0.9144089531830464]
[INFO] [Task2] Got [3, 0.9144089531830464] generated [3, 0.9144089531830464, 0.14746762562123983]
[INFO] [Task3] Got [3, 0.9144089531830464, 0.14746762562123983]
[INFO] [Task1] Generated [4, 0.8717635792924607]
[INFO] [Task2] Got [4, 0.8717635792924607] generated [4, 0.8717635792924607, 0.527362769034913]
[INFO] [Task3] Got [4, 0.8717635792924607, 0.527362769034913]
[INFO] [Task1] Generated [5, 0.8741510481937913]
[INFO] [Task2] Got [5, 0.8741510481937913] generated [5, 0.8741510481937913, 0.07298378345376944]
[INFO] [Task3] Got [5, 0.8741510481937913, 0.07298378345376944]
[INFO] [Task1] Generated [6, 0.586299430129957]
[INFO] [Task2] Got [6, 0.586299430129957] generated [6, 0.586299430129957, 0.13169401113038903]
[INFO] [Task3] Got [6, 0.586299430129957, 0.13169401113038903]
[INFO] [Task1] Generated [7, 0.2246208900956096]
[INFO] [Task1] Generated [8, 0.3993720593556219]
[INFO] [Task2] Got [7, 0.2246208900956096] generated [7, 0.2246208900956096, 0.7043715599303149]
[INFO] [Task3] Got [7, 0.2246208900956096, 0.7043715599303149]
[INFO] [Task1] Generated [9, 0.7687134926632286]
[INFO] [Task1] Done
[INFO] [Task2] Got [8, 0.3993720593556219] generated [8, 0.3993720593556219, 0.7352103423297296]
[INFO] [Task3] Got [8, 0.3993720593556219, 0.7352103423297296]
[INFO] [Task2] Got [9, 0.7687134926632286] generated [9, 0.7687134926632286, 0.7462437765726843]
[INFO] [Task2] Done
[INFO] [Task3] Got [9, 0.7687134926632286, 0.7462437765726843]
[INFO] [Task3] Done
```

We can see that each message is stamped with the task thread that generated it. We can clearly see items pass through the pipeline, for example “[0, 0.5116626851136895]” is generated in Task 1, updated to “[0, 0.5116626851136895, 0.5696002102034979]” in Task 2, then reported in Task 3.

This highlights how to construct a simple thread pipeline from scratch in Python using threads and queues.
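The example uses one thread per task, but as discussed earlier, a step may be scaled out with multiple threads. One subtlety is shutdown: every thread of a scaled step must see the sentinel (hence re-adding it to the input queue), and the sentinel should only be forwarded to the next step once all threads of the scaled step have finished, otherwise items still being processed could arrive after it. The sketch below shows one way to coordinate this, reusing task1() and task3() and the imports from the complete example above; task2_multi() is an assumed variant of task2(), and the choice of four threads is arbitrary.

```python
# variant of task2 for many threads: re-add the sentinel, do not forward it
def task2_multi(queue_in, queue_out):
    logging.info('Started')
    while True:
        # retrieve from queue
        item = queue_in.get()
        # check for stop
        if item is None:
            # re-add so sibling threads on this step also see the sentinel
            queue_in.put(item)
            break
        # generate a value and block to simulate work
        value = random()
        sleep(value)
        # pass on a new item with the value appended
        queue_out.put(item + [value])
    logging.info('Done')

# create the connecting queues
queue1_2 = Queue()
queue2_3 = Queue()
# one thread for the first task
thread1 = Thread(target=task1, args=(queue1_2,), name='Task1')
thread1.start()
# many threads for the second task
workers = [Thread(target=task2_multi, args=(queue1_2, queue2_3), name=f'Task2-{i}') for i in range(4)]
for worker in workers:
    worker.start()
# one thread for the third task
thread3 = Thread(target=task3, args=(queue2_3,), name='Task3')
thread3.start()
# forward the sentinel only after every Task 2 thread has finished
thread1.join()
for worker in workers:
    worker.join()
queue2_3.put(None)
thread3.join()
```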
Further Reading
This section provides additional resources that you may find helpful.
Python Threading Books
- Python Threading Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- Threading Module API Cheat Sheet
I also recommend specific chapters in the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Threading: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to develop a threading pipeline in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.