From 0612a7079940698bf688ac2641c543b507db46ec Mon Sep 17 00:00:00 2001 From: Trenton Holmes Date: Thu, 21 Jul 2022 12:53:42 -0700 Subject: [PATCH 1/4] Updates the worker process to handle its own timeout, removing the need for some work in the guard loop. Also removes the double duty of timer as a timer and a status, status is now a dedicated enum --- django_q/cluster.py | 58 ++++++++++++++++++---------- django_q/tasks.py | 3 +- django_q/tests/test_cluster.py | 16 ++++---- django_q/tests/test_scheduler.py | 4 +- django_q/timeouts.py | 66 ++++++++++++++++++++++++++++++++ 5 files changed, 116 insertions(+), 31 deletions(-) create mode 100644 django_q/timeouts.py diff --git a/django_q/cluster.py b/django_q/cluster.py index 0fd6b429..247c7d55 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -1,5 +1,6 @@ # Standard import ast +import enum import inspect import pydoc import signal @@ -46,6 +47,15 @@ from django_q.signals import post_execute, pre_execute from django_q.signing import BadSignature, SignedPackage from django_q.status import Stat, Status +from django_q.timeouts import JobTimeoutException, UnixSignalDeathPenalty + + +class WorkerStatus(enum.IntEnum): + IDLE = 1 + BUSY = 2 + RECYCLE = 3 + TIMEOUT = 4 + STARTING = 5 class Cluster: @@ -192,7 +202,7 @@ def spawn_process(self, target, *args) -> Process: p.daemon = True if target == worker: p.daemon = Conf.DAEMONIZE_WORKERS - p.timer = args[2] + p.status = args[2] self.pool.append(p) p.start() return p @@ -202,7 +212,7 @@ def spawn_pusher(self) -> Process: def spawn_worker(self): self.spawn_process( - worker, self.task_queue, self.result_queue, Value("f", -1), self.timeout + worker, self.task_queue, self.result_queue, Value('I', WorkerStatus.IDLE.value), self.timeout ) def spawn_monitor(self) -> Process: @@ -225,11 +235,11 @@ def reincarnate(self, process): else: self.pool.remove(process) self.spawn_worker() - if process.timer.value == 0: + if process.status.value == WorkerStatus.TIMEOUT.value: # only need to terminate on timeout, otherwise we risk destabilizing the queues process.terminate() logger.warning(_(f"reincarnated worker {process.name} after timeout")) - elif int(process.timer.value) == -2: + elif process.status.value == WorkerStatus.RECYCLE.value: logger.info(_(f"recycled worker {process.name}")) else: logger.error(_(f"reincarnated worker {process.name} after death")) @@ -267,14 +277,11 @@ def guard(self): while not self.stop_event.is_set() or not counter: # Check Workers for p in self.pool: - with p.timer.get_lock(): + with p.status.get_lock(): # Are you alive? - if not p.is_alive() or p.timer.value == 0: + if not p.is_alive() or p.status.value in {WorkerStatus.TIMEOUT.value, WorkerStatus.RECYCLE.value}: self.reincarnate(p) continue - # Decrement timer if work is being done - if p.timer.value > 0: - p.timer.value -= cycle # Check Monitor if not self.monitor.is_alive(): self.reincarnate(self.monitor) @@ -401,24 +408,27 @@ def monitor(result_queue: Queue, broker: Broker = None): def worker( - task_queue: Queue, result_queue: Queue, timer: Value, timeout: int = Conf.TIMEOUT + task_queue: Queue, result_queue: Queue, status: Value, timeout: int = Conf.TIMEOUT ): """ Takes a task from the task queue, tries to execute it and puts the result back in the result queue :param timeout: number of seconds wait for a worker to finish. :type task_queue: multiprocessing.Queue :type result_queue: multiprocessing.Queue - :type timer: multiprocessing.Value + :type timer: multiprocessing.Value wrapping an unsigned int """ name = current_process().name logger.info(_(f"{name} ready for work at {current_process().pid}")) task_count = 0 if timeout is None: - timeout = -1 + # If signal.alarm timeout is 0, no alarm will be scheduled + # by signal.alarm + timeout = 0 # Start reading the task queue for task in iter(task_queue.get, "STOP"): result = None - timer.value = -1 # Idle + # Got a task package, but have not yet called the work + status.value = WorkerStatus.STARTING.value task_count += 1 # Get the function from the task logger.info(_(f'{name} processing [{task["name"]}]')) @@ -427,30 +437,38 @@ def worker( if not callable(task["func"]): f = pydoc.locate(f) close_old_django_connections() - timer_value = task.pop("timeout", timeout) + timeout = task.pop("timeout", timeout) # signal execution pre_execute.send(sender="django_q", func=f, task=task) # execute the payload - timer.value = timer_value # Busy + status.value = WorkerStatus.BUSY.value try: - res = f(*task["args"], **task["kwargs"]) - result = (res, True) + with UnixSignalDeathPenalty(timeout=timeout): + res = f(*task["args"], **task["kwargs"]) + result = (res, True) except Exception as e: + if isinstance(e, JobTimeoutException): + status.value = WorkerStatus.TIMEOUT.value result = (f"{e} : {traceback.format_exc()}", False) if error_reporter: error_reporter.report() if task.get("sync", False): raise - with timer.get_lock(): + with status.get_lock(): # Process result task["result"] = result[0] task["success"] = result[1] task["stopped"] = timezone.now() result_queue.put(task) - timer.value = -1 # Idle + # If the worker didn't timeout, go back to idle + # Otherwise, break out of the loop + if status.value != WorkerStatus.TIMEOUT.value: + status.value = WorkerStatus.IDLE.value + else: + break # Recycle if task_count == Conf.RECYCLE or rss_check(): - timer.value = -2 # Recycled + status.value = WorkerStatus.RECYCLE.value break logger.info(_(f"{name} stopped doing work")) diff --git a/django_q/tasks.py b/django_q/tasks.py index a6694e16..77d38eaf 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -15,6 +15,7 @@ from django_q.queues import Queue from django_q.signals import pre_enqueue from django_q.signing import SignedPackage +from django_q.cluster import WorkerStatus def async_task(func, *args, **kwargs): @@ -762,7 +763,7 @@ def _sync(pack): task = SignedPackage.loads(pack) task_queue.put(task) task_queue.put("STOP") - worker(task_queue, result_queue, Value("f", -1)) + worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value)) result_queue.put("STOP") monitor(result_queue) task_queue.close() diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 1656cdfb..99819a56 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -14,7 +14,7 @@ sys.path.insert(0, myPath + "/../") from django_q.brokers import Broker, get_broker -from django_q.cluster import Cluster, Sentinel, monitor, pusher, save_task, worker +from django_q.cluster import Cluster, Sentinel, monitor, pusher, save_task, worker, WorkerStatus from django_q.conf import Conf from django_q.humanhash import DEFAULT_WORDLIST, uuid from django_q.models import Success, Task @@ -124,7 +124,7 @@ def test_cluster(broker): assert queue_size(broker=broker) == 0 # Test work task_queue.put("STOP") - worker(task_queue, result_queue, Value("f", -1)) + worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value)) assert task_queue.qsize() == 0 assert result_queue.qsize() == 1 # Test monitor @@ -227,7 +227,7 @@ def test_enqueue(broker, admin_user): assert fetch_group("test_j", count=2, wait=10) is None # let a worker handle them result_queue = Queue() - worker(task_queue, result_queue, Value("f", -1)) + worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value)) assert result_queue.qsize() == task_count result_queue.put("STOP") # store the results @@ -437,7 +437,7 @@ def test_recycle(broker, monkeypatch): pusher(task_queue, stop_event, broker=broker) pusher(task_queue, stop_event, broker=broker) # worker should exit on recycle - worker(task_queue, result_queue, Value("f", -1)) + worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value)) # check if the work has been done assert result_queue.qsize() == 2 # save_limit test @@ -472,7 +472,7 @@ def test_max_rss(broker, monkeypatch): # push the task pusher(task_queue, stop_event, broker=broker) # worker should exit on recycle - worker(task_queue, result_queue, Value("f", -1)) + worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value)) # check if the work has been done assert result_queue.qsize() == 1 # save_limit test @@ -508,7 +508,7 @@ def test_bad_secret(broker, monkeypatch): worker( task_queue, result_queue, - Value("f", -1), + Value("I", WorkerStatus.IDLE.value), ) assert result_queue.qsize() == 0 broker.delete_queue() @@ -693,7 +693,7 @@ def handler(sender, task, func, **kwargs): event.set() pusher(task_queue, event, broker=broker) task_queue.put("STOP") - worker(task_queue, result_queue, Value("f", -1)) + worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value)) result_queue.put("STOP") monitor(result_queue, broker) broker.delete_queue() @@ -722,7 +722,7 @@ def handler(sender, task, **kwargs): event.set() pusher(task_queue, event, broker=broker) task_queue.put("STOP") - worker(task_queue, result_queue, Value("f", -1)) + worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value)) result_queue.put("STOP") monitor(result_queue, broker) broker.delete_queue() diff --git a/django_q/tests/test_scheduler.py b/django_q/tests/test_scheduler.py index 8be35ead..bcdea534 100644 --- a/django_q/tests/test_scheduler.py +++ b/django_q/tests/test_scheduler.py @@ -12,7 +12,7 @@ from django.utils.timezone import is_naive from django_q.brokers import Broker, get_broker -from django_q.cluster import monitor, pusher, scheduler, worker, localtime +from django_q.cluster import WorkerStatus, localtime, monitor, pusher, scheduler, worker from django_q.conf import Conf from django_q.queues import Queue from django_q.tasks import Schedule, fetch @@ -118,7 +118,7 @@ def test_scheduler(broker, monkeypatch): task_queue.put("STOP") # let a worker handle them result_queue = Queue() - worker(task_queue, result_queue, Value("b", -1)) + worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value)) assert result_queue.qsize() == 1 result_queue.put("STOP") # store the results diff --git a/django_q/timeouts.py b/django_q/timeouts.py new file mode 100644 index 00000000..48f0c560 --- /dev/null +++ b/django_q/timeouts.py @@ -0,0 +1,66 @@ +""" +Using signal, implements a alarm based callback after a certain amount of time. +Borrowed from rq: https://github.com/rq/rq/blob/master/rq/timeouts.py +""" +import signal + + +class JobTimeoutException(Exception): + """Raised when a job takes longer to complete than the allowed maximum + timeout value. + """ + pass + + +class BaseDeathPenalty: + """Base class to setup job timeouts.""" + + def __init__(self, timeout, exception=JobTimeoutException, **kwargs): + self._timeout = timeout + self._exception = exception + + def __enter__(self): + self.setup_death_penalty() + + def __exit__(self, type, value, traceback): + # Always cancel immediately, since we're done + try: + self.cancel_death_penalty() + except JobTimeoutException: + # Weird case: we're done with the with body, but now the alarm is + # fired. We may safely ignore this situation and consider the + # body done. + pass + + # __exit__ may return True to supress further exception handling. We + # don't want to suppress any exceptions here, since all errors should + # just pass through, BaseTimeoutException being handled normally to the + # invoking context. + return False + + def setup_death_penalty(self): + raise NotImplementedError() + + def cancel_death_penalty(self): + raise NotImplementedError() + + +class UnixSignalDeathPenalty(BaseDeathPenalty): + + def handle_death_penalty(self, signum, frame): + raise self._exception('Task exceeded maximum timeout value ' + '({0} seconds)'.format(self._timeout)) + + def setup_death_penalty(self): + """Sets up an alarm signal and a signal handler that raises + an exception after the timeout amount (expressed in seconds). + """ + signal.signal(signal.SIGALRM, self.handle_death_penalty) + signal.alarm(self._timeout) + + def cancel_death_penalty(self): + """Removes the death penalty alarm and puts back the system into + default signal handling. + """ + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) From b58fc7dfbd3ec96cacb634b8c3bd93dcff236d1a Mon Sep 17 00:00:00 2001 From: Trenton Holmes Date: Tue, 2 Aug 2022 15:32:00 -0700 Subject: [PATCH 2/4] Ensure the results of the task are provided to the monitor queue, even in the case of exception --- django_q/cluster.py | 51 +++++++++++++++++++++++++------------------- django_q/timeouts.py | 4 ++++ 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index 247c7d55..f3ee2adf 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -420,13 +420,10 @@ def worker( name = current_process().name logger.info(_(f"{name} ready for work at {current_process().pid}")) task_count = 0 - if timeout is None: - # If signal.alarm timeout is 0, no alarm will be scheduled - # by signal.alarm - timeout = 0 # Start reading the task queue for task in iter(task_queue.get, "STOP"): result = None + timed_out = False # Got a task package, but have not yet called the work status.value = WorkerStatus.STARTING.value task_count += 1 @@ -447,29 +444,39 @@ def worker( res = f(*task["args"], **task["kwargs"]) result = (res, True) except Exception as e: - if isinstance(e, JobTimeoutException): - status.value = WorkerStatus.TIMEOUT.value + # Minor QoL, parse the exception chain as far as + # possible to check if this was a timeout or just an error + exception_chain = [e] + next_exception = e.__cause__ + while next_exception is not None: + exception_chain.append(next_exception) + next_exception = next_exception.__cause__ + if any(isinstance(x, JobTimeoutException) for x in exception_chain): + timed_out = True result = (f"{e} : {traceback.format_exc()}", False) if error_reporter: error_reporter.report() if task.get("sync", False): raise - with status.get_lock(): - # Process result - task["result"] = result[0] - task["success"] = result[1] - task["stopped"] = timezone.now() - result_queue.put(task) - # If the worker didn't timeout, go back to idle - # Otherwise, break out of the loop - if status.value != WorkerStatus.TIMEOUT.value: - status.value = WorkerStatus.IDLE.value - else: - break - # Recycle - if task_count == Conf.RECYCLE or rss_check(): - status.value = WorkerStatus.RECYCLE.value - break + finally: + with status.get_lock(): + # Process result + if result is None: + result = (None, False) + task["result"] = result[0] + task["success"] = result[1] + task["stopped"] = timezone.now() + result_queue.put(task) + + if timed_out: + status.value = WorkerStatus.TIMEOUT.value + break + elif task_count == Conf.RECYCLE or rss_check(): + status.value = WorkerStatus.RECYCLE.value + break + else: + status.value = WorkerStatus.IDLE.value + logger.info(_(f"{name} stopped doing work")) diff --git a/django_q/timeouts.py b/django_q/timeouts.py index 48f0c560..7d4ce378 100644 --- a/django_q/timeouts.py +++ b/django_q/timeouts.py @@ -16,6 +16,10 @@ class BaseDeathPenalty: """Base class to setup job timeouts.""" def __init__(self, timeout, exception=JobTimeoutException, **kwargs): + # If signal.alarm timeout is 0, no alarm will be scheduled + # by signal.alarm + if timeout is None: + timeout = 0 self._timeout = timeout self._exception = exception From 73c3ccb2ebc3e615e29032f4915b82a2dfa211a4 Mon Sep 17 00:00:00 2001 From: Trenton Holmes Date: Wed, 31 Aug 2022 14:28:24 -0700 Subject: [PATCH 3/4] Fixes circular import --- django_q/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_q/tasks.py b/django_q/tasks.py index 77d38eaf..a2dae7d3 100644 --- a/django_q/tasks.py +++ b/django_q/tasks.py @@ -15,7 +15,6 @@ from django_q.queues import Queue from django_q.signals import pre_enqueue from django_q.signing import SignedPackage -from django_q.cluster import WorkerStatus def async_task(func, *args, **kwargs): @@ -757,6 +756,7 @@ def fetch_group(self, failures=True, wait=0, count=None): def _sync(pack): """Simulate a package travelling through the cluster.""" from django_q.cluster import monitor, worker + from django_q.cluster import WorkerStatus task_queue = Queue() result_queue = Queue() From cf4f24752af71788d0393452b2f5bb1ddc5bc7d0 Mon Sep 17 00:00:00 2001 From: Trenton Holmes Date: Wed, 31 Aug 2022 16:03:33 -0700 Subject: [PATCH 4/4] Inherit the timeout from a lower leverl exception, so user code is less likely to catch it --- django_q/cluster.py | 13 +++---------- django_q/timeouts.py | 5 +++-- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index f3ee2adf..b85726ec 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -443,17 +443,10 @@ def worker( with UnixSignalDeathPenalty(timeout=timeout): res = f(*task["args"], **task["kwargs"]) result = (res, True) - except Exception as e: - # Minor QoL, parse the exception chain as far as - # possible to check if this was a timeout or just an error - exception_chain = [e] - next_exception = e.__cause__ - while next_exception is not None: - exception_chain.append(next_exception) - next_exception = next_exception.__cause__ - if any(isinstance(x, JobTimeoutException) for x in exception_chain): - timed_out = True + except (JobTimeoutException, Exception) as e: result = (f"{e} : {traceback.format_exc()}", False) + if isinstance(e, JobTimeoutException): + timed_out = True if error_reporter: error_reporter.report() if task.get("sync", False): diff --git a/django_q/timeouts.py b/django_q/timeouts.py index 7d4ce378..77251183 100644 --- a/django_q/timeouts.py +++ b/django_q/timeouts.py @@ -5,9 +5,10 @@ import signal -class JobTimeoutException(Exception): +class JobTimeoutException(SystemExit): """Raised when a job takes longer to complete than the allowed maximum - timeout value. + timeout value. Inherits from SystemExit to prevent user code which catches + Exception from not timing out correctly. """ pass