How to Run Blocking Tasks in Asyncio

January 5, 2023 Python Asyncio

You can run blocking calls asynchronously in an asyncio program via the asyncio.to_thread() and loop.run_in_executor() functions.

In this tutorial, you will discover how to execute blocking calls in asyncio programs.

Let's get started.

Need to Run Blocking Tasks in Asyncio

The focus of asyncio is asynchronous programming and non-blocking IO.

Nevertheless, we often need to execute a blocking function call within an asyncio application.

This could be for many reasons, such as:

  1. To execute a CPU-bound task like calculating something.
  2. To execute a blocking IO-bound task like reading or writing from a file.
  3. To call into a third-party library that does not support asyncio yet.

Making a blocking call directly in an asyncio program will cause the event loop to stop while the blocking call is executing. It will not allow other coroutines to run in the background.

How can we execute a blocking call in an asyncio program asynchronously?

How to Run Blocking Tasks

The asyncio module provides two approaches for executing blocking calls in asyncio programs.

The first is to use the asyncio.to_thread() function.

This is in the high-level API and is intended for application developers.

The asyncio.to_thread() function takes a function name to execute and any arguments.

The function is executed in a separate thread. It returns a coroutine that can be awaited or scheduled as an independent task.

For example:

...
# execute a function in a separate thread
await asyncio.to_thread(task)

The task will not begin executing until the returned coroutine is given an opportunity to run in the event loop.

You can learn more about how to use the asyncio.to_thread() function in the tutorial:

The asyncio.to_thread() function creates a ThreadPoolExecutor behind the scenes to execute blocking calls.

As such, the asyncio.to_thread() function is only appropriate for IO-bound tasks.

An alternative approach is to use the loop.run_in_executor() function.

This is in the low-level asyncio API and first requires access to the event loop, such as via the asyncio.get_running_loop() function.

The loop.run_in_executor() function takes an executor and a function to execute.

If None is provided for the executor, then the default executor is used, which is a ThreadPoolExecutor.

The loop.run_in_executor() function returns an awaitable that can be awaited if needed. The task will begin executing immediately, so the returned awaitable does not need to be awaited or scheduled for the blocking call to start executing.

For example:

...
# get the event loop
loop = asyncio.get_running_loop()
# execute a function in a separate thread
await loop.run_in_executor(None, task)

Alternatively, an executor can be created and passed to the loop.run_in_executor() function, which will execute the asynchronous call in the executor.

The caller must manage the executor in this case, shutting it down once the caller is finished with it.

For example:

...
# create a process pool
with ProcessPoolExecutor as exe:
	# get the event loop
	loop = asyncio.get_running_loop()
	# execute a function in a separate thread
	await loop.run_in_executor(exe, task)
	# process pool is shutdown automatically...

These two approaches allow a blocking call to be executed as an asynchronous task in an asyncio program.

Now that we know how to execute blocking calls in an asyncio program, let's look at some worked examples.

Example of Running I/O-Bound Task in Asyncio with to_thread()

We can explore how to execute a blocking IO-bound call in an asyncio program using asyncio.to_thread().

In this example, we will define a function that blocks the caller for a few seconds. We will then execute this function asynchronously in a thread pool from asyncio using the asyncio.to_thread() function.

This will free the caller to continue with other activities.

The complete example is listed below.

# SuperFastPython.com
# example of running a blocking io-bound task in asyncio
import asyncio
import time

# a blocking io-bound task
def blocking_task():
    # report a message
    print('Task starting')
    # block for a while
    time.sleep(2)
    # report a message
    print('Task done')

# main coroutine
async def main():
    # report a message
    print('Main running the blocking task')
    # create a coroutine for  the blocking task
    coro = asyncio.to_thread(blocking_task)
    # schedule the task
    task = asyncio.create_task(coro)
    # report a message
    print('Main doing other things')
    # allow the scheduled task to start
    await asyncio.sleep(0)
    # await the task
    await task

# run the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine and runs it as the entry point into the asyncio program.

The main() coroutine runs and reports a message. It then issues a call to the blocking function call to the thread pool. This returns a coroutine,

The coroutine is then wrapped in a Task and executed independently.

The main() coroutine is free to continue with other activities. In this case, it sleeps for a moment to allow the scheduled task to start executing. This allows the target function to be issued to the ThreadPoolExecutor behind the scenes and start running.

The main() coroutine then suspends and waits for the task to complete.

