Skip to content

Commit

Permalink
Merge pull request #2950 from kbase/DATAUP-729_job_ts_easy_pickings
Browse files Browse the repository at this point in the history
DATAUP-729: job timestamp easy pickings
  • Loading branch information
ialarmedalien authored May 6, 2022
2 parents 29bd471 + 52e379b commit df43735
Show file tree
Hide file tree
Showing 13 changed files with 459 additions and 361 deletions.
3 changes: 2 additions & 1 deletion kbase-extension/static/kbase/config/job_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"JOB_ID_LIST": "job_id_list",
"FIRST_LINE": "first_line",
"LATEST": "latest",
"NUM_LINES": "num_lines"
"NUM_LINES": "num_lines",
"TS": "ts"
},
"message_types": {
"CANCEL": "cancel_job",
Expand Down
36 changes: 19 additions & 17 deletions src/biokbase/narrative/jobs/appmanager.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
"""
A module for managing apps, specs, requirements, and for starting jobs.
"""
import datetime
import functools
import random
import re
import traceback
from typing import Callable, Dict, Union

import biokbase.auth as auth
from .job import Job
from .jobmanager import JobManager
from .jobcomm import JobComm, MESSAGE_TYPE
from . import specmanager
import biokbase.narrative.clients as clients
from biokbase.narrative.widgetmanager import WidgetManager
from biokbase.narrative.app_util import (
system_variable,
strict_system_variable,
extract_ws_refs,
map_outputs_from_state,
validate_parameters,
resolve_ref_if_typed,
strict_system_variable,
system_variable,
transform_param_value,
extract_ws_refs,
validate_parameters,
)
from biokbase.narrative.exception_util import transform_job_exception
from biokbase.narrative.common import kblogging
import re
import datetime
import traceback
import random
import functools
from typing import Callable, Union, Dict
from biokbase.narrative.exception_util import transform_job_exception
from biokbase.narrative.widgetmanager import WidgetManager

from . import specmanager
from .job import Job
from .jobcomm import MESSAGE_TYPE, JobComm
from .jobmanager import JobManager

"""
A module for managing apps, specs, requirements, and for starting jobs.
Expand Down Expand Up @@ -1002,7 +1004,7 @@ def _generate_input(self, generator):
if "prefix" in generator:
ret = str(generator["prefix"]) + ret
if "suffix" in generator:
ret = ret + str(generator["suffix"])
return ret + str(generator["suffix"])
return ret

def _send_comm_message(self, msg_type, content):
Expand Down
154 changes: 96 additions & 58 deletions src/biokbase/narrative/jobs/job.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import biokbase.narrative.clients as clients
from .specmanager import SpecManager
from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state
from biokbase.narrative.exception_util import transform_job_exception
import copy
import json
import time
import uuid
from jinja2 import Template
from pprint import pprint
from typing import List

from jinja2 import Template

import biokbase.narrative.clients as clients
from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state
from biokbase.narrative.exception_util import transform_job_exception

from .specmanager import SpecManager

"""
KBase job class
"""
Expand Down Expand Up @@ -79,8 +82,8 @@
STATE_ATTRS = list(set(JOB_ATTRS) - set(JOB_INPUT_ATTRS) - set(NARR_CELL_INFO_ATTRS))


class Job(object):
_job_logs = list()
class Job:
_job_logs = []
_acc_state = None # accumulates state

def __init__(self, ee2_state, extra_data=None, children=None):
Expand All @@ -100,7 +103,7 @@ def __init__(self, ee2_state, extra_data=None, children=None):
if ee2_state.get("job_id") is None:
raise ValueError("Cannot create a job without a job ID!")

self._acc_state = ee2_state
self._update_state(ee2_state)
self.extra_data = extra_data

# verify parent-children relationship
Expand All @@ -116,7 +119,7 @@ def from_job_id(cls, job_id, extra_data=None, children=None):
@classmethod
def from_job_ids(cls, job_ids, return_list=True):
states = cls.query_ee2_states(job_ids, init=True)
jobs = dict()
jobs = {}
for job_id, state in states.items():
jobs[job_id] = cls(state)

Expand Down Expand Up @@ -172,25 +175,25 @@ def __getattr__(self, name):
"""
Map expected job attributes to paths in stored ee2 state
"""
attr = dict(
app_id=lambda: self._acc_state.get("job_input", {}).get(
attr = {
"app_id": lambda: self._acc_state.get("job_input", {}).get(
"app_id", JOB_ATTR_DEFAULTS["app_id"]
),
app_version=lambda: self._acc_state.get("job_input", {}).get(
"app_version": lambda: self._acc_state.get("job_input", {}).get(
"service_ver", JOB_ATTR_DEFAULTS["app_version"]
),
batch_id=lambda: (
"batch_id": lambda: (
self.job_id
if self.batch_job
else self._acc_state.get("batch_id", JOB_ATTR_DEFAULTS["batch_id"])
),
batch_job=lambda: self._acc_state.get(
"batch_job": lambda: self._acc_state.get(
"batch_job", JOB_ATTR_DEFAULTS["batch_job"]
),
cell_id=lambda: self._acc_state.get("job_input", {})
"cell_id": lambda: self._acc_state.get("job_input", {})
.get("narrative_cell_info", {})
.get("cell_id", JOB_ATTR_DEFAULTS["cell_id"]),
child_jobs=lambda: copy.deepcopy(
"child_jobs": lambda: copy.deepcopy(
# TODO
# Only batch container jobs have a child_jobs field
# and need the state refresh.
Expand All @@ -202,13 +205,13 @@ def __getattr__(self, name):
if self.batch_job
else self._acc_state.get("child_jobs", JOB_ATTR_DEFAULTS["child_jobs"])
),
job_id=lambda: self._acc_state.get("job_id"),
params=lambda: copy.deepcopy(
"job_id": lambda: self._acc_state.get("job_id"),
"params": lambda: copy.deepcopy(
self._acc_state.get("job_input", {}).get(
"params", JOB_ATTR_DEFAULTS["params"]
)
),
retry_ids=lambda: copy.deepcopy(
"retry_ids": lambda: copy.deepcopy(
# Batch container and retry jobs don't have a
# retry_ids field so skip the state refresh
self._acc_state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"])
Expand All @@ -217,18 +220,18 @@ def __getattr__(self, name):
"retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]
)
),
retry_parent=lambda: self._acc_state.get(
"retry_parent": lambda: self._acc_state.get(
"retry_parent", JOB_ATTR_DEFAULTS["retry_parent"]
),
run_id=lambda: self._acc_state.get("job_input", {})
"run_id": lambda: self._acc_state.get("job_input", {})
.get("narrative_cell_info", {})
.get("run_id", JOB_ATTR_DEFAULTS["run_id"]),
# TODO: add the status attribute!
tag=lambda: self._acc_state.get("job_input", {})
"tag": lambda: self._acc_state.get("job_input", {})
.get("narrative_cell_info", {})
.get("tag", JOB_ATTR_DEFAULTS["tag"]),
user=lambda: self._acc_state.get("user", JOB_ATTR_DEFAULTS["user"]),
)
"user": lambda: self._acc_state.get("user", JOB_ATTR_DEFAULTS["user"]),
}

if name not in attr:
raise AttributeError(f"'Job' object has no attribute '{name}'")
Expand Down Expand Up @@ -325,20 +328,29 @@ def _update_state(self, state: dict) -> None:
"""
given a state data structure (as emitted by ee2), update the stored state in the job object
"""
if state:
if not isinstance(state, dict):
raise TypeError("state must be a dict")

# Check job_id match
if self._acc_state:
if "job_id" in state and state["job_id"] != self.job_id:
raise ValueError(
f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}"
)

state = copy.deepcopy(state)
if self._acc_state is None:
self._acc_state = state
else:
self._acc_state.update(state)
# Check if there would be no change in updating
# i.e., if state <= self._acc_state
if self._acc_state is not None:
if {**self._acc_state, **state} == self._acc_state:
return

state = copy.deepcopy(state)
if self._acc_state is None:
self._acc_state = state
else:
self._acc_state.update(state)

def state(self, force_refresh=False):
def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS):
"""
Queries the job service to see the state of the current job.
"""
Expand All @@ -347,47 +359,73 @@ def state(self, force_refresh=False):
state = self.query_ee2_state(self.job_id, init=False)
self._update_state(state)

return self._internal_state(JOB_INIT_EXCLUDED_JOB_STATE_FIELDS)
return self._internal_state(exclude)

def _internal_state(self, exclude=None):
"""Wrapper for self._acc_state"""
state = copy.deepcopy(self._acc_state)
self._trim_ee2_state(state, exclude)
return state

def output_state(self, state=None) -> dict:
def output_state(self, state=None, no_refresh=False) -> dict:
"""
:param state: can be queried individually from ee2/cache with self.state(),
but sometimes want it to be queried in bulk from ee2 upstream
:return: dict, with structure
:param state: Supplied when the state is queried beforehand from EE2 in bulk,
or when it is retrieved from a cache. If not supplied, must be
queried with self.state() or self._internal_state()
:return: dict, with structure
{
outputWidgetInfo: (if not finished, None, else...) job.get_viewer_params result
jobState: {
job_id: string,
status: string,
created: epoch ms,
updated: epoch ms,
queued: optional - epoch ms,
finished: optional - epoc ms,
terminated_code: optional - int,
tag: string (release, beta, dev),
parent_job_id: optional - string or null,
run_id: string,
cell_id: string,
errormsg: optional - string,
error (optional): {
code: int,
name: string,
message: string (should be for the user to read),
error: string, (likely a stacktrace)
"job_id": string,
"jobState": {
"job_id": string,
"status": string - enum,
"batch_id": string or None,
"batch_job": bool,
"child_jobs": list,
"created": epoch ms,
"updated": epoch ms,
"queued": epoch ms,
"running": epoch ms,
"finished": epoch ms,
"tag": string (release, beta, dev),
"run_id": string,
"cell_id": string,
"job_output": { # completed jobs only
"version": string,
"result": [
{
# result params, e.g.
"report_name": string,
"report_ref": string,
}
],
"id": string
},
error_code: optional - int
"terminated_code": terminated jobs only; optional - int,
"error": { # jobs that did not complete successfully
"code": int,
"name": string,
"message": string (should be for the user to read),
"error": string, (likely a stacktrace)
},
"errormsg": optional - string,
"error_code": optional - int
},
"outputWidgetInfo": { # None if job does not have status "completed"
"name": string,
"tag": string - (release, beta, dev),
"params": {
# output widget params, e.g.
"report_name": string,
"report_ref": string
}
}
}
:rtype: dict
"""
if not state:
state = self.state()
state = self._internal_state() if no_refresh else self.state()
else:
self._update_state(state)
state = self._internal_state()
Expand Down Expand Up @@ -503,7 +541,7 @@ def log(self, first_line=0, num_lines=None):
num_lines = 0

if first_line >= num_available_lines or num_lines <= 0:
return (num_available_lines, list())
return (num_available_lines, [])
return (
num_available_lines,
self._job_logs[first_line : first_line + num_lines],
Expand Down
Loading

0 comments on commit df43735

Please sign in to comment.