Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAUP-729: job timestamp easy pickings #2950

Merged
merged 2 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kbase-extension/static/kbase/config/job_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"JOB_ID_LIST": "job_id_list",
"FIRST_LINE": "first_line",
"LATEST": "latest",
"NUM_LINES": "num_lines"
"NUM_LINES": "num_lines",
"TS": "ts"
},
"message_types": {
"CANCEL": "cancel_job",
Expand Down
36 changes: 19 additions & 17 deletions src/biokbase/narrative/jobs/appmanager.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
"""
A module for managing apps, specs, requirements, and for starting jobs.
"""
import datetime
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These import changes all come from running isort.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

import functools
import random
import re
import traceback
from typing import Callable, Dict, Union

import biokbase.auth as auth
from .job import Job
from .jobmanager import JobManager
from .jobcomm import JobComm, MESSAGE_TYPE
from . import specmanager
import biokbase.narrative.clients as clients
from biokbase.narrative.widgetmanager import WidgetManager
from biokbase.narrative.app_util import (
system_variable,
strict_system_variable,
extract_ws_refs,
map_outputs_from_state,
validate_parameters,
resolve_ref_if_typed,
strict_system_variable,
system_variable,
transform_param_value,
extract_ws_refs,
validate_parameters,
)
from biokbase.narrative.exception_util import transform_job_exception
from biokbase.narrative.common import kblogging
import re
import datetime
import traceback
import random
import functools
from typing import Callable, Union, Dict
from biokbase.narrative.exception_util import transform_job_exception
from biokbase.narrative.widgetmanager import WidgetManager

from . import specmanager
from .job import Job
from .jobcomm import MESSAGE_TYPE, JobComm
from .jobmanager import JobManager

"""
A module for managing apps, specs, requirements, and for starting jobs.
Expand Down Expand Up @@ -1002,7 +1004,7 @@ def _generate_input(self, generator):
if "prefix" in generator:
ret = str(generator["prefix"]) + ret
if "suffix" in generator:
ret = ret + str(generator["suffix"])
return ret + str(generator["suffix"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? No biggie but I thought this was strange

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got a flake8 plugin installed locally that warns when a value is assigned to a variable only to be returned immediately afterwards. This is one such case...

return ret

def _send_comm_message(self, msg_type, content):
Expand Down
154 changes: 96 additions & 58 deletions src/biokbase/narrative/jobs/job.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import biokbase.narrative.clients as clients
from .specmanager import SpecManager
from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state
from biokbase.narrative.exception_util import transform_job_exception
import copy
import json
import time
import uuid
from jinja2 import Template
from pprint import pprint
from typing import List

from jinja2 import Template

import biokbase.narrative.clients as clients
from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state
from biokbase.narrative.exception_util import transform_job_exception

from .specmanager import SpecManager

"""
KBase job class
"""
Expand Down Expand Up @@ -79,8 +82,8 @@
STATE_ATTRS = list(set(JOB_ATTRS) - set(JOB_INPUT_ATTRS) - set(NARR_CELL_INFO_ATTRS))


class Job(object):
_job_logs = list()
class Job:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to inherit from (object) explicitly in Python 3.

_job_logs = []
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the flake8-comprehensions plugin (installed locally)

_acc_state = None # accumulates state

def __init__(self, ee2_state, extra_data=None, children=None):
Expand All @@ -100,7 +103,7 @@ def __init__(self, ee2_state, extra_data=None, children=None):
if ee2_state.get("job_id") is None:
raise ValueError("Cannot create a job without a job ID!")

self._acc_state = ee2_state
self._update_state(ee2_state)
self.extra_data = extra_data

# verify parent-children relationship
Expand All @@ -116,7 +119,7 @@ def from_job_id(cls, job_id, extra_data=None, children=None):
@classmethod
def from_job_ids(cls, job_ids, return_list=True):
states = cls.query_ee2_states(job_ids, init=True)
jobs = dict()
jobs = {}
for job_id, state in states.items():
jobs[job_id] = cls(state)

Expand Down Expand Up @@ -172,25 +175,25 @@ def __getattr__(self, name):
"""
Map expected job attributes to paths in stored ee2 state
"""
attr = dict(
app_id=lambda: self._acc_state.get("job_input", {}).get(
attr = {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flake8-comprehensions again

"app_id": lambda: self._acc_state.get("job_input", {}).get(
"app_id", JOB_ATTR_DEFAULTS["app_id"]
),
app_version=lambda: self._acc_state.get("job_input", {}).get(
"app_version": lambda: self._acc_state.get("job_input", {}).get(
"service_ver", JOB_ATTR_DEFAULTS["app_version"]
),
batch_id=lambda: (
"batch_id": lambda: (
self.job_id
if self.batch_job
else self._acc_state.get("batch_id", JOB_ATTR_DEFAULTS["batch_id"])
),
batch_job=lambda: self._acc_state.get(
"batch_job": lambda: self._acc_state.get(
"batch_job", JOB_ATTR_DEFAULTS["batch_job"]
),
cell_id=lambda: self._acc_state.get("job_input", {})
"cell_id": lambda: self._acc_state.get("job_input", {})
.get("narrative_cell_info", {})
.get("cell_id", JOB_ATTR_DEFAULTS["cell_id"]),
child_jobs=lambda: copy.deepcopy(
"child_jobs": lambda: copy.deepcopy(
# TODO
# Only batch container jobs have a child_jobs field
# and need the state refresh.
Expand All @@ -202,13 +205,13 @@ def __getattr__(self, name):
if self.batch_job
else self._acc_state.get("child_jobs", JOB_ATTR_DEFAULTS["child_jobs"])
),
job_id=lambda: self._acc_state.get("job_id"),
params=lambda: copy.deepcopy(
"job_id": lambda: self._acc_state.get("job_id"),
"params": lambda: copy.deepcopy(
self._acc_state.get("job_input", {}).get(
"params", JOB_ATTR_DEFAULTS["params"]
)
),
retry_ids=lambda: copy.deepcopy(
"retry_ids": lambda: copy.deepcopy(
# Batch container and retry jobs don't have a
# retry_ids field so skip the state refresh
self._acc_state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"])
Expand All @@ -217,18 +220,18 @@ def __getattr__(self, name):
"retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]
)
),
retry_parent=lambda: self._acc_state.get(
"retry_parent": lambda: self._acc_state.get(
"retry_parent", JOB_ATTR_DEFAULTS["retry_parent"]
),
run_id=lambda: self._acc_state.get("job_input", {})
"run_id": lambda: self._acc_state.get("job_input", {})
.get("narrative_cell_info", {})
.get("run_id", JOB_ATTR_DEFAULTS["run_id"]),
# TODO: add the status attribute!
tag=lambda: self._acc_state.get("job_input", {})
"tag": lambda: self._acc_state.get("job_input", {})
.get("narrative_cell_info", {})
.get("tag", JOB_ATTR_DEFAULTS["tag"]),
user=lambda: self._acc_state.get("user", JOB_ATTR_DEFAULTS["user"]),
)
"user": lambda: self._acc_state.get("user", JOB_ATTR_DEFAULTS["user"]),
}

if name not in attr:
raise AttributeError(f"'Job' object has no attribute '{name}'")
Expand Down Expand Up @@ -325,20 +328,29 @@ def _update_state(self, state: dict) -> None:
"""
given a state data structure (as emitted by ee2), update the stored state in the job object
"""
if state:
if not isinstance(state, dict):
raise TypeError("state must be a dict")

# Check job_id match
if self._acc_state:
if "job_id" in state and state["job_id"] != self.job_id:
raise ValueError(
f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}"
)

state = copy.deepcopy(state)
if self._acc_state is None:
self._acc_state = state
else:
self._acc_state.update(state)
# Check if there would be no change in updating
# i.e., if state <= self._acc_state
if self._acc_state is not None:
if {**self._acc_state, **state} == self._acc_state:
return
Comment on lines +341 to +345
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prep for upcoming timestamps work


state = copy.deepcopy(state)
if self._acc_state is None:
self._acc_state = state
else:
self._acc_state.update(state)

def state(self, force_refresh=False):
def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS):
"""
Queries the job service to see the state of the current job.
"""
Expand All @@ -347,47 +359,73 @@ def state(self, force_refresh=False):
state = self.query_ee2_state(self.job_id, init=False)
self._update_state(state)

return self._internal_state(JOB_INIT_EXCLUDED_JOB_STATE_FIELDS)
return self._internal_state(exclude)

def _internal_state(self, exclude=None):
"""Wrapper for self._acc_state"""
state = copy.deepcopy(self._acc_state)
self._trim_ee2_state(state, exclude)
return state

def output_state(self, state=None) -> dict:
def output_state(self, state=None, no_refresh=False) -> dict:
"""
:param state: can be queried individually from ee2/cache with self.state(),
but sometimes want it to be queried in bulk from ee2 upstream
:return: dict, with structure
:param state: Supplied when the state is queried beforehand from EE2 in bulk,
or when it is retrieved from a cache. If not supplied, must be
queried with self.state() or self._internal_state()
:return: dict, with structure

{
outputWidgetInfo: (if not finished, None, else...) job.get_viewer_params result
jobState: {
job_id: string,
status: string,
created: epoch ms,
updated: epoch ms,
queued: optional - epoch ms,
finished: optional - epoch ms,
terminated_code: optional - int,
tag: string (release, beta, dev),
parent_job_id: optional - string or null,
run_id: string,
cell_id: string,
errormsg: optional - string,
error (optional): {
code: int,
name: string,
message: string (should be for the user to read),
error: string, (likely a stacktrace)
"job_id": string,
"jobState": {
"job_id": string,
"status": string - enum,
"batch_id": string or None,
"batch_job": bool,
"child_jobs": list,
"created": epoch ms,
"updated": epoch ms,
"queued": epoch ms,
"running": epoch ms,
"finished": epoch ms,
"tag": string (release, beta, dev),
"run_id": string,
"cell_id": string,
"job_output": { # completed jobs only
"version": string,
"result": [
{
# result params, e.g.
"report_name": string,
"report_ref": string,
}
],
"id": string
},
error_code: optional - int
"terminated_code": terminated jobs only; optional - int,
"error": { # jobs that did not complete successfully
"code": int,
"name": string,
"message": string (should be for the user to read),
"error": string, (likely a stacktrace)
},
"errormsg": optional - string,
"error_code": optional - int
},
"outputWidgetInfo": { # None if job does not have status "completed"
"name": string,
"tag": string - (release, beta, dev),
"params": {
# output widget params, e.g.
"report_name": string,
"report_ref": string
}
}

}
:rtype: dict
"""
if not state:
state = self.state()
state = self._internal_state() if no_refresh else self.state()
else:
self._update_state(state)
state = self._internal_state()
Expand Down Expand Up @@ -503,7 +541,7 @@ def log(self, first_line=0, num_lines=None):
num_lines = 0

if first_line >= num_available_lines or num_lines <= 0:
return (num_available_lines, list())
return (num_available_lines, [])
return (
num_available_lines,
self._job_logs[first_line : first_line + num_lines],
Expand Down
Loading