diff --git a/kbase-extension/static/kbase/config/job_config.json b/kbase-extension/static/kbase/config/job_config.json index c9ac1dacac..2274aea9e4 100644 --- a/kbase-extension/static/kbase/config/job_config.json +++ b/kbase-extension/static/kbase/config/job_config.json @@ -7,7 +7,8 @@ "JOB_ID_LIST": "job_id_list", "FIRST_LINE": "first_line", "LATEST": "latest", - "NUM_LINES": "num_lines" + "NUM_LINES": "num_lines", + "TS": "ts" }, "message_types": { "CANCEL": "cancel_job", diff --git a/src/biokbase/narrative/jobs/appmanager.py b/src/biokbase/narrative/jobs/appmanager.py index a4b706085b..31363b11c7 100644 --- a/src/biokbase/narrative/jobs/appmanager.py +++ b/src/biokbase/narrative/jobs/appmanager.py @@ -1,30 +1,32 @@ """ A module for managing apps, specs, requirements, and for starting jobs. """ +import datetime +import functools +import random +import re +import traceback +from typing import Callable, Dict, Union + import biokbase.auth as auth -from .job import Job -from .jobmanager import JobManager -from .jobcomm import JobComm, MESSAGE_TYPE -from . import specmanager import biokbase.narrative.clients as clients -from biokbase.narrative.widgetmanager import WidgetManager from biokbase.narrative.app_util import ( - system_variable, - strict_system_variable, + extract_ws_refs, map_outputs_from_state, - validate_parameters, resolve_ref_if_typed, + strict_system_variable, + system_variable, transform_param_value, - extract_ws_refs, + validate_parameters, ) -from biokbase.narrative.exception_util import transform_job_exception from biokbase.narrative.common import kblogging -import re -import datetime -import traceback -import random -import functools -from typing import Callable, Union, Dict +from biokbase.narrative.exception_util import transform_job_exception +from biokbase.narrative.widgetmanager import WidgetManager + +from . import specmanager +from .job import Job +from .jobcomm import MESSAGE_TYPE, JobComm +from .jobmanager import JobManager """ A module for managing apps, specs, requirements, and for starting jobs. @@ -1002,7 +1004,7 @@ def _generate_input(self, generator): if "prefix" in generator: ret = str(generator["prefix"]) + ret if "suffix" in generator: - ret = ret + str(generator["suffix"]) + return ret + str(generator["suffix"]) return ret def _send_comm_message(self, msg_type, content): diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index fb16b9014b..6bf6db314a 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -1,15 +1,18 @@ -import biokbase.narrative.clients as clients -from .specmanager import SpecManager -from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state -from biokbase.narrative.exception_util import transform_job_exception import copy import json import time import uuid -from jinja2 import Template from pprint import pprint from typing import List +from jinja2 import Template + +import biokbase.narrative.clients as clients +from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state +from biokbase.narrative.exception_util import transform_job_exception + +from .specmanager import SpecManager + """ KBase job class """ @@ -79,8 +82,8 @@ STATE_ATTRS = list(set(JOB_ATTRS) - set(JOB_INPUT_ATTRS) - set(NARR_CELL_INFO_ATTRS)) -class Job(object): - _job_logs = list() +class Job: + _job_logs = [] _acc_state = None # accumulates state def __init__(self, ee2_state, extra_data=None, children=None): @@ -100,7 +103,7 @@ def __init__(self, ee2_state, extra_data=None, children=None): if ee2_state.get("job_id") is None: raise ValueError("Cannot create a job without a job ID!") - self._acc_state = ee2_state + self._update_state(ee2_state) self.extra_data = extra_data # verify parent-children relationship @@ -116,7 +119,7 @@ def from_job_id(cls, job_id, extra_data=None, children=None): @classmethod def from_job_ids(cls, job_ids, return_list=True): states = cls.query_ee2_states(job_ids, init=True) - jobs = dict() + jobs = {} for job_id, state in states.items(): jobs[job_id] = cls(state) @@ -172,25 +175,25 @@ def __getattr__(self, name): """ Map expected job attributes to paths in stored ee2 state """ - attr = dict( - app_id=lambda: self._acc_state.get("job_input", {}).get( + attr = { + "app_id": lambda: self._acc_state.get("job_input", {}).get( "app_id", JOB_ATTR_DEFAULTS["app_id"] ), - app_version=lambda: self._acc_state.get("job_input", {}).get( + "app_version": lambda: self._acc_state.get("job_input", {}).get( "service_ver", JOB_ATTR_DEFAULTS["app_version"] ), - batch_id=lambda: ( + "batch_id": lambda: ( self.job_id if self.batch_job else self._acc_state.get("batch_id", JOB_ATTR_DEFAULTS["batch_id"]) ), - batch_job=lambda: self._acc_state.get( + "batch_job": lambda: self._acc_state.get( "batch_job", JOB_ATTR_DEFAULTS["batch_job"] ), - cell_id=lambda: self._acc_state.get("job_input", {}) + "cell_id": lambda: self._acc_state.get("job_input", {}) .get("narrative_cell_info", {}) .get("cell_id", JOB_ATTR_DEFAULTS["cell_id"]), - child_jobs=lambda: copy.deepcopy( + "child_jobs": lambda: copy.deepcopy( # TODO # Only batch container jobs have a child_jobs field # and need the state refresh. @@ -202,13 +205,13 @@ def __getattr__(self, name): if self.batch_job else self._acc_state.get("child_jobs", JOB_ATTR_DEFAULTS["child_jobs"]) ), - job_id=lambda: self._acc_state.get("job_id"), - params=lambda: copy.deepcopy( + "job_id": lambda: self._acc_state.get("job_id"), + "params": lambda: copy.deepcopy( self._acc_state.get("job_input", {}).get( "params", JOB_ATTR_DEFAULTS["params"] ) ), - retry_ids=lambda: copy.deepcopy( + "retry_ids": lambda: copy.deepcopy( # Batch container and retry jobs don't have a # retry_ids field so skip the state refresh self._acc_state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]) @@ -217,18 +220,18 @@ def __getattr__(self, name): "retry_ids", JOB_ATTR_DEFAULTS["retry_ids"] ) ), - retry_parent=lambda: self._acc_state.get( + "retry_parent": lambda: self._acc_state.get( "retry_parent", JOB_ATTR_DEFAULTS["retry_parent"] ), - run_id=lambda: self._acc_state.get("job_input", {}) + "run_id": lambda: self._acc_state.get("job_input", {}) .get("narrative_cell_info", {}) .get("run_id", JOB_ATTR_DEFAULTS["run_id"]), # TODO: add the status attribute! - tag=lambda: self._acc_state.get("job_input", {}) + "tag": lambda: self._acc_state.get("job_input", {}) .get("narrative_cell_info", {}) .get("tag", JOB_ATTR_DEFAULTS["tag"]), - user=lambda: self._acc_state.get("user", JOB_ATTR_DEFAULTS["user"]), - ) + "user": lambda: self._acc_state.get("user", JOB_ATTR_DEFAULTS["user"]), + } if name not in attr: raise AttributeError(f"'Job' object has no attribute '{name}'") @@ -325,20 +328,29 @@ def _update_state(self, state: dict) -> None: """ given a state data structure (as emitted by ee2), update the stored state in the job object """ - if state: + if not isinstance(state, dict): + raise TypeError("state must be a dict") + # Check job_id match + if self._acc_state: if "job_id" in state and state["job_id"] != self.job_id: raise ValueError( f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}" ) - state = copy.deepcopy(state) - if self._acc_state is None: - self._acc_state = state - else: - self._acc_state.update(state) + # Check if there would be no change in updating + # i.e., if state <= self._acc_state + if self._acc_state is not None: + if {**self._acc_state, **state} == self._acc_state: + return + + state = copy.deepcopy(state) + if self._acc_state is None: + self._acc_state = state + else: + self._acc_state.update(state) - def state(self, force_refresh=False): + def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ Queries the job service to see the state of the current job. """ @@ -347,7 +359,7 @@ def state(self, force_refresh=False): state = self.query_ee2_state(self.job_id, init=False) self._update_state(state) - return self._internal_state(JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + return self._internal_state(exclude) def _internal_state(self, exclude=None): """Wrapper for self._acc_state""" @@ -355,39 +367,65 @@ def _internal_state(self, exclude=None): self._trim_ee2_state(state, exclude) return state - def output_state(self, state=None) -> dict: + def output_state(self, state=None, no_refresh=False) -> dict: """ - :param state: can be queried individually from ee2/cache with self.state(), - but sometimes want it to be queried in bulk from ee2 upstream - :return: dict, with structure + :param state: Supplied when the state is queried beforehand from EE2 in bulk, + or when it is retrieved from a cache. If not supplied, must be + queried with self.state() or self._internal_state() + :return: dict, with structure { - outputWidgetInfo: (if not finished, None, else...) job.get_viewer_params result - jobState: { - job_id: string, - status: string, - created: epoch ms, - updated: epoch ms, - queued: optional - epoch ms, - finished: optional - epoc ms, - terminated_code: optional - int, - tag: string (release, beta, dev), - parent_job_id: optional - string or null, - run_id: string, - cell_id: string, - errormsg: optional - string, - error (optional): { - code: int, - name: string, - message: string (should be for the user to read), - error: string, (likely a stacktrace) + "job_id": string, + "jobState": { + "job_id": string, + "status": string - enum, + "batch_id": string or None, + "batch_job": bool, + "child_jobs": list, + "created": epoch ms, + "updated": epoch ms, + "queued": epoch ms, + "running": epoch ms, + "finished": epoch ms, + "tag": string (release, beta, dev), + "run_id": string, + "cell_id": string, + "job_output": { # completed jobs only + "version": string, + "result": [ + { + # result params, e.g. + "report_name": string, + "report_ref": string, + } + ], + "id": string }, - error_code: optional - int + "terminated_code": terminated jobs only; optional - int, + "error": { # jobs that did not complete successfully + "code": int, + "name": string, + "message": string (should be for the user to read), + "error": string, (likely a stacktrace) + }, + "errormsg": optional - string, + "error_code": optional - int + }, + "outputWidgetInfo": { # None if job does not have status "completed" + "name": string, + "tag": string - (release, beta, dev), + "params": { + # output widget params, e.g. + "report_name": string, + "report_ref": string + } } + } + :rtype: dict """ if not state: - state = self.state() + state = self._internal_state() if no_refresh else self.state() else: self._update_state(state) state = self._internal_state() @@ -503,7 +541,7 @@ def log(self, first_line=0, num_lines=None): num_lines = 0 if first_line >= num_available_lines or num_lines <= 0: - return (num_available_lines, list()) + return (num_available_lines, []) return ( num_available_lines, self._job_logs[first_line : first_line + num_lines], diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index 282617a371..9fdfc76627 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -1,12 +1,13 @@ import copy import threading from typing import List, Union + from ipykernel.comm import Comm -from biokbase.narrative.jobs.util import load_job_constants -from biokbase.narrative.jobs.jobmanager import JobManager -from biokbase.narrative.exception_util import NarrativeException, JobRequestException -from biokbase.narrative.common import kblogging +from biokbase.narrative.common import kblogging +from biokbase.narrative.exception_util import JobRequestException, NarrativeException +from biokbase.narrative.jobs.jobmanager import JobManager +from biokbase.narrative.jobs.util import load_job_constants (PARAM, MESSAGE_TYPE) = load_job_constants() @@ -23,7 +24,6 @@ class JobRequest: """ - A small wrapper for job comm channel request data. This generally comes in the form of a packet from the kernel Comm object. It is expected to be a dict of the format: @@ -34,11 +34,12 @@ class JobRequest: 'request_type': job function requested 'job_id': optional 'job_id_list': optional + 'batch_id': optional } } } - This little wrapper fills 2 roles: + This little wrapper fills two roles: 1. It validates that the packet is correct and has some expected values. 2. If there's a job_id field, it makes sure that it's real (by asking the JobManager - avoids a bunch of duplicate validation code) @@ -52,32 +53,33 @@ class JobRequest: Provides the following attributes: raw_request dict the unedited request received by the job comm msg_id str unique message id - request_type str the function to perform. This isn't - strictly controlled here, but by JobComm._handle_comm_message. + request_type str the function to perform. This isn't strictly controlled + here, but by JobComm._handle_comm_message. rq_data dict the actual data of the request. Contains the request - type and other parameters specific to the function to be performed + type and other parameters specific to the function to be + performed - Optional: + The IDs of the job(s) to perform the function on (optional): job_id str job_id_list list(str) batch_id str - - The IDs of the job(s) to perform the function on. - """ + INPUT_TYPES = [PARAM["JOB_ID"], PARAM["JOB_ID_LIST"], PARAM["BATCH_ID"]] + def __init__(self, rq: dict): - self.raw_request = copy.deepcopy(rq) + rq = copy.deepcopy(rq) + self.raw_request = rq self.msg_id = rq.get("msg_id") # might be useful later? self.rq_data = rq.get("content", {}).get("data") - if self.rq_data is None: + if not self.rq_data: raise JobRequestException(INVALID_REQUEST_ERR) self.request_type = self.rq_data.get("request_type") - if self.request_type is None: + if not self.request_type: raise JobRequestException(MISSING_REQUEST_TYPE_ERR) input_type_count = 0 - for input_type in [PARAM["JOB_ID"], PARAM["JOB_ID_LIST"], PARAM["BATCH_ID"]]: + for input_type in self.INPUT_TYPES: if input_type in self.rq_data: input_type_count += 1 if input_type_count > 1: @@ -112,6 +114,11 @@ def cell_id_list(self): return self.rq_data[PARAM["CELL_ID_LIST"]] raise JobRequestException(CELLS_NOT_PROVIDED_ERR) + @property + def ts(self): + """This param is completely optional""" + return self.rq_data.get(PARAM["TS"]) + class JobComm: """ @@ -168,14 +175,14 @@ def __init__(self): self._jm = JobManager() if self._msg_map is None: self._msg_map = { - MESSAGE_TYPE["CANCEL"]: self._cancel_jobs, - MESSAGE_TYPE["CELL_JOB_STATUS"]: self._get_job_states_by_cell_id, - MESSAGE_TYPE["INFO"]: self._get_job_info, - MESSAGE_TYPE["LOGS"]: self._get_job_logs, - MESSAGE_TYPE["RETRY"]: self._retry_jobs, + MESSAGE_TYPE["CANCEL"]: self.cancel_jobs, + MESSAGE_TYPE["CELL_JOB_STATUS"]: self.get_job_states_by_cell_id, + MESSAGE_TYPE["INFO"]: self.get_job_info, + MESSAGE_TYPE["LOGS"]: self.get_job_logs, + MESSAGE_TYPE["RETRY"]: self.retry_jobs, MESSAGE_TYPE["START_UPDATE"]: self._modify_job_updates, - MESSAGE_TYPE["STATUS"]: self._get_job_states, - MESSAGE_TYPE["STATUS_ALL"]: self._get_all_job_states, + MESSAGE_TYPE["STATUS"]: self.get_job_states, + MESSAGE_TYPE["STATUS_ALL"]: self.get_all_job_states, MESSAGE_TYPE["STOP_UPDATE"]: self._modify_job_updates, } @@ -215,11 +222,12 @@ def start_job_status_loop( self._jm.initialize_jobs(cell_list) except Exception as e: error = { + "code": getattr(e, "code", -1), "error": "Unable to get initial jobs list", "message": getattr(e, "message", UNKNOWN_REASON), - "code": getattr(e, "code", -1), - "source": getattr(e, "source", "jobmanager"), "name": getattr(e, "name", type(e).__name__), + "request": getattr(e, "request", "jc.start_job_status_loop"), + "source": getattr(e, "source", "jc.start_job_status_loop"), } self.send_comm_message(MESSAGE_TYPE["ERROR"], error) # if job init failed, set the lookup loop var back to False and return @@ -243,7 +251,7 @@ def _lookup_job_status_loop(self) -> None: Run a loop that will look up job info. After running, this spawns a Timer thread on a loop to run itself again. LOOKUP_TIMER_INTERVAL sets the frequency at which the loop runs. """ - all_job_states = self._get_all_job_states() + all_job_states = self.get_all_job_states() if len(all_job_states) == 0 or not self._running_lookup_loop: self.stop_job_status_loop() else: @@ -252,7 +260,7 @@ def _lookup_job_status_loop(self) -> None: ) self._lookup_timer.start() - def _get_all_job_states( + def get_all_job_states( self, req: JobRequest = None, ignore_refresh_flag: bool = False ) -> dict: """ @@ -265,7 +273,7 @@ def _get_all_job_states( self.send_comm_message(MESSAGE_TYPE["STATUS_ALL"], all_job_states) return all_job_states - def _get_job_states_by_cell_id(self, req: JobRequest) -> dict: + def get_job_states_by_cell_id(self, req: JobRequest) -> dict: """ Fetches status of all jobs associated with the given cell ID(s). @@ -294,7 +302,7 @@ def _get_job_states_by_cell_id(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["CELL_JOB_STATUS"], cell_job_states) return cell_job_states - def _get_job_info(self, req: JobRequest) -> dict: + def get_job_info(self, req: JobRequest) -> dict: """ Gets job information for a list of job IDs. @@ -323,13 +331,12 @@ def _get_job_info(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["INFO"], job_info) return job_info - def __get_job_states(self, job_id_list) -> dict: + def _get_job_states(self, job_id_list: list, ts: int = None) -> dict: """ Retrieves the job states for the supplied job_ids. See Job.output_state() for details of job state structure. - TODO: update as required If the ts parameter is present, only jobs that have been updated since that time are returned. Jobs that cannot be found in the `_running_jobs` index will return @@ -346,7 +353,7 @@ def __get_job_states(self, job_id_list) -> dict: :return: dictionary of job states, indexed by job ID :rtype: dict """ - output_states = self._jm.get_job_states(job_id_list) + output_states = self._jm.get_job_states(job_id_list, ts) self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states) return output_states @@ -354,7 +361,7 @@ def get_job_state(self, job_id: str) -> dict: """ Retrieve the job state for a single job. - This differs from the _get_job_state (underscored version) in that + This differs from the _get_job_states (underscored version) in that it just takes a job_id string, not a JobRequest. :param job_id: the job ID to get the state for @@ -363,9 +370,9 @@ def get_job_state(self, job_id: str) -> dict: :return: dictionary of job states, indexed by job ID :rtype: dict """ - return self.__get_job_states([job_id]) + return self._get_job_states([job_id]) - def _get_job_states(self, req: JobRequest) -> dict: + def get_job_states(self, req: JobRequest) -> dict: """ Retrieves the job states for the supplied job_ids. @@ -378,7 +385,7 @@ def _get_job_states(self, req: JobRequest) -> dict: :rtype: dict """ job_id_list = self._get_job_ids(req) - return self.__get_job_states(job_id_list) + return self._get_job_states(job_id_list, req.ts) def _modify_job_updates(self, req: JobRequest) -> dict: """ @@ -411,7 +418,7 @@ def _modify_job_updates(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states) return output_states - def _cancel_jobs(self, req: JobRequest) -> dict: + def cancel_jobs(self, req: JobRequest) -> dict: """ Cancel a job or list of jobs. After sending the cancellation request, the job states are refreshed and their new output states returned. @@ -429,7 +436,7 @@ def _cancel_jobs(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["STATUS"], cancel_results) return cancel_results - def _retry_jobs(self, req: JobRequest) -> dict: + def retry_jobs(self, req: JobRequest) -> dict: """ Retry a job or list of jobs. @@ -446,7 +453,7 @@ def _retry_jobs(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["RETRY"], retry_results) return retry_results - def _get_job_logs(self, req: JobRequest) -> dict: + def get_job_logs(self, req: JobRequest) -> dict: """ Fetch the logs for a job or list of jobs. @@ -523,14 +530,18 @@ def send_error_message( It can also be a string or None if this context manager is invoked outside of a JC request Job error messages have the format: + { - request: the original JobRequest data object, function params, or function name - source: the function request that spawned the error - other fields about the error, dependent on the content. + "msg_type": "job_error", + "content": { + "request": request data from original incoming comm request, if available, else an arbitrary str/NoneType, + "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, + **{any extra error information} + } } :param req: job request context, either a job request received over the channel or a string - :type req: Union[JobRequest, dict, str] + :type req: JobRequest, dict, str, or NoneType :param content: dictionary of extra data to include in the error message, defaults to None :type content: dict, optional """ @@ -546,7 +557,7 @@ def send_error_message( error_content["request"] = req error_content["source"] = req - if content is not None: + if content: error_content.update(content) self.send_comm_message(MESSAGE_TYPE["ERROR"], error_content) @@ -554,7 +565,8 @@ def send_error_message( class exc_to_msg: """ - This is a context manager to wrap around JC code + This is a context manager to wrap around JobComm code in order to catch any exception, + send it back as a comm error messages, and then re-raise that exception """ jc = JobComm() @@ -574,17 +586,18 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, exc_tb): """ - If exception is caught, will send job comm message in this format + If an exception is caught during execution in the JobComm code, + this will send back a comm error message like: { - "msg_type": ERROR, + "msg_type": "job_error", "content": { - "source": "request_type", - "job_id": "0123456789abcdef", # or job_id_list. optional and mutually exclusive - "name": "ValueError", - "message": "Something happened", - #---------- Below, NarrativeException only ----------- - "code": -1, - "error": "Unable to complete this request" + "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, + "request": request data from original incoming comm request, if available, else an arbitrary str/NoneType, + "name": exception name, # e.g., ValueError + "message": exception message, # e.g. "Something specifically went wrong!" + #---------- Below, for NarrativeException only ----------- + "error": exception error attribute, # e.g. "Unable to complete this request" + "code": exception code attribute, # e.g., -1 } } Will then re-raise exception diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index 8ee61aebc5..0c80738f11 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -1,20 +1,20 @@ +import copy +from datetime import datetime, timedelta, timezone +from typing import List, Tuple + from IPython.display import HTML from jinja2 import Template -from datetime import datetime, timezone, timedelta -from typing import List, Tuple + import biokbase.narrative.clients as clients -from .job import ( - Job, - EXCLUDED_JOB_STATE_FIELDS, - JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, -) -from biokbase.narrative.common import kblogging from biokbase.narrative.app_util import system_variable +from biokbase.narrative.common import kblogging from biokbase.narrative.exception_util import ( - transform_job_exception, JobRequestException, + transform_job_exception, ) +from .job import JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, Job + """ KBase Job Manager @@ -197,13 +197,7 @@ def _create_jobs(self, job_ids: List[str]) -> dict: if not job_ids: return {} - job_states = clients.get("execution_engine2").check_jobs( - { - "job_ids": job_ids, - "exclude_fields": JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, - "return_list": 0, - } - ) + job_states = Job.query_ee2_states(job_ids, init=True) for job_state in job_states.values(): # do not set new jobs to be automatically refreshed - if the front end wants them # refreshed, it'll make a request. @@ -233,21 +227,37 @@ def _construct_job_output_state_set( """ Builds a set of job states for the list of job ids. + Precondition: job_ids already validated + The job output state for a given job ID is generated by job.output_state() and has the basic form: { - "job_id": string, - "jobState": dictionary containing job data from EE2, - "outputWidgetInfo": dictionary | None - "batch_id": string if present; otherwise None, - } - - If the call to refresh job data from ee2 fails, the job output state will have an additional - 'error' key with the error message from the failed data refresh - { - "job_id": string, + "job_id_0": { # dict generated by job.output_state() + "job_id": "job_id_0": + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": { + ... + } + }, + "job_id_1": { # dict generated by job.output_state() + "job_id": "job_id_1": + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": None + }, ..., - "error": + "job_id_2": { # dict generated by job.output_state() with EE2 error message added + "job_id": "job_id_2", + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": ..., + "error": + }, + ... } :param job_ids: list of job IDs @@ -257,9 +267,7 @@ def _construct_job_output_state_set( :raises JobRequestException: if job_ids is not a list - :return: dict containing the output_state for each job, indexed by job ID. If an error - occurs during job look up on ee2, a message will be added to the output_state for the - job under the key 'error' + :return: dict containing the output_state for each job, indexed by job ID. :rtype: dict """ if not isinstance(job_ids, list): @@ -287,13 +295,7 @@ def _construct_job_output_state_set( # Get the rest of states direct from EE2. if jobs_to_lookup: try: - fetched_states = clients.get("execution_engine2").check_jobs( - { - "job_ids": jobs_to_lookup, - "exclude_fields": EXCLUDED_JOB_STATE_FIELDS, - "return_list": 0, - } - ) + fetched_states = Job.query_ee2_states(jobs_to_lookup, init=False) except Exception as e: error_message = str(e) kblogging.log_event( @@ -319,10 +321,8 @@ def _construct_job_output_state_set( def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: """ - Retrieves the job states for the supplied job_ids. - - TODO: UPDATE ME! - If the ts parameter is present, only jobs that have been updated since that time are returned. + Retrieves the job states for the supplied job_ids, with the option to + replace any jobs that have not been updated since ts with a short stub Jobs that cannot be found in the `_running_jobs` index will return { @@ -358,7 +358,7 @@ def get_all_job_states(self, ignore_refresh_flag=False) -> dict: jobs_to_lookup = [] # grab the list of running job ids, so we don't run into update-while-iterating problems. - for job_id in self._running_jobs.keys(): + for job_id in self._running_jobs: if self._running_jobs[job_id]["refresh"] or ignore_refresh_flag: jobs_to_lookup.append(job_id) if len(jobs_to_lookup) > 0: @@ -599,6 +599,7 @@ def cancel_jobs(self, job_id_list: List[str]) -> dict: error_states[job_id] = error.message job_states = self._construct_job_output_state_set(job_ids) + for job_id in error_states: job_states[job_id]["error"] = error_states[job_id] @@ -687,17 +688,17 @@ def retry_jobs(self, job_id_list: List[str]) -> dict: retry_states = self._construct_job_output_state_set( retry_ids, self._create_jobs(retry_ids) # add to self._running_jobs index ) - job_states = {**orig_states, **retry_states} results_by_job_id = {} # fill in the job state details for result in retry_results: job_id = result["job_id"] - results_by_job_id[job_id] = {"job_id": job_id, "job": job_states[job_id]} + results_by_job_id[job_id] = {"job_id": job_id, "job": orig_states[job_id]} if "retry_id" in result: retry_id = result["retry_id"] - results_by_job_id[job_id]["retry_id"] = retry_id - results_by_job_id[job_id]["retry"] = job_states[retry_id] + results_by_job_id[job_id].update( + {"retry_id": retry_id, "retry": retry_states[retry_id]} + ) if "error" in result: results_by_job_id[job_id]["error"] = result["error"] return self.add_errors_to_results(results_by_job_id, error_ids) @@ -770,7 +771,7 @@ def list_jobs(self): """ try: all_states = self.get_all_job_states(ignore_refresh_flag=True) - state_list = [s["jobState"] for s in all_states.values()] + state_list = [copy.deepcopy(s["jobState"]) for s in all_states.values()] if not state_list: return "No running jobs!" diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py index 0174485a9c..8b9c07e77a 100644 --- a/src/biokbase/narrative/jobs/util.py +++ b/src/biokbase/narrative/jobs/util.py @@ -1,5 +1,5 @@ -import os import json +import os JOB_CONFIG_FILE_PATH_PARTS = [ "kbase-extension", @@ -15,8 +15,8 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): Load the job-related terms that are shared by front- and back ends. """ full_path = [os.environ["NARRATIVE_DIR"]] + relative_path_to_file - config_json = open(os.path.join(*full_path)).read() - config = json.loads(config_json) + with open(os.path.join(*full_path)) as fh: + config = json.load(fh) REQUIRED = { "message_types": [ "CANCEL", @@ -31,7 +31,16 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): "ERROR", "RUN_STATUS", ], - "params": ["BATCH_ID", "CELL_ID_LIST", "JOB_ID", "JOB_ID_LIST"], + "params": [ + "BATCH_ID", + "CELL_ID_LIST", + "FIRST_LINE", + "JOB_ID_LIST", + "JOB_ID", + "LATEST", + "NUM_LINES", + "TS" + ], } # ensure we have all the required message type and param names @@ -42,7 +51,7 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): f"job_config.json is missing the '{datatype}' config section" ) missing = [item for item in example_list if item not in config[datatype]] - if len(missing): + if missing: raise ValueError( f"job_config.json is missing the following values for {datatype}: " + ", ".join(missing) diff --git a/src/biokbase/narrative/tests/generate_test_results.py b/src/biokbase/narrative/tests/generate_test_results.py index 91fa444d2e..e07221d784 100644 --- a/src/biokbase/narrative/tests/generate_test_results.py +++ b/src/biokbase/narrative/tests/generate_test_results.py @@ -1,19 +1,20 @@ import copy -import sys import os.path -from biokbase.narrative.tests.util import ConfigTests +import sys + from biokbase.narrative.app_util import app_version_tags -from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE from biokbase.narrative.jobs.job import ( + COMPLETED_STATUS, JOB_ATTR_DEFAULTS, OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS, - COMPLETED_STATUS, ) +from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE from biokbase.narrative.tests.job_test_constants import ( BAD_JOBS, - get_test_job, generate_error, + get_test_job, ) +from biokbase.narrative.tests.util import ConfigTests """ generate_test_results.py is used to generate the job message data that the narrative backend produces and that the frontend consumes. It uses data from `ee2_job_test_data_file` and `app_specs_file` and provides expected narrative backend message data, as well as a number of mappings that are used in python tests. @@ -30,7 +31,7 @@ TEST_SPECS = {} for tag in app_version_tags: - spec_dict = dict() + spec_dict = {} for spec in spec_list: spec_dict[spec["info"]["id"]] = spec TEST_SPECS[tag] = spec_dict @@ -95,11 +96,10 @@ def _generate_job_output(job_id): def generate_bad_jobs(): - bad_jobs = { + return { job_id: {"job_id": job_id, "error": generate_error(job_id, "not_found")} for job_id in BAD_JOBS } - return bad_jobs def generate_job_output_state(all_jobs): @@ -215,7 +215,7 @@ def generate_job_logs(all_jobs): INVALID_CELL_ID = "invalid_cell_id" TEST_CELL_ID_LIST = list(JOBS_BY_CELL_ID.keys()) + [INVALID_CELL_ID] # mapping expected as output from get_job_states_by_cell_id -TEST_CELL_IDs = {id: list(JOBS_BY_CELL_ID[id]) for id in JOBS_BY_CELL_ID.keys()} +TEST_CELL_IDs = {cell_id: list(JOBS_BY_CELL_ID[cell_id]) for cell_id in JOBS_BY_CELL_ID.keys()} TEST_CELL_IDs[INVALID_CELL_ID] = [] @@ -230,8 +230,8 @@ def generate_job_logs(all_jobs): config.write_json_file(RESPONSE_DATA_FILE, ALL_RESPONSE_DATA) -def main(args=[]): - if len(args) and args[0] == "--force" or not os.path.exists(RESPONSE_DATA_FILE): +def main(args=None): + if args and args[0] == "--force" or not os.path.exists(RESPONSE_DATA_FILE): config.write_json_file(RESPONSE_DATA_FILE, ALL_RESPONSE_DATA) diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index 68bd9b2a51..597c43eec7 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -1,7 +1,8 @@ -from .util import ConfigTests import copy + from biokbase.narrative.jobs.job import TERMINAL_STATUSES +from .util import ConfigTests config = ConfigTests() TEST_JOBS = config.load_json_file(config.get("jobs", "ee2_job_test_data_file")) @@ -35,8 +36,14 @@ def get_test_job(job_id): return copy.deepcopy(TEST_JOBS[job_id]) +def get_test_jobs(job_ids): + return {job_id: get_test_job(job_id) for job_id in job_ids} + + CLIENTS = "biokbase.narrative.clients.get" +TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" +TEST_EPOCH_NS = 42 # arbitrary epoch ns MAX_LOG_LINES = 10 # test_jobs contains jobs in the following states @@ -89,7 +96,7 @@ def get_test_job(job_id): BATCH_PARENT_CHILDREN = [BATCH_PARENT] + BATCH_CHILDREN JOBS_TERMINALITY = { - id: TEST_JOBS[id]["status"] in TERMINAL_STATUSES for id in TEST_JOBS.keys() + job_id: TEST_JOBS[job_id]["status"] in TERMINAL_STATUSES for job_id in TEST_JOBS.keys() } TERMINAL_JOBS = [] diff --git a/src/biokbase/narrative/tests/test_appmanager.py b/src/biokbase/narrative/tests/test_appmanager.py index d3104a47b7..2b2aad3ef5 100644 --- a/src/biokbase/narrative/tests/test_appmanager.py +++ b/src/biokbase/narrative/tests/test_appmanager.py @@ -1,31 +1,31 @@ """ Tests for the app manager. """ +import copy +import io +import os +import sys import unittest from unittest import mock from unittest.mock import MagicMock -from biokbase.narrative.jobs.specmanager import SpecManager -from biokbase.narrative.jobs.appmanager import AppManager, BATCH_APP -from biokbase.narrative.jobs.jobmanager import JobManager -from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE + +from IPython.display import HTML, Javascript + import biokbase.narrative.app_util as app_util +from biokbase.narrative.jobs.appmanager import BATCH_APP, AppManager from biokbase.narrative.jobs.job import Job -from IPython.display import HTML, Javascript -from .narrative_mock.mockclients import ( - get_mock_client, - WSID_STANDARD, -) -import os -import sys -import io -import copy -from .util import ConfigTests +from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE +from biokbase.narrative.jobs.jobmanager import JobManager +from biokbase.narrative.jobs.specmanager import SpecManager from biokbase.narrative.tests.job_test_constants import ( CLIENTS, READS_OBJ_1, READS_OBJ_2, ) +from .narrative_mock.mockclients import WSID_STANDARD, get_mock_client +from .util import ConfigTests + CONFIG = ConfigTests() WS_NAME = CONFIG.get("app_tests", "public_ws_name") SEMANTIC_VER_ERROR = "Semantic versions only apply to released app modules." diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 1b60ea892c..0b1e4d245d 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -1,47 +1,46 @@ -import unittest -from unittest import mock import copy import itertools +import sys +import unittest +from contextlib import contextmanager +from io import StringIO +from unittest import mock + from biokbase.execution_engine2.baseclient import ServerError from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state from biokbase.narrative.jobs.job import ( - Job, COMPLETED_STATUS, EXCLUDED_JOB_STATE_FIELDS, - JOB_ATTRS, JOB_ATTR_DEFAULTS, + JOB_ATTRS, + Job, ) from biokbase.narrative.jobs.jobmanager import JOB_INIT_EXCLUDED_JOB_STATE_FIELDS from biokbase.narrative.jobs.specmanager import SpecManager - -from .narrative_mock.mockclients import ( - get_mock_client, - get_failing_mock_client, - MockClients, - assert_obj_method_called, -) -from contextlib import contextmanager -from io import StringIO -import sys - +from biokbase.narrative.tests.generate_test_results import JOBS_BY_CELL_ID from biokbase.narrative.tests.job_test_constants import ( + ACTIVE_JOBS, + ALL_JOBS, + BATCH_CHILDREN, + BATCH_PARENT, + BATCH_RETRY_RUNNING, CLIENTS, - MAX_LOG_LINES, JOB_COMPLETED, JOB_CREATED, JOB_RUNNING, JOB_TERMINATED, - BATCH_PARENT, - BATCH_RETRY_RUNNING, JOBS_TERMINALITY, - ALL_JOBS, + MAX_LOG_LINES, TERMINAL_JOBS, - ACTIVE_JOBS, - BATCH_CHILDREN, get_test_job, ) -from biokbase.narrative.tests.generate_test_results import JOBS_BY_CELL_ID +from .narrative_mock.mockclients import ( + MockClients, + assert_obj_method_called, + get_failing_mock_client, + get_mock_client, +) @contextmanager @@ -70,8 +69,7 @@ def get_test_spec(tag, app_id): def create_job_from_ee2(job_id, extra_data=None, children=None): state = get_test_job(job_id) - job = Job(state, extra_data=extra_data, children=children) - return job + return Job(state, extra_data=extra_data, children=children) def create_state_from_ee2(job_id, exclude_fields=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): @@ -92,26 +90,25 @@ def create_attrs_from_ee2(job_id): job_input = state.get("job_input", {}) narr_cell_info = job_input.get("narrative_cell_info", {}) - attrs = dict( - app_id=job_input.get("app_id", JOB_ATTR_DEFAULTS["app_id"]), - app_version=job_input.get("service_ver", JOB_ATTR_DEFAULTS["app_version"]), - batch_id=( + return { + "app_id": job_input.get("app_id", JOB_ATTR_DEFAULTS["app_id"]), + "app_version": job_input.get("service_ver", JOB_ATTR_DEFAULTS["app_version"]), + "batch_id": ( state.get("job_id") if state.get("batch_job", JOB_ATTR_DEFAULTS["batch_job"]) else state.get("batch_id", JOB_ATTR_DEFAULTS["batch_id"]) ), - batch_job=state.get("batch_job", JOB_ATTR_DEFAULTS["batch_job"]), - cell_id=narr_cell_info.get("cell_id", JOB_ATTR_DEFAULTS["cell_id"]), - child_jobs=state.get("child_jobs", JOB_ATTR_DEFAULTS["child_jobs"]), - job_id=state.get("job_id"), - params=job_input.get("params", JOB_ATTR_DEFAULTS["params"]), - retry_ids=state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]), - retry_parent=state.get("retry_parent", JOB_ATTR_DEFAULTS["retry_parent"]), - run_id=narr_cell_info.get("run_id", JOB_ATTR_DEFAULTS["run_id"]), - tag=narr_cell_info.get("tag", JOB_ATTR_DEFAULTS["tag"]), - user=state.get("user", JOB_ATTR_DEFAULTS["user"]), - ) - return attrs + "batch_job": state.get("batch_job", JOB_ATTR_DEFAULTS["batch_job"]), + "cell_id": narr_cell_info.get("cell_id", JOB_ATTR_DEFAULTS["cell_id"]), + "child_jobs": state.get("child_jobs", JOB_ATTR_DEFAULTS["child_jobs"]), + "job_id": state.get("job_id"), + "params": job_input.get("params", JOB_ATTR_DEFAULTS["params"]), + "retry_ids": state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]), + "retry_parent": state.get("retry_parent", JOB_ATTR_DEFAULTS["retry_parent"]), + "run_id": narr_cell_info.get("run_id", JOB_ATTR_DEFAULTS["run_id"]), + "tag": narr_cell_info.get("tag", JOB_ATTR_DEFAULTS["tag"]), + "user": state.get("user", JOB_ATTR_DEFAULTS["user"]), + } def get_widget_info(job_id): @@ -166,7 +163,7 @@ def get_all_jobs(return_list=False): if job_id not in jobs: jobs[job_id] = create_job_from_ee2(job_id) if return_list: - jobs = list(jobs.values()) + return [jobs.values()] return jobs @@ -186,15 +183,19 @@ def check_jobs_equal(self, jobl, jobr): for attr in JOB_ATTRS: self.assertEqual(getattr(jobl, attr), getattr(jobr, attr)) - def check_job_attrs_custom(self, job, exp_attr={}): + def check_job_attrs_custom(self, job, exp_attr=None): + if not exp_attr: + exp_attr = {} attr = dict(JOB_ATTR_DEFAULTS) attr.update(exp_attr) with mock.patch(CLIENTS, get_mock_client): for name, value in attr.items(): self.assertEqual(value, getattr(job, name)) - def check_job_attrs(self, job, job_id, exp_attrs={}, skip_state=False): + def check_job_attrs(self, job, job_id, exp_attrs=None, skip_state=False): # TODO check _acc_state full vs pruned, extra_data + if not exp_attrs: + exp_attrs = {} # Check state() if no special values expected if not exp_attrs and not skip_state: @@ -377,13 +378,20 @@ def mock_state(self, state=None): state = job.output_state() self.assertEqual(expected, state) + # TODO: improve this test def test_job_update__no_state(self): """ test that without a state object supplied, the job state is unchanged """ job = create_job_from_ee2(JOB_CREATED) self.assertFalse(job.was_terminal()) - job._update_state(None) + + # should fail with error 'state must be a dict' + with self.assertRaisesRegex(TypeError, "state must be a dict"): + job._update_state(None) + self.assertFalse(job.was_terminal()) + + job._update_state({}) self.assertFalse(job.was_terminal()) @mock.patch(CLIENTS, get_mock_client) @@ -714,7 +722,7 @@ def test_was_terminal__batch(self): self.assertFalse(batch_job.was_terminal()) def mock_check_job(self_, params): - assert params["job_id"] in BATCH_CHILDREN + self.assertTrue(params["job_id"] in BATCH_CHILDREN) return {"status": COMPLETED_STATUS} with mock.patch.object(MockClients, "check_job", mock_check_job): diff --git a/src/biokbase/narrative/tests/test_job_util.py b/src/biokbase/narrative/tests/test_job_util.py index b6a2fc5105..656d86f160 100644 --- a/src/biokbase/narrative/tests/test_job_util.py +++ b/src/biokbase/narrative/tests/test_job_util.py @@ -1,6 +1,7 @@ -from biokbase.narrative.jobs.util import load_job_constants import unittest +from biokbase.narrative.jobs.util import load_job_constants + class JobUtilTestCase(unittest.TestCase): def test_load_job_constants__no_file(self): @@ -43,7 +44,7 @@ def test_load_job_constants__missing_value(self): ] with self.assertRaisesRegex( ValueError, - "job_config.json is missing the following values for params: BATCH_ID, JOB_ID", + "job_config.json is missing the following values for params: BATCH_ID, FIRST_LINE, JOB_ID, LATEST, NUM_LINES, TS", ): load_job_constants(file_path) diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index 871df66675..c526e1bebf 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -1,73 +1,72 @@ -import unittest -from unittest import mock -import os +import copy import itertools +import os import re -import copy +import unittest +from unittest import mock -from biokbase.narrative.jobs.jobmanager import ( - JobManager, - JOB_NOT_REG_ERR, - JOB_NOT_BATCH_ERR, - JOBS_MISSING_ERR, +from biokbase.narrative.exception_util import ( + JobRequestException, + NarrativeException, + transform_job_exception, ) from biokbase.narrative.jobs.jobcomm import ( - JobRequest, - JobComm, - exc_to_msg, CELLS_NOT_PROVIDED_ERR, - ONE_INPUT_TYPE_ONLY_ERR, INVALID_REQUEST_ERR, - MISSING_REQUEST_TYPE_ERR, MESSAGE_TYPE, + MISSING_REQUEST_TYPE_ERR, + ONE_INPUT_TYPE_ONLY_ERR, PARAM, + JobComm, + JobRequest, + exc_to_msg, ) -from biokbase.narrative.exception_util import ( - NarrativeException, - JobRequestException, - transform_job_exception, +from biokbase.narrative.jobs.jobmanager import ( + JOB_NOT_BATCH_ERR, + JOB_NOT_REG_ERR, + JOBS_MISSING_ERR, + JobManager, +) +from biokbase.narrative.tests.generate_test_results import ( + ALL_RESPONSE_DATA, + JOBS_BY_CELL_ID, + TEST_CELL_ID_LIST, + TEST_CELL_IDs, ) - -from .util import ConfigTests, validate_job_state from biokbase.narrative.tests.job_test_constants import ( + ACTIVE_JOBS, + ALL_JOBS, + BAD_JOB_ID, + BAD_JOB_ID_2, + BAD_JOBS, + BATCH_CHILDREN, + BATCH_COMPLETED, + BATCH_ERROR_RETRIED, + BATCH_PARENT, + BATCH_PARENT_CHILDREN, + BATCH_RETRY_RUNNING, + BATCH_TERMINATED, + BATCH_TERMINATED_RETRIED, CLIENTS, - MAX_LOG_LINES, JOB_COMPLETED, JOB_CREATED, - JOB_RUNNING, - JOB_TERMINATED, JOB_ERROR, - BATCH_PARENT, - BATCH_COMPLETED, - BATCH_TERMINATED, - BATCH_TERMINATED_RETRIED, - BATCH_ERROR_RETRIED, - BATCH_RETRY_RUNNING, JOB_NOT_FOUND, - BAD_JOB_ID, - BAD_JOB_ID_2, - ALL_JOBS, - BAD_JOBS, - ACTIVE_JOBS, + JOB_RUNNING, + JOB_TERMINATED, + MAX_LOG_LINES, REFRESH_STATE, - BATCH_PARENT_CHILDREN, - BATCH_CHILDREN, generate_error, ) -from biokbase.narrative.tests.generate_test_results import ( - ALL_RESPONSE_DATA, - TEST_CELL_ID_LIST, - TEST_CELL_IDs, - JOBS_BY_CELL_ID, -) -from .narrative_mock.mockcomm import MockComm from .narrative_mock.mockclients import ( - get_mock_client, - get_failing_mock_client, - generate_ee2_error, MockClients, + generate_ee2_error, + get_failing_mock_client, + get_mock_client, ) +from .narrative_mock.mockcomm import MockComm +from .util import ConfigTests, validate_job_state APP_NAME = "The Best App in the World" @@ -102,6 +101,8 @@ def make_comm_msg( msg_type: str, job_id_like, as_job_request: bool, content: dict = None ): + if content is None: + content = {} job_arguments = {} if isinstance(job_id_like, dict): job_arguments = job_id_like @@ -112,10 +113,9 @@ def make_comm_msg( msg = { "msg_id": "some_id", - "content": {"data": {"request_type": msg_type, **job_arguments}}, + "content": {"data": {"request_type": msg_type, **job_arguments, **content}}, } - if content is not None: - msg["content"]["data"].update(content) + if as_job_request: return JobRequest(msg) else: @@ -362,6 +362,22 @@ def test_req_no_inputs__fail(self): self.jc._handle_comm_message(req_dict) self.check_error_message(req_dict, err) + def test_req_multiple_inputs__fail(self): + functions = [ + CANCEL, + INFO, + LOGS, + RETRY, + STATUS, + ] + + for msg_type in functions: + req_dict = make_comm_msg(msg_type, {"job_id": "something", "batch_id": "another_thing"}, False) + err = JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) + with self.assertRaisesRegex(type(err), str(err)): + self.jc._handle_comm_message(req_dict) + self.check_error_message(req_dict, err) + # --------------------- # Start job status loop # --------------------- @@ -437,11 +453,12 @@ def test_start_job_status_loop__initialise_jobs_error(self): { "msg_type": ERROR, "content": { + "code": -32000, "error": "Unable to get initial jobs list", "message": "check_workspace_jobs failed", - "code": -32000, - "source": "ee2", "name": "JSONRPCError", + "request": "jc.start_job_status_loop", + "source": "ee2", }, }, ) @@ -454,7 +471,7 @@ def test_start_job_status_loop__no_jobs_stop_loop(self): self.jm._jobs_by_cell_id = {} self.jm = JobManager() self.assertEqual(self.jm._running_jobs, {}) - # this will trigger a call to _get_all_job_states + # this will trigger a call to get_all_job_states # a message containing all jobs (i.e. {}) will be sent out # when it returns 0 jobs, the JobComm will run stop_job_status_loop self.jc.start_job_status_loop() @@ -542,7 +559,6 @@ def test_get_job_state__no_job(self): # ----------------------- # Lookup select job states # ----------------------- - def test_get_job_states__job_id__ok(self): self.check_job_output_states( params={JOB_ID: JOB_COMPLETED}, ok_states=[JOB_COMPLETED] @@ -829,6 +845,7 @@ def test_cancel_jobs__job_id_list__failure(self): job_id_list = [JOB_RUNNING, BATCH_RETRY_RUNNING] req_dict = make_comm_msg(CANCEL, job_id_list, False) output = self.jc._handle_comm_message(req_dict) + expected = { JOB_RUNNING: ALL_RESPONSE_DATA[STATUS][JOB_RUNNING], BATCH_RETRY_RUNNING: { @@ -1284,14 +1301,19 @@ def test_request_ok(self): rq.job_id_list def test_request_no_data(self): - rq_msg = {"msg_id": "some_id", "content": {}} - with self.assertRaisesRegex(JobRequestException, INVALID_REQUEST_ERR): - JobRequest(rq_msg) + rq_msg1 = {"msg_id": "some_id", "content": {}} + rq_msg2 = {"msg_id": "some_id", "content": {"data": {}}} + rq_msg3 = {"msg_id": "some_id", "content": {"data": None}} + rq_msg4 = {"msg_id": "some_id", "content": {"what": "?"}} + for msg in [rq_msg1, rq_msg2, rq_msg3, rq_msg4]: + with self.assertRaisesRegex(JobRequestException, INVALID_REQUEST_ERR): + JobRequest(msg) def test_request_no_req(self): - rq_msg = {"msg_id": "some_id", "content": {"data": {"request_type": None}}} - rq_msg2 = {"msg_id": "some_other_id", "content": {"data": {}}} - for msg in [rq_msg, rq_msg2]: + rq_msg1 = {"msg_id": "some_id", "content": {"data": {"request_type": None}}} + rq_msg2 = {"msg_id": "some_id", "content": {"data": {"request_type": ""}}} + rq_msg3 = {"msg_id": "some_id", "content": {"data": {"what": {}}}} + for msg in [rq_msg1, rq_msg2, rq_msg3]: with self.assertRaisesRegex(JobRequestException, MISSING_REQUEST_TYPE_ERR): JobRequest(msg) diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index ae7420f091..dc2c36bb05 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -1,78 +1,74 @@ -""" -Tests for job management -""" -import unittest import copy import itertools -from unittest import mock -import re import os +import re +import unittest from datetime import datetime +from unittest import mock + from IPython.display import HTML -from biokbase.narrative.jobs.jobmanager import ( - JobManager, - JOB_NOT_REG_ERR, - JOB_NOT_BATCH_ERR, - JOBS_MISSING_ERR, - CELLS_NOT_PROVIDED_ERR, -) +from biokbase.narrative.exception_util import JobRequestException, NarrativeException from biokbase.narrative.jobs.job import ( - Job, EXCLUDED_JOB_STATE_FIELDS, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, + Job, ) from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE -from biokbase.narrative.exception_util import ( - NarrativeException, - JobRequestException, +from biokbase.narrative.jobs.jobmanager import ( + CELLS_NOT_PROVIDED_ERR, + JOB_NOT_BATCH_ERR, + JOB_NOT_REG_ERR, + JOBS_MISSING_ERR, + JobManager, +) +from biokbase.narrative.tests.generate_test_results import ( + ALL_RESPONSE_DATA, + JOBS_BY_CELL_ID, + TEST_CELL_ID_LIST, + TEST_CELL_IDs, ) - -from .util import ConfigTests - from biokbase.narrative.tests.job_test_constants import ( + ACTIVE_JOBS, + ALL_JOBS, + BAD_JOB_ID, + BAD_JOBS, + BATCH_CHILDREN, + BATCH_ERROR_RETRIED, + BATCH_PARENT, + BATCH_TERMINATED, + BATCH_TERMINATED_RETRIED, CLIENTS, JOB_COMPLETED, JOB_CREATED, - JOB_RUNNING, - JOB_TERMINATED, JOB_ERROR, - BATCH_PARENT, - BATCH_TERMINATED, - BATCH_TERMINATED_RETRIED, - BATCH_ERROR_RETRIED, JOB_NOT_FOUND, - BAD_JOB_ID, - ALL_JOBS, - BAD_JOBS, - TERMINAL_JOBS, - ACTIVE_JOBS, + JOB_RUNNING, + JOB_TERMINATED, REFRESH_STATE, - BATCH_CHILDREN, + TERMINAL_JOBS, TEST_JOBS, - get_test_job, generate_error, -) - -from biokbase.narrative.tests.generate_test_results import ( - ALL_RESPONSE_DATA, - JOBS_BY_CELL_ID, - TEST_CELL_ID_LIST, - TEST_CELL_IDs, + get_test_job, ) from .narrative_mock.mockclients import ( - get_mock_client, - get_failing_mock_client, - assert_obj_method_called, MockClients, + assert_obj_method_called, + get_failing_mock_client, + get_mock_client, ) +from .util import ConfigTests TERMINAL_IDS = [JOB_COMPLETED, JOB_TERMINATED, JOB_ERROR] NON_TERMINAL_IDS = [JOB_CREATED, JOB_RUNNING] class JobManagerTest(unittest.TestCase): + """ + Tests for job management + """ + @classmethod @mock.patch(CLIENTS, get_mock_client) def setUpClass(cls): @@ -235,7 +231,7 @@ def test__construct_job_output_state_set__empty_list(self): @mock.patch(CLIENTS, get_mock_client) def test__construct_job_output_state_set__ee2_error(self): exc = Exception("Test exception") - exc_message = str(exc) + exc_msg = str(exc) def mock_check_jobs(params): raise exc @@ -250,7 +246,7 @@ def mock_check_jobs(params): for job_id in ACTIVE_JOBS: # expect there to be an error message added - expected[job_id]["error"] = exc_message + expected[job_id]["error"] = exc_msg self.assertEqual( expected,