From b58fc7dfbd3ec96cacb634b8c3bd93dcff236d1a Mon Sep 17 00:00:00 2001 From: Trenton Holmes Date: Tue, 2 Aug 2022 15:32:00 -0700 Subject: [PATCH] Ensure the results of the task are provided to the monitor queue, even in the case of exception --- django_q/cluster.py | 51 +++++++++++++++++++++++++------------------- django_q/timeouts.py | 4 ++++ 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index 247c7d55..f3ee2adf 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -420,13 +420,10 @@ def worker( name = current_process().name logger.info(_(f"{name} ready for work at {current_process().pid}")) task_count = 0 - if timeout is None: - # If signal.alarm timeout is 0, no alarm will be scheduled - # by signal.alarm - timeout = 0 # Start reading the task queue for task in iter(task_queue.get, "STOP"): result = None + timed_out = False # Got a task package, but have not yet called the work status.value = WorkerStatus.STARTING.value task_count += 1 @@ -447,29 +444,39 @@ def worker( res = f(*task["args"], **task["kwargs"]) result = (res, True) except Exception as e: - if isinstance(e, JobTimeoutException): - status.value = WorkerStatus.TIMEOUT.value + # Minor QoL, parse the exception chain as far as + # possible to check if this was a timeout or just an error + exception_chain = [e] + next_exception = e.__cause__ + while next_exception is not None: + exception_chain.append(next_exception) + next_exception = next_exception.__cause__ + if any(isinstance(x, JobTimeoutException) for x in exception_chain): + timed_out = True result = (f"{e} : {traceback.format_exc()}", False) if error_reporter: error_reporter.report() if task.get("sync", False): raise - with status.get_lock(): - # Process result - task["result"] = result[0] - task["success"] = result[1] - task["stopped"] = timezone.now() - result_queue.put(task) - # If the worker didn't timeout, go back to idle - # Otherwise, break out of the loop - if status.value != WorkerStatus.TIMEOUT.value: - status.value = WorkerStatus.IDLE.value - else: - break - # Recycle - if task_count == Conf.RECYCLE or rss_check(): - status.value = WorkerStatus.RECYCLE.value - break + finally: + with status.get_lock(): + # Process result + if result is None: + result = (None, False) + task["result"] = result[0] + task["success"] = result[1] + task["stopped"] = timezone.now() + result_queue.put(task) + + if timed_out: + status.value = WorkerStatus.TIMEOUT.value + break + elif task_count == Conf.RECYCLE or rss_check(): + status.value = WorkerStatus.RECYCLE.value + break + else: + status.value = WorkerStatus.IDLE.value + logger.info(_(f"{name} stopped doing work")) diff --git a/django_q/timeouts.py b/django_q/timeouts.py index 48f0c560..7d4ce378 100644 --- a/django_q/timeouts.py +++ b/django_q/timeouts.py @@ -16,6 +16,10 @@ class BaseDeathPenalty: """Base class to setup job timeouts.""" def __init__(self, timeout, exception=JobTimeoutException, **kwargs): + # If signal.alarm timeout is 0, no alarm will be scheduled + # by signal.alarm + if timeout is None: + timeout = 0 self._timeout = timeout self._exception = exception