Last Updated on September 29, 2023
You can use multiprocessing to apply a math function to each item in a numpy vector.
Although this is straightforward to implement, it is likely to result in worse performance compared to the sequential version.
As such, it is generally not recommended to use multiprocessing to parallelize math operations on vectors.
In this tutorial, you will discover how to use process-based concurrency with multiprocessing to apply math operations in parallel to numpy vectors.
Let’s get started.
Need to Apply Function to NumPy Vector in Parallel
Consider the situation where you have a vector of floating point values represented as a numpy array.
We then need to apply a mathematical operation to each item in the vector to produce a new, transformed vector.
The mathematical operation could be something complex, but it may also be something simple, such as numpy.log(), numpy.sin(), or numpy.exp().
For example:
```python
...
# apply math function to vector to create transformed vector
result = numpy.exp(data)
```
If we have a very large vector, such as 10 million, 100 million or billions of items, can we apply the math function on the vector in parallel?
How to Apply Function to a NumPy Vector in Parallel with Multiprocessing
NumPy math functions like numpy.exp() are typically implemented in C code and called from Python.
You can see a long list of them in the NumPy API documentation under mathematical functions.
Similarly, we may call a few of these functions in sequence to perform a custom transform.
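For example, a custom transform might compose several element-wise numpy functions. The transform below is hypothetical, purely to illustrate the idea:

```python
# a hypothetical custom transform composed of element-wise numpy functions
from numpy import exp, log, sin

def transform(data):
    # each call operates element-wise across the whole vector
    return log(1.0 + exp(sin(data)))
```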
By default, these operations are not implemented to support parallelism, unlike other numpy functions that call down into the BLAS or LAPACK library, such as numpy.dot() and numpy.linalg.svd().
As such, if we want to perform math operations on a large vector in parallel, we must implement it ourselves.
There are many ways we can implement this.
In this tutorial, let’s look at one common approach that many Python developers will reach for when parallelizing code: multiprocessing.
The multiprocessing module is provided in the Python standard library and offers parallelism via process-based concurrency. This is unlike Python threads, which are typically limited to running one at a time by the Global Interpreter Lock (GIL) that protects the internals of the Python interpreter.
The multiprocessing.Pool class provides a pool of worker processes that can be created once and reused to execute multiple tasks.
We might issue a task to the pool via the Pool.map() method that takes the name of the function, such as numpy.exp, and the iterable on which to apply the function, such as our numpy array vector. It returns a list of results that we can convert back into a numpy array via the numpy.asarray() function.
For example:
```python
...
# apply math function to each item in the vector in parallel
result_list = pool.map(numpy.exp, data)
# convert result list into array
result = numpy.asarray(result_list)
```
By default, the map() function will split the iterable into chunks to be sent to the workers, using a sensible chunksize based on the size of the data and the number of workers. We can also specify our own partitioning of items into chunks via the “chunksize” argument.
For example:
```python
...
# apply math function to vector in parallel with a custom chunksize
result_list = pool.map(numpy.exp, data, chunksize=1000)
# convert result list into array
result = numpy.asarray(result_list)
```
An alternative strategy is to split our vector up first, then issue each piece of the vector as a task on which a worker in the pool can apply the math function. This can be achieved via the numpy.split() function.
For example:
```python
...
# split vector into parts
parts = numpy.split(data, 10)
# apply math function to each sub vector in parallel
result_list = pool.map(numpy.exp, parts)
```
We can then stitch the list of result vectors back into one large vector using the numpy.concatenate() function.
For example:
```python
...
# convert list back to one array
result = numpy.concatenate(result_list)
```
Now that we know how to use multiprocessing to apply a math operation to a large numpy vector, let’s look at some worked examples.
Example of Vector Operation (Sequential)
Before we look at parallelizing math operations on a vector, let’s look at the sequential version.
In this example, we will first define a 10,000,000 item vector of random floats via the numpy.random.rand() function, then apply the numpy.exp() math operation to each item in the vector to produce a result vector.
We will time the entire program and report the duration in seconds.
The complete example is listed below.
```python
# example of vector operation
from numpy.random import rand
from numpy import exp
from time import time

# record start time
start = time()
# size of the vector
n = 10000000
# create vector of random floats
data = rand(n)
# apply math operation to each item
result = exp(data)
# calculate and report duration
duration = time() - start
print(f'Took {duration:.3f} seconds')
```
Running the example on my system takes a fraction of a second, about 143 milliseconds.
It may run faster or slower on your system, depending on the speed of your hardware and the version of Python and NumPy.
```
Took 0.143 seconds
```
Next, let’s explore applying the math function in parallel.
Example of Parallel Vector Operation (Default Chunksize)
We can apply the exp() function to our array in parallel.
This requires first creating the data as before, then creating a process pool.
By default, the Pool class will create one worker per logical CPU in your system. We can often achieve better performance by configuring it to have one worker per physical CPU core.
I have 4 physical cores, so the pool is configured with 4 workers. Update the example to match the number of cores in your system.
```python
...
# create the process pool
with Pool(4) as pool:
    # ...
```
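If you do not know the number of physical cores up front, one rough sketch is to halve the logical CPU count reported by multiprocessing.cpu_count(). This assumes two hardware threads per physical core, which may not hold on your machine:

```python
# sketch: estimate physical cores as half the logical CPU count
# (assumes 2 hardware threads per core; adjust for your hardware)
from multiprocessing import Pool, cpu_count

workers = max(1, cpu_count() // 2)
with Pool(workers) as pool:
    # issue tasks here
    ...
```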
We can then issue the tasks, gather the results, and reconstitute the transformed vector.
The example uses the default “chunksize” argument for map(). This is calculated automatically as:
```python
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
chunksize, extra = divmod(10000000, 4 * 4)
chunksize, extra = divmod(10000000, 16)
chunksize, extra = 625000, 0
```
The complete example is listed below.
```python
# example of vector operation using a pool (default chunksize)
from multiprocessing import Pool
from numpy.random import rand
from numpy import exp
from numpy import asarray
from time import time

# protect the entry point
if __name__ == '__main__':
    # record start time
    start = time()
    # size of the vector
    n = 10000000
    # create vector of random floats
    data = rand(n)
    # create the process pool
    with Pool(4) as pool:
        # issue each func call on each item as a task
        result_list = pool.map(exp, data)
        # convert list back to array
        result = asarray(result_list)
    # calculate and report duration
    duration = time() - start
    print(f'Took {duration:.3f} seconds')
```
Running the example is very slow.
Much slower than the non-parallel version of the program.
On my system, it took about 23 seconds to complete.
```
Took 23.248 seconds
```
Next, let’s see if we can improve performance by tuning the chunksize.
Example of Parallel Vector Operation (Tuned Chunksize)
The map() function copies blocks of items from the vector to the child processes, applies the math operation to each item sequentially, then returns a list of results.
Copying this data between processes adds a lot of overhead: each value must be pickled in the main process and unpickled in the worker process.
Tuning the chunksize allows us to group more or fewer items per task, which may reduce this overhead relative to the cost of computing the math function.
We can try a suite of values and discover what works well.
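For example, a minimal sketch might time map() over a small suite of chunksize values. This uses a smaller vector to keep the runs short; the best value will depend on your system:

```python
# sketch: time pool.map() over a suite of chunksize values
# (illustrative only; uses a smaller vector to keep runs short)
from multiprocessing import Pool
from numpy.random import rand
from numpy import exp
from time import time

if __name__ == '__main__':
    # create a smaller vector of random floats
    data = rand(1000000)
    for c in [100, 1000, 10000, 100000]:
        start = time()
        with Pool(4) as pool:
            # apply the math function with the candidate chunksize
            pool.map(exp, data, chunksize=c)
        print(f'chunksize={c}: took {time() - start:.3f} seconds')
```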
After a little trial and error, I found that a value of 1,000 gives better performance.
```python
...
# issue each func call on each item as a task
result_list = pool.map(exp, data, chunksize=1000)
```
The complete example is listed below.
```python
# example of vector operation using a pool (tuned chunksize)
from multiprocessing import Pool
from numpy.random import rand
from numpy import exp
from numpy import asarray
from time import time

# protect the entry point
if __name__ == '__main__':
    # record start time
    start = time()
    # size of the vector
    n = 10000000
    # create vector of random floats
    data = rand(n)
    # create the process pool
    with Pool(4) as pool:
        # issue each func call on each item as a task
        result_list = pool.map(exp, data, chunksize=1000)
        # convert list back to array
        result = asarray(result_list)
    # calculate and report duration
    duration = time() - start
    print(f'Took {duration:.3f} seconds')
```
Running the example is faster than with the default chunksize, but still terribly slow.
It took about 16 seconds on my system, compared to about 23 seconds with the default chunksize and 143 milliseconds with the sequential version of the program.
```
Took 16.471 seconds
```
Example of Parallel Partitioned Vector Operation
Another approach is to partition the vector into sub-vectors first, then issue each as a task.
We can call the numpy.split() function to partition an array into subarrays, then the numpy.concatenate() function to stitch the results back together again.
The challenge becomes how many parts to split the array into.
After a little trial and error, I chose 64 parts, where each sub-vector has 156,250 items.
```python
...
# split the array into chunks
chunks = split(data, 64)
```
I also chose a chunksize of 1 when issuing the tasks, as larger values did not help.
```python
...
# issue tasks with chunking
result_list = pool.map(exp, chunks, chunksize=1)
```
The results can be combined directly as they are already in the order that the tasks were issued.
```python
...
# convert list back to one array
result = concatenate(result_list)
```
Tying this together, the complete example is listed below.
```python
# example of partitioned vector operation using a pool
from multiprocessing import Pool
from numpy.random import rand
from numpy import exp
from numpy import concatenate
from numpy import split
from time import time

# protect the entry point
if __name__ == '__main__':
    # record start time
    start = time()
    # size of the vector
    n = 10000000
    # create vector of random floats
    data = rand(n)
    # create the process pool
    with Pool(4) as pool:
        # split the array into chunks
        chunks = split(data, 64)
        # issue tasks with chunking
        result_list = pool.map(exp, chunks, chunksize=1)
        # convert list back to one array
        result = concatenate(result_list)
    # calculate and report duration
    duration = time() - start
    print(f'Took {duration:.3f} seconds')
```
Running the example is a lot faster.
It took about 0.385 seconds or 385 milliseconds, compared to 143 milliseconds for the sequential version.
I suspect it is faster than the previous parallel versions because pickling a handful of whole numpy arrays is much faster than pickling millions of individual numbers.
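As a rough illustration, we can compare the cost of pickling one whole array against pickling the same values as a list of individual numbers. This is a micro-benchmark sketch and the exact numbers will vary:

```python
# sketch: pickling one whole array vs a list of individual numbers
# (illustrative micro-benchmark; results will vary by system)
import pickle
from time import time
from numpy.random import rand

data = rand(1000000)
start = time()
payload = pickle.dumps(data)  # one numpy array object
print(f'array: {time() - start:.3f} seconds, {len(payload)} bytes')
start = time()
payload = pickle.dumps(list(data))  # one million scalar objects
print(f'list:  {time() - start:.3f} seconds, {len(payload)} bytes')
```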
We are also applying the operation in a vectorized manner. That is, numpy.exp() is permitted to work on whole sub-vectors at once, rather than one number at a time as in the previous examples.
It is still more than twice as slow as the non-parallel version, and not helpful if we have billions of items.
```
Took 0.385 seconds
```
I actually wrote a program to test a suite of different split sizes and chunk sizes.
For reference, this program is listed below.
```python
# test different split sizes and chunk sizes when partitioning a vector for parallel math
from multiprocessing import Pool
from numpy.random import rand
from numpy import exp
from numpy import concatenate
from numpy import split
from time import time

# run the experiment with a given split size and chunksize
def experiment(s, c):
    # record start time
    start = time()
    # size of the vector
    n = 10000000
    # create vector of random floats
    data = rand(n)
    # create the process pool
    with Pool(4) as pool:
        # split the array into chunks
        chunks = split(data, s)
        # issue tasks with chunking
        result_list = pool.map(exp, chunks, chunksize=c)
        # convert list back to one array
        result = concatenate(result_list)
    # calculate and report duration
    duration = time() - start
    print(f'splits={s}, chunks={c} took {duration:.3f} seconds')

# protect the entry point
if __name__ == '__main__':
    splits = [1, 2, 4, 8, 16, 32, 64, 128]
    chunks = [1, 2, 4, 8, 16, 32, 64, 128]
    # test different split sizes and chunksizes
    for i in splits:
        for j in chunks:
            experiment(i, j)
```
Running the program may produce slightly different results each run.
The results from a run on my system are listed below.
```
splits=1, chunks=1 took 1.326 seconds
splits=1, chunks=2 took 1.421 seconds
splits=1, chunks=4 took 1.267 seconds
splits=1, chunks=8 took 1.296 seconds
splits=1, chunks=16 took 1.287 seconds
splits=1, chunks=32 took 1.274 seconds
splits=1, chunks=64 took 1.538 seconds
splits=1, chunks=128 took 1.311 seconds
splits=2, chunks=1 took 0.534 seconds
splits=2, chunks=2 took 1.326 seconds
splits=2, chunks=4 took 0.946 seconds
splits=2, chunks=8 took 0.876 seconds
splits=2, chunks=16 took 0.966 seconds
splits=2, chunks=32 took 1.276 seconds
splits=2, chunks=64 took 1.065 seconds
splits=2, chunks=128 took 0.813 seconds
splits=4, chunks=1 took 0.479 seconds
splits=4, chunks=2 took 0.511 seconds
splits=4, chunks=4 took 1.347 seconds
splits=4, chunks=8 took 1.411 seconds
splits=4, chunks=16 took 1.315 seconds
splits=4, chunks=32 took 1.384 seconds
splits=4, chunks=64 took 0.910 seconds
splits=4, chunks=128 took 0.838 seconds
splits=8, chunks=1 took 0.456 seconds
splits=8, chunks=2 took 0.478 seconds
splits=8, chunks=4 took 0.517 seconds
splits=8, chunks=8 took 0.918 seconds
splits=8, chunks=16 took 1.356 seconds
splits=8, chunks=32 took 1.303 seconds
splits=8, chunks=64 took 0.975 seconds
splits=8, chunks=128 took 0.856 seconds
splits=16, chunks=1 took 0.523 seconds
splits=16, chunks=2 took 0.618 seconds
splits=16, chunks=4 took 0.612 seconds
splits=16, chunks=8 took 0.559 seconds
splits=16, chunks=16 took 0.864 seconds
splits=16, chunks=32 took 0.891 seconds
splits=16, chunks=64 took 0.931 seconds
splits=16, chunks=128 took 1.042 seconds
splits=32, chunks=1 took 0.598 seconds
splits=32, chunks=2 took 0.771 seconds
splits=32, chunks=4 took 0.862 seconds
splits=32, chunks=8 took 0.690 seconds
splits=32, chunks=16 took 0.645 seconds
splits=32, chunks=32 took 0.943 seconds
splits=32, chunks=64 took 0.920 seconds
splits=32, chunks=128 took 0.936 seconds
splits=64, chunks=1 took 0.440 seconds
splits=64, chunks=2 took 0.689 seconds
splits=64, chunks=4 took 0.880 seconds
splits=64, chunks=8 took 0.925 seconds
splits=64, chunks=16 took 0.747 seconds
splits=64, chunks=32 took 0.674 seconds
splits=64, chunks=64 took 0.926 seconds
splits=64, chunks=128 took 0.959 seconds
splits=128, chunks=1 took 0.481 seconds
splits=128, chunks=2 took 0.504 seconds
splits=128, chunks=4 took 0.722 seconds
splits=128, chunks=8 took 0.986 seconds
splits=128, chunks=16 took 1.147 seconds
splits=128, chunks=32 took 0.859 seconds
splits=128, chunks=64 took 0.712 seconds
splits=128, chunks=128 took 1.088 seconds
```
Recommendations
The results suggest that parallelizing math operations on a numpy vector using multiprocessing is probably a bad idea, at least for the methods tried here.
The limitation of multiprocessing is that data must be transmitted from the main process to the worker processes using inter-process communication.
This may be overcome by using a shared memory mechanism, such as:
- multiprocessing.Manager
- multiprocessing.Array
- multiprocessing.shared_memory
Perhaps one of these mechanisms will lead to a path for faster parallelized math operations on large numpy vectors.
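For example, below is a minimal sketch using multiprocessing.shared_memory, in which each worker attaches to a single shared array and applies exp() to its own slice in place. The task() helper and the slicing scheme are my own invention for illustration, not a tuned implementation:

```python
# sketch: apply exp() to slices of a shared array in worker processes
# (illustrative only; task() and the slicing scheme are hypothetical)
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
from numpy.random import rand
from numpy import ndarray, exp

# apply exp() in place to one slice of the shared array
def task(args):
    name, start, end, n = args
    # attach to the existing shared memory block by name
    shm = SharedMemory(name=name)
    data = ndarray((n,), dtype='float64', buffer=shm.buf)
    data[start:end] = exp(data[start:end])
    # release the view before closing the block
    del data
    shm.close()

if __name__ == '__main__':
    n = 10000000
    # allocate shared memory for n float64 values
    shm = SharedMemory(create=True, size=n * 8)
    data = ndarray((n,), dtype='float64', buffer=shm.buf)
    data[:] = rand(n)
    # describe 4 equal slices, one per worker
    step = n // 4
    args = [(shm.name, i, i + step, n) for i in range(0, n, step)]
    with Pool(4) as pool:
        pool.map(task, args)
    print(data[:5])
    # release the view, then free the shared memory
    del data
    shm.close()
    shm.unlink()
```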
A better approach may be to explore threading and the ThreadPool class. This is because most numpy functions release the GIL, allowing Python threads to achieve full parallelism.
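As a sketch, the partitioned example above can be rewritten to use threads by swapping in the ThreadPool class. No pickling is required because worker threads share memory with the main thread:

```python
# sketch: the partitioned vector operation using threads instead of processes
# (numpy releases the GIL inside exp(), so threads can run in parallel)
from multiprocessing.pool import ThreadPool
from numpy.random import rand
from numpy import exp, split, concatenate
from time import time

if __name__ == '__main__':
    # record start time
    start = time()
    # create vector of random floats
    data = rand(10000000)
    # split the vector and apply exp() to each part in worker threads
    with ThreadPool(4) as pool:
        parts = split(data, 4)
        result = concatenate(pool.map(exp, parts))
    # calculate and report duration
    print(f'Took {time() - start:.3f} seconds')
```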
You can learn more about numpy releasing the GIL in the tutorial NumPy vs the Global Interpreter Lock (GIL), listed under Further Reading below.
Further Reading
This section provides additional resources that you may find helpful.
Books
- Concurrent NumPy in Python, Jason Brownlee (my book!)
Guides
- Concurrent NumPy 7-Day Course
- Which NumPy Functions Are Multithreaded
- Numpy Multithreaded Matrix Multiplication (up to 5x faster)
- NumPy vs the Global Interpreter Lock (GIL)
- ThreadPoolExecutor Fill NumPy Array (3x faster)
- Fastest Way To Share NumPy Array Between Processes
Documentation
- Parallel Programming with numpy and scipy, SciPy Cookbook, 2015
- Parallel Programming with numpy and scipy (older archived version)
- Parallel Random Number Generation, NumPy API
Concurrency APIs
- threading — Thread-based parallelism
- multiprocessing — Process-based parallelism
- concurrent.futures — Launching parallel tasks
Takeaways
You now know how to use process-based concurrency with multiprocessing to apply math operations in parallel to numpy vectors.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.