Last Updated on August 21, 2023
You can implement concurrency in Python using threads, processes, and asyncio.
In this tutorial you will take a whirlwind tour of Python concurrency.
Let’s get started.
Python Concurrency Overview
Concurrency refers to parts of a program that are independent and can be executed out of order.
For example, a part of your program may comprise multiple tasks that do not need to interact with each other and can be completed in any order, as long as they are all completed, such as downloading multiple separate files.
Concurrent tasks may or may not be executed in parallel. Parallelism refers explicitly to the ability to execute tasks simultaneously, such as with multiple CPU cores.
The reference Python interpreter, CPython, provides four main modules for concurrency:
- multiprocessing: for process-based concurrency with the multiprocessing.Process class.
- threading: for thread-based concurrency with the threading.Thread class.
- concurrent.futures: for thread and process pools that extend the concurrent.futures.Executor class.
- asyncio: for coroutine-based concurrency with the async/await keywords.
Only processes provide true parallelism in Python, that is, the ability to execute tasks simultaneously on multiple CPUs. This is achieved by starting new child instances of the Python interpreter process to execute tasks. Data sharing between processes requires inter-process communication, in which data must be serialized at added computational cost.
Python threads provide concurrency, but only limited parallelism. This is because of the Global Interpreter Lock (GIL), which keeps the Python interpreter thread safe by allowing only a single thread to execute at any one time within a Python process. The GIL is released in some situations, such as when performing IO operations, allowing modest parallel execution when interacting with files, sockets, and devices.
Asynchronous I/O, or AsyncIO for short, is an implementation of asynchronous programming that provides the ability to create and run coroutines within an event-loop. A coroutine is a type of routine that can be suspended and resumed, allowing the asyncio event loop to jump from one to another when instructed via the await keyword. The paradigm provides a limited set of non-blocking IO operations.
Each of processes, threads, and asyncio has a sweet spot.
- Processes are suited for CPU-bound tasks, but are limited to perhaps tens of tasks and overhead in sharing data between processes.
- Threads are suited to IO-bound tasks, but are limited to perhaps thousands of tasks.
- AsyncIO is suited to large-scale IO-bound tasks, e.g. perhaps tens of thousands of tasks, but is limited to a subset of non-blocking IO operations and requires adopting the asynchronous programming paradigm.
Now that we have a high-level understanding of the concurrency capabilities provided by the Python standard library, let’s take a whirlwind tour of how we might use each in turn to execute concurrent tasks.
Threads and Processes
Threads and processes provide a similar interface for executing ad hoc tasks defined by a function.
Both threads and processes are constructs created and managed by the underlying operating system. Threads are lightweight: they belong to a process and can share data within that process. A process is heavier weight: it has at least one thread and requires inter-process communication to share data.
Let’s take a closer look at how we might execute ad hoc tasks with threads and processes in Python.
Thread
A thread refers to a thread of execution within a computer program.
Every Python program is a process with one thread called the main thread used to execute your program instructions. Each process is in fact one instance of the Python interpreter that executes Python instructions (Python byte-code), which is a slightly lower level than the code you type into your Python program.
Python provides real or native (system-level) threads via the threading.Thread class.
A task can be run in a new thread by creating an instance of the Thread class and specifying the function to run in the new thread via the target argument.
...
# create and configure a new thread to run a function
thread = Thread(target=task)
Once the thread is created, it must be started by calling the start() function.
...
# start the task in a new thread
thread.start()
We can then wait for the task to complete by joining the thread via the join() function; for example:
...
# wait for the task to complete
thread.join()
We can demonstrate this with a complete example of a task that sleeps for a moment and prints a message.
The complete example of executing a target task function in a separate thread is listed below.
# SuperFastPython.com
# example of executing a target task function in a separate thread
from time import sleep
from threading import Thread

# a simple task that blocks for a moment and prints a message
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('This is coming from another thread')

# create and configure a new thread to run a function
thread = Thread(target=task)
# start the task in a new thread
thread.start()
# display a message
print('Waiting for the new thread to finish...')
# wait for the task to complete
thread.join()
Running the example creates the thread object to run the task() function.
The thread is started and the task() function is executed in the new thread. The task sleeps for a moment; meanwhile, the main thread prints a message that we are waiting and then joins the new thread.
Finally, the new thread finishes sleeping, prints a message and closes. The main thread then carries on and also closes as there are no more instructions to execute.
Waiting for the new thread to finish...
This is coming from another thread
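If the target function takes arguments, they can be provided via the "args" keyword of the Thread class. A brief sketch; the message text here is arbitrary:

# a brief sketch: passing arguments to the target function via 'args'
from threading import Thread

# a task that prints a provided message
def task(message):
    print(message)

# pass positional arguments as a tuple
thread = Thread(target=task, args=('Hello from another thread',))
thread.start()
thread.join()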
Process
A process refers to an instance of a computer program.
Every Python program is a process and has one thread, called the main thread, used to execute your program instructions.
Python provides real system-level processes via the multiprocessing.Process class.
The underlying operating system controls how new processes are created. On some systems this requires spawning a new process, and on others it requires forking an existing one. The operating-system-specific method used for creating new processes in Python is not something we need to worry about, as it is managed by your installed Python interpreter.
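If you are curious, you can report the start method used on your platform; a brief sketch:

# report how child processes are created on this platform
from multiprocessing import get_start_method

if __name__ == '__main__':
    # e.g. 'fork' on Linux, 'spawn' on Windows and recent macOS
    print(get_start_method())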
A task can be run in a new process by creating an instance of the Process class and specifying the function to run in the new process via the “target” argument.
...
# define a task to run in a new process
process = Process(target=task)
Once the process is created, it must be started by calling the start() function.
...
# start the task in a new process
process.start()
We can then wait for the task to complete by joining the process via the join() function; for example:
...
# wait for the task to complete
process.join()
Whenever we create new processes, we must protect the entry point of the program with a check for __name__ == '__main__'. This prevents child processes from re-executing the process-creation code when the module is imported, such as under the 'spawn' start method.
# entry point for the program
if __name__ == '__main__':
    # do things...
Tying this together, the complete example of creating a Process to run an ad hoc task function is listed below.
# SuperFastPython.com
# example of running a function in a new process
from time import sleep
from multiprocessing import Process

# a simple task that blocks for a moment and prints a message
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('This is coming from another process', flush=True)

# entry point for the program
if __name__ == '__main__':
    # define a task to run in a new process
    process = Process(target=task)
    # start the task in a new process
    process.start()
    # display a message
    print('Waiting for the new process to finish...')
    # wait for the task to complete
    process.join()
Running the example creates the process object to run the task() function.
The process is started and the task() function is executed in the child process. The task sleeps for a moment; meanwhile, the main thread in the parent process prints a message that we are waiting and then joins the new child process.
Finally, the child process finishes sleeping, prints a message and closes. The main thread in the parent process then carries on and also closes as there are no more instructions to execute.
Waiting for the new process to finish...
This is coming from another process
Thread Pools and Process Pools
A thread pool is a programming pattern for automatically managing a pool of workers.
Although they are generally referred to as thread pools, they may use either threads or processes internally.
The pool is responsible for a fixed number of workers.
- It controls when the worker threads or processes are created, such as just-in-time when they are needed.
- It also controls what threads or processes should do when they are not being used, such as making them wait without consuming computational resources.
Each worker is agnostic to the type of tasks that are executed. Workers are designed to be re-used once the task is completed and provide protection against the unexpected failure of the task, such as raising an exception, without impacting the worker itself.
This is unlike a single thread or process that is configured for the single execution of one specific task.
Python provides thread pools via the concurrent.futures.ThreadPoolExecutor class and process pools via the concurrent.futures.ProcessPoolExecutor class.
Both classes extend the concurrent.futures.Executor class and provide the same generic interface.
Executors
The concurrent.futures.Executor class is an abstract class, meaning that it cannot be instantiated.
The Executor class defines three methods used to control our thread pool; they are: submit(), map(), and shutdown().
An Executor is started when the class is instantiated and must be shut down explicitly by calling the shutdown() function, which will release any resources held by the Executor. We can also shut it down automatically by using the context manager interface of the class.
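A brief sketch of both shutdown styles, using a hypothetical task() function for illustration:

# both ways of shutting down an Executor (sketch)
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used for illustration
def task():
    return 'done'

# explicit shutdown: blocks until submitted tasks complete
executor = ThreadPoolExecutor()
executor.submit(task)
executor.shutdown()

# automatic shutdown via the context manager
with ThreadPoolExecutor() as executor:
    executor.submit(task)
# shutdown() is called for us on exiting the block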
The submit() and map() functions are used to submit tasks to the Executor for asynchronous execution.
The map() function operates just like the built-in map() function and is used to apply a function to each element in an iterable object, like a list. Unlike the built-in map() function, each application of the function to an element will happen asynchronously instead of sequentially.
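For example, a minimal sketch of map() on a thread pool, using a hypothetical work() function:

# apply a function to each element using worker threads (sketch)
from concurrent.futures import ThreadPoolExecutor

# hypothetical task that squares a number
def work(value):
    return value * value

with ThreadPoolExecutor() as executor:
    # results are yielded in the order the items were submitted
    for result in executor.map(work, [1, 2, 3]):
        print(result)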
The submit() function takes a function, as well as any arguments, and will execute it asynchronously; the call returns immediately and provides a Future object.
Next, let’s take a closer look at Future objects.
Futures
A future is a programming pattern that represents a delayed result for an asynchronous task.
It is also sometimes called a promise or a delay. It provides a context for the result of a task that may or may not be executing and a way of getting a result once it is available.
In Python, the concurrent.futures.Future object is returned from an Executor, such as a ThreadPoolExecutor when calling the submit() function to dispatch a task to be executed asynchronously.
In general, we do not create Future objects; we only receive them, and we may need to call functions on them.
There is always one Future object for each task sent into the Executor via a call to submit().
The Future object provides a number of helpful functions for inspecting the status of the task, such as: cancelled(), running(), and done() to determine if the task was cancelled, is currently running, or has finished execution.
A Future object also provides access to the result of the task via the result() function. If an exception was raised while executing the task, it will be re-raised when calling the result() function, or can be accessed via the exception() function.
Both the result() and exception() functions allow a “timeout” to be specified as an argument, which is the number of seconds to wait for a return value if the task is not yet complete. If the timeout expires, a TimeoutError will be raised.
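A brief sketch of waiting on a Future with a timeout; the task and timeout values here are arbitrary:

# wait on a Future with a timeout (sketch)
from time import sleep
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# hypothetical long-running task
def task():
    sleep(10)
    return 'All done'

with ThreadPoolExecutor() as executor:
    future = executor.submit(task)
    # check the status of the task
    print(future.done())
    try:
        # wait at most 2 seconds for the result
        print(future.result(timeout=2))
    except TimeoutError:
        print('Gave up waiting for the task')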
Now that we are familiar with the Executor and Future classes, let’s take a closer look at the ThreadPoolExecutor.
ThreadPoolExecutor
The ThreadPoolExecutor provides a pool of generic worker threads in Python.
Executing functions using worker threads in the ThreadPoolExecutor involves first creating the thread pool, then submitting the task into the pool. If the task returns a value, we can then retrieve it when the task is completed.
We can demonstrate this with a small example.
The example below defines a target task function that blocks for a moment, prints a message and returns a string.
# a simple task that blocks for a moment and prints a message
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('Task running in a worker thread')
    # return a message
    return 'All done'
We create the thread pool using the context manager, ensuring that the pool is shut down automatically once we are finished with it.
...
# create the pool of worker threads
with ThreadPoolExecutor() as executor:
    # ...
Once created, we submit a task to the pool and get back a Future object that provides a handle on the task. We then attempt to get the result of the task, which blocks until the task is completed and returns a value.
...
# execute a task in another thread
future = executor.submit(task)
# display a message
print('Waiting for the task to finish...')
# wait for the task to finish and get the result
result = future.result()
# report the result
print(result)
Tying this together, the complete example of using the ThreadPoolExecutor is listed below.
# SuperFastPython.com
# demonstration with the thread pool life-cycle
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# a simple task that blocks for a moment and prints a message
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('Task running in a worker thread')
    # return a message
    return 'All done'

# create the pool of worker threads
with ThreadPoolExecutor() as executor:
    # execute a task in another thread
    future = executor.submit(task)
    # display a message
    print('Waiting for the task to finish...')
    # wait for the task to finish and get the result
    result = future.result()
    # report the result
    print(result)
Running the example creates the thread pool, submits the task, and waits for the result in the main thread.
The thread pool creates the default number of worker threads and waits for tasks. The task is submitted, consumed by a worker thread in the pool, and begins executing.
The task blocks for a moment, reports a message, and returns a value.
Finally, the main thread retrieves the result of the task and closes the thread pool.
Waiting for the task to finish...
Task running in a worker thread
All done
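When submitting many tasks, we receive one Future per task and can handle results as they finish via the as_completed() function; a brief sketch with an arbitrary task:

# handle results in completion order (sketch)
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor, as_completed

# hypothetical task that blocks for a variable moment
def task(number):
    sleep(random())
    return number

with ThreadPoolExecutor() as executor:
    # one Future per submitted task
    futures = [executor.submit(task, i) for i in range(5)]
    # results arrive in completion order, not submission order
    for future in as_completed(futures):
        print(future.result())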
Next, let’s look at an example of using the process pool.
ProcessPoolExecutor
The ProcessPoolExecutor provides a pool of generic worker processes in Python.
Executing functions using worker processes in the ProcessPoolExecutor involves first creating the process pool, then submitting the task into the pool. If the task returns a value, we can then retrieve it when the task is completed.
We can demonstrate this with a small example.
The example below defines a target task function that blocks for a moment, prints a message, and returns a string. Importantly, calls to print() from child processes may buffer messages by default. Therefore, it is important to flush output on each call to the print() function by setting the “flush” argument to True.
# a simple task that blocks for a moment and prints a message
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('Task running in a worker process', flush=True)
    # return a message
    return 'All done'
We create the process pool using the context manager, ensuring that the pool is shut down automatically once we are finished with it.
...
# create the pool of worker processes
with ProcessPoolExecutor() as executor:
    # ...
Once created, we submit a task to the pool and get back a Future object that provides a handle on the task. We then attempt to get the result of the task, which blocks until the task is completed and returns a value.
...
# execute a task in another process
future = executor.submit(task)
# display a message
print('Waiting for the task to finish...')
# wait for the task to finish and get the result
result = future.result()
# report the result
print(result)
When using processes, we must protect the entry point of the program.
...
# entry point
if __name__ == '__main__':
    main()
Tying this together, the complete example of using the ProcessPoolExecutor is listed below.
# SuperFastPython.com
# demonstration with the process pool life-cycle
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# a simple task that blocks for a moment and prints a message
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('Task running in a worker process', flush=True)
    # return a message
    return 'All done'

# demonstrate the process pool
def main():
    # create the pool of worker processes
    with ProcessPoolExecutor() as executor:
        # execute a task in another process
        future = executor.submit(task)
        # display a message
        print('Waiting for the task to finish...')
        # wait for the task to finish and get the result
        result = future.result()
        # report the result
        print(result)

# entry point
if __name__ == '__main__':
    main()
Running the example creates the process pool, submits the task, and waits for the result in the main process.
The process pool creates the default number of worker processes and waits for tasks. The task is submitted, consumed by a worker process in the pool, and begins executing.
The task blocks for a moment, reports a message, and returns a value.
Finally, the main process retrieves the result of the task and closes the process pool.
Waiting for the task to finish...
Task running in a worker process
All done
Next, let’s look at AsyncIO.
Asynchronous IO
Python provides an asyncio module for Asynchronous Programming and specifically Asynchronous Input/Output (AsyncIO).
It primarily provides a way to create and run coroutines using the async/await syntax.
A coroutine is a programming pattern that generalizes routines (e.g. subroutines, functions, or blocks of code) to allow them to be suspended and resumed. Coroutines use cooperative multitasking, requiring threads of execution to explicitly yield control.
A coroutine can be defined by adding the “async” keyword prior to a function definition; for example:
# define a coroutine
async def task():
    # do things...
This syntax defines an awaitable, which is a unit of execution that can be awaited, e.g. waited for.
A program can wait on an awaitable asynchronous task to complete using the “await” keyword; for example:
...
# create and schedule the coroutine and wait for it to return
await task()
This will do two things:
- Create a coroutine and schedule it for execution.
- Yield execution until the coroutine returns.
It is also possible to define a collection of coroutines, such as in a list.
For example:
...
# create a list of coroutines
task_list = [task() for _ in range(10)]
This will create a list of ten coroutines, instead of calling the task() function ten times.
We can await each coroutine in turn; a minimal sketch of this sequential pattern is shown below. Alternatively, we can call the asyncio.gather() function to execute a collection of coroutines and wait for them all to complete.
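A brief sketch of awaiting each coroutine in turn; note that this runs them one after the other:

...
# await each coroutine in the list, one at a time
for coro in task_list:
    await coro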
Multiple coroutines can be passed to the asyncio.gather() function directly; for example:
...
# execute tasks and wait for them to complete
await asyncio.gather(task(), task(), task())
Alternatively, we can provide a collection of coroutines to the function and unpack them using the star (*) operator; for example:
...
# create a list of coroutines
task_list = [task() for _ in range(10)]
# execute tasks and wait for them to complete
await asyncio.gather(*task_list)
Coroutines created with the async/await syntax can only be executed within an event loop, such as the one provided by the asyncio runtime, which allows the thread of execution to switch between tasks rather than block.
This is achieved by passing the entry-point coroutine to the asyncio.run() function; for example:
...
# start the asyncio runtime
asyncio.run(main())
This will create the event loop runtime required to support the scheduling and execution of coroutines.
We can demonstrate this with a worked example.
First, we can define a task function as a coroutine using the async keyword. The task will simulate an IO operation for a moment and print a message. We must await the sleep operation which yields control to any other coroutines in the runtime instead of blocking.
# a simple task that blocks for a moment and prints a message
async def task():
    # block for a moment
    await asyncio.sleep(1)
    # display a message
    print('This is coming from another coroutine')
We can then create the coroutine and await it until it completes.
...
# execute the task in a coroutine and wait for the result
await task()
Tying this together, the complete example of executing a task in a coroutine using asyncio is listed below.
# SuperFastPython.com
# example of running a function in a new coroutine
import asyncio

# a simple task that blocks for a moment and prints a message
async def task():
    # block for a moment
    await asyncio.sleep(1)
    # display a message
    print('This is coming from another coroutine')

# demonstrate executing a task in a coroutine
async def main():
    print('Waiting for the new coroutine to finish...')
    # execute the task in a coroutine and wait for the result
    await task()

# start the run loop
asyncio.run(main())
Running the example starts the asyncio runtime, then creates the coroutine and waits for it to finish.
The coroutine runs, then suspends and yields control while sleeping. The coroutine is then resumed, prints a message, then returns, allowing the program to complete.
Waiting for the new coroutine to finish...
This is coming from another coroutine
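Coroutines can also be scheduled to run independently as Task objects via the asyncio.create_task() function; a minimal sketch:

# a minimal sketch: scheduling a coroutine as a Task
import asyncio

# a simple coroutine that blocks for a moment
async def task():
    await asyncio.sleep(1)
    print('Task done')

async def main():
    # schedule the coroutine to run as soon as the loop is free
    pending = asyncio.create_task(task())
    print('Task scheduled, doing other things...')
    # suspend until the task completes
    await pending

asyncio.run(main())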
AsyncIO for Files
You cannot use conventional blocking IO operations from within an asyncio program.
Instead, you must use non-blocking IO operations: an alternate implementation of IO that yields control while data is read or written outside of the program, instead of blocking.
The asyncio module provides a limited set of non-blocking IO operations, mostly focused on socket communication.
It does not provide any file-based non-blocking IO. This is because non-blocking file IO is challenging to implement in a platform-independent manner and may not be supported on many platforms (e.g. it is a capability that must be provided by the operating system).
As such, non-blocking file IO can be simulated using threads, allowing the function call to yield immediately while the IO is performed in a background worker thread.
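The standard library offers one way to do this: the asyncio.to_thread() function (Python 3.9+) runs a blocking call in a worker thread and can be awaited. A brief sketch, assuming a file named my_file.txt exists:

# run blocking file IO in a worker thread (sketch)
import asyncio

# conventional blocking file read
def read_file(filename):
    with open(filename) as handle:
        return handle.read()

async def main():
    # offload the blocking call, yielding control while it runs
    data = await asyncio.to_thread(read_file, 'my_file.txt')  # assumed file
    print(f'Read {len(data)} characters')

asyncio.run(main())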
The aiofiles project is a third-party Python library that provides non-blocking IO simulated with threads for asyncio applications.
It can be used to read from files, write to files, and perform other related IO operations such as renaming, deleting, moving, and copying files.
You can install the aiofiles library using your Python package manager, such as pip. For example:
pip3 install aiofiles
The example below demonstrates how to perform non-blocking file IO using the aiofiles library from an asyncio coroutine.
First, we can define a coroutine that creates a file with no content.
We await the file creation, which yields control to any other coroutines executing in the runtime while the IO operation is being performed, and then close the file handle.
# a simple task that performs file io and prints a message
async def task(filename='my_file.txt'):
    # create a file
    file = await aiofiles.open(filename, mode='x')
    # close the file handle
    await file.close()
    # display a message
    print(f'We just created the file {filename} from a coroutine')
Tying this together, the complete example of performing simulated non-blocking file IO in an asyncio program is listed below.
# SuperFastPython.com
# example of performing file io in a new coroutine
import aiofiles
import asyncio

# a simple task that performs file io and prints a message
async def task(filename='my_file.txt'):
    # create a file
    file = await aiofiles.open(filename, mode='x')
    # close the file handle
    await file.close()
    # display a message
    print(f'We just created the file {filename} from a coroutine')

# demonstrate executing a task in a coroutine
async def main():
    print('Waiting for the new coroutine to finish...')
    # execute the task in a coroutine and wait for the result
    await task()

# start the run loop
asyncio.run(main())
Running the example starts the asyncio runtime, then creates the coroutine and waits for it to finish.
The coroutine runs, then suspends and yields control while creating the file. The coroutine is then resumed, prints a message, then returns, allowing the program to complete.
Waiting for the new coroutine to finish...
We just created the file my_file.txt from a coroutine
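Beyond creating files, aiofiles supports asynchronous reads and writes via an async context manager. A brief sketch; the filename and message here are arbitrary:

# write then read a file with aiofiles (sketch)
import asyncio
import aiofiles

async def main():
    # write a message to a file
    async with aiofiles.open('demo.txt', mode='w') as handle:
        await handle.write('Hello from a coroutine')
    # read the message back
    async with aiofiles.open('demo.txt') as handle:
        print(await handle.read())

asyncio.run(main())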
Further Reading
This section provides additional resources that you may find helpful.
Books
- Concurrent File I/O in Python, Jason Brownlee (my book!)
Guides
Python File I/O APIs
- Built-in Functions
- os - Miscellaneous operating system interfaces
- os.path - Common pathname manipulations
- shutil - High-level file operations
- zipfile — Work with ZIP archives
- Python Tutorial: Chapter 7. Input and Output
Python Concurrency APIs
- threading — Thread-based parallelism
- multiprocessing — Process-based parallelism
- concurrent.futures — Launching parallel tasks
- asyncio — Asynchronous I/O
File I/O in Asyncio
Takeaways
You now know how to implement concurrent tasks in Python using threads, processes, and asyncio.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.