Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Ensure the results of the task are provided to the monitor queue, eve…
Browse files Browse the repository at this point in the history
…n in the case of exception
  • Loading branch information
stumpylog committed Aug 11, 2022
1 parent 0612a70 commit b58fc7d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 22 deletions.
51 changes: 29 additions & 22 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,10 @@ def worker(
name = current_process().name
logger.info(_(f"{name} ready for work at {current_process().pid}"))
task_count = 0
if timeout is None:
# If signal.alarm timeout is 0, no alarm will be scheduled
# by signal.alarm
timeout = 0
# Start reading the task queue
for task in iter(task_queue.get, "STOP"):
result = None
timed_out = False
# Got a task package, but have not yet called the work
status.value = WorkerStatus.STARTING.value
task_count += 1
Expand All @@ -447,29 +444,39 @@ def worker(
res = f(*task["args"], **task["kwargs"])
result = (res, True)
except Exception as e:
if isinstance(e, JobTimeoutException):
status.value = WorkerStatus.TIMEOUT.value
# Minor QoL, parse the exception chain as far as
# possible to check if this was a timeout or just an error
exception_chain = [e]
next_exception = e.__cause__
while next_exception is not None:
exception_chain.append(next_exception)
next_exception = next_exception.__cause__
if any(isinstance(x, JobTimeoutException) for x in exception_chain):
timed_out = True
result = (f"{e} : {traceback.format_exc()}", False)
if error_reporter:
error_reporter.report()
if task.get("sync", False):
raise
with status.get_lock():
# Process result
task["result"] = result[0]
task["success"] = result[1]
task["stopped"] = timezone.now()
result_queue.put(task)
# If the worker didn't timeout, go back to idle
# Otherwise, break out of the loop
if status.value != WorkerStatus.TIMEOUT.value:
status.value = WorkerStatus.IDLE.value
else:
break
# Recycle
if task_count == Conf.RECYCLE or rss_check():
status.value = WorkerStatus.RECYCLE.value
break
finally:
with status.get_lock():
# Process result
if result is None:
result = (None, False)
task["result"] = result[0]
task["success"] = result[1]
task["stopped"] = timezone.now()
result_queue.put(task)

if timed_out:
status.value = WorkerStatus.TIMEOUT.value
break
elif task_count == Conf.RECYCLE or rss_check():
status.value = WorkerStatus.RECYCLE.value
break
else:
status.value = WorkerStatus.IDLE.value

logger.info(_(f"{name} stopped doing work"))


Expand Down
4 changes: 4 additions & 0 deletions django_q/timeouts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ class BaseDeathPenalty:
"""Base class to setup job timeouts."""

def __init__(self, timeout, exception=JobTimeoutException, **kwargs):
# If signal.alarm timeout is 0, no alarm will be scheduled
# by signal.alarm
if timeout is None:
timeout = 0
self._timeout = timeout
self._exception = exception

Expand Down

0 comments on commit b58fc7d

Please sign in to comment.