Last Updated on September 29, 2023
You can share numpy arrays between processes using a queue.
In this tutorial, you will discover how to share numpy arrays between processes using a queue.
Let’s get started.
Need to Share Numpy Arrays Between Processes
Python offers process-based concurrency via the multiprocessing module.
Process-based concurrency is appropriate for those tasks that are CPU-bound, as opposed to thread-based concurrency in Python, which is generally suited to IO-bound tasks given the presence of the Global Interpreter Lock (GIL).
You can learn more about process-based concurrency and the multiprocessing module in the tutorial:
Consider the situation where we need to share numpy arrays between processes.
We may need to share numpy arrays between processes for many reasons, such as:
- Data is loaded as an array in one process and analyzed differently in different subprocesses.
- Many child processes load small data as arrays that are sent to a parent process for handling.
- Data arrays are loaded in the parent process and processed in a suite of child processes.
Sharing Python objects and data between processes is slow.
This is because any data shared between processes, like numpy arrays, must be transmitted using inter-process communication (IPC), requiring that the data first be pickled by the sender and then unpickled by the receiver.
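We can see this serialization cost directly by pickling an array ourselves. The short sketch below is my own illustration (the array size is arbitrary); it shows roughly how many bytes must be copied between processes for a modest array:

```python
# demonstrate the pickling cost paid when an array is transmitted
import pickle
from numpy.random import random

# create a 1,000x1,000 array of random floats (about 8 MB of data)
data = random((1000, 1000))
# serialize the array, as multiprocessing does before transmission
payload = pickle.dumps(data)
# report the size of the serialized payload in bytes
print(f'Pickled size: {len(payload)} bytes')
```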
You can learn more about this in the tutorial:
This means that sharing numpy arrays between processes only makes sense if we receive some benefit, such as a speedup, that overcomes the slow speed of data transmission.
For example, it may be the case that the arrays are relatively small and fast to transmit, whereas the computation performed on each array is slow and can benefit from being performed in separate processes.
Alternatively, preparing the array may be computationally expensive and benefit from being performed in a separate process, and once prepared, the arrays are small and fast to transmit to another process that requires them.
Given these situations, how can we share data between processes in Python?
How to Share a Numpy Array Between Processes Using a Queue
One way to share numpy arrays between Python processes is to use a queue.
Python provides a process-safe queue in the multiprocessing.Queue class.
A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().
The multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means that items are retrieved from the queue in the order they were added: the first items added to the queue will be the first items retrieved. This is opposed to other queue types, such as last-in, first-out (LIFO) queues and priority queues.
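As a minimal sketch of this FIFO behavior (separate from the worked examples later in this tutorial), items come off the queue in the same order they were put on:

```python
# demonstrate first-in, first-out ordering of multiprocessing.Queue
from multiprocessing import Queue

if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # add items in order
    for item in ['first', 'second', 'third']:
        queue.put(item)
    # retrieve items: they come back in the order they were added
    for _ in range(3):
        print(queue.get())
```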
You can learn more about how to use the multiprocessing.Queue class in the tutorial:
The multiprocessing module also provides the JoinableQueue.
The multiprocessing.JoinableQueue class offers two additional methods for joining a queue and marking items on the queue as done.
Consumers of items on the queue can mark items like numpy arrays as “processed” or “done” via the task_done() method. Other processes can wait for all items on the queue to be processed by calling the join() method.
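A minimal sketch of this protocol (my own illustration, with an arbitrary number of items): a consumer calls task_done() once per retrieved item, and a call to join() returns only once every item has been marked done.

```python
# demonstrate JoinableQueue with task_done() and join()
from multiprocessing import Process
from multiprocessing import JoinableQueue

# consume items and mark each as done
def consume(queue):
    while True:
        item = queue.get()
        print(f'Processed {item}', flush=True)
        # mark the retrieved item as processed
        queue.task_done()

if __name__ == '__main__':
    # create the shared joinable queue
    queue = JoinableQueue()
    # start a daemon consumer so it does not block program exit
    consumer = Process(target=consume, args=(queue,), daemon=True)
    consumer.start()
    # add some work
    for i in range(5):
        queue.put(i)
    # block until every item has been marked as done
    queue.join()
    print('All items processed.')
```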
You can learn more about the JoinableQueue in the tutorial:
We can create a new Queue object, share it between child processes, and use it to share small to modestly sized numpy arrays efficiently.
For example, we might create or load many numpy arrays in a parent process and place them on a shared queue. Child processes may then read numpy arrays from the queue as fast as they are able and perform some computation on them.
Another approach would be to have a suite of child processes load or create numpy arrays as fast as they are able and place them on a shared queue. Another process may then read arrays from the queue as fast as it is able and perform an operation on them.
Now that we know how to use a queue to share numpy arrays between processes, let’s look at some worked examples.
Example of Sending Numpy Arrays to Child Processes via a Queue
We can explore an example of sending numpy arrays to child processes for processing using a shared queue.
In this example, the parent process will create many arrays with different dimensions and filled with random numbers. The arrays will be placed on a shared queue. Multiple child processes will then consume the arrays from the queue and perform a mathematical operation on them.
This may be a good template for those cases where loading or preparing the numpy arrays is relatively easy and the arrays are modest in size, yet each array requires non-trivial computation to be performed (which we will simulate in this case).
Firstly, we can define the task executed in child processes.
This task will run in a loop. Each iteration, the loop will retrieve one array from the queue and then perform a computational operation on it. In this case, it will simply compute the max value in the array and report it. The loop is exited if a None message is received, signaling no further arrays.
The task() function below implements this, taking the shared queue as an argument.
```python
# read numpy arrays from the queue and compute something
def task(queue):
    # read arrays
    while True:
        # read one array
        data = queue.get()
        # check for final item
        if data is None:
            # report message
            print('Task done.', flush=True)
            # push signal back into queue
            queue.put(data)
            # exit the task
            break
        # compute max of array
        result = data.max()
        # report a message
        print(f'Task read an array {data.shape}, max={result}', flush=True)
```
The main process will first create the shared queue, then start 4 child processes, each executing the task() function.
```python
...
# create the shared queue
queue = Queue()
# issue task processes
tasks = [Process(target=task, args=(queue,)) for _ in range(4)]
for t in tasks:
    t.start()
```
The main process then loops 20 times, each iteration creating a numpy array with random dimensions, filled with random values, which is then put on the shared queue.
```python
...
# generate many arrays of random numbers
for _ in range(20):
    # generate random dimensions
    dim = (randint(500,2000), randint(500,2000))
    # generate array of random floats with random dimensions
    data = random(dim)
    # push into queue
    queue.put(data)
```
The main process then signals that no further arrays are expected via a None value, then waits for all child processes to terminate.
```python
...
# signal no further arrays
queue.put(None)
# wait for task processes to be done
for t in tasks:
    t.join()
# report a final message
print('Done.', flush=True)
```
Tying this together, the complete example is listed below.
```python
# create arrays in a parent process and read in children via queue
from multiprocessing import Process
from multiprocessing import Queue
from random import randint
from numpy.random import random

# read numpy arrays from the queue and compute something
def task(queue):
    # read arrays
    while True:
        # read one array
        data = queue.get()
        # check for final item
        if data is None:
            # report message
            print('Task done.', flush=True)
            # push signal back into queue
            queue.put(data)
            # exit the task
            break
        # compute max of array
        result = data.max()
        # report a message
        print(f'Task read an array {data.shape}, max={result}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # issue task processes
    tasks = [Process(target=task, args=(queue,)) for _ in range(4)]
    for t in tasks:
        t.start()
    # generate many arrays of random numbers
    for _ in range(20):
        # generate random dimensions
        dim = (randint(500,2000), randint(500,2000))
        # generate array of random floats with random dimensions
        data = random(dim)
        # push into queue
        queue.put(data)
    # signal no further arrays
    queue.put(None)
    # wait for task processes to be done
    for t in tasks:
        t.join()
    # report a final message
    print('Done.', flush=True)
```
Running the program first creates the shared queue.
The 4 child processes are then created and configured to execute our task() function before being started. Each process waits for items to appear on the queue.
The main process then generates and puts 20 numpy arrays on the queue.
It then adds the None signal, indicating that no further arrays are expected, and waits for the child processes to exit.
Each child process loops, retrieving one array from the queue. It computes the max value and then reports it along with the dimensions of the array.
This is repeated by all child processes until no further arrays are available.
Each child process encounters the None message, reports a message, adds the None back to the queue for the other child processes, and exits its loop, terminating the process.
Once all child processes have terminated, the main process resumes and reports a final message before terminating itself.
```
Task read an array (827, 1388), max=0.999999824661523
Task read an array (1745, 1231), max=0.9999998440051578
Task read an array (823, 1081), max=0.999999653298093
Task read an array (1364, 1886), max=0.9999994826187178
Task read an array (1960, 1190), max=0.9999999890428217
Task read an array (1272, 1634), max=0.9999992473453926
Task read an array (1596, 1742), max=0.9999996946372763
Task read an array (1035, 1511), max=0.9999988748078102
Task read an array (1358, 951), max=0.9999996488860812
Task read an array (586, 564), max=0.999998583415731
Task read an array (1852, 673), max=0.9999997326673665
Task read an array (1273, 1114), max=0.9999999614124049
Task read an array (1920, 775), max=0.9999997603830041
Task read an array (1175, 1762), max=0.9999998728441418
Task read an array (1567, 843), max=0.9999978316858225
Task read an array (1579, 1028), max=0.9999996224604828
Task read an array (784, 1873), max=0.9999998776214708
Task read an array (1770, 1099), max=0.9999998975119301
Task read an array (1511, 788), max=0.9999996664224969
Task done.
Task done.
Task done.
Task read an array (1851, 1027), max=0.9999997882123821
Task done.
Done.
```
An extension of this program may require that the result or a modified version of the array be sent back to the parent process.
This can be achieved with a second queue on which results are put by child processes and retrieved by the parent process until no further results are available.
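A hedged sketch of that two-queue pattern follows; the results_queue name, the single worker, and the max() computation are illustrative assumptions, not part of the example above:

```python
# sketch: a child computes results and returns them via a second queue
from multiprocessing import Process
from multiprocessing import Queue
from numpy.random import random

# consume arrays, push the computed result onto a results queue
def task(queue, results_queue):
    while True:
        data = queue.get()
        if data is None:
            # propagate the sentinel for any other consumers, then stop
            queue.put(None)
            break
        # send the computed result back to the parent
        results_queue.put(data.max())

if __name__ == '__main__':
    # create one queue for arrays and one for results
    queue, results_queue = Queue(), Queue()
    # start a single worker for brevity
    worker = Process(target=task, args=(queue, results_queue))
    worker.start()
    # issue a few arrays, then the sentinel
    n_items = 5
    for _ in range(n_items):
        queue.put(random((100, 100)))
    queue.put(None)
    # collect one result per array that was sent
    for _ in range(n_items):
        print(f'Result: {results_queue.get()}')
    # wait for the worker to finish
    worker.join()
```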
Next, let’s explore processes that both generate and use numpy arrays, shared by a queue.
Example of Producing and Consuming Numpy Arrays via a Queue
We can explore an example where some child processes load or generate numpy arrays and share them, via a queue, with other child processes that consume and make use of them.
In this example, we will first define a numpy array producer task that creates a fixed number of numpy arrays and places them on a shared queue. We will then define a numpy array consumer task that retrieves numpy arrays from the shared queue and performs some operation on them. Both producer and consumer processes are coordinated by the parent process.
This pattern may be helpful in those cases where Numpy arrays are computationally expensive to load, prepare, or generate. We will simulate this expense in this example using a sleep.
Firstly, we can define a task to be executed in producer child processes.
The task takes a shared queue as an argument and then loops 5 times. Each iteration, it generates a fixed-size numpy array, places it on the shared queue, reports a message, and then sleeps a moment to simulate computational effort.
The producer_task() function below implements this.
```python
# producer task that adds arrays to the queue
def producer_task(queue):
    # create a fixed number of arrays
    for _ in range(5):
        # create a numpy array
        n = 2000
        data = ones((n,n))
        # push into the queue
        queue.put(data)
        # report message
        print('Producer added array.', flush=True)
        # sleep some amount of time
        sleep(random())
```
Next, we can define the function executed by the consumer child process.
This function also takes the shared queue as an argument.
It runs in a loop. Each iteration, it retrieves one array from the queue, reports the details of the array, and marks it as processed.
If a None message is received signaling no further arrays, the loop is broken and the task ends.
The consumer_task() function below implements this.
```python
# consumer task that reads arrays from the queue
def consumer_task(queue):
    # read arrays
    while True:
        # read one array
        data = queue.get()
        # check for final item
        if data is None:
            # report message
            print('Consumer done.', flush=True)
            # exit the task
            break
        # report a message
        print(f'Consumer read an array {data.shape}', flush=True)
        # mark the item as done
        queue.task_done()
```
Next, we need to coordinate the producer and consumer tasks.
We will use a JoinableQueue in this case so that the main process knows when the consumer task has processed all numpy arrays.
This queue will be shared with all child processes.
```python
...
# create the shared queue
queue = JoinableQueue()
```
Next, we will create four child processes configured to execute our producer_task() function.
```python
...
# start producer tasks
producers = [Process(target=producer_task, args=(queue,)) for _ in range(4)]
for process in producers:
    process.start()
```
A single consumer process is then created and started.
```python
...
# start consumer task
consumer = Process(target=consumer_task, args=(queue,))
consumer.start()
```
The main process then waits for all producer child processes to terminate.
```python
...
# wait for all producers to be done
for process in producers:
    process.join()
```
It then waits for all items on the shared queue to be marked as processed.
```python
...
# wait until all items have been consumed and marked done
queue.join()
```
Finally, a None object is added to the queue to signal no further arrays and the main process waits for the consumer process to terminate.
```python
...
# signal that we are done
queue.put(None)
# wait for the consumer to be done
consumer.join()
# report a final message
print('Done.', flush=True)
```
Tying this together, the complete example is listed below.
```python
# create numpy arrays in producer and read arrays in a consumer
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import JoinableQueue
from numpy import ones

# producer task that adds arrays to the queue
def producer_task(queue):
    # create a fixed number of arrays
    for _ in range(5):
        # create a numpy array
        n = 2000
        data = ones((n,n))
        # push into the queue
        queue.put(data)
        # report message
        print('Producer added array.', flush=True)
        # sleep some amount of time
        sleep(random())

# consumer task that reads arrays from the queue
def consumer_task(queue):
    # read arrays
    while True:
        # read one array
        data = queue.get()
        # check for final item
        if data is None:
            # report message
            print('Consumer done.', flush=True)
            # exit the task
            break
        # report a message
        print(f'Consumer read an array {data.shape}', flush=True)
        # mark the item as done
        queue.task_done()

# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = JoinableQueue()
    # start producer tasks
    producers = [Process(target=producer_task, args=(queue,)) for _ in range(4)]
    for process in producers:
        process.start()
    # start consumer task
    consumer = Process(target=consumer_task, args=(queue,))
    consumer.start()
    # wait for all producers to be done
    for process in producers:
        process.join()
    # wait until all items have been consumed and marked done
    queue.join()
    # signal that we are done
    queue.put(None)
    # wait for the consumer to be done
    consumer.join()
    # report a final message
    print('Done.', flush=True)
```
Running the example first creates the shared queue.
The main process then creates, configures, and starts 4 producer child processes.
Each process then runs its loop, generating numpy arrays, adding them to the shared queue, and sleeping for a fraction of a second.
The main process then starts the consumer child process, which loops, reading arrays from the shared queue and reporting their details.
The main process then waits for all producer child processes to terminate.
Once each producer child process has generated and shared 5 numpy arrays, it terminates.
The main process then waits for all arrays still on the queue to be marked as processed.
The consumer child process iterates as fast as it is able, retrieving arrays, reporting their details, and marking them as processed.
Once all items on the queue are marked as processed, the main process puts the None value on the queue and waits for the consumer child process to terminate.
The consumer child process retrieves the None value, reports a message, and exits its loop, which terminates the process.
Finally, the main process reports a final message.
```
Producer added array.
Producer added array.
Producer added array.
Producer added array.
Consumer read an array (2000, 2000)
Consumer read an array (2000, 2000)
Consumer read an array (2000, 2000)
Consumer read an array (2000, 2000)
Producer added array.
Producer added array.
Consumer read an array (2000, 2000)
Consumer read an array (2000, 2000)
Producer added array.
Consumer read an array (2000, 2000)
Producer added array.
Consumer read an array (2000, 2000)
Producer added array.
Producer added array.
Consumer read an array (2000, 2000)
Consumer read an array (2000, 2000)
Producer added array.
Consumer read an array (2000, 2000)
Producer added array.
Consumer read an array (2000, 2000)
Producer added array.
Producer added array.
Consumer read an array (2000, 2000)
Producer added array.
Consumer read an array (2000, 2000)
Producer added array.
Consumer read an array (2000, 2000)
Consumer read an array (2000, 2000)
Producer added array.
Producer added array.
Consumer read an array (2000, 2000)
Consumer read an array (2000, 2000)
Producer added array.
Consumer read an array (2000, 2000)
Producer added array.
Consumer read an array (2000, 2000)
Consumer done.
Done.
```
The example could be updated to have many consumer child processes.
If a single consumer process is required, this activity could take place in the main process.
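A sketch of the multiple-consumer extension might look like the following; it assumes a plain Queue for brevity (no task_done() bookkeeping) and adds one None sentinel per consumer so that each knows when to stop, as an alternative to having each consumer re-put a single shared sentinel:

```python
# sketch: multiple consumer processes draining one shared queue
from multiprocessing import Process
from multiprocessing import Queue
from numpy import ones

# consume arrays until a sentinel is received
def consumer_task(queue):
    while True:
        data = queue.get()
        if data is None:
            print('Consumer done.', flush=True)
            break
        print(f'Consumer read an array {data.shape}', flush=True)

if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # start several consumer processes
    n_consumers = 3
    consumers = [Process(target=consumer_task, args=(queue,)) for _ in range(n_consumers)]
    for process in consumers:
        process.start()
    # produce some arrays
    for _ in range(10):
        queue.put(ones((100, 100)))
    # add one sentinel per consumer so each can exit
    for _ in range(n_consumers):
        queue.put(None)
    # wait for all consumers to finish
    for process in consumers:
        process.join()
    print('Done.')
```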
Further Reading
This section provides additional resources that you may find helpful.
Books
- Concurrent NumPy in Python, Jason Brownlee (my book!)
Guides
- Concurrent NumPy 7-Day Course
- Which NumPy Functions Are Multithreaded
- Numpy Multithreaded Matrix Multiplication (up to 5x faster)
- NumPy vs the Global Interpreter Lock (GIL)
- ThreadPoolExecutor Fill NumPy Array (3x faster)
- Fastest Way To Share NumPy Array Between Processes
Documentation
- Parallel Programming with numpy and scipy, SciPy Cookbook, 2015
- Parallel Programming with numpy and scipy (older archived version)
- Parallel Random Number Generation, NumPy API
Concurrency APIs
- threading — Thread-based parallelism
- multiprocessing — Process-based parallelism
- concurrent.futures — Launching parallel tasks
Takeaways
You now know how to share numpy arrays between processes using a queue.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.