Last Updated on October 29, 2022
You can call apply() to issue tasks to the thread pool and block the caller until the task is complete.
In this tutorial you will discover how to issue one-off tasks to the ThreadPool in Python.
Let’s get started.
Need to Issue Tasks To The ThreadPool
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.
A ThreadPool can be configured when it is created, which will prepare the new threads.
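As a minimal sketch of configuring the pool at creation time, the example below sets the number of worker threads via the "processes" argument and confirms that a task runs in a worker thread rather than the main thread. The helper names current_thread_name() and run_in_pool() and the worker count of 4 are assumptions made for illustration only.

```python
# sketch: configure the number of worker threads when creating the pool
import threading
from multiprocessing.pool import ThreadPool

# hypothetical task: report the name of the thread that runs it
def current_thread_name():
    return threading.current_thread().name

# hypothetical helper: create a pool with a fixed number of workers and run one task
def run_in_pool(num_workers=4):
    # the 'processes' argument sets the number of worker threads
    with ThreadPool(processes=num_workers) as pool:
        # apply() blocks until the task finishes and returns its result
        return pool.apply(current_thread_name)

# protect the entry point
if __name__ == '__main__':
    print(f'Main thread: {threading.current_thread().name}')
    print(f'Worker thread: {run_in_pool()}')
```

The exact worker thread name varies between Python versions, so it should not be relied upon.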
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
The ThreadPool allows you to issue tasks in the form of target functions to be executed by the worker threads.
How can we issue one-off tasks to the ThreadPool?
How to Issue One-Off Tasks to the ThreadPool
We can issue one-off tasks to the ThreadPool using the apply() method.
The apply() method takes the name of the function to be executed by a worker thread.
For example:
    ...
    # issue a task to the thread pool
    pool.apply(task)
The call will block until the function is executed by a worker thread, after which time it will return.
Call func with arguments args and keyword arguments kwds. It blocks until the result is ready.
— multiprocessing — Process-based parallelism
If the function to be executed takes arguments they can be specified as a tuple to the “args” argument or a dictionary via the “kwds” argument.
For example:
    ...
    # issue a task to the thread pool with arguments
    pool.apply(task, args=(arg1, arg2, arg3))
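The snippet below is a small, self-contained sketch of passing both positional and keyword arguments. The describe() function and its parameters are hypothetical and exist only to make the arguments visible in the result.

```python
# sketch: passing positional and keyword arguments via apply()
from multiprocessing.pool import ThreadPool

# hypothetical task: repeat a name 'count' times, joined by 'sep'
def describe(name, count, sep=', '):
    return sep.join([name] * count)

# hypothetical helper: issue the task twice with different argument styles
def run_examples():
    with ThreadPool() as pool:
        # positional arguments are given as a tuple via 'args'
        positional = pool.apply(describe, args=('a', 3))
        # keyword arguments are given as a dict via 'kwds'
        keyword = pool.apply(describe, args=('b', 2), kwds={'sep': '-'})
    return positional, keyword

# protect the entry point
if __name__ == '__main__':
    print(run_examples())
```

Note that a single positional argument still needs a one-element tuple, for example args=('a',).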
Difference Between apply() vs apply_async()
How does the apply() method compare to the apply_async() method for issuing tasks?
Both apply() and apply_async() may be used to issue one-off tasks to the thread pool.
The following summarizes the key differences between these two methods:
- The apply() method blocks, whereas the apply_async() method does not block.
- The apply() method returns the result of the target function, whereas the apply_async() method returns an AsyncResult.
- The apply() method does not take callbacks, whereas the apply_async() method does take callbacks.
The apply() method should be used for issuing target task functions to the thread pool where the caller can or must block until the task is complete.
The apply_async() method should be used for issuing target task functions to the ThreadPool where the caller cannot or must not block while the task is executing.
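The differences listed above can be sketched side by side. This is an illustrative example only: the square() task and the compare() helper are hypothetical names, and the callback simply appends the result to a list.

```python
# sketch: apply() returns the result, apply_async() returns an AsyncResult
from multiprocessing.pool import ThreadPool
from multiprocessing.pool import AsyncResult

# hypothetical task
def square(x):
    return x * x

# hypothetical helper: issue the same task both ways
def compare():
    collected = []
    with ThreadPool() as pool:
        # blocks and returns the function's return value directly
        direct = pool.apply(square, args=(3,))
        # returns immediately with a handle; the callback receives the result
        handle = pool.apply_async(square, args=(4,), callback=collected.append)
        is_async_result = isinstance(handle, AsyncResult)
        # get() blocks until the result is available
        later = handle.get()
    return direct, is_async_result, later, collected

# protect the entry point
if __name__ == '__main__':
    print(compare())
```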
Now that we know how to issue one-off tasks to the ThreadPool, let’s look at some worked examples.
Example of Calling apply()
The apply() method can be called directly to execute a target function in the ThreadPool.
The call will block until the function is executed by a worker thread.
The example below demonstrates this by defining a task that reports a message and blocks for one second.
The task() function implements this.
    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done')
We can then create and configure a ThreadPool with the default configuration.
    ...
    # create and configure the thread pool
    pool = ThreadPool()
Next, we can issue the task() function to the ThreadPool and block until it is executed.
    ...
    # issue tasks to the thread pool
    pool.apply(task)
Finally, we can close the ThreadPool and release the resources.
    ...
    # close the thread pool
    pool.close()
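As an optional variation, the sketch below closes the pool in a try-finally block and then calls join() to wait for the worker threads to exit. The shutdown_demo() helper and the pool size of 2 are assumptions for illustration; the examples in this tutorial only call close().

```python
# sketch: close the pool and wait for the workers, even if the task fails
from multiprocessing.pool import ThreadPool

# hypothetical helper: run one task and always shut the pool down
def shutdown_demo():
    pool = ThreadPool(2)
    try:
        # blocks until the built-in sum() has run in a worker thread
        return pool.apply(sum, args=([1, 2, 3],))
    finally:
        # prevent any further tasks from being issued
        pool.close()
        # wait for the worker threads to finish
        pool.join()

# protect the entry point
if __name__ == '__main__':
    print(shutdown_demo())
```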
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply() to the thread pool
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done')

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        pool.apply(task)
        # close the thread pool
        pool.close()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool. The main thread blocks until the task is executed.
A worker thread executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.
The main thread continues on and closes the ThreadPool.
    Task executing
    Task done
Next, let’s look at an example of issuing a task function with arguments.
Example of Calling apply() With Arguments
We can call apply() to issue a task to the ThreadPool that takes arguments.
This can be achieved by passing a tuple of arguments to the “args” argument or a dictionary of arguments to the “kwds” argument.
In this example, we can update the previous examples so that our task() function takes one argument that is then reported in printed messages.
The updated task() function with this change is listed below.
    # task executed in a worker thread
    def task(data):
        # report a message
        print(f'Task executing: {data}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done: {data}')
We can then update the call to apply() to issue the task() function and specify a tuple containing one argument.
    ...
    # issue tasks to the thread pool
    pool.apply(task, args=('Hello World',))
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply() with arguments to the thread pool
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task(data):
        # report a message
        print(f'Task executing: {data}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done: {data}')

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        pool.apply(task, args=('Hello World',))
        # close the thread pool
        pool.close()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool with an argument. The main thread blocks until the task is executed.
A worker thread executes the task() function, reporting messages with the provided argument and sleeping for a second. The task is finished and returns.
The main thread continues on and closes the ThreadPool.
    Task executing: Hello World
    Task done: Hello World
Next, let’s look at an example of issuing a task function that returns a value.
Example of Calling apply() With a Return Value
We can call apply() to issue a target task function that returns a value.
This can be achieved by passing a function that returns a value to apply(). The function is executed by a worker thread in the ThreadPool and apply() returns the function's value directly once the task is complete.
In this example, we can update the task() function from the previous example to generate a random value between 0 and 1, report this value, then return it to the calling thread.
The updated task() function is listed below.
    # task executed in a worker thread
    def task():
        # generate a value
        value = random()
        # report a message
        print(f'Task executing: {value}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done {value}')
        return value
We can then call the apply() method from the main thread and assign the return value to a variable, then report the value that was returned.
    ...
    # issue tasks to the thread pool
    result = pool.apply(task)
    # report value
    print(f'Main got: {result}')
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply() to the thread pool with a return value
    from random import random
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task():
        # generate a value
        value = random()
        # report a message
        print(f'Task executing: {value}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done {value}')
        return value

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        result = pool.apply(task)
        # report value
        print(f'Main got: {result}')
        # close the thread pool
        pool.close()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool. The main thread blocks until the task is executed.
A worker thread executes the task() function, generating a random value, reporting messages and sleeping for a second. The task is finished and returns the value that was generated.
The main thread continues on, reports the same return value and then closes the ThreadPool.
    Task executing: 0.9420120071068216
    Task done 0.9420120071068216
    Main got: 0.9420120071068216
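Because apply() blocks until the result is ready, collecting several return values with repeated calls to apply() runs the tasks one after another rather than concurrently. The sketch below illustrates this under assumed names: slow_double(), run_sequentially() and the 0.2 second delay are hypothetical and chosen only to make the timing visible.

```python
# sketch: repeated apply() calls execute tasks one at a time
from time import sleep, perf_counter
from multiprocessing.pool import ThreadPool

# hypothetical task: block briefly, then return a value
def slow_double(x, delay=0.2):
    sleep(delay)
    return x * 2

# hypothetical helper: issue one blocking task per item and time the whole run
def run_sequentially(items):
    start = perf_counter()
    with ThreadPool() as pool:
        # each call blocks, so the next task is not issued until this one is done
        results = [pool.apply(slow_double, args=(item,)) for item in items]
    return results, perf_counter() - start

# protect the entry point
if __name__ == '__main__':
    results, elapsed = run_sequentially([1, 2, 3])
    print(f'Results: {results}, took about {elapsed:.1f} seconds')
```

If the tasks should overlap, the asynchronous methods such as apply_async() or map_async() are likely a better fit.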
Next, let’s look at an example of issuing a target task function that may raise an exception.
Example of Calling apply() With an Exception
We can call apply() to issue a task to the ThreadPool that may raise an exception that is not handled.
The ThreadPool will trap the exception for us, then re-raise the exception in the calling thread that issued the task.
In this example, we can update the example above so that the task() function will raise an exception that is left unhandled.
The updated task() function with this change is listed below.
    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # fail
        raise Exception('Something bad happened')
        # report a message
        print(f'Task done')
We can then issue the task() function to the ThreadPool, but wrap the call in a try-except block, to handle the possible exception.
    ...
    # issue tasks to the thread pool
    try:
        pool.apply(task)
    except Exception as e:
        print(f'Failed with: {e}')
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply() to the thread pool that raises an exception
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # fail
        raise Exception('Something bad happened')
        # report a message
        print(f'Task done')

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        try:
            pool.apply(task)
        except Exception as e:
            print(f'Failed with: {e}')
        # close the thread pool
        pool.close()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool. The main thread blocks until the task is executed.
A worker thread executes the task() function, reporting a message and sleeping for a second. The task then raises an exception, so the final message is never reported.
The ThreadPool traps the exception and re-raises it in the calling thread.
The main thread continues on and handles the re-raised exception. It then closes the ThreadPool.
    Task executing
    Failed with: Something bad happened
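The re-raised exception keeps its original type, so the caller can catch a specific exception class rather than the general Exception. The following is a minimal sketch; the failing_task() and run_and_catch() names and the choice of ValueError are assumptions for illustration.

```python
# sketch: catch a specific exception type re-raised by apply()
from multiprocessing.pool import ThreadPool

# hypothetical task that always fails with a specific exception type
def failing_task():
    raise ValueError('bad input')

# hypothetical helper: issue the task and report what was caught
def run_and_catch():
    with ThreadPool() as pool:
        try:
            pool.apply(failing_task)
        except ValueError as e:
            # the exception raised in the worker surfaces here in the caller
            return type(e).__name__, str(e)
    return None

# protect the entry point
if __name__ == '__main__':
    print(run_and_catch())
```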
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python ThreadPool Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- ThreadPool PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ThreadPool: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
Takeaways
You now know how to issue one-off tasks to the ThreadPool using the apply() method.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.