Last Updated on August 21, 2023
Appending to a file from multiple threads is not thread-safe and can result in overwritten data and file corruption.
In this tutorial, you will explore how to append to a file from multiple threads.
Let’s dive in.
Appending to a File From Multiple Threads Is Not Thread-Safe
There are many situations where we may want to append to the same file from multiple threads.
For example, you can imagine using a ThreadPoolExecutor where each worker is computing a value or interacting with a resource to prepare some unique data, then having all worker threads store or log their data as a line in the same file.
There are a few ways we might implement this.
For example, we might naively try to open the file for appending in each worker thread, write the line, then close the file.
Alternatively, we may open the file once in the program, share the file handle with each worker thread, then have each worker thread write a message to the file when they are done.
The problem with these approaches is that messages in the resulting file will overwrite each other and the file will become corrupt.
The reason for this is that writing to a file is not thread-safe.
We can demonstrate this with a worked example.
First, let’s define a task function that takes a unique identifier (e.g. an integer) and a file handle. The task calculates some data, blocks for a variable amount of time, then writes a unique message to the file.
The task() function below implements this.
# task that blocks for a moment and writes to a file
def task(identifier, file_handle):
    # block for a moment
    value = random()
    sleep(value)
    # write to a file
    file_handle.write(f'Task {identifier} blocked for {value} of a second.\n')
Next, in the main thread of our program, we can open the file and get a file handle.
This can be achieved using the open() built-in function and setting the mode to 'a' for appending to the file. We will use the context manager to ensure that the file is closed automatically once we are finished with it.
...
# open a file
with open('results.txt', 'a', encoding='utf-8') as handle:
    # ...
Next, we can create a thread pool via the ThreadPoolExecutor and specify the number of worker threads. We will use 1,000 workers in this case as we are trying to force a failure case. Again, we will use the context manager to ensure the thread pool is closed down once all submitted tasks are completed.
...
# create the thread pool
with ThreadPoolExecutor(1000) as executor:
    # ...
Finally, we will submit 10,000 tasks to the thread pool as calls to our task() function.
This can be achieved by calling the submit() function in a list comprehension, assigning each call to the task() function a unique integer from 0 to 9,999. Each call to the submit() function returns a Future object to keep track of the task, which we will ignore in this case as we do not require any result from our task.
...
# dispatch all tasks
_ = [executor.submit(task, i, handle) for i in range(10000)]
And that’s it.
Tying this together, the complete example of writing to a file from multiple threads in an unsafe manner is listed below.
# SuperFastPython.com
# write to a file from multiple threads results in unexpected behaviour
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# task that blocks for a moment and writes to a file
def task(identifier, file_handle):
    # block for a moment
    value = random()
    sleep(value)
    # write to a file
    file_handle.write(f'Task {identifier} blocked for {value} of a second.\n')

# write to a file from multiple threads
def main():
    # open a file
    with open('results.txt', 'a', encoding='utf-8') as handle:
        # create the thread pool
        with ThreadPoolExecutor(1000) as executor:
            # dispatch all tasks
            _ = [executor.submit(task, i, handle) for i in range(10000)]
            # wait for all tasks to complete
    print('Done.')

# entry point
if __name__ == '__main__':
    main()
Running the example opens the file ‘results.txt‘, creates the thread pool with 1,000 workers and submits 10,000 tasks, each attempting to append a unique line to the file.
We expect 10,000 lines in the file, but this is not the case because of the corruption.
This can be confirmed on a POSIX system by typing the following command (cat the file and count lines) in the same directory as our script and the results.txt file:
cat results.txt | wc -l
In this case, the result shows that we are missing 66 lines of data, although things are worse than this.
9934
In this case, the result is a file that contains large segments of corrupt data.
Some sections look fine, as we expect, for example:
...
Task 133 blocked for 0.005006482755166575 of a second.
Task 152 blocked for 0.010829546448452021 of a second.
Task 53 blocked for 0.01792173357462179 of a second.
Task 6 blocked for 0.024116631866498706 of a second.
Task 345 blocked for 0.0025878837158017642 of a second.
Task 346 blocked for 0.01254960400966576 of a second.
Task 208 blocked for 0.020621257382234193 of a second.
Task 192 blocked for 0.025857220704669226 of a second.
Task 26 blocked for 0.03524524788377992 of a second.
Task 81 blocked for 0.037315156394633786 of a second.
...
Other sections clearly look like something went wrong, for example:
...
õQð•Q9ð5QSð5îZãîõïZÿîUñZïõòZ/ï•ôZIï•÷Task 1242 blocked for 0.037004895386893 of a second.
...
This corruption occurred because calling write() on the same file handle is not thread-safe.
Next, we can explore some ways we might append data to the same file in a thread-safe manner.
Append to a File From Multiple Threads With a Lock
We can safely append to a file from multiple threads using a mutual exclusion lock.
Python provides a mutual exclusion lock, also called a mutex, via the threading.Lock class.
First, we can create an instance of the lock to be shared by all threads.
...
# create a lock to protect the file
lock = Lock()
This lock instance can be passed to the task() function as an argument.
Then, before any thread attempts to write to the file it must obtain the lock. Only one thread can have the lock at any one time, and if writes to the file can only occur when the lock is held, then only one thread can write to the file at a time.
This will make writing to the file thread-safe and will avoid any corruption of messages.
A lock can be obtained by calling the acquire() function which will block (not return) until the lock is available. Once obtained, the lock can be released by calling the release() function.
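For example, a minimal sketch of the explicit acquire/release form, using try/finally to guarantee the lock is released even if the write fails, might look like this:
...
# obtain the lock, blocking until it is available
lock.acquire()
try:
    # write to a file while holding the lock
    file_handle.write(f'Task {identifier} blocked for {value} of a second.\n')
finally:
    # release the lock so other threads can write
    lock.release()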
It is better practice to use the context manager, if possible, which will ensure that the lock is always released automatically once the code block is exited.
For example:
...
# obtain the lock
with lock:
    # write to a file
    file_handle.write(f'Task {identifier} blocked for {value} of a second.\n')
Tying this together, the complete example of writing to a file from multiple threads using a lock is listed below.
# SuperFastPython.com
# write to a file from multiple threads using a shared lock
from time import sleep
from random import random
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

# task that blocks for a moment and writes to a file
def task(identifier, lock, file_handle):
    # block for a moment
    value = random()
    sleep(value)
    # obtain the lock
    with lock:
        # write to a file
        file_handle.write(f'Task {identifier} blocked for {value} of a second.\n')

# write to a file from multiple threads
def main():
    # open a file
    with open('results.txt', 'a', encoding='utf-8') as handle:
        # create a lock to protect the file
        lock = Lock()
        # create the thread pool
        with ThreadPoolExecutor(1000) as executor:
            # dispatch all tasks
            _ = [executor.submit(task, i, lock, handle) for i in range(10000)]
            # wait for all tasks to complete
    print('Done.')

# entry point
if __name__ == '__main__':
    main()
Running the example opens the file, creates the thread pool and writes messages to the file from multiple worker threads.
The mutual exclusion lock ensures that only one thread can write to the file at a time.
The result is clean, crisp messages, one per line, as we would expect.
Task 121 blocked for 0.001110901802077513 of a second.
Task 170 blocked for 0.006215832438211666 of a second.
Task 259 blocked for 8.716446563850067e-05 of a second.
Task 152 blocked for 0.012671481481186686 of a second.
Task 99 blocked for 0.01723248272457567 of a second.
Task 297 blocked for 0.005388827042946631 of a second.
Task 90 blocked for 0.024467488619056676 of a second.
Task 277 blocked for 0.008290633269665726 of a second.
Task 65 blocked for 0.029065442968607247 of a second.
Task 104 blocked for 0.02690572979786665 of a second.
...
Checking the number of lines confirms that all 10,000 tasks wrote their message to the file.
Running the command:
cat results.txt | wc -l
Results in the output:
10000
This was one way to make appending to a single file from multiple threads thread-safe, but there are other approaches we can use.
Next, let’s look at an alternative.
Append to a File From Multiple Threads Via the Main Thread
An alternative approach to appending to a file from multiple threads is to send all messages back to the main thread.
This is a change to the program that ensures that writes to the file are thread-safe because only the main thread is able to perform those writes. It simplifies the problem by avoiding concurrent access to the file.
The benefit of this approach is that it is simpler as we do not need to maintain a lock.
We can implement this by first updating the task() function so that it does not have access to the file handle and instead returns the message it wishes to log as a string.
# task that blocks for a moment and returns a message to write
def task(identifier):
    # block for a moment
    value = random()
    sleep(value)
    # return data to write
    return f'Task {identifier} blocked for {value} of a second.\n'
Next, we can update the main thread.
First, we must collect the Future objects returned from each call to submit() so that we can get the return value from each task.
...
# dispatch all tasks
futures = [executor.submit(task, i) for i in range(10000)]
We can then iterate over the Future objects in the order that tasks are completed by using the concurrent.futures.as_completed() function.
...
# process results as tasks are completed
for future in as_completed(futures):
    # ...
Finally, for each task that is completed, we can retrieve the return value of the task via the result() function on the task’s Future object, then append it to the file.
...
# retrieve result to write
result = future.result()
# write to file
handle.write(result)
Tying this together, the complete example of writing to a file from multiple threads via the main thread is listed below.
# SuperFastPython.com
# write to a file from multiple threads from main
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# task that blocks for a moment and returns a message to write
def task(identifier):
    # block for a moment
    value = random()
    sleep(value)
    # return data to write
    return f'Task {identifier} blocked for {value} of a second.\n'

# write to a file from multiple threads
def main():
    # open a file
    with open('results.txt', 'a', encoding='utf-8') as handle:
        # create the thread pool
        with ThreadPoolExecutor(1000) as executor:
            # dispatch all tasks
            futures = [executor.submit(task, i) for i in range(10000)]
            # process results as tasks are completed
            for future in as_completed(futures):
                # retrieve result to write
                result = future.result()
                # write to file
                handle.write(result)
            # wait for all tasks to complete
    print('Done.')

# entry point
if __name__ == '__main__':
    main()
Running the example creates the results.txt file as before and writes all task messages directly from the main thread of our program.
Because only a single thread is writing the messages, it is thread-safe and results in one clean message per line, as we would expect.
Task 530 blocked for 0.1459173312697858 of a second.
Task 1020 blocked for 0.002690097829839644 of a second.
Task 1041 blocked for 0.02761931182005417 of a second.
Task 982 blocked for 0.037005242195674914 of a second.
Task 609 blocked for 0.1287759330853232 of a second.
Task 270 blocked for 0.02273684939048015 of a second.
Task 696 blocked for 0.017208115848418748 of a second.
Task 903 blocked for 0.07774458649738047 of a second.
Task 1135 blocked for 5.8044184067629345e-05 of a second.
Task 590 blocked for 0.05840949802243367 of a second.
...
We can then check the number of lines on the command line:
cat results.txt | wc -l
The result confirms that indeed all ten thousand tasks were able to write one message per line without corruption.
10000
Append to a File From Multiple Threads Via a Queue
Another alternative for writing to a file from multiple threads involves using a queue.
A queue is a data structure, like a list, although it allows you to add items at one end and get items from the other in a first-in, first-out (FIFO) manner.
Python provides a number of queue classes; the queue.Queue class is a version of a queue that is also thread-safe. This means that the data structure will not become corrupt when multiple threads put or get items from the queue concurrently.
The Queue class also provides a facility that allows threads to wait until all tasks that have been put on the queue have been completed, a feature we will require.
Each worker thread can put messages on the queue by calling the put() function. We can then define a separate thread that is responsible for consuming messages from the queue forever via calls to get(). A call to get() will return immediately with a message from the queue if one is available, otherwise it will block and return once a message becomes available.
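As a minimal, self-contained sketch of these queue operations (put(), get(), task_done() and join()) with a hypothetical producer and consumer, the pattern looks like this:
from threading import Thread
from queue import Queue

# shared thread-safe queue
queue = Queue()

# consumer: get each message, process it, then mark it as done
def consumer(queue):
    while True:
        message = queue.get()
        print(message)
        queue.task_done()

# start the consumer as a daemon thread
Thread(target=consumer, args=(queue,), daemon=True).start()
# producer: put messages on the queue
for i in range(5):
    queue.put(f'message {i}')
# block until every message put on the queue has been marked as done
queue.join()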
We can adapt our program to use a queue by first creating an instance of a Queue in the main thread.
...
# create a queue for collecting messages to write
queue = Queue()
Next, we can define a function that will run forever and will consume messages from the Queue.
We can loop forever using a while-loop.
...
# loop forever
while True:
    # ...
Each iteration of the loop, we can get a message from the queue (or block until a message becomes available), then append the message to the file.
...
# retrieve a message from the queue
message = queue.get()
# write the message to file
file_handle.write(message)
Importantly, we can then mark the message as done. This can be achieved by calling the Queue.task_done() function.
...
# mark this task as done
queue.task_done()
This is useful for any threads that are waiting on the queue until all messages that have been sent to the queue have both been retrieved by get() and processed. We will come back to this later.
The file_writer() function below implements this, taking the queue and the file handle as arguments.
# consume messages from the queue and write to the file
def file_writer(queue, file_handle):
    # loop forever
    while True:
        # retrieve a message from the queue
        message = queue.get()
        # write the message to file
        file_handle.write(message)
        # mark this task as done
        queue.task_done()
Next, back in the main thread, we can create a new thread that is responsible for running our file_writer() function.
This can be achieved by creating an instance of the threading.Thread class and specifying the “target” argument as the file_writer function and the “args” argument as a tuple that contains the queue and the file handle.
We will also set the "daemon" argument to True so that the thread will be terminated automatically once the main thread exits. If we did not make the thread a daemon, the program would never exit, because this new thread runs forever in its while loop.
...
# start a daemon thread to write queued messages
writer = Thread(target=file_writer, args=(queue, handle), daemon=True)
writer.start()
Finally, after all threads have finished running successfully and submitted their messages to the queue, the main thread will wait on the queue for all messages to be consumed and dealt with, e.g. appended to the file.
We can achieve this by calling the join() function on the queue which will block until all items on the queue are marked as done.
Recall that our daemon thread that appends to the file will mark each message as done by a call to the Queue.task_done() function. Think of each put() on the queue as incrementing a counter and each call to task_done() in the daemon thread as decrementing a counter. The call to the join() function in the main thread will block until the counter is zero, meaning all calls to put() on the queue have an equivalent get() and task_done() call.
...
# after all threads are done, wait for all messages to be consumed
queue.join()
Only after all messages have been appended to the file by the daemon thread will this function return.
Importantly, we will join the queue only after all tasks in the thread pool have been completed.
If we joined the queue while some tasks were still running, the call might return prematurely, as some workers may not yet have put their message on the queue while all messages consumed so far have been marked done.
Tying this together, the complete example of writing to a file from multiple threads via a queue is listed below.
# SuperFastPython.com
# write to a file from multiple threads using a queue
from time import sleep
from random import random
from queue import Queue
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

# task that blocks for a moment and writes to a file
def task(identifier, queue):
    # block for a moment
    value = random()
    sleep(value)
    # push data into the queue
    queue.put(f'Task {identifier} blocked for {value} of a second.\n')

# consume messages from the queue and write to the file
def file_writer(queue, file_handle):
    # loop forever
    while True:
        # retrieve a message from the queue
        message = queue.get()
        # write the message to file
        file_handle.write(message)
        # mark this task as done
        queue.task_done()

# write to a file from multiple threads
def main():
    # open a file
    with open('results.txt', 'a', encoding='utf-8') as handle:
        # create a queue for collecting messages to write
        queue = Queue()
        # start a daemon thread to write queued messages
        writer = Thread(target=file_writer, args=(queue, handle), daemon=True)
        writer.start()
        # create the thread pool
        with ThreadPoolExecutor(1000) as executor:
            # dispatch all tasks
            _ = [executor.submit(task, i, queue) for i in range(10000)]
        # after all threads are done, wait for all messages to be consumed
        queue.join()
    print('Done.')

# entry point
if __name__ == '__main__':
    main()
Running the example opens the file, starts the writer thread, then starts the thread pool and submits all tasks. Only after all messages have been written to file is the file closed and the main thread exits.
Checking the content of the results.txt file confirms one message per line as we would expect.
Task 75 blocked for 0.005800440245577132 of a second.
Task 26 blocked for 0.015495757355513562 of a second.
Task 213 blocked for 0.006410974121583091 of a second.
Task 121 blocked for 0.01226657386657537 of a second.
Task 315 blocked for 0.0010640189024394564 of a second.
Task 86 blocked for 0.014855929776373511 of a second.
Task 110 blocked for 0.02312239182836373 of a second.
Task 264 blocked for 0.014721519221568835 of a second.
Task 480 blocked for 0.0018767276024731494 of a second.
Task 403 blocked for 0.012265893451503307 of a second.
...
Finally, we can check the number of lines in the file:
cat results.txt | wc -l
Which shows that all 10,000 lines were written successfully.
10000
Append to a File From Multiple Coroutines With Asyncio
We can also append to a file concurrently using asyncio.
Generally, Python does not support non-blocking I/O operations when working with files, and such support is not provided in the asyncio module. This is because non-blocking file I/O is challenging to implement in a general, cross-platform manner.
The third-party library aiofiles provides file operations for use in asyncio programs, but again, the operations are not true non-blocking I/O; instead, the concurrency is simulated using a thread pool.
Nevertheless, we can use aiofiles in an asyncio program to append to a file concurrently.
You can install the aiofiles library using your Python package manager, such as pip:
pip3 install aiofiles
We will adapt the lock-based file appending example above to use coroutines and aiofiles.
First, we must adapt the task() function that appends to the file.
The function must be made a coroutine that can yield control, therefore it must be defined using the async keyword.
# task that blocks for a moment and writes to a file
async def task(identifier, lock, file_handle):
    # ...
Next, the function blocks for a moment using a call to sleep(). This can be updated to use the asyncio.sleep() function that we can await, yielding control to other coroutines.
...
await asyncio.sleep(value)
We can then append data to the file.
First, the lock must be acquired.
...
# obtain the lock
async with lock:
    # ...
Finally, we can append data to the file awaiting the call and yielding control.
...
# write to a file
await file_handle.write(f'Task {identifier} blocked for {value} of a second.\n')
Tying this together, the updated version of the task() function to append to a file using asyncio is listed below.
# task that blocks for a moment and writes to a file
async def task(identifier, lock, file_handle):
    # block for a moment
    value = random()
    await asyncio.sleep(value)
    # obtain the lock
    async with lock:
        # write to a file
        await file_handle.write(f'Task {identifier} blocked for {value} of a second.\n')
Next, we need to update the main() function.
We can open the file using the aiofiles.open() function and create a lock to protect appending to the file between coroutines using the asyncio.Lock class.
...
async with aiofiles.open('results.txt', 'a', encoding='utf-8') as handle:
    # create a lock to protect the file
    lock = asyncio.Lock()
Next, we can create a list of coroutines, each a call to the task() function that will append to the file.
This can be achieved using a list comprehension.
...
# create coroutines
tasks = [task(i, lock, handle) for i in range(10000)]
Recall that this creates a list of coroutines; it does not execute the task() function.
Finally, we can execute all of the coroutines and wait for them to finish.
This can be achieved by awaiting the asyncio.gather() function and passing it the list of coroutines that can be unpacked using the star (*) operator. The task() function does not return any values, so we can ignore the return value from gather().
...
# execute tasks and wait for them to finish
_ = await asyncio.gather(*tasks)
Tying this together, the updated main() function is listed below.
# write to a file from multiple coroutines
async def main():
    # open a file
    async with aiofiles.open('results.txt', 'a', encoding='utf-8') as handle:
        # create a lock to protect the file
        lock = asyncio.Lock()
        # create coroutines
        tasks = [task(i, lock, handle) for i in range(10000)]
        # execute tasks and wait for them to finish
        _ = await asyncio.gather(*tasks)
    print('Done.')
Finally, we can start the asyncio event loop that will begin executing coroutines by calling the main() function.
# entry point
if __name__ == '__main__':
    asyncio.run(main())
Tying this all together, the complete example of appending to a file concurrently from coroutines using asyncio is listed below.
# SuperFastPython.com
# write to a file concurrently with asyncio and a shared lock
from random import random
import asyncio
import aiofiles

# task that blocks for a moment and writes to a file
async def task(identifier, lock, file_handle):
    # block for a moment
    value = random()
    await asyncio.sleep(value)
    # obtain the lock
    async with lock:
        # write to a file
        await file_handle.write(f'Task {identifier} blocked for {value} of a second.\n')

# write to a file from multiple coroutines
async def main():
    # open a file
    async with aiofiles.open('results.txt', 'a', encoding='utf-8') as handle:
        # create a lock to protect the file
        lock = asyncio.Lock()
        # create coroutines
        tasks = [task(i, lock, handle) for i in range(10000)]
        # execute tasks and wait for them to finish
        _ = await asyncio.gather(*tasks)
    print('Done.')

# entry point
if __name__ == '__main__':
    asyncio.run(main())
Running the example creates the results.txt file as before and writes all task messages from the coroutines via aiofiles.
Because appending to the file is protected with the lock, only one coroutine can write at a time, and the example results in one clean message per line.
Task 35 blocked for 0.0061771157918364406 of a second.
Task 385 blocked for 0.00516245918413738 of a second.
Task 561 blocked for 0.0016374068833635569 of a second.
Task 349 blocked for 0.008151823643203437 of a second.
Task 515 blocked for 0.0066746106689640206 of a second.
Task 126 blocked for 0.010198001664967093 of a second.
Task 577 blocked for 0.004251209136449385 of a second.
Task 256 blocked for 0.009546150249558849 of a second.
Task 667 blocked for 0.00456190424278291 of a second.
Task 6 blocked for 0.012846047059235088 of a second.
...
We can then check the number of lines on the command line:
cat results.txt | wc -l
The result confirms that indeed all ten thousand tasks were able to write one message per line without corruption.
10000
Extensions
This section lists ideas for extending the tutorial.
- Open File From Each Thread. Explore the failure case of opening the target file from each worker thread and attempting to append.
- Use Logging. The Python logging module is thread-safe and supports writing to a single log file from multiple threads. Explore how you can use the logging framework to append messages to the same file from multiple threads (see the sketch after this list).
- Use Processes. Explore how you might write to the same file from multiple workers in a ProcessPoolExecutor.
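For the logging idea, a minimal sketch is shown below; it assumes a hypothetical log file named app.log and relies on the fact that logging handlers serialize writes internally, so no explicit lock is needed:
# append to a single log file from many threads via the logging module
import logging
from concurrent.futures import ThreadPoolExecutor

# configure the root logger to append messages to a file (hypothetical file name)
logging.basicConfig(filename='app.log', filemode='a', level=logging.INFO, format='%(message)s')

# task that logs a unique message
def task(identifier):
    logging.info(f'Task {identifier} done.')

# dispatch many tasks that all log to the same file
with ThreadPoolExecutor(100) as executor:
    _ = [executor.submit(task, i) for i in range(1000)]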
Further Reading
This section provides additional resources that you may find helpful.
Books
- Concurrent File I/O in Python, Jason Brownlee (my book!)
Python File I/O APIs
- Built-in Functions
- os - Miscellaneous operating system interfaces
- os.path - Common pathname manipulations
- shutil - High-level file operations
- zipfile — Work with ZIP archives
- Python Tutorial: Chapter 7. Input and Output
Python Concurrency APIs
- threading — Thread-based parallelism
- multiprocessing — Process-based parallelism
- concurrent.futures — Launching parallel tasks
- asyncio — Asynchronous I/O
Takeaways
In this tutorial, you discovered how to append to a file from multiple threads.
Do you have any questions?
Leave your question in a comment below and I will reply fast with my best advice.