ThreadPoolExecutor Pipeline For Multi-Step Tasks

August 13, 2023 Python ThreadPoolExecutor

You can execute multi-step concurrent tasks using a pipeline of thread pools in Python.

In this tutorial, you will discover how to execute multi-step tasks using a ThreadPoolExecutor pipeline.

Let's get started.

Need a Pipeline To Execute Multi-Step Tasks

The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.

Tasks executed in separate threads run concurrently in Python, making the ThreadPoolExecutor appropriate for I/O-bound tasks.

A ThreadPoolExecutor can be created directly or via the context manager interface and tasks can be issued one-by-one via the submit() method or in batch via the map() method.

For example:

...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task
    future = tpe.submit(task)
    # get the task result once the task is done
    result = future.result()
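For comparison, the map() method mentioned above issues a batch of tasks in one call. Here is a minimal sketch, assuming a hypothetical task() function (not part of the examples below):

```python
from concurrent.futures import ThreadPoolExecutor

# a hypothetical task that doubles its input
def task(value):
    return value * 2

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a batch of tasks; results are yielded in input order
    results = list(tpe.map(task, range(5)))
print(results)  # [0, 2, 4, 6, 8]
```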


When using the ThreadPoolExecutor, there are cases when we need to execute tasks in a sequence.

For example, each task may require processing in a series of steps and there may be many of these tasks to execute.

We can refer to these as multi-step tasks.

An example of problems of this type is the processing of files. Step one might involve loading data from a file, the second step might be to process the loaded data, and the third step might be to store the results back in a file or in a database.

We may have tens, hundreds, or even thousands of these multi-step tasks to process.

Each step of the task may be a separate activity that we may wish to execute using the ThreadPoolExecutor.

How can we use the ThreadPoolExecutor to execute a pipeline of sequential tasks?

How to Develop a ThreadPoolExecutor Pipeline for Multi-Step Tasks

We can use the ThreadPoolExecutor to execute a pipeline of task steps.

Firstly, a pipeline between the steps of each task can be created. We can achieve this by defining a function for each step in the pipeline, then connecting the steps of the pipeline using a queue.

This means that a given step would read the data required from an input queue and put any output from the task onto the output queue.

The output queue from one step would then be used as the input queue to the next step.

We can then use the ThreadPoolExecutor to execute the steps in the pipeline.

There are perhaps two main approaches we can use to structure the use of the ThreadPoolExecutor to execute tasks in the pipeline:

  1. Execute all steps of each task in one ThreadPoolExecutor.
  2. Execute each step of each task in a separate ThreadPoolExecutor.

A single ThreadPoolExecutor can be used to execute each step in the pipeline.

This is a simple design as the program only needs to manage a single thread pool. A downside of this approach is that there is no logical separation between the steps in the pipeline.

An alternative is to use one thread pool per step in the process. A benefit of this approach is that the workers of each step may be given different names, which can aid in logging and debugging the pipeline, as opposed to a single thread pool for all tasks, where workers are shared across the steps.

Using separate pools of workers at each step also allows a variable number of dedicated workers to be used at each step, and for the pool of threads to be switched out with a pool of processes, if needed.
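As a minimal sketch of these benefits (the double() and add_one() step functions and the pool sizes here are assumptions for illustration, not part of the worked examples below), each step can be given a dedicated pool with its own size and thread name prefix:

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

# hypothetical first step: double each input value
def double(queue_in, queue_out):
    while True:
        data = queue_in.get()
        if data is None:
            # propagate the sentinel and stop
            queue_in.put(None)
            queue_out.put(None)
            break
        queue_out.put(data * 2)

# hypothetical second step: add one to each value
def add_one(queue_in, queue_out):
    while True:
        data = queue_in.get()
        if data is None:
            queue_in.put(None)
            queue_out.put(None)
            break
        queue_out.put(data + 1)

# three queues connect main -> step 1 -> step 2 -> main
q_in, q_mid, q_out = Queue(), Queue(), Queue()
# a dedicated, differently sized, differently named pool per step
tpe1 = ThreadPoolExecutor(2, thread_name_prefix='step1')
tpe2 = ThreadPoolExecutor(4, thread_name_prefix='step2')
_ = [tpe1.submit(double, q_in, q_mid) for _ in range(2)]
_ = [tpe2.submit(add_one, q_mid, q_out) for _ in range(4)]
# feed 5 items, then collect exactly 5 results before shutting down
for i in range(5):
    q_in.put(i)
results = sorted(q_out.get() for _ in range(5))
q_in.put(None)
tpe1.shutdown()
tpe2.shutdown()
print(results)  # [1, 3, 5, 7, 9]
```

Either pool could be replaced by a process-based executor later, although that would also require switching to multiprocessing-safe queues.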

Regardless of the approach taken, the definition of the tasks in the pipeline and the input and output queue structure does not need to change, offering some flexibility.

To run the pipeline, the data needed for each task is fed into the first input queue. The tasks of the first step consume this data, process, and push results into their output queue which is consumed by the next step in the process. Eventually, the results of each task are put into the final output queue which can be retrieved by the main program.

When there are no more input tasks, a sentinel value can be put into the first task's input queue, signaling that there is no more work. Each worker that reads this value can put it back on the input queue for its fellow workers to read, put it on the output queue to signal workers at the next step, and then shut down. This will safely shut down the entire pipeline.

Now that we know how to structure a pipeline of tasks using the ThreadPoolExecutor, let's look at some worked examples.

Example of a ThreadPoolExecutor Pipeline

We can develop an example of using the ThreadPoolExecutor to execute a pipeline of tasks.

In this example, each task will have two steps: a step to initiate the task and a step to interpret the result of the task.

Each task function has access to an input and output queue. The main thread will push the initial value for each task into the first input queue. The output queue of the first task is used as the input queue for the second task, connecting the two. The main thread reads data from the output queue of the second task.

Steps 1 and 2 of each task will be executed within a single ThreadPoolExecutor, providing a simple design with a single thread pool to manage and shut down.

Firstly, we will define the first step in the sequence.

This step takes an input and output queue as arguments. The task loops forever. The first step reads data from the input queue.

If the read data is a sentinel value (None), it puts the sentinel back on the input queue, puts it on the output queue, and breaks the loop. If the value is not a sentinel, the task simulates work: it generates a random value, blocks for a fraction of a second, reports a message, and puts the generated value on the output queue.

The task1() function below implements this.

# first task in the pipeline
def task1(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')

Next, we can define a function for the second task in the sequence.

For simplicity, the second task will be almost identical to the first task, with slightly different messages indicating "task2" instead of "task1".

The task2() function below implements this.

# second task in the pipeline
def task2(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> > Task2 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> > Task2 done')

Next, the main thread will define the queues that will be used to connect the tasks.

A total of 3 queues are required.

...
# create the shared queues
task1_queue_in = Queue()
task1_queue_out = Queue()
task2_queue_out = Queue()

Next, the ThreadPoolExecutor is created with a capacity for 5 workers at each step of the pipeline, 10 workers in total.

We then issue 5 step 1 tasks and 5 step 2 tasks to the thread pool using the submit() method, taking care to provide the correct input and output queues for each task.

...
# create the thread pool
with ThreadPoolExecutor(10) as tpe:
    # issue task 1 workers
    _ = [tpe.submit(task1, task1_queue_in, task1_queue_out) for _ in range(5)]
    # issue task 2 workers
    _ = [tpe.submit(task2, task1_queue_out, task2_queue_out) for _ in range(5)]

We could have the initial step 0 task and the final step 3 task executed by the thread pool as well if we wished, but in this case, we will execute them in the main thread.

Firstly, we can define the initial step 0 task that provides the input for the first step. In this case, we will generate 20 inputs to execute 20 multi-step tasks.

...
# push work into task 1
for i in range(20):
    task1_queue_in.put(i)

Next, we will put a sentinel value into the first input queue to signal to the workers that there will be no further work items and to shut down as soon as possible.

...
# signal that there is no more work
task1_queue_in.put(None)

Finally, the main thread will block and wait for task results from the final queue in the pipeline and report them as needed. It will exit the loop only once the sentinel is seen.

...
# consume results
while True:
    # retrieve data
    data = task2_queue_out.get()
    # check for the end of work
    if data is None:
        # stop processing
        break
    # report the result
    print(f'Main got {data}')

Once ready to shut down, the main thread reports a final message.

...
# report a message
print('Main all done')

And that's it.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of a threadpoolexecutor pipeline using queues
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from random import random
from time import sleep

# first task in the pipeline
def task1(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')

# second task in the pipeline
def task2(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> > Task2 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> > Task2 done')

# protect the entry point
if __name__ == '__main__':
    # create the shared queues
    task1_queue_in = Queue()
    task1_queue_out = Queue()
    task2_queue_out = Queue()
    # create the thread pool
    with ThreadPoolExecutor(10) as tpe:
        # issue task 1 workers
        _ = [tpe.submit(task1, task1_queue_in, task1_queue_out) for _ in range(5)]
        # issue task 2 workers
        _ = [tpe.submit(task2, task1_queue_out, task2_queue_out) for _ in range(5)]
        # push work into task 1
        for i in range(20):
            task1_queue_in.put(i)
        # signal that there is no more work
        task1_queue_in.put(None)
        # consume results
        while True:
            # retrieve data
            data = task2_queue_out.get()
            # check for the end of work
            if data is None:
                # stop processing
                break
            # report the result
            print(f'Main got {data}')
        # report a message
        print('Main all done')

Running the example first creates the shared queues.

Next, the thread pool is created with a capacity of 10 workers.

Five instances of task1 are issued to the thread pool, starting 5 worker threads. Five instances of task2 are issued to the pool, also starting 5 worker threads.

The main thread then pushes 20 integer values into the first input queue, then pushes a shutdown value onto the very end of the queue.

The main thread then loops forever, attempting to consume values from the last output queue.

Input values are consumed by the task1 workers. Each generates a random value, blocks, reports a message, and puts the generated value on the output queue.

Task2 workers consume values from their input queue sent from task1. They process them in the same general way as task1 workers, generating a random value, blocking, reporting a message, and putting their value on their own output queue.

The main thread consumes values from the final output queue and reports their values.

Eventually, all values are consumed from the first input queue and the task1 workers read the sentinel value. The task1 workers shut down and pass the None value on to the task2 workers. They too shut down and pass the sentinel on to the final output queue. Eventually, the main thread sees the sentinel value and terminates with a final message.

This highlights how we can execute multi-step tasks in a single thread pool as a pipeline.

> Task1 got 0, produced 0.06374619136923998
> Task1 got 5, produced 0.22280995185140817
> > Task2 got 0.06374619136923998, produced 0.42645408618034
Main got 0.42645408618034
> Task1 got 1, produced 0.5157028899332281
> Task1 got 4, produced 0.5797011315649749
> Task1 got 6, produced 0.31777744757423676
> > Task2 got 0.22280995185140817, produced 0.40323506535979026
Main got 0.40323506535979026
> > Task2 got 0.5797011315649749, produced 0.120355441229539
Main got 0.120355441229539
> Task1 got 3, produced 0.7404108956805835
> Task1 got 10, produced 0.03273152446602101
> > Task2 got 0.7404108956805835, produced 0.05523796156125449
Main got 0.05523796156125449
> > Task2 got 0.5157028899332281, produced 0.3712288895475049
Main got 0.3712288895475049
> Task1 got 11, produced 0.12300039009987385
> Task1 got 8, produced 0.3351433285865908
> Task1 got 2, produced 0.9817487550932361
> Task1 got 7, produced 0.5286607855669352
> Task1 got 15, produced 0.005510345480132073
> > Task2 got 0.03273152446602101, produced 0.2847713068693353
Main got 0.2847713068693353
> Task1 got 9, produced 0.5306014532294463
> Task1 got 17, produced 0.01546897513174983
> > Task2 got 0.31777744757423676, produced 0.6244883555127848
Main got 0.6244883555127848
> Task1 got 12, produced 0.42959573081012825
> Task1 got 16, produced 0.28678566185587373
> Task1 done
> > Task2 got 0.3351433285865908, produced 0.4916113336570408
Main got 0.4916113336570408
> Task1 got 13, produced 0.5162535307172064
> Task1 done
> Task1 got 18, produced 0.29613393709666846
> Task1 done
> > Task2 got 0.9817487550932361, produced 0.5639002254831308
Main got 0.5639002254831308
> > Task2 got 0.005510345480132073, produced 0.32803121175495675
Main got 0.32803121175495675
> Task1 got 14, produced 0.7353253211413082
> Task1 done
> > Task2 got 0.5286607855669352, produced 0.6663854537886
Main got 0.6663854537886
> Task1 got 19, produced 0.4420969832982634
> Task1 done
> > Task2 got 0.12300039009987385, produced 0.8823808592344394
> > Task2 done
Main got 0.8823808592344394
Main all done
> > Task2 got 0.5306014532294463, produced 0.4640589560605952
> > Task2 got 0.5162535307172064, produced 0.021598748221471675
> > Task2 done
> > Task2 got 0.01546897513174983, produced 0.4287481943664151
> > Task2 got 0.29613393709666846, produced 0.3094994345417468
> > Task2 done
> > Task2 got 0.42959573081012825, produced 0.8063789219792559
> > Task2 got 0.28678566185587373, produced 0.6875850944011184
> > Task2 done
> > Task2 got 0.7353253211413082, produced 0.7147631756629578
> > Task2 got 0.4420969832982634, produced 0.9049768392442796
> > Task2 done

NOTE: There is a possible race condition in this example (I noticed it immediately after finishing the example).

It is possible for the sentinel value to propagate through the pipeline by fast-acting workers before all tasks have shared their results. In this demonstration case, it does not matter. If this is a concern, there are a few fixes we could adopt.

The first fix would be for the main thread to know how many results to expect, then once they have all been received, it can share the sentinel value and shut down the pipeline.

For example:

...
# push work into task 1
for i in range(20):
    task1_queue_in.put(i)
# consume results
for _ in range(20):
    # retrieve data
    data = task2_queue_out.get()
    # report the result
    print(f'Main got {data}')
# signal that there is no more work
task1_queue_in.put(None)
# report a message
print('Main all done')

Another approach is to have all workers at a given step share one barrier, and to first wait on the barrier for all fellow workers to reach the shutdown point before passing on the sentinel to the next step in the sequence via the output queue.

For example:

...
# create the barriers
barrier1 = Barrier(5)
barrier2 = Barrier(5)

...

# first task in the pipeline
def task1(queue_in, queue_out, barrier):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # wait on the barrier for all other workers
            barrier.wait()
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')


This is a more general solution as it does not require that the main thread know how many final results to receive but does require each step to know how many fellow workers are running in order for the barrier to be correctly configured.
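As a standalone sketch of the barrier behavior this relies on (the worker() function here is hypothetical, not part of the pipeline example), a Barrier(n) releases its waiters only once all n threads have arrived, so no worker can run ahead of its fellows:

```python
from threading import Barrier, Thread

# the number of workers must match the barrier's parties
n_workers = 3
barrier = Barrier(n_workers)
events = []

def worker():
    # record arrival, then wait for all fellow workers
    events.append('arrived')
    barrier.wait()
    # only runs after all n_workers have arrived
    events.append('released')

threads = [Thread(target=worker) for _ in range(n_workers)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# every arrival precedes every release
print(events)
```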

The complete example with this change is listed below, for reference.

# SuperFastPython.com
# example of a threadpoolexecutor pipeline using queues and barriers
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from random import random
from time import sleep
from threading import Barrier

# first task in the pipeline
def task1(queue_in, queue_out, barrier):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # wait on the barrier for all other workers
            barrier.wait()
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')

# second task in the pipeline
def task2(queue_in, queue_out, barrier):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # wait on the barrier for all other workers
            barrier.wait()
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> > Task2 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> > Task2 done')

# protect the entry point
if __name__ == '__main__':
    # create the shared queues
    task1_queue_in = Queue()
    task1_queue_out = Queue()
    task2_queue_out = Queue()
    # create the barriers
    barrier1 = Barrier(5)
    barrier2 = Barrier(5)
    # create the thread pool
    with ThreadPoolExecutor(10) as tpe:
        # issue task 1 workers
        _ = [tpe.submit(task1, task1_queue_in, task1_queue_out, barrier1) for _ in range(5)]
        # issue task 2 workers
        _ = [tpe.submit(task2, task1_queue_out, task2_queue_out, barrier2) for _ in range(5)]
        # push work into task 1
        for i in range(20):
            task1_queue_in.put(i)
        # signal that there is no more work
        task1_queue_in.put(None)
        # consume results
        while True:
            # retrieve data
            data = task2_queue_out.get()
            # check for the end of work
            if data is None:
                # stop processing
                break
            # report the result
            print(f'Main got {data}')
        # report a message
        print('Main all done')

Example of a ThreadPoolExecutor Pipeline With Separate Thread Pools

We can update the example so that each step is executed using a separate ThreadPoolExecutor.

This can be achieved by updating the above example.

Firstly, we can create two ThreadPoolExecutor instances directly, one for each step, instead of using the context manager interface.

...
# create the task 1 thread pool
tpe1 = ThreadPoolExecutor(5)
# issue task 1 workers
_ = [tpe1.submit(task1, task1_queue_in, task1_queue_out) for _ in range(5)]
# create the task 2 thread pool
tpe2 = ThreadPoolExecutor(5)
# issue task 2 workers
_ = [tpe2.submit(task2, task1_queue_out, task2_queue_out) for _ in range(5)]

To avoid the race condition seen in the previous section, this example explicitly consumes a fixed number of expected results before sending the shutdown signal into the pipeline.

...
# consume results
for _ in range(20):
    # retrieve data
    data = task2_queue_out.get()
    # check for the end of work
    if data is None:
        # stop processing
        break
    # report the result
    print(f'Main got {data}')
# signal that there is no more work
task1_queue_in.put(None)

At the end of the program, we can manually shut down each thread pool.

...
# shutdown thread pools
tpe1.shutdown()
tpe2.shutdown()

And that's it.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of a threadpoolexecutor pipeline using queues and separate thread pools
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from random import random
from time import sleep

# first task in the pipeline
def task1(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> Task1 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> Task1 done')

# second task in the pipeline
def task2(queue_in, queue_out):
    # loop forever
    while True:
        # read item from queue
        data = queue_in.get()
        # check for shutdown
        if data is None:
            # put signal back on queue
            queue_in.put(None)
            # send signal on output queue
            queue_out.put(None)
            # stop processing
            break
        # generate a random value between 0 and 1
        value = random()
        # block to simulate work
        sleep(value)
        # report message
        print(f'> > Task2 got {data}, produced {value}')
        # push result onto queue
        queue_out.put(value)
    # report message
    print('> > Task2 done')

# protect the entry point
if __name__ == '__main__':
    # create the shared queues
    task1_queue_in = Queue()
    task1_queue_out = Queue()
    task2_queue_out = Queue()
    # create the task 1 thread pool
    tpe1 = ThreadPoolExecutor(5)
    # issue task 1 workers
    _ = [tpe1.submit(task1, task1_queue_in, task1_queue_out) for _ in range(5)]
    # create the task 2 thread pool
    tpe2 = ThreadPoolExecutor(5)
    # issue task 2 workers
    _ = [tpe2.submit(task2, task1_queue_out, task2_queue_out) for _ in range(5)]
    # push work into task 1
    for i in range(20):
        task1_queue_in.put(i)
    # consume results
    for _ in range(20):
        # retrieve data
        data = task2_queue_out.get()
        # check for the end of work
        if data is None:
            # stop processing
            break
        # report the result
        print(f'Main got {data}')
    # signal that there is no more work
    task1_queue_in.put(None)
    # report a message
    print('Main all done')
    # shutdown thread pools
    tpe1.shutdown()
    tpe2.shutdown()

Running the example first creates the shared queues.

Next, the thread pool for task1 is created with a capacity for 5 workers, and 5 tasks are issued to the pool.

Then the thread pool for task2 is created with a capacity for 5 workers, and all 5 tasks are issued to the pool.

The main thread then pushes 20 integer values into the first input queue.

The main thread then loops, consuming the expected 20 result values from the last output queue.

Input values are consumed by the task1 workers in the first thread pool. Each generates a random value, blocks, reports a message, and puts the generated value on the output queue.

Task2 workers in the second thread pool consume values from their input queue sent from task1. They process them in the same general way as task1 workers, generating a random value, blocking, reporting a message, and putting their value on their own output queue.

The main thread consumes values from the final output queue and reports their values. It then pushes a shutdown value onto the first input queue.

Eventually, all values are consumed from the first input queue and the task1 workers read the sentinel value. The task1 workers shut down and pass the None value on to the task2 workers, which also shut down. The calls to shutdown() then return once all workers have exited, and the program terminates.

This highlights how we can execute multi-step tasks in a pipeline with a separate thread pool for each step.

> Task1 got 0, produced 0.36928729857298637
> Task1 got 4, produced 0.5391661944325759
> Task1 got 1, produced 0.5735422887671429
> > Task2 got 0.36928729857298637, produced 0.20843724288457122
Main got 0.20843724288457122
> > Task2 got 0.5391661944325759, produced 0.19758767163104396
Main got 0.19758767163104396
> Task1 got 2, produced 0.7848429380744457
> Task1 got 3, produced 0.8529521848021492
> Task1 got 5, produced 0.5111241599463854
> Task1 got 6, produced 0.4908872328758084
> Task1 got 10, produced 0.40307794959535304
> Task1 got 8, produced 0.6019551159981671
> Task1 got 7, produced 0.8241816834956085
> Task1 got 13, produced 0.09050412771639793
> > Task2 got 0.5735422887671429, produced 0.9459468872801333
Main got 0.9459468872801333
> > Task2 got 0.4908872328758084, produced 0.5162143391804884
Main got 0.5162143391804884
> Task1 got 11, produced 0.5820216834621625
> > Task2 got 0.6019551159981671, produced 0.11555441277605871
Main got 0.11555441277605871
> > Task2 got 0.5111241599463854, produced 0.8560806469629169
Main got 0.8560806469629169
> > Task2 got 0.8529521848021492, produced 0.9176874983371848
Main got 0.9176874983371848
> > Task2 got 0.7848429380744457, produced 0.9905936792940581
Main got 0.9905936792940581
> Task1 got 9, produced 0.9661051893683117
> Task1 got 15, produced 0.36828658286530924
> > Task2 got 0.9661051893683117, produced 0.12066545947023144
Main got 0.12066545947023144
> Task1 got 14, produced 0.6955094414264186
> > Task2 got 0.40307794959535304, produced 0.5857288345594803
Main got 0.5857288345594803
> > Task2 got 0.8241816834956085, produced 0.45749026507622026
Main got 0.45749026507622026
> Task1 got 12, produced 0.8854878444702812
> Task1 got 18, produced 0.36511931630679306
> Task1 got 16, produced 0.6616228450362268
> Task1 got 17, produced 0.6157262902267976
> > Task2 got 0.5820216834621625, produced 0.7264203708677363
Main got 0.7264203708677363
> Task1 got 19, produced 0.4985094535776535
> > Task2 got 0.36828658286530924, produced 0.6713814453765958
Main got 0.6713814453765958
> > Task2 got 0.6955094414264186, produced 0.5327026032134283
Main got 0.5327026032134283
> > Task2 got 0.09050412771639793, produced 0.9227140928329207
Main got 0.9227140928329207
> > Task2 got 0.6616228450362268, produced 0.2976807505368585
Main got 0.2976807505368585
> > Task2 got 0.8854878444702812, produced 0.7780435647414249
Main got 0.7780435647414249
> > Task2 got 0.6157262902267976, produced 0.3836770581178164
Main got 0.3836770581178164
> > Task2 got 0.4985094535776535, produced 0.5383580460138072
Main got 0.5383580460138072
> > Task2 got 0.36511931630679306, produced 0.7122436948383247
Main got 0.7122436948383247
Main all done
> Task1 done
> Task1 done
> > Task2 done
> Task1 done
> > Task2 done
> Task1 done
> > Task2 done
> Task1 done
> > Task2 done
> > Task2 done

Takeaways

You now know how to execute multi-step tasks using a ThreadPoolExecutor pipeline.



If you enjoyed this tutorial, you will love my book: Python ThreadPoolExecutor Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.