Parallel NumPy Vector Math with Multiprocessing

May 16, 2023 Concurrent NumPy

You can use multiprocessing to apply a math function to each item in a numpy vector.

Although this is straightforward to implement, it is likely to result in worse performance compared to the sequential version.

As such, it is generally not recommended to use multiprocessing to parallelize math operations on vectors.

In this tutorial, you will discover how to use process-based concurrency with multiprocessing to apply math operations in parallel to numpy vectors.

Let's get started.

Need to Apply Function to NumPy Vector in Parallel

Consider the situation where you have a vector of floating point values represented as a numpy array.

We then need to apply a mathematical operation to each item in the vector to produce a new or transformed vector.

The mathematical operation could be something complex, but it may also be something simple such as numpy.log(), numpy.sin(), or numpy.exp().

For example:

...
# apply math function to vector to create transformed vector
result = numpy.exp(data)
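A minimal runnable version of this snippet, using a small illustrative vector, looks as follows:

```python
# apply a math function elementwise to a small numpy vector
import numpy

# a small vector of floats
data = numpy.array([0.0, 1.0, 2.0])
# apply the exponential function to every item at once
result = numpy.exp(data)
print(result)
```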

If we have a very large vector, such as 10 million, 100 million or billions of items, can we apply the math function on the vector in parallel?

How to Apply Function to a NumPy Vector in Parallel with Multiprocessing

NumPy math functions like numpy.exp() are typically implemented in C code and called from Python.


Similarly, we may call a few of these functions in sequence to perform a custom transform.

By default, these operations are not implemented to support parallelism, unlike other numpy functions that call down into the BLAS or LAPACK library, such as numpy.dot() and numpy.linalg.svd().

As such, if we want to perform math operations on a large vector in parallel, we must implement it ourselves.

There are many ways we can implement this.

In this tutorial, let's look at one common approach that many Python developers will reach for when parallelizing code: multiprocessing.

The multiprocessing module is provided in the Python standard library and offers parallelism via process-based concurrency. This is unlike Python threads that are limited to run one at a time due to thread-safety issues with the Python interpreter.

The multiprocessing.Pool class provides a pool of worker processes that can be created once and reused to execute multiple tasks.


We might issue a task to the pool via the Pool.map() method, which takes a function, such as numpy.exp, and the iterable on which to apply it, such as our numpy array. It returns a list of results that we can convert back into a numpy array via the numpy.asarray() function.

For example:

...
# apply math function to each item in the vector in parallel
result_list = pool.map(numpy.exp, data)
# convert result list into array
result = numpy.asarray(result_list)


By default, the map() function will split the iterable into chunks to be sent to the workers, using a chunksize calculated from the length of the iterable and the number of workers in the pool. We can also specify our own partitioning of items into chunks via the "chunksize" argument.

For example:

...
# apply math function to vector in parallel with a custom chunksize
result_list = pool.map(numpy.exp, data, chunksize=1000)
# convert result list into array
result = numpy.asarray(result_list)


An alternative strategy is to split our vector up first, then issue each piece of the vector as a task on which a worker in the pool can apply the math function. This can be achieved via the numpy.split() function.

For example:

...
# split vector into parts
parts = numpy.split(data, 10)
# apply math function to each sub vector in parallel
result_list = pool.map(numpy.exp, parts)

We can then stitch the list of result vectors back into one large vector using the numpy.concatenate() function.

For example:

...
# convert list back to one array
result = numpy.concatenate(result_list)

Now that we know how to use multiprocessing to apply a math operation to a large numpy vector, let's look at some worked examples.

Example of Vector Operation (sequential)

Before we look at parallelizing math operations on a vector, let's look at the sequential version.

In this example, we will first define a 10,000,000 item vector of random floats via the numpy.random.rand() function, then apply the numpy.exp() math operation to each item in the vector to produce a result vector.

We will time the entire program and report the duration in seconds.

The complete example is listed below.

# example of vector operation
from numpy.random import rand
from numpy import exp
from time import time
# record start time
start = time()
# size of the vector
n = 10000000
# create vector of random floats
data = rand(n)
# apply math operation to each item
result = exp(data)
# calculate and report duration
duration = time() - start
print(f'Took {duration:.3f} seconds')

Running the example on my system takes a fraction of a second, about 143 milliseconds.

It may run faster or slower on your system, depending on the speed of your hardware and the version of Python and NumPy.

Took 0.143 seconds

Next, let's explore applying the math function in parallel.

Example of Parallel Vector Operations (Default Chunksize)

We can apply the exp() function on our array in parallel.

This requires first creating the data as before, then creating a process pool.

By default, the Pool class will create one worker per logical CPU in your system. We can often achieve better performance by configuring it to have one worker per physical CPU core.

I have 4 physical cores, so the pool is configured with 4 workers. Update the example to match the number of cores in your system.

...
# create the process pool
with Pool(4) as pool:
	# ...


We can then issue the tasks, gather the results, and reconstitute the transformed vector.

The example uses the default "chunksize" argument for map(). This is calculated automatically as:

chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
    chunksize += 1

With a 10,000,000 item iterable and a pool of 4 workers, this gives:

chunksize, extra = divmod(10000000, 4 * 4)
chunksize, extra = divmod(10000000, 16)
chunksize, extra = 625000, 0
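We can reproduce this calculation directly. The sketch below assumes the 10,000,000-item vector and the 4-worker pool used in this example:

```python
# reproduce the default chunksize calculation used by Pool.map()
n_items = 10000000  # length of the iterable
n_workers = 4       # number of workers in the pool
# the pool aims for about 4 chunks per worker
chunksize, extra = divmod(n_items, n_workers * 4)
if extra:
    chunksize += 1
print(chunksize)  # prints 625000
```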

The complete example is listed below.

# example of vector operation using a pool (default chunksize)
from multiprocessing import Pool
from numpy.random import rand
from numpy import exp
from numpy import asarray
from time import time

# protect the entry point
if __name__ == '__main__':
    # record start time
    start = time()
    # size of the vector
    n = 10000000
    # create vector of random floats
    data = rand(n)
    # create the process pool
    with Pool(4) as pool:
        # issue each func call on each item as a task
        result_list = pool.map(exp, data)
        # convert list back to array
        result = asarray(result_list)
    # calculate and report duration
    duration = time() - start
    print(f'Took {duration:.3f} seconds')

Running the example is very slow.

Much slower than the non-parallel version of the program.

On my system, it took about 23 seconds to complete.

Took 23.248 seconds

Next, let's see if we can improve performance by tuning the chunksize.

Example of Parallel Vector Operation (Tuned Chunksize)

The map() function copies blocks of items from the vector to the child processes, applies the math operation to each item sequentially, then returns a list of results.

Copying data between the processes adds a lot of overhead: each value must be pickled in the main process and unpickled in the worker.

Tuning the chunksize lets us group more or fewer items per task, which may lessen this overhead relative to the cost of computing the math function.

We can try a suite of values and discover what works well.

After a little trial and error, I found that a value of 1,000 gives better performance.

...
# issue each func call on each item as a task
result_list = pool.map(exp, data, chunksize=1000)

The complete example is listed below.

# example of vector operation using a pool (tuned chunksize)
from multiprocessing import Pool
from numpy.random import rand
from numpy import exp
from numpy import asarray
from time import time

# protect the entry point
if __name__ == '__main__':
    # record start time
    start = time()
    # size of the vector
    n = 10000000
    # create vector of random floats
    data = rand(n)
    # create the process pool
    with Pool(4) as pool:
        # issue each func call on each item as a task
        result_list = pool.map(exp, data, chunksize=1000)
        # convert list back to array
        result = asarray(result_list)
    # calculate and report duration
    duration = time() - start
    print(f'Took {duration:.3f} seconds')

Running the example is faster than the default chunksize, but still terribly slow.

It took about 16 seconds on my system, compared to about 23 seconds with the default chunksize and 143 milliseconds with the sequential version of the program.

Took 16.471 seconds

Example of Parallel Partitioned Vector Operation

Another approach is to partition the vector into sub-vectors first, then issue each as a task.

We can call the numpy.split() function to partition an array into subarrays, then the numpy.concatenate() function to stitch the results back together again.
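The round trip can be demonstrated on a small array. Note that numpy.split() requires the number of parts to divide the array length evenly; numpy.array_split() can be used when it does not:

```python
# split a vector, transform the parts, and stitch them back together
import numpy

# a small vector of 8 items
data = numpy.arange(8.0)
# partition into 4 equal sub-vectors
parts = numpy.split(data, 4)
# apply the math function to each part
transformed = [numpy.exp(p) for p in parts]
# stitch the results back into one vector
result = numpy.concatenate(transformed)
# the result matches applying the function to the whole vector
print(numpy.allclose(result, numpy.exp(data)))  # prints True
```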

The challenge becomes how many parts to split the array into.

After a little trial and error, I chose 64 parts, where each sub-vector has 156,250 items.

...
# split the array into chunks
chunks = split(data, 64)

I also chose a chunksize of 1 when issuing the tasks, as larger values did not help.

...
# issue tasks with chunking
result_list = pool.map(exp, chunks, chunksize=1)

The results can be combined directly as they are already in the order that the tasks were issued.

...
# convert list back to one array
result = concatenate(result_list)

Tying this together, the complete example is listed below.

# example of partitioned vector operation using a pool
from multiprocessing import Pool
from numpy.random import rand
from numpy import exp
from numpy import concatenate
from numpy import split
from time import time

# protect the entry point
if __name__ == '__main__':
    # record start time
    start = time()
    # size of the vector
    n = 10000000
    # create vector of random floats
    data = rand(n)
    # create the process pool
    with Pool(4) as pool:
        # split the array into chunks
        chunks = split(data, 64)
        # issue tasks with chunking
        result_list = pool.map(exp, chunks, chunksize=1)
        # convert list back to one array
        result = concatenate(result_list)
    # calculate and report duration
    duration = time() - start
    print(f'Took {duration:.3f} seconds')

Running the example is a lot faster.

It took about 0.385 seconds or 385 milliseconds, compared to 143 milliseconds for the sequential version.

I suspect it is faster than the previous parallel versions because pickling a few whole numpy arrays is much faster than pickling millions of individual numbers.

We are also applying the operation in a vectorized manner. That is, numpy.exp() is able to work on whole sub-vectors at once, rather than one number at a time as in the previous examples.

It is still more than twice as slow as the non-parallel version, and not helpful if we have billions of items.

Took 0.385 seconds
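We can get a rough sense of the pickling overhead by comparing the size of one whole numpy array pickled in one go against the same values pickled one at a time, as map() effectively does when sending individual items. This is a rough sketch with an illustrative 100,000-item vector, not a benchmark:

```python
# compare pickled size of a whole array vs individual items
import pickle
import numpy

# a vector of 100,000 random floats
data = numpy.random.rand(100000)
# pickle the whole array in one go
whole = pickle.dumps(data)
# pickle each item separately, as map() effectively does per item
items = [pickle.dumps(v) for v in data]
# compare the total bytes that would be transmitted
print(len(whole), sum(len(i) for i in items))
```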

I actually wrote a program to test a suite of different split sizes and chunk sizes.

For reference, this program is listed below.

# test different split sizes and chunk size when partitioning a vector for parallel math
from multiprocessing import Pool
from numpy.random import rand
from numpy import exp
from numpy import concatenate
from numpy import split
from time import time

# run the experiment with a given split size and chunksize
def experiment(s, c):
    # record start time
    start = time()
    # size of the vector
    n = 10000000
    # create vector of random floats
    data = rand(n)
    # create the process pool
    with Pool(4) as pool:
        # split the array into chunks
        chunks = split(data, s)
        # issue tasks with chunking
        result_list = pool.map(exp, chunks, chunksize=c)
        # convert list back to one array
        result = concatenate(result_list)
    # calculate and report duration
    duration = time() - start
    print(f'splits={s}, chunks={c} took {duration:.3f} seconds')

# protect the entry point
if __name__ == '__main__':
    splits = [1, 2, 4, 8, 16, 32, 64, 128]
    chunks = [1, 2, 4, 8, 16, 32, 64, 128]
    # test different split sizes and chunksizes
    for i in splits:
        for j in chunks:
            experiment(i, j)

Running the program may produce slightly different results each run.

The results from a run on my system are listed below.

splits=1, chunks=1 took 1.326 seconds
splits=1, chunks=2 took 1.421 seconds
splits=1, chunks=4 took 1.267 seconds
splits=1, chunks=8 took 1.296 seconds
splits=1, chunks=16 took 1.287 seconds
splits=1, chunks=32 took 1.274 seconds
splits=1, chunks=64 took 1.538 seconds
splits=1, chunks=128 took 1.311 seconds
splits=2, chunks=1 took 0.534 seconds
splits=2, chunks=2 took 1.326 seconds
splits=2, chunks=4 took 0.946 seconds
splits=2, chunks=8 took 0.876 seconds
splits=2, chunks=16 took 0.966 seconds
splits=2, chunks=32 took 1.276 seconds
splits=2, chunks=64 took 1.065 seconds
splits=2, chunks=128 took 0.813 seconds
splits=4, chunks=1 took 0.479 seconds
splits=4, chunks=2 took 0.511 seconds
splits=4, chunks=4 took 1.347 seconds
splits=4, chunks=8 took 1.411 seconds
splits=4, chunks=16 took 1.315 seconds
splits=4, chunks=32 took 1.384 seconds
splits=4, chunks=64 took 0.910 seconds
splits=4, chunks=128 took 0.838 seconds
splits=8, chunks=1 took 0.456 seconds
splits=8, chunks=2 took 0.478 seconds
splits=8, chunks=4 took 0.517 seconds
splits=8, chunks=8 took 0.918 seconds
splits=8, chunks=16 took 1.356 seconds
splits=8, chunks=32 took 1.303 seconds
splits=8, chunks=64 took 0.975 seconds
splits=8, chunks=128 took 0.856 seconds
splits=16, chunks=1 took 0.523 seconds
splits=16, chunks=2 took 0.618 seconds
splits=16, chunks=4 took 0.612 seconds
splits=16, chunks=8 took 0.559 seconds
splits=16, chunks=16 took 0.864 seconds
splits=16, chunks=32 took 0.891 seconds
splits=16, chunks=64 took 0.931 seconds
splits=16, chunks=128 took 1.042 seconds
splits=32, chunks=1 took 0.598 seconds
splits=32, chunks=2 took 0.771 seconds
splits=32, chunks=4 took 0.862 seconds
splits=32, chunks=8 took 0.690 seconds
splits=32, chunks=16 took 0.645 seconds
splits=32, chunks=32 took 0.943 seconds
splits=32, chunks=64 took 0.920 seconds
splits=32, chunks=128 took 0.936 seconds
splits=64, chunks=1 took 0.440 seconds
splits=64, chunks=2 took 0.689 seconds
splits=64, chunks=4 took 0.880 seconds
splits=64, chunks=8 took 0.925 seconds
splits=64, chunks=16 took 0.747 seconds
splits=64, chunks=32 took 0.674 seconds
splits=64, chunks=64 took 0.926 seconds
splits=64, chunks=128 took 0.959 seconds
splits=128, chunks=1 took 0.481 seconds
splits=128, chunks=2 took 0.504 seconds
splits=128, chunks=4 took 0.722 seconds
splits=128, chunks=8 took 0.986 seconds
splits=128, chunks=16 took 1.147 seconds
splits=128, chunks=32 took 0.859 seconds
splits=128, chunks=64 took 0.712 seconds
splits=128, chunks=128 took 1.088 seconds

Recommendations

The results suggest that parallelizing math operations on a numpy vector using multiprocessing is probably a bad idea.

At least, not with the methods tried here.

The limitation of multiprocessing is that data must be transmitted from the main process to the worker processes using inter-process communication.

This may be overcome by using a shared memory mechanism, such as a shared ctypes array (multiprocessing.Array or multiprocessing.RawArray) or the multiprocessing.shared_memory module.

Perhaps one of these mechanisms offers a path to faster parallel math operations on large numpy vectors.
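As a minimal single-process sketch of the shared memory mechanism (no worker pool; the sizes are illustrative), a numpy array can be backed by a shared memory block that a second handle attaches to by name, just as a worker process would:

```python
# sketch of backing a numpy array with shared memory
import numpy
from multiprocessing import shared_memory

# create a shared memory block large enough for 1,000 float64 values
shm = shared_memory.SharedMemory(create=True, size=1000 * 8)
# view the shared buffer as a numpy array
data = numpy.ndarray((1000,), dtype=numpy.float64, buffer=shm.buf)
data[:] = numpy.random.rand(1000)
# a second handle, attached by name as a worker process would do
attached = shared_memory.SharedMemory(name=shm.name)
view = numpy.ndarray((1000,), dtype=numpy.float64, buffer=attached.buf)
# apply the math operation in place via the second view
view[:] = numpy.exp(view)
# the change is visible through the original view: no data was copied
ok = bool(numpy.all(data >= 1.0))
print(ok)  # prints True, exp of values in [0,1) is at least 1.0
# release the numpy views before closing the shared memory
del data, view
attached.close()
shm.close()
shm.unlink()
```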

A better approach may be to explore threading via the ThreadPool class. This is because many numpy functions release the GIL, allowing Python threads to run in parallel.
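A sketch of that alternative, reusing the split-and-concatenate structure from above with multiprocessing.pool.ThreadPool (4 threads and a 1,000,000-item vector, both illustrative):

```python
# sketch of applying a math function to a vector with a thread pool
import numpy
from multiprocessing.pool import ThreadPool

# a vector of 1,000,000 random floats
data = numpy.random.rand(1000000)
# create a pool of 4 worker threads
with ThreadPool(4) as pool:
    # split the vector into one sub-vector per worker
    chunks = numpy.split(data, 4)
    # numpy.exp may release the GIL, letting the threads run in parallel
    result_list = pool.map(numpy.exp, chunks)
    # stitch the results back into one vector
    result = numpy.concatenate(result_list)
print(result.shape)
```

Threads avoid the pickling overhead entirely, as all workers share the same memory.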


Takeaways

You now know how to use process-based concurrency with multiprocessing to apply math operations in parallel to numpy vectors.



If you enjoyed this tutorial, you will love my book: Concurrent NumPy in Python. It covers everything you need to master the topic with hands-on examples and clear explanations.