How to Get the First Result from the ThreadPool
You can get the first result from tasks in the ThreadPool either via a shared queue.Queue or by issuing tasks via the imap_unordered() function and getting the first value from the iterable.
In this tutorial, you will discover how to get the first result from the ThreadPool in Python.
Let's get started.
Need to Get the First Result
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
-- multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module, it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading from or writing to sockets or files.
A ThreadPool is configured when it is created, for example by setting the number of worker threads and an optional function that initializes each new worker thread.
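The following is a minimal sketch of configuring a pool at creation time. It creates four worker threads and an initializer that runs once in each new worker thread. The init_worker() function and the started list are hypothetical names used only for illustration. The processes, initializer, and initargs arguments are the ThreadPool constructor's own parameters.

```python
import threading
from multiprocessing.pool import ThreadPool

# hypothetical initializer: runs once in each worker thread as it starts
def init_worker(registry):
    # record the name of the worker thread running this initializer
    registry.append(threading.current_thread().name)

started = []
# four worker threads, each calling init_worker(started) when it starts
pool = ThreadPool(processes=4, initializer=init_worker, initargs=(started,))
pool.close()  # no more tasks will be issued
pool.join()   # wait for all worker threads to exit
print(len(started))  # 4
```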
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
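Here is a short sketch of all four methods. It assumes a trivial, hypothetical square() task function. The asynchronous results are retrieved with get() inside the with block, before the pool is terminated on exit.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task function used only for this demonstration
def square(x):
    return x * x

with ThreadPool(4) as pool:
    # synchronous: each call blocks until its result is ready
    one = pool.apply(square, (3,))
    many = pool.map(square, range(5))
    # asynchronous: get an AsyncResult now, retrieve the value later
    async_one = pool.apply_async(square, (4,))
    async_many = pool.map_async(square, range(5))
    later_one = async_one.get()
    later_many = async_many.get()

print(one, many)              # 9 [0, 1, 4, 9, 16]
print(later_one, later_many)  # 16 [0, 1, 4, 9, 16]
```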
When using the ThreadPool, we may issue many tasks but need only the result from the first task to finish.
How can we get the first result from the ThreadPool?
How to Get the First Result
There are two main approaches we can use to get the result from the first task to complete in the ThreadPool.
They are:
- Have tasks put their result on a shared queue.
- Issue tasks using the imap_unordered() function.
Let's take a closer look at each approach in turn.
Put Results on a Shared Queue
A queue.Queue can be created and shared among all tasks issued to the ThreadPool.
As tasks finish, they can put their results on the queue.
The main thread, waiting for the first result, can then retrieve it from the shared queue.
The shared queue can be created in the main thread, and the ThreadPool can be created as per normal.
For example:
from queue import Queue
...
# create a shared queue
queue = Queue()
We can share the queue with tasks in the ThreadPool by passing it as an argument to the task function.
Alternatively, we can share the queue as a global variable accessible by the main thread and the worker threads in the thread pool.
Tasks can then put results on the queue by calling the put() function and passing the result object.
For example:
...
# put result on the queue
queue.put(result)
The main thread can then retrieve the first result made available via the get() function on the queue. This call will block until a result is available.
For example:
...
# get the first result
result = queue.get()
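The worked example later shares the queue through a global variable. This minimal sketch shows the other option: passing the queue to each task as an argument via apply_async(). It uses fixed, hypothetical delays in place of real work, so the task expected to finish first is known in advance.

```python
from queue import Queue
from time import sleep
from multiprocessing.pool import ThreadPool

# task that receives the shared queue as an argument
def task(identifier, delay, results):
    # simulate work of a known duration
    sleep(delay)
    # put the result on the queue passed in by the caller
    results.put((identifier, delay))

# hypothetical delays: task 1 is expected to finish first
delays = [0.6, 0.1, 0.3]
results = Queue()
with ThreadPool(len(delays)) as pool:
    for identifier, delay in enumerate(delays):
        # the queue is passed explicitly, not via a global variable
        pool.apply_async(task, (identifier, delay, results))
    # block until the fastest task has put its result
    first = results.get()

print(first)  # expected: (1, 0.1)
```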
Issue Tasks Using imap_unordered()
Another approach to getting the first result from the ThreadPool is to issue tasks using the imap_unordered() method.
This function will issue tasks in a lazy manner and will return an iterable that yields results in the order that tasks are completed, rather than the order that tasks were issued.
Tasks are issued with a single call to the imap_unordered() function, much as they would be with map().
For example:
...
# issue tasks to the thread pool
it = pool.imap_unordered(task, items)
We can then call the built-in next() function on the returned iterable to get the result from the first task to complete in the ThreadPool.
Recall that next() returns the next item from an iterator. On the iterator returned by imap_unordered(), the call blocks until a task has completed.
For example:
...
# get the first result
result = next(it)
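A quick way to confirm that next() waits only for the fastest task, not for all of them, is to time the call. The sketch below uses hypothetical fixed delays. The exact elapsed time will vary by machine.

```python
from time import perf_counter, sleep
from multiprocessing.pool import ThreadPool

# sleep for the given duration, then return it
def task(delay):
    sleep(delay)
    return delay

# hypothetical delays; the slowest task takes 1 second
delays = [1.0, 0.1, 0.5]
with ThreadPool(len(delays)) as pool:
    start = perf_counter()
    it = pool.imap_unordered(task, delays)
    # blocks only until the fastest task finishes
    first = next(it)
    elapsed = perf_counter() - start

print(first)          # expected: 0.1
print(elapsed < 0.5)  # expected: True, well under the slowest delay
```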
Now that we know how to get the first result from the ThreadPool, let's look at some worked examples.
Example of Getting the First Result with a Queue
We can explore how to get the result from the first task to complete using a queue.
In this example, we will define a queue and share it with workers in the ThreadPool via a global variable. We will then define a target task function that will generate a random number, block, then store the result on the shared queue. The randomness will mean the task that finishes first will differ each time the code is run. Finally, we will issue many tasks and wait for and report the first result received in the main thread.
Firstly, we can define the target task function.
The function takes a unique integer identifier for the task. It then generates a random number between 2 and 12, reports the value, then blocks for that many seconds. Finally, a tuple of the task's integer identifier and the generated value is put on the shared queue.
The task() function below implements this.
# task executed in a worker thread
def task(identifier):
    # generate a value
    value = 2 + random() * 10
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # put the result on the shared queue
    queue.put((identifier, value))
Next, in the main thread, we first create a shared queue.
In this case, we use a queue.SimpleQueue because we do not need to limit the capacity of the queue.
...
# create the shared queue
queue = SimpleQueue()
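A SimpleQueue is an unbounded first-in, first-out queue, so put() never blocks. Here is a minimal sketch of its behavior on its own, outside of any pool:

```python
from queue import SimpleQueue

# unbounded FIFO queue; the constructor takes no maxsize argument
q = SimpleQueue()
for item in ['a', 'b', 'c']:
    # never blocks, because there is no capacity limit
    q.put(item)

size_before = q.qsize()
# items come out in the order they went in
items = [q.get() for _ in range(3)]

print(size_before)  # 3
print(items)        # ['a', 'b', 'c']
print(q.empty())    # True
```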
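We can then create the ThreadPool with the default configuration, using the context manager interface. No custom worker initialization is needed, because the worker threads access the queue through the global variable.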
...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...
We then issue 30 tasks to the ThreadPool asynchronously using the map_async() function.
...
# issue many tasks
_ = pool.map_async(task, range(30))
The main thread then gets the first result from the queue.
This call blocks until the first task to complete has put its result on the queue.
...
# get the first result, blocking
identifier, value = queue.get()
The result is then reported.
...
# report the first result
print(f'First result: identifier={identifier}, value={value}')
Finally, the ThreadPool is terminated automatically when the context manager block exits.
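Leaving the with block calls terminate() on the pool, after which the pool no longer accepts tasks. This small sketch assumes a hypothetical add_one() task. It shows that issuing a task to the pool after the block raises a ValueError.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task function used only for this demonstration
def add_one(x):
    return x + 1

with ThreadPool(2) as pool:
    result = pool.apply(add_one, (1,))
# leaving the with block has called pool.terminate()

try:
    pool.apply(add_one, (1,))
    still_usable = True
except ValueError:
    # issuing a task to a terminated pool raises ValueError
    still_usable = False

print(result, still_usable)  # 2 False
```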
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting the first result from the thread pool with a queue
from random import random
from time import sleep
from queue import SimpleQueue
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = 2 + random() * 10
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # put the result on the shared queue
    queue.put((identifier, value))

# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = SimpleQueue()
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue many tasks
        _ = pool.map_async(task, range(30))
        # get the first result, blocking
        identifier, value = queue.get()
        # report the first result
        print(f'First result: identifier={identifier}, value={value}')
Running the example first creates the shared queue.
The ThreadPool is then created with the default configuration. The worker threads access the shared queue through the global variable.
The tasks are then issued to the pool asynchronously. The main thread then blocks, waiting for the first result to be retrieved from the shared queue.
Only a subset of the tasks can run at a time, one per worker thread. By default, the pool creates as many worker threads as os.cpu_count() reports. Each task generates a random number, reports a message, blocks, then puts its result on the queue.
The main thread gets the result from the first task to complete. It reports the value, then exits the context manager block for the pool.
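Exiting the block calls terminate() on the pool, so tasks that have not yet started are never run. The program then ends without waiting for the tasks that are still sleeping.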
This highlights one approach we can use to get the first result from tasks in the ThreadPool.
Note that results will differ each time the program is run, given the use of random numbers.
Task 0 executing with 4.388392181354002
Task 1 executing with 3.0102164339752298
Task 2 executing with 4.9214826220804815
Task 3 executing with 10.350379291238756
Task 4 executing with 7.736914173034551
Task 5 executing with 6.7523327230496974
Task 6 executing with 9.969935912716616
Task 7 executing with 10.794115864831909
Task 8 executing with 5.037455046767715
First result: identifier=1, value=3.0102164339752298
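The queue approach has one caveat. If every task fails before calling put(), a plain call to get() blocks forever. The exceptions are stored in the AsyncResult returned by map_async(), which the example discards. The sketch below assumes a hypothetical task that always fails. It passes a timeout to get() so the main thread gives up after a fixed wait.

```python
from queue import SimpleQueue, Empty
from multiprocessing.pool import ThreadPool

results = SimpleQueue()

# hypothetical task that fails before it can put its result on the queue
def task(identifier):
    if identifier >= 0:  # true for every identifier issued below
        raise ValueError(f'task {identifier} failed')
    results.put(identifier)

with ThreadPool(2) as pool:
    _ = pool.map_async(task, range(4))
    try:
        # wait at most 0.5 seconds for the first result
        first = results.get(timeout=0.5)
    except Empty:
        # no task produced a result in time
        first = None

print(first)  # None
```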
Next, let's explore how we might get the first result using the imap_unordered() function.
Example of Getting the First Result with imap_unordered()
We can explore how to issue tasks with the imap_unordered() function and get the first result.
In this example, we can issue all tasks using the imap_unordered() function. This will return an iterable that will yield results in the order that the tasks have been completed. We can then get the first value from the iterable in order to get the first result.
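The sketch below shows the difference in ordering, using fixed, hypothetical sleep durations instead of random ones. It issues the same tasks with imap(), which yields results in issue order, and then with imap_unordered(), which yields them as they complete.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# sleep for the given duration, then return it
def task(delay):
    sleep(delay)
    return delay

# hypothetical delays; one worker thread per task so all run at once
delays = [0.5, 0.1, 0.3]
with ThreadPool(len(delays)) as pool:
    issued_order = list(pool.imap(task, delays))
    completed_order = list(pool.imap_unordered(task, delays))

print(issued_order)     # [0.5, 0.1, 0.3]
print(completed_order)  # expected: [0.1, 0.3, 0.5]
```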
Firstly, we can define the ThreadPool using the default configuration with the context manager interface.
...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...
Next, we can issue all tasks using the imap_unordered() function and retrieve the iterable.
...
# issue many tasks
it = pool.imap_unordered(task, range(30))
Finally, we can call the built-in next() function in order to traverse the iterable one step and get the first result.
This call will block until the first result is available.
...
# get the first result, blocking
identifier, value = next(it)
And that's it.
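Unlike the queue approach, an exception raised in a task is not silently lost with imap_unordered(). If the first task to finish raised an exception, next() re-raises it in the main thread. The sketch below assumes a hypothetical task that fails for identifier 0. It uses a single worker thread so that task 0 is the first to finish.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task in which identifier 0 fails
def task(identifier):
    if identifier == 0:
        raise ValueError('task 0 failed')
    return identifier

first = None
error = None
# a single worker runs tasks in order, so task 0 finishes first
with ThreadPool(1) as pool:
    it = pool.imap_unordered(task, range(3))
    try:
        first = next(it)
    except ValueError as exc:
        # the task's exception is re-raised by next()
        error = str(exc)

print(first, error)  # None task 0 failed
```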
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting the first result from the thread pool with imap_unordered
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = 2 + random() * 10
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue many tasks
        it = pool.imap_unordered(task, range(30))
        # get the first result, blocking
        identifier, value = next(it)
        # report the first result
        print(f'First result: identifier={identifier}, value={value}')
        # report that remaining tasks will be terminated when the block exits
        print('Terminating remaining tasks')
Running the example first creates the ThreadPool.
It then issues all tasks to the ThreadPool and returns an iterable for task results.
The main thread then blocks, waiting for the first result to be made available.
A subset of the tasks begins executing. Each task generates a random number, reports the value, blocks, then returns a tuple of its integer identifier and the generated random number.
A result is received by the main thread and is reported.
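Exiting the context manager block then terminates the ThreadPool. Tasks that have not yet started are never run, and the program ends without waiting for the tasks that are still running.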
This highlights a simple way to get the first result from multiple tasks in the ThreadPool.
Note that results will differ each time the program is run, given the use of random numbers.
Task 0 executing with 10.195721449283099
Task 1 executing with 4.537848138785382
Task 2 executing with 6.188222239424766
Task 3 executing with 6.625273113993776
Task 4 executing with 6.777439734838023
Task 5 executing with 11.14993589189849
Task 6 executing with 10.008730134107777
Task 7 executing with 9.518768849906268
Task 8 executing with 8.16243572617688
First result: identifier=1, value=4.537848138785382
Terminating remaining tasks
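The sketch below checks what terminating the pool does to tasks that have not started. It uses a single worker thread, hypothetical 0.1-second tasks, and a hypothetical started list that records which tasks began. Calling terminate() directly is what the with block does on exit. In CPython, appending to a list from several threads is safe. Typically only the first one or two of the ten tasks start; the rest are discarded.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# identifiers of tasks that actually began running
started = []

# hypothetical task: record that it started, then do 0.1 seconds of "work"
def task(identifier):
    started.append(identifier)
    sleep(0.1)
    return identifier

# one worker thread, so tasks run one after another
pool = ThreadPool(1)
it = pool.imap_unordered(task, range(10))
first = next(it)
# the same call the with block makes on exit; pending tasks are dropped
pool.terminate()
# wait for the worker to finish any in-flight task and exit
pool.join()

print(first, len(started))  # first is 0; typically only 1 or 2 tasks started
```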
Takeaways
You now know how to get the first available result from the ThreadPool.