The blocking function reports a message, sleeps for 2 seconds, then reports a final message.

This highlights how we can execute a blocking IO-bound task in a separate thread asynchronously from an asyncio program.

Main running the blocking task
Main doing other things
Task starting
Task done

Example of Running I/O-Bound Task in Asyncio with run_in_executor()

We can explore how to execute a blocking IO-bound call in an asyncio program using the run_in_executor() method.

This requires that we first get access to the currently running event loop.

In this example, we will use the default executor, which will be a ThreadPoolExecutor, appropriate for executing IO-bound tasks.

You can learn more about the ThreadPoolExecutor in the guide:

The blocking call will begin executing the thread pool as soon as it is issued. This allows the caller to continue with other activities.

The complete example is listed below.

# SuperFastPython.com
# example of running a blocking io-bound task in asyncio
import asyncio
import time

# a blocking io-bound task
def blocking_task():
    # report a message
    print('Task starting')
    # block for a while
    time.sleep(2)
    # report a message
    print('Task done')

# main coroutine
async def main():
    # report a message
    print('Main running the blocking task')
    # get the event loop
    loop = asyncio.get_running_loop()
    # schedule the function to run
    awaitable = loop.run_in_executor(None, blocking_task)
    # report a message
    print('Main doing other things')
    # allow the scheduled task to start
    await asyncio.sleep(3)

# run the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine and runs it as the entry point into the asyncio program.

The main() coroutine runs and reports a message. It then gets access to the currently running event loop and issues the blocking IO-bound task.

This returns an awaitable and begins executing the task immediately using the default executor, which is an instance of the ThreadPoolExecutor.

The main() coroutine is free to continue with other activities. In this case, it reports a message and sleeps for a while.

The blocking task runs, reporting a message, sleeping for two seconds, and reporting a final message.

This highlights how a blocking IO-bound task can be executed asynchronously in a separate thread using the low-level API.

Main running the blocking task
Task starting
Main doing other things
Task done

Example of Running CPU-Bound Task in Asyncio with run_in_executor()

We can explore how to execute a CPU-bound task in an asyncio program.

In this example, we will use the run_in_executor() method on the currently running event loop.

We will create an instance of a ProcessPoolExecutor, configured with 4 workers, appropriate for executing CPU-bound tasks. This process pool will then be passed to the run_in_executor() along with the target function.

You can learn more about the ProcessPoolExecutor in the guide:

The task will begin executing immediately in the process pool, asynchronously. This frees the caller to continue with other activities.

The complete example is listed below.

# SuperFastPython.com
# example of running a blocking cpu-bound task in asyncio
from concurrent.futures import ProcessPoolExecutor
import asyncio
import math

# a blocking cpu-bound task
def blocking_task():
    # report a message
    print('Task starting', flush=True)
    # block for a while
    data = [math.sqrt(i) for i in range(50000000)]
    # report a message
    print('Task done', flush=True)

# main coroutine
async def main():
    # report a message
    print('Main running the blocking task')
    # get the event loop
    loop = asyncio.get_running_loop()
    # create the executor
    exe = ProcessPoolExecutor(4)
    # schedule the function to run
    awaitable = loop.run_in_executor(exe, blocking_task)
    # report a message
    print('Main doing other things')
    # sleep a moment
    await asyncio.sleep(1)
    # await the task
    await awaitable
    # close the process pool
    exe.shutdown()

# protect the entry point
if __name__ == '__main__':
    # run the asyncio program
    asyncio.run(main())

Running the example first creates the main() coroutine and runs it as the entry point into the asyncio program.

The main() coroutine runs and reports a message. It then gets access to the currently running event loop and creates a new ProcessPoolExecutor with 4 worker processes.

The main() coroutine then issues the blocking function call asynchronously using the process pool. This returns an awaitable and begins executing the task immediately using the provided executor.

The main() coroutine is free to continue with other activities. In this case, it reports a message and sleeps for a moment, then awaits the asynchronous task directly.

The blocking task runs in a new child process, reporting a message, sleeping for two seconds, and reporting a final message.

Once the task is completed, the main() coroutines then closes the process pool explicitly.

This highlights how a blocking CPU-bound task can be executed asynchronously in a separate thread using the low-level API.

Main running the blocking task
Main doing other things
Task starting
Task done

Takeaways

You now know how to execute blocking calls in asyncio programs.



If you enjoyed this tutorial, you will love my book: Python Asyncio Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.