Multithreaded File Zipping in Python

December 28, 2021 Concurrent File I/O

Zipping a directory of files is typically slow.

It can become painfully slow in situations where you may need to zip thousands of files to disk. The hope is that multithreading will speed up the file compression operation.

In this tutorial, you will explore how to zip hundreds of files using multithreading.

Let's dive in.

Create 1,000 Files to Zip

Before we can zip the files on disk, we must create them.

We can write a script that will create 1,000 CSV files, each file with 10,000 lines and each line containing 10 random numeric values.

First, we can create a function that will take a file path and string data and save it to file.

The save_file() function below implements this, taking the full path and data, opening the file in text write mode, then saving the data to the file. The context manager is used so that the file is closed automatically.

# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)

Next, we can generate the data to be saved.

First, we can write a function to generate a single line of data. In this case, a line is composed of 10 random numeric values in the range between 0 and 1, stored in CSV format (e.g. separated by a comma per value).

The generate_line() function below implements this, returning a string of random data.

# generate a line of mock data of 10 random data points
def generate_line():
    return ','.join([str(random()) for _ in range(10)])

Next, we can write a function to generate the contents of one file.

This will be 10,000 lines of random values, where each line is separated by a new line.

The generate_file_data() function below implements this, returning a string representing the data for a single data file.

# generate file data of 10K lines each with 10 data points
def generate_file_data():
    # generate many lines of data
    lines = [generate_line() for _ in range(10000)]
    # convert to a single ascii doc with new lines
    return '\n'.join(lines)

Finally, we can generate data for all of the files and save each with a separate file name.

First, we can create a directory to store all of the created files (e.g. 'tmp') under the current working directory.

...
# create a local directory to save files
makedirs(path, exist_ok=True)

We can then loop 1,000 times and create the data and a unique filename for each iteration, then save the generated contents of the file to disk.

...
# create all files
for i in range(1000):
    # generate data
    data = generate_file_data()
    # create filenames
    filepath = join(path, f'data-{i:04d}.csv')
    # save data file
    save_file(filepath, data)
    # report progress
    print(f'.saved {filepath}')

The generate_all_files() function listed below implements this, creating the 1,000 files of 10,000 lines of data.

# generate 1K files in a directory
def generate_all_files(path='tmp'):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create all files
    for i in range(1000):
        # generate data
        data = generate_file_data()
        # create filenames
        filepath = join(path, f'data-{i:04d}.csv')
        # save data file
        save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}')

Tying this together, the complete example of creating a large number of CSV files is listed below.

# SuperFastPython.com
# create many files that we can open later
from os import makedirs
from os.path import join
from random import random

# save data to a file
def save_file(filepath, data):
    # open the file
    with open(filepath, 'w') as handle:
        # save the data
        handle.write(data)

# generate a line of mock data of 10 random data points
def generate_line():
    return ','.join([str(random()) for _ in range(10)])

# generate file data of 10K lines each with 10 data points
def generate_file_data():
    # generate many lines of data
    lines = [generate_line() for _ in range(10000)]
    # convert to a single ascii doc with new lines
    return '\n'.join(lines)

# generate 1K files in a directory
def generate_all_files(path='tmp'):
    # create a local directory to save files
    makedirs(path, exist_ok=True)
    # create all files
    for i in range(1000):
        # generate data
        data = generate_file_data()
        # create filenames
        filepath = join(path, f'data-{i:04d}.csv')
        # save data file
        save_file(filepath, data)
        # report progress
        print(f'.saved {filepath}')

# entry point, generate all of the files
generate_all_files()

Running the example will create 1,000 CSV files of random data into a tmp/ directory.

It takes a long time to run, depending on the speed of your hard drive; the SSDs used in modern systems are quite fast.

On my system, it takes about 75.5 seconds to complete.

...
.saved tmp/data-0990.csv
.saved tmp/data-0991.csv
.saved tmp/data-0992.csv
.saved tmp/data-0993.csv
.saved tmp/data-0994.csv
.saved tmp/data-0995.csv
.saved tmp/data-0996.csv
.saved tmp/data-0997.csv
.saved tmp/data-0998.csv
.saved tmp/data-0999.csv

Next, we can explore ways of zipping all of these files into a single archive.

Zip Files One-by-One (slowly)

Zipping a directory of files in Python is relatively straightforward.

First, we must open the zip file we intend to save by creating an instance of the zipfile.ZipFile class.

In this case, we will use the context manager to open the file to ensure it is closed correctly once we are finished adding files.

We will name our zip file 'testing.zip' and use the ZIP_DEFLATED compression method.

...
# open the zip file
with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
	# ...

We can then add files to the archive by calling the ZipFile.write() function on the zip file handle and specifying the local path to the file to add to the archive.

...
# add a file to the archive
handle.write('tmp/data-0000.csv')

We want to add all files listed in the tmp/ subdirectory.

One way we can achieve this is to use the os.listdir() to list all files in the directory and use the os.path.join() function to create a path to the file that can be passed directly to the ZipFile.write() function to add it to the zip.

For example, the following list comprehension will create a list of 1,000 paths (one for each file in the tmp/ subdirectory).

...
# list all files to add to the zip
files = [join(path,f) for f in listdir(path)]

These paths can then be used in calls to the ZipFile.write() function one by one to add each file to the archive.

...
# add all files to the zip
for filepath in files:
    # add file to the archive
    handle.write(filepath)

Tying this together, the complete example of zipping 1,000 data files in the tmp/ subdirectory is listed below.

# SuperFastPython.com
# create a zip file and add files sequentially
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED

# create a zip file
def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # add all files to the zip
        for filepath in files:
            # add file to the archive
            handle.write(filepath)
            # report progress
            print(f'.added {filepath}')

# entry point
if __name__ == '__main__':
    main()

Running the program creates the zip file 'testing.zip' in the current working directory and adds all files from the tmp sub-directory.

...
.added tmp/data-0779.csv
.added tmp/data-0037.csv
.added tmp/data-0751.csv
.added tmp/data-0989.csv
.added tmp/data-0745.csv
.added tmp/data-0023.csv
.added tmp/data-0976.csv
.added tmp/data-0962.csv
.added tmp/data-0792.csv
.added tmp/data-0786.csv

The resulting zip is large, about 847 megabytes in size.

It takes a while to create this zip file, depending on the speed of your hardware.

On my system, it takes about 136.9 seconds (about 2.2 minutes).

[Finished in 136.9s]

Next, let's consider the thread safety of the ZipFile class.

ZipFile is Not Thread Safe

Adding files to a zipfile.ZipFile is not thread safe.

There is no mention of thread safety in the API documentation for the ZipFile class at the time of writing (Python v3.10).

Reviewing the source code for the ZipFile class (Python v3.10), we can see some usage of internal locks, but it does not appear to be complete (I might be wrong).

Nevertheless, attempting to write files to a ZipFile from multiple threads will result in an error along the lines of:

ValueError: Can't write to ZIP archive while an open writing handle exists.

Therefore, the program to add files to a zip archive requires some modification to make it thread safe.

At the very least, calls on the ZipFile class to add files to the archive must be treated as a critical section and made thread safe using synchronization primitives like a mutual exclusion lock (mutex).

This includes functions such as the ZipFile.write() for automatically loading a file and writing it to the archive and the ZipFile.writestr() function for adding file data loaded as a string to the archive.
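For example, a minimal sketch of this idea (using a throwaway archive in a temporary directory, with hypothetical entry names) might treat each ZipFile.writestr() call as a critical section guarded by a threading.Lock:

```python
import os
import tempfile
from threading import Thread, Lock
from zipfile import ZipFile, ZIP_DEFLATED

# add one entry to the archive, holding the lock while writing
def add_entry(lock, handle, name, data):
    with lock:
        handle.writestr(name, data)

# throwaway archive in a temporary directory
out = os.path.join(tempfile.mkdtemp(), 'demo.zip')
lock = Lock()
with ZipFile(out, 'w', compression=ZIP_DEFLATED) as handle:
    # each thread adds one entry under the protection of the lock
    threads = [Thread(target=add_entry, args=(lock, handle, f'file-{i}.txt', 'a,b,c'))
               for i in range(10)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

# confirm all entries made it into the archive
with ZipFile(out) as handle:
    print(len(handle.namelist()))  # 10
```

Without the lock, concurrent writestr() calls can raise the ValueError shown above or corrupt the archive.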

Next, let's look at how we might speed up the operation using multithreading.

How to Zip Files Concurrently

Creating a zip archive of many files has three elements:

  1. Loading a file from disk into memory.
  2. Compressing the data in memory.
  3. Writing the compressed data to the archive on disk.

Loading the file data from disk is IO-bound as it is limited by the speed that we can move data from the hard drive to main memory.

Compressing the loaded data is CPU-bound as it is purely algorithmic; it will run as fast as the CPU.

Adding the compressed data to the archive is IO-bound as it involves writing the data to file on disk.

It is hard to know from this review whether we can get some benefit from multi-threading file zipping.

For example, the hard drive can only load one file at a time, and can only write one zipped file to the archive at a time, so this aspect probably cannot be multithreaded.

Nevertheless, it is possible to be loading one file into memory at the same time as compressing another file in memory.

A naive approach would be to use multithreading and call the ZipFile.write() function for each file in the directory in a separate thread.

Synchronization primitives would be required to protect the ZipFile instance around each ZipFile.write() call to avoid corrupting the archive.

This would likely provide no benefit, as the ZipFile.write() function both loads each file and compresses and saves it to the archive, all under the protection of the lock, leaving no opportunity for concurrency. In fact, this would likely be slower than the serial case given the overhead of acquiring and releasing a lock for each file added to the zip.

Instead, we need to split the ZipFile.write() task into subtasks.

We can separate the file loading aspect from the file compressing/saving aspect in ZipFile.write().

We can load the file data into memory ourselves using the built-in open() function to get the file handle then call the read function. For example:

...
# load the data as a string
with open(filepath, 'r') as file_handle:
    data = file_handle.read()

We can then call the ZipFile.writestr() function to both compress and save the loaded data directly to the archive. For example:

...
# add data to zip
handle.writestr(filepath, data)

We don't have fine-grained control over first compressing the data, then saving it, at least not with the ZipFile class API.

One approach would be to use multithreading to load each file into memory, then compress and add each file to the zip file in the main thread. This would likely not provide much benefit, as the hard drive can only load a single file into main memory at a time, even though each load is a blocking operation that threads could otherwise overlap.

Another approach would be to multithread the process of loading each file and adding it to the archive directly.

This might provide a benefit as it may be possible for one thread to be reading a file from disk to main memory at the same time as another thread running a compression algorithm or attempting a write to disk.

We will explore this approach in the next section.

Zip Files Concurrently With Threads

The ThreadPoolExecutor provides a pool of worker threads that we can use to multithread the zipping of hundreds of data files.

Each file in the tmp/ directory will represent a task that will require the file to be loaded from disk into main memory, then added to the zip archive.

First, we can create a threading.Lock that we will use to protect the ZipFile, ensuring that only one thread can add a file to the archive at a time.

...
# create lock for adding files to the zip
lock = Lock()

Next, we can create the thread pool with 100 worker threads, the number chosen somewhat arbitrarily. You might want to experiment with different numbers of worker threads to see if you can get better performance.
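One way to run that experiment is a small timing harness that tries a few pool sizes; the stand_in_task() below is a hypothetical placeholder for the real task of loading and adding a file:

```python
import time
from concurrent.futures import ThreadPoolExecutor

# time how long a pool with n_workers takes to complete all tasks
def time_pool(n_workers, task, items):
    start = time.perf_counter()
    with ThreadPoolExecutor(n_workers) as exe:
        # consume the iterator so we wait for every task to finish
        list(exe.map(task, items))
    return time.perf_counter() - start

# hypothetical stand-in for the real task: a short blocking operation
def stand_in_task(item):
    time.sleep(0.001)
    return item

# compare a few pool sizes
for n_workers in [10, 50, 100]:
    duration = time_pool(n_workers, stand_in_task, range(200))
    print(f'{n_workers} workers: {duration:.3f} seconds')
```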

We will use the context manager to create the thread pool and ensure it is closed once we are finished with it.

...
# create the thread pool
with ThreadPoolExecutor(100) as exe:
	# ...

We can define a function that takes the lock, the zip file handle, and the path for a single file to add to the archive.

First, we will open the file and load it from disk into main memory.

...
# load the data as a string
with open(filepath, 'r') as file_handle:
    data = file_handle.read()

We can then acquire the lock that protects the ZipFile instance and call the ZipFile.writestr() function to add the loaded data to the archive.

We can use the context manager for the lock to ensure it is released once we are finished with it.

...
# add data to zip
with lock:
    handle.writestr(filepath, data)

Tying this together, the add_file() function below performs the task of adding a file to the zip archive in a thread-safe manner.

# load the file as a string then add it to the zip in a thread safe manner
def add_file(lock, handle, filepath):
    # load the data as a string
    with open(filepath, 'r') as file_handle:
        data = file_handle.read()
    # add data to zip
    with lock:
        handle.writestr(filepath, data)
    # report progress
    print(f'.added {filepath}')

We can then call the submit() function on the thread pool to issue a task to the worker threads for each file in the sub-directory to add to the archive. This returns one Future object for each call to submit, which we will ignore in this case.

...
# add all files to the zip archive
_ = [exe.submit(add_file, lock, handle, f) for f in files]

Tying this all together, the complete example of multithreading the zipping of one thousand data files is listed below.

# SuperFastPython.com
# create a zip file and add files concurrently with threads
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

# load the file as a string then add it to the zip in a thread safe manner
def add_file(lock, handle, filepath):
    # load the data as a string
    with open(filepath, 'r') as file_handle:
        data = file_handle.read()
    # add data to zip
    with lock:
        handle.writestr(filepath, data)
    # report progress
    print(f'.added {filepath}')

# create a zip file
def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # create lock for adding files to the zip
    lock = Lock()
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # create the thread pool
        with ThreadPoolExecutor(100) as exe:
            # add all files to the zip archive
            _ = [exe.submit(add_file, lock, handle, f) for f in files]

# entry point
if __name__ == '__main__':
    main()

Running the example creates the 'testing.zip' archive as before containing the 1,000 data files.

...
.added tmp/data-0779.csv
.added tmp/data-0037.csv
.added tmp/data-0751.csv
.added tmp/data-0989.csv
.added tmp/data-0745.csv
.added tmp/data-0023.csv
.added tmp/data-0976.csv
.added tmp/data-0962.csv
.added tmp/data-0792.csv
.added tmp/data-0786.csv

Archiving the 1,000 files using threads in this case does not seem to offer any speed improvement.

On my system, it takes about 135.2 seconds, compared to 136.9 seconds for the sequential case which is probably within the margin of measurement error.

[Finished in 135.2s]

The bottleneck may be the hard drive and its limitation of loading a single file into main memory at a time, and also the ability to save a single compressed file to the archive at a time.

We might achieve additional benefit if the zip file can be constructed in memory first before being written to disk at the end, but this is probably not practical given the memory requirements for the minor additional speed-up it would offer.
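As an aside, that in-memory idea can be sketched with io.BytesIO as the underlying file object (the entry name and data here are illustrative), writing the finished archive to disk in a single operation at the end:

```python
import io
import os
import tempfile
from zipfile import ZipFile, ZIP_DEFLATED

# build the archive entirely in main memory
buffer = io.BytesIO()
with ZipFile(buffer, 'w', compression=ZIP_DEFLATED) as handle:
    # illustrative entry; real code would add all loaded files here
    handle.writestr('data-0000.csv', '0.1,0.2,0.3\n0.4,0.5,0.6')

# write the finished archive to disk in one operation
out = os.path.join(tempfile.mkdtemp(), 'testing-in-memory.zip')
with open(out, 'wb') as f:
    f.write(buffer.getvalue())

# confirm the archive on disk contains the entry
with ZipFile(out) as handle:
    print(handle.namelist())  # ['data-0000.csv']
```

The trade-off is that the entire compressed archive must fit in memory before anything reaches disk.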

Zip Files Concurrently with Threads in Batch

Each task submitted to the thread pool adds overhead that slows down the overall operation.

This includes function calls and object creation for each task, occurring internally within the ThreadPoolExecutor class.

We might reduce this overhead by performing file operations in a batch within each worker thread.

This can be achieved by updating the add_file() function to receive a list of files to load and add to the archive, and splitting up the files in the main() function into chunks to be submitted to worker threads for batch processing.

The hope is that this would reduce some of the overhead required for submitting so many small tasks by replacing it with a few larger batch tasks.

First, we can change the target function to receive a list of file paths to load.

There are a number of ways we can handle this list of file paths.

The first is to iterate over the list and in each iteration, load the file, get the lock and add the loaded file to the archive.

For example:

# load files as strings then add them to the zip in a thread safe manner
def add_files(lock, handle, filepaths):
    # add all files
    for filepath in filepaths:
        # load the data as a string
        with open(filepath, 'r') as file_handle:
            data = file_handle.read()
        # add data to zip
        with lock:
            handle.writestr(filepath, data)
            # report progress
            print(f'.added {filepath}')

An alternative, and perhaps more efficient, approach (with regard to the lock) would be to first load all file data from disk into main memory, then add the loaded data to the zip archive in batch. This means that each worker thread would only need to acquire the lock once, making repeated calls to the ZipFile.writestr() function while holding it.

We will explore the second approach in this case.

The updated add_files() function is listed below, taking a list of files to add to the archive then loading the file data into memory in batch, then finally getting the lock and adding files to the archive in batch.

# load files as strings then add them to the zip in a thread safe manner
def add_files(lock, handle, filepaths):
    # load files first
    filedata = list()
    for filepath in filepaths:
        # load the data as a string
        with open(filepath, 'r') as file_handle:
            filedata.append(file_handle.read())
    # add all data to zip
    with lock:
        for filepath,data in zip(filepaths, filedata):
            handle.writestr(filepath, data)
            # report progress
            print(f'.added {filepath}')

Next, in the main() function we can select a chunksize based on the number of worker threads and the number of files to zip.

In this case, we will use 100 worker threads with 1,000 files to zip. Dividing the files evenly between workers (1,000 / 100) gives 10 files for each worker to load.

This means that, in the worst case, we may have up to 1,000 files of about 2 megabytes each (about 2 gigabytes) in memory at one time.

...
# determine chunksize
n_workers = 100
chunksize = round(len(files) / n_workers)

Next, we can create the thread pool with the parameterized number of worker threads.

...
# create the thread pool
with ThreadPoolExecutor(n_workers) as exe:
    ...

We can iterate over the list of files to load and split them into chunks defined by a chunksize.

This can be achieved by iterating over the list of files and using the chunksize as the increment size in a for-loop. We can then split off a chunk of files to send to a worker thread as a task.

...
# split the file paths into chunks
for i in range(0, len(files), chunksize):
    # select a chunk of filenames
    filepaths = files[i:(i + chunksize)]
    # submit the task
    _ = exe.submit(add_files, lock, handle, filepaths)

And that's it.

The complete example of adding files to the zip archive in a batch concurrently using worker threads is listed below.

# SuperFastPython.com
# add many files to a zip archive concurrently with threads in batch
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

# load files as strings then add them to the zip in a thread safe manner
def add_files(lock, handle, filepaths):
    # load files first
    filedata = list()
    for filepath in filepaths:
        # load the data as a string
        with open(filepath, 'r') as file_handle:
            filedata.append(file_handle.read())
    # add all data to zip
    with lock:
        for filepath,data in zip(filepaths, filedata):
            handle.writestr(filepath, data)
            # report progress
            print(f'.added {filepath}')

# create a zip file
def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # create lock for adding files to the zip
    lock = Lock()
    # determine chunksize
    n_workers = 100
    chunksize = round(len(files) / n_workers)
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # create the thread pool
        with ThreadPoolExecutor(n_workers) as exe:
            # split the file paths into chunks
            for i in range(0, len(files), chunksize):
                # select a chunk of filenames
                filepaths = files[i:(i + chunksize)]
                # submit the task
                _ = exe.submit(add_files, lock, handle, filepaths)

# entry point
if __name__ == '__main__':
    main()

Running the example adds all 1,000 files to the zip file as before.

...
.added tmp/data-0183.csv
.added tmp/data-0817.csv
.added tmp/data-0803.csv
.added tmp/data-0630.csv
.added tmp/data-0156.csv
.added tmp/data-0142.csv
.added tmp/data-0624.csv
.added tmp/data-0618.csv
.added tmp/data-0619.csv
.added tmp/data-0143.csv
.added tmp/data-0625.csv

In this case, we don't see an improvement compared to the previous multithreaded example or the single-threaded example of zipping the files.

On my system, it took 135.8 seconds, compared to 135.2 seconds when not adding files in batch, which is probably within the margin of measurement error.

[Finished in 135.8s]

Zip Files Concurrently With Threads Without a Lock

The lock protecting the zip file instance can be removed if we only add files to the zip file from the main thread.

We would not expect any speed benefit from this change, but it does make the code slightly simpler.

This can be achieved by starting with the non-batch version of the code and first defining a task function that only loads one file from disk into memory.

The load_file() function below implements this, taking a file path to load and returning the file path and the contents of the file.

# load file into memory
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents and the filepath
        return (filepath, handle.read())

We can then submit all file paths to the thread pool to have them loaded into memory.

...
# load all files into memory
futures = [exe.submit(load_file, filepath) for filepath in files]

We then process the files as they are loaded from disk via the as_completed() function and add them to the zip file.

...
# compress files as they are loaded
for future in as_completed(futures):
    # get the data
    filepath, data = future.result()
    # add to the archive
    handle.writestr(filepath, data)
    # report progress
    print(f'.added {filepath}')

Tying this together, the complete example of concurrently loading the file from disk and adding to the archive sequentially is listed below.

# SuperFastPython.com
# create a zip file and add files concurrently with threads without a lock
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# load file into memory
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents and the filepath
        return (filepath, handle.read())

# create a zip file
def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # create the thread pool
        with ThreadPoolExecutor(100) as exe:
            # load all files into memory
            futures = [exe.submit(load_file, filepath) for filepath in files]
            # compress files as they are loaded
            for future in as_completed(futures):
                # get the data
                filepath, data = future.result()
                # add to the archive
                handle.writestr(filepath, data)
                # report progress
                print(f'.added {filepath}')

# entry point
if __name__ == '__main__':
    main()

Running the example creates the zip file and the thread pool then adds files to the archive as they are loaded into memory.

...
.added tmp/data-0814.csv
.added tmp/data-0355.csv
.added tmp/data-0618.csv
.added tmp/data-0357.csv
.added tmp/data-0181.csv
.added tmp/data-0156.csv
.added tmp/data-0786.csv
.added tmp/data-0745.csv
.added tmp/data-0792.csv
.added tmp/data-0553.csv

Although the lock is removed, making the code simpler, we don't seem to get any speed benefit.

It takes approximately 136.0 seconds on my system, which is equivalent (within the margin of measurement error) to the 135.2 seconds with the lock.

[Finished in 136.0s]

Zip Files Concurrently with Processes

We can also try to zip files concurrently with processes instead of threads.

It is unclear whether processes can offer a speed benefit in this case, given that we could not get a benefit using threads.

Additionally, using processes requires the data for each task to be serialized, which introduces overhead that might negate any speed-up gained from the true parallelism that processes provide. In fact, the need to transmit the contents of each file from child processes back to the main process will likely add a large overhead, given that the contents of each file must be serialized and then deserialized.
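The cost is easy to demonstrate with a sketch: each result must survive a pickle round trip, so for a payload of roughly the size of one generated data file (about 2 megabytes of text, simulated here) the data is copied on the way out of the worker and again on the way into the main process:

```python
import pickle

# simulate the contents of one generated data file (~2 MB of CSV text)
data = '0.123456789,' * 160000

# what a worker process must do to return the data (serialize) ...
payload = pickle.dumps(data)
# ... and what the main process must do to receive it (deserialize)
restored = pickle.loads(payload)
assert restored == data

# the serialized payload is at least as large as the data itself
print(len(payload) >= len(data))  # True
```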

We can explore using processes to load files concurrently using the ProcessPoolExecutor.

This can be achieved by switching out the ThreadPoolExecutor directly and specifying the number of worker processes.

Specifically, we will adapt the multithreaded version that does not use a lock as we would not be able to add files to a zip file from multiple processes as the ZipFile class cannot be pickled for serialization (perhaps with good reason).

We will use 8 processes in this case as I have 8 logical CPU cores.

It may be interesting to try different configurations of the number of worker processes to see if it makes a difference on the overall running time.

...
# create the process pool
with ProcessPoolExecutor(8) as exe:
    # ...

Tying this together, the complete example of zipping files concurrently using the process pool is listed below.

# SuperFastPython.com
# create a zip file and add files concurrently with processes
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# load file into memory
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # return the contents and the filepath
        return (filepath, handle.read())

# create a zip file
def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # create the process pool
        with ProcessPoolExecutor(8) as exe:
            # load all files into memory
            futures = [exe.submit(load_file, filepath) for filepath in files]
            # compress files as they are loaded
            for future in as_completed(futures):
                # get the data
                filepath, data = future.result()
                # add to the archive
                handle.writestr(filepath, data)
                # report progress
                print(f'.added {filepath}')

# entry point
if __name__ == '__main__':
    main()

Running the example adds all 1,000 files to testing.zip as before.

...
.added tmp/data-0779.csv
.added tmp/data-0037.csv
.added tmp/data-0751.csv
.added tmp/data-0989.csv
.added tmp/data-0745.csv
.added tmp/data-0023.csv
.added tmp/data-0976.csv
.added tmp/data-0962.csv
.added tmp/data-0792.csv
.added tmp/data-0786.csv

In this case, we don't get any speed benefit, and in fact it may be slightly slower than the single-threaded and multi-threaded versions.

On my system, the example takes about 138.7 seconds, compared to about 136.9 seconds for the single-threaded case.

[Finished in 138.7s]

Zip Files Concurrently with Processes in Batch

Any data sent to a worker process or received from a worker process must be serialized (pickled).

This adds overhead to each task executed by the worker processes.

This is likely the cause of worse performance seen when using the process pool in the previous section.

We can address this by batching the load tasks into chunks to be executed by each worker process, just as we did when we batched filenames for the thread pools.

First, we can adapt the load_file() function to take a list of files to load into memory.

# load files into memory
def load_files(filepaths):
    filedata = list()
    for filepath in filepaths:
        # open the file
        with open(filepath, 'r') as handle:
            # return the contents and the filepath
            filedata.append(handle.read())
    return (filepaths, filedata)

Next, we can split the list of files to load into chunks, in this case groups of 10 files.

Each process will load 10 files and send the data back to the main process.

...
# split the file paths into chunks
futures = list()
for i in range(0, len(files), chunksize):
    # select a chunk of filenames
    filepaths = files[i:(i + chunksize)]
    # submit the task
    futures.append(exe.submit(load_files, filepaths))

Finally, we can process the loaded files in batch mode, adding each group of loaded files to the zip archive.

...
# compress batches of files as they are loaded
for future in as_completed(futures):
    # get the data
    filepaths, filedata = future.result()
    # add each to the zip file
    for filepath,data in zip(filepaths, filedata):
        # add to the archive
        handle.writestr(filepath, data)
        # report progress
        print(f'.added {filepath}')

And that's it.

The complete example of batch loading files using process pools is listed below.

# SuperFastPython.com
# create a zip file and add files concurrently with processes in batch
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# load files into memory
def load_files(filepaths):
    filedata = list()
    for filepath in filepaths:
        # open the file
        with open(filepath, 'r') as handle:
            # return the contents and the filepath
            filedata.append(handle.read())
    return (filepaths, filedata)

# create a zip file
def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # determine chunksize
        chunksize = 10
        # create the process pool
        with ProcessPoolExecutor(8) as exe:
            # split the load operations into chunks
            futures = list()
            for i in range(0, len(files), chunksize):
                # select a chunk of filenames
                filepaths = files[i:(i + chunksize)]
                # submit the task
                futures.append(exe.submit(load_files, filepaths))
            # compress batches of files as they are loaded
            for future in as_completed(futures):
                # get the data
                filepaths, filedata = future.result()
                # add each to the zip file
                for filepath,data in zip(filepaths, filedata):
                    # add to the archive
                    handle.writestr(filepath, data)
                    # report progress
                    print(f'.added {filepath}')

# entry point
if __name__ == '__main__':
    main()

Running the example zips all 1,000 files as in all previous examples.

...
.added tmp/data-0779.csv
.added tmp/data-0037.csv
.added tmp/data-0751.csv
.added tmp/data-0989.csv
.added tmp/data-0745.csv
.added tmp/data-0023.csv
.added tmp/data-0976.csv
.added tmp/data-0962.csv
.added tmp/data-0792.csv
.added tmp/data-0786.csv

In this case, we don't see any speed improvement over the other versions of the code.

On my system it takes about 135.3 seconds to complete.

[Finished in 135.3s]

Zip Files Concurrently with Processes and Threads

It might be interesting to combine both process pools and thread pools to see if we can speed up file zipping.

Specifically, we can adapt the example from the previous section to use a thread pool to load all files concurrently within each worker process.

This may not offer a benefit because the loaded data must still be communicated from the child process back to the parent process to be added to the zip. Nevertheless, it may be an interesting exercise.

First, we must define a function that we can use as the task for the worker thread to load a given file into memory.

The load_file() function below implements this, taking a file path and returning the contents of the file.

# load a file
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # read and return the contents of the file
        return handle.read()

Next, we can update the load_files() function. Recall that this is the function that loads a list of files on a worker process and returns their paths and contents to the parent process.

We can update this function to create a thread pool and submit a call to our load_file() function for each file path to be loaded.

...
# create the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks
    futures = [executor.submit(load_file, name) for name in filepaths]

Next, we can retrieve the results of each task (the contents of the loaded file), then return a tuple of the file paths and file contents.

This means that the interface (arguments and return value) of the load_files() function is unchanged, although the manner in which we load files from disk into main memory has changed, e.g. it now uses a thread pool.

The updated version of the load_files() function with these changes is listed below.

# load files into memory
def load_files(filepaths):
    # create the thread pool
    with ThreadPoolExecutor(10) as executor:
        # submit tasks
        futures = [executor.submit(load_file, name) for name in filepaths]
        # get all results
        filedata = [future.result() for future in futures]
        # return paths and data
        return (filepaths, filedata)
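Because we collect results by iterating the futures list in submission order, filedata lines up index-for-index with filepaths, which is what makes the returned tuple safe to zip() together later. A toy demonstration with a hypothetical stand-in loader:

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical stand-in for load_file() that fabricates file contents
def load_file(filepath):
    return f'contents of {filepath}'

filepaths = ['a.csv', 'b.csv', 'c.csv']
with ThreadPoolExecutor(2) as executor:
    # submit one task per file
    futures = [executor.submit(load_file, name) for name in filepaths]
    # iterating futures in submission order keeps results aligned with inputs
    filedata = [future.result() for future in futures]

print(list(zip(filepaths, filedata)))
```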

Tying this together, the complete example of zipping files concurrently with a process pool where each worker process uses a thread pool is listed below.

# SuperFastPython.com
# create a zip file and add files concurrently with processes and threads in batch
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# load a file
def load_file(filepath):
    # open the file
    with open(filepath, 'r') as handle:
        # read and return the contents of the file
        return handle.read()

# load files into memory
def load_files(filepaths):
    # create the thread pool
    with ThreadPoolExecutor(10) as executor:
        # submit tasks
        futures = [executor.submit(load_file, name) for name in filepaths]
        # get all results
        filedata = [future.result() for future in futures]
        # return paths and data
        return (filepaths, filedata)

# create a zip file
def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # determine chunksize
        chunksize = 10
        # create the process pool
        with ProcessPoolExecutor(8) as exe:
            # split the load operations into chunks
            futures = list()
            for i in range(0, len(files), chunksize):
                # select a chunk of filenames
                filepaths = files[i:(i + chunksize)]
                # submit the task
                futures.append(exe.submit(load_files, filepaths))
            # compress batches of files as they are loaded
            for future in as_completed(futures):
                # get the data
                filepaths, filedata = future.result()
                # add each to the zip file
                for filepath,data in zip(filepaths, filedata):
                    # add to the archive
                    handle.writestr(filepath, data)
                    # report progress
                    print(f'.added {filepath}')

# entry point
if __name__ == '__main__':
    main()

Running the example loads all files and adds them to the zip file, as before.

...
.added tmp/data-0779.csv
.added tmp/data-0037.csv
.added tmp/data-0751.csv
.added tmp/data-0989.csv
.added tmp/data-0745.csv
.added tmp/data-0023.csv
.added tmp/data-0976.csv
.added tmp/data-0962.csv
.added tmp/data-0792.csv
.added tmp/data-0786.csv

In this case, we don't see any speed-up from using thread pools within each worker process, as we might have expected.

On my system, the example took about 135.1 seconds, compared to about 136.9 seconds for the sequential case, which is probably within the margin of measurement error.

[Finished in 135.1s]

Zip Files Concurrently with AsyncIO (without a lock)

We can also zip files concurrently using asyncio.

Generally, Python does not support non-blocking I/O operations when working with files. It is not provided by the asyncio module, because it is challenging to implement in a general, cross-platform manner.

The third-party library aiofiles provides file operations for use in asyncio programs, but again, the operations are not true non-blocking I/O; instead, the concurrency is simulated using thread pools.

Nevertheless, we can use aiofiles in an asyncio program to load files concurrently. It does not provide an asynchronous version of the ZipFile class.
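As an aside, the standard library's asyncio.to_thread() (Python 3.9+) achieves a similar effect to aiofiles, pushing a blocking read onto a worker thread while the event loop continues. A minimal sketch using a throwaway temporary file:

```python
import asyncio
import tempfile

# read a file on a worker thread, yielding control to the event loop meanwhile
async def load_file(filepath):
    def read():
        with open(filepath, 'r') as handle:
            return handle.read()
    return await asyncio.to_thread(read)

async def main():
    # create a throwaway file to demonstrate with
    with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as tmp:
        tmp.write('1,2,3')
        path = tmp.name
    data = await load_file(path)
    print(data)
    return data

if __name__ == '__main__':
    asyncio.run(main())
```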

You can install the aiofiles library using your Python package manager, such as pip:

pip3 install aiofiles

We will adapt the multithreaded version of zipping files that does not use locks, as described above.

First, we must update the load_file() function to use the aiofiles.open() function to open files and to await the call to read(), which will yield control to the event loop.

The updated version of the load_file() function with these changes is below.

# open a file and return the contents
async def load_file(filepath):
    # open the file
    async with aiofiles.open(filepath, 'r') as handle:
        # return the contents
        return (await handle.read(), filepath)

The main() function must be updated to also be an async function and to create and execute a coroutine for each file to be loaded.

This can be achieved by first creating a list of coroutine calls to the load_file() via a list comprehension.

...
# create coroutines
tasks = [load_file(filepath) for filepath in paths]

Recall that this only creates the coroutine objects; it does not execute the function body.
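We can see this for ourselves with a toy stand-in: calling an async function returns a coroutine object without running its body (the load_file() below is hypothetical, not the one from our program):

```python
import asyncio

# hypothetical stand-in for an async file loader
async def load_file(filepath):
    return filepath.upper()

# calling the function creates a coroutine object; the body has not run yet
coro = load_file('tmp/data.csv')
print(type(coro).__name__)  # coroutine

# the body only executes once the coroutine is awaited or driven by the event loop
print(asyncio.run(coro))  # TMP/DATA.CSV
```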

Next, we can execute the coroutines and process their results in the order that the files are loaded using the asyncio.as_completed() function.

...
# execute tasks and process results as they are completed
for task in asyncio.as_completed(tasks):
    # open the file and load the data
    data, filepath = await task
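The key property of asyncio.as_completed() is that it yields results in completion order rather than submission order, so faster loads can be written to the zip sooner. A toy illustration using sleeps of different lengths:

```python
import asyncio

# return a value after a given delay
async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value

async def main():
    # submit the slow task first, the fast task second
    tasks = [delayed('slow', 0.02), delayed('fast', 0.0)]
    results = []
    for task in asyncio.as_completed(tasks):
        results.append(await task)
    return results

print(asyncio.run(main()))  # ['fast', 'slow']
```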

Tying this together, the complete example of zipping files concurrently using asyncio is listed below.

# SuperFastPython.com
# create a zip file and add files concurrently with asyncio
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED
import asyncio
import aiofiles

# open a file and return the contents
async def load_file(filepath):
    # open the file
    async with aiofiles.open(filepath, 'r') as handle:
        # return the contents
        return (await handle.read(), filepath)

# load all files in a directory into memory
async def main(path='tmp'):
    # prepare all of the paths
    paths = [join(path, filepath) for filepath in listdir(path)]
    # create coroutines
    tasks = [load_file(filepath) for filepath in paths]
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # execute tasks and process results as they are completed
        for task in asyncio.as_completed(tasks):
            # open the file and load the data
            data, filepath = await task
            # add to the archive
            handle.writestr(filepath, data)
            # report progress
            print(f'.added {filepath}')
    print('Done')

# entry point
if __name__ == '__main__':
    asyncio.run(main())

Running the example adds all 1,000 files to the zip file as before.

...
.added tmp/data-0397.csv
.added tmp/data-0325.csv
.added tmp/data-0274.csv
.added tmp/data-0512.csv
.added tmp/data-0426.csv
.added tmp/data-0443.csv
.added tmp/data-0704.csv
.added tmp/data-0432.csv
.added tmp/data-0499.csv
.added tmp/data-0494.csv
Done

In this case, zipping files concurrently using asyncio does not offer any speed benefit.

This may be expected given that aiofiles uses a thread pool behind the scenes.

On my system, the example completes in about 137.3 seconds, compared to about 136.9 seconds for the sequential version.

[Finished in 137.3s]

Zip Files Concurrently with AsyncIO (with a lock)

The example in the previous section, which zips files concurrently with aiofiles and asyncio, is based on the lock-free version of the code developed above when working with threads.

Similarly, we can develop a version of zipping files with asyncio using a Lock, allowing each coroutine to add files to the zip file themselves, rather than sending the data back to the main routine to be added.

This is relatively straightforward and involves adapting the first multithreaded version of the code developed above to use asyncio.

First, we can adapt the add_file() function to be a coroutine.

Recall that this function takes a lock, the handle to the zip file, and the path of the file to load and add to the zip.

# load a file as a string then add it to the zip in a thread safe manner
async def add_file(lock, handle, filepath):
    # ...

First, we must open the file and read its contents asynchronously.

...
# load the data as a string
async with aiofiles.open(filepath, 'r') as fd:
    data = await fd.read()

Next, we must acquire the lock and then write the loaded data to the zip file, in a blocking manner.

# add data to zip
async with lock:
    handle.writestr(filepath, data)

Tying this together, the updated async version of the add_file() function is listed below.

# load a file as a string then add it to the zip in a thread safe manner
async def add_file(lock, handle, filepath):
    # load the data as a string
    async with aiofiles.open(filepath, 'r') as fd:
        data = await fd.read()
    # add data to zip
    async with lock:
        handle.writestr(filepath, data)
    # report progress
    print(f'.added {filepath}')

Next, we can create a coroutine and call our add_file() function for each file path to be added to the zip file.

First, we can create a list of all files to be added to the zip file by enumerating the contents of the source directory with os.listdir() and calling the os.path.join() function to create a relative path.

...
# list all files to add to the zip
files = [join(path,f) for f in listdir(path)]

Next, we can create a lock to protect writing to the ZipFile instance across coroutines.

This can be achieved by using an asyncio.Lock instance.

...
# create lock for adding files to the zip
lock = asyncio.Lock()
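The lock guarantees that only one coroutine at a time executes the writestr() call, even across await points. A toy demonstration that values appended while holding an asyncio.Lock stay together:

```python
import asyncio

# append a value twice while holding the lock, yielding control in between
async def append_twice(lock, items, value):
    async with lock:
        items.append(value)
        # yield to the event loop while still holding the lock
        await asyncio.sleep(0)
        items.append(value)

async def main():
    lock = asyncio.Lock()
    items = []
    await asyncio.gather(*[append_twice(lock, items, value) for value in range(3)])
    return items

# each coroutine's two appends stay adjacent because the lock is held across the await
print(asyncio.run(main()))  # [0, 0, 1, 1, 2, 2]
```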

We can then open the zip file in a blocking manner.

...
# open the zip file
with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
    # ...

We can then create a coroutine for each call to the add_file() function required for each file path. This can be achieved in a list comprehension.

...
# create all of the coroutines
tasks = [add_file(lock, handle, filepath) for filepath in files]

Recall that this creates a list of coroutine objects and does not execute the add_file() function.

Finally, we can execute all coroutines concurrently and wait for all files to be added to the zip file.

This can be achieved by calling the asyncio.gather() function, unpacking the list of coroutines using the star operator (*).

...
# execute and wait for all coroutines
await asyncio.gather(*tasks)
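A toy illustration of both the star-unpacking and the fact that gather() returns results in the same order as the input coroutines (the double() coroutine is a trivial stand-in for add_file()):

```python
import asyncio

# trivial stand-in coroutine
async def double(value):
    return 2 * value

async def main():
    # create the coroutines, then unpack the list into gather with the star operator
    tasks = [double(value) for value in range(4)]
    results = await asyncio.gather(*tasks)
    return results

print(asyncio.run(main()))  # [0, 2, 4, 6]
```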

The updated version of the main() function with these changes is listed below.

# create a zip file
async def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # create lock for adding files to the zip
    lock = asyncio.Lock()
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # create all of the coroutines
        tasks = [add_file(lock, handle, filepath) for filepath in files]
        # execute and wait for all coroutines
        await asyncio.gather(*tasks)

Tying this together, the complete example using asyncio and a lock to allow each coroutine to add files to the zip file themselves is listed below.

# SuperFastPython.com
# create a zip file and add files concurrently with asyncio with a lock
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED
import asyncio
import aiofiles

# load a file as a string then add it to the zip in a thread safe manner
async def add_file(lock, handle, filepath):
    # load the data as a string
    async with aiofiles.open(filepath, 'r') as fd:
        data = await fd.read()
    # add data to zip
    async with lock:
        handle.writestr(filepath, data)
    # report progress
    print(f'.added {filepath}')

# create a zip file
async def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # create lock for adding files to the zip
    lock = asyncio.Lock()
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # create all of the coroutines
        tasks = [add_file(lock, handle, filepath) for filepath in files]
        # execute and wait for all coroutines
        await asyncio.gather(*tasks)

# entry point
if __name__ == '__main__':
    asyncio.run(main())

Running the example adds all files to the zip as we did in previous examples.

...
.added tmp/data-0584.csv
.added tmp/data-0023.csv
.added tmp/data-0786.csv
.added tmp/data-0989.csv
.added tmp/data-0745.csv
.added tmp/data-0221.csv
.added tmp/data-0382.csv
.added tmp/data-0976.csv
.added tmp/data-0792.csv
.added tmp/data-0962.csv

In this case, we don't see a benefit compared to the sequential version developed first, nor any speed difference compared to the previous asyncio version.

On my system, the example took about 137.2 seconds, compared to about 136.9 seconds for the sequential version and about 137.3 seconds for the lock-free asyncio version.

[Finished in 137.2s]

Zip Files Concurrently with AsyncIO in Batch

We can update the previous lock-based asyncio version to add files to the zip in batch.

This allows fewer coroutines overall, with each coroutine loading a batch of files before adding them to the zip file.

This requires the add_file() function to be changed to add_files() and to take a list of file paths instead of a single file path.

# load files as strings then add them to the zip in a thread safe manner
async def add_files(lock, handle, filepaths):
    # ...

We have perhaps two options when updating the add_files() function.

Firstly, we could process each file path one-by-one, first loading the contents asynchronously, then getting the lock and adding the contents to the zip file.

An example of this approach is listed below.

# load files as strings then add them to the zip in a thread safe manner
async def add_files(lock, handle, filepaths):
    # add all files
    for filepath in filepaths:
        # load the data as a string
        async with aiofiles.open(filepath, 'r') as fd:
            data = await fd.read()
        # add data to zip
        async with lock:
            handle.writestr(filepath, data)
        # report progress
        print(f'.added {filepath}')

A benefit of this approach is that we only keep one file in memory per function call and that the lock is held briefly by each coroutine, although the lock also acts as a bottleneck that may slow down the concurrent loading of the files.

Alternately, we might first load the contents of all files into memory, then get the lock once and then write all files to the zip file.

An example of this latter approach is listed below.

# load files as strings then add them to the zip in a thread safe manner
async def add_files(lock, handle, filepaths):
    # add all files
    filedata = list()
    for filepath in filepaths:
        # load the data as a string
        async with aiofiles.open(filepath, 'r') as fd:
            data = await fd.read()
            filedata.append(data)
    # lock on the zip file
    async with lock:
        # write all files to zip
        for filepath, data in zip(filepaths, filedata):
            handle.writestr(filepath, data)
            # report progress
            print(f'.added {filepath}')

This second approach may mean less of a bottleneck around the lock, although it requires more memory to hold the batch of files.

We will use this latter approach, although it may be interesting to benchmark the prior method.
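If you do want to compare the two variants, a simple wall-clock harness is enough. The two main functions below are hypothetical stand-ins for programs built on each variant, not the real examples:

```python
import time

# time a call to a zero-argument function and return the elapsed seconds
def benchmark(fn):
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start

# hypothetical stand-ins for the two program variants
def main_one_by_one():
    time.sleep(0.01)

def main_batched():
    time.sleep(0.01)

print(f'one-by-one: {benchmark(main_one_by_one):.3f}s')
print(f'batched:    {benchmark(main_batched):.3f}s')
```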

Next, we can update the main() function to split the list of file paths into chunks and dispatch them to coroutines for execution.

Firstly, we can define the number of calls we want to make to the add_files() function and use that, along with the total number of files to add to the zip, to determine the chunk size. That is, the number of files that each call to add_files() is responsible for handling.

...
# determine chunksize
n_workers = 100
chunksize = round(len(files) / n_workers)

After opening the ZipFile, we can then iterate over the list of files in chunks and define coroutines that can be added to a list for later execution.

...
        # split the file additions into chunks
tasks = list()
for i in range(0, len(files), chunksize):
    # select a chunk of filenames
    filepaths = files[i:(i + chunksize)]
    # add the coroutine to the list
    tasks.append(add_files(lock, handle, filepaths))

Finally, we can execute the coroutines and wait for all files to be added.

...
# execute and wait for all coroutines
await asyncio.gather(*tasks)

Tying this together, the complete example of adding files to the zip concurrently with asyncio in batch is listed below.

# SuperFastPython.com
# create a zip file and add files concurrently with asyncio in batch
from os import listdir
from os.path import join
from zipfile import ZipFile
from zipfile import ZIP_DEFLATED
import asyncio
import aiofiles

# load files as strings then add them to the zip in a thread safe manner
async def add_files(lock, handle, filepaths):
    # add all files
    filedata = list()
    for filepath in filepaths:
        # load the data as a string
        async with aiofiles.open(filepath, 'r') as fd:
            data = await fd.read()
            filedata.append(data)
    # lock on the zip file
    async with lock:
        # write all files to zip
        for filepath, data in zip(filepaths, filedata):
            handle.writestr(filepath, data)
            # report progress
            print(f'.added {filepath}')

# create a zip file
async def main(path='tmp'):
    # list all files to add to the zip
    files = [join(path,f) for f in listdir(path)]
    # create lock for adding files to the zip
    lock = asyncio.Lock()
    # determine chunksize
    n_workers = 100
    chunksize = round(len(files) / n_workers)
    # open the zip file
    with ZipFile('testing.zip', 'w', compression=ZIP_DEFLATED) as handle:
        # split the file additions into chunks
        tasks = list()
        for i in range(0, len(files), chunksize):
            # select a chunk of filenames
            filepaths = files[i:(i + chunksize)]
            # add the coroutine to the list
            tasks.append(add_files(lock, handle, filepaths))
        # execute and wait for all coroutines
        await asyncio.gather(*tasks)

# entry point
if __name__ == '__main__':
    asyncio.run(main())

Running the example adds all of the files to the zip file as with all previous examples.

...
.added tmp/data-0786.csv
.added tmp/data-0242.csv
.added tmp/data-0583.csv
.added tmp/data-0625.csv
.added tmp/data-0032.csv
.added tmp/data-0690.csv
.added tmp/data-0355.csv
.added tmp/data-0494.csv
.added tmp/data-0181.csv
.added tmp/data-0234.csv

In this case, we might see a minor speed difference compared to the sequential version and other asyncio versions, although the difference is likely within the margin of measurement error.

On my system, the example took about 135.4 seconds, compared to about 136.9 seconds for the sequential example.

[Finished in 135.4s]

Extensions

This section lists ideas for extending the tutorial.

Share your extensions in the comments below; it would be great to see what you come up with.

Takeaways

In this tutorial, you explored how to zip hundreds of data files using multithreading.