ThreadPoolExecutor Tasks Submit New Tasks

July 30, 2023 Python ThreadPoolExecutor

You can submit new tasks from tasks running in the ThreadPoolExecutor by passing the thread pool as an argument to the task or having the task access the thread pool via a global variable.

In this tutorial, you will discover how to submit new tasks from tasks running in the ThreadPoolExecutor.

Let's get started.

Need Tasks To Issue New Tasks

The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.

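Tasks are executed concurrently by the pool's worker threads. In CPython, the global interpreter lock allows only one thread to execute Python bytecode at a time. The lock is released while a thread blocks on I/O, which makes the ThreadPoolExecutor well suited to I/O-bound tasks.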

A ThreadPoolExecutor can be created directly or via the context manager interface, and tasks can be issued one by one via the submit() method or in batches via the map() method.

For example:

...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task
    future = tpe.submit(task)
    # get the task result once the task is done
    result = future.result()
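For reference, here is a self-contained, runnable sketch of the snippet above that also uses map(). The task() function, which squares its argument, is a hypothetical stand-in for real work.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task: square a number
def task(x):
    return x * x

# create a thread pool; leaving the with block shuts it down
with ThreadPoolExecutor() as tpe:
    # issue one task and get its result via the returned Future
    future = tpe.submit(task, 3)
    result = future.result()
    # issue a batch of tasks; results come back in input order
    squares = list(tpe.map(task, [1, 2, 3]))

print(result, squares)  # 9 [1, 4, 9]
```

Note that map() yields results in the order of its inputs, not in the order in which the tasks finish.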

There are many cases where we may need to issue a new task to the ThreadPoolExecutor from within a task that is already being executed by the ThreadPoolExecutor.

This may be a follow-up task that naturally arises out of the first task and can be executed asynchronously.
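As a minimal sketch of such a follow-up, assume a hypothetical fetch() task that produces some data and a hypothetical process() task that works on it. The thread pool is passed to fetch() as an argument so that it can issue process() without waiting on it:

```python
from concurrent.futures import ThreadPoolExecutor

processed = []  # outputs of the follow-up tasks

# hypothetical follow-up task: works on what the first task produced
def process(data):
    processed.append(data.upper())

# hypothetical first task: produces data, then issues the follow-up
def fetch(tpe, name):
    data = f'contents of {name}'
    # issue the follow-up asynchronously and do not wait on it
    tpe.submit(process, data)
    return data

with ThreadPoolExecutor() as tpe:
    fetched = tpe.submit(fetch, tpe, 'a.txt').result()
# leaving the with block also waits for the follow-up task

print(fetched, processed)  # contents of a.txt ['CONTENTS OF A.TXT']
```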

It may also be a retry of the current task, if the current task determines that it has failed.
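A minimal sketch of a task that retries itself, assuming a hypothetical failure check (here, the first two attempts simply pretend to fail) and an illustrative limit of three attempts. The main thread waits on a threading.Event rather than on the first task's Future, because the pool must stay open until the last retry has been issued:

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event

MAX_ATTEMPTS = 3    # illustrative retry limit
attempts = []       # attempt numbers, in the order they ran
finished = Event()  # set once the retry chain has a final outcome

def task(tpe, attempt=1):
    attempts.append(attempt)
    # hypothetical failure check: pretend the first two attempts fail
    failed = attempt < 3
    if failed and attempt < MAX_ATTEMPTS:
        # issue a retry of this task, but do not wait on it
        tpe.submit(task, tpe, attempt + 1)
    else:
        # succeeded, or out of attempts
        finished.set()

with ThreadPoolExecutor(max_workers=2) as tpe:
    tpe.submit(task, tpe)
    # keep the pool open until the chain is finished: after shutdown,
    # submit() raises RuntimeError, so a late retry would be lost
    finished.wait()

print(attempts)  # [1, 2, 3]
```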

Both of these cases can be solved by a separate thread that manages the ThreadPoolExecutor, issues tasks, and checks results to determine whether follow-up or retry tasks are required.
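A minimal sketch of this design, with the main thread acting as the manager. It assumes a hypothetical step() task that reports whether a follow-up is needed; concurrent.futures.wait() with return_when=FIRST_COMPLETED lets the manager react to each result as soon as it arrives:

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

# hypothetical task: returns its value and whether a follow-up is needed
def step(n):
    return n, n > 0

results = []
with ThreadPoolExecutor(max_workers=4) as tpe:
    pending = {tpe.submit(step, 3)}
    # the managing thread (here, the main thread) checks each result
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            value, needs_follow_up = future.result()
            results.append(value)
            if needs_follow_up:
                # the manager, not the task, issues the follow-up
                pending.add(tpe.submit(step, value - 1))

print(results)  # [3, 2, 1, 0]
```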

This is a good design, but sometimes we may prefer to have tasks issue new tasks themselves.

How can we issue new tasks to the ThreadPoolExecutor from tasks that are currently executing within the ThreadPoolExecutor?

How Tasks Can Submit New Tasks to the ThreadPoolExecutor

There are many ways that tasks can issue their own tasks to the ThreadPoolExecutor.

Some common approaches include:

  1. Tasks access the ThreadPoolExecutor directly, having received it as an argument.
  2. Tasks access the ThreadPoolExecutor directly via a global variable.
  3. Tasks send requests for new tasks to another thread that issues the new tasks on their behalf.
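The third approach is not covered by the worked examples in this tutorial, so here is a minimal sketch of one way to implement it, assuming a queue.Queue of requests and the main thread acting as the dispatcher. The DONE marker and the issue() helper are illustrative names, not part of any library. Each issued task posts DONE via add_done_callback() once it returns, which is after any requests it made, so the dispatcher can count outstanding tasks and knows when to stop:

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

request_queue = Queue()  # requests for new tasks, plus DONE markers
DONE = object()          # illustrative marker: one issued task has finished
seen = []                # values handled by the tasks, for illustration

def task(n):
    seen.append(n)
    if n > 0:
        # ask the dispatcher to issue a follow-up on our behalf
        request_queue.put((task, (n - 1,)))

# illustrative helper: submit a task and post DONE when it finishes
def issue(tpe, fn, args):
    future = tpe.submit(fn, *args)
    # runs after the task returns, so after any requests it made
    future.add_done_callback(lambda _: request_queue.put(DONE))

with ThreadPoolExecutor(max_workers=2) as tpe:
    outstanding = 1
    issue(tpe, task, (3,))
    # dispatcher loop: runs in the main thread, not in a worker
    while outstanding:
        item = request_queue.get()
        if item is DONE:
            outstanding -= 1
        else:
            outstanding += 1
            issue(tpe, *item)

print(seen)  # [3, 2, 1, 0]
```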

The major risk when submitting new tasks to the ThreadPoolExecutor from tasks executing within the same ThreadPoolExecutor is a deadlock.

A deadlock is a concurrency failure mode where one thread waits on another thread forever. When tasks issue new tasks, a deadlock can occur if the issuing task waits for the new tasks to complete while those new tasks can never start, because all worker threads are occupied by tasks that are themselves waiting.
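The following minimal sketch provokes this kind of deadlock with a pool of a single worker. The timeout passed to result() is there only so the sketch terminates; without it, task1() would wait forever for task2(), which cannot start until task1() frees the only worker:

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError

def task2():
    return 'task2 result'

def task1(tpe):
    future = tpe.submit(task2)
    try:
        # the only worker is running task1, so task2 cannot start yet;
        # without a timeout this call would block forever
        return future.result(timeout=1)
    except FuturesTimeoutError:
        return 'timed out waiting on task2'

# a single worker makes the problem easy to trigger
with ThreadPoolExecutor(max_workers=1) as tpe:
    outcome = tpe.submit(task1, tpe).result()

print(outcome)  # timed out waiting on task2
```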

As such, it is generally recommended that tasks do not submit new tasks to the same thread pool.

If tasks must submit new tasks to the thread pool, then they must not block waiting for those tasks to complete, whether by calling result() on the returned Future or by passing it to wait().
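One way to react to a new task's result without blocking is to attach a callback to the returned Future with add_done_callback(). A minimal sketch, with hypothetical task1(), task2() and on_task2_done() functions; it does not deadlock even with a single worker, because task1() returns immediately:

```python
from concurrent.futures import ThreadPoolExecutor

collected = []  # results gathered by the callback

def task2(x):
    return x * 2

# called with task2's Future once it is done (or at once, if it already is)
def on_task2_done(future):
    collected.append(future.result())

def task1(tpe):
    future = tpe.submit(task2, 21)
    # react to the result later instead of blocking on it now
    future.add_done_callback(on_task2_done)
    return 'task1 returned without waiting'

# even a single worker does not deadlock, because task1 never blocks
with ThreadPoolExecutor(max_workers=1) as tpe:
    status = tpe.submit(task1, tpe).result()

print(status, collected)  # task1 returned without waiting [42]
```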

Now that we know how to safely submit new tasks from tasks in the thread pool, let's look at some worked examples.

Example of Task Submitting a Task via an Argument

We can explore an example of submitting a new task to the ThreadPoolExecutor from a running task by passing in the ThreadPoolExecutor to the running task as an argument.

In this example, we will define a task that takes a ThreadPoolExecutor as an argument, runs for a moment, then submits a second task before terminating. The second task runs as per normal.

Firstly, we can define the second task.

This task reports a message, sleeps for a moment to simulate computational effort, then reports a final message.

The task2() function below implements this.

# task executed in the thread pool
def task2():
    # report a message
    print('>>Task 2 is running')
    # block for a moment to simulate work
    sleep(1)
    # report a message
    print('>>Task 2 is done.')

Next, we can define the first-level task.

This task function takes the ThreadPoolExecutor as an argument. It first reports a message, then blocks for a moment to simulate effort. Finally, it submits a new task to the ThreadPoolExecutor, in this case, the task2() function, before reporting a final message.

This task does not wait on the result of the submitted task, in an effort to avoid a deadlock.

The task1() function below implements this.

# task executed in the thread pool
def task1(tpe):
    # report a message
    print('>Task 1 is running')
    # block for a moment to simulate work
    sleep(1)
    # issue a new task
    _ = tpe.submit(task2)
    # report a message
    print('>Task 1 is done.')

Finally, in the main thread, we will create and start a new ThreadPoolExecutor with the default number of workers using the context manager interface.

We then issue the task1() function to the thread pool and wait for it to complete.

...
# create the thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task to the thread pool
    future = tpe.submit(task1, tpe)
    # wait for the task to complete
    _ = future.result()

And that's it.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of a task submitting a task with the thread pool shared as an argument
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# task executed in the thread pool
def task2():
    # report a message
    print('>>Task 2 is running')
    # block for a moment to simulate work
    sleep(1)
    # report a message
    print('>>Task 2 is done.')

# task executed in the thread pool
def task1(tpe):
    # report a message
    print('>Task 1 is running')
    # block for a moment to simulate work
    sleep(1)
    # issue a new task
    _ = tpe.submit(task2)
    # report a message
    print('>Task 1 is done.')

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPoolExecutor() as tpe:
        # issue a task to the thread pool
        future = tpe.submit(task1, tpe)
        # wait for the task to complete
        _ = future.result()

Running the example first creates and starts the thread pool.

Next, the task1() function is submitted to the thread pool for execution with the ThreadPoolExecutor as an argument and the main thread blocks until the task is complete.

The task1() function executes, reports a message, and sleeps. The task resumes and submits task2() to the thread pool and reports a final message before terminating.

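The task2() function is executed by the thread pool. It reports a message and sleeps for a moment. The task then resumes and reports a final message. Meanwhile, once task1() has finished, the main thread exits the context manager block, which shuts down the thread pool and waits for all submitted tasks, including task2(), to complete.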

This highlights how one task running in the ThreadPoolExecutor can submit a subsequent task to the ThreadPoolExecutor by passing the thread pool as an argument to the running task.

>Task 1 is running
>>Task 2 is running
>Task 1 is done.
>>Task 2 is done.
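In this example, the main thread waits only on task1(); the Future for task2() is discarded. If the main thread needs the follow-up's result, one option (a variation, not part of the example above) is for task1() to return the Future of the task it issued and let the main thread wait on it. This is safe because the main thread is not one of the pool's workers:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def task2():
    sleep(0.1)  # simulate work
    return 'task2 result'

def task1(tpe):
    # issue the follow-up and hand its Future back rather than waiting on it
    return tpe.submit(task2)

with ThreadPoolExecutor() as tpe:
    future1 = tpe.submit(task1, tpe)
    # the main thread is not a worker, so blocking here cannot starve the pool
    future2 = future1.result()
    print(future2.result())  # task2 result
```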

Example of Task Submitting a Task via a Global Variable

We can explore the case of submitting a new task to the ThreadPoolExecutor from a running task by accessing the ThreadPoolExecutor via a global variable.

In this example, we will update the above example so that task1() no longer receives the ThreadPoolExecutor as an argument and instead declares the thread pool as a global variable and accesses it directly.

This can be achieved via the global statement, which signals to the Python interpreter that the variable we are accessing, in this case "tpe", is a global variable.

...
# declare the thread pool as a global variable
global tpe
# issue a new task
_ = tpe.submit(task2)

The updated version of the task1() function with this change is listed below.

# task executed in the thread pool
def task1():
    # report a message
    print('>Task 1 is running')
    # block for a moment to simulate work
    sleep(1)
    # declare the thread pool as a global variable
    global tpe
    # issue a new task
    _ = tpe.submit(task2)
    # report a message
    print('>Task 1 is done.')

And that's it.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of a task submitting a task with the thread pool shared as a global variable
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# task executed in the thread pool
def task2():
    # report a message
    print('>>Task 2 is running')
    # block for a moment to simulate work
    sleep(1)
    # report a message
    print('>>Task 2 is done.')

# task executed in the thread pool
def task1():
    # report a message
    print('>Task 1 is running')
    # block for a moment to simulate work
    sleep(1)
    # declare the thread pool as a global variable
    global tpe
    # issue a new task
    _ = tpe.submit(task2)
    # report a message
    print('>Task 1 is done.')

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    tpe = ThreadPoolExecutor()
    # issue a task to the thread pool
    future = tpe.submit(task1)
    # wait for the task to complete
    _ = future.result()
    # shutdown the thread pool
    tpe.shutdown()

Running the example first creates and starts the thread pool.

Next, the task1() function is submitted to the thread pool for execution and the main thread blocks until the task is complete.

The task1() function executes, reports a message, and sleeps.

The task resumes and declares the ThreadPoolExecutor as a global variable and uses it to submit task2() to the thread pool before reporting a final message and terminating.

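The task2() function is executed by the thread pool. It reports a message and sleeps for a moment. The task then resumes and reports a final message. Meanwhile, the main thread calls shutdown(), which by default waits for all submitted tasks, including task2(), to complete before returning.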

This highlights how one task running in the ThreadPoolExecutor can submit a subsequent task to the ThreadPoolExecutor by accessing it as a global variable from within a running task.

>Task 1 is running
>>Task 2 is running
>Task 1 is done.
>>Task 2 is done.
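Strictly, the global statement in task1() is not required: a function that only reads a variable, and never assigns to it, looks the name up in the module's global namespace automatically. The declaration is needed only when a function assigns to a global. A minimal sketch of the same pattern without it, where task1() also returns the Future of the task it issued:

```python
from concurrent.futures import ThreadPoolExecutor

def task2():
    return 'task2 ran'

def task1():
    # no global statement: tpe is only read here, never assigned, so Python
    # looks it up in the module's global namespace when task1 runs
    return tpe.submit(task2)

tpe = ThreadPoolExecutor()
# task1 returns the Future of the task it issued
follow_up = tpe.submit(task1).result()
print(follow_up.result())  # task2 ran
tpe.shutdown()
```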

Takeaways

You now know how to submit new tasks from tasks running in the ThreadPoolExecutor.