diff --git a/src/handlers/health_check_handler.py b/src/handlers/health_check_handler.py index c4423038..bba25f41 100644 --- a/src/handlers/health_check_handler.py +++ b/src/handlers/health_check_handler.py @@ -1,5 +1,6 @@ import logging from pathlib import Path +from time import sleep from django.conf import settings @@ -10,3 +11,4 @@ def trigger_api_restart_callback(sender, **kwargs): logger.warning("triggering API container restart") if settings.IN_DOCKER: Path(settings.SDR_HEALTHCHECK_FILE).touch() + sleep(60) # sleep to prevent running next task until restart completed diff --git a/src/scheduler/scheduler.py b/src/scheduler/scheduler.py index 4f410779..415f965a 100644 --- a/src/scheduler/scheduler.py +++ b/src/scheduler/scheduler.py @@ -94,6 +94,7 @@ def run(self, blocking=True): try: self.calibrate_if_needed() + self.reset_next_task_id() while True: with minimum_duration(blocking): self._consume_schedule(blocking) @@ -155,7 +156,16 @@ def _initialize_task_result(self) -> TaskResult: """Initalize an 'in-progress' result so it exists when action runs.""" tid = self.task.task_id task_result = TaskResult(schedule_entry=self.entry, task_id=tid) + logger.debug(f"Creating task result with task id = {tid}") task_result.save() + if tid > 1: + last_task_id_exists = TaskResult.objects.filter(task_id=tid-1).exists() + if not last_task_id_exists: + logger.warning(f"TaskResult for previous task id ({tid-1}) does not exist. Current task_id = {tid}") + # commented out code useful for debugging if this warning occurs + # self.entry.is_active = False + # self.entry.save() + # raise Exception(f"Task ID Mismatch! last task id = {tid-1} current task id = {tid}") return task_result def _call_task_action(self): @@ -241,6 +251,7 @@ def _finalize_task_result(self, task_result, started, finished, status, detail): task_result.save() with self.task_status_lock: + self.last_status = status if status == "failure" and self.last_status == "failure": self.consecutive_failures = self.consecutive_failures + 1 elif status == "failure": @@ -249,8 +260,10 @@ def _finalize_task_result(self, task_result, started, finished, status, detail): self.consecutive_failures = 0 if self.consecutive_failures >= settings.MAX_FAILURES: trigger_api_restart.send(sender=self.__class__) - - self.last_status = status + # prevent more tasks from being run + # restart can cause missing db task result ids + # if tasks continue to run waiting for restart + thread.stop() @staticmethod def _callback_response_handler(resp, task_result): @@ -373,6 +386,17 @@ def calibrate_if_needed(self): "Skipping startup calibration since sensor_calibration exists and has not expired." ) + def reset_next_task_id(self): + # reset next task id + for entry in self.schedule: + count = TaskResult.objects.filter(schedule_entry=entry).count() + if count > 0: + last_task_id = TaskResult.objects.filter(schedule_entry=entry).order_by("task_id")[count-1].task_id + if entry.next_task_id != last_task_id + 1: + logger.info(f"Changing next_task_id from {entry.next_task_id} to {last_task_id + 1}") + entry.next_task_id = last_task_id + 1 + entry.save(update_fields=("next_task_id",)) + @contextmanager def minimum_duration(blocking):