Multiprocessing Pool Get First Result
You can get the first result from tasks in the multiprocessing.pool.Pool either via a shared multiprocessing.Queue or by issuing tasks via the imap_unordered() function and getting the first value from the iterable.
In this tutorial you will discover how to get the first result from the multiprocessing pool in Python.
Let's get started.
Need to Get the First Result
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
-- multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
When using the process pool, we may issue many tasks and require the result from the first task to finish.
How can we get the first result from the process pool?
How to Get The First Result
There are two main approaches we can use to get the result from the first task to complete in the process pool.
They are:
- Have tasks put their result on a shared queue.
- Issue tasks using the imap_unordered() function.
Let's take a closer look at each approach in turn.
Put Results on a Shared Queue
A multiprocessing.Queue can be created and shared among all tasks issued to the process pool.
As tasks finish, they can put their results on the queue.
The parent process waiting for the first result can get the first result made available via the shared queue.
A multiprocessing queue can be created as per normal.
For example:
...
# create a shared queue
queue = multiprocessing.Queue()
We cannot pass the queue directly as an argument to each task issued to the process pool, as doing so will result in an error.
Instead, we can pass the queue to the initialization function for each child worker process. The queue can be stored in a global variable and made available to all tasks executed by all workers.
This requires that we define a child worker initialization function that takes the queue as an argument, declares a global variable available to all tasks executed by the worker, then stores the queue in the global variable.
For example:
# worker process initialization
def init_worker(arg_queue):
    # declare global variable for each worker
    global queue
    # store queue in the global variable
    queue = arg_queue
We can then configure a new process pool to use our worker initialization function and pass the queue as an argument.
...
# create a process pool
pool = Pool(initializer=init_worker, initargs=(queue,))
You can learn more about sharing global variables with all tasks executed by a worker in this tutorial:
Tasks can then put results on the queue by calling the put() function and passing the result object.
For example:
...
# put result on the queue
queue.put(result)
The main process can then retrieve the first result made available via the get() function on the queue. This call will block until a result is available.
For example:
...
# get the first result
result = queue.get()
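As an aside, the get() function on a multiprocessing.Queue also accepts a timeout, raising a queue.Empty exception if no result arrives in time (note that multiprocessing.SimpleQueue does not support timeouts). A minimal sketch:

```python
# example of a timeout when getting a result from a multiprocessing.Queue
from multiprocessing import Queue
from queue import Empty

# create a full-featured queue (supports blocking with a timeout)
queue = Queue()
# put a result on the queue
queue.put((0, 42))
# get the result, waiting at most 5 seconds
try:
    identifier, value = queue.get(timeout=5)
except Empty:
    identifier, value = None, None
```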
You can learn more about using the multiprocessing queue in the tutorial:
Issue Tasks Using imap_unordered()
Another approach to getting the first result from the process pool is to issue tasks using the imap_unordered() function.
This function will issue tasks in a lazy manner and will return an iterable that yields results in the order that tasks are completed, rather than the order that tasks were issued.
Therefore, tasks can be issued as per normal via a call to the imap_unordered() function.
For example:
...
# issue tasks to the process pool
it = pool.imap_unordered(task, items)
We can then call the built-in next() function on the returned iterable to get the result from the first task to complete in the process pool.
Recall, the next() function returns the next item in an iterable.
For example:
...
# get the first result
result = next(it)
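As an aside, next() raises a StopIteration exception once the iterable is exhausted; a default value can be supplied instead. A quick sketch with a plain iterator:

```python
# demonstrate next() behavior on a plain iterator
it = iter([10, 20])
# get items one at a time
first = next(it)
second = next(it)
# the iterator is now exhausted; supply a default to avoid StopIteration
third = next(it, None)
```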
You can learn more about the imap_unordered() function in the tutorial:
Now that we know how to get the first result from the process pool, let's look at some worked examples.
Example of Getting the First Result with a Queue
We can explore how to get the result from the first task to complete using a queue.
In this example we will define a worker initialization function that will take the shared queue as an argument and store it in a global variable to make it available to all tasks in all workers in the pool. We will then define a target task function that will generate a random number, block, then store the result on the shared queue. The randomness will mean the task that finishes first will differ each time the code is run. Finally, we will issue many tasks and wait for and report the first result received in the main process.
Firstly, we will define the function to initialize each child worker process.
The function takes the shared queue as an argument, declares a global variable, and stores the queue in it. This function is executed in each child worker process before it executes tasks. This means that the queue stored in the global variable will be available to all tasks executed by all workers in the process pool.
The init_worker() function below implements this.
# worker process initialization
def init_worker(arg_queue):
    # declare global variable for each worker
    global queue
    # store queue in the global variable
    queue = arg_queue
Next, we can define the target task function.
The function takes a unique integer identifier for the task. It then generates a random number between 2 and 12, reports the value, then blocks for that many seconds. Finally, a tuple of the integer identifier for the task and the generated value are put on the shared queue.
The task() function below implements this.
# task executed in a worker process
def task(identifier):
    # generate a value
    value = 2 + random() * 10
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # put the result on the queue
    queue.put((identifier, value))
Next, in the main process we first create a shared queue.
In this case, we will use a multiprocessing.SimpleQueue, as we do not need to limit the capacity of the queue or use any of the more advanced queue features.
...
# create the shared queue
queue = SimpleQueue()
You can learn more about the SimpleQueue in the tutorial:
We can then create the process pool and configure it to use the custom worker initialization function.
...
# create and configure the process pool
with Pool(initializer=init_worker, initargs=(queue,)) as pool:
    # ...
You can learn more about initializing process pools in the tutorial:
We then issue 30 tasks to the process pool asynchronously using the map_async() function.
...
# issue many tasks
_ = pool.map_async(task, range(30))
You can learn more about issuing tasks to the process pool using the map_async() function in the tutorial:
The main process then gets the first result from the queue.
This call will block until a result is made available by the first task to complete.
...
# get the first result, blocking
identifier, value = queue.get()
The result is then reported.
...
# report the first result
print(f'First result: identifier={identifier}, value={value}')
Finally, the process pool is terminated by the context manager interface. This forcefully stops all tasks in the pool automatically.
We report a message to this effect.
...
# terminate remaining tasks
print('Terminating remaining tasks')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting the first result from the process pool with a queue
from random import random
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import SimpleQueue

# worker process initialization
def init_worker(arg_queue):
    # declare global variable for each worker
    global queue
    # store queue in the global variable
    queue = arg_queue

# task executed in a worker process
def task(identifier):
    # generate a value
    value = 2 + random() * 10
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # put the result on the queue
    queue.put((identifier, value))

# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = SimpleQueue()
    # create and configure the process pool
    with Pool(initializer=init_worker, initargs=(queue,)) as pool:
        # issue many tasks
        _ = pool.map_async(task, range(30))
        # get the first result, blocking
        identifier, value = queue.get()
        # report the first result
        print(f'First result: identifier={identifier}, value={value}')
        # terminate remaining tasks
        print('Terminating remaining tasks')
Running the example first creates the shared queue.
The process pool is then created, configured with the custom initialization function that shares the queue with each child worker process.
The tasks are then issued to the pool asynchronously. The main process then blocks, waiting for the first result to be retrieved from the shared queue.
Only a subset of the tasks is able to run at a time. Each task generates a random number, reports a message, blocks, then stores its result in the queue.
The main process gets the result from the first task to complete. It reports the value, then exits the context manager block for the pool.
This terminates the pool and all running tasks, then the program ends.
This highlights one approach we can use to get the first result from tasks in the multiprocessing pool.
Note, results will differ each time the program is run given the use of random numbers.
Task 0 executing with 5.636381601220135
Task 1 executing with 6.939226960253943
Task 2 executing with 4.11833591204181
Task 3 executing with 10.302360850894589
Task 4 executing with 6.247353496296375
Task 5 executing with 4.551726400980119
Task 6 executing with 5.076364406901675
Task 7 executing with 10.959613172165605
Task 8 executing with 5.762465637162721
First result: identifier=2, value=4.11833591204181
Terminating remaining tasks
Next, let's explore how we might get the first result using the imap_unordered() function.
Example of Getting the First Result with imap_unordered()
We can explore how to issue tasks with the imap_unordered() function and get the first result.
In this example, we can update the previous example to no longer initialize the child worker processes and use a queue. Instead, we can issue all tasks using the imap_unordered() function. This will return an iterable that will yield results in the order that the tasks have completed. We can then get the first value from the iterable in order to get the first result.
Firstly, we can define the process pool using the default configuration with the context manager interface.
...
# create and configure the process pool
with Pool() as pool:
    # ...
You can learn more about the process pool context manager interface in the tutorial:
Next, we can issue all tasks using the imap_unordered() function and retrieve the iterable.
...
# issue many tasks
it = pool.imap_unordered(task, range(30))
Finally, we can call the built-in next() function in order to traverse the iterable one step and get the first result.
This call will block until the first result is available.
...
# get the first result, blocking
identifier, value = next(it)
And that's it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting the first result from the process pool with imap_unordered
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = 2 + random() * 10
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue many tasks
        it = pool.imap_unordered(task, range(30))
        # get the first result, blocking
        identifier, value = next(it)
        # report the first result
        print(f'First result: identifier={identifier}, value={value}')
        # terminate remaining tasks
        print('Terminating remaining tasks')
Running the example first creates the process pool.
It then issues all tasks to the process pool and returns an iterable for task results.
The main process then blocks, waiting for the first result to be made available.
A subset of the tasks begin executing. Each task generates a random number, reports the value, blocks, then returns a tuple of the integer identifier and the generated random number.
A result is received by the main process and is reported.
The process pool and all remaining tasks are then forcefully terminated.
This highlights a simple way to get the first result from multiple tasks in the process pool.
Note, results will differ each time the program is run given the use of random numbers.
Task 0 executing with 2.4591115237324237
Task 1 executing with 6.5464709230061455
Task 2 executing with 2.847775172423446
Task 3 executing with 5.482376922246911
Task 4 executing with 7.11178723756704
Task 5 executing with 3.6949780247040525
Task 6 executing with 5.3101695644764915
Task 7 executing with 2.6110942609971746
Task 8 executing with 4.104337058058016
First result: identifier=0, value=2.4591115237324237
Terminating remaining tasks
Takeaways
You now know how to get the first available result from the process pool.
If you enjoyed this tutorial, you will love my book: Python Multiprocessing Pool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.