Last Updated on September 12, 2022
You can execute tasks asynchronously with the ProcessPoolExecutor by calling the map() function.
In this tutorial you will discover how to use the map() function to execute tasks with the process pool in Python.
Let’s get started.
Need to Call Functions in Separate Processes
You may have a for-loop that calls a function for each item in an iterable like a list.
...
# apply a function to each item in a list
for item in items:
    result = task(item)
How can you make each function call in a separate process?
Or put another way:
How can you make the for-loop concurrent?
Alternatively, you may be using the built-in map() function to apply the function to each item in an iterable for you.
...
# apply the function to each element in the collection
results = map(task, items)
The map() function does not apply the task() function to each item until we iterate the results, so-called lazy evaluation:
...
# iterate the results from map
for result in results:
    print(result)
Therefore, it is common to see this operation consolidated to the following:
...
# iterate the results from map
for result in map(task, items):
    print(result)
There are a number of other common patterns when using the map() function.
For example, you can use the built-in enumerate() function on the results of map() to pair each returned value with its index, which is helpful if you need to access a parallel list of data.
...
# map function to items
results = map(task, items)
# enumerate the results from map
for i, result in enumerate(results):
    # do something...
Finally, it is also common to want to iterate the results of map() as well as the list of items passed to map.
This can be achieved using the built-in zip() function, which takes a number of iterables and yields one item from each per iteration.
For example:
...
# map function to items
results = map(task, items)
# iterate items and results together
for item, result in zip(items, results):
    # do something...
Now that we are familiar with map(), how can we execute each call by map() concurrently using processes?
How to Call map() With Separate Processes
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can specify the number of processes to create in the process pool as an argument, which defaults to the number of logical CPU cores in your system.
...
# create a process pool
executor = ProcessPoolExecutor()
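For example, a brief sketch of creating a pool with a fixed number of worker processes (the value of four here is arbitrary):
...
# create a process pool with a fixed number of worker processes
executor = ProcessPoolExecutor(max_workers=4)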
You can submit tasks to the process pool by calling the map() function, specifying the name of the function to execute and the iterable of items to which the function will be applied.
...
# execute each function call in a separate process
results = executor.map(task, items)
Each call to the target function with one item from the iterable will be executed in a separate process.
You can then iterate over the results as you would with the built-in map() function, except that you may have to wait for each result as the tasks complete in separate processes. This waiting is called “blocking” and happens automatically.
...
# iterate the results from map
for result in results:
    print(result)
Although the tasks are executed concurrently, the results are iterated in the order of the iterable provided to the map() function, the same as the built-in map() function.
In this way, we can think of the process pool version of map() as a concurrent version of the built-in map() function, ideal if you are looking to update your for-loop to use processes.
We can call the map() function to execute the functions in separate processes and process the results using the common for-loop idiom, as follows:
...
# iterate the results from map performed in separate processes
for result in executor.map(task, items):
    print(result)
Map Submitted Tasks to Internal Tasks with Chunksize
The map() function on the ProcessPoolExecutor takes a parameter called “chunksize” which defaults to 1.
...
# apply a function to each item in an iterable with a chunksize
for result in executor.map(task, items, chunksize=1):
    # ...
The “chunksize” argument controls the mapping of items in the iterable passed to map() to the tasks executed by the ProcessPoolExecutor.
A value of one means that one item is mapped to one task.
Recall that the data for each task in terms of arguments sent to the target task function and values that are returned must be serialized by pickle. This happens automatically, but incurs some computational and memory cost, adding overhead to each task processed by the process pool.
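One consequence is that the target function and its arguments must be picklable, which generally means the function must be defined at the top level of a module. A minimal sketch illustrating this constraint (the double() function is a hypothetical example):
# functions defined at the top level of a module can be pickled
from concurrent.futures import ProcessPoolExecutor

# hypothetical example task
def double(value):
    return value * 2

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # works: double() is a top-level, picklable function
        print(list(executor.map(double, [1, 2, 3])))
        # an inline lambda cannot be pickled and would fail, e.g.:
        # executor.map(lambda x: x * 2, [1, 2, 3])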
When there are a vast number of tasks, or the tasks are relatively quick to compute, the chunksize should be tuned to group items into process pool tasks, minimizing the overhead per task and in turn reducing the overall compute time.
This will likely require some tuning of the chunksize that you may be able to perform with real task data, or perhaps a test harness with mock data and task processes.
Some values to try might include:
- 1: The default mapping of one item per task.
- items / max_workers: Splits all items into max_workers groups, e.g. one batch of items per process.
Note, the (items / max_workers) division may need to be rounded as the “chunksize” argument must be a positive integer.
For example:
...
# estimate the chunksize
size = round(len(items) / executor._max_workers)
# apply a function to each item in an iterable with a chunksize
for result in executor.map(task, items, chunksize=size):
    # ...
Compare the performance to see if one configuration is better than another, and if so, use it as a starting point for similar values to evaluate.
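For example, a minimal sketch of such a test harness, assuming a mock short-duration task and a few candidate chunksize values:
# sketch of a harness for comparing chunksize values (mock task assumed)
from time import perf_counter
from concurrent.futures import ProcessPoolExecutor

# mock short-duration task
def task(value):
    return value**2

# entry point
if __name__ == '__main__':
    items = range(100000)
    # candidate chunksize values to compare
    for size in [1, 100, 1000, 25000]:
        start = perf_counter()
        with ProcessPoolExecutor(4) as executor:
            # consume all results so the full run is timed
            _ = list(executor.map(task, items, chunksize=size))
        duration = perf_counter() - start
        print(f'chunksize={size}: {duration:.3f} seconds')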
Wait For Results With Timeout
We may want to limit how long we are willing to wait for a task to complete and return a result when iterating the return value from the map() function.
This can be achieved by setting the “timeout” argument when calling map() and specifying how long we are willing to wait in seconds.
...
# iterate the results from map performed in separate processes, wait a limited time
for result in executor.map(task, items, timeout=5):
    print(result)
If more than the timeout number of seconds elapses before a task completes and returns a result, then a TimeoutError is raised that may need to be handled.
...
# handle a timeout error when getting results
try:
    # iterate the results from map performed in separate processes, wait a limited time
    for result in executor.map(task, items, timeout=5):
        print(result)
except TimeoutError:
    print('Waited too long')
Wait For All Results to Complete
We don’t have to wait for the results from the call to the map() function in order to continue.
For example, we can call map() with a target task function and an iterable and then carry on with other operations.
The call to map() does not block, meaning that we do not have to wait for the results unless we iterate the results returned from the call to the map() function explicitly.
This might be helpful if our target task function does not return a result, although you may want to use the submit() function on the ProcessPoolExecutor instead.
...
# perform the tasks in separate processes
executor.map(task, items) # does not block
# do other things...
Alternatively, we may want the map() function to call the target function in separate processes and wait for all tasks to complete.
This can be achieved by calling the shutdown() function of the ProcessPoolExecutor which by default will wait for all scheduled and running tasks to complete before returning.
...
# wait for all tasks to complete
executor.shutdown()
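The shutdown() function also takes a “wait” argument; a brief sketch of returning immediately instead of blocking (already-submitted tasks still run to completion in the background):
...
# signal shutdown without blocking until tasks complete
executor.shutdown(wait=False)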
We can also use the context manager of the process pool, which will automatically shut down the process pool for us when we are finished and will block until all tasks have completed.
...
# start the process pool
with ProcessPoolExecutor() as executor:
    # perform the tasks in separate processes
    executor.map(task, items) # does not block
    # shutdown automatically, wait for all tasks to complete
Call map() With Multiple Arguments
The ProcessPoolExecutor map() function supports target functions that take more than one argument by providing more than one iterable as arguments to the call to map().
For example, we can define a target function for map that takes two arguments, then provide two iterables to the call to map(). The map() function will then apply the function with both sets of arguments and stop when the shortest iterable is exhausted.
...
# call map with a target function that takes more than one argument
executor.map(task, items1, items2)
If you want to provide ad hoc arguments to your target task function, e.g. variable numbers of arguments or the same argument to multiple function calls, then you may be better off using the submit() function on the ProcessPoolExecutor.
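For example, a minimal sketch of using submit() for ad hoc calls, assuming a task() function that takes two arguments:
...
# submit ad hoc calls, each with its own arguments
future1 = executor.submit(task, 'a', 1)
future2 = executor.submit(task, 'b', 2)
# retrieve each result, blocking until it is available
print(future1.result())
print(future2.result())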
Now that we are familiar with how to use the map() function to call a function using processes in Python, let’s look at a worked example.
Example of Using map() With the ProcessPoolExecutor
Let’s explore how to use the map() function on the ProcessPoolExecutor with a worked example.
First, we can define a simple task that will block for a variable amount of time less than one second.
The task takes a unique name and will return a message that it has completed.
# custom task that will block for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return f'Task: {name} done.'
This task is a good candidate for calling the built-in map() function and the ProcessPoolExecutor map() function because it is a pure function that does not have any side effects and does not access or change state outside of the function itself.
Next, we can define a process pool with ten worker processes using the context manager.
...
# start the process pool
with ProcessPoolExecutor(10) as executor:
    # ...
Next, we can call the map() function to apply the task() function to a range of integers from 0 to 9.
Finally, we can report the results as they are made available in the order that the tasks were submitted to the process pool for execution.
...
# execute tasks concurrently and process results in order
for result in executor.map(task, range(10)):
    # report the result
    print(result)
Tying this together, the complete example of calling map() to apply a function to a range of data using worker processes in the ProcessPoolExecutor is listed below.
# SuperFastPython.com
# example of calling map and processing results
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return f'Task: {name} done.'

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10)):
            # report the result
            print(result)
Running the example first creates the process pool, then submits the tasks to the process pool for execution.
The tasks are completed and results are retrieved and reported as they become available in the order that tasks were submitted.
Task: 0 done.
Task: 1 done.
Task: 2 done.
Task: 3 done.
Task: 4 done.
Task: 5 done.
Task: 6 done.
Task: 7 done.
Task: 8 done.
Task: 9 done.
Example of Setting Chunksize For Many Small Tasks
When we have thousands or millions of short duration tasks to complete, it is a good idea to set the “chunksize” argument when calling the map() function.
This will group calls to our target function into chunks that are then sent to worker processes.
Each function call, along with its argument, must be serialized and transmitted to a worker process using inter-process communication.
Although fast, it does incur some computational cost for each task transmitted. This adds up when we have a large number of short duration tasks.
Let’s demonstrate this with an example of a target function that squares a provided number.
# square a provided number
def task(value):
    return value**2
We can then call this 10,000 times with integers from 0 to 9,999.
# SuperFastPython.com
# example of calling map with a chunksize of 1
from concurrent.futures import ProcessPoolExecutor

# square a provided number
def task(value):
    return value**2

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(4) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10000), chunksize=1):
            # report the result
            print(result)
Running the example will square all 10,000 numbers, although not as quickly as it could.
It takes about 2.3 seconds on my system.
...
99860049
99880036
99900025
99920016
99940009
99960004
99980001
It should not take this long to square 10,000 numbers in Python.
It’s slow because the process pool must manage 10,000 very short duration tasks.
We can dramatically speed up the execution of these tasks by grouping them into large chunks for each process to work on.
Setting the chunksize is a bit of an art and will depend on the number of CPU cores, the number of worker processes, the duration of each task and the amount of data transmitted for each task.
As such, it can be a good idea to run some tests to see what works well for your system.
A good starting point is to divide the number of tasks by the number of processes.
For example, if we have 10,000 tasks and 4 worker processes (and at least 4 CPU cores), then we might set the chunksize to 2,500 as a first test.
- chunksize = number of tasks / number of worker processes
- chunksize = 10,000 / 4
- chunksize = 2,500
The example below demonstrates the same batch of tasks with the chunksize of 2,500.
# SuperFastPython.com
# example of calling map with a chunksize of 2500
from concurrent.futures import ProcessPoolExecutor

# square a provided number
def task(value):
    return value**2

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(4) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10000), chunksize=2500):
            # report the result
            print(result)
Running the example calculates the same results, but is much faster.
It takes about 150 milliseconds on my system, compared to the 2.3 seconds with a chunksize of 1.
...
99860049
99880036
99900025
99920016
99940009
99960004
99980001
Example of Calling map() With a Timeout
We may want the results from the function calls but be unwilling to wait an extended period for them.
This can be achieved by setting the “timeout” argument when calling the map() function.
As we process the results by iterating the value returned from the call to map(), if a task takes longer than the timeout to return a value, a TimeoutError is raised, which we can choose to handle.
The listing below updates the example to use a timeout of a tiny fraction of a second; if the timeout elapses, a message is reported.
# SuperFastPython.com
# example of calling map and processing results with a timeout
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import TimeoutError

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return f'Task: {name} done.'

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # handle a timeout error when getting results
        try:
            # iterate the results from map performed in separate processes, wait a limited time
            for result in executor.map(task, range(10), timeout=0.05):
                print(result)
        except TimeoutError:
            print('Waited too long')
Running the example starts the process pool then submits ten function call tasks to the pool.
We wait a fraction of a second for the first result, before a TimeoutError is raised and caught, reporting a message that we gave up waiting.
Note that if a TimeoutError is raised, it does not impact the tasks executing in the process pool. For example, although we were impatient for a result, the tasks will continue to run and the process pool will not close until all tasks have completed.
Waited too long
Example of Waiting for All Tasks To Complete
We may have a target task function that does not return a value and yet we may want to call the function for an iterable of data and wait for all tasks to complete.
We can do this by calling the map() function without iterating the results, then using the context manager to close the process pool, which will wait for all tasks to complete.
First, let’s update our target task function to not return a value, but instead report a value directly via a print statement.
# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {value}')
We can then submit the tasks to the process pool via a call to map() without iterating the results.
...
# submit all tasks
executor.map(task, range(5))
The call to map() does not block, so we are able to carry on with other tasks in our program.
We can then wait for all tasks in the process pool to complete by allowing the context manager to call shutdown() for us and return once all tasks have finished.
To make this clear, we will add a print statement that we are waiting for tasks to complete.
...
# shutdown, wait for all tasks to complete
print('Waiting...')
Once all tasks have completed, we can carry on with our program.
...
print('All done.')
Tying this together, the complete example of waiting for all map() tasks to complete in the ProcessPoolExecutor is listed below.
# SuperFastPython.com
# example of calling map and waiting for all tasks to complete
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {value}')

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        executor.map(task, range(5))
        # shutdown, wait for all tasks to complete
        print('Waiting...')
    print('All done.')
Running the example starts the process pool and submits five tasks by calling the map() function.
We do not iterate the results of the tasks; instead, we start waiting for the tasks to complete immediately via the implicit call to shutdown() on the process pool made by the context manager.
The tasks complete one by one and we then carry on with our program once all tasks have completed.
Waiting...
Done: 1
Done: 3
Done: 4
Done: 0
Done: 2
All done.
Example of Calling map() With Multiple Arguments
We may want to call a target task function that takes more than one argument.
This can be done with the built-in map() function and with the ProcessPoolExecutor map() function.
Both map() functions take one or more iterables and will stop calling the target task function when the shortest iterable is exhausted.
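For example, a small illustration with the built-in map(): given iterables of different lengths, only two pairs are produced because the shorter iterable is exhausted first:
...
# built-in map stops when the shortest iterable is exhausted
for pair in map(lambda a, b: (a, b), [1, 2, 3], ['a', 'b']):
    print(pair)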
We can update our target task function to take two values and to return a tuple that combines the values after a short sleep.
# custom task that will sleep for a variable amount of time
def task(value1, value2):
    # sleep for less than a second
    sleep(random())
    return (value1, value2)
We can define two lists of data that we will use as iterables for our call to map.
...
# define our data
data1 = ['1', '2', '3']
data2 = ['a', 'b', 'c']
Finally, we can update our call to map() to take both iterables of data.
...
# submit all tasks
for result in executor.map(task, data1, data2):
    print(result)
Tying this together, the complete example of calling map to execute function calls with two arguments asynchronously with the ProcessPoolExecutor is listed below.
# SuperFastPython.com
# example of calling map with multiple arguments
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value1, value2):
    # sleep for less than a second
    sleep(random())
    return (value1, value2)

# entry point
if __name__ == '__main__':
    # define our data
    data1 = ['1', '2', '3']
    data2 = ['a', 'b', 'c']
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        for result in executor.map(task, data1, data2):
            print(result)
Running the example first starts the process pool, then submits three function calls to the worker processes, one for each pair of values in the two iterables passed to map.
Each task returns a tuple that is printed.
('1', 'a')
('2', 'b')
('3', 'c')
Further Reading
This section provides additional resources that you may find helpful.
Books
- ProcessPoolExecutor Jump-Start, Jason Brownlee (my book!)
- Concurrent Futures API Interview Questions
- ProcessPoolExecutor PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ProcessPoolExecutor: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python Pool: The Complete Guide
APIs
- concurrent.futures - Launching parallel tasks, Python Standard Library documentation.
References
- Thread (computing), Wikipedia.
- Process (computing), Wikipedia.
- Thread Pool, Wikipedia.
- Futures and promises, Wikipedia.
Takeaways
You now know how to use the map() function to execute tasks asynchronously with the ProcessPoolExecutor.
Do you have any questions?
Ask your question in the comments below and I will do my best to answer.
Selin says
Hello,
I noticed when I used the executor with the with statement, the first time it runs and finished right. However, if I call the executor again for the same process or a different process, it is hanging: not triggering the concurrent computation.
If it makes a difference, my environment is Linux.
Jason Brownlee says
Sorry to hear that.
You should have no problem calling the same ProcessPoolExecutor twice.
Below is an example of using the ProcessPoolExecutor within a “with” expression that works and that you can adapt for your use case:
Paul says
This is a great post – I learned more about the ProcessPoolExecutor in 10 minutes than I had reading many examples. Great work
Jason Brownlee says
Thank you, I’m happy it helped!