Last Updated on September 12, 2022
You can call Pool.apply() to issue tasks to the process pool and block the caller until the task is complete.
In this tutorial you will discover how to issue one-off tasks to the process pool in Python.
Let’s get started.
Need to Issue Tasks To The Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
The process pool allows you to issue tasks in the form of target functions to be executed by the worker processes.
How can we issue one-off tasks to the process pool?
How to Use Pool.apply()
We can issue one-off tasks to the process pool using the apply() function.
The apply() function takes the function to be executed by a worker process.
For example:
    ...
    # issue a task to the process pool
    pool.apply(task)
The call will block until the function is executed by a worker process, after which time it will return.
Call func with arguments args and keyword arguments kwds. It blocks until the result is ready.
— multiprocessing — Process-based parallelism
If the function to be executed takes arguments they can be specified as a tuple to the “args” argument or a dictionary via the “kwds” argument.
For example:
    ...
    # issue a task to the process pool with arguments
    pool.apply(task, args=(arg1, arg2, arg3))
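As a minimal sketch of combining the two, the example below passes positional arguments via "args" and a keyword argument via "kwds" in the same call. The describe() function and its parameter names are hypothetical, chosen only for illustration, and the pool is created with a context manager to keep the sketch short.

```python
from multiprocessing.pool import Pool

# hypothetical task that takes positional and keyword arguments
def describe(name, count, sep='-'):
    # repeat the name 'count' times, joined by the separator
    return sep.join([name] * count)

# issue one task using both args and kwds
def run_example():
    with Pool(2) as pool:
        # equivalent to calling describe('ab', 3, sep='+') in a worker process
        return pool.apply(describe, args=('ab', 3), kwds={'sep': '+'})

# protect the entry point
if __name__ == '__main__':
    print(run_example())
```

The worker receives the tuple as positional arguments and the dictionary as keyword arguments, just as if the function had been called directly.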
Difference Between apply() and apply_async()
How does the apply() function compare to the apply_async() function for issuing tasks?
Both apply() and apply_async() may be used to issue one-off tasks to the process pool.
The following summarizes the key differences between these two functions:
- The apply() function blocks, whereas the apply_async() function does not block.
- The apply() function returns the result of the target function, whereas the apply_async() function returns an AsyncResult.
- The apply() function does not take callbacks, whereas the apply_async() function does take callbacks.
The apply() function should be used for issuing target task functions to the process pool where the caller can or must block until the task is complete.
The apply_async() function should be used for issuing target task functions to the process pool where the caller cannot or must not block while the task is executing.
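The differences listed above can be sketched side by side. This is an illustrative example only: the square() task and the compare() helper are hypothetical names, not part of the Pool API.

```python
from time import sleep
from multiprocessing.pool import Pool

# hypothetical task that blocks briefly, then returns a value
def square(x):
    sleep(0.2)
    return x * x

# issue the same kind of task with apply() and with apply_async()
def compare():
    collected = []
    with Pool(2) as pool:
        # apply() blocks and returns the task's return value directly
        direct = pool.apply(square, args=(3,))
        # apply_async() returns an AsyncResult immediately and accepts a callback
        async_result = pool.apply_async(square, args=(4,), callback=collected.append)
        # the caller could do other work here before waiting for the result
        later = async_result.get()
    return direct, later, collected

# protect the entry point
if __name__ == '__main__':
    print(compare())
```

With apply(), the return value is available as soon as the call returns. With apply_async(), the value must be retrieved from the AsyncResult via get() or received through the callback.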
Now that we know how to issue one-off tasks to the process pool, let’s look at some worked examples.
Example of Pool.apply()
The apply() function can be called directly to execute a target function in the process pool.
The call will block until the function is executed by a worker process.
The example below demonstrates this by defining a task that reports a message and blocks for one second.
The task() function implements this.
    # task executed in a worker process
    def task():
        # report a message
        print(f'Task executing', flush=True)
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done', flush=True)
We can then create and configure a process pool with the default configuration.
    ...
    # create and configure the process pool
    pool = Pool()
Next, we can issue the task() function to the process pool and block until it is executed.
    ...
    # issue tasks to the process pool
    pool.apply(task)
Finally, we can close the process pool and release the resources.
    ...
    # close the process pool
    pool.close()
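As an alternative sketch, the pool also supports the context manager protocol, in which case the pool is terminated automatically when the block exits. The task() function below is a simplified stand-in and run_with_context_manager() is a hypothetical helper name; the pool is limited to two workers only to keep the sketch light.

```python
from multiprocessing.pool import Pool

# simplified task used only for this sketch
def task():
    return 'done'

# create the pool with a context manager instead of calling close() explicitly
def run_with_context_manager():
    # the pool is terminated automatically when the block exits
    with Pool(2) as pool:
        # apply() has already returned by the time the block exits,
        # so no issued work is cut short
        return pool.apply(task)

# protect the entry point
if __name__ == '__main__':
    print(run_with_context_manager())
```

Because apply() blocks until the task is complete, terminating the pool on exit is safe here. With asynchronous tasks, results would need to be collected before leaving the block.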
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply() to the process pool
    from time import sleep
    from multiprocessing.pool import Pool

    # task executed in a worker process
    def task():
        # report a message
        print(f'Task executing', flush=True)
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done', flush=True)

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the process pool
        pool = Pool()
        # issue tasks to the process pool
        pool.apply(task)
        # close the process pool
        pool.close()
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool. The main process blocks until the task is executed.
A worker process executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.
The main process continues on and closes the process pool.
    Task executing
    Task done
Next, let’s look at an example of issuing a task function with arguments.
Example of Pool.apply() With Arguments
We can call apply() to issue a task to the process pool that takes arguments.
This can be achieved by passing a tuple of arguments to the “args” argument or a dictionary of arguments to the “kwds” argument.
In this example, we can update the previous example so that our task() function takes one argument, which is then reported in the printed messages.
The updated task() function with this change is listed below.
    # task executed in a worker process
    def task(data):
        # report a message
        print(f'Task executing: {data}', flush=True)
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done: {data}', flush=True)
We can then update the call to apply() to issue the task() function and specify a tuple containing one argument.
    ...
    # issue tasks to the process pool
    pool.apply(task, args=('Hello World',))
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply() with arguments to the process pool
    from time import sleep
    from multiprocessing.pool import Pool

    # task executed in a worker process
    def task(data):
        # report a message
        print(f'Task executing: {data}', flush=True)
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done: {data}', flush=True)

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the process pool
        pool = Pool()
        # issue tasks to the process pool
        pool.apply(task, args=('Hello World',))
        # close the process pool
        pool.close()
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool with an argument. The main process blocks until the task is executed.
A worker process executes the task() function, reporting messages with the provided argument and sleeping for a second. The task is finished and returns.
The main process continues on and closes the process pool.
    Task executing: Hello World
    Task done: Hello World
Next, let’s look at an example of issuing a task function that returns a value.
Example of Pool.apply() With a Return Value
We can call apply() to issue a target task function that returns a value.
This can be achieved by specifying the function that returns a value as an argument to apply(), and the function will be executed by the process pool, returning the value directly once completed.
In this example, we can update the task() function from the previous example to generate a random value between 0 and 1, report this value, then return it to the calling process.
The updated task() function is listed below.
    # task executed in a worker process
    def task():
        # generate a value
        value = random()
        # report a message
        print(f'Task executing: {value}', flush=True)
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done {value}', flush=True)
        return value
We can then call the apply() function from the main process and assign the return value to a variable, then report the value that was returned.
    ...
    # issue tasks to the process pool
    result = pool.apply(task)
    # report value
    print(f'Main got: {result}')
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply() to the process pool with a return value
    from random import random
    from time import sleep
    from multiprocessing.pool import Pool

    # task executed in a worker process
    def task():
        # generate a value
        value = random()
        # report a message
        print(f'Task executing: {value}', flush=True)
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done {value}', flush=True)
        return value

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the process pool
        pool = Pool()
        # issue tasks to the process pool
        result = pool.apply(task)
        # report value
        print(f'Main got: {result}')
        # close the process pool
        pool.close()
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool. The main process blocks until the task is executed.
A worker process executes the task() function, generating a random value, reporting messages and sleeping for a second. The task is finished and returns the value that was generated.
The main process continues on, reports the same return value and then closes the process pool.
    Task executing: 0.4150468503576358
    Task done 0.4150468503576358
    Main got: 0.4150468503576358
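Because each call returns its value only once the task has finished, collecting results from several apply() calls runs the tasks one after another, even when the pool has idle workers. The sketch below illustrates this under assumed settings: the doubling task, the 0.2 second sleep, the pool size of four and the run_serially() helper are all hypothetical choices.

```python
from time import sleep, perf_counter
from multiprocessing.pool import Pool

# hypothetical task that blocks briefly, then returns a value
def task(value):
    sleep(0.2)
    return value * 2

# issue three tasks with apply() and time how long they take overall
def run_serially():
    with Pool(4) as pool:
        start = perf_counter()
        # each apply() call blocks, so the tasks execute one at a time
        results = [pool.apply(task, args=(i,)) for i in range(3)]
        elapsed = perf_counter() - start
    return results, elapsed

# protect the entry point
if __name__ == '__main__':
    results, elapsed = run_serially()
    print(f'Results: {results}, took {elapsed:.2f} seconds')
```

The total time is roughly the sum of the individual task durations, which is why apply() suits cases where the caller needs each result before continuing.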
Next, let’s look at an example of issuing a target task function that may raise an exception.
Example of Pool.apply() With an Exception
We can call apply() to issue a task to the process pool that may raise an exception that is not handled.
The process pool will trap the exception for us, then re-raise the exception in the calling process that issued the task.
In this example, we can update the example above so that the task() function will raise an exception that is left unhandled.
The updated task() function with this change is listed below.
    # task executed in a worker process
    def task():
        # report a message
        print(f'Task executing', flush=True)
        # block for a moment
        sleep(1)
        # fail
        raise Exception('Something bad happened')
        # report a message
        print(f'Task done', flush=True)
We can then issue the task() function to the process pool, but wrap the call in a try-except block, to handle the possible exception.
    ...
    # issue tasks to the process pool
    try:
        pool.apply(task)
    except Exception as e:
        print(f'Failed with: {e}')
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply() to the process pool that raises an exception
    from time import sleep
    from multiprocessing.pool import Pool

    # task executed in a worker process
    def task():
        # report a message
        print(f'Task executing', flush=True)
        # block for a moment
        sleep(1)
        # fail
        raise Exception('Something bad happened')
        # report a message
        print(f'Task done', flush=True)

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the process pool
        pool = Pool()
        # issue tasks to the process pool
        try:
            pool.apply(task)
        except Exception as e:
            print(f'Failed with: {e}')
        # close the process pool
        pool.close()
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool. The main process blocks until the task is executed.
A worker process executes the task() function, reporting a message and sleeping for a second. The task then raises an exception, so the final message is never reported.
The process pool traps the exception and re-raises it in the calling process.
The main process continues on and handles the re-raised exception. It then closes the process pool.
    Task executing
    Failed with: Something bad happened
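The re-raised exception keeps its original type, so the caller can catch a specific exception class rather than the general Exception. The sketch below assumes a hypothetical task that raises a ValueError; the run_failing_task() helper name is also only illustrative.

```python
from multiprocessing.pool import Pool

# hypothetical task that fails with a specific exception type
def task():
    raise ValueError('bad input')

# issue the failing task and catch the specific exception type
def run_failing_task():
    with Pool(2) as pool:
        try:
            pool.apply(task)
        except ValueError as e:
            # the exception re-raised in the caller keeps its type and message
            return type(e).__name__, str(e)
    # only reached if the task did not raise
    return None

# protect the entry point
if __name__ == '__main__':
    print(run_failing_task())
```

Catching the narrowest exception type that the task is expected to raise avoids hiding unrelated failures.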
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to issue one-off tasks to the process pool using Pool.apply().
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.