Last Updated on September 12, 2022
You can get the first result from tasks issued to the multiprocessing.pool.Pool either by having tasks put their results on a shared multiprocessing.Queue, or by issuing tasks via the imap_unordered() function and getting the first value from the returned iterable.
In this tutorial you will discover how to get the first result from the multiprocessing pool in Python.
Let’s get started.
Need to Get the First Result
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
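For example, here is a minimal sketch of these calls, assuming a pool instance and a task() function are already defined:

...
# issue one task and block until its result is available
result = pool.apply(task, args=(1,))
# apply the function to each item and block until all results are available
results = pool.map(task, range(10))
# issue many tasks asynchronously, returning an AsyncResult immediately
async_result = pool.map_async(task, range(10))
# retrieve the results later, blocking until all tasks finish
results = async_result.get()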
When using the process pool, we may issue many tasks and require the result from the first task to finish.
How can we get the first result from the process pool?
How to Get The First Result
There are two main approaches we can use to get the result from the first task to complete in the process pool.
They are:
- Have tasks put their result on a shared queue.
- Issue tasks using the imap_unordered() function.
Let’s take a closer look at each approach in turn.
Put Results on a Shared Queue
A multiprocessing.Queue can be created and shared among all tasks issued to the process pool.
As tasks finish, they can put their results on the queue.
The parent process waiting for the first result can get the first result made available via the shared queue.
A multiprocessing queue can be created as per normal.
For example:
...
# create a shared queue
queue = multiprocessing.Queue()
We cannot pass the queue directly as an argument to each task issued to the process pool, as doing so will result in an error.
Instead, we can share the queue with the initialization function for each child worker process. The queue can be stored in a global variable and made available to all tasks executed by all workers.
This requires that we define a child worker initialization function that takes the queue as an argument, declares a global variable available to all tasks executed by the worker, then stores the queue in the global variable.
For example:
# worker process initialization
def init_worker(arg_queue):
    # define global variable for each worker
    global queue
    # store queue in the global variable
    queue = arg_queue
We can then configure a new process pool to use our worker initialization function and pass the queue as an argument.
...
# create a process pool
pool = Pool(initializer=init_worker, initargs=(queue,))
Tasks can then put results on the queue by calling the put() function and passing the result object.
For example:
...
# put result on the queue
queue.put(result)
The main process can then retrieve the first result made available via the get() function on the queue. This call will block until a result is available.
For example:
...
# get the first result
result = queue.get()
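Note that if we do not want to block forever, the get() function on a multiprocessing.Queue also accepts a timeout argument and raises a queue.Empty exception if no result arrives in time. For example, a minimal sketch:

...
from queue import Empty
# wait no more than 10 seconds for the first result
try:
    result = queue.get(timeout=10)
except Empty:
    # no task completed within the timeout
    print('No result yet')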
Issue Tasks Using imap_unordered()
Another approach to getting the first result from the process pool is to issue tasks using the imap_unordered() function.
This function will issue tasks in a lazy manner and will return an iterable that yields results in the order that tasks are completed, rather than the order that tasks were issued.
Therefore, tasks can be issued as per normal via a call to the imap_unordered() function.
For example:
...
# issue tasks to the process pool
it = pool.imap_unordered(task, items)
We can then call the built-in next() function on the returned iterable to get the result from the first task to complete in the process pool.
Recall, the next() function returns the next item in an iterable.
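As a quick refresher, calling next() on any iterator advances it by one item:

...
# an iterator over a list of letters
letters = iter(['a', 'b', 'c'])
# returns 'a'; a second call would return 'b'
print(next(letters))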
For example:
...
# get the first result
result = next(it)
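Note that the iterable returned by imap_unordered() also provides a next() method with an optional timeout argument, which raises a multiprocessing.TimeoutError if no result arrives in time. For example, a minimal sketch:

...
from multiprocessing import TimeoutError
# wait no more than 10 seconds for the first result
try:
    result = it.next(timeout=10)
except TimeoutError:
    # no task completed within the timeout
    print('No result yet')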
Now that we know how to get the first result from the process pool, let’s look at some worked examples.
Example of Getting the First Result with a Queue
We can explore how to get the result from the first task to complete using a queue.
In this example we will define a worker initialization function that will take the shared queue as an argument and store it in a global variable to make it available to all tasks in all workers in the pool. We will then define a target task function that will generate a random number, block, then store the result on the shared queue. The randomness will mean the task that finishes first will differ each time the code is run. Finally, we will issue many tasks and wait for and report the first result received in the main process.
Firstly, we will define the function to initialize each child worker process.
The function takes the shared queue as an argument, defines a global variable, and stores the queue in the global variable. This function is executed in each child worker process before it executes tasks. This means that the queue stored in the global variable will be available to all tasks executed by all workers in the process pool.
The init_worker() function below implements this.
# worker process initialization
def init_worker(arg_queue):
    # define global variable for each worker
    global queue
    # store queue in the global variable
    queue = arg_queue
Next, we can define the target task function.
The function takes a unique integer identifier for the task. It then generates a random number between 2 and 12, reports the value, then blocks for that many seconds. Finally, a tuple of the task's integer identifier and the generated value is put on the shared queue.
The task() function below implements this.
# task executed in a worker process
def task(identifier):
    # generate a value
    value = 2 + random() * 10
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # put the result on the queue
    queue.put((identifier, value))
Next, in the main process we first create a shared queue.
In this case, we will use a multiprocessing.SimpleQueue, as we do not need to limit the capacity of the queue.
...
# create the shared queue
queue = SimpleQueue()
We can then create the process pool and configure it to use the custom worker initialization function.
...
# create and configure the process pool
with Pool(initializer=init_worker, initargs=(queue,)) as pool:
    # ...
We then issue 30 tasks to the process pool asynchronously using the map_async() function.
...
# issue many tasks
_ = pool.map_async(task, range(30))
The main process then gets the first result from the queue.
This call blocks until the first task to complete puts its result on the queue.
...
# get the first result, blocking
identifier, value = queue.get()
The result is then reported.
...
# report the first result
print(f'First result: identifier={identifier}, value={value}')
Finally, the process pool is terminated automatically via the context manager interface. This forcefully stops all remaining tasks in the pool.
We report a message to this effect.
...
# terminate remaining tasks
print('Terminating remaining tasks')
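For reference, exiting the context manager calls terminate() on the pool under the covers. Without the context manager, a roughly equivalent structure would be the following sketch, reusing the init_worker(), task(), and queue from this example:

...
# create the process pool manually
pool = Pool(initializer=init_worker, initargs=(queue,))
try:
    # issue tasks and get the first result as before
    _ = pool.map_async(task, range(30))
    identifier, value = queue.get()
finally:
    # forcefully stop all workers and any remaining tasks
    pool.terminate()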
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting the first result from the process pool with a queue
from random import random
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import SimpleQueue

# worker process initialization
def init_worker(arg_queue):
    # define global variable for each worker
    global queue
    # store queue in the global variable
    queue = arg_queue

# task executed in a worker process
def task(identifier):
    # generate a value
    value = 2 + random() * 10
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # put the result on the queue
    queue.put((identifier, value))

# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = SimpleQueue()
    # create and configure the process pool
    with Pool(initializer=init_worker, initargs=(queue,)) as pool:
        # issue many tasks
        _ = pool.map_async(task, range(30))
        # get the first result, blocking
        identifier, value = queue.get()
        # report the first result
        print(f'First result: identifier={identifier}, value={value}')
        # terminate remaining tasks
        print('Terminating remaining tasks')
Running the example first creates the shared queue.
The process pool is then created, configured with the custom initialization function that shares the queue with each child worker process.
The tasks are then issued to the pool asynchronously. The main process then blocks, waiting for the first result to be retrieved from the shared queue.
Only a subset of the tasks is able to run at a time. Each task generates a random number, reports a message, blocks, then stores its result in the queue.
The main process gets the result from the first task to complete. It reports the value, then exits the context manager block for the pool.
This terminates the pool and all running tasks, then the program ends.
This highlights one approach we can use to get the first result from tasks in the multiprocessing pool.
Note, results will differ each time the program is run given the use of random numbers.
Task 0 executing with 5.636381601220135
Task 1 executing with 6.939226960253943
Task 2 executing with 4.11833591204181
Task 3 executing with 10.302360850894589
Task 4 executing with 6.247353496296375
Task 5 executing with 4.551726400980119
Task 6 executing with 5.076364406901675
Task 7 executing with 10.959613172165605
Task 8 executing with 5.762465637162721
First result: identifier=2, value=4.11833591204181
Terminating remaining tasks
Next, let’s explore how we might get the first result using the imap_unordered() function.
Example of Getting the First Result with imap_unordered()
We can explore how to issue tasks with the imap_unordered() function and get the first result.
In this example, we will update the previous example so that it no longer uses a shared queue or a custom initializer for the child worker processes. Instead, we will issue all tasks using the imap_unordered() function. This will return an iterable that yields results in the order that tasks are completed. We can then get the first value from the iterable in order to get the first result.
Firstly, we can define the process pool using the default configuration with the context manager interface.
...
# create and configure the process pool
with Pool() as pool:
    # ...
Next, we can issue all tasks using the imap_unordered() function and retrieve the iterable.
...
# issue many tasks
it = pool.imap_unordered(task, range(30))
Finally, we can call the built-in next() function in order to traverse the iterable one step and get the first result.
This call will block until the first result is available.
...
# get the first result, blocking
identifier, value = next(it)
And that’s it.
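As an aside, if we wanted to keep consuming results as tasks finish, rather than terminating the pool after the first result, we could simply continue iterating. A sketch:

...
# consume the remaining results in completion order
for identifier, value in it:
    print(f'Next result: identifier={identifier}, value={value}')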
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting the first result from the process pool with imap_unordered
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = 2 + random() * 10
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue many tasks
        it = pool.imap_unordered(task, range(30))
        # get the first result, blocking
        identifier, value = next(it)
        # report the first result
        print(f'First result: identifier={identifier}, value={value}')
        # terminate remaining tasks
        print('Terminating remaining tasks')
Running the example first creates the process pool.
It then issues all tasks to the process pool and returns an iterable for task results.
The main process then blocks, waiting for the first result to be made available.
A subset of the tasks begin executing. Each task generates a random number, reports the value, blocks, then returns a tuple of the integer identifier and the generated random number.
A result is received by the main process and is reported.
The process pool and all remaining tasks are then forcefully terminated.
This highlights a simple way to get the first result from multiple tasks in the process pool.
Note, results will differ each time the program is run given the use of random numbers.
Task 0 executing with 2.4591115237324237
Task 1 executing with 6.5464709230061455
Task 2 executing with 2.847775172423446
Task 3 executing with 5.482376922246911
Task 4 executing with 7.11178723756704
Task 5 executing with 3.6949780247040525
Task 6 executing with 5.3101695644764915
Task 7 executing with 2.6110942609971746
Task 8 executing with 4.104337058058016
First result: identifier=0, value=2.4591115237324237
Terminating remaining tasks
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to get the first available result from the process pool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.