You can share NumPy arrays between processes using a multiprocessing Pipe.
A Pipe is an inter-process communication channel that connects one process with another.
In this tutorial, you will discover how to share NumPy arrays between processes using a Pipe.
Let’s get started.
What is a Multiprocessing Pipe
In multiprocessing, the Pipe provides a way to send data between two processes.
Under the covers, a pipe is exposed as a pair of connection objects.
Calling Pipe() creates the two connection objects. By default, both connection objects can be used for sending and receiving data.
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).
— multiprocessing — Process-based parallelism.
For example:
...
# create the connections of a shared pipe
conn1, conn2 = Pipe()
Python objects can be transmitted between processes using the Pipe, although the API documentation adds a caveat about shared objects.
However, one should generally avoid sending shared objects to other processes using pipes or queues.
— multiprocessing — Process-based parallelism.
The send() method on the connection can be used to send objects from one process to another.
The objects sent must be picklable.
For example:
...
# send an object
conn2.send('Hello world')
The recv() method on the connection can be used to receive objects in one process sent by another.
The objects received will be automatically un-pickled.
For example:
...
# receive an object
value = conn1.recv()
The function call will block until an object is received.
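If blocking indefinitely is a concern, the connection also provides a poll() method that checks whether data is ready to be read, optionally waiting up to a timeout in seconds. The snippet below is a small sketch of this pattern; it is not needed for the examples in this tutorial.
...
# wait up to five seconds for data before reading
if conn1.poll(5):
    value = conn1.recv()
else:
    print('no data received within the timeout')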
Once sharing data has finished, the connection can be closed via the close() method.
For example:
...
# close the connection
conn2.close()
The connection will be closed automatically when the object is garbage collected. This may mean that we do not have to explicitly close the Pipe once we are finished with it.
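Connection objects also support the context manager protocol (since Python 3.3), so each end of the pipe can be closed automatically at the end of a with-block. For example, a minimal sketch:
...
# create the connections of a shared pipe
conn1, conn2 = Pipe()
# both connections are closed automatically at the end of the block
with conn1, conn2:
    # send and receive an object
    conn2.send('Hello world')
    value = conn1.recv()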
You can learn more about how to use a multiprocessing Pipe in the tutorial:
Now that we know how to use a multiprocessing Pipe, let’s consider how we might use it to share a NumPy array.
How to Share an Array Using a Pipe
A multiprocessing Pipe can be used directly to share a NumPy array.
This can be achieved by:
- Choosing one connection object for each process.
- Calling the send() method with the array on one connection.
- Calling the recv() method on the other connection and assigning the result.
This will allow a NumPy array to be transmitted from one process to another, such as:
- Sharing an array prepared in the parent process with a child process.
- Returning an array prepared in a child process to a parent process.
A limitation of using a Pipe to share NumPy arrays is that the array must be pickled in order to be sent, and then unpickled when received.
This can be very slow.
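To get a rough feel for this overhead, we can time a pickle round trip directly, since serialization dominates the cost of transmitting a large array through a Pipe. The snippet below is a minimal sketch of this idea, not the benchmark referenced in the tutorial below.
# rough sketch: approximate the cost of sending an array via a Pipe
# by timing the pickle and unpickle round trip that a send/recv performs
from pickle import dumps, loads
from time import perf_counter
from numpy import ones

# create a large array of ones
data = ones((10000,10000))
# time the pickle round trip
start = perf_counter()
copy = loads(dumps(data))
duration = perf_counter() - start
print(f'pickle round trip took {duration:.3f} seconds')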
For example, you can see a benchmark of sharing an array via a Pipe compared to other methods in the tutorial:
Further, very large Python objects transmitted between processes using a Pipe may raise a runtime exception, such as pickled objects larger than about 32 megabytes, although preliminary testing has shown this not to be the case in Python 3.11 on macOS.
The object must be picklable. Very large pickles (approximately 32 MiB+, though it depends on the OS) may raise a ValueError exception.
— multiprocessing — Process-based parallelism.
Another limitation of using a Pipe for inter-process communication is that it is only able to connect one process with another. This is different from a Queue (discussed in the next tutorial), which allows more elaborate connection schemes between processes, such as one-to-many and many-to-many.
Now that we know how to share a NumPy array between processes using a multiprocessing Pipe, let’s look at some worked examples, starting with how to transmit an array from a parent to a child process.
Example of Sharing a NumPy Array Via a Pipe
We can explore the case of sharing a NumPy array with a child process via Pipe.
In this example, we will create an array in the parent process and then create a Pipe. The parent process will create and start the child process, passing it one end of the Pipe. The child process will block on its connection until the array arrives. The parent process will then send the array to the child and block until the child process is done, then report the contents of the array to show that changes made in the child are not reflected in the parent process.
Firstly, we can define a task function to be executed in the child process.
The function will take a connection as an argument. It then calls the recv() method on the connection and blocks until the NumPy array is received. It will then report on some of the content of the array, change all values to zero, and then report that the contents of the array were changed.
The task() function below implements this.
# task executed in a child process
def task(connection):
    # read the data from the pipe
    data = connection.recv()
    # check some data in the array
    print(data[:5,:5])
    # change data in the array
    data.fill(0.0)
    # confirm the data was changed
    print(data[:5,:5])
Next, in the main process, we can create an array with a modest size, filled with ones.
...
# define the size of the numpy array
n = 10000
# create the numpy array
data = ones((n,n))
We will then create the Pipe which returns two connection objects, one for the parent and one for the child process.
...
# create the shared pipe
conn1, conn2 = Pipe()
We can then create and configure a child process to execute our task() function and pass one connection as an argument.
The child process is then started so that it blocks waiting to read the transmitted array.
...
# create a child process
child = Process(target=task, args=(conn1,))
# start the child process
child.start()
The parent process will then transmit the NumPy array via the other connection object and block until the child process is terminated.
...
# send the data to the child process
conn2.send(data)
# wait for the child process to complete
child.join()
Finally, we can check the contents of the array.
...
# check some data in the array
print(data[:5,:5])
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of sharing a numpy array using a pipe
from multiprocessing import Process
from multiprocessing import Pipe
from numpy import ones

# task executed in a child process
def task(connection):
    # read the data from the pipe
    data = connection.recv()
    # check some data in the array
    print(data[:5,:5])
    # change data in the array
    data.fill(0.0)
    # confirm the data was changed
    print(data[:5,:5])

# protect the entry point
if __name__ == '__main__':
    # define the size of the numpy array
    n = 10000
    # create the numpy array
    data = ones((n,n))
    # create the shared pipe
    conn1, conn2 = Pipe()
    # create a child process
    child = Process(target=task, args=(conn1,))
    # start the child process
    child.start()
    # send the data to the child process
    conn2.send(data)
    # wait for the child process to complete
    child.join()
    # check some data in the array
    print(data[:5,:5])
Running the example first creates the NumPy array populated with one values.
The Pipe is then created, providing one connection object for each process.
The child process is then created, configured with one of the connection objects, and then started. The child process runs and then blocks on the provided connection object until the array is received.
The main process then transmits the array via the Pipe, and then blocks until the child process is done.
The child process receives the array and then confirms the array was populated with one values. It then fills the array with zero values and confirms the array’s contents were changed.
The child process terminates and the main process resumes. It confirms that the contents of the array in the parent process have not changed.
This highlights how we can share a copy of an array with a child process via a multiprocessing Pipe, and how changes to the copy of the array in the child process are not reflected in the parent process.
[[1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]]
[[0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
[[1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]]
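As an aside, because the default Pipe is duplex, the child process could send its modified copy back over the same connection so that the parent sees the change. The listing below is a sketch of this variation; it is not part of the tutorial's example and uses a smaller array to keep the extra copies cheap.
# sketch: child returns its modified copy over the same duplex connection
from multiprocessing import Process
from multiprocessing import Pipe
from numpy import ones

# task executed in a child process
def task(connection):
    # read the array from the pipe
    data = connection.recv()
    # change data in the copy of the array
    data.fill(0.0)
    # send the modified copy back to the parent
    connection.send(data)

# protect the entry point
if __name__ == '__main__':
    # create a smaller numpy array to keep the extra copies cheap
    data = ones((1000,1000))
    # create the shared pipe
    conn1, conn2 = Pipe()
    # create and start the child process
    child = Process(target=task, args=(conn1,))
    child.start()
    # send the array to the child process
    conn2.send(data)
    # receive the modified copy back from the child
    data = conn2.recv()
    # wait for the child process to complete
    child.join()
    # confirm the change is now visible in the parent
    print(data[:5,:5])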
Next, let’s look at an example of using a Pipe to simulate a return value from a child process to a parent process.
Example of Returning a NumPy Array Via Pipe
We can explore the case of simulating the return of a NumPy array from a child process to a parent process using a Pipe.
In this example, we will create a Pipe and share one connection with the child process and keep the other connection in the parent process. The child process will then create a NumPy array and send it to the parent process via the Pipe.
Firstly, we can define a task function that takes a Pipe connection as an argument and creates and transmits the array.
The function first creates an array initialized with one values. It then confirms that the array contains one values and sends it to the parent via the Pipe.
The task() function below implements this.
# task executed in a child process
def task(pipe):
    # define the size of the numpy array
    n = 10000
    # create the numpy array
    data = ones((n,n))
    # check some data in the array
    print(data[:5,:5])
    # send the array via a pipe
    pipe.send(data)
Next, the main process will create the Pipe.
...
# create the shared pipe
conn1, conn2 = Pipe()
It then creates the child process and configures it to execute our task function, passing it one of the connection objects as an argument.
...
# create a child process
child = Process(target=task, args=(conn2,))
The child process is then started and the main process blocks waiting to receive the array via the Pipe.
...
# start the child process
child.start()
# read the data from the pipe
data = conn1.recv()
Finally, the main process confirms the contents of the received NumPy array.
...
# check some data in the array
print(data[:5,:5])
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of returning an array from a child process
from multiprocessing import Process
from multiprocessing import Pipe
from numpy import ones

# task executed in a child process
def task(pipe):
    # define the size of the numpy array
    n = 10000
    # create the numpy array
    data = ones((n,n))
    # check some data in the array
    print(data[:5,:5])
    # send the array via a pipe
    pipe.send(data)

# protect the entry point
if __name__ == '__main__':
    # create the shared pipe
    conn1, conn2 = Pipe()
    # create a child process
    child = Process(target=task, args=(conn2,))
    # start the child process
    child.start()
    # read the data from the pipe
    data = conn1.recv()
    # check some data in the array
    print(data[:5,:5])
Running the example first creates the Pipe with the two connection objects.
Next, the child process is created, configured, and started.
The main process then blocks, waiting to read the NumPy array from the Pipe.
The child process runs, first creating a NumPy array initialized with one values.
It then reports on the contents of the array and transmits it to the parent process via the Pipe. The child process then terminates.
The main process reads the NumPy array from the Pipe and confirms it contains one values.
This highlights how we can simulate a NumPy array return value from a child process to a parent process using a Pipe.
[[1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]]
[[1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]]
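Because the array only travels from the child to the parent in this example, a one-way pipe is sufficient. Calling Pipe(duplex=False) returns a receive-only connection and a send-only connection. The listing below is a sketch of this variation, not part of the tutorial's example, again using a smaller array.
# sketch: returning an array from a child process via a one-way pipe
from multiprocessing import Process
from multiprocessing import Pipe
from numpy import ones

# task executed in a child process
def task(pipe):
    # create the numpy array
    data = ones((1000,1000))
    # send the array via the pipe
    pipe.send(data)

# protect the entry point
if __name__ == '__main__':
    # create a one-way pipe: conn1 can only receive, conn2 can only send
    conn1, conn2 = Pipe(duplex=False)
    # create and start the child process with the sending end
    child = Process(target=task, args=(conn2,))
    child.start()
    # read the array from the receiving end
    data = conn1.recv()
    # check some data in the array
    print(data[:5,:5])
    # wait for the child process to complete
    child.join()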
Further Reading
This section provides additional resources that you may find helpful.
Books
- Concurrent NumPy in Python, Jason Brownlee (my book!)
Guides
- Concurrent NumPy 7-Day Course
- Which NumPy Functions Are Multithreaded
- Numpy Multithreaded Matrix Multiplication (up to 5x faster)
- NumPy vs the Global Interpreter Lock (GIL)
- ThreadPoolExecutor Fill NumPy Array (3x faster)
- Fastest Way To Share NumPy Array Between Processes
Documentation
- Parallel Programming with numpy and scipy, SciPy Cookbook, 2015
- Parallel Programming with numpy and scipy (older archived version)
- Parallel Random Number Generation, NumPy API
NumPy APIs
Concurrency APIs
- threading — Thread-based parallelism
- multiprocessing — Process-based parallelism
- concurrent.futures — Launching parallel tasks
Takeaways
You now know how to share NumPy arrays between processes using a Pipe.
Did I make a mistake? See a typo?
I’m a simple humble human. Correct me, please!
Do you have any additional tips?
I’d love to hear about them!
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Caleb George on Unsplash