Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAUP-729 job ts implementation #2960

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions docs/design/job_architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,16 @@ Bundled job status information for one or more jobs, keyed by job ID, with the f
* `job_id`
* `jobState` - see [Job output state](#job-output-state) below for the detailed structure
* `outputWidgetInfo` - the parameters to send to output widgets, generated from the app specifications and job output. This is only available for completed jobs and is set to null otherwise.
* `last_checked` - timestamp in ns added at comm sending time

In case of error, the response has instead the keys:
If an error occurs for an individual input job ID, the response entry for that job ID instead has the keys:
* `job_id`
* `error` - brief message explaining the issue
* `last_checked` - timestamp in ns added at comm sending time

If all valid jobs were filtered out by the `ts` input from the frontend, the response contains a single key-value pair keyed by "error" (breaking the usual job-ID-to-job-state pattern), whose value is a state with the keys:
* `error` - "No updated jobs"
* `last_checked` - timestamp in ns added at comm sending time

Sample response JSON:
```js
Expand All @@ -330,11 +336,13 @@ Sample response JSON:
"status": "running",
"created": 123456789,
},
"outputWidgetInfo": null, // only available for completed jobs
"outputWidgetInfo": null, // only available for completed jobs
"last_checked": 1652992287210343298,
},
"job_id_2": {
"job_id": "job_id_2",
"error": "Cannot find job with ID job_id_2"
"error": "Cannot find job with ID job_id_2",
"last_checked": 1652992287210343298,
},
}
```
Expand All @@ -358,7 +366,6 @@ As sent to browser, includes cell info and run info. The structure below indicat
"created": epoch ms,
"queued": optional - epoch ms,
"finished": optional - epoch ms,
"updated": epoch ms,
"terminated_code": optional - int,
"error": { // optional
"code": int,
Expand All @@ -371,7 +378,8 @@ As sent to browser, includes cell info and run info. The structure below indicat
"tag": string (release, beta, dev),
"error_code": optional - int,
"errormsg": optional - string,
}
},
"last_checked": int - ns
}
```

Expand Down
4 changes: 2 additions & 2 deletions docs/testing/HeadlessTesting.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ job = AppManager().run_app(
import time
import pprint
import json
while job.state()['job_state'] not in ['completed', 'suspend']:
while job.refresh_state()['job_state'] not in ['completed', 'suspend']:
time.sleep(5)
job_result = job.state()
job_result = job.refresh_state()
if job_result['job_state'] != 'completed':
print "Failed - job did not complete: " + ",".join(job_result['status'][0:3])
else:
Expand Down
75 changes: 31 additions & 44 deletions src/biokbase/narrative/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import biokbase.narrative.clients as clients
from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state
from biokbase.narrative.exception_util import transform_job_exception
from biokbase.narrative.jobs.util import time_ns

from .specmanager import SpecManager

Expand Down Expand Up @@ -103,7 +104,7 @@ def __init__(self, ee2_state, extra_data=None, children=None):
if ee2_state.get("job_id") is None:
raise ValueError("Cannot create a job without a job ID!")

self._update_state(ee2_state)
self.update_state(ee2_state)
n1mus marked this conversation as resolved.
Show resolved Hide resolved
self.extra_data = extra_data

# verify parent-children relationship
Expand All @@ -129,9 +130,9 @@ def from_job_ids(cls, job_ids, return_list=True):
return jobs

@staticmethod
def _trim_ee2_state(state: dict, exclude: list) -> None:
if exclude:
for field in exclude:
def _trim_ee2_state(state: dict, exclude_fields: list) -> None:
n1mus marked this conversation as resolved.
Show resolved Hide resolved
if exclude_fields:
for field in exclude_fields:
if field in state:
del state[field]

Expand Down Expand Up @@ -199,7 +200,7 @@ def __getattr__(self, name):
# and need the state refresh.
n1mus marked this conversation as resolved.
Show resolved Hide resolved
# But KBParallel/KB Batch App jobs may not have the
# batch_job field
self.state(force_refresh=True).get(
self.refresh_state(force_refresh=True).get(
ialarmedalien marked this conversation as resolved.
Show resolved Hide resolved
"child_jobs", JOB_ATTR_DEFAULTS["child_jobs"]
)
if self.batch_job
Expand All @@ -216,7 +217,7 @@ def __getattr__(self, name):
# retry_ids field so skip the state refresh
self._acc_state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"])
if self.batch_job or self.retry_parent
else self.state(force_refresh=True).get(
else self.refresh_state(force_refresh=True).get(
"retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]
)
),
Expand All @@ -239,17 +240,8 @@ def __getattr__(self, name):
return attr[name]()

def __setattr__(self, name, value):
if name in STATE_ATTRS:
self._acc_state[name] = value
elif name in JOB_INPUT_ATTRS:
self._acc_state["job_input"] = self._acc_state.get("job_input", {})
self._acc_state["job_input"][name] = value
elif name in NARR_CELL_INFO_ATTRS:
self._acc_state["job_input"] = self._acc_state.get("job_input", {})
self._acc_state["job_input"]["narrative_cell_info"] = self._acc_state[
"job_input"
].get("narrative_cell_info", {})
self._acc_state["job_input"]["narrative_cell_info"][name] = value
if name in STATE_ATTRS: # TODO are/should these assignments be used?
self.update_state({name: value})
else:
object.__setattr__(self, name, value)

Expand All @@ -273,14 +265,6 @@ def was_terminal(self):
else:
return self._acc_state.get("status") in TERMINAL_STATUSES

def is_terminal(self):
self.state()
if self._acc_state.get("batch_job"):
for child_job in self.children:
if child_job._acc_state.get("status") != COMPLETED_STATUS:
child_job.state(force_refresh=True)
return self.was_terminal()

def in_cells(self, cell_ids: List[str]) -> bool:
"""
For job initialization.
Expand Down Expand Up @@ -324,9 +308,10 @@ def parameters(self):
f"Unable to fetch parameters for job {self.job_id} - {e}"
)

def _update_state(self, state: dict) -> None:
def update_state(self, state: dict, ts: int = None) -> None:
"""
given a state data structure (as emitted by ee2), update the stored state in the job object
Given a state data structure (as emitted by ee2), update the stored state in the job object
All updates to the job state should go through here to keep the last_updated field accurate
"""
if not isinstance(state, dict):
raise TypeError("state must be a dict")
Expand All @@ -335,7 +320,7 @@ def _update_state(self, state: dict) -> None:
if self._acc_state:
if "job_id" in state and state["job_id"] != self.job_id:
raise ValueError(
f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}"
f"Job ID mismatch in update_state: job ID: {self.job_id}; state ID: {state['job_id']}"
)

# Check if there would be no change in updating
Expand All @@ -348,30 +333,32 @@ def _update_state(self, state: dict) -> None:
if self._acc_state is None:
self._acc_state = state
else:
self._acc_state.update(state)
self._acc_state = {**self._acc_state, **state}

self.last_updated = time_ns() if ts is None else ts

def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS):
def refresh_state(self, force_refresh=False, exclude_fields=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS):
"""
Queries the job service to see the state of the current job.
"""

if force_refresh or not self.was_terminal():
state = self.query_ee2_state(self.job_id, init=False)
self._update_state(state)
self.update_state(state)

return self._internal_state(exclude)
return self.cached_state(exclude_fields)

def _internal_state(self, exclude=None):
def cached_state(self, exclude_fields=None):
"""Wrapper for self._acc_state"""
state = copy.deepcopy(self._acc_state)
self._trim_ee2_state(state, exclude)
self._trim_ee2_state(state, exclude_fields)
return state

def output_state(self, state=None, no_refresh=False) -> dict:
"""
:param state: Supplied when the state is queried beforehand from EE2 in bulk,
or when it is retrieved from a cache. If not supplied, must be
queried with self.state() or self._internal_state()
queried with self.refresh_state() or self.cached_state()
:return: dict, with structure

{
Expand Down Expand Up @@ -424,10 +411,10 @@ def output_state(self, state=None, no_refresh=False) -> dict:
:rtype: dict
"""
if not state:
state = self._internal_state() if no_refresh else self.state()
state = self.cached_state() if no_refresh else self.refresh_state()
else:
self._update_state(state)
state = self._internal_state()
self.update_state(state)
state = self.cached_state()

if state is None:
return self._create_error_state(
Expand Down Expand Up @@ -475,10 +462,10 @@ def show_output_widget(self, state=None):
from biokbase.narrative.widgetmanager import WidgetManager

if not state:
state = self.state()
state = self.refresh_state()
else:
self._update_state(state)
state = self._internal_state()
self.update_state(state)
state = self.cached_state()

if state["status"] == COMPLETED_STATUS and "job_output" in state:
(output_widget, widget_params) = self._get_output_info(state)
Expand Down Expand Up @@ -541,7 +528,7 @@ def log(self, first_line=0, num_lines=None):
return (num_available_lines, [])
return (
num_available_lines,
self._job_logs[first_line : first_line + num_lines],
self._job_logs[first_line: first_line + num_lines],
)

def _update_log(self):
Expand Down Expand Up @@ -611,7 +598,7 @@ def info(self):
print(f"Version: {spec['info']['ver']}")

try:
state = self.state()
state = self.refresh_state()
print(f"Status: {state['status']}")
print("Inputs:\n------")
pprint(self.params)
Expand All @@ -631,7 +618,7 @@ def _repr_javascript_(self):
"""
output_widget_info = None
try:
state = self.state()
state = self.refresh_state()
spec = self.app_spec()
if state.get("status", "") == COMPLETED_STATUS:
(output_widget, widget_params) = self._get_output_info(state)
Expand Down
21 changes: 15 additions & 6 deletions src/biokbase/narrative/jobs/jobcomm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from biokbase.narrative.common import kblogging
from biokbase.narrative.exception_util import JobRequestException, NarrativeException
from biokbase.narrative.jobs.jobmanager import JobManager
from biokbase.narrative.jobs.util import load_job_constants
from biokbase.narrative.jobs.util import load_job_constants, time_ns

(PARAM, MESSAGE_TYPE) = load_job_constants()

Expand Down Expand Up @@ -116,7 +116,11 @@ def cell_id_list(self):

@property
def ts(self):
"""This param is completely optional"""
"""
Optional field sent with STATUS requests. When present, job states
that have not been updated since this epoch timestamp (in ns) are
filtered out of the STATUS response.
n1mus marked this conversation as resolved.
Show resolved Hide resolved
"""
return self.rq_data.get(PARAM["TS"])


Expand Down Expand Up @@ -199,10 +203,7 @@ def _get_job_ids(self, req: JobRequest) -> List[str]:
if req.has_batch_id():
return self._jm.update_batch_job(req.batch_id)

try:
return req.job_id_list
except Exception as ex:
raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex
return req.job_id_list
n1mus marked this conversation as resolved.
Show resolved Hide resolved

def start_job_status_loop(
self,
Expand Down Expand Up @@ -514,6 +515,14 @@ def send_comm_message(self, msg_type: str, content: dict) -> None:
Sends a ipykernel.Comm message to the KBaseJobs channel with the given msg_type
and content. These just get encoded into the message itself.
"""
# For STATUS responses, add a last_checked field
# to each output_state. Note: error states will have
# the last_checked field too
if msg_type == MESSAGE_TYPE["STATUS"]:
now = time_ns()
for output_state in content.values():
output_state["last_checked"] = now
Comment on lines +521 to +524
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not add this timestamp when the job manager is putting together the list of jobs, instead of adding an extra iteration through the job state data here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't sure since the CANCEL_JOBS request also responds with a STATUS message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided not to filter the STATUS response for CANCEL_JOBS though because I figured in theory they should all get updated, whether successfully or just coming back with an error

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because everything is asynchronous, the FE doesn't have any way of knowing what triggered a job status message -- whether it was a cancel request, a status request, or the BE job loop. That's why I say it's better to put the timestamp on in the job manager, so that all job state objects that the FE receives have a timestamp on them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one allure of putting everything into JobComm is less tests surgery ... But putting it deep into the stack, at the origin of the STATUS response ds, seems less googly-eyed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I tried putting all the filtering/last_checked logic at the source _construct_job_state_set but the tests were complaining so I'm abandoning that effort for the sake of time. Is the current placement of the filtering/last_checked good enough?


msg = {"msg_type": msg_type, "content": content}
self._comm.send(msg)

Expand Down
34 changes: 29 additions & 5 deletions src/biokbase/narrative/jobs/jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)

from .job import JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, Job
from .util import time_ns

"""
KBase Job Manager
Expand All @@ -29,13 +30,15 @@
__version__ = "0.0.1"

JOB_NOT_REG_ERR = "Job ID is not registered"
JOB_NOT_REG_2_ERR = "Cannot find job with ID %s" # TODO unify these
JOB_NOT_BATCH_ERR = "Job ID is not for a batch job"

JOBS_TYPE_ERR = "List expected for job_id_list"
JOBS_MISSING_ERR = "No valid job IDs provided"

CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided"
DOES_NOT_EXIST = "does_not_exist"

NO_UPDATED_JOBS_ERR = "No updated jobs"


class JobManager:
Expand Down Expand Up @@ -307,10 +310,16 @@ def _construct_job_output_state_set(
# fill in the output states for the missing jobs
# if the job fetch failed, add an error message to the output
# and return the cached job state
now = time_ns()
for job_id in jobs_to_lookup:
job = self.get_job(job_id)
if job_id in fetched_states:
output_states[job_id] = job.output_state(fetched_states[job_id])
fetched_state = fetched_states[job_id]
# pre-emptively try a job state update
# so can mark the set of fetched (but also changed) states
# with a simultaneous timestamp
job.update_state(fetched_state, now)
output_states[job_id] = job.output_state(fetched_state)
else:
# fetch the current state without updating it
output_states[job_id] = job.output_state({})
Expand All @@ -322,7 +331,7 @@ def _construct_job_output_state_set(
def get_job_states(self, job_ids: List[str], ts: int = None) -> dict:
"""
Retrieves the job states for the supplied job_ids, with the option to
replace any jobs that have not been updated since ts with a short stub
remove any jobs that have not been updated since ts

Jobs that cannot be found in the `_running_jobs` index will return
{
Expand All @@ -340,7 +349,22 @@ def get_job_states(self, job_ids: List[str], ts: int = None) -> dict:
"""
job_ids, error_ids = self._check_job_list(job_ids)
output_states = self._construct_job_output_state_set(job_ids)
return self.add_errors_to_results(output_states, error_ids)

n1mus marked this conversation as resolved.
Show resolved Hide resolved
if ts is not None:
for job_id in job_ids:
if self.get_job(job_id).last_updated < ts:
del output_states[job_id]
no_updated_jobs = ts is not None and job_ids and not output_states

# add error_ids first in the unlikely case one of the error_ids
# is "error" which is a reserved key which is prioritized
# for indicating an actual error event
self.add_errors_to_results(output_states, error_ids)

if no_updated_jobs:
output_states["error"] = {"error": NO_UPDATED_JOBS_ERR}

return output_states

def get_all_job_states(self, ignore_refresh_flag=False) -> dict:
"""
Expand Down Expand Up @@ -719,7 +743,7 @@ def add_errors_to_results(self, results: dict, error_ids: List[str]) -> dict:
for error_id in error_ids:
results[error_id] = {
"job_id": error_id,
"error": f"Cannot find job with ID {error_id}",
"error": JOB_NOT_REG_2_ERR % error_id,
}
return results

Expand Down
Loading