Last Updated on August 21, 2023
Saving a single file to disk can be slow.
It can become painfully slow in situations where you may need to save thousands of files. The hope is that multithreading can be used to speed up file saving.
In this tutorial, you will explore how to save thousands of files using multithreading.
Let’s dive in.
Save Files One-By-One (slowly)
Saving a file in Python is relatively straightforward.
First, we can open the file and get a file handle using the built-in open() function. This is best used via the context manager so that the file is closed automatically once we are finished with it.
When calling open() we specify the path of the file to open and the mode in which to open it, such as 'w' for writing text data.
...
# open the file
with open(filepath, 'w') as handle:
    # ...
Once open, we can then write data to the file. Once we exit the context manager block, the file will be closed automatically for us.
...
# save the data
handle.write(data)
Tying this together, the save_file() function below takes a file path and ASCII data and writes it to a file.
# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)
Next we need to create some data to write.
To keep things simple we will generate CSV data files composed of lines of numbers separated by commas.
We will seed each file with a unique integer and use that in the calculation of each numeric value, combined with the line number and column number. This is to ensure that the content of each file is unique in order to simulate a moderately real-world scenario.
The generate_file() function below will generate a CSV file in memory and return it as a string that we might later save to a file.
The function takes an identifier which is a numerical seed for the values saved in the file. It also uses default arguments that specify the number of values to create per line (defaults to 10) and the number of lines to create in the file (defaults to 5,000).
This will result in a few hundred kilobytes of CSV data per file.
# generate a csv file with v=10 values per line and l=5k lines
def generate_file(identifier, n_values=10, n_lines=5000):
    # generate many lines of data
    data = list()
    for i in range(n_lines):
        # generate a list of numeric values as strings
        line = [str(identifier+i+j) for j in range(n_values)]
        # convert list to string with values separated by commas
        data.append(','.join(line))
    # convert list of lines to a string separated by new lines
    return '\n'.join(data)
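As a quick sanity check on the size, we might print the length of one generated payload (a throwaway snippet, not part of the final program):

# report the size of the data generated for a single file
data = generate_file(0)
print(f'{len(data)} characters, about {len(data) / 1024:.0f} KB')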
Next we can create a function that defines the task of generating the data for one file and saving it to disk.
It will first call the generate_file() function to generate data.
...
# generate data
data = generate_file(identifier)
Next, we will use the os.path.join() function to create a relative file path for the file and construct a unique filename that includes the unique identifier used to seed the content of the file.
...
# create a unique filename
filepath = join(path, f'data-{identifier:04d}.csv')
Finally, the file contents are saved to the file path and progress of the task is reported.
...
# save data file to disk
save_file(filepath, data)
# report progress
print(f'.saved {filepath}')
The generate_and_save() function below ties this together, taking the directory path where we wish to save files and the unique identifier for the file as arguments.
# generate and save a file
def generate_and_save(path, identifier):
    # generate data
    data = generate_file(identifier)
    # create a unique filename
    filepath = join(path, f'data-{identifier:04d}.csv')
    # save data file to disk
    save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}')
The final step involves creating the sub-directory where we wish to save all of the files.
First, we can call the os.makedirs() function to create the directory, e.g. tmp/ relative to the current working directory where the Python script is located.
...
# create a local directory to save files
makedirs(path, exist_ok=True)
Next, we can create a fixed number of files, one-by-one by calling the generate_and_save() function.
...
# create all files
for i in range(n_files):
    # generate and save a data file
    generate_and_save(path, i)
The main() function implements this and uses default arguments to specify the sub-directory where to save files (e.g. tmp/) and the number of files to create (e.g. 5,000).
# generate many data files in a directory
def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create all files
    for i in range(n_files):
        # generate and save a data file
        generate_and_save(path, i)
And that’s it.
Tying this together, the complete example of creating 5,000 files sequentially is listed below.
# SuperFastPython.com
# create a large number of data files sequentially
from os import makedirs
from os.path import join

# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)

# generate a csv file with v=10 values per line and l=5k lines
def generate_file(identifier, n_values=10, n_lines=5000):
    # generate many lines of data
    data = list()
    for i in range(n_lines):
        # generate a list of numeric values as strings
        line = [str(identifier+i+j) for j in range(n_values)]
        # convert list to string with values separated by commas
        data.append(','.join(line))
    # convert list of lines to a string separated by new lines
    return '\n'.join(data)

# generate and save a file
def generate_and_save(path, identifier):
    # generate data
    data = generate_file(identifier)
    # create a unique filename
    filepath = join(path, f'data-{identifier:04d}.csv')
    # save data file to disk
    save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}')

# generate many data files in a directory
def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create all files
    for i in range(n_files):
        # generate and save a data file
        generate_and_save(path, i)

# entry point
if __name__ == '__main__':
    main()
Running the example first creates the unique content for each file, then saves the content with a unique filename from tmp/data-0000.csv to tmp/data-4999.csv.
Each file is a few hundred kilobytes, and given that we have 5,000 files, that is more than a gigabyte of data in total.
...
.saved tmp/data-4990.csv
.saved tmp/data-4991.csv
.saved tmp/data-4992.csv
.saved tmp/data-4993.csv
.saved tmp/data-4994.csv
.saved tmp/data-4995.csv
.saved tmp/data-4996.csv
.saved tmp/data-4997.csv
.saved tmp/data-4998.csv
.saved tmp/data-4999.csv
The program will take a moment to run and the running time will depend on the speed of your hardware, especially your CPU and hard drive speeds.
On my system, it completes in about 63.4 seconds.
[Finished in 63.6s]
[Finished in 63.3s]
[Finished in 63.3s]
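If you want to measure the run time from within the program itself, rather than relying on an external timer, a minimal sketch using the standard library (and the main() function defined above) is:

# time the program using a monotonic high-resolution clock
from time import perf_counter

# entry point
if __name__ == '__main__':
    start = perf_counter()
    main()
    print(f'took {perf_counter() - start:.1f} seconds')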
How long did it take to run on your system?
Let me know in the comments below.
Next, let’s consider how we might speed this program up using concurrency.
How to Save Files Concurrently
We might think of the task of saving files to disk as being composed of two elements:
- Preparing the data in memory (e.g. generating it in this case).
- Writing data from main memory to disk.
Preparing the data that will be saved is a CPU-bound task. This means that it will be performed as fast as the CPU can run.
We can generate the data for files concurrently, but the contents of each file (a few hundred kilobytes) will accumulate in main memory. For example, generating all 5,000 files before saving them would hold more than a gigabyte in main memory at once, and likely a factor of two or more than that.
Saving data to file is IO-bound. This means that the task is limited by the speed of the hardware in transferring data from main memory to the disk. It will run as fast as the disk can lay down bytes, which may be relatively fast for solid-state drives (SSDs) and slower for those with moving parts (spindle disks).
Writing data to disk is likely the slowest part of the program.
Threads would be appropriate for writing files concurrently. This is because it is computationally cheap to create hundreds or thousands of threads and each thread will release the global interpreter lock when performing IO operations like writing to a file, allowing other threads to progress.
Nevertheless, I suspect that most modern hard drives do not support parallel writes. That is, they can probably only write one file (or one byte or word of data) to disk at a time.
This means that even if we can make writing files concurrent using threads, it will very likely not offer any speed-up.
That being said, it does take a little bit of time to prepare the data for each file in memory before writing. It may be possible to focus on this aspect and make it parallel using processes.
Processes would be appropriate for making the data generation concurrent, as each process will be independent (e.g. have an independent global interpreter lock), allowing each to potentially run on a separate CPU core, as chosen by the underlying operating system.
Finally, it may be possible to hybridise the thread and process approaches.
Communication between processes is computationally expensive, so generating data in one process and sending it to another for saving would likely not be viable.
Nevertheless, it may be possible to have data generation occur within each process and have that process save files somewhat concurrently using its own additional worker threads. It really depends on how much effort is required to generate files compared to saving them for this to be effective.
Next, let’s start to explore the use of threads to speed-up file creation and saving.
Save Files Concurrently With Threads
The ThreadPoolExecutor provides a pool of worker threads that we can use to multithread the saving of thousands of data files.
Each file in the tmp/ directory will represent a task that will require the file to be created and saved to disk by a call to our generate_and_save() function.
Adapting our program to use the ThreadPoolExecutor is relatively straightforward.
First, we can create the thread pool and specify the number of worker threads. We will use 100 in this case, chosen arbitrarily.
...
# create the thread pool
with ThreadPoolExecutor(100) as exe:
    # ...
Next, we can submit each file creation and saving task to the thread pool for execution.
We can send in all 5,000 tasks and allow the workers to complete them as fast as they can, probably about 50 files per worker.
It might be interesting to test different numbers of worker threads, such as 10, 50, and 1,000 to see if it makes a difference to the run time of the program on your system.
We can call the submit() method once for each call to the generate_and_save() function. Each call returns a Future object that we can ignore, as we are not concerned with a return value from the function.
This can be achieved in a list comprehension, for example:
...
# submit tasks to generate files
_ = [exe.submit(generate_and_save, path, i) for i in range(n_files)]
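To make it easy to compare different pool sizes, the number of workers could also be exposed as a parameter of main(). A minimal sketch, using a hypothetical n_workers argument:

# generate many data files in a directory with a configurable pool size
def main(path='tmp', n_files=5000, n_workers=100):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create the thread pool with the configured number of workers
    with ThreadPoolExecutor(n_workers) as exe:
        # submit tasks to generate files
        _ = [exe.submit(generate_and_save, path, i) for i in range(n_files)]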
It might be interesting to adapt the example so that the generate_and_save() function returns the path of the file that was created, so that progress can be reported from the main thread rather than from the worker threads.
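For example, a minimal sketch of that variation, assuming generate_and_save() is changed to end with "return filepath" instead of printing progress itself:

# report progress from the main thread as tasks complete
from concurrent.futures import ThreadPoolExecutor, as_completed

def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create the thread pool
    with ThreadPoolExecutor(100) as exe:
        # submit tasks and keep the Future objects
        futures = [exe.submit(generate_and_save, path, i) for i in range(n_files)]
        # report each saved file as its task finishes
        for future in as_completed(futures):
            print(f'.saved {future.result()}')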
Tying this together, the complete example of creating thousands of files concurrently using a thread pool is listed below.
# SuperFastPython.com
# create a large number of data files concurrently with threads
from os import makedirs
from os.path import join
from concurrent.futures import ThreadPoolExecutor

# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)

# generate a csv file with v=10 values per line and l=5k lines
def generate_file(identifier, n_values=10, n_lines=5000):
    # generate many lines of data
    data = list()
    for i in range(n_lines):
        # generate a list of numeric values as strings
        line = [str(identifier+i+j) for j in range(n_values)]
        # convert list to string with values separated by commas
        data.append(','.join(line))
    # convert list of lines to a string separated by new lines
    return '\n'.join(data)

# generate and save a file
def generate_and_save(path, identifier):
    # generate data
    data = generate_file(identifier)
    # create a unique filename
    filepath = join(path, f'data-{identifier:04d}.csv')
    # save data file to disk
    save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}')

# generate many data files in a directory
def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create the thread pool
    with ThreadPoolExecutor(100) as exe:
        # submit tasks to generate files
        _ = [exe.submit(generate_and_save, path, i) for i in range(n_files)]

# entry point
if __name__ == '__main__':
    main()
Running the example generates and saves all 5,000 files as we did in the sequential case previously.
...
.saved tmp/data-4945.csv
.saved tmp/data-4953.csv
.saved tmp/data-4925.csv
.saved tmp/data-4986.csv
.saved tmp/data-4869.csv
.saved tmp/data-4987.csv
.saved tmp/data-4994.csv
.saved tmp/data-4977.csv
.saved tmp/data-4960.csv
.saved tmp/data-4963.csv
In this case, no significant difference was noted with this multithreaded version compared to the sequential version above.
On my system, the program took about 64.6 seconds, compared to the 63.4 seconds for the sequential case. The difference is probably within the margin of measurement error.
[Finished in 64.5s]
[Finished in 65.8s]
[Finished in 63.5s]
Perhaps we cannot achieve parallel writes to disk after all. It may also be that generating the data, which is CPU-bound and cannot run in parallel across threads because of the global interpreter lock, is the major factor in this worked example.
Next, let’s explore the use of processes instead.
Save Files Concurrently with Processes
We can try to speed-up file saving by making it concurrent with processes instead of threads.
This can be achieved using the ProcessPoolExecutor, which has an identical interface to the ThreadPoolExecutor, allowing us to swap one class for the other directly.
We will use 8 processes in this case as I have 8 logical CPU cores.
You may want to adjust this based on the number of logical CPU cores available on your system. You may also want to explore other configurations, such as matching the number of processes to the number of physical (rather than logical) CPU cores, or using many more processes than cores.
...
# create the process pool
with ProcessPoolExecutor(8) as exe:
    # ...
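If you would rather size the pool from the machine it runs on, a small sketch using os.cpu_count() (which reports logical cores and may return None on some platforms) is:

# size the process pool from the number of logical CPU cores, with a fallback
from os import cpu_count

n_workers = cpu_count() or 4
# create the process pool
with ProcessPoolExecutor(n_workers) as exe:
    # ...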
It’s also a good idea to flush output when reporting progress on stdout when using processes. For example, the print() statement in the generate_and_save() function can be updated as follows:
...
# report progress
print(f'.saved {filepath}', flush=True)
Tying this together, the complete example of saving files concurrently using the process pool is listed below.
# SuperFastPython.com
# create a large number of data files concurrently with processes
from os import makedirs
from os.path import join
from concurrent.futures import ProcessPoolExecutor

# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)

# generate a csv file with v=10 values per line and l=5k lines
def generate_file(identifier, n_values=10, n_lines=5000):
    # generate many lines of data
    data = list()
    for i in range(n_lines):
        # generate a list of numeric values as strings
        line = [str(identifier+i+j) for j in range(n_values)]
        # convert list to string with values separated by commas
        data.append(','.join(line))
    # convert list of lines to a string separated by new lines
    return '\n'.join(data)

# generate and save a file
def generate_and_save(path, identifier):
    # generate data
    data = generate_file(identifier)
    # create a unique filename
    filepath = join(path, f'data-{identifier:04d}.csv')
    # save data file to disk
    save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}', flush=True)

# generate many data files in a directory
def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create the process pool
    with ProcessPoolExecutor(8) as exe:
        # submit tasks to generate files
        _ = [exe.submit(generate_and_save, path, i) for i in range(n_files)]

# entry point
if __name__ == '__main__':
    main()
Running the example generates and saves 5,000 files as before.
...
.saved tmp/data-4990.csv
.saved tmp/data-4991.csv
.saved tmp/data-4993.csv
.saved tmp/data-4994.csv
.saved tmp/data-4992.csv
.saved tmp/data-4995.csv
.saved tmp/data-4996.csv
.saved tmp/data-4997.csv
.saved tmp/data-4998.csv
.saved tmp/data-4999.csv
Interestingly, we do see a dramatic speed-up in this case.
On my system, the run time with processes was about 21.43 seconds, compared to 63.4 seconds for the sequential case. This is about a 2.96x speedup.
[Finished in 21.8s]
[Finished in 21.3s]
[Finished in 21.2s]
This highlights that generating the data is indeed a large part of the program, and that it can be dramatically sped up by performing this aspect in parallel.
Save Files Concurrently with Processes in Batch
Perhaps we can get some small benefit by allowing each process to save a batch of files rather than a single file per task.
This can be achieved by pre-defining all of the files that need to be created, e.g. a list of file identifiers/seeds, then splitting these identifiers up by the number of processes and sending each process a “task” consisting of a block of files to generate and save.
This may offer some small benefit as rather than creating and transmitting 5,000 discrete tasks from the main thread to 8 worker processes in the process pool, we can distribute just 8 tasks of (5,000/8) or 625 files.
The example from the previous section can be adapted to generate and save files in batches.
First, we can update the generate_and_save() function to take a list of identifiers as an argument, then iterate over the list and generate and save each file in turn.
# generate and save a batch of files
def generate_and_save(path, identifiers):
    # generate each file
    for identifier in identifiers:
        # generate data
        data = generate_file(identifier)
        # create a unique filename
        filepath = join(path, f'data-{identifier:04d}.csv')
        # save data file to disk
        save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}', flush=True)
Next, we can update the main() function to first pre-generate the list of file identifiers.
...
# generate the file identifiers
identifiers = [i for i in range(n_files)]
We can parameterize the number of worker processes and use this along with the number of files to generate to calculate the size of each chunk. That is, the number of files to generate in each worker process task.
...
# determine chunksize
n_workers = 8
chunksize = round(len(identifiers) / n_workers)
We can then create the process pool using the parameterized number of workers.
...
# create the process pool
with ProcessPoolExecutor(n_workers) as exe:
    # ...
Finally, we can iterate over all identifiers by the chunksize in a for loop and use the index of the loop to cut the list of file identifiers into chunks and send each into the process pool.
...
# split the file identifiers into chunks
for i in range(0, len(identifiers), chunksize):
    # select a chunk of identifiers
    ids = identifiers[i:(i + chunksize)]
    # submit the batch of files to generate and save
    _ = exe.submit(generate_and_save, path, ids)
Tying this together, the complete example of generating and saving files concurrently in batch using a process pool is listed below.
# SuperFastPython.com
# create a large number of data files concurrently with processes in batch
from os import makedirs
from os.path import join
from concurrent.futures import ProcessPoolExecutor

# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)

# generate a csv file with v=10 values per line and l=5k lines
def generate_file(identifier, n_values=10, n_lines=5000):
    # generate many lines of data
    data = list()
    for i in range(n_lines):
        # generate a list of numeric values as strings
        line = [str(identifier+i+j) for j in range(n_values)]
        # convert list to string with values separated by commas
        data.append(','.join(line))
    # convert list of lines to a string separated by new lines
    return '\n'.join(data)

# generate and save a batch of files
def generate_and_save(path, identifiers):
    # generate each file
    for identifier in identifiers:
        # generate data
        data = generate_file(identifier)
        # create a unique filename
        filepath = join(path, f'data-{identifier:04d}.csv')
        # save data file to disk
        save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}', flush=True)

# generate many data files in a directory
def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # generate the file identifiers
    identifiers = [i for i in range(n_files)]
    # determine chunksize
    n_workers = 8
    chunksize = round(len(identifiers) / n_workers)
    # create the process pool
    with ProcessPoolExecutor(n_workers) as exe:
        # split the file identifiers into chunks
        for i in range(0, len(identifiers), chunksize):
            # select a chunk of identifiers
            ids = identifiers[i:(i + chunksize)]
            # submit the batch of files to generate and save
            _ = exe.submit(generate_and_save, path, ids)

# entry point
if __name__ == '__main__':
    main()
Running the example generates and saves all 5,000 files into the tmp/ subdirectory as before.
...
.saved tmp/data-4999.csv
.saved tmp/data-0619.csv
.saved tmp/data-1249.csv
.saved tmp/data-4373.csv
.saved tmp/data-0620.csv
.saved tmp/data-4374.csv
.saved tmp/data-0621.csv
.saved tmp/data-0622.csv
.saved tmp/data-0623.csv
.saved tmp/data-0624.csv
In this case, we don’t see any clear benefit over using the process pool with one task per file.
On my system it took about 20.86 seconds to complete, compared to 21.43 seconds in the previous section. This might be a real but relatively minor speed increase, although it is more likely within the margin of measurement error.
[Finished in 20.4s]
[Finished in 21.0s]
[Finished in 21.2s]
Save Files Concurrently with Processes and Threads in Batch
Another idea might be to combine a process pool with thread pools.
That is, each worker process can be responsible for a batch of files to generate and save, and use a thread pool within the process to concurrently save each file after it is generated.
This would only provide a benefit if the file saving process is slow relative to the data generation process, allowing multiple save operations to be submitted to a thread pool and executed concurrently.
We can adapt the batch file saving using processes in the previous section to achieve this.
The only change required is to the generate_and_save() function. We must split out the file saving operation as a function (e.g. save and report progress) and then submit this task to a thread pool for each generated file.
First, we can define a new function that takes a file path and the data, calls our save_file() function and then reports progress. If we didn’t want to report progress, we could just call the save_file() function directly and avoid creating a new function.
Nevertheless, the new save_task() function below implements this.
# save data to a file and report progress
def save_task(filepath, data):
    # save data file to disk
    save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}', flush=True)
Next, we can update the generate_and_save() function to first create a thread pool with some number of worker threads. Only a few workers are required, although we will use 10 in this case, chosen arbitrarily.
It may be interesting to experiment with different numbers of worker threads, along with different numbers of worker processes, to see if it makes a difference to the overall run time.
...
# create a thread pool for saving files
with ThreadPoolExecutor(10) as exe:
    # ...
We can then iterate over the list of identifiers provided to the process in batch and generate the data for each file as per normal. Finally, we can submit the call to the save_task() function to the thread pool to concurrently save the file to disk while we begin to generate the data for the next file.
...
# submit the save task to the thread pool
_ = exe.submit(save_task, filepath, data)
The updated version of the generate_and_save() function is listed below.
# generate and save a batch of files
def generate_and_save(path, identifiers):
    # create a thread pool for saving files
    with ThreadPoolExecutor(10) as exe:
        # generate each file
        for identifier in identifiers:
            # generate data
            data = generate_file(identifier)
            # create a unique filename
            filepath = join(path, f'data-{identifier:04d}.csv')
            # submit the save task to the thread pool
            _ = exe.submit(save_task, filepath, data)
Tying this together, the complete example of generating and saving files using a process pool, with a thread pool in each worker process is listed below.
# SuperFastPython.com
# create a large number of data files concurrently with process and thread pools
from os import makedirs
from os.path import join
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)

# generate a csv file with v=10 values per line and l=5k lines
def generate_file(identifier, n_values=10, n_lines=5000):
    # generate many lines of data
    data = list()
    for i in range(n_lines):
        # generate a list of numeric values as strings
        line = [str(identifier+i+j) for j in range(n_values)]
        # convert list to string with values separated by commas
        data.append(','.join(line))
    # convert list of lines to a string separated by new lines
    return '\n'.join(data)

# save data to a file and report progress
def save_task(filepath, data):
    # save data file to disk
    save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}', flush=True)

# generate and save a batch of files
def generate_and_save(path, identifiers):
    # create a thread pool for saving files
    with ThreadPoolExecutor(10) as exe:
        # generate each file
        for identifier in identifiers:
            # generate data
            data = generate_file(identifier)
            # create a unique filename
            filepath = join(path, f'data-{identifier:04d}.csv')
            # submit the save task to the thread pool
            _ = exe.submit(save_task, filepath, data)

# generate many data files in a directory
def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # generate the file identifiers
    identifiers = [i for i in range(n_files)]
    # determine chunksize
    n_workers = 8
    chunksize = round(len(identifiers) / n_workers)
    # create the process pool
    with ProcessPoolExecutor(n_workers) as exe:
        # split the file identifiers into chunks
        for i in range(0, len(identifiers), chunksize):
            # select a chunk of identifiers
            ids = identifiers[i:(i + chunksize)]
            # submit the batch of files to generate and save
            _ = exe.submit(generate_and_save, path, ids)

# entry point
if __name__ == '__main__':
    main()
Running the example generates the data and saves all files to the tmp/ subdirectory as with all of the previous cases.
...
.saved tmp/data-3120.csv
.saved tmp/data-4995.csv
.saved tmp/data-4996.csv
.saved tmp/data-4997.csv
.saved tmp/data-3121.csv
.saved tmp/data-4998.csv
.saved tmp/data-4999.csv
.saved tmp/data-3122.csv
.saved tmp/data-3123.csv
.saved tmp/data-3124.csv
In this case, we don’t see any significant improvement over the batch process pool example in the previous section.
On my system it takes about 20.56 seconds, compared to 20.86 seconds with the batch process mode, which is likely within the margin of measurement error.
[Finished in 19.9s]
[Finished in 20.7s]
[Finished in 21.1s]
It may be interesting to explore a pipeline design where worker processes generate the data for files and worker threads consume the generated data and save files. It may require careful control over the number of concurrent files that may be generated at a time to ensure that it does not consume too much memory.
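As a rough sketch of what such a pipeline might look like (not benchmarked here), worker processes could push generated payloads through a bounded multiprocessing queue to saver threads in the main process; the queue's maxsize provides the back-pressure that limits memory use. The produce(), consume(), and pipeline() names below are hypothetical, and the sketch assumes the generate_file() and save_file() functions defined earlier:

# a sketch of a generate/save pipeline using processes, threads, and a bounded queue
from os import makedirs
from os.path import join
from multiprocessing import Process, Queue
from threading import Thread

# generate data in a worker process and push it through the bounded queue
def produce(queue, path, identifiers):
    for identifier in identifiers:
        data = generate_file(identifier)
        filepath = join(path, f'data-{identifier:04d}.csv')
        # blocks when the queue is full, limiting how much data waits in memory
        queue.put((filepath, data))

# save files in a worker thread until the sentinel value is received
def consume(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        filepath, data = item
        save_file(filepath, data)
        print(f'.saved {filepath}', flush=True)

# run the pipeline: producer processes feed saver threads via a bounded queue
def pipeline(path='tmp', n_files=5000, n_producers=8, n_savers=10, max_pending=100):
    makedirs(path, exist_ok=True)
    # bounded queue caps the number of generated payloads held in memory
    queue = Queue(maxsize=max_pending)
    # start the saver threads in the main process
    savers = [Thread(target=consume, args=(queue,)) for _ in range(n_savers)]
    for thread in savers:
        thread.start()
    # split the identifiers across the producer processes and start them
    chunks = [list(range(i, n_files, n_producers)) for i in range(n_producers)]
    producers = [Process(target=produce, args=(queue, path, chunk)) for chunk in chunks]
    for process in producers:
        process.start()
    for process in producers:
        process.join()
    # signal each saver thread to stop, then wait for them
    for _ in savers:
        queue.put(None)
    for thread in savers:
        thread.join()

Like the other examples, pipeline() would need to be called from under an if __name__ == '__main__' guard. Whether it helps depends on the inter-process communication cost discussed earlier, since each generated payload must be transferred back to the main process.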
Save Files Concurrently with AsyncIO
We can also save files concurrently using asyncio.
Generally, Python does not support non-blocking I/O operations on files; it is not provided by the asyncio module because it is challenging to implement in a general, cross-platform manner.
The third-party library aiofiles provides file operations for use in asyncio operations, but again, the operations are not true non-blocking IO and instead the concurrency is simulated using thread pools.
Nevertheless, we can use aiofiles in an asyncio program to save files concurrently.
You can install the aiofiles library using your Python package manager, such as pip:
pip3 install aiofiles
A few minor changes are required to our program to use asyncio and aiofiles.
We must convert the save_file() function to use the aiofiles library to open and save the file.
First, we can update the save_file() function definition to use the “async” keyword so that it becomes a coroutine that can yield control.
# save data to a file
async def save_file(filepath, data):
    # ...
Next, we can replace the call to the built-in open() function with aiofiles.open() and open the file via an asynchronous context manager (async with).
...
# open the file
async with aiofiles.open(filepath, 'w') as handle:
    # ...
We can then await the call to write to yield control.
...
# save the data
await handle.write(data)
Tying this together, the updated version of the save_file() function is listed below.
# save data to a file
async def save_file(filepath, data):
    # open the file
    async with aiofiles.open(filepath, 'w') as handle:
        # save the data
        await handle.write(data)
Next we need to update the generate_and_save() function.
Only a coroutine can await another coroutine. The generate_and_save() function awaits our async save_file() function, therefore we must update it to be an async function too.
# generate and save a file
async def generate_and_save(path, identifier):
    # ...
We can then call the save_file() function and await the resulting coroutine, allowing the caller to yield control while the file is being written.
...
# save data file to disk
await save_file(filepath, data)
The updated version of the generate_and_save() function is listed below.
# generate and save a file
async def generate_and_save(path, identifier):
    # generate data
    data = generate_file(identifier)
    # create a unique filename
    filepath = join(path, f'data-{identifier:04d}.csv')
    # save data file to disk
    await save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}')
The main function must be updated to also be an async function and to create and execute a coroutine for each file to be created.
This can be achieved by first creating a list of coroutine calls to the generate_and_save() via a list comprehension.
...
# create one coroutine for each file to create
tasks = [generate_and_save(path, i) for i in range(n_files)]
Recall, this will only create coroutines, it will not call the function.
Next, we can execute all of the coroutines concurrently using the asyncio.gather() function, unpacking the list of coroutines with the star (*) operator.
...
# execute and wait for all coroutines
await asyncio.gather(*tasks)
The updated version of the main() function is listed below.
# generate many data files in a directory
async def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create one coroutine for each file to create
    tasks = [generate_and_save(path, i) for i in range(n_files)]
    # execute and wait for all coroutines
    await asyncio.gather(*tasks)
We can then update the call to the main() function to start the asyncio event loop to run the coroutines.
# entry point
if __name__ == '__main__':
    # execute the asyncio run loop
    asyncio.run(main())
Tying this together, the complete example of creating 5,000 files concurrently using asyncio with aiofiles is listed below.
There’s just one problem. It does not work.
# SuperFastPython.com
# create a large number of data files concurrently with asyncio
from os import makedirs
from os.path import join
import aiofiles
import asyncio

# save data to a file
async def save_file(filepath, data):
    # open the file
    async with aiofiles.open(filepath, 'w') as handle:
        # save the data
        await handle.write(data)

# generate a csv file with v=10 values per line and l=5k lines
def generate_file(identifier, n_values=10, n_lines=5000):
    # generate many lines of data
    data = list()
    for i in range(n_lines):
        # generate a list of numeric values as strings
        line = [str(identifier+i+j) for j in range(n_values)]
        # convert list to string with values separated by commas
        data.append(','.join(line))
    # convert list of lines to a string separated by new lines
    return '\n'.join(data)

# generate and save a file
async def generate_and_save(path, identifier):
    # generate data
    data = generate_file(identifier)
    # create a unique filename
    filepath = join(path, f'data-{identifier:04d}.csv')
    # save data file to disk
    await save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}')

# generate many data files in a directory
async def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create one coroutine for each file to create
    tasks = [generate_and_save(path, i) for i in range(n_files)]
    # execute and wait for all coroutines
    await asyncio.gather(*tasks)

# entry point
if __name__ == '__main__':
    # execute the asyncio run loop
    asyncio.run(main())
Running the example looks like it works, but it doesn’t.
For about a minute, the program will create the 5,000 files, but it will fail to write any data to the files.
It will then fail with an error, as follows (or similar):
Traceback (most recent call last):
...
OSError: [Errno 24] Too many open files: 'tmp/data-2554.csv'
The reason for the error is that the program attempted to run 5,000 coroutines concurrently, each opening a file and writing data.
This fails because the operating system limits the maximum number of files that can be open simultaneously, e.g. 256 on my MacOS system.
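On Unix-like systems you can check this limit from Python with the standard resource module (a quick diagnostic sketch; the module is not available on Windows):

# check the per-process limit on open file descriptors (Unix-like systems only)
import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f'open file limit: soft={soft}, hard={hard}')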
Given we know the cause of the failure, the fix is straightforward.
We can limit the maximum number of concurrent open files by in turn limiting the maximum number of concurrent coroutines executing the generate_and_save() function.
This can be achieved with an asyncio.Semaphore instance.
We can create a Semaphore in the main() function and configure it with the maximum number of concurrent open files, e.g. 50.
...
# create a semaphore to limit concurrent file creations
semaphore = asyncio.Semaphore(50)
We can then pass the semaphore to the generate_and_save() function and require that the coroutine acquire a resource from the semaphore before executing the body of the function, e.g. creating data and writing it to file.
# generate and save a file
async def generate_and_save(path, identifier, semaphore):
    # ...
This can be achieved using a context manager for the semaphore which will acquire a resource and will release it once the block is exited.
The updated version of the generate_and_save() function that limits the number of concurrent files that can be created is listed below.
# generate and save a file
async def generate_and_save(path, identifier, semaphore):
    # acquire the semaphore
    async with semaphore:
        # generate data
        data = generate_file(identifier)
        # create a unique filename
        filepath = join(path, f'data-{identifier:04d}.csv')
        # save data file to disk
        await save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}')
We could have used the semaphore further down the call graph, such as in the save_file() function, but that would mean up to 5,000 files' worth of data could be held in memory waiting to be written, which would not be desirable.
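For completeness, a sketch of that alternative, with a hypothetical extra semaphore argument on save_file(); note that nothing would then limit how many generated payloads wait in memory:

# alternative (not used here): limit only the file I/O inside save_file()
async def save_file(filepath, data, semaphore):
    # acquire the semaphore only around the open and write
    async with semaphore:
        # open the file
        async with aiofiles.open(filepath, 'w') as handle:
            # save the data
            await handle.write(data)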
Tying this together, the complete example of creating and writing 5,000 files concurrently with asyncio that limits the maximum number of currently open files is listed below.
# SuperFastPython.com
# create a large number of data files concurrently with asyncio
from os import makedirs
from os.path import join
import aiofiles
import asyncio

# save data to a file
async def save_file(filepath, data):
    # open the file
    async with aiofiles.open(filepath, 'w') as handle:
        # save the data
        await handle.write(data)

# generate a csv file with v=10 values per line and l=5k lines
def generate_file(identifier, n_values=10, n_lines=5000):
    # generate many lines of data
    data = list()
    for i in range(n_lines):
        # generate a list of numeric values as strings
        line = [str(identifier+i+j) for j in range(n_values)]
        # convert list to string with values separated by commas
        data.append(','.join(line))
    # convert list of lines to a string separated by new lines
    return '\n'.join(data)

# generate and save a file
async def generate_and_save(path, identifier, semaphore):
    # acquire the semaphore
    async with semaphore:
        # generate data
        data = generate_file(identifier)
        # create a unique filename
        filepath = join(path, f'data-{identifier:04d}.csv')
        # save data file to disk
        await save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}')

# generate many data files in a directory
async def main(path='tmp', n_files=5000):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create a semaphore to limit concurrent file creations
    semaphore = asyncio.Semaphore(50)
    # create one coroutine for each file to create
    tasks = [generate_and_save(path, i, semaphore) for i in range(n_files)]
    # execute and wait for all coroutines
    await asyncio.gather(*tasks)

# entry point
if __name__ == '__main__':
    # execute the asyncio run loop
    asyncio.run(main())
Running the example creates all files in the tmp/ directory as we would expect.
...
.saved tmp/data-4989.csv
.saved tmp/data-4990.csv
.saved tmp/data-4993.csv
.saved tmp/data-4992.csv
.saved tmp/data-4994.csv
.saved tmp/data-4995.csv
.saved tmp/data-4996.csv
.saved tmp/data-4998.csv
.saved tmp/data-4999.csv
.saved tmp/data-4997.csv
In this case, the example is about as fast as the single-threaded version and the multi-threaded version, offering no speed improvement.
On my system, the example took about 62.36 seconds, compared to about 63.4 seconds with the single-threaded version and about 64.6 seconds for the multi-threaded version, all probably within the margin of measurement error.
Nevertheless, it was an interesting exercise on how to achieve the same result using asyncio.
[Finished in 66.1s]
[Finished in 63.8s]
[Finished in 61.6s]
Comparison of Results
We have performed the same file save operations using a suite of different parallel methods.
We can compare the results and get some idea of those methods that may offer a performance benefit over the naive sequential method.
The table below provides a summary of the results.
Method            | Avg Time (sec) | Speed-Up
---------------------------------------------
Sequential        | 63.40          | n/a
Threads           | 64.60          | 0.98x
Processes         | 21.43          | 2.96x
Processes/Batch   | 20.86          | 3.04x
Processes/Threads | 20.56          | 3.08x
Asyncio           | 62.36          | 1.02x
The results suggest that using processes is the fastest approach, with perhaps a small additional benefit from batching the files per worker or adding threads within each worker.
Using threads or asyncio directly does not appear to offer a benefit in this case.
Extensions
This section lists ideas for extending the tutorial.
- Vary Number of Thread Workers. Update the multithreaded example to test different numbers of worker threads such as 10, 50, 1000, and more to see if it makes a difference to the run time.
- Vary Number of Process Workers. Update the multiprocess example to test different numbers of worker processes such as 2, 4, 10, 50, and more to see if it makes a difference to the run time.
- Vary Process and Thread Workers. Update the combined process and thread pool example with different numbers of worker processes and worker threads per process to see if it makes a difference to the run time.
- Explore a Pipeline. Develop a pipeline with processes for generating files and threads for saving files with rate limiting to ensure that not too many files are stored in memory at a time.
Share your extensions in the comments below, it would be great to see what you come up with.
Further Reading
This section provides additional resources that you may find helpful.
Books
- Concurrent File I/O in Python, Jason Brownlee (my book!)
Python File I/O APIs
- Built-in Functions
- os - Miscellaneous operating system interfaces
- os.path - Common pathname manipulations
- shutil - High-level file operations
- zipfile — Work with ZIP archives
- Python Tutorial: Chapter 7. Input and Output
Python Concurrency APIs
- threading — Thread-based parallelism
- multiprocessing — Process-based parallelism
- concurrent.futures — Launching parallel tasks
- asyncio — Asynchronous I/O
Takeaways
In this tutorial, you explored how to use multithreading, multiprocessing, and asyncio to save thousands of files, and which approaches actually offer a speed-up.
Do you have any questions?
Leave your question in a comment below and I will reply fast with my best advice.
Photo by Devon Janse van Rensburg on Unsplash