You can execute multi-step concurrent tasks using a pipeline of thread pools in Python.
In this tutorial, you will discover how to execute multi-step tasks using a ThreadPoolExecutor pipeline.
Let’s get started.
Need a Pipeline To Execute Multi-Step Tasks
The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.
Tasks executed in new threads are executed concurrently in Python, making the ThreadPoolExecutor appropriate for I/O-bound tasks.
A ThreadPoolExecutor can be created directly or via the context manager interface, and tasks can be issued one by one via the submit() method or in batches via the map() method.
For example:
```python
...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task
    future = tpe.submit(task)
    # get the task result once the task is done
    result = future.result()
```
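The snippet above uses submit(); the map() method issues a batch of tasks and yields results in task order. A minimal sketch, where task() is a stand-in function that doubles its argument:

```python
from concurrent.futures import ThreadPoolExecutor

# a stand-in task that doubles its argument
def task(value):
    return value * 2

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a batch of tasks and iterate results in task order
    for result in tpe.map(task, range(5)):
        print(result)  # prints 0, 2, 4, 6, 8 on separate lines
```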
When using the ThreadPoolExecutor, there are cases when we need to execute tasks in a sequence.
For example, each task may require processing in a series of steps and there may be many of these tasks to execute.
We can refer to these as multi-step tasks.
An example of problems of this type is the processing of files. Step one might involve loading data from a file, the second step might be to process the loaded data, and the third step might be to store the results back in a file or in a database.
- Step 1: Load data.
- Step 2: Process data.
- Step 3: Store data.
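Written sequentially, one such multi-step task could be sketched as a simple chain of step functions; the helper functions here are hypothetical placeholders, not real file handling:

```python
# a sketch of one multi-step task, run sequentially;
# the three helpers are hypothetical placeholders

def load_file(path):
    # step 1: load data (placeholder: pretend the path holds text)
    return f'contents of {path}'

def process_data(data):
    # step 2: process the loaded data (placeholder: uppercase it)
    return data.upper()

def store_result(result):
    # step 3: store the result (placeholder: just report it)
    print(f'stored: {result}')

# run the three steps for one task
store_result(process_data(load_file('data001.csv')))
```

With many such tasks, each step becomes a candidate for its own concurrent stage.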
We may have tens, hundreds, or even thousands of these multi-step tasks to process.
Each step of the task may be a separate activity that we may wish to execute using the ThreadPoolExecutor.
How can we use the ThreadPoolExecutor to execute a pipeline of sequential tasks?
How to Develop a ThreadPoolExecutor Pipeline for Multi-Step Tasks
We can use the ThreadPoolExecutor to execute a pipeline of task steps.
Firstly, a pipeline between the steps of each task can be created. We can achieve this by defining a function for each step in the pipeline, then connecting the steps of the pipeline using a queue.
This means that a given step would read the data required from an input queue and put any output from the task onto the output queue.
The output queue from one step would then be used as the input queue to the next step.
We can then use the ThreadPoolExecutor to execute the steps in the pipeline.
There are perhaps two main approaches we can use to structure the use of the ThreadPoolExecutor to execute tasks in the pipeline:
- Execute all steps of each task in one ThreadPoolExecutor.
- Execute each step of each task in a separate ThreadPoolExecutor.
A single ThreadPoolExecutor can be used to execute each step in the pipeline.
This is a simple design as the program only needs to manage a single thread pool. A downside of this approach is that there is no logical separation between the steps in the pipeline.
An alternative is to use one thread pool per step in the process. A benefit of this approach is that the workers of each step in the pipeline may be given different names, which can aid in logging and debugging the pipeline, as opposed to using a single thread pool for all tasks, where workers are shared across steps.
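For example, the ThreadPoolExecutor constructor accepts a thread_name_prefix argument, so the workers of each pool can carry distinguishing names. A small sketch, where which_worker() is an illustrative task:

```python
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread

# an illustrative task that reports the worker thread that ran it
def which_worker():
    return current_thread().name

# one pool per step, each with its own worker name prefix
with ThreadPoolExecutor(2, thread_name_prefix='step1') as tpe1, \
     ThreadPoolExecutor(2, thread_name_prefix='step2') as tpe2:
    print(tpe1.submit(which_worker).result())  # e.g. step1_0
    print(tpe2.submit(which_worker).result())  # e.g. step2_0
```

Log messages produced by step1 and step2 workers are then immediately distinguishable by thread name.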
Using separate pools of workers at each step also allows a variable number of dedicated workers to be used at each step, and for the pool of threads to be switched out with a pool of processes, if needed.
Regardless of the approach taken, the definition of the tasks in the pipeline and the input and output queue structure does not need to change, offering some flexibility.
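To sketch the process-pool variant mentioned above: the worker structure can stay the same, but the standard queue.Queue is not process-safe, so a multiprocessing queue (here created via a Manager) must connect the steps instead. The step() function, worker counts, and doubling of values below are illustrative assumptions, not part of the tutorial's examples:

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

# an illustrative pipeline step: doubles each value it receives
def step(queue_in, queue_out):
    while True:
        data = queue_in.get()
        # check for the shutdown sentinel
        if data is None:
            # put the sentinel back for fellow workers, then exit
            queue_in.put(None)
            break
        queue_out.put(data * 2)

def run_pipeline():
    with Manager() as manager:
        # process-safe queues connecting the step to the main process
        queue_in = manager.Queue()
        queue_out = manager.Queue()
        with ProcessPoolExecutor(2) as ppe:
            # issue the step workers to the process pool
            for _ in range(2):
                ppe.submit(step, queue_in, queue_out)
            # push work, then the shutdown sentinel
            for i in range(5):
                queue_in.put(i)
            queue_in.put(None)
            # consume a known number of results to avoid races
            return sorted(queue_out.get() for _ in range(5))

# protect the entry point, required for process-based workers
if __name__ == '__main__':
    print(run_pipeline())  # [0, 2, 4, 6, 8]
```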
To run the pipeline, the data needed for each task is fed into the first input queue. The tasks of the first step consume this data, process it, and push results into their output queue, which is consumed by the next step in the process. Eventually, the results of each task are put into the final output queue, from which they can be retrieved by the main program.
When there are no more input tasks, a sentinel value can be put into the first task’s input queue, signaling that there is no more work. Each worker that reads this value can shut down, first putting the value back on the input queue for its fellow workers to read and onto the output queue so workers at the next step can shut down in turn. This will safely shut down the entire pipeline.
Now that we know how to structure a pipeline of tasks using the ThreadPoolExecutor, let’s look at some worked examples.
Example of a ThreadPoolExecutor Pipeline
We can develop an example of using the ThreadPoolExecutor to execute a pipeline of tasks.
In this example, each task will have two main steps, along with a separate step to initiate the task and a step to interpret the result of the task.
- Step 0: Generate an initial value for the task.
- Step 1: Read a data value, generate a data value, block, report the value, and pass the generated value on to the next step.
- Step 2: Read a data value, generate a data value, block, report the value, and pass the generated value on to the next step.
- Step 3: Read the final value from each task sequence and report it.
Each task function has access to an input and output queue. The main thread will push the initial value for each task into the first input queue. The output queue of the first task is used as the input queue for the second task, connecting the two. The main thread reads data from the output queue of the second task.
Steps 1 and 2 of each task will be executed within a single ThreadPoolExecutor, providing a simple design with a single thread pool to manage and shut down.
Firstly, we will define the first step in the sequence.
This step takes an input queue and an output queue as arguments. The task loops forever, reading data from the input queue on each iteration.
If the data read is the sentinel value (None), it puts the sentinel back on the input queue, puts it on the output queue, and breaks the loop. Otherwise, it simulates work: it generates a random value, blocks for a fraction of a second, reports a message, and puts the generated value on the output queue.
The task1() function below implements this.
```python
# first task in the pipeline
def task1(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')
```
Next, we can define a function for the second task in the sequence.
For simplicity, the second task will be almost identical to the first task, with slightly different messages indicating “task2” instead of “task1”.
The task2() function below implements this.
```python
# second task in the pipeline
def task2(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> > Task2 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> > Task2 done')
```
Next, the main thread will define the queues that will be used to connect the tasks.
A total of 3 queues are required.
- An input queue to connect the main thread to the first task.
- An output queue to connect the first task’s output to the second task’s input.
- Another output queue to connect the second task’s output to the main thread.
```python
...
# create the shared queues
task1_queue_in = Queue()
task1_queue_out = Queue()
task2_queue_out = Queue()
```
Next, the ThreadPoolExecutor is created with a capacity for 5 workers at each step of the pipeline, 10 workers in total.
We then issue 5 step 1 tasks and 5 step 2 tasks to the thread pool using the submit() method, taking care to provide the correct input and output queues for each task.
```python
...
# create the thread pool
with ThreadPoolExecutor(10) as tpe:
    # issue task 1 workers
    _ = [tpe.submit(task1, task1_queue_in, task1_queue_out) for _ in range(5)]
    # issue task 2 workers
    _ = [tpe.submit(task2, task1_queue_out, task2_queue_out) for _ in range(5)]
```
We could have the initial step 0 task and the final step 3 task executed by the thread pool as well if we wished, but in this case, we will execute them in the main thread.
Firstly, we can define the initial step 0 task that provides the input for the first step. In this case, we will generate 20 inputs to execute 20 multi-step tasks.
```python
...
# push work into task 1
for i in range(20):
    task1_queue_in.put(i)
```
Next, we will put a sentinel value into the first input queue to signal to the workers that there will be no further work items and to shut down as soon as possible.
```python
...
# signal that there is no more work
task1_queue_in.put(None)
```
Finally, the main thread will block and wait for task results from the final queue in the pipeline, reporting them as needed. The loop exits only once the sentinel value is seen.
```python
...
# consume results
while True:
    # retrieve data
    data = task2_queue_out.get()
    # check for the end of work
    if data is None:
        # stop processing
        break
    # report the result
    print(f'Main got {data}')
```
Once ready to shut down, the main thread reports a final message.
```python
...
# report a message
print('Main all done')
```
And that’s it.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of a threadpoolexecutor pipeline using queues
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from random import random
from time import sleep

# first task in the pipeline
def task1(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')

# second task in the pipeline
def task2(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> > Task2 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> > Task2 done')

# protect the entry point
if __name__ == '__main__':
    # create the shared queues
    task1_queue_in = Queue()
    task1_queue_out = Queue()
    task2_queue_out = Queue()
    # create the thread pool
    with ThreadPoolExecutor(10) as tpe:
        # issue task 1 workers
        _ = [tpe.submit(task1, task1_queue_in, task1_queue_out) for _ in range(5)]
        # issue task 2 workers
        _ = [tpe.submit(task2, task1_queue_out, task2_queue_out) for _ in range(5)]
        # push work into task 1
        for i in range(20):
            task1_queue_in.put(i)
        # signal that there is no more work
        task1_queue_in.put(None)
        # consume results
        while True:
            # retrieve data
            data = task2_queue_out.get()
            # check for the end of work
            if data is None:
                # stop processing
                break
            # report the result
            print(f'Main got {data}')
        # report a message
        print('Main all done')
```
Running the example first creates the shared queues.
Next, the thread pool is created with a capacity of 10 workers.
Five instances of task1 are issued to the thread pool, starting 5 worker threads. Five instances of task2 are issued to the pool, also starting 5 worker threads.
The main thread then pushes 20 integer values into the first input queue, then pushes a shutdown value onto the very end of the queue.
The main thread then loops forever, attempting to consume values from the last output queue.
Input values are consumed by the task1 workers. Each generates a random value, blocks, reports a message, and puts the generated value on the output queue.
Task2 workers consume values from their input queue sent from task1. They process them in the same general way as task1 workers, generating a random value, blocking, reporting a message, and putting their value on their own output queue.
The main thread consumes values from the final output queue and reports their values.
Eventually, all values are consumed by task1 from the first input queue and the sentinel value is read. The task1 workers shut down and pass the None value on to the task2 workers. They too shut down and forward the sentinel on their output queue. Eventually, the main thread sees the sentinel value and terminates with a final message.
This highlights how we can execute multi-step tasks in a single thread pool as a pipeline.
```
> Task1 got 0, produced 0.06374619136923998
> Task1 got 5, produced 0.22280995185140817
> > Task2 got 0.06374619136923998, produced 0.42645408618034
Main got 0.42645408618034
> Task1 got 1, produced 0.5157028899332281
> Task1 got 4, produced 0.5797011315649749
> Task1 got 6, produced 0.31777744757423676
> > Task2 got 0.22280995185140817, produced 0.40323506535979026
Main got 0.40323506535979026
> > Task2 got 0.5797011315649749, produced 0.120355441229539
Main got 0.120355441229539
> Task1 got 3, produced 0.7404108956805835
> Task1 got 10, produced 0.03273152446602101
> > Task2 got 0.7404108956805835, produced 0.05523796156125449
Main got 0.05523796156125449
> > Task2 got 0.5157028899332281, produced 0.3712288895475049
Main got 0.3712288895475049
> Task1 got 11, produced 0.12300039009987385
> Task1 got 8, produced 0.3351433285865908
> Task1 got 2, produced 0.9817487550932361
> Task1 got 7, produced 0.5286607855669352
> Task1 got 15, produced 0.005510345480132073
> > Task2 got 0.03273152446602101, produced 0.2847713068693353
Main got 0.2847713068693353
> Task1 got 9, produced 0.5306014532294463
> Task1 got 17, produced 0.01546897513174983
> > Task2 got 0.31777744757423676, produced 0.6244883555127848
Main got 0.6244883555127848
> Task1 got 12, produced 0.42959573081012825
> Task1 got 16, produced 0.28678566185587373
> Task1 done
> > Task2 got 0.3351433285865908, produced 0.4916113336570408
Main got 0.4916113336570408
> Task1 got 13, produced 0.5162535307172064
> Task1 done
> Task1 got 18, produced 0.29613393709666846
> Task1 done
> > Task2 got 0.9817487550932361, produced 0.5639002254831308
Main got 0.5639002254831308
> > Task2 got 0.005510345480132073, produced 0.32803121175495675
Main got 0.32803121175495675
> Task1 got 14, produced 0.7353253211413082
> Task1 done
> > Task2 got 0.5286607855669352, produced 0.6663854537886
Main got 0.6663854537886
> Task1 got 19, produced 0.4420969832982634
> Task1 done
> > Task2 got 0.12300039009987385, produced 0.8823808592344394
> > Task2 done
Main got 0.8823808592344394
Main all done
> > Task2 got 0.5306014532294463, produced 0.4640589560605952
> > Task2 got 0.5162535307172064, produced 0.021598748221471675
> > Task2 done
> > Task2 got 0.01546897513174983, produced 0.4287481943664151
> > Task2 got 0.29613393709666846, produced 0.3094994345417468
> > Task2 done
> > Task2 got 0.42959573081012825, produced 0.8063789219792559
> > Task2 got 0.28678566185587373, produced 0.6875850944011184
> > Task2 done
> > Task2 got 0.7353253211413082, produced 0.7147631756629578
> > Task2 got 0.4420969832982634, produced 0.9049768392442796
> > Task2 done
```
NOTE: There is a possible race condition in this example (I noticed it immediately after finishing the example).
It is possible for the sentinel value to be propagated through the pipeline by fast-acting workers before all tasks have shared their results. In this demonstration, it does not matter. If it is a concern, there are a few fixes we could adopt.
The first fix would be for the main thread to know how many results to expect, then once they have all been received, it can share the sentinel value and shut down the pipeline.
For example:
```python
...
# push work into task 1
for i in range(20):
    task1_queue_in.put(i)
# consume results
for _ in range(20):
    # retrieve data
    data = task2_queue_out.get()
    # report the result
    print(f'Main got {data}')
# signal that there is no more work
task1_queue_in.put(None)
# report a message
print('Main all done')
```
Another approach is to have all workers at a given step share one barrier, and to wait on the barrier for all fellow workers to finish before passing the sentinel on to the next step via the output queue.
For example:
```python
...
# create the barriers
barrier1 = Barrier(5)
barrier2 = Barrier(5)
...
# first task in the pipeline
def task1(queue_in, queue_out, barrier):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # wait on the barrier for all other workers
            barrier.wait()
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')
```
This is a more general solution, as it does not require the main thread to know how many final results to expect, but it does require each step to know how many fellow workers are running so that the barrier can be correctly configured.
The complete example with this change is listed below, for reference.
```python
# SuperFastPython.com
# example of a threadpoolexecutor pipeline using queues and barriers
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from random import random
from time import sleep
from threading import Barrier

# first task in the pipeline
def task1(queue_in, queue_out, barrier):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # wait on the barrier for all other workers
            barrier.wait()
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')

# second task in the pipeline
def task2(queue_in, queue_out, barrier):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # wait on the barrier for all other workers
            barrier.wait()
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> > Task2 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> > Task2 done')

# protect the entry point
if __name__ == '__main__':
    # create the shared queues
    task1_queue_in = Queue()
    task1_queue_out = Queue()
    task2_queue_out = Queue()
    # create the barriers
    barrier1 = Barrier(5)
    barrier2 = Barrier(5)
    # create the thread pool
    with ThreadPoolExecutor(10) as tpe:
        # issue task 1 workers
        _ = [tpe.submit(task1, task1_queue_in, task1_queue_out, barrier1) for _ in range(5)]
        # issue task 2 workers
        _ = [tpe.submit(task2, task1_queue_out, task2_queue_out, barrier2) for _ in range(5)]
        # push work into task 1
        for i in range(20):
            task1_queue_in.put(i)
        # signal that there is no more work
        task1_queue_in.put(None)
        # consume results
        while True:
            # retrieve data
            data = task2_queue_out.get()
            # check for the end of work
            if data is None:
                # stop processing
                break
            # report the result
            print(f'Main got {data}')
        # report a message
        print('Main all done')
```
Example of a ThreadPoolExecutor Pipeline With Separate Thread Pools
We can update the example so that each step is executed using a separate ThreadPoolExecutor.
This can be achieved by updating the above example.
Firstly, we can create two ThreadPoolExecutor instances directly, one for each step, instead of using the context manager interface.
```python
...
# create the task 1 thread pool
tpe1 = ThreadPoolExecutor(5)
# issue task 1 workers
_ = [tpe1.submit(task1, task1_queue_in, task1_queue_out) for _ in range(5)]
# create the task 2 thread pool
tpe2 = ThreadPoolExecutor(5)
# issue task 2 workers
_ = [tpe2.submit(task2, task1_queue_out, task2_queue_out) for _ in range(5)]
```
To avoid the race condition seen in the previous section, this example explicitly consumes a fixed number of expected results before sending the shutdown signal into the pipeline.
```python
...
# consume results
for _ in range(20):
    # retrieve data
    data = task2_queue_out.get()
    # check for the end of work
    if data is None:
        # stop processing
        break
    # report the result
    print(f'Main got {data}')
# signal that there is no more work
task1_queue_in.put(None)
```
At the end of the program, we can manually shut down each thread pool.
```python
...
# shutdown thread pools
tpe1.shutdown()
tpe2.shutdown()
```
And that’s it.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of a threadpoolexecutor pipeline using queues and separate thread pools
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from random import random
from time import sleep

# first task in the pipeline
def task1(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')

# second task in the pipeline
def task2(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> > Task2 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> > Task2 done')

# protect the entry point
if __name__ == '__main__':
    # create the shared queues
    task1_queue_in = Queue()
    task1_queue_out = Queue()
    task2_queue_out = Queue()
    # create the task 1 thread pool
    tpe1 = ThreadPoolExecutor(5)
    # issue task 1 workers
    _ = [tpe1.submit(task1, task1_queue_in, task1_queue_out) for _ in range(5)]
    # create the task 2 thread pool
    tpe2 = ThreadPoolExecutor(5)
    # issue task 2 workers
    _ = [tpe2.submit(task2, task1_queue_out, task2_queue_out) for _ in range(5)]
    # push work into task 1
    for i in range(20):
        task1_queue_in.put(i)
    # consume results
    for _ in range(20):
        # retrieve data
        data = task2_queue_out.get()
        # check for the end of work
        if data is None:
            # stop processing
            break
        # report the result
        print(f'Main got {data}')
    # signal that there is no more work
    task1_queue_in.put(None)
    # report a message
    print('Main all done')
    # shutdown thread pools
    tpe1.shutdown()
    tpe2.shutdown()
```
Running the example first creates the shared queues.
Next, the thread pool for task1 is created with a capacity for 5 workers, and 5 tasks are issued to the pool.
Then the thread pool for task2 is created with a capacity for 5 workers, and all 5 tasks are issued to the pool.
The main thread then pushes 20 integer values into the first input queue.
The main thread then loops forever, attempting to consume values from the last output queue.
Input values are consumed by the task1 workers in the first thread pool. Each generates a random value, blocks, reports a message, and puts the generated value on the output queue.
Task2 workers in the second thread pool consume values from their input queue sent from task1. They process them in the same general way as task1 workers, generating a random value, blocking, reporting a message, and putting their value on their own output queue.
The main thread consumes values from the final output queue and reports their values. It then pushes a shutdown value onto the first input queue.
Eventually, all values are consumed by task1 from the first input queue and the sentinel value is read. The task1 workers shut down and pass the None value on to the task2 workers. They too shut down and forward the sentinel on their output queue. Eventually, the main thread sees the sentinel value and terminates with a final message.
This highlights how we can execute multi-step tasks in a pipeline with a separate thread pool for each step.
```
> Task1 got 0, produced 0.36928729857298637
> Task1 got 4, produced 0.5391661944325759
> Task1 got 1, produced 0.5735422887671429
> > Task2 got 0.36928729857298637, produced 0.20843724288457122
Main got 0.20843724288457122
> > Task2 got 0.5391661944325759, produced 0.19758767163104396
Main got 0.19758767163104396
> Task1 got 2, produced 0.7848429380744457
> Task1 got 3, produced 0.8529521848021492
> Task1 got 5, produced 0.5111241599463854
> Task1 got 6, produced 0.4908872328758084
> Task1 got 10, produced 0.40307794959535304
> Task1 got 8, produced 0.6019551159981671
> Task1 got 7, produced 0.8241816834956085
> Task1 got 13, produced 0.09050412771639793
> > Task2 got 0.5735422887671429, produced 0.9459468872801333
Main got 0.9459468872801333
> > Task2 got 0.4908872328758084, produced 0.5162143391804884
Main got 0.5162143391804884
> Task1 got 11, produced 0.5820216834621625
> > Task2 got 0.6019551159981671, produced 0.11555441277605871
Main got 0.11555441277605871
> > Task2 got 0.5111241599463854, produced 0.8560806469629169
Main got 0.8560806469629169
> > Task2 got 0.8529521848021492, produced 0.9176874983371848
Main got 0.9176874983371848
> > Task2 got 0.7848429380744457, produced 0.9905936792940581
Main got 0.9905936792940581
> Task1 got 9, produced 0.9661051893683117
> Task1 got 15, produced 0.36828658286530924
> > Task2 got 0.9661051893683117, produced 0.12066545947023144
Main got 0.12066545947023144
> Task1 got 14, produced 0.6955094414264186
> > Task2 got 0.40307794959535304, produced 0.5857288345594803
Main got 0.5857288345594803
> > Task2 got 0.8241816834956085, produced 0.45749026507622026
Main got 0.45749026507622026
> Task1 got 12, produced 0.8854878444702812
> Task1 got 18, produced 0.36511931630679306
> Task1 got 16, produced 0.6616228450362268
> Task1 got 17, produced 0.6157262902267976
> > Task2 got 0.5820216834621625, produced 0.7264203708677363
Main got 0.7264203708677363
> Task1 got 19, produced 0.4985094535776535
> > Task2 got 0.36828658286530924, produced 0.6713814453765958
Main got 0.6713814453765958
> > Task2 got 0.6955094414264186, produced 0.5327026032134283
Main got 0.5327026032134283
> > Task2 got 0.09050412771639793, produced 0.9227140928329207
Main got 0.9227140928329207
> > Task2 got 0.6616228450362268, produced 0.2976807505368585
Main got 0.2976807505368585
> > Task2 got 0.8854878444702812, produced 0.7780435647414249
Main got 0.7780435647414249
> > Task2 got 0.6157262902267976, produced 0.3836770581178164
Main got 0.3836770581178164
> > Task2 got 0.4985094535776535, produced 0.5383580460138072
Main got 0.5383580460138072
> > Task2 got 0.36511931630679306, produced 0.7122436948383247
Main got 0.7122436948383247
Main all done
> Task1 done
> Task1 done
> > Task2 done
> Task1 done
> > Task2 done
> Task1 done
> > Task2 done
> Task1 done
> > Task2 done
> > Task2 done
```
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to execute multi-step tasks using a ThreadPoolExecutor pipeline.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.