Last Updated on September 12, 2022
You can issue tasks to the process pool one-by-one and execute them in parallel via the imap() function.
In this tutorial you will discover how to use the imap() function to issue tasks to the process pool in Python.
Let’s get started.
Need a Lazy and Parallel Version of map()
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
The built-in map() function allows you to apply a function to each item in an iterable, and the process pool provides a parallel version via Pool.map(). A problem with Pool.map() is that it converts the provided iterable of items into a list, submits all items as tasks to the process pool at once, then blocks until all tasks are complete.
The imap() function is a lazier version of the map() function: tasks are submitted to the process pool one-by-one and results are retrieved as tasks complete.
How can we use the imap() function?
How to Use Pool.imap()
The process pool provides a lazy parallel map function via the Pool.imap() function.
Recall that the built-in map() function will apply a given function to each item in a given iterable.
Return an iterator that applies function to every item of iterable, yielding the results.
— Built-in Functions
It yields one result for each call of the given target function with one item from the given iterable. It is common to call map() and iterate the results in a for-loop.
For example:
...
# iterates results from map
for result in map(task, items):
    # ...
Also, recall that the process pool provides a version of the map() function where the target function is called for each item in the provided iterable in parallel.
The problem with the Pool.map() function is that it converts the provided iterable into a list and issues a task for each item all at once. This can be a problem if the iterable contains many hundreds or thousands of items as it may use a lot of main memory.
As an alternative, the process pool provides the imap() function which is a lazy version of map for applying a target function to each item in an iterable in a lazy manner.
A lazier version of map().
— multiprocessing — Process-based parallelism
Specifically:
- Items are yielded from the provided iterable one at a time instead of all at once.
- Results are yielded in order as they are completed rather than after all tasks are completed.
We can use the imap() function on the process pool just like the map() function.
For example:
...
# apply function to each item in the iterable in parallel
for result in pool.imap(task, items):
    # ...
Each item in the iterable is taken as a separate task in the process pool.
Like the built-in map() function, the returned iterator yields results in the order of the provided iterable. This means that results are returned in the same order that tasks are issued.
Unlike the built-in map() function, the Pool.imap() function only takes one iterable as an argument. This means that the target function executed in the process can only take a single argument.
Unlike the Pool.map() function, the Pool.imap() function will iterate the provided iterable one item at a time and issue tasks to the process pool. It will also yield return values as tasks are completed rather than all at once after all tasks are completed.
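Because imap() accepts a single iterable, a target function that needs multiple arguments must receive them packed into one item. The sketch below shows a common workaround: pack the arguments into tuples and unpack them in a small wrapper. The task() and task_wrapper() functions here are hypothetical, and multiprocessing.pool.ThreadPool is used (it shares the Pool API, including imap()) so the snippet runs as a plain script without an entry-point guard:

```python
from multiprocessing.pool import ThreadPool  # same API as Pool, including imap()

# hypothetical target function that needs two arguments
def task(identifier, scale):
    return identifier * scale

# wrapper that unpacks a single (identifier, scale) tuple for imap()
def task_wrapper(args):
    return task(*args)

# pack the arguments for each call into tuples
items = [(i, 10) for i in range(5)]
with ThreadPool(4) as pool:
    results = list(pool.imap(task_wrapper, items))
print(results)  # [0, 10, 20, 30, 40]
```

For the blocking case, Pool.starmap() accepts multiple arguments per item directly, although it is not lazy.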
Like the Pool.map() function, it is possible to split the items in the iterable evenly among the worker processes.
For example, if we had a process pool with 4 child worker processes and an iterable with 40 items, we can split up the items into 4 chunks of 10 items, with one chunk allocated to each worker process.
The effect is less overhead in transmitting tasks to worker processes and collecting results.
This can be achieved via the “chunksize” argument to imap().
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
— multiprocessing — Process-based parallelism
For example:
...
# iterates results from map with chunksize
for result in pool.imap(task, items, chunksize=10):
    # ...
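One way to see chunking in action is to have the target function report which worker handled each item. This is a sketch for illustration, not part of the tutorial's own examples; it uses multiprocessing.pool.ThreadPool (which shares the Pool API) so worker identity can be read from the thread name. With chunksize=10, each run of 10 consecutive items is sent to a single worker as one unit of work:

```python
import threading
from multiprocessing.pool import ThreadPool  # same API as Pool, including imap()

# return the name of the worker that handled this item
def task(i):
    return threading.current_thread().name

with ThreadPool(4) as pool:
    # 40 items in chunks of 10: each chunk is dispatched to one worker as a unit
    names = list(pool.imap(task, range(40), chunksize=10))

# each consecutive chunk of 10 items was handled by exactly one worker
chunks = [set(names[i:i + 10]) for i in range(0, 40, 10)]
print([len(chunk) for chunk in chunks])  # [1, 1, 1, 1]
```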
Next, let’s take a closer look at how the imap() function compares to other functions on the process pool.
Difference Between imap() and map()
How does the Pool.imap() function compare to the Pool.map() for issuing tasks to the process pool?
Both the imap() and map() may be used to issue tasks that call a function to all items in an iterable via the process pool.
The following summarizes the key differences between these two functions:
- The imap() function issues one task to the process pool at a time, whereas the map() function issues all tasks at once.
- The imap() function blocks only until the next task is complete when iterating return values, whereas the map() function blocks until all tasks are complete.
The imap() function should be used for issuing tasks one-by-one and processing the results for tasks in order as they are available.
The map() function should be used for issuing all tasks at once and processing results in order only once all issued tasks have completed.
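The difference can be sketched with a quick experiment (an illustration, not one of the tutorial's examples). A fast task and a slow task are issued together; imap() yields the fast task's result as soon as it is ready, whereas map() would return nothing until the slow task also finished. multiprocessing.pool.ThreadPool is used here because it shares the Pool API and runs without an entry-point guard:

```python
from multiprocessing.pool import ThreadPool  # same API as Pool, including imap()
from time import sleep, time

# task that sleeps for a duration proportional to its argument
def task(n):
    sleep(n * 0.1)
    return n

with ThreadPool(2) as pool:
    start = time()
    it = pool.imap(task, [0, 5])  # one fast task, one slow task
    first = next(it)              # available almost immediately
    elapsed = time() - start
    rest = list(it)               # blocks until the slow task finishes
print(first, rest)  # 0 [5]
```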
Difference Between imap() and imap_unordered()
How does the Pool.imap() function compare to the Pool.imap_unordered() for issuing tasks to the process pool?
The imap() and imap_unordered() functions have a lot in common, such as:
- Both the imap() and imap_unordered() may be used to issue tasks that call a function to all items in an iterable via the process pool.
- Both the imap() and imap_unordered() are lazy versions of the map() function.
- Both the imap() and imap_unordered() functions return an iterable over the return values immediately.
Nevertheless, there is one key difference between the two functions:
- The iterable returned from imap() yields results in order as they are completed, whereas the imap_unordered() function yields results in an arbitrary order in which tasks are completed.
The imap() function should be used when the caller needs to iterate return values in the order that they were submitted from tasks as they are completed.
The imap_unordered() function should be used when the caller needs to iterate return values in any arbitrary order (not the order that they were submitted) from tasks as they are completed.
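The ordering difference can be sketched as follows (an illustration using multiprocessing.pool.ThreadPool, which shares the Pool API). The task sleeps longer for smaller inputs, so completion order is roughly the reverse of submission order; imap() still yields results in submission order, while imap_unordered() yields them as they complete:

```python
from multiprocessing.pool import ThreadPool  # same API as Pool
from time import sleep

# task that completes sooner for larger inputs
def task(n):
    sleep((4 - n) * 0.05)
    return n

with ThreadPool(4) as pool:
    ordered = list(pool.imap(task, range(4)))              # submission order
    unordered = list(pool.imap_unordered(task, range(4)))  # completion order
print(ordered)    # always [0, 1, 2, 3]
print(unordered)  # some completion-dependent order, e.g. [3, 2, 1, 0]
```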
Now that we know how to use the imap() function to execute tasks in the process pool, let’s look at some worked examples.
Example of Pool.imap()
We can explore how to use the imap() on the process pool.
In this example, we can define a target task function that takes an integer as an argument, generates a random number, reports the value, then returns the value that was generated. We can then call this function for each integer between 0 and 49 using the process pool's imap() function.
This will apply the function to each integer in parallel using as many cores as are available in the system.
Firstly, we can define the target task function.
The function takes an argument, generates a random number between 0 and 1, reports the integer and generated number. It then blocks for a fraction of a second to simulate computational effort, then returns the number that was generated.
The task() function below implements this.
# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value
We can then create and configure a process pool.
We will use the context manager interface to ensure the pool is shutdown automatically once we are finished with it.
If you are new to the context manager interface of the process pool, you can learn more in the dedicated tutorial.
...
# create and configure the process pool
with Pool() as pool:
    # ...
We can then call the imap() function on the process pool to apply our task() function to each value in a range between 0 and 49.
This returns an iterator over return values that will yield results in the order that the tasks were submitted to the pool, as the tasks are completed.
We can then report the return values directly.
This can all be achieved in a for-loop.
...
# execute tasks in order
for result in pool.imap(task, range(50)):
    print(f'Got result: {result}', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of parallel imap() with the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # execute tasks in order
        for result in pool.imap(task, range(50)):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically
Running the example first creates the process pool with a default configuration.
It will have one child worker process for each logical CPU in your system.
The imap() function is then called for the range.
This issues 50 calls to the task() function, one for each integer between 0 and 49. An iterator is returned with the result for each function call, in order.
Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.
The main process iterates over the values returned from the calls to the task() function and reports the generated values, matching those generated in each child process.
Importantly, tasks are issued to the process pool one-by-one, as space in the pool becomes available.
Just as importantly, results are reported in the main process as tasks are completed.
A truncated listing of results is provided below. We can see that tasks are running and reporting their generated results while the main process is receiving and reporting return values.
This is unlike the map() function that must wait for all tasks to complete before reporting return values.
...
Task 45 executing with 0.09632403833711778
Task 46 executing with 0.26264504001408606
Task 47 executing with 0.1250416261538233
Got result: 0.8799040277052829
Got result: 0.42176546024662354
Got result: 0.10420345434591471
Got result: 0.3171846595192461
Got result: 0.6277845874681902
Task 48 executing with 0.9859124175548017
Task 49 executing with 0.12981521666676332
Got result: 0.9630317108419421
Got result: 0.6357453112764213
Got result: 0.6478370016289148
Got result: 0.7360124438542892
Got result: 0.7564494501202844
Got result: 0.6419903204192555
Got result: 0.09632403833711778
Got result: 0.26264504001408606
Got result: 0.1250416261538233
Got result: 0.9859124175548017
Got result: 0.12981521666676332
Next, let’s look at an example of issuing tasks that do not have a return value.
Example of Pool.imap() with No Return Value
We can explore using the imap() function to call a function for each item in an iterable that does not have a return value.
This means that we are not interested in the iterable of results returned by the call to imap() and instead are only interested that all issued tasks get executed.
This can be achieved by updating the previous example so that the task() function does not return a value.
The updated task() function with this change is listed below.
# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
Then, in the main process, we can call imap() with our task() function and the range, and not iterate the results.
...
# issue tasks to the process pool
pool.imap(task, range(50))
The call to imap() will return immediately.
Therefore, we must explicitly wait for all tasks in the process pool to complete. Otherwise, the context manager for the process pool would exit and forcefully terminate the process pool and all running tasks in the pool.
This can be achieved by first closing the process pool so no further tasks can be submitted to the pool, then joining the pool to wait for all tasks to complete and all worker processes to close.
...
# shutdown the process pool
pool.close()
# wait for all issued tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of parallel imap() with the process pool and a task that does not return a value
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        pool.imap(task, range(50))
        # shutdown the process pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
Running the example first creates the process pool with a default configuration.
The imap() function is then called for the range. This issues 50 calls to the task() function, one for each integer between 0 and 49. An iterator is returned immediately with the result for each function call, but is ignored in this case.
The main process carries on, first closing the process pool then joining it to wait for all tasks to complete.
Each call to the task function generates a random number between 0 and 1, reports a message, then blocks.
The tasks in the pool finish then the process pool is closed.
This example again highlights that the call to imap() does not block; it only blocks when we loop over the returned iterator of return values.
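This non-blocking behavior can be sketched directly (an illustration using multiprocessing.pool.ThreadPool, which shares the Pool API and runs without an entry-point guard): the call to imap() returns at once, even though every task sleeps, and only iterating the result iterator waits:

```python
from multiprocessing.pool import ThreadPool  # same API as Pool, including imap()
from time import sleep, time

# task that takes a noticeable amount of time
def task(n):
    sleep(0.2)
    return n

with ThreadPool(2) as pool:
    start = time()
    it = pool.imap(task, range(4))   # returns immediately
    call_time = time() - start
    results = list(it)               # iterating blocks until results arrive
    total_time = time() - start
print(results)  # [0, 1, 2, 3]
```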
Below is a truncated listing of results.
...
Task 40 executing with 0.16718668079632748
Task 41 executing with 0.3873095377967739
Task 42 executing with 0.29098208312035945
Task 43 executing with 0.7388509094163341
Task 44 executing with 0.34606331674403357
Task 45 executing with 0.13515157652070398
Task 46 executing with 0.9341872104999233
Task 47 executing with 0.5368892051176957
Task 48 executing with 0.8575315808381572
Task 49 executing with 0.494705203671741
Next, let’s explore the chunksize argument to the imap() function.
Example of Pool.imap() with chunksize
The imap() function will apply a function to each item in an iterable one-by-one.
If the iterable has a large number of items, this may be inefficient, as each item must be serialized individually and sent to a child process to be executed.
A more efficient approach for very large iterables might be to divide the items in the iterable into chunks and issue chunks of items to each worker process to which the target function can be applied.
This can be achieved with the “chunksize” argument to the imap() function.
The example below updates the example to first configure the process pool with 4 child worker processes, then to issue 40 tasks with 10 tasks per child worker process.
Firstly, we can configure the process pool.
# create and configure the process pool
with Pool(4) as pool:
    # ...
Next, we can issue 40 tasks, with 10 tasks assigned to each worker via the “chunksize” argument.
...
# issue tasks to the process pool
pool.imap(task, range(40), chunksize=10)
The main process then closes the pool and waits for all tasks to complete.
...
# shutdown the process pool
pool.close()
# wait for all issued tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of parallel imap() with the process pool with a larger iterable and chunksize
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(1)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(4) as pool:
        # issue tasks to the process pool
        pool.imap(task, range(40), chunksize=10)
        # shutdown the process pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
Running the example first creates the process pool with 4 child process workers.
The imap() function is then called for the range with a chunksize of 10.
This issues 4 units of work to the process pool, one for each child worker process and each composed of 10 calls to the task() function.
Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.
The main process carries on, first closing the process pool then joining it to wait for all tasks to complete.
The tasks in the pool finish then the process pool is closed.
Below is a truncated listing of results.
...
Task 15 executing with 0.3846357554634975
Task 16 executing with 0.24445576093668153
Task 17 executing with 0.030455340637973394
Task 18 executing with 0.04062327569186841
Task 19 executing with 0.33773653174616214
Task 20 executing with 0.11109157246192058
Task 21 executing with 0.8826710026699609
Task 22 executing with 0.5482863501264439
Task 23 executing with 0.026732652520682798
Task 24 executing with 0.34362602918337504
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to use the imap() function to issue tasks to the process pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.