Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TaskResult IDs skipped in Task Results #303

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions src/handlers/health_check_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from pathlib import Path
from time import sleep

from django.conf import settings

Expand All @@ -10,3 +11,4 @@ def trigger_api_restart_callback(sender, **kwargs):
logger.warning("triggering API container restart")
if settings.IN_DOCKER:
Path(settings.SDR_HEALTHCHECK_FILE).touch()
sleep(60) # sleep to prevent running next task until restart completed
28 changes: 26 additions & 2 deletions src/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def run(self, blocking=True):

try:
self.calibrate_if_needed()
self.reset_next_task_id()
while True:
with minimum_duration(blocking):
self._consume_schedule(blocking)
Expand Down Expand Up @@ -155,7 +156,16 @@ def _initialize_task_result(self) -> TaskResult:
"""Initalize an 'in-progress' result so it exists when action runs."""
tid = self.task.task_id
task_result = TaskResult(schedule_entry=self.entry, task_id=tid)
logger.debug(f"Creating task result with task id = {tid}")
task_result.save()
if tid > 1:
last_task_id_exists = TaskResult.objects.filter(task_id=tid-1).exists()
if not last_task_id_exists:
logger.warning(f"TaskResult for previous task id ({tid-1}) does not exist. Current task_id = {tid}")
# commented out code useful for debugging if this warning occurs
# self.entry.is_active = False
# self.entry.save()
# raise Exception(f"Task ID Mismatch! last task id = {tid-1} current task id = {tid}")
return task_result

def _call_task_action(self):
Expand Down Expand Up @@ -241,6 +251,7 @@ def _finalize_task_result(self, task_result, started, finished, status, detail):
task_result.save()

with self.task_status_lock:
self.last_status = status
if status == "failure" and self.last_status == "failure":
self.consecutive_failures = self.consecutive_failures + 1
elif status == "failure":
Expand All @@ -249,8 +260,10 @@ def _finalize_task_result(self, task_result, started, finished, status, detail):
self.consecutive_failures = 0
if self.consecutive_failures >= settings.MAX_FAILURES:
trigger_api_restart.send(sender=self.__class__)

self.last_status = status
# prevent more tasks from being run
# restart can cause missing db task result ids
# if tasks continue to run waiting for restart
thread.stop()

@staticmethod
def _callback_response_handler(resp, task_result):
Expand Down Expand Up @@ -373,6 +386,17 @@ def calibrate_if_needed(self):
"Skipping startup calibration since sensor_calibration exists and has not expired."
)

def reset_next_task_id(self):
# reset next task id
for entry in self.schedule:
count = TaskResult.objects.filter(schedule_entry=entry).count()
if count > 0:
last_task_id = TaskResult.objects.filter(schedule_entry=entry).order_by("task_id")[count-1].task_id
if entry.next_task_id != last_task_id + 1:
logger.info(f"Changing next_task_id from {entry.next_task_id} to {last_task_id + 1}")
entry.next_task_id = last_task_id + 1
entry.save(update_fields=("next_task_id",))


@contextmanager
def minimum_duration(blocking):
Expand Down