Last Updated on August 21, 2023
Loading files from disk in Python is typically a slow operation.
It can become painfully slow when you need to load thousands of files into memory. The hope is that multithreading the file loading will speed up the operation.
In this tutorial, you will discover how to explore concurrent file loading in Python.
Let’s dive in.
Create 10,000 Files (to load)
Before we can load thousands of files from disk, we must create them.
We can write a script that will create 10,000 CSV files, each file with 10,000 lines and each line containing 10 random numeric values.
First, we can create a function that will take a file path and string data and save it to file.
The save_file() function below implements this, taking the full path and data, opening the file in ASCII format, then saving the data to the file. The context manager is used so that the file is closed automatically.
# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)
Next, we can generate the data to be saved.
First, we can write a function to generate a single line of data. In this case, a line is composed of 10 random numeric values in the range between 0 and 1, stored in CSV format (e.g. separated by a comma per value).
The generate_line() function below implements this, returning a string of random data.
# generate a line of mock data of 10 random data points
def generate_line():
    return ','.join([str(random()) for _ in range(10)])
Next, we can write a function to generate the contents of one file.
This will be 10,000 lines of random values, where each line is separated by a new line.
The generate_file_data() function below implements this, returning a string representing the data for a single data file.
# generate file data of 10K lines each with 10 data points
def generate_file_data():
    # generate many lines of data
    lines = [generate_line() for _ in range(10000)]
    # convert to a single ascii doc with new lines
    return '\n'.join(lines)
Finally, we can generate data for all of the files and save each with a separate file name.
First, we can create a directory to store all of the created files (e.g. ‘tmp‘) under the current working directory.
...
# create a local directory to save files
makedirs(path, exist_ok=True)
We can then loop 10,000 times and create the data and a unique filename for each iteration, then save the generated contents of the file to disk.
...
# create all files
for i in range(10000):
    # generate data
    data = generate_file_data()
    # create filenames
    filepath = join(path, f'data-{i:04d}.csv')
    # save data file
    save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}')
The generate_all_files() function listed below implements this, creating the 10,000 files of 10,000 lines of data.
# generate 10K files in a directory
def generate_all_files(path='tmp'):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create all files
    for i in range(10000):
        # generate data
        data = generate_file_data()
        # create filenames
        filepath = join(path, f'data-{i:04d}.csv')
        # save data file
        save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}')
Tying this together, the complete example of creating a large number of CSV files is listed below.
# SuperFastPython.com
# create many files that we can open later
from os import makedirs
from os.path import join
from random import random

# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)

# generate a line of mock data of 10 random data points
def generate_line():
    return ','.join([str(random()) for _ in range(10)])

# generate file data of 10K lines each with 10 data points
def generate_file_data():
    # generate many lines of data
    lines = [generate_line() for _ in range(10000)]
    # convert to a single ascii doc with new lines
    return '\n'.join(lines)

# generate 10K files in a directory
def generate_all_files(path='tmp'):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create all files
    for i in range(10000):
        # generate data
        data = generate_file_data()
        # create filenames
        filepath = join(path, f'data-{i:04d}.csv')
        # save data file
        save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}')

# entry point, generate all of the files
generate_all_files()
Running the example will create 10,000 CSV files of random data into a tmp/ directory.
It takes a long time to run, depending on the speed of your hard drive.
On my system, it takes about 11 minutes to complete.
...
.saved tmp/data-9990.csv
.saved tmp/data-9991.csv
.saved tmp/data-9992.csv
.saved tmp/data-9993.csv
.saved tmp/data-9994.csv
.saved tmp/data-9995.csv
.saved tmp/data-9996.csv
.saved tmp/data-9997.csv
.saved tmp/data-9998.csv
.saved tmp/data-9999.csv
Next, we can explore ways of loading all of these files into memory.
Load Files One-by-One (slowly)
There are many applications where we need to load many files into memory.
A common example is loading a large number of CSV files into a large in-memory dataset in order to perform some analysis or modelling.
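The examples in this tutorial only read each file's raw text. As a side note, if you wanted to turn one loaded file into numeric rows for analysis, a minimal sketch might look like the following; the parse_csv_text() helper is hypothetical and is not used in the examples below.

# parse the raw text of one generated CSV file into rows of float values
def parse_csv_text(text):
    # each non-empty line holds 10 comma-separated random values
    return [[float(value) for value in line.split(',')] for line in text.splitlines() if line]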
First, we can develop a function to load a single file into memory using the open() built-in function.
The load_file() function below implements this, taking the file path of the file to load and returning the contents of the file.
The file is opened as an ASCII file and returns the contents as a string. The context manager is used when opening the file, to ensure that the file handle is closed once we are finished with it.
# open a file and return the contents
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents
        return handle.read()
We can then call this function for each file listed in a directory, such as ‘tmp‘ we created in the previous section.
First, we can create a list of all file paths that need to be loaded.
...
# prepare all of the paths
paths = [join(path, filepath) for filepath in listdir(path)]
We can then iterate over this list of paths and load each file in turn and report progress along the way.
...
# load each file in the directory
for filepath in paths:
    # open the file and load the data
    data = load_file(filepath)
    # report progress
    print(f'.loaded {filepath}')
The load_all_files() function below implements this, loading all files in the directory and reporting progress along the way.
# load all files in a directory into memory
def load_all_files(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # load each file in the directory
    for filepath in paths:
        # open the file and load the data
        data = load_file(filepath)
        # report progress
        print(f'.loaded {filepath}')
    print('Done')
Tying this together, the complete example of loading all 10,000 files in the directory sequentially is listed below.
# SuperFastPython.com
# load many files sequentially
from os import listdir
from os.path import join

# open a file and return the contents
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents
        return handle.read()

# load all files in a directory into memory
def load_all_files(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # load each file in the directory
    for filepath in paths:
        # open the file and load the data
        data = load_file(filepath)
        # report progress
        print(f'.loaded {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    load_all_files()
Running the example loads all files in the directory into memory.
The files are loaded in the order they are returned from listdir(), which is probably not sorted by filename.
...
.loaded tmp/data-5832.csv
.loaded tmp/data-2185.csv
.loaded tmp/data-5826.csv
.loaded tmp/data-2191.csv
.loaded tmp/data-1498.csv
.loaded tmp/data-0786.csv
.loaded tmp/data-7957.csv
.loaded tmp/data-6491.csv
.loaded tmp/data-5198.csv
.loaded tmp/data-4286.csv
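If you prefer a deterministic, filename-sorted order, a small variation (not used in the timed examples) is to sort the directory listing when building the paths:

...
# prepare all of the paths in filename order
paths = [join(path, filepath) for filepath in sorted(listdir(path))]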
It takes a moment to complete.
On my system, all files are loaded in about 10.7 seconds.
[Finished in 10.7s]
[Finished in 10.7s]
[Finished in 10.7s]
How long does it take on your system?
Let me know in the comments below.
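The examples in this tutorial do not time themselves; the "[Finished in ...]" figures appear to come from the tool used to run the scripts. If you want to measure the runtime from within Python, a simple sketch using the standard library is:

from time import perf_counter
# record the start time
start = perf_counter()
# load all files sequentially
load_all_files()
# report the total duration
print(f'Took {perf_counter() - start:.3f} seconds')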
Next, let’s explore whether we can speed up the program by using multithreading.
How to Load Files Concurrently
Loading files involves copying data from disk into main memory.
We would expect that this is an IO-bound task. This is because the speed of loading a file is bounded by the speed that the hard drive can locate the file, read the bytes of the file and transfer those bytes into main memory. This is compared to CPU-bound tasks that run as fast as the CPU.
This suggests that using threads is appropriate instead of processes that are better suited to CPU-bound tasks. It also means we can have many more threads than we do CPUs, e.g. perhaps hundreds or thousands of threads in this case.
Naively, we might reasonably expect that hard drives are only able to load a single file into main memory at a time.
If true, this suggests that although loading files might be made concurrent with threads or processes, we may not expect any speed benefit because load operations cannot be performed in parallel.
Nevertheless, there may be some internal buffering or some ability to batch load operations by the operating system or hard disk hardware that we could exploit.
Now that we have some expectations of the effect of concurrent file loading, let’s explore a worked example.
Free Concurrent File I/O Course
Get FREE access to my 7-day email course on concurrent File I/O.
Discover patterns for concurrent file I/O, how save files with a process pool, how to copy files with a thread pool, and how to append to a file from multiple threads safely.
Load Files Concurrently with Threads
The ThreadPoolExecutor provides a pool of worker threads that we can use to multithread the loading of thousands of data files.
Each file in the tmp/ directory will represent a task that will require the file to be loaded from disk into main memory.
First, we can define a function that takes the path of a file to load and performs the read operation returning the contents of the file and the file path.
# open a file and return the contents
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents
        return handle.read(), filepath
Next, we can call this function for each file in the tmp/ directory.
First, we can create the thread pool with ten worker threads. We will use the context manager to ensure the thread pool is closed automatically once all pending and running tasks have been completed.
...
# create the thread pool
with ThreadPoolExecutor(10) as executor:
    # ...
We can then call the submit() function for each file to be loaded, calling the load_file() function with the file path as an argument.
This can be achieved in a list comprehension, resulting in a list of Future objects.
...
# submit all tasks
futures = [executor.submit(load_file, p) for p in paths]
We can then iterate over the Future objects and get the loaded data and file path information and report progress. This is to simulate some use of the loaded data, like collating it into one in-memory data structure or using the file contents to compute something.
The as_completed() function lets us iterate over the tasks in the order that they are completed.
...
# process all results
for future in as_completed(futures):
    # open the file and load the data
    data, filepath = future.result()
    # report progress
    print(f'.loaded {filepath}')
Tying this together, the complete example of multithreaded loading of files from one directory into memory is listed below.
# SuperFastPython.com
# load many files concurrently with threads
from os import listdir
from os.path import join
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# open a file and return the contents
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents
        return handle.read(), filepath

# load all files in a directory into memory
def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # create the thread pool
    with ThreadPoolExecutor(10) as executor:
        # submit all tasks
        futures = [executor.submit(load_file, p) for p in paths]
        # process all results
        for future in as_completed(futures):
            # open the file and load the data
            data, filepath = future.result()
            # report progress
            print(f'.loaded {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    main()
Running the example loads all 10,000 files into memory and reports a message as the contents of each file is made available.
...
.loaded tmp/data-2185.csv
.loaded tmp/data-5826.csv
.loaded tmp/data-5832.csv
.loaded tmp/data-1498.csv
.loaded tmp/data-2191.csv
.loaded tmp/data-6491.csv
.loaded tmp/data-7957.csv
.loaded tmp/data-4286.csv
.loaded tmp/data-5198.csv
.loaded tmp/data-0786.csv
Done
In this case, the operation is performed relatively quickly, but not quite as fast as the single-threaded example.
On my system it takes about 11.93 seconds, compared to 10.7 for the sequential case, or about 1.1x slower.
[Finished in 12.2s]
[Finished in 11.7s]
[Finished in 11.9s]
Perhaps 10 threads is too few.
We can run the example again with 100 threads instead to see if it makes any difference.
This requires a single change to the number of worker threads in the constructor to the ThreadPoolExecutor, for example:
...
# create the thread pool
with ThreadPoolExecutor(100) as executor:
    # ...
Running the example loads all files from the source directory, as before.
Using 100 threads does result in a small increase in performance compared to using 10 threads.
On my system it takes about 11.26 seconds compared to 10 threads that took about 11.93 seconds, but this may be within the margin of measurement error. Compared to the single-threaded version it is about 1.1x slower.
[Finished in 11.4s]
[Finished in 11.1s]
[Finished in 11.3s]
Next, let’s see if we can get any benefit from batching file loads into chunks for the worker threads.
Load Files Concurrently with Threads in Batch
Each task submitted to the thread pool adds overhead that slows down the overall operation.
This includes function calls and object creation for each task, occurring internally within the ThreadPoolExecutor class.
We might reduce this overhead by performing file operations in a batch within each worker thread.
This can be achieved by updating the load_file() function to receive a list of files to load, and splitting up the files in the main() function into chunks to be submitted to worker threads for batch processing.
The hope is that this would reduce some of the overhead required for submitting so many small tasks by replacing it with a few larger batched tasks.
First, we can change the target function to receive a list of file paths to load, listed below.
# open a file and return the contents
def load_files(filepaths):
    data_list = list()
    # load each file
    for filepath in filepaths:
        # open the file
        with open(filepath, 'r') as handle:
            # return the contents
            data_list.append(handle.read())
    return data_list, filepaths
Next, in the main() function we can select a chunksize based on the number of worker threads and the number of files to load.
In this case, we will use 100 worker threads with 10,000 files to load. Dividing the files evenly between workers (10,000 / 100) gives 100 files to load per worker.
This means that, in the worst case, we may have up to 10,000 files of roughly 2 megabytes each (about 20 gigabytes) in memory at one time. If you don't have a lot of main memory, you might want to set a smaller chunksize, like 10 or 50.
...
# determine chunksize
n_workers = 100
chunksize = round(len(paths) / n_workers)
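Following the suggestion above of a smaller chunk size, a capped calculation might look like this; it is a variation and not the configuration benchmarked below.

...
# determine chunksize, capped so each task loads at most 50 files
n_workers = 100
chunksize = min(50, max(1, round(len(paths) / n_workers)))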
Next, we can create the thread pool with the parameterized number of worker threads.
...
# create the thread pool
with ThreadPoolExecutor(n_workers) as executor:
    ...
Next, we can iterate over the list of files to load and split them into chunks defined by a chunksize.
This can be achieved by iterating over the list of files and using the chunksize as the increment size in a for-loop. We can then split off a chunk of files to send to a worker thread as a task.
We will keep track of the Future objects for each task for iterating later.
...
futures = list()
# split the load operations into chunks
for i in range(0, len(paths), chunksize):
    # select a chunk of filenames
    filepaths = paths[i:(i + chunksize)]
    # submit the task
    future = executor.submit(load_files, filepaths)
    futures.append(future)
Finally, we can iterate the Future objects in the order of task completion, as before, but this time, report the file paths loaded by each task.
...
# process all results
for future in as_completed(futures):
    # open the file and load the data
    _, filepaths = future.result()
    for filepath in filepaths:
        # report progress
        print(f'.loaded {filepath}')
And that’s it.
The complete example of loading files in a batch concurrently using worker threads is listed below.
# SuperFastPython.com
# load many files concurrently with threads in batch
from os import listdir
from os.path import join
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# open a file and return the contents
def load_files(filepaths):
    data_list = list()
    # load each file
    for filepath in filepaths:
        # open the file
        with open(filepath, 'r') as handle:
            # return the contents
            data_list.append(handle.read())
    return data_list, filepaths

# load all files in a directory into memory
def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # determine chunksize
    n_workers = 100
    chunksize = round(len(paths) / n_workers)
    # create the thread pool
    with ThreadPoolExecutor(n_workers) as executor:
        futures = list()
        # split the load operations into chunks
        for i in range(0, len(paths), chunksize):
            # select a chunk of filenames
            filepaths = paths[i:(i + chunksize)]
            # submit the task
            future = executor.submit(load_files, filepaths)
            futures.append(future)
        # process all results
        for future in as_completed(futures):
            # open the file and load the data
            _, filepaths = future.result()
            for filepath in filepaths:
                # report progress
                print(f'.loaded {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    main()
All 10,000 files are loaded from tmp/ into main memory as before.
...
.loaded tmp/data-5837.csv
.loaded tmp/data-2180.csv
.loaded tmp/data-5189.csv
.loaded tmp/data-4297.csv
.loaded tmp/data-6480.csv
.loaded tmp/data-7946.csv
.loaded tmp/data-6494.csv
.loaded tmp/data-7952.csv
.loaded tmp/data-4283.csv
.loaded tmp/data-5823.csv
Done
In this case, we don’t see any benefit from the concurrent loading of files in batches.
On my system, the example took about 11.16 seconds to complete, compared to about 11.26 seconds without batch mode, which is probably within the margin of measurement error.
[Finished in 11.2s]
[Finished in 11.2s]
[Finished in 11.1s]
Next, let’s see if we can get any benefit using processes instead of threads.
Load Files Concurrently with Processes
We can also try to load files concurrently with processes instead of threads.
It is unclear whether processes can offer a speed benefit in this case, given that we could not get a benefit using threads.
Furthermore, using processes requires data for each task to be serialized, which introduces additional overhead that might negate any speed-up from performing file operations with true parallelism via processes.
In fact, the need to transmit the contents of each file from child processes back to the main process will likely add a large overhead, given that the contents of each file must be serialized and then deserialized.
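As an aside, if your real task only needs something derived from each file (a line count, a parsed summary, an aggregate) rather than the raw text, you can avoid most of this transfer cost by computing the result inside the worker process and returning only that. A minimal sketch, using a hypothetical load_file_summary() helper not used in the benchmarks below:

# load a file in a worker process and return only a small derived result
def load_file_summary(filepath):
    # open the file and read the contents
    with open(filepath, 'r') as handle:
        data = handle.read()
    # return the number of lines rather than the full contents
    return len(data.splitlines()), filepath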
We can explore using processes to load files concurrently using the ProcessPoolExecutor.
This can be achieved by switching out the ThreadPoolExecutor directly and specifying the number of worker processes.
We will use 4 processes in this case as I have 4 physical CPU cores.
It may be interesting to try different configurations of the number of worker processes to see if it makes a difference on the overall running time.
...
# create the process pool
with ProcessPoolExecutor(4) as executor:
    # ...
Tying this together, the complete example of loading files concurrently using the process pool is listed below.
# SuperFastPython.com
# load many files concurrently with processes
from os import listdir
from os.path import join
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# open a file and return the contents
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents
        return handle.read(), filepath

# load all files in a directory into memory
def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # submit all tasks
        futures = [executor.submit(load_file, p) for p in paths]
        # process all results
        for future in as_completed(futures):
            # open the file and load the data
            data, filepath = future.result()
            # report progress
            print(f'.loaded {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    main()
Running the example loads all files from the directory as we did before.
...
.loaded tmp/data-5832.csv
.loaded tmp/data-2185.csv
.loaded tmp/data-5826.csv
.loaded tmp/data-2191.csv
.loaded tmp/data-1498.csv
.loaded tmp/data-0786.csv
.loaded tmp/data-7957.csv
.loaded tmp/data-6491.csv
.loaded tmp/data-5198.csv
.loaded tmp/data-4286.csv
Done
In this case, we do not see a speed improvement over the single-threaded case. In fact, we see much worse performance.
On my system, it takes about 30.26 seconds to complete, compared to about 10.7 seconds for the sequential case. This is about 2.8x slower.
The large increase in run time is likely the result of the overhead of inter-process communication, where the content of each file must be pickled by the child process and unpickled by the parent process in order to transmit it from one process to the other.
[Finished in 30.6s]
[Finished in 30.2s]
[Finished in 30.0s]
Next, let’s see if we can reduce the overhead of inter-process communication by batching the file load operations.
Load Files Concurrently with Processes in Batch
Any data sent to a worker process or received from a worker process must be serialized (pickled).
This adds overhead for each task executed by the worker processes.
This is likely the cause of worse performance seen when using the process pool in the previous section.
We can address this by batching the load tasks into chunks to be executed by each worker process, just as we did when we batched filenames for the thread pools.
Again, this can be achieved by adapting the batched version of ThreadPoolExecutor from a previous section to use the ProcessPoolExecutor class.
We will use 4 worker processes and a chunk size of 20 files loaded per task.
...
# determine chunksize
chunksize = 20
# create the process pool
with ProcessPoolExecutor(4) as executor:
    # ...
And that’s it.
The complete example of batch loading files using process pools is listed below.
# SuperFastPython.com
# load many files concurrently with processes in batch
from os import listdir
from os.path import join
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# open a file and return the contents
def load_files(filepaths):
    data_list = list()
    # load each file
    for filepath in filepaths:
        # open the file
        with open(filepath, 'r') as handle:
            # return the contents
            data_list.append(handle.read())
    return data_list, filepaths

# load all files in a directory into memory
def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # determine chunksize
    chunksize = 20
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        futures = list()
        # split the load operations into chunks
        for i in range(0, len(paths), chunksize):
            # select a chunk of filenames
            filepaths = paths[i:(i + chunksize)]
            # submit the task
            future = executor.submit(load_files, filepaths)
            futures.append(future)
        # process all results
        for future in as_completed(futures):
            # open the file and load the data
            _, filepaths = future.result()
            for filepath in filepaths:
                # report progress
                print(f'.loaded {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    main()
Running the example loads all 10,000 files into memory as we did in all previous examples.
...
.loaded tmp/data-5832.csv
.loaded tmp/data-2185.csv
.loaded tmp/data-5826.csv
.loaded tmp/data-2191.csv
.loaded tmp/data-1498.csv
.loaded tmp/data-0786.csv
.loaded tmp/data-7957.csv
.loaded tmp/data-6491.csv
.loaded tmp/data-5198.csv
.loaded tmp/data-4286.csv
Done
In this case, batching the file loads does not offer any performance improvement over the process-based version without batch mode; in fact, it is worse.
On my system, the example takes about 40.16 seconds to complete, compared to about 30.26 seconds without batching, which is nearly 10 seconds worse and about 3.8x slower than the sequential case.
Again, this is likely related to the manner in which the contents of the files must be serialized in order to be shared between processes.
[Finished in 38.9s]
[Finished in 42.9s]
[Finished in 38.7s]
Next, let’s explore how we might use both thread pools and process pools.
Load Files Concurrently With Processes and Threads in Batch
The example in the previous section loaded batches of files in each worker process.
The slow parts of the example are the IO-bound loading of files within each process and the inter-process communication required to get the loaded data back to the parent process.
We may be able to speed-up the overall task by making the file loading within each process concurrent using threads.
This can be achieved by allowing each worker process to create a thread pool and to use worker threads to load files concurrently.
First, we must define a target task function for the thread tasks to execute.
The load_file() function below takes a file path as an argument, then returns the content of the file. This function can be called by worker threads concurrently.
# load a file and return the contents
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents
        return handle.read()
Next, we must update the load_files() function to use a thread pool instead of loading the files sequentially.
This can be achieved by creating an instance of the ThreadPoolExecutor class.
We will create the thread pool with one worker per file to be loaded.
...
# create a thread pool
with ThreadPoolExecutor(len(filepaths)) as exe:
    # ...
Next, we can issue one call to the load_file() function for each file path provided to the function.
The load_files() function must return a list of the contents of all files and a parallel list of the file paths.
One approach would be to issue the tasks using the executor's map() function and then convert the returned iterator into a list. For example:
...
# load the content of all files
data_list = list(exe.map(load_file, filepaths))
Although simple, this turned out to be quite slow in some ad hoc testing.
A faster approach seems to be to issue the tasks using submit(), then constructing the list of file contents from each Future object using a list comprehension. For example:
...
# load files
futures = [exe.submit(load_file, name) for name in filepaths]
# collect data
data_list = [future.result() for future in futures]
Tying this together, the updated version of the load_files() function that uses a thread pool is listed below.
# return the contents of many files
def load_files(filepaths):
    # create a thread pool
    with ThreadPoolExecutor(len(filepaths)) as exe:
        # load files
        futures = [exe.submit(load_file, name) for name in filepaths]
        # collect data
        data_list = [future.result() for future in futures]
        # return data and file paths
        return (data_list, filepaths)
Finally, we can update the main() function slightly.
We will increase the number of processes in the process pool to match the number of logical CPU cores in my system, 8 in this case.
It may be interesting to test different values for the number of worker processes in order to discover the effect on overall task completion time.
We will then set the chunk size as an even fraction of all files to be loaded and the number of worker processes.
...
# determine chunksize
n_workers = 8
chunksize = round(len(paths) / n_workers)
Given that we have 10,000 files to load and 8 worker processes, this means there are (10,000 / 8) or 1,250 files to be loaded per process.
Also, given that each process creates a thread pool with one worker thread per file in its chunk, we will have up to 10,000 concurrent threads across the program. Cool (if it works)!
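If 1,250 threads per worker process is more than your system handles comfortably, a hedged alternative (not the configuration benchmarked below) is to cap the size of each per-process thread pool, for example at 100 workers:

...
# create a thread pool with a bounded number of worker threads
with ThreadPoolExecutor(min(100, len(filepaths))) as exe:
    # ...

The cap trades some concurrency for lower thread-creation overhead within each worker process.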
Finally, we can update the configuration of the ProcessPoolExecutor to use the parameterized number of worker processes.
...
# create the process pool
with ProcessPoolExecutor(n_workers) as executor:
    # ...
And that’s it.
The complete example of using a thread pool to load files within each worker process in the process pool is listed below.
# SuperFastPython.com
# load many files concurrently with processes and threads in batch
from os import listdir
from os.path import join
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# load a file and return the contents
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents
        return handle.read()

# return the contents of many files
def load_files(filepaths):
    # create a thread pool
    with ThreadPoolExecutor(len(filepaths)) as exe:
        # load files
        futures = [exe.submit(load_file, name) for name in filepaths]
        # collect data
        data_list = [future.result() for future in futures]
        # return data and file paths
        return (data_list, filepaths)

# load all files in a directory into memory
def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # determine chunksize
    n_workers = 8
    chunksize = round(len(paths) / n_workers)
    # create the process pool
    with ProcessPoolExecutor(n_workers) as executor:
        futures = list()
        # split the load operations into chunks
        for i in range(0, len(paths), chunksize):
            # select a chunk of filenames
            filepaths = paths[i:(i + chunksize)]
            # submit the task
            future = executor.submit(load_files, filepaths)
            futures.append(future)
        # process all results
        for future in as_completed(futures):
            # open the file and load the data
            _, filepaths = future.result()
            for filepath in filepaths:
                # report progress
                print(f'.loaded {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    main()
Running the example first loads all 10,000 files from disk into main memory as before.
...
.loaded tmp/data-5832.csv
.loaded tmp/data-2185.csv
.loaded tmp/data-5826.csv
.loaded tmp/data-2191.csv
.loaded tmp/data-1498.csv
.loaded tmp/data-0786.csv
.loaded tmp/data-7957.csv
.loaded tmp/data-6491.csv
.loaded tmp/data-5198.csv
.loaded tmp/data-4286.csv
Done
In this case, we see a large speed-up compared to the single-threaded version and compared to other versions tried so far.
On my system, the example takes about 4.833 seconds, compared to about 10.7 seconds for the single threaded version. That is about 2.21x faster.
It may be interesting to test different chunk sizes to see if further speed improvements are possible.
[Finished in 4.8s]
[Finished in 4.8s]
[Finished in 4.9s]
Next, let’s explore loading files concurrently using asyncio.
Load Files Concurrently with AsyncIO
We can also load files concurrently using asyncio.
Generally, Python does not support non-blocking IO when working with files; it is not provided in the asyncio module because it is challenging to implement in a general, cross-platform manner.
The third-party library aiofiles provides file operations for use in asyncio programs, but again, the operations are not true non-blocking IO; instead, the concurrency is simulated using thread pools.
Nevertheless, we can use aiofiles in an asyncio program to load files concurrently.
You can install the aiofiles library using your Python package manager, such as pip:
pip3 install aiofiles
A few minor changes are required to our program to use asyncio and aiofiles.
First, we must update the load_file() function to use the aiofiles.open() function to open files and to await on the call to read() from the file which will yield control.
The updated version of the load_file() function with these changes is listed below.
# open a file and return the contents
async def load_file(filepath):
    # open the file
    async with aiofiles.open(filepath, 'r') as handle:
        # return the contents
        return (await handle.read(), filepath)
Next, we can update the main() function to first create a list of calls to the load_file() function as coroutines to execute concurrently. This can be achieved in a list comprehension.
...
# create coroutines
tasks = [load_file(filepath) for filepath in paths]
Recall, this only creates the coroutines; the body of the load_file() function is not executed until the coroutines are awaited.
Next, we can execute the created coroutines and process the results (loaded data and filepath) in the order that the files are loaded.
This can be achieved using the asyncio.as_completed() function that will return coroutines in the order they finish.
...
# execute tasks and process results as they are completed
for task in asyncio.as_completed(tasks):
    # ...
We can then await the result from each coroutine and report progress.
...
# open the file and load the data
data, filepath = await task
# report progress
print(f'.loaded {filepath}')
The updated version of the main() function with these changes is listed below.
# load all files in a directory into memory
async def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # create coroutines
    tasks = [load_file(filepath) for filepath in paths]
    # execute tasks and process results as they are completed
    for task in asyncio.as_completed(tasks):
        # open the file and load the data
        data, filepath = await task
        # report progress
        print(f'.loaded {filepath}')
    print('Done')
Finally, we can start the asyncio event loop and call the main() function to start the program.
# entry point
if __name__ == '__main__':
    asyncio.run(main())
Tying this together, the complete example of concurrently loading files using asyncio is listed below.
There's just one problem: it does not work.
# SuperFastPython.com
# load many files concurrently with asyncio
from os import listdir
from os.path import join
import asyncio
import aiofiles

# open a file and return the contents
async def load_file(filepath):
    # open the file
    async with aiofiles.open(filepath, 'r') as handle:
        # return the contents
        return (await handle.read(), filepath)

# load all files in a directory into memory
async def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # create coroutines
    tasks = [load_file(filepath) for filepath in paths]
    # execute tasks and process results as they are completed
    for task in asyncio.as_completed(tasks):
        # open the file and load the data
        data, filepath = await task
        # report progress
        print(f'.loaded {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    asyncio.run(main())
Running the program results in an error, as follows (or similar):
OSError: [Errno 24] Too many open files: 'tmp/data-0437.csv'
The error is caused because the program attempts to open all 10,000 files at once.
This fails because the operating system imposes a limit on the maximum number of files that can be open at one time, such as 256 by default on macOS.
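On Unix-like systems you can inspect this limit with the ulimit -n shell command, or from Python using the standard library resource module (not available on Windows). For example:

import resource
# query the soft and hard limits on open file descriptors
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f'soft limit: {soft}, hard limit: {hard}')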
Therefore, we must update the program to limit the number of files that it attempts to open at once.
This can be achieved using an asyncio.Semaphore instance.
The Semaphore can be configured with the maximum number of resources, e.g. 100, and each coroutine must acquire the semaphore before it can attempt to load a file.
First, we can create the Semaphore instance in the main() function and pass it as an argument to the load_file() coroutines.
...
# create a semaphore to limit open files
semaphore = asyncio.Semaphore(100)
# create coroutines
tasks = [load_file(filepath, semaphore) for filepath in paths]
Next, we can update the load_file() function to take the Semaphore as an argument and then acquire a resource before loading the file.
This can be achieved using the context manager on the Semaphore which will yield while waiting for a resource and will automatically release the resource once the block is exited.
The updated version of the load_file() function with these changes is listed below.
# open a file and return the contents
async def load_file(filepath, semaphore):
    # acquire the semaphore
    async with semaphore:
        # open the file
        async with aiofiles.open(filepath, 'r') as handle:
            # return the contents
            return (await handle.read(), filepath)
Tying this together, the complete example of loading files using asyncio and limiting the maximum number of files open concurrently is listed below.
# SuperFastPython.com
# load many files concurrently with asyncio
from os import listdir
from os.path import join
import asyncio
import aiofiles

# open a file and return the contents
async def load_file(filepath, semaphore):
    # acquire the semaphore
    async with semaphore:
        # open the file
        async with aiofiles.open(filepath, 'r') as handle:
            # return the contents
            return (await handle.read(), filepath)

# load all files in a directory into memory
async def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # create a semaphore to limit open files
    semaphore = asyncio.Semaphore(100)
    # create coroutines
    tasks = [load_file(filepath, semaphore) for filepath in paths]
    # execute tasks and process results as they are completed
    for task in asyncio.as_completed(tasks):
        # open the file and load the data
        data, filepath = await task
        # report progress
        print(f'.loaded {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    asyncio.run(main())
Running the example loads all 10,000 files into memory as before.
...
.loaded tmp/data-0703.csv
.loaded tmp/data-8534.csv
.loaded tmp/data-1007.csv
.loaded tmp/data-0065.csv
.loaded tmp/data-7607.csv
.loaded tmp/data-5777.csv
.loaded tmp/data-2114.csv
.loaded tmp/data-6414.csv
.loaded tmp/data-6768.csv
.loaded tmp/data-6519.csv
Done
In this case, the asyncio example is faster than the single-threaded and multi-threaded versions.
On my system, the example takes about 9.5 seconds, compared to about 10.7 seconds for the single-threaded version. This slight improvement is about 1.13x faster.
[Finished in 9.6s]
[Finished in 9.5s]
[Finished in 9.4s]
Load Files Concurrently with AsyncIO in Batch
We can update the concurrent loading of files with asyncio from the previous section to load files in batch.
That is, each coroutine can be responsible for loading a list of files into memory to be returned to the calling coroutine.
This may or may not offer a speed benefit. Interestingly, we are able to avoid using the Semaphore that was required in the previous section.
Firstly, let’s update the load_file() function to load_files() that will load and return the contents of multiple files.
# load and return the contents of a list of file paths
async def load_files(filepaths):
    # load all files
    data_list = list()
    for filepath in filepaths:
        # open the file
        async with aiofiles.open(filepath, 'r') as handle:
            # load the contents and add to list
            data = await handle.read()
            # store loaded data
            data_list.append(data)
    return (data_list, filepaths)
Next, we can update the main() function to split the list of files to load into chunks and then pass each chunk to a call to the load_files() function as a coroutine.
This can be achieved by iterating over the list of file paths in chunks, just as we did with the batch loading examples developed previously.
First, we must define a chunk size. In this case, we will use a size of 10, found after a little trial and error. I recommend testing different chunk sizes in order to discover what works best.
...
# split up the data
chunksize = 10
We can then use the chunksize to split up the file paths into chunks and define a coroutine for each chunk.
...
# split the operations into chunks
tasks = list()
for i in range(0, len(paths), chunksize):
    # select a chunk of filenames
    filepaths = paths[i:(i + chunksize)]
    # define the task
    tasks.append(load_files(filepaths))
Then, as we did in the previous section, we can process results of each task in the order they are completed, reporting results as we go.
...
# execute tasks and process results as they are completed
for task in asyncio.as_completed(tasks):
    # wait for the next task to complete
    _, filepaths = await task
    # process results
    for filepath in filepaths:
        # report progress
        print(f'.loaded {filepath}')
Tying all of this together, the complete example of loading files concurrently in batch using asyncio is listed below.
# SuperFastPython.com
# load many files concurrently with asyncio in batch
from os import listdir
from os.path import join
import asyncio
import aiofiles

# load and return the contents of a list of file paths
async def load_files(filepaths):
    # load all files
    data_list = list()
    for filepath in filepaths:
        # open the file
        async with aiofiles.open(filepath, 'r') as handle:
            # load the contents and add to list
            data = await handle.read()
            # store loaded data
            data_list.append(data)
    return (data_list, filepaths)

# load all files in a directory into memory
async def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # split up the data
    chunksize = 10
    # split the operations into chunks
    tasks = list()
    for i in range(0, len(paths), chunksize):
        # select a chunk of filenames
        filepaths = paths[i:(i + chunksize)]
        # define the task
        tasks.append(load_files(filepaths))
    # execute tasks and process results as they are completed
    for task in asyncio.as_completed(tasks):
        # wait for the next task to complete
        _, filepaths = await task
        # process results
        for filepath in filepaths:
            # report progress
            print(f'.loaded {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    asyncio.run(main())
Running the example loads all 10,000 files as we did in previous examples.
...
.loaded tmp/data-6938.csv
.loaded tmp/data-3698.csv
.loaded tmp/data-3840.csv
.loaded tmp/data-2586.csv
.loaded tmp/data-4849.csv
.loaded tmp/data-4691.csv
.loaded tmp/data-1931.csv
.loaded tmp/data-7398.csv
.loaded tmp/data-6086.csv
.loaded tmp/data-1702.csv
Done
In this case, we don't see any speed benefit over the single-threaded example, and slightly worse results than the non-batch asyncio version.
On my system, the example took about 11.866 seconds, compared to about 10.7 for the sequential example, which is about 1.1x slower.
[Finished in 11.9s]
[Finished in 11.8s]
[Finished in 11.9s]
Comparison of Results
We have performed the same file load operations using a suite of different parallel methods.
We can compare the results and get some idea of which methods may offer a performance benefit over the naive sequential approach.
The table below provides a summary of the results.
Method                  | Avg Time (sec) | Speed-Up
---------------------------------------------------
Sequential              | 10.700         | n/a
Threads                 | 11.930         | 0.90x
Threads/Batch           | 11.160         | 0.96x
Processes               | 30.260         | 0.35x
Processes/Batch         | 40.160         | 0.27x
Processes/Threads/Batch | 4.833          | 2.21x
Asyncio                 | 9.500          | 1.13x
Asyncio/Batch           | 11.866         | 0.90x
The results suggest using processes with threads in batch offers the best performance.
Using threads or processes alone does not appear to offer any benefit in this case.
Extensions
This section lists ideas for extending the tutorial.
- Vary Number of Thread Workers. Update the multithreaded example to test different numbers of worker threads and evaluate the effect on overall task duration.
- Vary Number of Process Workers. Update the multiprocess example to test different numbers of worker processes and evaluate the effect on overall task duration.
- Vary Number of Process and Thread Workers: Update the combined multiprocess and multithreaded example and vary the number of worker processes and worker threads and evaluate the effect on overall task duration.
- Vary Asyncio Semaphore: Update the asyncio example and test different numbers of resources for the semaphore and discover the limit for your system and the effect on overall task duration.
- Vary Asyncio Chunk Size: Update the batch asyncio example and test different chunk sizes and their effect on the overall task duration.
Share your extensions in the comments below, it would be great to see what you come up with.
Further Reading
This section provides additional resources that you may find helpful.
Books
- Concurrent File I/O in Python, Jason Brownlee (my book!)
Guides
Python File I/O APIs
- Built-in Functions
- os - Miscellaneous operating system interfaces
- os.path - Common pathname manipulations
- shutil - High-level file operations
- zipfile — Work with ZIP archives
- Python Tutorial: Chapter 7. Input and Output
Python Concurrency APIs
- threading — Thread-based parallelism
- multiprocessing — Process-based parallelism
- concurrent.futures — Launching parallel tasks
- asyncio — Asynchronous I/O
File I/O in Asyncio
Takeaways
In this tutorial, you discovered how to explore concurrent file loading in Python.
Do you have any questions?
Leave your question in a comment below and I will reply fast with my best advice.
Hugh Browm says
In the general case, the math is off for chunking filepaths:
As an example, if the number of files were 10 and the number of workers were 8, the chunksize would be 1, leading to 10 iterations. Worse, if the number of files were 6 and the number of workers were 8, the chunksize would be 0.
Chunking works better like this:
Chunks then do not have contiguous elements, but the elements are exhaustively covered and they are optimally sized to be as nearly equal as possible.
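[Editor's note: the commenter's code is not shown; a minimal sketch of the striding approach the comment seems to describe is below, where n_workers and paths match the batch examples above.]

...
# split paths into n_workers nearly equal, non-contiguous chunks
chunks = [paths[i::n_workers] for i in range(n_workers)]
# submit one batch task per chunk
futures = [executor.submit(load_files, chunk) for chunk in chunks]

Each of the n_workers chunks then differs in size by at most one element, and every path is covered exactly once.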
Jason Brownlee says
Excellent tip for those cases where len(paths) > n_workers, thanks!
Konstantin says
You don’t take into account the disk cache. The best way to compare is to generate, for example, 100000 files and each time read a subset of 10000 random files from them.
In my case the cache effect is extremely strong. First sequential read is ~8 min, the second read is ~14 sec
Jason Brownlee says
Thanks for the tip!