Failure to provide a mechanism for reporting that tasks in a thread pool failed as a result of an exceptional condition can make it difficult or impossible to diagnose the problem.
Note
Prerequisite to understand this page: Intro to multiprocessing and multithreading
ThreadPoolExecutor from the concurrent.futures
module is used in Python to asynchronously execute callable tasks [Python Docs - concurrent.futures]. The ThreadPoolExecutor
suppresses exceptions raised by tasks to ensure that the failure of a task does not result in the failure of a worker thread in the thread pool. Otherwise, worker threads would close whenever an exception is raised, negatively impacting the performance of the ThreadPoolExecutor
.
One of the ways of running a task with ThreadPoolExecutor
is using a submit()
function. The function returns a Future object, which represents the eventual result of an asynchronous operation. Without calling the result()
function on the Future object, any exceptions raised during the task will not appear. In the below example, the function get_sqrt()
will raise a ValueError
when trying to calculate a square root of a negative number [Python Docs - Errors and Exceptions]. However, no Exception will be raised during the code execution.
""" Non-compliant Code Example """
import math
from concurrent.futures import ThreadPoolExecutor
def get_sqrt(a):
return math.sqrt(a)
def run_thread(var):
with ThreadPoolExecutor() as executor:
return executor.submit(get_sqrt, var)
#####################
# exploiting above code example
#####################
# get_sqrt will raise ValueError that will be suppressed by the ThreadPoolExecutor
arg = -1
result = run_thread(arg) # The outcome of the task is unknown
In order to determine whether an exception occurred, we can call the exception()
method of the Future object. It returns the exception that was raised during the task execution or None if no exception was raised. It is important to note that exception()
does not raise the exception, thus it cannot be caught in a try … except
statement.
""" Compliant Code Example """
import math
from concurrent.futures import ThreadPoolExecutor
def get_sqrt(a):
return math.sqrt(a)
def run_thread(var):
with ThreadPoolExecutor() as executor:
future = executor.submit(get_sqrt, var)
if future.exception() is not None:
# handle exception...
raise ValueError(f"Invalid argument: {var}")
return future
#####################
# exploiting above code example
#####################
arg = -1
result = run_thread(arg) # Now code execution will be interrupted by ValueError
The exception that was suppressed by the ThreadPoolExecutor will also be raised upon calling the result()
method on the Future object. Since the exception is raised, it can be handled using the try … except
statement.
""" Compliant Code Example """
import math
from concurrent.futures import ThreadPoolExecutor
def get_sqrt(a):
return math.sqrt(a)
def run_thread(var):
with ThreadPoolExecutor() as executor:
future = executor.submit(get_sqrt, var)
try:
res = future.result()
return res
except ValueError as e:
# handle exception...
raise ValueError(f"Invalid argument: {var}") from None
#####################
# exploiting above code example
#####################
arg = -1
result = run_thread(arg) # Now code execution will be interrupted by ValueError
A similar example of using the result()
and exception()
methods to handle exceptions in the Future objects can be found at [Ross 2020].
Alternatively, the asynchronous tasks can be run by the ThreadPoolExecutor using the map()
function, which takes the function that should be run and an iterable collection of its parameters. map()
creates a separate thread for each set of parameters in the given collection. Instead of a Future object, it returns all the task results in an iterator. The iterator maintains the order of submitted tasks - the first task that raises an exception will cause the iterator to be aborted and further results will not be generated.
""" Non-compliant Code Example """
import math
from concurrent.futures import ThreadPoolExecutor
def get_sqrt(a):
return math.sqrt(a)
def map_threads(x):
with ThreadPoolExecutor() as executor:
return executor.map(get_sqrt, x)
#####################
# exploiting above code example
#####################
# get_sqrt will raise ValueError that will be suppressed by the ThreadPoolExecutor
args = [2, -1, 4]
results = map_threads(args)
for result in results:
print(result) # Unhandled ValueError will be raised before all results are read
Since an exception is raised during iterating over the results, we can handle the exception using the try … except
statement.
""" Compliant Code Example """
import math
from concurrent.futures import ThreadPoolExecutor
def get_sqrt(a):
return math.sqrt(a)
def map_threads(x):
with ThreadPoolExecutor() as executor:
result_gen = executor.map(get_sqrt, x)
ret = list()
invalid_arg = 0
try:
for res in result_gen:
ret.append(res)
invalid_arg += 1
return res
except ValueError as e:
# handle exception...
raise ValueError(
f"Invalid argument: {x[invalid_arg]} at list index {invalid_arg}"
) from None
#####################
# exploiting above code example
#####################
args = [2, -1, 4]
results = map_threads(args)
for result in results:
print(
result
) # The exception is handled, but the rest of the results are still unavailable
Even after handling the exception, the results of tasks submitted after the erroneous task are still not available. If we were to iterate over the results manually using the next()
method, we could see that after the task exception, a StopIterator is also raised. This terminates the result generator's useful life, which means further results will not be generated. This mechanism was described in [PEP 0255].
If we want to make sure all task results will be available, we can either avoid using map()
by using submit()
instead, or we can ensure no unhandled exception will be raised within the executed task.
""" Compliant Code Example """
import math
from concurrent.futures import ThreadPoolExecutor
def get_sqrt(a):
try:
return math.sqrt(a)
except ValueError as e:
print(f"Invalid argument: {a}")
return None
def map_threads(x):
with ThreadPoolExecutor() as executor:
return executor.map(get_sqrt, x)
#####################
# exploiting above code example
#####################
args = [2, -1, 4]
results = map_threads(args)
for result in results:
print(result) # Now no exception is raised and we can read all of the results
More examples of handling exceptions within and outside the concurrent tasks can be found at [Brownlee 2022].
unknown
[Python Docs - concurrent.futures] | Python 3.10.4 documentation - concurrent.futures — Launching parallel tasks [online]. Available from: https://docs.python.org/3/library/concurrent.futures.html [accessed 9 May 2024] |
[Python Docs - Errors and Exceptions] | Python 3.10.4 documentation - 8. Errors and Exceptions [online]. Available from: https://docs.python.org/3/tutorial/errors.html [accessed 9 May 2024] |
[Ross 2020] | Ross, E. (2020). Raising Exceptions in Python Futures [online]. Available from: https://skeptric.com/python-futures-exception/index.html [accessed 21 December 2023]. |
[Brownlee 2022] | Brownlee, J. (2022). How to Handle Exceptions With the ThreadPoolExecutor in Python [online]. Available from: https://superfastpython.com/threadpoolexecutor-exception-handling/ [accessed 21 December 2023]. |
[PEP 0255] | PEP 255 - Simple Generators Specification: Generators and Exception Propagation [online]. Available from: https://peps.python.org/pep-0255/#specification-generators-and-exception-propagation [accessed 9 May 2024] |