Skip to content

Commit

Permalink
add error state when no updated jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
n1mus committed May 12, 2022
1 parent 3fdeaf5 commit cd0194c
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 93 deletions.
11 changes: 6 additions & 5 deletions src/biokbase/narrative/jobs/jobcomm.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ def cell_id_list(self):

@property
def ts(self):
"""This param is completely optional"""
"""
Optional field sent with STATUS requests indicating to filter out
job states in the STATUS response that have not been updated since
this epoch time (in ns)
"""
return self.rq_data.get(PARAM["TS"])


Expand Down Expand Up @@ -200,10 +204,7 @@ def _get_job_ids(self, req: JobRequest) -> List[str]:
if req.has_batch_id():
return self._jm.update_batch_job(req.batch_id)

try:
return req.job_id_list
except Exception as ex:
raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex
return req.job_id_list

def start_job_status_loop(
self,
Expand Down
18 changes: 15 additions & 3 deletions src/biokbase/narrative/jobs/jobmanager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import time
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

Expand Down Expand Up @@ -29,13 +30,15 @@
__version__ = "0.0.1"

JOB_NOT_REG_ERR = "Job ID is not registered"
JOB_NOT_REG_2_ERR = "Cannot find job with ID %s" # TODO unify these
JOB_NOT_BATCH_ERR = "Job ID is not for a batch job"

JOBS_TYPE_ERR = "List expected for job_id_list"
JOBS_MISSING_ERR = "No valid job IDs provided"

CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided"
DOES_NOT_EXIST = "does_not_exist"

NO_UPDATED_JOBS_ERR = "No updated jobs"


class JobManager:
Expand Down Expand Up @@ -345,8 +348,17 @@ def get_job_states(self, job_ids: List[str], ts: int = None) -> dict:
for job_id in job_ids:
if self.get_job(job_id).last_updated < ts:
del output_states[job_id]
no_updated_jobs = ts is not None and job_ids and not output_states

# add error_ids first in the unlikely case one of the error_ids
# is "error" which is a reserved key which is prioritized
# for indicating an actual error event
self.add_errors_to_results(output_states, error_ids)

return self.add_errors_to_results(output_states, error_ids)
if no_updated_jobs:
output_states["error"] = {"error": NO_UPDATED_JOBS_ERR}

return output_states

def get_all_job_states(self, ignore_refresh_flag=False) -> dict:
"""
Expand Down Expand Up @@ -725,7 +737,7 @@ def add_errors_to_results(self, results: dict, error_ids: List[str]) -> dict:
for error_id in error_ids:
results[error_id] = {
"job_id": error_id,
"error": f"Cannot find job with ID {error_id}",
"error": JOB_NOT_REG_2_ERR % error_id,
}
return results

Expand Down
58 changes: 38 additions & 20 deletions src/biokbase/narrative/tests/test_job_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def test_load_job_constants__valid(self):


class MergeTest(unittest.TestCase):
def _check_merge_inplace(self, d0: dict, d1: dict, exp_merge: dict):
def _check(self, d0: dict, d1: dict, exp_merge: dict):
d0_copy = copy.deepcopy(d0)
d1_copy = copy.deepcopy(d1)
merge_inplace(d0, d1)
self.assertEqual(
Expand All @@ -71,11 +72,19 @@ def _check_merge_inplace(self, d0: dict, d1: dict, exp_merge: dict):
d1,
d1_copy
)
d0 = copy.deepcopy(d0_copy)
dmerge = merge(d0, d1)
self.assertEqual(
dmerge,
exp_merge
)
self.assertEqual(d0, d0_copy)
self.assertEqual(d1, d1_copy)

def test_merge_inplace__empty(self):
d0 = {}
d1 = {}
self._check_merge_inplace(
self._check(
d0,
d1,
{}
Expand All @@ -85,7 +94,7 @@ def test_merge_inplace__d0_empty(self):
# flat
d0 = {}
d1 = {"level00": "l00"}
self._check_merge_inplace(
self._check(
d0,
d1,
{"level00": "l00"}
Expand All @@ -99,7 +108,7 @@ def test_merge_inplace__d0_empty(self):
"level10": "l10"
}
}
self._check_merge_inplace(
self._check(
d0,
d1,
{
Expand All @@ -114,7 +123,7 @@ def test_merge_inplace__d1_empty(self):
# flat
d0 = {"level00": "l00"}
d1 = {}
self._check_merge_inplace(
self._check(
d0,
d1,
{"level00": "l00"}
Expand All @@ -128,7 +137,7 @@ def test_merge_inplace__d1_empty(self):
}
}
d1 = {}
self._check_merge_inplace(
self._check(
d0,
d1,
{
Expand All @@ -148,7 +157,7 @@ def test_merge_inplace__flat(self):
"level01": "l01_",
"level02": "l02"
}
self._check_merge_inplace(
self._check(
d0,
d1,
{
Expand All @@ -163,28 +172,41 @@ def test_merge_inplace__nested(self):
"level00": {
"level10": {
"level20": "l20",
"level21": "l21"
"level21": "l21",
"level23": {
"level30": "l30"
}
}
},
"level01": "l01"
}
d1 = {
"level00": {
"level10": {
"level22": "l22"
"level21": "l21_",
"level22": "l22",
"level24": {
"level30": "l30"
}
}
},
"level01": "l01_"
}
self._check_merge_inplace(
self._check(
d0,
d1,
{
"level00": {
"level10": {
"level20": "l20",
"level21": "l21",
"level22": "l22"
"level21": "l21_",
"level22": "l22",
"level23": {
"level30": "l30"
},
"level24": {
"level30": "l30"
}
}
},
"level01": "l01_"
Expand All @@ -205,7 +227,7 @@ def test_merge_inplace__xor_dicts(self):
):
merge_inplace(d0, d1)

def test_merge(self):
def test_random(self):
d0 = {
"level00": "l00",
"level01": {
Expand All @@ -222,13 +244,9 @@ def test_merge(self):
}
}
}
d0_copy = copy.deepcopy(d0)
d1_copy = copy.deepcopy(d1)
d0_merge = merge(d0, d1)
self.assertEqual(d0, d0_copy)
self.assertEqual(d1, d1_copy)
self.assertEqual(
d0_merge,
self._check(
d0,
d1,
{
"level00": "l00",
"level01": {
Expand Down
112 changes: 104 additions & 8 deletions src/biokbase/narrative/tests/test_jobcomm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import itertools
import os
import re
import sys
import time
import unittest
from unittest import mock
Expand All @@ -25,7 +26,9 @@
from biokbase.narrative.jobs.jobmanager import (
JOB_NOT_BATCH_ERR,
JOB_NOT_REG_ERR,
JOB_NOT_REG_2_ERR,
JOBS_MISSING_ERR,
NO_UPDATED_JOBS_ERR,
JobManager,
)
from biokbase.narrative.tests.generate_test_results import (
Expand Down Expand Up @@ -501,7 +504,7 @@ def _check_pop_last_checked(self, output_states, last_checked=TEST_EPOCH_NS):
"""
For STATUS responses, each output_state will have an extra field `last_checked`
that is variable and is not in the test data. Check that here and delete before
other checkd
other checks
"""
for output_state in output_states.values():
self.assertIn("last_checked", output_state)
Expand Down Expand Up @@ -683,12 +686,36 @@ def mock_check_jobs(params):
msg,
)

def _reset_last_updated(self):
"""Set last_updated back a minute"""
for job_id in self.jm._running_jobs:
job = self.jm.get_job(job_id)
job.last_updated -= 60 * 1e9
self.assertTrue(job.last_updated > 0) # sanity check

def _check_last_updated(self, exp_updated):
"""Make sure the right jobs had `last_updated` bumped"""
exp_not_updated = list(set(ALL_JOBS) - set(exp_updated)) # exclusion
now = time.time_ns()

exp_updated = [
self.jm.get_job(job_id).last_updated for job_id in exp_updated
]
for ts in exp_updated:
self.assertTrue(ts_are_close(ts, now))

exp_not_updated = [
self.jm.get_job(job_id).last_updated for job_id in exp_not_updated
]
for ts in exp_not_updated:
# was long time ago
self.assertTrue(ts < now)
self.assertFalse(ts_are_close(ts, now))

@mock.patch(CLIENTS, get_mock_client)
def test_get_job_states__last_updated(self):
"""
Copied from test_jobmanager.py
But also tests the last_checked field
"""
def test_get_job_states__by_last_updated(self):
self._reset_last_updated()

# what FE will say was the last time the jobs were checked
ts = time.time_ns()

Expand Down Expand Up @@ -733,14 +760,83 @@ def mock_check_jobs(self_, params):
job_state["jobState"]["updated"] += 1
expected[JOB_NOT_FOUND] = {
"job_id": JOB_NOT_FOUND,
"error": f"Cannot find job with ID {JOB_NOT_FOUND}"
"error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND
}

self._check_pop_last_checked(output_states, ts)
self._check_pop_last_checked(output_states, time.time_ns())
self.assertEqual(
expected,
output_states
)
self._check_last_updated(updated_active_ids)

@mock.patch(CLIENTS, get_mock_client)
def test_get_job_states__all_updated_jobs(self):
"""
If theoretically all the jobs were last checked at the beginning of time,
all job states would be returned
"""
self._reset_last_updated()

def mock_check_jobs(self_, params):
"""Mutate all given job states"""
lookup_ids = params["job_ids"]
self.assertCountEqual(ACTIVE_JOBS, lookup_ids) # sanity check

job_states_ret = get_test_jobs(lookup_ids)
for _, job_state in job_states_ret.items():
job_state["updated"] += 1
return job_states_ret

rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": 0})
with mock.patch.object(MockClients, "check_jobs", mock_check_jobs):
output_states = self.jc._handle_comm_message(rq)

expected = {
job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id])
for job_id in ALL_JOBS
}
for job_id, job_state in expected.items():
if job_id in ACTIVE_JOBS:
job_state["jobState"]["updated"] += 1
expected[JOB_NOT_FOUND] = {
"job_id": JOB_NOT_FOUND,
"error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND
}

self._check_pop_last_checked(output_states, time.time_ns())
self.assertEqual(
expected,
output_states
)
self._check_last_updated(ACTIVE_JOBS)

@mock.patch(CLIENTS, get_mock_client)
def test_get_job_states__no_updated_jobs(self):
"""
If theoretically all the jobs were last checked at the end of time,
no job states would be returned, and there would be an error state
to indicate that
"""
self._reset_last_updated()

rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": sys.maxsize})
output_states = self.jc._handle_comm_message(rq)

self._check_pop_last_checked(output_states, time.time_ns())
self.assertEqual(
{
JOB_NOT_FOUND: {
"job_id": JOB_NOT_FOUND,
"error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND
},
"error": {
"error": NO_UPDATED_JOBS_ERR
}
},
output_states
)
self._check_last_updated([])

# -----------------------
# get cell job states
Expand Down
Loading

0 comments on commit cd0194c

Please sign in to comment.