Last Updated on September 12, 2022
You can use a pipe between processes via the multiprocessing.Pipe() function.
In this tutorial you will discover how to use a multiprocessing pipe in Python.
Let’s get started.
Need for a Pipe
A process is a running instance of a computer program.
Every Python program is executed in a Process, which is a new instance of the Python interpreter. This process has the name MainProcess and has one thread used to execute the program instructions called the MainThread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create new child processes in our program in order to execute code concurrently.
Python provides the ability to create and manage new processes via the multiprocessing.Process class.
In multiprocessing programming, we often need to share data between processes.
One approach to sharing data is to use a pipe.
What is the pipe and how can we use it in Python?
What is a Pipe
In multiprocessing, a pipe is a connection between two processes in Python.
It is used to send data from one process which is received by another process.
Under the covers, a pipe is implemented using a pair of connection objects, provided by the multiprocessing.connection.Connection class.
Creating a pipe will create two connection objects, one for each end of the pipe. By default, the pipe is duplex, so each connection object can both send and receive data. A pipe can also be configured to be one-way, with one connection for sending data and one for receiving data.
Pipe vs Queue
Both a multiprocessing.Pipe and a multiprocessing.Queue can be used to send and receive objects and data between processes.
A Pipe is simpler than a queue. It is a lower-level mechanism, requiring first the explicit creation of the connections between a pair of processes, then the explicit sending and receiving of data between processes.
A Queue is a high-level construct that can be treated like a local data structure that just so happens to be shared among processes.
Importantly, a Queue is designed to be used with multiple producers and multiple consumers in mind, whereas a Pipe is intended for a pair of processes only.
The targeted and simpler nature of pipes can make them more efficient and potentially faster in sharing data between two processes.
In summary:
- Both a pipe and a queue can be used to share data between processes.
- A pipe is simple and low-level, whereas a queue is more capable and higher-level.
- A pipe connects two processes, whereas a queue supports multiple producers and consumers.
How to Use the Pipe
Python provides a simple pipe via the multiprocessing.Pipe() function.
Let’s take a closer look at how to use it.
Create a Pipe
A pipe can be created by calling the multiprocessing.Pipe() function, which returns a pair of multiprocessing.connection.Connection objects.
For example:
```python
...
# create a pipe
conn1, conn2 = multiprocessing.Pipe()
```
By default, the pipe is duplex (bidirectional), so both connections can be used to send and receive data.
A pipe can instead be made one-way by setting the “duplex” argument to False. In this case, the first connection (conn1) can only be used to receive data, whereas the second connection (conn2) can only be used to send data.
Duplex behavior can also be requested explicitly by setting the “duplex” argument to True.
For example:
```python
...
# create a duplex pipe
conn1, conn2 = multiprocessing.Pipe(duplex=True)
```
In this case, both connections can be used to send and receive data.
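The following minimal sketch makes the difference between the two modes concrete. For brevity it runs both ends in a single process; in a real program, each end would normally be passed to a different process. It assumes CPython's behavior of raising OSError when a one-way connection is used in the wrong direction. The helper names duplex_round_trip() and one_way_misuse() are hypothetical.

```python
from multiprocessing import Pipe

def duplex_round_trip():
    # default pipe (duplex=True): either end can send and receive
    conn1, conn2 = Pipe()
    try:
        conn1.send('ping')  # conn1 -> conn2
        conn2.send('pong')  # conn2 -> conn1
        return conn2.recv(), conn1.recv()
    finally:
        conn1.close()
        conn2.close()

def one_way_misuse():
    # one-way pipe: conn1 is receive-only, conn2 is send-only
    conn1, conn2 = Pipe(duplex=False)
    try:
        conn1.send('oops')  # wrong direction: sending on the receive-only end
    except OSError as e:
        return str(e)  # the error message describing the misuse
    finally:
        conn1.close()
        conn2.close()
    return None  # reached only if no error was raised

if __name__ == '__main__':
    print(duplex_round_trip())  # ('ping', 'pong')
    print(one_way_misuse())
```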
Share Objects With Pipe
Objects can be shared between processes using the Pipe.
The Connection.send() function can be used to send objects from one process to another.
The objects sent must be picklable.
For example:
```python
...
# send an object
conn2.send('Hello world')
```
The Connection.recv() function can be used to receive objects in one process sent by another.
The objects received will be automatically un-pickled.
For example:
```python
...
# receive an object
obj = conn1.recv()
```
The function call will block until an object is received.
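The sketch below shows two things. First, a received object is an unpickled copy, not the original. Second, an object that cannot be pickled, such as a lambda, fails on send. Both ends are used in one process purely for illustration. The helpers round_trip() and is_sendable() are hypothetical. The exact exception raised for an unpicklable object may vary between Python versions, so the sketch catches several types.

```python
import pickle
from multiprocessing import Pipe

def round_trip(obj):
    conn1, conn2 = Pipe()
    try:
        conn2.send(obj)      # pickled on send
        return conn1.recv()  # unpickled on receive, producing a new object
    finally:
        conn1.close()
        conn2.close()

def is_sendable(obj):
    conn1, conn2 = Pipe()
    try:
        conn2.send(obj)  # pickling happens here, before anything is written
        conn1.recv()
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    finally:
        conn1.close()
        conn2.close()

if __name__ == '__main__':
    data = {'values': [1, 2, 3]}
    received = round_trip(data)
    print(received == data, received is data)  # True False
    print(is_sendable(lambda x: x))            # False
```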
Share Bytes With Pipe
Data can be shared between processes using the Pipe.
This can be achieved by sending and receiving data in the form of packages of bytes.
Bytes can be sent from one process to another via the Connection.send_bytes() function.
For example:
```python
...
# send bytes
conn2.send_bytes(b'Hello world')
```
If the byte data is held in a buffer (any bytes-like object), the “offset” and “size” arguments can be specified to send only part of it.
For example:
```python
...
# send part of a buffer of bytes
conn2.send_bytes(buffer, offset=10, size=100)
```
Bytes can be received via the Connection.recv_bytes() function.
For example:
```python
...
# receive bytes
data = conn1.recv_bytes()
```
The function call will block until there are bytes to receive.
A single message of bytes will be read.
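As a minimal sketch of the bytes API, with both ends kept in one process for simplicity, the hypothetical helper send_slice() below uses the offset and size arguments of send_bytes() to send only part of a buffer. It then reads that part back as a single message with recv_bytes().

```python
from multiprocessing import Pipe

def send_slice(buffer, offset, size):
    conn1, conn2 = Pipe()
    try:
        # send only buffer[offset:offset + size] as one message
        conn2.send_bytes(buffer, offset, size)
        # read back exactly one message
        return conn1.recv_bytes()
    finally:
        conn1.close()
        conn2.close()

if __name__ == '__main__':
    print(send_slice(b'0123456789', 2, 5))  # b'23456'
```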
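A maximum message length in bytes can be specified via the “maxlength” argument. If the message is longer than maxlength, an OSError is raised and the connection will no longer be readable.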
For example:
```python
...
# receive bytes
data = conn1.recv_bytes(maxlength=100)
```
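Here is a small sketch of the maxlength behavior. It assumes the documented rule that a message longer than maxlength raises OSError and leaves the connection unreadable. For that reason, the hypothetical helper receive_limited() creates a fresh pipe on each call, and it returns None instead of propagating the error.

```python
from multiprocessing import Pipe

def receive_limited(message, maxlength):
    conn1, conn2 = Pipe()
    try:
        conn2.send_bytes(message)
        return conn1.recv_bytes(maxlength=maxlength)
    except OSError:
        # message longer than maxlength: conn1 is no longer readable
        return None
    finally:
        conn1.close()
        conn2.close()

if __name__ == '__main__':
    print(receive_limited(b'hello', 10))       # b'hello'
    print(receive_limited(b'hello world', 4))  # None
```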
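Byte data can also be received into an existing writable buffer, such as a bytearray, with an optional offset.
This can be achieved via the Connection.recv_bytes_into() function. It writes the message into the buffer starting at the optional “offset” argument and returns the number of bytes in the message, not the data itself. If the buffer is too short, a BufferTooShort exception is raised.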
For example:
```python
...
# receive bytes into an existing buffer, returns the message size
nbytes = conn1.recv_bytes_into(buffer, offset=100)
```
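Below is a minimal sketch of recv_bytes_into(), assuming a bytearray as the writable buffer. The hypothetical receive_into() helper returns the message size reported by recv_bytes_into(). The demo also shows multiprocessing.BufferTooShort being raised when the message does not fit; in that case, the full message is available as the exception's first argument.

```python
from multiprocessing import Pipe, BufferTooShort

def receive_into(message, buffer, offset):
    conn1, conn2 = Pipe()
    try:
        conn2.send_bytes(message)
        # writes into buffer starting at offset, returns the message size
        return conn1.recv_bytes_into(buffer, offset)
    finally:
        conn1.close()
        conn2.close()

if __name__ == '__main__':
    buffer = bytearray(16)
    nbytes = receive_into(b'hello', buffer, 4)
    print(nbytes, bytes(buffer[4:4 + nbytes]))  # 5 b'hello'
    try:
        receive_into(b'too long for this buffer', bytearray(8), 0)
    except BufferTooShort as e:
        # the complete message is carried by the exception
        print('buffer too short, message was', e.args[0])
```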
Status of Pipe
The status of the pipe can be checked via the Connection.poll() function.
This will return a boolean indicating whether there is data available to be received from the pipe.
For example:
```python
...
# check if there is data to receive
if conn1.poll():
    # ...
```
A timeout can be set via the “timeout” argument. If specified, the call will block until data is available or until the timeout number of seconds has elapsed, whichever comes first. It returns False if no data arrived in time. Without a timeout, the call returns immediately, and a timeout of None waits indefinitely.
For example:
```python
...
# check if there is data to receive
if conn1.poll(timeout=5):
    # ...
```
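The sketch below walks through the three poll() cases in one process: no data and no timeout, no data with a short timeout, and data already waiting. The helper poll_demo() is hypothetical, and the 0.1 second timeout is an arbitrary choice.

```python
from multiprocessing import Pipe

def poll_demo():
    conn1, conn2 = Pipe()
    try:
        empty = conn1.poll()                 # no timeout: returns at once
        timed_out = conn1.poll(timeout=0.1)  # waits up to 0.1s, nothing arrives
        conn2.send('data')
        ready = conn1.poll(timeout=1)        # data is already waiting
        value = conn1.recv()                 # will not block now
        return empty, timed_out, ready, value
    finally:
        conn1.close()
        conn2.close()

if __name__ == '__main__':
    print(poll_demo())  # (False, False, True, 'data')
```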
Now that we know how to use the multiprocessing.Pipe, let’s look at some worked examples.
Example of Using a Pipe
We can explore how to use a multiprocessing.Pipe to share data between processes.
In this example we will create a sender process that will generate random numbers and send them to another process via the pipe. We will also create a receiver process that will receive numbers sent from the other process and report them.
We can first define the sender process.
We can define a new function named sender() that takes a connection as an argument on which it will send objects. It then loops ten times and each iteration it will generate a random number between 0 and 1 via random.random(), block for a fraction of a second to simulate work, then send the value to the other process via the pipe.
Once done, the sender will send a special value, called a sentinel value, to indicate that no more values will be sent. In this case we will use the value None.
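The two-argument form of the built-in iter() offers a compact way to consume values until a sentinel arrives. It calls a function repeatedly until that function returns the sentinel, and it is an alternative to the explicit while loop used in this tutorial. Here is a minimal single-process sketch with a hypothetical helper, collect_until_sentinel(). Note that iter() compares each value to the sentinel with ==.

```python
from multiprocessing import Pipe

def collect_until_sentinel(conn, sentinel=None):
    # iter(callable, sentinel) calls conn.recv() until it returns the sentinel
    return list(iter(conn.recv, sentinel))

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    for value in [0.1, 0.2, 0.3]:
        conn2.send(value)
    conn2.send(None)  # sentinel: no more values
    print(collect_until_sentinel(conn1))  # [0.1, 0.2, 0.3]
```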
The sender() function below implements this.
```python
# generate work
def sender(connection):
    print('Sender: Running', flush=True)
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # send data
        connection.send(value)
    # all done
    connection.send(None)
    print('Sender: Done', flush=True)
```
Next, we can define the receiver process.
We can define a new receiver() function that takes a connection on which to receive objects.
The function will loop forever. Each iteration, it will receive an object on the pipe and block until an object is received. It will then report the value. If the received value was the special sentinel value, it will break the loop and the process will terminate.
The receiver() function below implements this.
```python
# consume work
def receiver(connection):
    print('Receiver: Running', flush=True)
    # consume work
    while True:
        # get a unit of work
        item = connection.recv()
        # report
        print(f'>receiver got {item}', flush=True)
        # check for stop
        if item is None:
            break
    # all done
    print('Receiver: Done', flush=True)
```
Finally, the main process will create the processes and wait for them to finish.
First, a new pipe is created that will be used to send and receive objects between the processes.
```python
...
# create the pipe
conn1, conn2 = Pipe()
```
Next, we can create a child process that will execute the sender() function and pass it conn2, the end of the pipe it will use to send data. Once created and configured, the child process is started.
```python
...
# start the sender
sender_process = Process(target=sender, args=(conn2,))
sender_process.start()
```
We can then create another child process that will execute the receiver() function and pass it conn1, the end of the pipe it will use to receive data. This process can then also be started.
```python
...
# start the receiver
receiver_process = Process(target=receiver, args=(conn1,))
receiver_process.start()
```
The main process can then block until both child processes have finished.
```python
...
# wait for all processes to finish
sender_process.join()
receiver_process.join()
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of using a pipe between processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Pipe

# generate work
def sender(connection):
    print('Sender: Running', flush=True)
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # send data
        connection.send(value)
    # all done
    connection.send(None)
    print('Sender: Done', flush=True)

# consume work
def receiver(connection):
    print('Receiver: Running', flush=True)
    # consume work
    while True:
        # get a unit of work
        item = connection.recv()
        # report
        print(f'>receiver got {item}', flush=True)
        # check for stop
        if item is None:
            break
    # all done
    print('Receiver: Done', flush=True)

# entry point
if __name__ == '__main__':
    # create the pipe
    conn1, conn2 = Pipe()
    # start the sender
    sender_process = Process(target=sender, args=(conn2,))
    sender_process.start()
    # start the receiver
    receiver_process = Process(target=receiver, args=(conn1,))
    receiver_process.start()
    # wait for all processes to finish
    sender_process.join()
    receiver_process.join()
```
Running the example first creates the pipe, then creates and starts both child processes.
The main process then blocks until the child processes finish.
The sender child process then runs in a loop, generating and sending ten random values along the pipe. Once all values are generated and sent, the sender process terminates.
The receiver child process loops, receiving an object from the pipe each iteration and blocking until one arrives. Received values are reported, and the loop is broken once the sentinel value is received.
Note, your specific results will differ given the use of random numbers.
```
Sender: Running
Receiver: Running
>receiver got 0.7672216614763814
>receiver got 0.6766375162201561
>receiver got 0.3958018282651511
>receiver got 0.0019046615500002417
>receiver got 0.4850692187219524
>receiver got 0.6249563547834307
>receiver got 0.44768176602669507
>receiver got 0.852944978432572
>receiver got 0.26856894937968356
Sender: Done
>receiver got 0.7724406632512203
>receiver got None
Receiver: Done
```
This highlights how to use a default pipe for sending data from one process to another.
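A sentinel is not the only way to signal the end of the data. Connection.recv() raises EOFError when there is nothing left to receive and the other end has been closed. The single-process sketch below relies on this behavior instead of a sentinel, using a hypothetical drain() helper. In a multi-process program, this only works if every process holding a copy of the sending connection closes it, including the parent after it starts the sender. Otherwise, recv() will block forever.

```python
from multiprocessing import Pipe

def drain(conn):
    # receive until the sending end is closed and nothing is left
    items = []
    while True:
        try:
            items.append(conn.recv())
        except EOFError:
            break
    return items

if __name__ == '__main__':
    recv_conn, send_conn = Pipe(duplex=False)
    for i in range(3):
        send_conn.send(i)
    send_conn.close()  # no sentinel: closing the end signals "no more data"
    print(drain(recv_conn))  # [0, 1, 2]
    recv_conn.close()
```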
Next, let’s look at a duplex or bidirectional pipe between two processes.
Example of Using a Duplex Pipe
A multiprocessing.Pipe can be used to both send and receive data between two processes.
This is called a duplex or bidirectional pipe. It is the default behavior of multiprocessing.Pipe(), and it can be made explicit by setting the “duplex” argument to True when creating the pipe.
In this example, we will play ping pong between two processes, player1 and player2. Player1 will start the game by generating a random value between 0 and 1 and send it to player2. Player2 will receive the value, add a new random value to the received value and send it back to player1. Player1 will receive the value and perform the same action of adding a random value to the received value and sending it back.
This process is repeated until a value above 10 is received, after which both player processes will terminate.
First, we can define a function that takes a connection and a value as arguments, adds a random value to the value and sends it along the connection. The value passed in as an argument will be the value received along the pipe.
This function can be used by both players in the ping pong game, and can be used by player1 when starting the game.
The generate_send() function listed below implements this.
```python
# generate and send a value
def generate_send(connection, value):
    # generate value
    new_value = random()
    # block
    sleep(new_value)
    # update value
    value = value + new_value
    # report
    print(f'>sending {value}', flush=True)
    # send value
    connection.send(value)
```
Next, we can define a function that encapsulates the game.
The function takes a connection object that can both send and receive, and a boolean value as to whether it should start the game or not.
```python
# ping pong between processes
def pingpong(connection, send_first):
    # ...
```
The function will then check if it should start the game, and if so, it will call our generate_send() function defined above with a value of zero.
```python
...
# check if this process should seed the process
if send_first:
    generate_send(connection, 0)
```
The function will loop forever. Each iteration it will receive an object along the pipe and block until an object is received. It will then report the value, send it back with a call to generate_send(), then stop the game if the value is above the threshold, 10 in this case.
```python
...
# run until limit reached
while True:
    # read a value
    value = connection.recv()
    # report
    print(f'>received {value}', flush=True)
    # send the value back
    generate_send(connection, value)
    # check for stop
    if value > 10:
        break
```
Tying this together, the complete pingpong() function is listed below.
```python
# ping pong between processes
def pingpong(connection, send_first):
    print('Process Running', flush=True)
    # check if this process should seed the process
    if send_first:
        generate_send(connection, 0)
    # run until limit reached
    while True:
        # read a value
        value = connection.recv()
        # report
        print(f'>received {value}', flush=True)
        # send the value back
        generate_send(connection, value)
        # check for stop
        if value > 10:
            break
    print('Process Done', flush=True)
```
Finally, the main process will create the pipe, and create and start the two players of the ping pong game.
First, the pipe can be created in duplex mode so that each player can both send and receive along the pipe.
```python
...
# create the pipe
conn1, conn2 = Pipe(duplex=True)
```
Next, the two players can be created. Each player must use a separate connection, and only one player can start the game.
```python
...
# create players
player1 = Process(target=pingpong, args=(conn1, True))
player2 = Process(target=pingpong, args=(conn2, False))
```
Both player child processes can then be started and the main process will block until the game is finished.
```python
...
# start players
player1.start()
player2.start()
# wait for players to finish
player1.join()
player2.join()
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of using a duplex pipe between processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Pipe

# generate and send a value
def generate_send(connection, value):
    # generate value
    new_value = random()
    # block
    sleep(new_value)
    # update value
    value = value + new_value
    # report
    print(f'>sending {value}', flush=True)
    # send value
    connection.send(value)

# ping pong between processes
def pingpong(connection, send_first):
    print('Process Running', flush=True)
    # check if this process should seed the process
    if send_first:
        generate_send(connection, 0)
    # run until limit reached
    while True:
        # read a value
        value = connection.recv()
        # report
        print(f'>received {value}', flush=True)
        # send the value back
        generate_send(connection, value)
        # check for stop
        if value > 10:
            break
    print('Process Done', flush=True)

# entry point
if __name__ == '__main__':
    # create the pipe
    conn1, conn2 = Pipe(duplex=True)
    # create players
    player1 = Process(target=pingpong, args=(conn1, True))
    player2 = Process(target=pingpong, args=(conn2, False))
    # start players
    player1.start()
    player2.start()
    # wait for players to finish
    player1.join()
    player2.join()
```
Running the example first creates the duplex pipe, then creates and starts the two child processes.
The main process then blocks and waits for both child processes to terminate.
The first player starts the game, generating an initial random value and sending it to player2.
Player2 receives the value, reports it, adds a random value to it and then sends it back to player1.
This process of receiving and sending an accumulating floating point value between the two processes continues until the threshold of 10 is reached.
Both processes detect the value exceeding the threshold and break their loop, ending the game.
This highlights how to use a duplex pipe between two processes.
```
Process Running
Process Running
>sending 0.512990902456799
>received 0.512990902456799
>sending 0.6720034113827182
>received 0.6720034113827182
>sending 1.121709843352519
>received 1.121709843352519
>sending 2.05592427147109
>received 2.05592427147109
>sending 2.8145974836316694
>received 2.8145974836316694
>sending 3.5275895108277027
>received 3.5275895108277027
>sending 3.901572782141232
>received 3.901572782141232
>sending 4.165258844191272
>received 4.165258844191272
>sending 4.986646861424761
>received 4.986646861424761
>sending 5.170546681258859
>received 5.170546681258859
>sending 5.591074632717518
>received 5.591074632717518
>sending 6.540571062561306
>received 6.540571062561306
>sending 7.345922557811228
>received 7.345922557811228
>sending 7.79464404807978
>received 7.79464404807978
>sending 7.880037300505228
>received 7.880037300505228
>sending 8.237138395881377
>received 8.237138395881377
>sending 8.971813669795386
>received 8.971813669795386
>sending 9.1417714866508
>received 9.1417714866508
>sending 9.750216361609873
>received 9.750216361609873
>sending 9.759058549107657
>received 9.759058549107657
>sending 10.51892536850362
>received 10.51892536850362
>sending 10.554211869042057
Process Done
>received 10.554211869042057
>sending 11.04291146739027
Process Done
```
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to use a pipe between processes in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Ivan Ragozin on Unsplash