Skip to content

Commit

Permalink
Catching create job failures, updates to job definition
Browse files Browse the repository at this point in the history
  • Loading branch information
3coins committed Nov 2, 2022
1 parent 97466b2 commit dbe2808
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions jupyter_scheduler/task_runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from dataclasses import dataclass
from datetime import datetime
from heapq import heappop, heappush
from typing import List, Optional

Expand Down Expand Up @@ -56,6 +57,10 @@ class JobDefinitionTask:
def __lt__(self, other):
    """Return True when this task's next run time precedes ``other``'s.

    Only ``next_run_time`` takes part in the comparison.
    (Presumably this is what lets ``heapq`` order tasks in the queue.)
    """
    own_time = self.next_run_time
    other_time = other.next_run_time
    return own_time < other_time

def __str__(self):
    """Describe the task as its job definition id plus a readable run time.

    ``next_run_time`` is divided by 1e3 before conversion, i.e. it is
    treated as milliseconds since the epoch. ``datetime.fromtimestamp``
    is called without a timezone, so the rendered time is naive local time.
    """
    epoch_seconds = self.next_run_time / 1e3
    next_run_time = datetime.fromtimestamp(epoch_seconds)
    return f"Id: {self.job_definition_id}, Run-time: {next_run_time}"


class PriorityQueue:
"""A priority queue using heapq"""
Expand All @@ -82,6 +87,13 @@ def __len__(self):
def isempty(self):
    """Return True when the underlying heap holds no tasks."""
    # An empty heap container is falsy, so no explicit length check is needed.
    return not self._heap

def __str__(self):
    """Return one line per queued task, joined with newlines.

    Tasks are rendered with ``str()`` in the order they are stored in the
    underlying heap container, which is heap order and not necessarily
    sorted by run time. An empty queue yields an empty string.
    """
    # Single pass with join instead of building a temporary list by hand.
    return "\n".join(map(str, self._heap))


class Cache:
def __init__(self) -> None:
Expand Down Expand Up @@ -251,10 +263,16 @@ def update_job_definition(self, job_definition_id: str, model: UpdateJobDefiniti
),
)

if cached_next_run_time != next_run_time and active:
self.queue.push(
JobDefinitionTask(job_definition_id=job_definition_id, next_run_time=next_run_time)
next_run_time_changed = cached_next_run_time != next_run_time and active
resumed_job = model.active and not cache.active

if next_run_time_changed or resumed_job:
self.log.debug("Updating queue...")
task = JobDefinitionTask(
job_definition_id=job_definition_id, next_run_time=next_run_time
)
self.queue.push(task)
self.log.debug(f"Updated queue, {task}")

def delete_job_definition(self, job_definition_id: str):
self.cache.delete(job_definition_id)
Expand All @@ -266,7 +284,7 @@ def create_job(self, job_definition_id: str):
self.scheduler.create_job(
CreateJob(
**definition.dict(exclude={"schedule", "timezone"}, exclude_none=True),
input_uri=input_uri
input_uri=input_uri,
)
)

Expand All @@ -275,6 +293,7 @@ def compute_time_diff(self, queue_run_time: int, timezone: str):
return local_time - queue_run_time

def process_queue(self):
self.log.debug(self.queue)
while not self.queue.isempty():
task = self.queue.peek()
cache = self.cache.get(task.job_definition_id)
Expand All @@ -295,11 +314,11 @@ def process_queue(self):
# if run time is in future
if time_diff < 0:
break
# if run time is in the past might be in previous run
elif time_diff >= (self.poll_interval * 1000):
break
else:
self.create_job(task.job_definition_id)
try:
self.create_job(task.job_definition_id)
except Exception as e:
self.log.exception(e)
self.queue.pop()
run_time = self.compute_next_run_time(cache.schedule, cache.timezone)
self.cache.update(
Expand Down

0 comments on commit dbe2808

Please sign in to comment.