Last Updated on August 21, 2023
Moving a single file is fast, but moving many files can be slow.
It can become painfully slow in situations where you may need to move thousands of files. The hope is that multithreading can be used to speed up the file moving operation.
In this tutorial, you will explore how to move thousands of files using multithreading.
Let’s dive in.
Create 10,000 Files To Move
Before we can move thousands of files, we must create them.
We can write a script that will create 10,000 CSV files, each file with 10,000 lines and each line containing 10 random numeric values.
Each file will be about 1.9 megabytes in size, and all files together will be about 19 gigabytes in size.
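As a rough check: each random value prints as a string of roughly 18 characters, so a line of 10 comma-separated values is about 190 characters, a file of 10,000 lines is about 1.9 megabytes, and 10,000 such files total about 19 gigabytes.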
First, we can create a function that will take a file path and string data and save it to file.
The save_file() function below implements this, taking the full path and the data, opening the file in write mode, and writing the data to the file. The context manager ensures that the file is closed automatically.
# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)
Next, we can generate the data to be saved.
First, we can write a function to generate a single line of data. In this case, a line is composed of 10 random numeric values in the range between 0 and 1, stored in CSV format (i.e. values separated by commas).
The generate_line() function below implements this, returning a string of random data.
# generate a line of mock data of 10 random data points
def generate_line():
    return ','.join([str(random()) for _ in range(10)])
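If you need the generated files to be identical across runs, the pseudorandom number generator can be seeded before any data is generated, for example (the seed value of 1 is arbitrary):

...
# seed the pseudorandom number generator for reproducible data
from random import seed
seed(1)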
Next, we can write a function to generate the contents of one file.
This will be 10,000 lines of random values, where each line is separated by a new line.
The generate_file_data() function below implements this, returning a string representing the data for a single data file.
# generate file data of 10K lines each with 10 data points
def generate_file_data():
    # generate many lines of data
    lines = [generate_line() for _ in range(10000)]
    # convert to a single ascii doc with new lines
    return '\n'.join(lines)
Finally, we can generate data for all of the files and save each with a separate file name.
First, we can create a directory to store all of the created files (e.g. 'tmp') under the current working directory.
This can be achieved using the os.makedirs() function, for example:
...
# create a local directory to save files
makedirs(path, exist_ok=True)
We can then loop 10,000 times and create the data and a unique filename for each iteration, then save the generated contents of the file to disk.
...
# create all files
for i in range(10000):
    # generate data
    data = generate_file_data()
    # create filenames
    filepath = join(path, f'data-{i:04d}.csv')
    # save data file
    save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}')
The generate_all_files() function listed below implements this, creating the 10,000 files of 10,000 lines of data.
# generate 10K files in a directory
def generate_all_files(path='tmp'):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create all files
    for i in range(10000):
        # generate data
        data = generate_file_data()
        # create filenames
        filepath = join(path, f'data-{i:04d}.csv')
        # save data file
        save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}')
Tying this together, the complete example of creating a large number of CSV files is listed below.
# SuperFastPython.com
# create a large number of data files
from os import makedirs
from os.path import join
from random import random

# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)

# generate a line of mock data of 10 random data points
def generate_line():
    return ','.join([str(random()) for _ in range(10)])

# generate file data of 10K lines each with 10 data points
def generate_file_data():
    # generate many lines of data
    lines = [generate_line() for _ in range(10000)]
    # convert to a single ascii doc with new lines
    return '\n'.join(lines)

# generate 10K files in a directory
def generate_all_files(path='tmp'):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create all files
    for i in range(10000):
        # generate data
        data = generate_file_data()
        # create filenames
        filepath = join(path, f'data-{i:04d}.csv')
        # save data file
        save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}')

# entry point, generate all of the files
generate_all_files()
Running the example will create 10,000 CSV files of random data in a tmp/ directory.
It takes a long time to run, depending on the speed of your hard drive; SSDs used in modern computer systems are quite fast.
On my system it takes about 11 minutes to complete.
...
.saved tmp/data-9990.csv
.saved tmp/data-9991.csv
.saved tmp/data-9992.csv
.saved tmp/data-9993.csv
.saved tmp/data-9994.csv
.saved tmp/data-9995.csv
.saved tmp/data-9996.csv
.saved tmp/data-9997.csv
.saved tmp/data-9998.csv
.saved tmp/data-9999.csv
Next, we can explore ways of moving all files from one directory to another.
Move Files One-By-One (Slowly)
We are interested in moving a list of files that could be in the same or different source and destination directories.
This is different from simply renaming the directory that contains all of the files, which is a much simpler task. We will simulate the general file moving task by moving a list of files from one source directory to one destination directory.
Moving all files in one directory to a second directory in Python is relatively straightforward.
Python provides a number of ways to move files, although the shutil module is generally recommended for efficient high-level file operations.
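For example, os.rename() and pathlib.Path.rename() can also move a file within the same filesystem, whereas shutil.move() additionally falls back to a copy-and-delete when the destination is on a different filesystem. A minimal sketch of the alternatives (the file names are hypothetical and both directories are assumed to exist):

...
# alternatives for moving a single file
from os import rename
from pathlib import Path
from shutil import move
# rename within the same filesystem
rename('tmp/data-0000.csv', 'tmp2/data-0000.csv')
# the pathlib equivalent
Path('tmp/data-0001.csv').rename('tmp2/data-0001.csv')
# also handles moves across filesystems
move('tmp/data-0002.csv', 'tmp2/data-0002.csv')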
In this tutorial, we will use the shutil.move() function, which moves a file from a source path to a destination path.
For example:
...
# move source file to destination file
move(src_path, dest_path)
First, we will create the destination directory if it does not already exist using the os.makedirs() function.
...
# create the destination directory if needed
makedirs(dest, exist_ok=True)
Next, we can iterate over all files in the source directory using the os.listdir() function, which returns the name of each file.
...
# move all files from src to dest
for filename in listdir(src):
    # ...
For each file to be moved, we will use the os.path.join() function to construct a full source path and a destination path that can be provided to the shutil.move() function directly.
Note that these paths are relative to the current working directory, but you can change them to absolute paths if you prefer.
...
# create the path for the source
src_path = join(src, filename)
# create the path for the destination
dest_path = join(dest, filename)
We can then move the file and report progress.
...
# move source file to destination file
move(src_path, dest_path)
# report progress
print(f'.moved {src_path} to {dest_path}')
Tying this together, the complete example of moving all files from a source directory to a destination directory sequentially (one-by-one) is listed below.
# SuperFastPython.com
# move files from one directory to another sequentially
from os import makedirs
from os import listdir
from os.path import join
from shutil import move

# move files from src to dest
def main(src='tmp', dest='tmp2'):
    # create the destination directory if needed
    makedirs(dest, exist_ok=True)
    # move all files from src to dest
    for filename in listdir(src):
        # create the path for the source
        src_path = join(src, filename)
        # create the path for the destination
        dest_path = join(dest, filename)
        # move source file to destination file
        move(src_path, dest_path)
        # report progress
        print(f'.moved {src_path} to {dest_path}')
    print('Done')

# entry point
if __name__ == '__main__':
    main()
Running the program will move all files from the tmp/ sub-directory to a new subdirectory named tmp2/.
...
.moved tmp/data-5832.csv to tmp2/data-5832.csv
.moved tmp/data-2185.csv to tmp2/data-2185.csv
.moved tmp/data-5826.csv to tmp2/data-5826.csv
.moved tmp/data-2191.csv to tmp2/data-2191.csv
.moved tmp/data-1498.csv to tmp2/data-1498.csv
.moved tmp/data-0786.csv to tmp2/data-0786.csv
.moved tmp/data-7957.csv to tmp2/data-7957.csv
.moved tmp/data-6491.csv to tmp2/data-6491.csv
.moved tmp/data-5198.csv to tmp2/data-5198.csv
.moved tmp/data-4286.csv to tmp2/data-4286.csv
Done
Moving files is faster than copying, although it will still take a moment as there are 10,000 files to move, each about 2 megabytes in size. The overall execution time will depend on the speed of your hardware, specifically the speed of your hard drive.
On my system with an SSD hard drive, it takes about 1.2 seconds to complete (resetting the tmp/ and tmp2/ directories between runs).
[Finished in 1.4s]
[Finished in 1.2s]
[Finished in 1.2s]
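Note that repeating the experiment requires moving the files back into tmp/ first. A minimal sketch of one way to reset the directories between runs (assuming all of the files currently sit in tmp2/):

# move all files back from tmp2/ to tmp/ to reset the experiment
from os import listdir
from os.path import join
from shutil import move
for filename in listdir('tmp2'):
    # move each file back to the original directory
    move(join('tmp2', filename), join('tmp', filename))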
How long does it take to run on your system?
Let me know in the comments below.
Now that we know how to move thousands of files from one directory to another, let’s consider how we might speed it up using multiple threads.
How to Move Files Concurrently
File moving is performed by the operating system, which in turn calls down to the hardware, most likely updating the filesystem's metadata on disk (the file allocation table or its equivalent).
As such, we might expect that file moving is an IO-bound task, limited by the speed at which files can be “moved” by the disk, e.g. perhaps just updates to the hard drive's file allocation table.
This suggests that threads may be appropriate for performing concurrent file moving operations. This is because the CPython interpreter will release the global interpreter lock when performing IO operations, such as moving a file. This is in contrast to processes, which are better suited to CPU-bound tasks that execute as fast as the CPU can run them.
Given that moving files does not involve creating new data or moving data around on the disk, it is a generally fast process.
Ultimately, we might expect that file moving on the disk is performed sequentially. That is, the disk is only able to move one file at a time.
As such, our expectation may be that file moving cannot be performed in parallel on a single disk and using threads will not offer any speed-up.
Let’s explore this in the next section.
Move Files Concurrently with Threads
The ThreadPoolExecutor class provides a pool of worker threads that we can use to multithread the moving of thousands of files.
Each file in the tmp/ directory will represent a task that will require the file to be moved from the source directory to the destination directory using the shutil.move() function.
First, we can define a function that represents the task. The function will take the name of the file, the source directory and the destination directory and will perform the move operation and report progress to stdout.
The move_file() function below implements this.
# move a file from source to destination
def move_file(filename, src, dest):
    # create the path for the source
    src_path = join(src, filename)
    # create the path for the destination
    dest_path = join(dest, filename)
    # move source file to destination file
    move(src_path, dest_path)
    # report progress
    print(f'.moved {src_path} to {dest_path}')
Next, we can call this function for each file in the tmp/ directory.
First, we can create the thread pool with ten worker threads, chosen arbitrarily.
We will use the context manager to ensure the thread pool is closed automatically once all pending and running tasks have been completed.
...
# create the thread pool
with ThreadPoolExecutor(10) as exe:
    # ...
We can then call the submit() function for each file to be moved, calling the move_file() function with the filename and directory arguments.
This can be achieved in a list comprehension, and the resulting list of Future objects can be ignored as we do not require any result from the task.
...
# submit all file move tasks
_ = [exe.submit(move_file, name, src, dest) for name in listdir(src)]
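Note that ignoring the Future objects also means that any exception raised inside move_file() will pass silently. If error reporting matters, the futures could be kept and checked once the tasks finish, for example:

...
# keep the futures so that any exceptions can be surfaced
futures = [exe.submit(move_file, name, src, dest) for name in listdir(src)]
# calling result() re-raises any exception raised in the task
for future in futures:
    future.result()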
Tying this together, the complete example of the multithreaded moving of files from one directory is listed below.
# SuperFastPython.com
# move files from one directory to another concurrently with threads
from os import makedirs
from os import listdir
from os.path import join
from shutil import move
from concurrent.futures import ThreadPoolExecutor

# move a file from source to destination
def move_file(filename, src, dest):
    # create the path for the source
    src_path = join(src, filename)
    # create the path for the destination
    dest_path = join(dest, filename)
    # move source file to destination file
    move(src_path, dest_path)
    # report progress
    print(f'.moved {src_path} to {dest_path}')

# move files from src to dest
def main(src='tmp', dest='tmp2'):
    # create the destination directory if needed
    makedirs(dest, exist_ok=True)
    # create the thread pool
    with ThreadPoolExecutor(10) as exe:
        # submit all file move tasks
        _ = [exe.submit(move_file, name, src, dest) for name in listdir(src)]
    print('Done')

# entry point
if __name__ == '__main__':
    main()
Running the example moves all files from the source directory to the destination directory, as before.
...
.moved tmp/data-2185.csv to tmp2/data-2185.csv
.moved tmp/data-5826.csv to tmp2/data-5826.csv
.moved tmp/data-2191.csv to tmp2/data-2191.csv
.moved tmp/data-7957.csv to tmp2/data-7957.csv
.moved tmp/data-6485.csv to tmp2/data-6485.csv
.moved tmp/data-0786.csv to tmp2/data-0786.csv
.moved tmp/data-6491.csv to tmp2/data-6491.csv
.moved tmp/data-1498.csv to tmp2/data-1498.csv
.moved tmp/data-5198.csv to tmp2/data-5198.csv
.moved tmp/data-4286.csv to tmp2/data-4286.csv
Done
We do not see any speed benefit from multithreading the file moving process; in fact, we see a slight penalty for using the thread pool.
On my system it takes about 1.4 seconds, which is slightly slower (by about 200 milliseconds) than the non-threaded approach that took about 1.2 seconds.
[Finished in 1.4s]
[Finished in 1.4s]
[Finished in 1.4s]
How long did it take to execute on your system?
Let me know in the comments below.
Perhaps 10 threads is too few.
We can run the example again with 100 threads instead to see if it makes any difference.
This requires a single change: the number of worker threads passed to the ThreadPoolExecutor constructor, for example:
...
# create the thread pool
with ThreadPoolExecutor(100) as exe:
    # ...
Running the example moves all files from the source directory to the destination directory, as before.
This time we see performance equivalent to the single-threaded implementation.
On my system it takes about 1.16 seconds compared to 1.2 seconds for the sequential version, which is probably within the margin of measurement error.
[Finished in 1.2s]
[Finished in 1.2s]
[Finished in 1.1s]
These results match our expectations that file moving is a sequential operation performed by the disk hardware and that we cannot speed it up using multithreading.
Next, let’s explore if we can see any benefit by performing batch file move operations concurrently.
Move Files Concurrently with Threads in Batch
Each task in the thread pool adds overhead that slows down the overall task.
This includes function calls and object creation for each task, occurring internally within the ThreadPoolExecutor class.
We might reduce this overhead by performing file operations in a batch within each worker thread.
This can be achieved by updating the move_file() function to receive a list of files to move, and splitting up the files in the main() function into chunks to be submitted to worker threads for batch processing.
The hope is that this would reduce some of the overhead required for submitting so many small tasks by replacing it with a few larger batch tasks.
First, we can change the target function to receive a list of file names to move, listed below.
# move files from source to destination
def move_files(filenames, src, dest):
    # process all file names
    for filename in filenames:
        # create the path for the source
        src_path = join(src, filename)
        # create the path for the destination
        dest_path = join(dest, filename)
        # move source file to destination file
        move(src_path, dest_path)
        # report progress
        print(f'.moved {src_path} to {dest_path}', flush=True)
Next, in the main() function we can select a chunksize based on the number of worker threads and the number of files to move.
First, we must create a list of all files to move so that we can split it up later.
...
# list of all files to move
files = [name for name in listdir(src)]
In this case, we will use 100 worker threads with 10,000 files to move. Dividing the files evenly between workers (10,000 / 100) gives 100 files for each worker to move.
...
# determine chunksize
n_workers = 100
chunksize = round(len(files) / n_workers)
Next, we can create the thread pool with the parameterized number of worker threads.
...
# create the thread pool
with ThreadPoolExecutor(n_workers) as exe:
    ...
We can then iterate over the list of files to move and split them into chunks defined by a chunksize.
This can be achieved by iterating over the list of files and using the chunksize as the increment size in a for-loop. We can then split off a chunk of files to send to a worker thread as a task.
...
# split the move operations into chunks
for i in range(0, len(files), chunksize):
    # select a chunk of filenames
    filenames = files[i:(i + chunksize)]
    # submit the batch move task
    _ = exe.submit(move_files, filenames, src, dest)
And that’s it.
The complete example of moving files in a batch concurrently using worker threads is listed below.
# SuperFastPython.com
# move files from one directory to another concurrently with threads in batch
from os import makedirs
from os import listdir
from os.path import join
from shutil import move
from concurrent.futures import ThreadPoolExecutor

# move files from source to destination
def move_files(filenames, src, dest):
    # process all file names
    for filename in filenames:
        # create the path for the source
        src_path = join(src, filename)
        # create the path for the destination
        dest_path = join(dest, filename)
        # move source file to destination file
        move(src_path, dest_path)
        # report progress
        print(f'.moved {src_path} to {dest_path}', flush=True)

# move files from src to dest
def main(src='tmp', dest='tmp2'):
    # create the destination directory if needed
    makedirs(dest, exist_ok=True)
    # list of all files to move
    files = [name for name in listdir(src)]
    # determine chunksize
    n_workers = 100
    chunksize = round(len(files) / n_workers)
    # create the thread pool
    with ThreadPoolExecutor(n_workers) as exe:
        # split the move operations into chunks
        for i in range(0, len(files), chunksize):
            # select a chunk of filenames
            filenames = files[i:(i + chunksize)]
            # submit the batch move task
            _ = exe.submit(move_files, filenames, src, dest)
    print('Done')

# entry point
if __name__ == '__main__':
    main()
Running the example moves all 10,000 files from tmp/ to tmp2/ as before.
...
.moved tmp/data-4411.csv to tmp2/data-4411.csv
.moved tmp/data-4466.csv to tmp2/data-4466.csv
.moved tmp/data-0600.csv to tmp2/data-0600.csv
.moved tmp/data-7118.csv to tmp2/data-7118.csv
.moved tmp/data-9142.csv to tmp2/data-9142.csv
.moved tmp/data-0677.csv to tmp2/data-0677.csv
.moved tmp/data-6271.csv to tmp2/data-6271.csv
.moved tmp/data-1569.csv to tmp2/data-1569.csv
.moved tmp/data-9624.csv to tmp2/data-9624.csv
.moved tmp/data-6517.csv to tmp2/data-6517.csv
Done
In this case we see an improvement in performance compared to the sequential case of moving files one-by-one.
On my system it takes about 737 milliseconds (recall there are 1,000 milliseconds in a second) compared to 1.2 seconds with the sequential case. That is a speed improvement of about 1.63x.
Not bad.
[Finished in 757ms]
[Finished in 694ms]
[Finished in 759ms]
Move Files Concurrently with Processes
We can also try to move files concurrently with processes instead of threads.
It is unclear whether processes can offer a speed benefit in this case. Given that we can get a benefit using threads with batch file moving, we know some speed-up is possible. Nevertheless, using processes requires the data for each task to be serialized, which introduces additional overhead that might negate any speed-up gained from performing file operations in parallel across processes.
We can explore using processes to move files concurrently using the ProcessPoolExecutor.
This can be achieved by swapping the ThreadPoolExecutor for the ProcessPoolExecutor and specifying the number of worker processes.
We will use 8 processes in this case as I have 8 logical CPU cores on my system.
It may be interesting to try different configurations based on the number of logical CPU cores you have available. Given that the task is not CPU-bound, the mapping between processes and CPU cores is likely not relevant.
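One way to check how many logical CPU cores are available is the os.cpu_count() function, for example:

...
# report the number of logical CPU cores available
from os import cpu_count
print(cpu_count())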
...
# create the process pool
with ProcessPoolExecutor(8) as exe:
    # ...
Tying this together, the complete example of moving files concurrently using the process pool is listed below.
# SuperFastPython.com
# move files from one directory to another concurrently with processes
from os import makedirs
from os import listdir
from os.path import join
from shutil import move
from concurrent.futures import ProcessPoolExecutor

# move a file from source to destination
def move_file(filename, src, dest):
    # create the path for the source
    src_path = join(src, filename)
    # create the path for the destination
    dest_path = join(dest, filename)
    # move source file to destination file
    move(src_path, dest_path)
    # report progress
    print(f'.moved {src_path} to {dest_path}', flush=True)

# move files from src to dest
def main(src='tmp', dest='tmp2'):
    # create the destination directory if needed
    makedirs(dest, exist_ok=True)
    # create the process pool
    with ProcessPoolExecutor(8) as exe:
        # submit all file move tasks
        _ = [exe.submit(move_file, name, src, dest) for name in listdir(src)]
    print('Done')

# entry point
if __name__ == '__main__':
    main()
Running the example moves all files to the new location as we did before.
...
.moved tmp/data-8457.csv to tmp2/data-8457.csv
.moved tmp/data-5601.csv to tmp2/data-5601.csv
.moved tmp/data-8443.csv to tmp2/data-8443.csv
.moved tmp/data-1329.csv to tmp2/data-1329.csv
.moved tmp/data-5629.csv to tmp2/data-5629.csv
.moved tmp/data-8319.csv to tmp2/data-8319.csv
.moved tmp/data-6452.csv to tmp2/data-6452.csv
.moved tmp/data-2813.csv to tmp2/data-2813.csv
.moved tmp/data-2185.csv to tmp2/data-2185.csv
.moved tmp/data-4286.csv to tmp2/data-4286.csv
Done
In this case we do not see a speed improvement. In fact, the results are worse than the sequential case by a marked amount.
On my system, the example took 2.4 seconds to complete, compared to 1.2 seconds with the sequential case. This is about 2x slower.
[Finished in 2.4s]
[Finished in 2.4s]
[Finished in 2.4s]
Move Files Concurrently with Processes in Batch
Any data sent to a worker process or received from a worker process must be serialized (pickled).
This adds overhead to each task executed by the worker processes.
This is likely the cause of worse performance seen when using the process pool in the previous section.
We can address this by batching the move tasks into chunks to be executed by each worker process, just as we did when we batched filenames for the thread pools.
Again, this can be achieved by adapting the batched version of ThreadPoolExecutor from a previous section to use ProcessPoolExecutor.
We will use 8 worker processes. With 10,000 files to be moved, this means that each worker will receive a batch of 1,250 files to move.
...
# determine chunksize
n_workers = 8
chunksize = round(len(files) / n_workers)
We can then create the process pool using the parameterized number of workers.
...
# create the process pool
with ProcessPoolExecutor(n_workers) as exe:
    # ...
And that’s it.
The complete example of moving files in batches using a process pool is listed below.
# SuperFastPython.com
# move files from one directory to another concurrently with processes in batch
from os import makedirs
from os import listdir
from os.path import join
from shutil import move
from concurrent.futures import ProcessPoolExecutor

# move files from source to destination
def move_files(filenames, src, dest):
    # process all file names
    for filename in filenames:
        # create the path for the source
        src_path = join(src, filename)
        # create the path for the destination
        dest_path = join(dest, filename)
        # move source file to destination file
        move(src_path, dest_path)
        # report progress
        print(f'.moved {src_path} to {dest_path}', flush=True)

# move files from src to dest
def main(src='tmp', dest='tmp2'):
    # create the destination directory if needed
    makedirs(dest, exist_ok=True)
    # list of all files to move
    files = [name for name in listdir(src)]
    # determine chunksize
    n_workers = 8
    chunksize = round(len(files) / n_workers)
    # create the process pool
    with ProcessPoolExecutor(n_workers) as exe:
        # split the move operations into chunks
        for i in range(0, len(files), chunksize):
            # select a chunk of filenames
            filenames = files[i:(i + chunksize)]
            # submit the batch move task
            _ = exe.submit(move_files, filenames, src, dest)
    print('Done')

# entry point
if __name__ == '__main__':
    main()
Running the example moved all 10,000 files without a problem.
...
.moved tmp/data-5832.csv to tmp2/data-5832.csv
.moved tmp/data-2185.csv to tmp2/data-2185.csv
.moved tmp/data-5826.csv to tmp2/data-5826.csv
.moved tmp/data-2191.csv to tmp2/data-2191.csv
.moved tmp/data-1498.csv to tmp2/data-1498.csv
.moved tmp/data-0786.csv to tmp2/data-0786.csv
.moved tmp/data-7957.csv to tmp2/data-7957.csv
.moved tmp/data-6491.csv to tmp2/data-6491.csv
.moved tmp/data-5198.csv to tmp2/data-5198.csv
.moved tmp/data-4286.csv to tmp2/data-4286.csv
Done
In this case, we see a slight improvement over the sequential case, but worse performance than the same batch file moving performed with threads.
On my system, the example took 872 milliseconds, which is 1.38x faster than the single-threaded version which took about 1.2 seconds, but slower than the batch multithreaded version that took about 737 milliseconds.
[Finished in 1.1s]
[Finished in 905ms]
[Finished in 893ms]
Move Files Concurrently with AsyncIO
We can also move files concurrently using asyncio.
Generally, Python does not support non-blocking operations when working with files.
It is not provided in the asyncio module because it is challenging to implement in a general, cross-platform manner.
The third-party library aiofiles provides file operations for use in asyncio programs, but again, the functions do not perform true non-blocking IO; instead, the concurrency is simulated using threads. Note that aiofiles is a third-party package and must be installed separately (e.g. via pip).
Nevertheless, we can use aiofiles in an asyncio program to move files concurrently.
First, we can update move_file() to be an async function and replace the call to shutil.move() with aiofiles.os.rename(), which can be awaited.
The updated version of move_file() is listed below.
# move a file from source to destination
async def move_file(filename, src, dest):
    # create the path for the source
    src_path = join(src, filename)
    # create the path for the destination
    dest_path = join(dest, filename)
    # move source file to destination file
    await rename(src_path, dest_path)
    # report progress
    print(f'.moved {src_path} to {dest_path}')
Next, we can update the main() function.
We can create a list of coroutines (asynchronous functions) that will execute concurrently. Each coroutine will be a call to the move_file() function with a separate file to move.
This can be achieved in a list comprehension, for example:
...
# create a list of coroutines for moving files
tasks = [move_file(name, src, dest) for name in files]
You might recall that this creates a list of coroutine objects; the asynchronous functions are not executed while constructing the list.
Next, we can execute the list of coroutines and wait for them to complete. This will attempt to perform all 10,000 file move operations concurrently, allowing each task to yield control while waiting for the move operation to complete via the await within the move_file() function.
This can be achieved using the asyncio.gather() function that takes a collection of awaitables. Given that we have a list of asynchronous function calls, we can unpack the function calls with the star (*) operator.
...
# execute and wait for all tasks to complete
await asyncio.gather(*tasks)
Finally, we can start the asyncio event loop by calling the asyncio.run() function and pass it our async main function.
# entry point
if __name__ == '__main__':
    # execute the asyncio run loop
    asyncio.run(main())
The complete example of moving files concurrently with asyncio is listed below.
# SuperFastPython.com
# move files from one directory to another concurrently with asyncio
from os import makedirs
from os import listdir
from os.path import join
from aiofiles.os import rename
import asyncio

# move a file from source to destination
async def move_file(filename, src, dest):
    # create the path for the source
    src_path = join(src, filename)
    # create the path for the destination
    dest_path = join(dest, filename)
    # move source file to destination file
    await rename(src_path, dest_path)
    # report progress
    print(f'.moved {src_path} to {dest_path}')

# move files from src to dest
async def main(src='tmp', dest='tmp2'):
    # create the destination directory if needed
    makedirs(dest, exist_ok=True)
    # list of all files to move
    files = [name for name in listdir(src)]
    # create a list of coroutines for moving files
    tasks = [move_file(name, src, dest) for name in files]
    # execute and wait for all tasks to complete
    await asyncio.gather(*tasks)
    print('Done')

# entry point
if __name__ == '__main__':
    # execute the asyncio run loop
    asyncio.run(main())
Running the example moves all 10,000 files from tmp/ to tmp2/ as in the previous cases.
...
.moved tmp/data-5832.csv to tmp2/data-5832.csv
.moved tmp/data-0792.csv to tmp2/data-0792.csv
.moved tmp/data-1498.csv to tmp2/data-1498.csv
.moved tmp/data-2191.csv to tmp2/data-2191.csv
.moved tmp/data-0976.csv to tmp2/data-0976.csv
.moved tmp/data-6491.csv to tmp2/data-6491.csv
.moved tmp/data-0786.csv to tmp2/data-0786.csv
.moved tmp/data-4286.csv to tmp2/data-4286.csv
.moved tmp/data-5198.csv to tmp2/data-5198.csv
.moved tmp/data-7957.csv to tmp2/data-7957.csv
Done
The example runs about as fast as the single-threaded example and the multithreaded example without batch processing.
On my system, the program finishes in about 1.46 seconds, compared to 1.2 seconds for the single-threaded example and about 1.4 seconds for the naive multithreaded example.
This is not surprising given that asyncio is operating in a single thread, that aiofiles is simulating non-blocking IO using threads internally and that we are attempting to run 10,000 coroutines simultaneously.
Nevertheless, it was an interesting exercise in case Python does support non-blocking file operations in the future.
[Finished in 1.4s]
[Finished in 1.5s]
[Finished in 1.5s]
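As an aside, instead of batching, the number of in-flight coroutines could be capped with an asyncio.Semaphore. Below is a minimal sketch of a wrapper coroutine that reuses the move_file() coroutine and imports from the example above (the name move_file_limited is arbitrary and not part of the original example):

# move a file, but only while a slot in the shared semaphore is held
async def move_file_limited(semaphore, filename, src, dest):
    # wait for a free slot before starting the move
    async with semaphore:
        await move_file(filename, src, dest)

In main(), a semaphore could then be created, e.g. asyncio.Semaphore(100), and passed to each coroutine in place of calling move_file() directly, before gathering the tasks as before.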
Move Files Concurrently with AsyncIO in Batch
Perhaps we can reduce some of the overhead in moving files concurrently with asyncio by performing tasks in batch.
In the previous example, each coroutine performed a single IO-bound operation. We can update the move_file() function to process multiple files in a batch as we did in previous batch processing examples.
This will reduce the number of coroutines in the asyncio runtime and may reduce the amount of data moving around. At the very least, it may be an interesting exercise.
The updated version of the task function that will process a list of files is listed below. The task will yield control for each move operation, but only to other coroutines. The for-loop within a task will not progress while it is waiting.
# move files from source to destination
async def move_files(filenames, src, dest):
    # rename all files
    for filename in filenames:
        # create the path for the source
        src_path = join(src, filename)
        # create the path for the destination
        dest_path = join(dest, filename)
        # move source file to destination file
        await rename(src_path, dest_path)
        # report progress
        print(f'.moved {src_path} to {dest_path}')
Next, we can choose the number of coroutines we wish to use and then divide the total number of files up evenly into chunks.
We will use 100 in this case, chosen arbitrarily.
...
# determine chunksize
n_workers = 100
chunksize = round(len(files) / n_workers)
We can then iterate over the chunks and add each coroutine to a list for execution at the end.
...
# split the move operations into chunks
tasks = list()
for i in range(0, len(files), chunksize):
    # select a chunk of filenames
    filenames = files[i:(i + chunksize)]
    # add coroutine to list of tasks
    tasks.append(move_files(filenames, src, dest))
Finally, we can execute all 100 coroutines concurrently.
...
# execute all coroutines concurrently and wait for them to finish
await asyncio.gather(*tasks)
Tying this together, a complete example of batch file moving using asyncio is listed below.
Again, we don’t expect a speed benefit over the single-threaded example, but perhaps a small lift in performance over the previous example where we had 10,000 coroutines operating concurrently.
# SuperFastPython.com
# move files from one directory to another concurrently with asyncio in batch
from os import makedirs
from os import listdir
from os.path import join
from aiofiles.os import rename
import asyncio

# move files from source to destination
async def move_files(filenames, src, dest):
    # rename all files
    for filename in filenames:
        # create the path for the source
        src_path = join(src, filename)
        # create the path for the destination
        dest_path = join(dest, filename)
        # move source file to destination file
        await rename(src_path, dest_path)
        # report progress
        print(f'.moved {src_path} to {dest_path}')

# move files from src to dest
async def main(src='tmp', dest='tmp2'):
    # create the destination directory if needed
    makedirs(dest, exist_ok=True)
    # list of all files to move
    files = [name for name in listdir(src)]
    # determine chunksize
    n_workers = 100
    chunksize = round(len(files) / n_workers)
    # split the move operations into chunks
    tasks = list()
    for i in range(0, len(files), chunksize):
        # select a chunk of filenames
        filenames = files[i:(i + chunksize)]
        # add coroutine to list of tasks
        tasks.append(move_files(filenames, src, dest))
    # execute and wait for all coroutines
    await asyncio.gather(*tasks)
    print('Done')

# entry point
if __name__ == '__main__':
    # execute the asyncio run loop
    asyncio.run(main())
Running the example moves all 10,000 files from one sub-directory to another, as with all previous examples.
...
.moved tmp/data-2959.csv to tmp2/data-2959.csv
.moved tmp/data-5090.csv to tmp2/data-5090.csv
.moved tmp/data-5439.csv to tmp2/data-5439.csv
.moved tmp/data-5887.csv to tmp2/data-5887.csv
.moved tmp/data-2383.csv to tmp2/data-2383.csv
.moved tmp/data-3413.csv to tmp2/data-3413.csv
.moved tmp/data-6573.csv to tmp2/data-6573.csv
.moved tmp/data-7010.csv to tmp2/data-7010.csv
.moved tmp/data-2287.csv to tmp2/data-2287.csv
.moved tmp/data-4718.csv to tmp2/data-4718.csv
Done
In this case, we don’t see any speed improvement. In fact, the results are similar to the non-batch version of concurrent file moving with asyncio and the single-threaded example. The results may even be within the margin of measurement error.
On my system, the example completes in about 1.36 seconds, compared to 1.46 seconds for the non-batch asyncio version and 1.2 seconds for the single-threaded version. If we were generous, we could say that using 100-fold fewer coroutines offers a slight benefit in performance, but it is still no faster than the non-concurrent version.
[Finished in 1.3s]
[Finished in 1.3s]
[Finished in 1.5s]
Extensions
This section lists ideas for extending the tutorial.
- Vary Number of Thread Workers. Update the multithreaded examples to test different numbers of worker threads such as 50, 1000, and more to see if it makes a difference to the run time.
- Vary Number of Process Workers. Update the multiprocess examples to test different numbers of worker processes such as 2, 4, 10, 50, and more to see if it makes a difference to the run time.
- Vary Number of Coroutines. Update the batch asyncio version to test different numbers of coroutines and different chunk sizes to see if it makes a difference to the runtime.
Share your extensions in the comments below; it would be great to see what you come up with.
Further Reading
This section provides additional resources that you may find helpful.
Books
- Concurrent File I/O in Python, Jason Brownlee (my book!)
Guides
Python File I/O APIs
- Built-in Functions
- os - Miscellaneous operating system interfaces
- os.path - Common pathname manipulations
- shutil - High-level file operations
- zipfile — Work with ZIP archives
- Python Tutorial: Chapter 7. Input and Output
Python Concurrency APIs
- threading — Thread-based parallelism
- multiprocessing — Process-based parallelism
- concurrent.futures — Launching parallel tasks
- asyncio — Asynchronous I/O
Takeaways
In this tutorial, you explored how to use multithreading to move thousands of files.
Do you have any questions?
Leave your question in a comment below and I will reply fast with my best advice.
Photo by Roberto Nickson on Unsplash