Numpy Parallel Vector Distance Calculation
You can calculate vector distances in parallel by using SciPy distance functions and threads.
In this tutorial, you will discover how to calculate vector distances between numpy arrays in parallel using threads.
Let's get started.
Need Parallel Vector Distance Calculation
A vector is an array of numbers.
A common operation with vectors is to calculate the distance between two vectors.
Common distance calculations include Euclidean distance, Minkowski distance, and Cosine distance.
Distances can be calculated using SciPy functions in the scipy.spatial.distance module, such as:
- scipy.spatial.distance.euclidean(): Euclidean distance.
- scipy.spatial.distance.minkowski(): Minkowski distance.
- scipy.spatial.distance.cosine(): Cosine distance.
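As a minimal sketch, we can call each of these three functions on a pair of small vectors (values chosen arbitrarily for illustration):

```python
# example of scipy distance functions on small vectors
from scipy.spatial.distance import euclidean, minkowski, cosine

# define two small orthogonal vectors
v1 = [1.0, 0.0, 0.0]
v2 = [0.0, 1.0, 0.0]
# euclidean distance: sqrt(1^2 + 1^2) = 1.414...
print(euclidean(v1, v2))
# minkowski distance with p=2 is equivalent to euclidean
print(minkowski(v1, v2, p=2))
# cosine distance: 1 - cos(angle), which is 1.0 for orthogonal vectors
print(cosine(v1, v2))
```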
We may need to calculate the distance between vectors as part of many linear algebra operations, such as in machine learning applications in algorithms such as k-nearest neighbors (knn) and in collaborative filtering systems for making recommendations.
In these applications, we may need to calculate the distance between many thousands or millions of vectors.
As such, vector distance calculations need to be performed fast.
How can we calculate vector distances in parallel and make best use of available hardware?
How to Calculate Vector Distances in Parallel with Threads
Vector distance calculations between numpy arrays using SciPy functions can be performed in parallel.
This can be achieved using threads.
The reference Python interpreter CPython uses a lock to prevent more than one thread from running at a time. This is to ensure that the internal state of the Python interpreter is thread-safe, avoiding race conditions.
This lock is called the global interpreter lock, or GIL, and it prevents true parallelism with Python threads. The exception is when the lock is released because it is known that a thread will not be interacting with the internals of the Python interpreter, such as when blocking during I/O or when calling into a third-party C library.
Many numpy and SciPy operations on numpy arrays are implemented using an underlying C API. Because these operations are CPU-intensive and executed in an external library, most of these functions will release the global interpreter lock.
You can learn more about numpy releasing the GIL in the tutorial:
The SciPy distance functions release the GIL. As such, we can use threads to execute vector distance calculations between numpy arrays in parallel.
The Python standard library provides the multiprocessing.pool.ThreadPool class that can be used to create and manage a pool of worker threads.
A ThreadPool instance can be created and configured with one worker per physical CPU in the system.
For example:
...
# create the thread pool
with ThreadPool(4) as pool:
# ...
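Rather than hard-coding the number of workers, we can size the pool from the system. A minimal sketch, noting that os.cpu_count() reports logical CPUs, which may be double the number of physical cores on systems with hyper-threading:

```python
# size the thread pool based on the number of CPUs in the system
import os
from multiprocessing.pool import ThreadPool

# os.cpu_count() reports logical CPUs, which may be double
# the number of physical cores when hyper-threading is enabled
n_workers = os.cpu_count() or 1
# create the thread pool with one worker per logical CPU
with ThreadPool(n_workers) as pool:
    # issue a little work to confirm the pool functions
    results = pool.map(abs, [-1, -2, -3])
print(results)
```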
You can learn more about configuring the number of workers in the ThreadPool in the tutorial:
We can define a function to execute with thread workers and issue it to the thread pool.
If the function takes more than one argument, such as two vectors or two collections of vectors, we can call the starmap() function with the name of the function and a list of arguments for each call to the target function.
For example:
...
# issue calls to the thread pool
results = pool.starmap(task, arguments)
This will issue each call to the target function to the thread pool and will return a list of return values, one per call, in the order in which the tasks were issued.
You can learn more about the starmap() function in the tutorial:
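As a minimal sketch, the built-in pow() function takes two arguments, so each tuple in the list is unpacked as one call, and the results come back in issue order:

```python
# example of issuing multi-argument tasks with starmap()
from multiprocessing.pool import ThreadPool

# create a small thread pool
with ThreadPool(2) as pool:
    # each tuple is unpacked as the arguments to one call of pow()
    results = pool.starmap(pow, [(2, 3), (3, 2), (10, 2)])
# results are returned in the order the tasks were issued
print(results)
```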
If there are many more function calls (tasks) than worker threads, the starmap() function will group the calls into chunks and issue them to threads for execution.
We can control the size of the groups of tasks issued to worker threads via the "chunksize" argument to the starmap() function.
For example:
...
# issue calls to the thread pool
results = pool.starmap(task, arguments, chunksize=10)
A chunksize tuned for the specific duration of tasks can offer better performance in some circumstances. It often requires some trial and error to configure.
You can learn more about configuring the "chunksize" argument when using the starmap() and other map() functions on the ThreadPool in the tutorial:
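The trial and error can itself be scripted. Below is a sketch that times the same workload under several candidate chunksizes; the task here is a trivial stand-in (substitute a distance calculation in practice), and the timings you see will depend on your hardware:

```python
# sketch of trial-and-error tuning of the chunksize argument
from time import time
from multiprocessing.pool import ThreadPool

# a trivial stand-in task; substitute a real distance calculation
def task(x, y):
    return x + y

# prepare many pairs of arguments
args = [(i, i) for i in range(100000)]
# time the same workload with several candidate chunksizes
for chunksize in [1, 10, 100, 1000, 10000]:
    with ThreadPool(4) as pool:
        start = time()
        results = pool.starmap(task, args, chunksize=chunksize)
        duration = time() - start
    print(f'chunksize={chunksize}: {duration:.3f} seconds')
```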
Now that we know how to execute vector distance calculations in parallel using threads, let's look at some worked examples.
Example of Single-Threaded Pairwise Vector Distances (slow)
Before we explore parallel vector distance calculations, let's look at a single-threaded example.
In this example, we will create two large sets of vectors and calculate the Euclidean distance between each pair of vectors and report how long it takes to complete.
Firstly, we will define two collections of vectors. This can be done by creating matrices, with a given number of rows, one for each vector, where each vector has the same number of columns.
Each vector will have 10,000 elements and we will create 100,000 pairs of vectors (rows in the matrix). The first matrix will have all one values and the second all zero values.
...
# size of data
n_rows = 100000
n_cols = 10000
# prepare data
data1 = ones((n_rows,n_cols))
data2 = zeros((n_rows,n_cols))
We can then calculate the Euclidean distance between each pair of vectors. This can be achieved in a list comprehension to give a list of 100,000 distance calculations.
...
# calculate distances for each row
distances = [euclidean(data1[i,:], data2[i,:]) for i in range(len(data1))]
Tying this together, the complete example is listed below.
# example of calculating pairwise vector distances
from time import time
from numpy import ones
from numpy import zeros
from scipy.spatial.distance import euclidean
# size of data
n_rows = 100000
n_cols = 10000
# prepare data
data1 = ones((n_rows,n_cols))
data2 = zeros((n_rows,n_cols))
# record start time
start = time()
# calculate distances for each row
distances = [euclidean(data1[i,:], data2[i,:]) for i in range(len(data1))]
# calculate and report duration
duration = time() - start
print(f'Took {duration:.3f} seconds')
Running the example calculates the Euclidean distance between all pairs of vectors and collects the distances in a list.
The time taken to complete this example on my system was about 4.306 seconds. This provides a point of reference for comparing parallel versions of the same calculations.
Note, the example may take a longer or shorter duration, depending on the speed of your hardware.
Took 4.306 seconds
Next, let's look at how we can perform the same vector distance calculations in parallel.
Example of Multithreaded Pairwise Vector Distances (faster)
We can update the previous example to calculate vector distances in parallel.
There are many ways we could do this. In this case, we will use a ThreadPool of worker threads and issue each distance calculation between a pair of vectors as one task. All pairs of vectors will be issued to the pool as tasks to complete in parallel as fast as the workers are able.
This means there will be 100,000 tasks issued to the pool. These will be grouped together using a default chunksize and the distance results will be returned in the same order in which the tasks were issued to the pool.
...
# create thread pool
with ThreadPool(4) as pool:
# prepare args
args = [(a,b) for a,b in zip(data1,data2)]
# calculate distances on chunks of vectors
distances = pool.starmap(euclidean, args)
The default chunksize is a factor of the number of tasks and the number of workers, which in this case is calculated as follows:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
chunksize, extra = divmod(100000, 4 * 4)
chunksize, extra = divmod(100000, 16)
chunksize, extra = 6250, 0
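We can reproduce this default calculation directly, mirroring the logic in the standard library's Pool implementation (which bumps the chunksize by one when the division leaves a remainder):

```python
# reproduce the default chunksize calculation used by the pool
n_tasks = 100000
n_workers = 4
# mirror the logic in multiprocessing.pool.Pool._map_async
chunksize, extra = divmod(n_tasks, n_workers * 4)
if extra:
    chunksize += 1
print(chunksize)
```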
You can learn more about the calculation of the default chunksize in the tutorial:
Tying this together, the complete example is listed below.
# example of multithreaded pairwise vector distances
from time import time
from numpy import ones
from numpy import zeros
from scipy.spatial.distance import euclidean
from multiprocessing.pool import ThreadPool
# size of data
n_rows = 100000
n_cols = 10000
# prepare data
data1 = ones((n_rows,n_cols))
data2 = zeros((n_rows,n_cols))
# record start time
start = time()
# create thread pool
with ThreadPool(4) as pool:
# prepare args
args = [(a,b) for a,b in zip(data1,data2)]
# calculate distances on chunks of vectors
distances = pool.starmap(euclidean, args)
# calculate and report duration
duration = time() - start
print(f'Took {duration:.3f} seconds')
Running the example first splits the pairs of vectors into groups, then issues all tasks to the thread pool.
The thread pool executes all distance calculations as fast as possible, in groups of 6,250 pairs of vectors, the default chunksize.
In this case, the example took about 2.593 seconds.
This is about 1.713 seconds faster than the single-threaded version or a speed-up of about 1.66x.
Took 2.593 seconds
Example of Multithreaded Pairwise Vector Distances With Chunksize
Maybe we can get a speed-up with a customized chunksize.
For example, in my system, I have 4 physical CPU cores and we are using 4 worker threads. Perhaps if we divide the list of vectors into 4 chunks, one per worker, and issue each chunk in parallel, we will get a speed-up.
This can be achieved by parameterizing the number of workers.
Update the number of workers based on the number of physical CPU cores in your system.
...
n_workers = 4
We can then use this variable to configure the number of worker threads.
...
# create thread pool
with ThreadPool(n_workers) as pool:
# ...
We can also use the variable in calculating the chunksize, computed as the number of vectors divided by the number of workers.
...
# calculate distances on chunks of vectors
distances = pool.starmap(euclidean, args, chunksize=round(n_rows/n_workers))
This is calculated as follows:
chunksize = round(n_rows / n_workers)
chunksize = round(100000 / 4)
chunksize = round(25000.0)
chunksize = 25000
Tying this together, the complete example is listed below.
# example of multithreaded pairwise vector distances
from time import time
from numpy import ones
from numpy import zeros
from scipy.spatial.distance import euclidean
from multiprocessing.pool import ThreadPool
# size of data
n_rows = 100000
n_cols = 10000
n_workers = 4
# prepare data
data1 = ones((n_rows,n_cols))
data2 = zeros((n_rows,n_cols))
# record start time
start = time()
# create thread pool
with ThreadPool(n_workers) as pool:
# prepare args
args = [(a,b) for a,b in zip(data1,data2)]
# calculate distances on chunks of vectors
distances = pool.starmap(euclidean, args, chunksize=round(n_rows/n_workers))
# calculate and report duration
duration = time() - start
print(f'Took {duration:.3f} seconds')
Running the example creates the data, starts the ThreadPool, then issues 4 chunks of vector pairs for distance calculation.
In this case, the example took about 2.653 seconds.
This is roughly equivalent to the default chunksize, offering no meaningful benefit. Nevertheless, it is still about 1.653 seconds faster than the single-threaded version, a speed-up of about 1.62x.
Maybe you can do better with a different manually specified configuration of the chunksize.
Took 2.653 seconds
Takeaways
You now know how to calculate vector distances between numpy arrays in parallel using threads.
If you enjoyed this tutorial, you will love my book: Concurrent NumPy in Python. It covers everything you need to master the topic with hands-on examples and clear explanations.