Last Updated on September 12, 2022
You can retry failed tasks in the ThreadPoolExecutor by re-submitting them once they fail.
In this tutorial, you will discover how to retry failed tasks in the Python thread pool.
Let’s get started.
Need to Retry Failed Tasks in the ThreadPoolExecutor
The ThreadPoolExecutor provides a flexible way to execute ad hoc tasks using a pool of worker threads.
You can submit tasks to the thread pool by calling the submit() function, passing in the function you wish to execute on another thread along with any arguments.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
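For example, a minimal sketch (with a placeholder task() function) might look like this:

```python
from concurrent.futures import ThreadPoolExecutor

# placeholder task executed by a worker thread
def task():
    return 'all done'

# create a pool, submit the task, and read the result
with ThreadPoolExecutor() as executor:
    # submit() returns a Future immediately
    future = executor.submit(task)
    # result() blocks until the task completes
    print(future.result())
```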
Tasks submitted to the thread pool can fail by raising an exception.
In some applications, we may need to resubmit a task that has failed, allowing the task to be tried again or retried.
This may be for many reasons and is likely dependent on the specific details of the task, such as accessing an external resource whose connection can fail or performing a probabilistic task whose outcome may not be desirable.
We may want to perform a single retry or repeated retries until the task is successful.
The ThreadPoolExecutor has no built-in facility for retrying failed tasks.
This raises the question, how do you retry failed tasks with the ThreadPoolExecutor in Python?
How to Retry Failed Tasks in The ThreadPoolExecutor
We can retry a failed task by manually resubmitting it to the thread pool.
This requires two things:
- That we know that the task has failed.
- That we have enough information to resubmit it.
How to Determine if a Task Failed
We can determine if a task has failed via its associated Future object.
The exception() function on the Future will return None if the task was executed successfully; otherwise, it will return the instance of the Exception that was raised during execution, indicating a failure.
For example:
```python
...
# check if a task failed
if future.exception():
    # task failed...
```
Additionally, if a task fails by raising an exception during execution, calling the result() function to get the return value will re-raise that exception.
```python
...
# check if a task failed
try:
    result = future.result()
except Exception:
    # task failed...
```
This provides the two main ways to determine if a task has failed.
How to Store Data for Submitted Tasks
Once we know that a task has failed, we need to resubmit it, which requires knowledge of what data the task was submitted with.
Specifically, this means the arguments passed to the target task function, assuming we are calling the same target task function for each task.
This can be achieved by maintaining a dictionary mapping of Future objects returned from submit() to the data passed to the target task function.
The data stored for each Future could be a single value or a list or tuple of arguments.
For example:
```python
...
# map of futures to task data
future_to_data = {}
# submit a task
future = executor.submit(work, data)
# store the mapping of future to data
future_to_data[future] = data
```
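If the target task function takes multiple arguments, we might store them as a tuple and unpack them with the star operator when resubmitting. A sketch, assuming a hypothetical work() function that takes two arguments:

```python
...
# store the task arguments as a tuple
args = (identifier, timeout)
# submit the task, unpacking the arguments
future = executor.submit(work, *args)
# store the mapping of future to arguments
future_to_data[future] = args
...
# later, resubmit the task with the same arguments
retry = executor.submit(work, *future_to_data[future])
```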
We might also do this in a dictionary comprehension if we are calling the submit() function for each item in a collection.
For example:
```python
...
# submit tasks and record a mapping of futures to task data
futures_to_data = {executor.submit(work, item): item for item in items}
```
If we are calling different functions for different tasks, we might also store the function name in the same mapping.
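For example, a sketch that records a (function, data) pair for each Future, using hypothetical download() and parse() task functions:

```python
...
# map each future to the function and data it was submitted with
future_to_task = {}
# submit tasks that call different functions
future = executor.submit(download, url)
future_to_task[future] = (download, url)
future = executor.submit(parse, record)
future_to_task[future] = (parse, record)
...
# resubmit a failed task using its recorded function and data
function, data = future_to_task[future]
retry = executor.submit(function, data)
```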
This provides a simple way to record the data submitted for each task to the thread pool.
How to Resubmit a Failed Task
Once we know that a task has failed and we have access to data for each submitted task, we can re-submit failed tasks.
This can be as simple as calling the submit() function again with the same data.
For example:
```python
...
# check if a task failed
if future.exception():
    # get the data for the task
    data = future_to_data[future]
    # resubmit the task
    retry = executor.submit(work, data)
```
We should also be sure to record the Future object for the retry, in case the task fails a second time (or beyond) and we want to retry it again.
```python
...
# record the retry
future_to_data[retry] = data
```
Now that we know how to retry a failed task, let’s look at some worked examples.
Example of Retrying Failed Tasks
Let’s explore how we might retry tasks that fail in the thread pool.
First, let’s define a task that has a chance to fail.
We can generate a random number between 0 and 1 using the random.random() function, and if the number is below 0.3 (i.e. about 30 percent of the time), the function will raise an exception and fail.
The function will take a unique identifier so we can see the success or failure and retry of each task.
```python
# task that sleeps for a moment and may fail with an exception
def work(identifier):
    # sleep for a moment
    sleep(random())
    # conditionally fail with a chance of 30 percent
    if random() < 0.3:
        raise Exception(f'Something bad happened {identifier}')
    return f'Completed {identifier}'
```
Next, we can define a thread pool with 10 workers and submit 10 tasks to the pool for execution.
We will use a dictionary comprehension to create a mapping of Future objects for each task to the data submitted with each task, in this case, an integer value.
```python
...
# create a thread pool
with ThreadPoolExecutor(10) as executor:
    # submit ten tasks
    futures_to_data = {executor.submit(work, i): i for i in range(10)}
```
So far so good.
Next, we can monitor tasks as they are completed and check if they have failed.
We can define a new dictionary that will record only those tasks that have been retried.
```python
...
# record of retried tasks
retries = {}
```
We can then call the as_completed() function on the dictionary of Future objects, which will yield the Future objects in the order in which they complete.
If the task fails, we will retrieve the data, resubmit the task, store the retry, and report the failure. Otherwise, if the task was successful, we report the success.
```python
# process work as it completes
for future in as_completed(futures_to_data):
    # check for a failure
    if future.exception():
        # get the associated data for the task
        data = futures_to_data[future]
        # submit the task again
        future = executor.submit(work, data)
        # store so we can track the retries
        retries[future] = data
        # report progress
        print(f'Failure, retrying {data}')
    else:
        # report successful result
        print(future.result())
```
That’s all there is to retrying failed tasks.
Next, we can report on the success or failure of the retries themselves.
If the task fails for a second time, we report the failure and that we are giving up; otherwise, we report the success on the retry.
```python
...
# wait for retries
print('\nRetries:')
for future in as_completed(retries):
    # check for a failure
    if future.exception():
        # get the associated data for the task
        data = retries[future]
        # failure
        print(f'Failure on retry: {data}, not trying again')
    else:
        # report successful result
        print(future.result())
```
Tying this together, the complete example of retrying failed tasks for a second time in the ThreadPoolExecutor is listed below.
```python
# SuperFastPython.com
# example of retrying a failed task a second time
from random import random
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# task that sleeps for a moment and may fail with an exception
def work(identifier):
    # sleep for a moment
    sleep(random())
    # conditionally fail with a chance of 30 percent
    if random() < 0.3:
        raise Exception(f'Something bad happened {identifier}')
    return f'Completed {identifier}'

# create a thread pool
with ThreadPoolExecutor(10) as executor:
    # submit ten tasks
    futures_to_data = {executor.submit(work, i): i for i in range(10)}
    # record of retried tasks
    retries = {}
    # process work as it completes
    for future in as_completed(futures_to_data):
        # check for a failure
        if future.exception():
            # get the associated data for the task
            data = futures_to_data[future]
            # submit the task again
            future = executor.submit(work, data)
            # store so we can track the retries
            retries[future] = data
            # report progress
            print(f'Failure, retrying {data}')
        else:
            # report successful result
            print(future.result())
    # wait for retries
    print('\nRetries:')
    for future in as_completed(retries):
        # check for a failure
        if future.exception():
            # get the associated data for the task
            data = retries[future]
            # failure
            print(f'Failure on retry: {data}, not trying again')
        else:
            # report successful result
            print(future.result())
```
Running the example creates the thread pool and submits the ten tasks as per normal.
The tasks are then processed in the order they are completed; we can see that five of the ten tasks failed on the first round and the remaining five completed successfully.
The five failures are resubmitted for a retry.
The retried tasks are then processed as they are completed and we can see that three of the retried tasks completed successfully and two failed for a second time, after which we gave up.
Note: your specific results will differ given that we are using the random() function.
```
Failure, retrying 6
Failure, retrying 1
Completed 8
Completed 7
Completed 4
Failure, retrying 0
Failure, retrying 5
Completed 3
Failure, retrying 9
Completed 2

Retries:
Failure on retry: 1, not trying again
Completed 5
Completed 0
Failure on retry: 6, not trying again
Completed 9
```
Example of Repeatedly Retrying Failed Tasks
We may want to repeatedly retry tasks until they are ultimately successful.
One approach to achieving this would be to keep track of the total number of submitted tasks, then count tasks as they complete successfully and continue submitting retries until the expected number of tasks are completed.
Firstly, we can define the total number of tasks and the count of successfully completed tasks.
```python
...
# total tasks to execute
TASKS = 20
# counter for the total tasks that are complete
completed = 0
```
We can then start the thread pool and submit the tasks as per normal, ensuring we create a dictionary mapping of Future objects to data submitted with each task.
```python
...
# create a thread pool
with ThreadPoolExecutor(TASKS) as executor:
    # submit the tasks into the pool
    futures_to_data = {executor.submit(work, i): i for i in range(TASKS)}
```
Next, we need a consistent way of retrying a failed task.
We can define a function named retry() that will take the Future that failed, the mapping of Future objects to data, and the thread pool. The function will retrieve the data for the Future, resubmit the task, then store the retry in the same mapping of Future objects to data.
This function is listed below and contains much the same retry functionality we developed in the previous section.
```python
# retry a task that failed
def retry(future, futures_to_data, executor):
    # get the associated data for the task
    data = futures_to_data[future]
    # submit the task again
    retry = executor.submit(work, data)
    # store so we can track the retries
    futures_to_data[retry] = data
    return data
```
We can then loop until the expected number of tasks have completed successfully.
We can call as_completed() on the mapping of Future objects to data, which will contain both the original tasks and any retries.
```python
...
# spin, keep track of things until all tasks complete
while completed < TASKS:
    # process work as it completes
    for future in as_completed(futures_to_data):
        # ...
```
If the task fails, we can resubmit it by calling our retry() function; if it completed successfully, we can update the count of successful tasks.
Finally, we must remove each done task (successful or otherwise) from the mapping of Future objects to data to ensure we don’t process it again on the next iteration.
```python
...
# check for a failure
if future.exception():
    # retry the task
    data = retry(future, futures_to_data, executor)
    # report the failure
    print(f'Failure, retrying {data}')
else:
    # report successful result
    print(future.result())
    completed += 1
# remove this future so we don't re-consider it
futures_to_data.pop(future)
```
And that’s it.
We will now run until all tasks are completed successfully and only process tasks as they are completed, avoiding the reprocessing of tasks that are already done.
The complete example of repeatedly retrying failed tasks in the thread pool is listed below.
```python
# SuperFastPython.com
# example of retrying failed tasks until they succeed
from random import random
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# task that sleeps for a moment and may fail with an exception
def work(identifier):
    # sleep for a moment
    sleep(random())
    # conditionally fail with a chance of 30 percent
    if random() < 0.3:
        raise Exception(f'Something bad happened {identifier}')
    return f'Completed {identifier}'

# retry a task that failed
def retry(future, futures_to_data, executor):
    # get the associated data for the task
    data = futures_to_data[future]
    # submit the task again
    retry = executor.submit(work, data)
    # store so we can track the retries
    futures_to_data[retry] = data
    return data

# total tasks to execute
TASKS = 20
# counter for the total tasks that are complete
completed = 0
# create a thread pool
with ThreadPoolExecutor(TASKS) as executor:
    # submit the tasks into the pool
    futures_to_data = {executor.submit(work, i): i for i in range(TASKS)}
    # spin, keep track of things until all tasks complete
    while completed < TASKS:
        # process work as it completes
        for future in as_completed(futures_to_data):
            # check for a failure
            if future.exception():
                # retry the task
                data = retry(future, futures_to_data, executor)
                # report the failure
                print(f'Failure, retrying {data}')
            else:
                # report successful result
                print(future.result())
                completed += 1
            # remove this future so we don't re-consider it
            futures_to_data.pop(future)
```
Running the example creates the thread pool and submits the tasks as per normal.
We then process the tasks as they are completed and report progress along the way.
We can see that tasks start failing and are retried. Interestingly, some tasks fail and are retried a number of times, such as task 14, which failed three times before completing successfully.
Note: your specific results will differ given that we are using the random() function.
```
Failure, retrying 14
Completed 2
Completed 7
Completed 11
Failure, retrying 5
Completed 4
Failure, retrying 17
Failure, retrying 13
Completed 16
Completed 6
Failure, retrying 19
Failure, retrying 1
Failure, retrying 8
Completed 10
Completed 12
Completed 15
Failure, retrying 18
Completed 0
Completed 9
Completed 3
Failure, retrying 14
Completed 13
Failure, retrying 5
Completed 19
Completed 1
Completed 8
Completed 18
Completed 17
Failure, retrying 14
Completed 5
Completed 14
```
Example of Retrying Failed Tasks Using a Callback Function
A natural place to consider retrying a failed task might be the done callback.
Recall that we can register a callback function on a task’s Future object by calling the add_done_callback() function with the function name. This function will be called when the task is done, successfully or otherwise.
The callback function is provided with a reference to the completed Future object as an argument. For example:
```python
# callback function
def custom_callback(future):
    # retry...

# register a callback
future = executor.submit(task, data)
future.add_done_callback(custom_callback)
```
This approach has two problems.
The first is that it is generally a bad practice for tasks to have knowledge of other tasks or of the thread pool. It breaks encapsulation; tasks should only know about themselves. Beyond a program design issue, it can lead to deadlocks if a task waits on the result of another task.
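For example, the following contrived sketch (our own, not one of this tutorial's examples) shows how a task that waits on another task can deadlock a pool with a single worker; a timeout is used so the demonstration does not hang forever:

```python
from concurrent.futures import ThreadPoolExecutor

# a task that submits a second task and waits on its result
def outer(executor):
    inner = executor.submit(lambda: 'inner result')
    # deadlock: the only worker is blocked here, so the inner task cannot run
    return inner.result(timeout=2)

# a pool with a single worker thread
with ThreadPoolExecutor(1) as executor:
    future = executor.submit(outer, executor)
    # the outer task fails with a TimeoutError instead of deadlocking forever
    print(repr(future.exception()))
```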
The second reason is that it is possible for the thread pool to be shut down while the callback for the final task is being executed. This can happen because, technically, a task is complete once the target function returns; if the shutdown operation is waiting on all tasks to complete before shutting down, the callback may be called after the shutdown has occurred.
The problem with all of this is that you cannot submit tasks to the thread pool after it has shut down.
This might sound esoteric, but it will happen pretty much every time, resulting in an exception.
We can demonstrate this with an example.
First, let’s define a callback function. It is just the retry() function from the previous example, updated to access the dictionary and the thread pool via global variables, and to take the Future object for the task as its only argument.
```python
# task done callback that will re-submit failed tasks
def auto_retry(future):
    global executor, futures_to_data
    # check for a failure
    if future.exception():
        # get the associated data for the task
        data = futures_to_data[future]
        # re-submit the task
        retry = executor.submit(work, data)
        # add to the map
        futures_to_data[retry] = data
        # add the callback
        retry.add_done_callback(auto_retry)
        # report the failure
        print(f'Failure, retrying {data}')
    else:
        # report success
        print(future.result())
```
Note, we are adding the callback to the retried task so that if the task fails again it will get retried automatically.
We can then register this callback function with each Future object after we create it.
```python
...
# register callbacks, iterating a copy as callbacks may add retries to the map
for future in list(futures_to_data):
    future.add_done_callback(auto_retry)
```
And that’s it! The complete example is listed below.
Recall, we expect it not to work, failing with an exception, because we will be trying to submit retries after the thread pool has shut down.
```python
# SuperFastPython.com
# example of automatic retry via callback, that causes an exception
from random import random
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# task that sleeps for a moment and may fail with an exception
def work(identifier):
    # sleep for a moment
    sleep(random())
    # conditionally fail with a chance of 30 percent
    if random() < 0.3:
        raise Exception(f'Something bad happened {identifier}')
    return f'Completed {identifier}'

# task done callback that will re-submit failed tasks
def auto_retry(future):
    global executor, futures_to_data
    # check for a failure
    if future.exception():
        # get the associated data for the task
        data = futures_to_data[future]
        # re-submit the task
        retry = executor.submit(work, data)
        # add to the map
        futures_to_data[retry] = data
        # add the callback
        retry.add_done_callback(auto_retry)
        # report the failure
        print(f'Failure, retrying {data}')
    else:
        # report success
        print(future.result())

# constant defining the total tasks to complete
TASKS = 20
# create a thread pool
with ThreadPoolExecutor(TASKS) as executor:
    # submit the tasks
    futures_to_data = {executor.submit(work, i): i for i in range(TASKS)}
    # register callbacks, iterating a copy as callbacks may add retries
    for future in list(futures_to_data):
        future.add_done_callback(auto_retry)
# shutdown, will raise an exception for the last task
```
Running the example creates the thread pool and submits the tasks as per normal.
Things start fine as tasks are completed, then eventually, as we try to submit a retry from the callback, it fails.
This is because the pool has been shut down and will not take any further tasks, like retries submitted from the callback function.
```
...
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  ...
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
```
There are a number of ways to solve this problem.
Central to the solution is the need to not call shutdown() on the thread pool until all tasks have completed successfully.
The simplest way to achieve this is with a counter that records tasks as they complete successfully in auto_retry(), and a loop in the main thread that checks the count and does not allow shutdown until all tasks have succeeded.
The callback function will be called by each worker thread in the thread pool and so may be executed concurrently. Therefore, it is a good practice to protect the update of the counter using a mutual exclusion lock. This is to ensure that the count is not left in an inconsistent state via a race condition.
We can achieve this using the threading.Lock class and the context manager for the class.
For example:
```python
...
# update the count of completed tasks
with lock:
    tasks_completed += 1
```
The complete example of automatically retrying failed tasks via a callback function is listed below.
```python
# SuperFastPython.com
# example of automatic retry via callback
from random import random
from time import sleep
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

# task that sleeps for a moment and may fail with an exception
def work(identifier):
    # sleep for a moment
    sleep(random())
    # conditionally fail with a chance of 30 percent
    if random() < 0.3:
        raise Exception(f'Something bad happened {identifier}')
    return f'Completed {identifier}'

# task done callback that will re-submit failed tasks
def auto_retry(future):
    global executor, futures_to_data, tasks_completed
    # check for a failure
    if future.exception():
        # get the associated data for the task
        data = futures_to_data[future]
        # re-submit the task
        retry = executor.submit(work, data)
        # add to the map
        futures_to_data[retry] = data
        # add the callback
        retry.add_done_callback(auto_retry)
        # report the failure
        print(f'Failure, retrying {data}')
    else:
        # report success
        print(future.result())
        # update the count of completed tasks
        with lock:
            tasks_completed += 1

# constant defining the total tasks to complete
TASKS = 20
# count of tasks that are completed
tasks_completed = 0
# lock protecting the count of completed tasks
lock = Lock()
# create a thread pool
with ThreadPoolExecutor(TASKS) as executor:
    # submit the tasks
    futures_to_data = {executor.submit(work, i): i for i in range(TASKS)}
    # register callbacks, iterating a copy as callbacks may add retries
    for future in list(futures_to_data):
        future.add_done_callback(auto_retry)
    # busy wait for all tasks to be completed successfully
    while tasks_completed < TASKS:
        sleep(0.5)
```
Running the example creates the thread pool and submits the tasks as per normal.
The callback function is registered on each Future object, and the main thread then loops until the expected number of tasks have completed successfully. This kind of loop is called spinning or busy-waiting, as the thread runs without doing useful work until the condition is met. We have added a call to sleep() to make it less wasteful.
This loop ensures that the context manager block is not exited, and therefore that the shutdown() function for the thread pool is not called, until all tasks have completed successfully, allowing tasks to be retried from the callback function.
The callback function will then retry tasks as we did before; the important difference is that we now keep track of the number of tasks that complete successfully.
```
Completed 9
Failure, retrying 11
Completed 18
Completed 13
Completed 16
Failure, retrying 17
Completed 8
Completed 12
Completed 14
Completed 15
Completed 6
Completed 0
Completed 3
Completed 10
Completed 4
Completed 19
Failure, retrying 2
Failure, retrying 1
Completed 7
Completed 11
Failure, retrying 5
Failure, retrying 2
Failure, retrying 17
Failure, retrying 2
Completed 17
Completed 1
Failure, retrying 5
Completed 5
Completed 2
```
Although this works, it is a little ugly.
Specifically, the spinning in the main thread waiting for all tasks to complete is wasteful.
Ideally, we would use a countdown latch. This is a concurrency data structure that has a counter initialized to a specific number and will cause the main thread to block until the counter is zero. Each task that finishes would call the latch to subtract one from the counter.
The benefit of this change is that the main thread would block until the condition is met (all tasks are completed), instead of using a busy wait loop.
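Python's threading module does not provide a latch directly. A minimal sketch of one built on a threading.Condition (the CountDownLatch class name is borrowed from Java, not a standard library class) might look like this:

```python
from threading import Condition

# a minimal countdown latch built on a condition variable
class CountDownLatch:
    def __init__(self, count):
        self.count = count
        self.condition = Condition()

    # called by each task as it completes successfully
    def count_down(self):
        with self.condition:
            self.count -= 1
            # wake any waiting threads once the count reaches zero
            if self.count <= 0:
                self.condition.notify_all()

    # called by the main thread to block until the count reaches zero
    def wait(self):
        with self.condition:
            self.condition.wait_for(lambda: self.count <= 0)
```

With such a latch, each successful task would call count_down() from the callback and the main thread would simply call wait(), blocking instead of spinning.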
We could simulate this latch using a Barrier, another similar concurrency data structure. We could also update our example to use a condition variable to achieve the same effect.
This can be achieved using a threading.Condition.
First, the main thread can wait on the condition variable by acquiring a lock via the context manager, then calling the wait() function. We first check that the tasks have not already all completed, otherwise the notification could fire before we wait and the main thread would block forever.
```python
...
# block, wait for all tasks to be completed successfully
with condition:
    # guard against all tasks finishing before we start waiting
    if tasks_completed < TASKS:
        condition.wait()
```
Then, the retry function can be updated to check if the number of completed tasks matches the total tasks that were submitted, and if so, acquire the lock on the condition and notify the main thread.
```python
...
# update the count of completed tasks
with lock:
    tasks_completed += 1
    if tasks_completed >= TASKS:
        with condition:
            condition.notify()
```
This achieves the same result but is more efficient as it avoids the main thread spinning and wasting resources.
The complete example of automatically retrying tasks from the callback function is listed below.
```python
# SuperFastPython.com
# example of automatic retry via callback
from random import random
from time import sleep
from threading import Lock
from threading import Condition
from concurrent.futures import ThreadPoolExecutor

# task that sleeps for a moment and may fail with an exception
def work(identifier):
    # sleep for a moment
    sleep(random())
    # conditionally fail with a chance of 30 percent
    if random() < 0.3:
        raise Exception(f'Something bad happened {identifier}')
    return f'Completed {identifier}'

# task done callback that will re-submit failed tasks
def auto_retry(future):
    global executor, futures_to_data, tasks_completed
    # check for a failure
    if future.exception():
        # get the associated data for the task
        data = futures_to_data[future]
        # re-submit the task
        retry = executor.submit(work, data)
        # add to the map
        futures_to_data[retry] = data
        # add the callback
        retry.add_done_callback(auto_retry)
        # report the failure
        print(f'Failure, retrying {data}')
    else:
        # report success
        print(future.result())
        # update the count of completed tasks
        with lock:
            tasks_completed += 1
            if tasks_completed >= TASKS:
                with condition:
                    condition.notify()

# constant defining the total tasks to complete
TASKS = 20
# count of tasks that are completed
tasks_completed = 0
# lock protecting the count of completed tasks
lock = Lock()
# condition used to signal that all tasks are done
condition = Condition()
# create a thread pool
with ThreadPoolExecutor(TASKS) as executor:
    # submit the tasks
    futures_to_data = {executor.submit(work, i): i for i in range(TASKS)}
    # register callbacks, iterating a copy as callbacks may add retries
    for future in list(futures_to_data):
        future.add_done_callback(auto_retry)
    # block, wait for all tasks to be completed successfully
    with condition:
        # guard against all tasks finishing before we start waiting
        if tasks_completed < TASKS:
            condition.wait()
```
Running the example creates the thread pool and submits the tasks as before.
Tasks fail and are automatically resubmitted until all tasks complete successfully, after which the thread pool is shut down.
```
Completed 15
Failure, retrying 16
Failure, retrying 3
Failure, retrying 0
Completed 18
Failure, retrying 10
Failure, retrying 0
Completed 8
Completed 19
Completed 14
Failure, retrying 3
Failure, retrying 4
Completed 7
Completed 16
Failure, retrying 2
Completed 6
Completed 11
Completed 2
Completed 4
Failure, retrying 10
Completed 12
Completed 3
Failure, retrying 5
Completed 9
Completed 17
Completed 1
Completed 13
Completed 0
Failure, retrying 5
Completed 5
Completed 10
```
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to retry failed tasks in the ThreadPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Evgenii says
Hello. Thanks for this great article. I just have one small question.
Do we need to include “condition” in the definition of the global variables (“global executor, futures_to_data, tasks_completed”)?
Jason Brownlee says
The condition is needed in the final example to allow the main thread to know when all tasks have been completed.
Without it, the retry functionality in the callback would not work.
That being said, it does not have to be global, just accessible to the callback function and the thread that owns the executor.
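For example, a sketch of one such alternative binds the shared state to the callback with functools.partial instead of using globals, relying on the fact that add_done_callback() passes the Future as the final argument:

```python
from functools import partial

# callback that receives shared state as pre-bound arguments
def auto_retry(executor, futures_to_data, condition, future):
    # ... same retry logic as in the example above ...
    pass

# pre-bind the shared state; the executor passes the future last
callback = partial(auto_retry, executor, futures_to_data, condition)
for future in list(futures_to_data):
    future.add_done_callback(callback)
```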
kloseivar says
Thanks for this article. Super helpful.
I have one question. How do we exit this loop if there’s a real issue in the task?
I tried to exit based on the retry count of each item, but it didn’t work as expected.
I introduced retry_count as a dict variable in the above example. It basically keeps track of how many times an item has been retried.
Jason Brownlee says
Thanks, I’m happy to hear that!
Great question.
I would pass around a dict that maps the data/id for each task to a dict of state. That state dict can store all kinds of information, such as the number of retries, success, failure, or whether we gave up.
Then, when a task fails or succeeds, you update the state for the task. If it has failed too many times, do not schedule it for further retries.
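For example, a sketch of this idea adapted to the repeated-retry example above (the MAX_RETRIES constant and state dict are our own additions); note that abandoned tasks must also count toward loop termination, otherwise the while loop would never exit:

```python
...
# per-task retry state (hypothetical addition to the earlier example)
MAX_RETRIES = 3
state = {i: {'retries': 0} for i in range(TASKS)}
...
# inside the as_completed() loop
if future.exception():
    data = futures_to_data[future]
    if state[data]['retries'] < MAX_RETRIES:
        # retry the task and count the attempt
        state[data]['retries'] += 1
        retry = executor.submit(work, data)
        futures_to_data[retry] = data
        print(f'Failure, retrying {data}')
    else:
        # give up, but count the task as settled so the loop can end
        completed += 1
        print(f'Giving up on {data}')
else:
    print(future.result())
    completed += 1
# remove this future so we don't re-consider it
futures_to_data.pop(future)
```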