From 07fd057011ee7808bb3f0a73f71db7cb7d68a270 Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Wed, 4 May 2022 11:01:59 -0700 Subject: [PATCH 1/2] Merge DATAUP-729-job-ts into develop --- .../static/kbase/config/job_config.json | 3 +- src/biokbase/narrative/jobs/job.py | 105 +++++-- src/biokbase/narrative/jobs/jobcomm.py | 99 +++--- src/biokbase/narrative/jobs/jobmanager.py | 209 +++++++++---- src/biokbase/narrative/jobs/util.py | 15 +- .../narrative/tests/job_test_constants.py | 6 + src/biokbase/narrative/tests/test_job.py | 9 +- src/biokbase/narrative/tests/test_job_util.py | 2 +- src/biokbase/narrative/tests/test_jobcomm.py | 169 +++++++++- .../narrative/tests/test_jobmanager.py | 294 +++++++++++++++++- 10 files changed, 750 insertions(+), 161 deletions(-) diff --git a/kbase-extension/static/kbase/config/job_config.json b/kbase-extension/static/kbase/config/job_config.json index c9ac1dacac..2274aea9e4 100644 --- a/kbase-extension/static/kbase/config/job_config.json +++ b/kbase-extension/static/kbase/config/job_config.json @@ -7,7 +7,8 @@ "JOB_ID_LIST": "job_id_list", "FIRST_LINE": "first_line", "LATEST": "latest", - "NUM_LINES": "num_lines" + "NUM_LINES": "num_lines", + "TS": "ts" }, "message_types": { "CANCEL": "cancel_job", diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index fb16b9014b..ff3ce15fa6 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -100,7 +100,7 @@ def __init__(self, ee2_state, extra_data=None, children=None): if ee2_state.get("job_id") is None: raise ValueError("Cannot create a job without a job ID!") - self._acc_state = ee2_state + self._update_state(ee2_state) self.extra_data = extra_data # verify parent-children relationship @@ -325,20 +325,31 @@ def _update_state(self, state: dict) -> None: """ given a state data structure (as emitted by ee2), update the stored state in the job object """ - if state: + if not isinstance(state, 
dict): + raise TypeError("state must be a dict") + # Check job_id match + if self._acc_state: if "job_id" in state and state["job_id"] != self.job_id: raise ValueError( f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}" ) - state = copy.deepcopy(state) - if self._acc_state is None: - self._acc_state = state - else: - self._acc_state.update(state) + # Check if there would be no change in updating + # i.e., if state <= self._acc_state + if self._acc_state is not None: + if {**self._acc_state, **state} == self._acc_state: + return - def state(self, force_refresh=False): + state = copy.deepcopy(state) + if self._acc_state is None: + self._acc_state = state + else: + self._acc_state.update(state) + + self.last_updated = time.time_ns() + + def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ Queries the job service to see the state of the current job. """ @@ -347,7 +358,7 @@ def state(self, force_refresh=False): state = self.query_ee2_state(self.job_id, init=False) self._update_state(state) - return self._internal_state(JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + return self._internal_state(exclude) def _internal_state(self, exclude=None): """Wrapper for self._acc_state""" @@ -355,39 +366,65 @@ def _internal_state(self, exclude=None): self._trim_ee2_state(state, exclude) return state - def output_state(self, state=None) -> dict: + def output_state(self, state=None, no_refresh=False) -> dict: """ - :param state: can be queried individually from ee2/cache with self.state(), - but sometimes want it to be queried in bulk from ee2 upstream - :return: dict, with structure + :param state: Supplied when the state is queried beforehand from EE2 in bulk, + or when it is retrieved from a cache. If not supplied, must be + queried with self.state() or self._internal_state() + :return: dict, with structure { - outputWidgetInfo: (if not finished, None, else...) 
job.get_viewer_params result - jobState: { - job_id: string, - status: string, - created: epoch ms, - updated: epoch ms, - queued: optional - epoch ms, - finished: optional - epoc ms, - terminated_code: optional - int, - tag: string (release, beta, dev), - parent_job_id: optional - string or null, - run_id: string, - cell_id: string, - errormsg: optional - string, - error (optional): { - code: int, - name: string, - message: string (should be for the user to read), - error: string, (likely a stacktrace) + "job_id": string, + "jobState": { + "job_id": string, + "status": string - enum, + "batch_id": string or None, + "batch_job": bool, + "child_jobs": list, + "created": epoch ms, + "updated": epoch ms, + "queued": epoch ms, + "running": epoch ms, + "finished": epoch ms, + "tag": string (release, beta, dev), + "run_id": string, + "cell_id": string, + "job_output": { # completed jobs only + "version": string, + "result": [ + { + # result params, e.g. + "report_name": string, + "report_ref": string, + } + ], + "id": string + }, + "terminated_code": terminated jobs only; optional - int, + "error": { # jobs that did not complete successfully + "code": int, + "name": string, + "message": string (should be for the user to read), + "error": string, (likely a stacktrace) }, - error_code: optional - int + "errormsg": optional - string, + "error_code": optional - int + }, + "outputWidgetInfo": { # None if job does not have status "completed" + "name": string, + "tag": string - (release, beta, dev), + "params": { + # output widget params, e.g. 
+ "report_name": string, + "report_ref": string + } } + } + :rtype: dict """ if not state: - state = self.state() + state = self._internal_state() if no_refresh else self.state() else: self._update_state(state) state = self._internal_state() diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index 282617a371..a59747a35b 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -1,6 +1,7 @@ import copy import threading from typing import List, Union +import time from ipykernel.comm import Comm from biokbase.narrative.jobs.util import load_job_constants from biokbase.narrative.jobs.jobmanager import JobManager @@ -23,7 +24,6 @@ class JobRequest: """ - A small wrapper for job comm channel request data. This generally comes in the form of a packet from the kernel Comm object. It is expected to be a dict of the format: @@ -34,11 +34,12 @@ class JobRequest: 'request_type': job function requested 'job_id': optional 'job_id_list': optional + 'batch_id': optional } } } - This little wrapper fills 2 roles: + This little wrapper fills two roles: 1. It validates that the packet is correct and has some expected values. 2. If there's a job_id field, it makes sure that it's real (by asking the JobManager - avoids a bunch of duplicate validation code) @@ -52,32 +53,33 @@ class JobRequest: Provides the following attributes: raw_request dict the unedited request received by the job comm msg_id str unique message id - request_type str the function to perform. This isn't - strictly controlled here, but by JobComm._handle_comm_message. + request_type str the function to perform. This isn't strictly controlled + here, but by JobComm._handle_comm_message. rq_data dict the actual data of the request. 
Contains the request - type and other parameters specific to the function to be performed + type and other parameters specific to the function to be + performed - Optional: + The IDs of the job(s) to perform the function on (optional): job_id str job_id_list list(str) batch_id str - - The IDs of the job(s) to perform the function on. - """ + INPUT_TYPES = [PARAM["JOB_ID"], PARAM["JOB_ID_LIST"], PARAM["BATCH_ID"]] + def __init__(self, rq: dict): - self.raw_request = copy.deepcopy(rq) + rq = copy.deepcopy(rq) + self.raw_request = rq self.msg_id = rq.get("msg_id") # might be useful later? self.rq_data = rq.get("content", {}).get("data") - if self.rq_data is None: + if not self.rq_data: raise JobRequestException(INVALID_REQUEST_ERR) self.request_type = self.rq_data.get("request_type") - if self.request_type is None: + if not self.request_type: raise JobRequestException(MISSING_REQUEST_TYPE_ERR) input_type_count = 0 - for input_type in [PARAM["JOB_ID"], PARAM["JOB_ID_LIST"], PARAM["BATCH_ID"]]: + for input_type in self.INPUT_TYPES: if input_type in self.rq_data: input_type_count += 1 if input_type_count > 1: @@ -103,6 +105,9 @@ def batch_id(self): return self.rq_data[PARAM["BATCH_ID"]] raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) + def has_job_ids(self): + return any([input_type in self.rq_data for input_type in self.INPUT_TYPES]) + def has_batch_id(self): return PARAM["BATCH_ID"] in self.rq_data @@ -112,6 +117,11 @@ def cell_id_list(self): return self.rq_data[PARAM["CELL_ID_LIST"]] raise JobRequestException(CELLS_NOT_PROVIDED_ERR) + @property + def ts(self): + """This param is completely optional""" + return self.rq_data.get(PARAM["TS"]) + class JobComm: """ @@ -189,13 +199,11 @@ def _get_job_ids(self, req: JobRequest) -> List[str]: :return: list of job IDs :rtype: List[str] """ + if not req.has_job_ids(): + raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) if req.has_batch_id(): return self._jm.update_batch_job(req.batch_id) - - try: - return req.job_id_list - 
except Exception as ex: - raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex + return req.job_id_list def start_job_status_loop( self, @@ -215,11 +223,12 @@ def start_job_status_loop( self._jm.initialize_jobs(cell_list) except Exception as e: error = { - "error": "Unable to get initial jobs list", + "source": getattr(e, "source", "jc.start_job_status_loop"), + "request": getattr(e, "request", "jc.start_job_status_loop"), + "name": getattr(e, "name", type(e).__name__), "message": getattr(e, "message", UNKNOWN_REASON), + "error": "Unable to get initial jobs list", "code": getattr(e, "code", -1), - "source": getattr(e, "source", "jobmanager"), - "name": getattr(e, "name", type(e).__name__), } self.send_comm_message(MESSAGE_TYPE["ERROR"], error) # if job init failed, set the lookup loop var back to False and return @@ -323,13 +332,12 @@ def _get_job_info(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["INFO"], job_info) return job_info - def __get_job_states(self, job_id_list) -> dict: + def _get_send_job_states(self, job_id_list: list, ts: int = None) -> dict: """ Retrieves the job states for the supplied job_ids. See Job.output_state() for details of job state structure. - TODO: update as required If the ts parameter is present, only jobs that have been updated since that time are returned. 
Jobs that cannot be found in the `_running_jobs` index will return @@ -346,7 +354,7 @@ def __get_job_states(self, job_id_list) -> dict: :return: dictionary of job states, indexed by job ID :rtype: dict """ - output_states = self._jm.get_job_states(job_id_list) + output_states = self._jm.get_job_states(job_id_list, ts) self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states) return output_states @@ -363,7 +371,7 @@ def get_job_state(self, job_id: str) -> dict: :return: dictionary of job states, indexed by job ID :rtype: dict """ - return self.__get_job_states([job_id]) + return self._get_send_job_states([job_id]) def _get_job_states(self, req: JobRequest) -> dict: """ @@ -378,7 +386,7 @@ def _get_job_states(self, req: JobRequest) -> dict: :rtype: dict """ job_id_list = self._get_job_ids(req) - return self.__get_job_states(job_id_list) + return self._get_send_job_states(job_id_list, req.ts) def _modify_job_updates(self, req: JobRequest) -> dict: """ @@ -507,6 +515,9 @@ def send_comm_message(self, msg_type: str, content: dict) -> None: Sends a ipykernel.Comm message to the KBaseJobs channel with the given msg_type and content. These just get encoded into the message itself. """ + if msg_type == MESSAGE_TYPE["STATUS"]: + content["last_checked"] = time.time_ns() + msg = {"msg_type": msg_type, "content": content} self._comm.send(msg) @@ -523,14 +534,18 @@ def send_error_message( It can also be a string or None if this context manager is invoked outside of a JC request Job error messages have the format: + { - request: the original JobRequest data object, function params, or function name - source: the function request that spawned the error - other fields about the error, dependent on the content. 
+ "msg_type": "job_error", + "content": { + "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, + "request": request data from original incoming comm request, if available, else an arbitrary str/NoneType, + **{any extra error information} + } } :param req: job request context, either a job request received over the channel or a string - :type req: Union[JobRequest, dict, str] + :type req: JobRequest, dict, str, or NoneType :param content: dictionary of extra data to include in the error message, defaults to None :type content: dict, optional """ @@ -546,7 +561,7 @@ def send_error_message( error_content["request"] = req error_content["source"] = req - if content is not None: + if content: error_content.update(content) self.send_comm_message(MESSAGE_TYPE["ERROR"], error_content) @@ -554,7 +569,8 @@ def send_error_message( class exc_to_msg: """ - This is a context manager to wrap around JC code + This is a context manager to wrap around JobComm code in order to catch any exception, + send it back as a comm error messages, and then re-raise that exception """ jc = JobComm() @@ -574,17 +590,18 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, exc_tb): """ - If exception is caught, will send job comm message in this format + If an exception is caught during execution in the JobComm code, + this will send back a comm error message like: { - "msg_type": ERROR, + "msg_type": "job_error", "content": { - "source": "request_type", - "job_id": "0123456789abcdef", # or job_id_list. 
optional and mutually exclusive - "name": "ValueError", - "message": "Something happened", - #---------- Below, NarrativeException only ----------- - "code": -1, - "error": "Unable to complete this request" + "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, + "request": request data from original incoming comm request, if available, else an arbitrary str/NoneType, + "name": exception name, # e.g., ValueError + "message": exception message, # e.g. "Something specifically went wrong!" + #---------- Below, for NarrativeException only ----------- + "error": exception error attribute, # e.g. "Unable to complete this request" + "code": exception code attribute, # e.g., -1 } } Will then re-raise exception diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index 8ee61aebc5..b535130b9b 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -1,11 +1,14 @@ from IPython.display import HTML from jinja2 import Template from datetime import datetime, timezone, timedelta -from typing import List, Tuple +import copy +from enum import Enum +from itertools import cycle +from typing import List, Tuple, Union +from collections.abc import Iterable import biokbase.narrative.clients as clients from .job import ( Job, - EXCLUDED_JOB_STATE_FIELDS, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, ) from biokbase.narrative.common import kblogging @@ -35,7 +38,57 @@ JOBS_MISSING_ERR = "No valid job IDs provided" CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided" -DOES_NOT_EXIST = "does_not_exist" + + +class OutputStateErrMsg(Enum): + """ + For errors that go into the STATUS response. + Enum mapping from error names to formattable error messages. 
+ The first formatting argument of each error message must be the job ID + """ + + NOT_FOUND = "Cannot find job with ID %s" + NOT_UPDATED = "Job with ID %s has not been updated since ts %d" + QUERY_EE2_STATES = "A Job.query_ee2_states error occurred for job with ID %s: %s" + CANCEL = "An EE2.cancel_job error occurred for job with ID %s: %s" + + def gen_err_msg(self, its: List[Union[str, int, Iterable]]): + """ + Create a generator for filled in enum values + + :param its: A list of arguments, where each argument + should be either a literal or an iterable of literals + The iterables will then be zipped and used to format the enum values. + The first argument must be the list of job_ids + """ + if not isinstance(its, list): + raise TypeError( + "Argument its must be of type list" + ) + # check number of arguments matches number of %s or %d to be formatted + num_format = self.value.count("%") + if num_format != len(its): + raise ValueError( + f"{self.__class__.__name__}.{self.name} must be formatted with {num_format} argument(s). 
" + f"Received {len(its)} argument(s)" + ) + + for i, e in enumerate(its): + # if element is a literal, convert to iterable of literal + if isinstance(e, str) or isinstance(e, int): + its[i] = cycle([e]) + + def gen(): + for tup in zip(*its): + yield self.value % tup + + return gen() + + def replace_result(self) -> bool: + names = [e.name for e in list(OutputStateErrMsg)] + replace = [True, True, False, False] + ans = dict(zip(names, replace)) + return ans[self.name] class JobManager: @@ -197,13 +250,7 @@ def _create_jobs(self, job_ids: List[str]) -> dict: if not job_ids: return {} - job_states = clients.get("execution_engine2").check_jobs( - { - "job_ids": job_ids, - "exclude_fields": JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, - "return_list": 0, - } - ) + job_states = Job.query_ee2_states(job_ids, init=True) for job_state in job_states.values(): # do not set new jobs to be automatically refreshed - if the front end wants them # refreshed, it'll make a request. @@ -233,21 +280,37 @@ def _construct_job_output_state_set( """ Builds a set of job states for the list of job ids. + Precondition: job_ids already validated + The job output state for a given job ID is generated by job.output_state() and has the basic form: { - "job_id": string, - "jobState": dictionary containing job data from EE2, - "outputWidgetInfo": dictionary | None - "batch_id": string if present; otherwise None, - } - - If the call to refresh job data from ee2 fails, the job output state will have an additional - 'error' key with the error message from the failed data refresh - { - "job_id": string, + "job_id_0": { # dict generated by job.output_state() + "job_id": "job_id_0": + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": { + ... + } + }, + "job_id_1": { # dict generated by job.output_state() + "job_id": "job_id_1": + "jobState": { # modified job state from EE2 + ... 
+ }, + "outputWidgetInfo": None + }, ..., - "error": + "job_id_2": { # dict generated by job.output_state() with EE2 error message added + "job_id": "job_id_2", + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": ..., + "error": + }, + ... } :param job_ids: list of job IDs @@ -257,9 +320,7 @@ def _construct_job_output_state_set( :raises JobRequestException: if job_ids is not a list - :return: dict containing the output_state for each job, indexed by job ID. If an error - occurs during job look up on ee2, a message will be added to the output_state for the - job under the key 'error' + :return: dict containing the output_state for each job, indexed by job ID. :rtype: dict """ if not isinstance(job_ids, list): @@ -287,13 +348,7 @@ def _construct_job_output_state_set( # Get the rest of states direct from EE2. if jobs_to_lookup: try: - fetched_states = clients.get("execution_engine2").check_jobs( - { - "job_ids": jobs_to_lookup, - "exclude_fields": EXCLUDED_JOB_STATE_FIELDS, - "return_list": 0, - } - ) + fetched_states = Job.query_ee2_states(jobs_to_lookup, init=False) except Exception as e: error_message = str(e) kblogging.log_event( @@ -311,18 +366,20 @@ def _construct_job_output_state_set( output_states[job_id] = job.output_state(fetched_states[job_id]) else: # fetch the current state without updating it - output_states[job_id] = job.output_state({}) - # add an error field with the error message from the failed look up - output_states[job_id]["error"] = error_message + output_states[job_id] = job.output_state(no_refresh=True) + + failed_ids = [job_id for job_id in jobs_to_lookup if job_id not in fetched_states] + if failed_ids: + self.add_errors_to_results( + output_states, failed_ids, OutputStateErrMsg.QUERY_EE2_STATES, error_message + ) return output_states def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: """ - Retrieves the job states for the supplied job_ids. - - TODO: UPDATE ME! 
- If the ts parameter is present, only jobs that have been updated since that time are returned. + Retrieves the job states for the supplied job_ids, with the option to + replace any jobs that have not been updated since ts with a short stub Jobs that cannot be found in the `_running_jobs` index will return { @@ -338,9 +395,16 @@ def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: :return: dictionary of job states, indexed by job ID :rtype: dict """ - job_ids, error_ids = self._check_job_list(job_ids) + job_ids, not_found_ids = self._check_job_list(job_ids) output_states = self._construct_job_output_state_set(job_ids) - return self.add_errors_to_results(output_states, error_ids) + not_updated_ids = [] + if ts is not None: + for job_id in output_states: + if self.get_job(job_id).last_updated <= ts: + not_updated_ids.append(job_id) + self.add_errors_to_results(output_states, not_found_ids, OutputStateErrMsg.NOT_FOUND) + self.add_errors_to_results(output_states, not_updated_ids, OutputStateErrMsg.NOT_UPDATED, ts) + return output_states def get_all_job_states(self, ignore_refresh_flag=False) -> dict: """ @@ -358,7 +422,7 @@ def get_all_job_states(self, ignore_refresh_flag=False) -> dict: jobs_to_lookup = [] # grab the list of running job ids, so we don't run into update-while-iterating problems. 
- for job_id in self._running_jobs.keys(): + for job_id in self._running_jobs: if self._running_jobs[job_id]["refresh"] or ignore_refresh_flag: jobs_to_lookup.append(job_id) if len(jobs_to_lookup) > 0: @@ -446,7 +510,7 @@ def get_job_info(self, job_ids: List[str]) -> dict: "job_id": job_id, "job_params": job.params, } - return self.add_errors_to_results(infos, error_ids) + return self.add_errors_to_results(infos, error_ids, OutputStateErrMsg.NOT_FOUND) def get_job_logs( self, @@ -565,7 +629,7 @@ def get_job_logs_for_list( for job_id in job_ids: output[job_id] = self.get_job_logs(job_id, first_line, num_lines, latest) - return self.add_errors_to_results(output, error_ids) + return self.add_errors_to_results(output, error_ids, OutputStateErrMsg.NOT_FOUND) def cancel_jobs(self, job_id_list: List[str]) -> dict: """ @@ -599,10 +663,10 @@ def cancel_jobs(self, job_id_list: List[str]) -> dict: error_states[job_id] = error.message job_states = self._construct_job_output_state_set(job_ids) - for job_id in error_states: - job_states[job_id]["error"] = error_states[job_id] - return self.add_errors_to_results(job_states, error_ids) + self.add_errors_to_results(job_states, list(error_states.keys()), OutputStateErrMsg.CANCEL, list(error_states.values())) + self.add_errors_to_results(job_states, error_ids, OutputStateErrMsg.NOT_FOUND) + return job_states def _cancel_job(self, job_id: str) -> None: """ @@ -687,39 +751,62 @@ def retry_jobs(self, job_id_list: List[str]) -> dict: retry_states = self._construct_job_output_state_set( retry_ids, self._create_jobs(retry_ids) # add to self._running_jobs index ) - job_states = {**orig_states, **retry_states} results_by_job_id = {} # fill in the job state details for result in retry_results: job_id = result["job_id"] - results_by_job_id[job_id] = {"job_id": job_id, "job": job_states[job_id]} + results_by_job_id[job_id] = {"job_id": job_id, "job": orig_states[job_id]} if "retry_id" in result: retry_id = result["retry_id"] - 
results_by_job_id[job_id]["retry_id"] = retry_id - results_by_job_id[job_id]["retry"] = job_states[retry_id] + results_by_job_id[job_id].update( + {"retry_id": retry_id, "retry": retry_states[retry_id]} + ) if "error" in result: results_by_job_id[job_id]["error"] = result["error"] - return self.add_errors_to_results(results_by_job_id, error_ids) + return self.add_errors_to_results(results_by_job_id, error_ids, OutputStateErrMsg.NOT_FOUND) - def add_errors_to_results(self, results: dict, error_ids: List[str]) -> dict: + @staticmethod + def add_errors_to_results( + results: dict, error_ids: List[str], error_enum: OutputStateErrMsg, *extra_its: Tuple + ) -> dict: """ - Add the generic "not found" error for each job_id in error_ids. + Add error states to results :param results: dictionary of job data (output state, info, retry, etc.) indexed by job ID :type results: dict :param error_ids: list of IDs that could not be found :type error_ids: List[str] + :param error_enum: an enum instance from JobStateErrMsg + :type error_enum: JobStateErrMsg + :param its: any extra arguments to feed into error_enum.gen_str_fill to format the error message + :type its: list - :return: input results dictionary augmented by a dictionary containing job ID and a short - not found error message for every ID in the error_ids list + :return: input results augmented by either extra error dictionaries or errors in existing dictionaries :rtype: dict """ - for error_id in error_ids: - results[error_id] = { - "job_id": error_id, - "error": f"Cannot find job with ID {error_id}", - } + # create generator yielding error messages + gen_err_msg = error_enum.gen_err_msg([error_ids] + list(extra_its)) + + for error_id, err_msg in zip(error_ids, gen_err_msg): + # if there's already an error there + if error_id in results and "error" in results[error_id]: + existing_error = results[error_id]["error"] + # concatenate the errors (works recursively) + err_msg = f"{existing_error}\n{err_msg}" + + if 
error_enum.replace_result(): + results[error_id] = { + "job_id": error_id, + "error": err_msg, + } + else: + if error_id not in results: + raise ValueError(f"Cannot add error because response dict is missing key {error_id}") + results[error_id].update( + {"error": err_msg} + ) + return results def modify_job_refresh(self, job_ids: List[str], update_refresh: bool) -> None: @@ -770,7 +857,7 @@ def list_jobs(self): """ try: all_states = self.get_all_job_states(ignore_refresh_flag=True) - state_list = [s["jobState"] for s in all_states.values()] + state_list = [copy.deepcopy(s["jobState"]) for s in all_states.values()] if not state_list: return "No running jobs!" diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py index 0174485a9c..25b721e5e5 100644 --- a/src/biokbase/narrative/jobs/util.py +++ b/src/biokbase/narrative/jobs/util.py @@ -15,8 +15,8 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): Load the job-related terms that are shared by front- and back ends. 
""" full_path = [os.environ["NARRATIVE_DIR"]] + relative_path_to_file - config_json = open(os.path.join(*full_path)).read() - config = json.loads(config_json) + with open(os.path.join(*full_path)) as fh: + config = json.load(fh) REQUIRED = { "message_types": [ "CANCEL", @@ -31,7 +31,16 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): "ERROR", "RUN_STATUS", ], - "params": ["BATCH_ID", "CELL_ID_LIST", "JOB_ID", "JOB_ID_LIST"], + "params": [ + "BATCH_ID", + "CELL_ID_LIST", + "FIRST_LINE", + "JOB_ID_LIST", + "JOB_ID", + "LATEST", + "NUM_LINES", + "TS" + ], } # ensure we have all the required message type and param names diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index 68bd9b2a51..d78adf632b 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -35,8 +35,14 @@ def get_test_job(job_id): return copy.deepcopy(TEST_JOBS[job_id]) +def get_test_jobs(job_ids): + return {job_id: get_test_job(job_id) for job_id in job_ids} + + CLIENTS = "biokbase.narrative.clients.get" +TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" +TEST_EPOCH_NS = 42 # arbitrary epoch ns MAX_LOG_LINES = 10 # test_jobs contains jobs in the following states diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 1b60ea892c..7594fcaab1 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -377,13 +377,20 @@ def mock_state(self, state=None): state = job.output_state() self.assertEqual(expected, state) + # TODO: improve this test def test_job_update__no_state(self): """ test that without a state object supplied, the job state is unchanged """ job = create_job_from_ee2(JOB_CREATED) self.assertFalse(job.was_terminal()) - job._update_state(None) + + # should fail with error 'state must be a dict' + with self.assertRaisesRegex(TypeError, "state 
must be a dict"): + job._update_state(None) + self.assertFalse(job.was_terminal()) + + job._update_state({}) self.assertFalse(job.was_terminal()) @mock.patch(CLIENTS, get_mock_client) diff --git a/src/biokbase/narrative/tests/test_job_util.py b/src/biokbase/narrative/tests/test_job_util.py index b6a2fc5105..d0952fb474 100644 --- a/src/biokbase/narrative/tests/test_job_util.py +++ b/src/biokbase/narrative/tests/test_job_util.py @@ -43,7 +43,7 @@ def test_load_job_constants__missing_value(self): ] with self.assertRaisesRegex( ValueError, - "job_config.json is missing the following values for params: BATCH_ID, JOB_ID", + "job_config.json is missing the following values for params: BATCH_ID, FIRST_LINE, JOB_ID, LATEST, NUM_LINES, TS", ): load_job_constants(file_path) diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index 871df66675..f61c1aa9e0 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -4,9 +4,11 @@ import itertools import re import copy +import time from biokbase.narrative.jobs.jobmanager import ( JobManager, + OutputStateErrMsg, JOB_NOT_REG_ERR, JOB_NOT_BATCH_ERR, JOBS_MISSING_ERR, @@ -31,6 +33,8 @@ from .util import ConfigTests, validate_job_state from biokbase.narrative.tests.job_test_constants import ( CLIENTS, + TIME_NS, + TEST_EPOCH_NS, MAX_LOG_LINES, JOB_COMPLETED, JOB_CREATED, @@ -53,6 +57,7 @@ BATCH_PARENT_CHILDREN, BATCH_CHILDREN, generate_error, + get_test_jobs, ) from biokbase.narrative.tests.generate_test_results import ( ALL_RESPONSE_DATA, @@ -102,6 +107,8 @@ def make_comm_msg( msg_type: str, job_id_like, as_job_request: bool, content: dict = None ): + if content is None: + content = {} job_arguments = {} if isinstance(job_id_like, dict): job_arguments = job_id_like @@ -112,16 +119,29 @@ def make_comm_msg( msg = { "msg_id": "some_id", - "content": {"data": {"request_type": msg_type, **job_arguments}}, + "content": {"data": 
{"request_type": msg_type, **job_arguments, **content}}, } - if content is not None: - msg["content"]["data"].update(content) + if as_job_request: return JobRequest(msg) else: return msg +def ts_are_close(t0: int, t1: int, tol: float = 1) -> bool: + """Check if two times, in ns, are "close" + + Args: + t0 (int): time in ns + t1 (int): time in ns + tol (float, optional): tolerated discrepancy between the times, in s. Defaults to 1. + + Returns: + bool: Whether the two times differ by less than the tolerance + """ + return abs(t1 - t0) * 1e-9 <= 1 + + class JobCommTestCase(unittest.TestCase): maxDiff = None @@ -362,6 +382,22 @@ def test_req_no_inputs__fail(self): self.jc._handle_comm_message(req_dict) self.check_error_message(req_dict, err) + def test_req_multiple_inputs__fail(self): + functions = [ + CANCEL, + INFO, + LOGS, + RETRY, + STATUS, + ] + + for msg_type in functions: + req_dict = make_comm_msg(msg_type, {"job_id": "something", "batch_id": "another_thing"}, False) + err = JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) + with self.assertRaisesRegex(type(err), str(err)): + self.jc._handle_comm_message(req_dict) + self.check_error_message(req_dict, err) + # --------------------- # Start job status loop # --------------------- @@ -437,11 +473,12 @@ def test_start_job_status_loop__initialise_jobs_error(self): { "msg_type": ERROR, "content": { + "code": -32000, "error": "Unable to get initial jobs list", "message": "check_workspace_jobs failed", - "code": -32000, - "source": "ee2", "name": "JSONRPCError", + "request": "jc.start_job_status_loop", + "source": "ee2", }, }, ) @@ -469,6 +506,7 @@ def test_start_job_status_loop__no_jobs_stop_loop(self): # --------------------- @mock.patch(CLIENTS, get_mock_client) + @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def check_job_output_states( self, output_states=None, @@ -477,6 +515,7 @@ def check_job_output_states( response_type=STATUS, ok_states=None, error_states=None, + last_checked=TEST_EPOCH_NS, ): """ Handle any 
request that returns a dictionary of job state objects; this @@ -489,6 +528,7 @@ def check_job_output_states( :param params: params for the comm message (opt) :param ok_states: list of job IDs expected to be in the output :param error_states: list of job IDs expected to return a not found error + :param last_checked: ts in ns """ if not params: params = {} @@ -510,6 +550,11 @@ def check_job_output_states( msg, ) + if response_type == STATUS: + self._check_pop_last_checked(output_states, last_checked) + else: + self.assertNotIn("last_checked", output_states) + for job_id, state in output_states.items(): self.assertEqual(ALL_RESPONSE_DATA[STATUS][job_id], state) if job_id in ok_states: @@ -527,6 +572,7 @@ def test_get_all_job_states__ok(self): # ----------------------- # Lookup single job state # ----------------------- + @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_state__1_ok(self): output_states = self.jc.get_job_state(JOB_COMPLETED) self.check_job_output_states( @@ -539,9 +585,22 @@ def test_get_job_state__no_job(self): ): self.jc.get_job_state(None) + def test_lookup_job_state__live_ts(self): + output_states = self.jc.get_job_state(JOB_COMPLETED) + self.assertTrue( + ts_are_close(output_states["last_checked"], time.time_ns()) + ) + # ----------------------- # Lookup select job states # ----------------------- + def _check_pop_last_checked(self, output_states, last_checked=TEST_EPOCH_NS): + self.assertIn("last_checked", output_states) + self.assertTrue( + last_checked == output_states["last_checked"] + or ts_are_close(last_checked, output_states["last_checked"]) + ) + del output_states["last_checked"] def test_get_job_states__job_id__ok(self): self.check_job_output_states( @@ -596,6 +655,7 @@ def test_get_job_states__batch_id__not_batch(self): self.check_batch_id__not_batch_test(STATUS) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_states__job_id_list__ee2_error(self): exc = Exception("Test 
exception") exc_message = str(exc) @@ -610,10 +670,12 @@ def mock_check_jobs(params): self.jc._handle_comm_message(req_dict) msg = self.jc._comm.last_message + self._check_pop_last_checked(msg["content"], TEST_EPOCH_NS) + expected = {job_id: copy.deepcopy(ALL_RESPONSE_DATA[STATUS][job_id]) for job_id in ALL_JOBS} for job_id in ACTIVE_JOBS: # add in the ee2_error message - expected[job_id]["error"] = exc_message + expected[job_id]["error"] = OutputStateErrMsg.QUERY_EE2_STATES.value % (job_id, exc_message) self.assertEqual( { @@ -623,6 +685,79 @@ def mock_check_jobs(params): msg, ) + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__err(self): + """ + """ + # what FE would say was the last time the jobs were checked + NOW = time.time_ns() + + # mix of terminal and not terminal + not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] + # not terminal + updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] + + # error ids + not_found_ids = [JOB_NOT_FOUND] + + job_ids = not_updated_ids + updated_ids + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + # output_states will be partitioned as not_found_ids + terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) + not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) + updated_active_ids = list(set(updated_ids) & set(active_ids)) # (yes, redundant) + + def mock_check_jobs(params): + """Update appropriate job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(active_ids, lookup_ids) # sanity check + + job_states_ret = get_test_jobs(lookup_ids) + for job_id, job_state in job_states_ret.items(): + # if job was updated, return an updated version + if job_id in updated_active_ids: + job_state["updated"] += 1 + return job_states_ret + + rq = make_comm_msg(STATUS, job_ids + not_found_ids, False, {"ts": NOW}) + with mock.patch.object(MockClients, "check_jobs", side_effect=mock_check_jobs): + output_states = self.jc._handle_comm_message(rq) + + # checks + 
exp_updated_output_states = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) for job_id in updated_active_ids + } + for job_state in exp_updated_output_states.values(): + job_state["jobState"]["updated"] += 1 + + expected = { + # corresponding to not_found_ids + **{ + job_id: { + "job_id": job_id, + "error": OutputStateErrMsg.NOT_FOUND.value % job_id + } + for job_id in not_found_ids + }, + # corresponding to updated_active_ids + **exp_updated_output_states, + # corresponding to not_updated_active_ids and terminal_ids + **{ + job_id: { + "job_id": job_id, + "error": OutputStateErrMsg.NOT_UPDATED.value % (job_id, NOW) + } + for job_id in not_updated_active_ids + terminal_ids + }, + } + + self._check_pop_last_checked(output_states, NOW) + self.assertEqual( + expected, + output_states + ) + # ----------------------- # get cell job states # ----------------------- @@ -824,16 +959,19 @@ def test_cancel_jobs__job_id_list__all_bad_jobs(self): ) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def test_cancel_jobs__job_id_list__failure(self): # the mock client will throw an error with BATCH_RETRY_RUNNING job_id_list = [JOB_RUNNING, BATCH_RETRY_RUNNING] req_dict = make_comm_msg(CANCEL, job_id_list, False) output = self.jc._handle_comm_message(req_dict) + self._check_pop_last_checked(output) + expected = { JOB_RUNNING: ALL_RESPONSE_DATA[STATUS][JOB_RUNNING], BATCH_RETRY_RUNNING: { **ALL_RESPONSE_DATA[STATUS][BATCH_RETRY_RUNNING], - "error": CANCEL + " failed", + "error": OutputStateErrMsg.CANCEL.value % (BATCH_RETRY_RUNNING, CANCEL + " failed"), }, } @@ -1284,14 +1422,19 @@ def test_request_ok(self): rq.job_id_list def test_request_no_data(self): - rq_msg = {"msg_id": "some_id", "content": {}} - with self.assertRaisesRegex(JobRequestException, INVALID_REQUEST_ERR): - JobRequest(rq_msg) + rq_msg1 = {"msg_id": "some_id", "content": {}} + rq_msg2 = {"msg_id": "some_id", "content": {"data": {}}} + rq_msg3 = 
{"msg_id": "some_id", "content": {"data": None}} + rq_msg4 = {"msg_id": "some_id", "content": {"what": "?"}} + for msg in [rq_msg1, rq_msg2, rq_msg3, rq_msg4]: + with self.assertRaisesRegex(JobRequestException, INVALID_REQUEST_ERR): + JobRequest(msg) def test_request_no_req(self): - rq_msg = {"msg_id": "some_id", "content": {"data": {"request_type": None}}} - rq_msg2 = {"msg_id": "some_other_id", "content": {"data": {}}} - for msg in [rq_msg, rq_msg2]: + rq_msg1 = {"msg_id": "some_id", "content": {"data": {"request_type": None}}} + rq_msg2 = {"msg_id": "some_id", "content": {"data": {"request_type": ""}}} + rq_msg3 = {"msg_id": "some_id", "content": {"data": {"what": {}}}} + for msg in [rq_msg1, rq_msg2, rq_msg3]: with self.assertRaisesRegex(JobRequestException, MISSING_REQUEST_TYPE_ERR): JobRequest(msg) diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index ae7420f091..5ac28b69de 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -7,11 +7,14 @@ from unittest import mock import re import os +from typing import List, Tuple +import time from datetime import datetime from IPython.display import HTML from biokbase.narrative.jobs.jobmanager import ( JobManager, + OutputStateErrMsg, JOB_NOT_REG_ERR, JOB_NOT_BATCH_ERR, JOBS_MISSING_ERR, @@ -27,9 +30,6 @@ NarrativeException, JobRequestException, ) - -from .util import ConfigTests - from biokbase.narrative.tests.job_test_constants import ( CLIENTS, JOB_COMPLETED, @@ -41,6 +41,7 @@ BATCH_TERMINATED, BATCH_TERMINATED_RETRIED, BATCH_ERROR_RETRIED, + BATCH_RETRY_RUNNING, JOB_NOT_FOUND, BAD_JOB_ID, ALL_JOBS, @@ -50,7 +51,9 @@ REFRESH_STATE, BATCH_CHILDREN, TEST_JOBS, + TEST_EPOCH_NS, get_test_job, + get_test_jobs, generate_error, ) @@ -68,6 +71,8 @@ MockClients, ) +from .util import ConfigTests + TERMINAL_IDS = [JOB_COMPLETED, JOB_TERMINATED, JOB_ERROR] NON_TERMINAL_IDS = [JOB_CREATED, JOB_RUNNING] @@ 
-235,7 +240,7 @@ def test__construct_job_output_state_set__empty_list(self): @mock.patch(CLIENTS, get_mock_client) def test__construct_job_output_state_set__ee2_error(self): exc = Exception("Test exception") - exc_message = str(exc) + exc_msg = str(exc) def mock_check_jobs(params): raise exc @@ -248,9 +253,12 @@ def mock_check_jobs(params): for job_id in ALL_JOBS } - for job_id in ACTIVE_JOBS: + for job_id, meta_exc_msg in zip( + ACTIVE_JOBS, + OutputStateErrMsg.QUERY_EE2_STATES.gen_err_msg([ACTIVE_JOBS, exc_msg]) + ): # expect there to be an error message added - expected[job_id]["error"] = exc_message + expected[job_id]["error"] = meta_exc_msg self.assertEqual( expected, @@ -711,6 +719,67 @@ def test_get_job_states__empty(self): ): self.jm.get_job_states([]) + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__last_updated(self): + """ + Test that only updated jobs return an actual state + and that the rest of the jobs return an error stub state + """ + # what FE would say was the last time the jobs were checked + NOW = time.time_ns() + + # mix of terminal and not terminal + not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] + # not terminal + updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] + + job_ids = not_updated_ids + updated_ids + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + # output_states will be partitioned as + terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) + not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) + updated_active_ids = list(set(updated_ids) & set(active_ids)) # (yes, redundant) + + def mock_check_jobs(self_, params): + """Update appropriate job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(active_ids, lookup_ids) # sanity check + + job_states_ret = get_test_jobs(lookup_ids) + for job_id, job_state in job_states_ret.items(): + # if job was updated, return an updated version + if job_id in updated_active_ids: + job_state["updated"] += 1 
+ return job_states_ret + + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + output_states = self.jm.get_job_states(job_ids, ts=NOW) + + updated_output_states = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) for job_id in updated_active_ids + } + for job_state in updated_output_states.values(): + job_state["jobState"]["updated"] += 1 + + expected = { + # corresponding to updated_active_ids + **updated_output_states, + # corresponding to not_updated_active_ids and terminal_ids + **{ + job_id: { + "job_id": job_id, + "error": OutputStateErrMsg.NOT_UPDATED.value % (job_id, NOW) + } + for job_id in not_updated_active_ids + terminal_ids + } + } + + self.assertEqual( + expected, + output_states + ) + def test_update_batch_job__dne(self): with self.assertRaisesRegex( JobRequestException, f"{JOB_NOT_REG_ERR}: {JOB_NOT_FOUND}" @@ -815,6 +884,219 @@ def test_get_job_info(self): }, ) + @mock.patch(CLIENTS, get_mock_client) + def test_add_errors_to_results__concat_errs__integrated(self): + active_ids = [JOB_CREATED, JOB_RUNNING] + terminal_ids = [JOB_COMPLETED] + job_ids = active_ids + terminal_ids + + check_jobs_err = "Something went wrong in EE2.check_jobs" + check_jobs_exc = RuntimeError(check_jobs_err) + cancel_job_errs = { + job_id: err + for job_id, err in zip( + job_ids, [f"EE2.check_job err {num}" for num in ["UNO", "DOS"]] + ) + } + + def mock_check_jobs(self, params): + raise check_jobs_exc + + def mock_cancel_job(self, job_id): + return NarrativeException( + None, cancel_job_errs[job_id], None, None, None + ) + + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + with mock.patch.object(JobManager, "_cancel_job", mock_cancel_job): + output_states = self.jm.cancel_jobs(job_ids) + + exp = {job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) for job_id in job_ids} + for job_id in active_ids: + exp[job_id]["error"] = ( + f"A Job.query_ee2_states error occurred for job with ID 
{job_id}: {check_jobs_err}" + "\n" + f"An EE2.cancel_job error occurred for job with ID {job_id}: {cancel_job_errs[job_id]}" + ) + + self.assertEqual( + exp, + output_states + ) + + def test_add_errors_to_results__concat_errs__unit(self): + job_ids = ALL_JOBS + error_ids = [JOB_RUNNING, JOB_COMPLETED] + output_states = get_test_jobs(job_ids) + + check_jobs_err = "Test check_jobs exception" + cancel_job_errs = ["Test cancel_job exception UNO", "Test cancel_job exception DOS"] + + self.jm.add_errors_to_results( + output_states, error_ids, OutputStateErrMsg.QUERY_EE2_STATES, check_jobs_err + ) + self.jm.add_errors_to_results( + output_states, error_ids, OutputStateErrMsg.CANCEL, cancel_job_errs + ) + + for error_id, cancel_job_err in zip(error_ids, cancel_job_errs): + output_state = output_states[error_id] + self.assertIn("error", output_state) + self.assertEqual( + ( + f"A Job.query_ee2_states error occurred for job with ID {error_id}: {check_jobs_err}" + "\n" + f"An EE2.cancel_job error occurred for job with ID {error_id}: {cancel_job_err}" + ), + output_state["error"] + ) + + def test_add_errors_to_results__cannot_add_err(self): + job_ids = [JOB_RUNNING, JOB_COMPLETED] + error_ids = [JOB_CREATED] + output_states = get_test_jobs(job_ids) + + with self.assertRaisesRegex( + ValueError, f"Cannot add error because response dict is missing key {error_ids[0]}" + ): + self.jm.add_errors_to_results( + output_states, error_ids, OutputStateErrMsg.CANCEL, ["Test cancel_job exception`"] + ) + + +class OutputStateErrMsgTest(unittest.TestCase): + """ + Unit tests + """ + + JOB_IDS = [c + str(i) for c, i in zip(list("abc"), range(3))] + ERROR_IDS = JOB_IDS[1:] + CHECK_JOBS_ERR = "ee2.check_jobs rejection" + CANCEL_JOBS_ERR = [ + "ee2.cancel_job rejection UNO", "ee2.cancel_job rejection DOS" + ] + + maxDiff = None + + def get_orig_results(self): + return { + job_id: {"some": "random", "content": "with", "job_id": job_id} + for job_id in self.JOB_IDS + } + + def 
get_first_orig_result(self): + job_id = self.JOB_IDS[0] + return { + job_id: {"some": "random", "content": "with", "job_id": job_id} + } + + def add_errors_to_results( + self, results: dict, error_ids: List[str], error_enum: OutputStateErrMsg, *extra_its: Tuple + ): + """ + Strongly resembles jm.add_errors_to_results + But a pared down happy path method + """ + gen_err_msg = error_enum.gen_err_msg([error_ids] + list(extra_its)) + + for error_id, err_msg in zip(error_ids, gen_err_msg): + if error_enum.replace_result(): + results[error_id] = { + "job_id": error_id, + "error": err_msg, + } + else: + results[error_id].update( + {"error": err_msg} + ) + return results + + def test__NOT_FOUND(self): + results = self.add_errors_to_results( + self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.NOT_FOUND + ) + self.assertEqual( + { + **self.get_first_orig_result(), + **{ + job_id: { + "job_id": job_id, + "error": f"Cannot find job with ID {job_id}" + } + for job_id in self.ERROR_IDS + } + }, + results + ) + + def test__NOT_UPDATED(self): + results = self.add_errors_to_results( + self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.NOT_UPDATED, TEST_EPOCH_NS + ) + self.assertEqual( + { + **self.get_first_orig_result(), + **{ + job_id: { + "job_id": job_id, + "error": f"Job with ID {job_id} has not been updated since ts {TEST_EPOCH_NS}" + } + for job_id in self.ERROR_IDS + } + }, + results + ) + + def test__QUERY_EE2_STATES(self): + results = self.add_errors_to_results( + self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.QUERY_EE2_STATES, self.CHECK_JOBS_ERR + ) + self.assertEqual( + { + **self.get_first_orig_result(), + **{ + job_id: { + "some": "random", "content": "with", "job_id": job_id, + "error": f"A Job.query_ee2_states error occurred for job with ID {job_id}: {self.CHECK_JOBS_ERR}" + } + for job_id in self.ERROR_IDS + } + }, + results + ) + + def test__CANCEL(self): + results = self.add_errors_to_results( + self.get_orig_results(), 
self.ERROR_IDS, OutputStateErrMsg.CANCEL, self.CANCEL_JOBS_ERR + ) + self.assertEqual( + { + **self.get_first_orig_result(), + **{ + job_id: { + "some": "random", "content": "with", "job_id": job_id, + "error": f"An EE2.cancel_job error occurred for job with ID {job_id}: {cancel_job_err}" + } + for job_id, cancel_job_err in zip(self.ERROR_IDS, self.CANCEL_JOBS_ERR) + } + }, + results + ) + + def test_gen_err_msg__wrong_type_arg(self): + with self.assertRaisesRegex( + TypeError, + "Argument its must be of type list" + ): + OutputStateErrMsg.NOT_FOUND.gen_err_msg(42) + + def test_gen_err_msg__wrong_num_format(self): + with self.assertRaisesRegex( + ValueError, + re.escape("OutputStateErrMsg.NOT_FOUND must be formatted with 1 argument(s). Received 2 argument(s)") + ): + OutputStateErrMsg.NOT_FOUND.gen_err_msg([self.ERROR_IDS, "extra_unused_format_arg"]) + if __name__ == "__main__": unittest.main() From 52e379b542bea4bff95c17384fe0708c426e286c Mon Sep 17 00:00:00 2001 From: ialarmedalien Date: Wed, 4 May 2022 11:36:27 -0700 Subject: [PATCH 2/2] Cherry-picking out a set of simple changes from the DATAUP-729-job-ts branch --- src/biokbase/narrative/jobs/appmanager.py | 36 +- src/biokbase/narrative/jobs/job.py | 53 +-- src/biokbase/narrative/jobs/jobcomm.py | 74 ++-- src/biokbase/narrative/jobs/jobmanager.py | 148 ++----- src/biokbase/narrative/jobs/util.py | 4 +- .../narrative/tests/generate_test_results.py | 22 +- .../narrative/tests/job_test_constants.py | 5 +- .../narrative/tests/test_appmanager.py | 28 +- src/biokbase/narrative/tests/test_job.py | 89 ++--- src/biokbase/narrative/tests/test_job_util.py | 3 +- src/biokbase/narrative/tests/test_jobcomm.py | 217 +++-------- .../narrative/tests/test_jobmanager.py | 366 ++---------------- 12 files changed, 277 insertions(+), 768 deletions(-) diff --git a/src/biokbase/narrative/jobs/appmanager.py b/src/biokbase/narrative/jobs/appmanager.py index a4b706085b..31363b11c7 100644 --- a/src/biokbase/narrative/jobs/appmanager.py 
+++ b/src/biokbase/narrative/jobs/appmanager.py @@ -1,30 +1,32 @@ """ A module for managing apps, specs, requirements, and for starting jobs. """ +import datetime +import functools +import random +import re +import traceback +from typing import Callable, Dict, Union + import biokbase.auth as auth -from .job import Job -from .jobmanager import JobManager -from .jobcomm import JobComm, MESSAGE_TYPE -from . import specmanager import biokbase.narrative.clients as clients -from biokbase.narrative.widgetmanager import WidgetManager from biokbase.narrative.app_util import ( - system_variable, - strict_system_variable, + extract_ws_refs, map_outputs_from_state, - validate_parameters, resolve_ref_if_typed, + strict_system_variable, + system_variable, transform_param_value, - extract_ws_refs, + validate_parameters, ) -from biokbase.narrative.exception_util import transform_job_exception from biokbase.narrative.common import kblogging -import re -import datetime -import traceback -import random -import functools -from typing import Callable, Union, Dict +from biokbase.narrative.exception_util import transform_job_exception +from biokbase.narrative.widgetmanager import WidgetManager + +from . import specmanager +from .job import Job +from .jobcomm import MESSAGE_TYPE, JobComm +from .jobmanager import JobManager """ A module for managing apps, specs, requirements, and for starting jobs. 
@@ -1002,7 +1004,7 @@ def _generate_input(self, generator): if "prefix" in generator: ret = str(generator["prefix"]) + ret if "suffix" in generator: - ret = ret + str(generator["suffix"]) + return ret + str(generator["suffix"]) return ret def _send_comm_message(self, msg_type, content): diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index ff3ce15fa6..6bf6db314a 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -1,15 +1,18 @@ -import biokbase.narrative.clients as clients -from .specmanager import SpecManager -from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state -from biokbase.narrative.exception_util import transform_job_exception import copy import json import time import uuid -from jinja2 import Template from pprint import pprint from typing import List +from jinja2 import Template + +import biokbase.narrative.clients as clients +from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state +from biokbase.narrative.exception_util import transform_job_exception + +from .specmanager import SpecManager + """ KBase job class """ @@ -79,8 +82,8 @@ STATE_ATTRS = list(set(JOB_ATTRS) - set(JOB_INPUT_ATTRS) - set(NARR_CELL_INFO_ATTRS)) -class Job(object): - _job_logs = list() +class Job: + _job_logs = [] _acc_state = None # accumulates state def __init__(self, ee2_state, extra_data=None, children=None): @@ -116,7 +119,7 @@ def from_job_id(cls, job_id, extra_data=None, children=None): @classmethod def from_job_ids(cls, job_ids, return_list=True): states = cls.query_ee2_states(job_ids, init=True) - jobs = dict() + jobs = {} for job_id, state in states.items(): jobs[job_id] = cls(state) @@ -172,25 +175,25 @@ def __getattr__(self, name): """ Map expected job attributes to paths in stored ee2 state """ - attr = dict( - app_id=lambda: self._acc_state.get("job_input", {}).get( + attr = { + "app_id": lambda: self._acc_state.get("job_input", {}).get( 
"app_id", JOB_ATTR_DEFAULTS["app_id"] ), - app_version=lambda: self._acc_state.get("job_input", {}).get( + "app_version": lambda: self._acc_state.get("job_input", {}).get( "service_ver", JOB_ATTR_DEFAULTS["app_version"] ), - batch_id=lambda: ( + "batch_id": lambda: ( self.job_id if self.batch_job else self._acc_state.get("batch_id", JOB_ATTR_DEFAULTS["batch_id"]) ), - batch_job=lambda: self._acc_state.get( + "batch_job": lambda: self._acc_state.get( "batch_job", JOB_ATTR_DEFAULTS["batch_job"] ), - cell_id=lambda: self._acc_state.get("job_input", {}) + "cell_id": lambda: self._acc_state.get("job_input", {}) .get("narrative_cell_info", {}) .get("cell_id", JOB_ATTR_DEFAULTS["cell_id"]), - child_jobs=lambda: copy.deepcopy( + "child_jobs": lambda: copy.deepcopy( # TODO # Only batch container jobs have a child_jobs field # and need the state refresh. @@ -202,13 +205,13 @@ def __getattr__(self, name): if self.batch_job else self._acc_state.get("child_jobs", JOB_ATTR_DEFAULTS["child_jobs"]) ), - job_id=lambda: self._acc_state.get("job_id"), - params=lambda: copy.deepcopy( + "job_id": lambda: self._acc_state.get("job_id"), + "params": lambda: copy.deepcopy( self._acc_state.get("job_input", {}).get( "params", JOB_ATTR_DEFAULTS["params"] ) ), - retry_ids=lambda: copy.deepcopy( + "retry_ids": lambda: copy.deepcopy( # Batch container and retry jobs don't have a # retry_ids field so skip the state refresh self._acc_state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]) @@ -217,18 +220,18 @@ def __getattr__(self, name): "retry_ids", JOB_ATTR_DEFAULTS["retry_ids"] ) ), - retry_parent=lambda: self._acc_state.get( + "retry_parent": lambda: self._acc_state.get( "retry_parent", JOB_ATTR_DEFAULTS["retry_parent"] ), - run_id=lambda: self._acc_state.get("job_input", {}) + "run_id": lambda: self._acc_state.get("job_input", {}) .get("narrative_cell_info", {}) .get("run_id", JOB_ATTR_DEFAULTS["run_id"]), # TODO: add the status attribute! 
- tag=lambda: self._acc_state.get("job_input", {}) + "tag": lambda: self._acc_state.get("job_input", {}) .get("narrative_cell_info", {}) .get("tag", JOB_ATTR_DEFAULTS["tag"]), - user=lambda: self._acc_state.get("user", JOB_ATTR_DEFAULTS["user"]), - ) + "user": lambda: self._acc_state.get("user", JOB_ATTR_DEFAULTS["user"]), + } if name not in attr: raise AttributeError(f"'Job' object has no attribute '{name}'") @@ -347,8 +350,6 @@ def _update_state(self, state: dict) -> None: else: self._acc_state.update(state) - self.last_updated = time.time_ns() - def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ Queries the job service to see the state of the current job. @@ -540,7 +541,7 @@ def log(self, first_line=0, num_lines=None): num_lines = 0 if first_line >= num_available_lines or num_lines <= 0: - return (num_available_lines, list()) + return (num_available_lines, []) return ( num_available_lines, self._job_logs[first_line : first_line + num_lines], diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index a59747a35b..9fdfc76627 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -1,13 +1,13 @@ import copy import threading from typing import List, Union -import time + from ipykernel.comm import Comm -from biokbase.narrative.jobs.util import load_job_constants -from biokbase.narrative.jobs.jobmanager import JobManager -from biokbase.narrative.exception_util import NarrativeException, JobRequestException -from biokbase.narrative.common import kblogging +from biokbase.narrative.common import kblogging +from biokbase.narrative.exception_util import JobRequestException, NarrativeException +from biokbase.narrative.jobs.jobmanager import JobManager +from biokbase.narrative.jobs.util import load_job_constants (PARAM, MESSAGE_TYPE) = load_job_constants() @@ -105,9 +105,6 @@ def batch_id(self): return self.rq_data[PARAM["BATCH_ID"]] raise 
JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) - def has_job_ids(self): - return any([input_type in self.rq_data for input_type in self.INPUT_TYPES]) - def has_batch_id(self): return PARAM["BATCH_ID"] in self.rq_data @@ -178,14 +175,14 @@ def __init__(self): self._jm = JobManager() if self._msg_map is None: self._msg_map = { - MESSAGE_TYPE["CANCEL"]: self._cancel_jobs, - MESSAGE_TYPE["CELL_JOB_STATUS"]: self._get_job_states_by_cell_id, - MESSAGE_TYPE["INFO"]: self._get_job_info, - MESSAGE_TYPE["LOGS"]: self._get_job_logs, - MESSAGE_TYPE["RETRY"]: self._retry_jobs, + MESSAGE_TYPE["CANCEL"]: self.cancel_jobs, + MESSAGE_TYPE["CELL_JOB_STATUS"]: self.get_job_states_by_cell_id, + MESSAGE_TYPE["INFO"]: self.get_job_info, + MESSAGE_TYPE["LOGS"]: self.get_job_logs, + MESSAGE_TYPE["RETRY"]: self.retry_jobs, MESSAGE_TYPE["START_UPDATE"]: self._modify_job_updates, - MESSAGE_TYPE["STATUS"]: self._get_job_states, - MESSAGE_TYPE["STATUS_ALL"]: self._get_all_job_states, + MESSAGE_TYPE["STATUS"]: self.get_job_states, + MESSAGE_TYPE["STATUS_ALL"]: self.get_all_job_states, MESSAGE_TYPE["STOP_UPDATE"]: self._modify_job_updates, } @@ -199,11 +196,13 @@ def _get_job_ids(self, req: JobRequest) -> List[str]: :return: list of job IDs :rtype: List[str] """ - if not req.has_job_ids(): - raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) if req.has_batch_id(): return self._jm.update_batch_job(req.batch_id) - return req.job_id_list + + try: + return req.job_id_list + except Exception as ex: + raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex def start_job_status_loop( self, @@ -223,12 +222,12 @@ def start_job_status_loop( self._jm.initialize_jobs(cell_list) except Exception as e: error = { - "source": getattr(e, "source", "jc.start_job_status_loop"), - "request": getattr(e, "request", "jc.start_job_status_loop"), - "name": getattr(e, "name", type(e).__name__), - "message": getattr(e, "message", UNKNOWN_REASON), - "error": "Unable to get initial jobs list", "code": getattr(e, "code", 
-1), + "error": "Unable to get initial jobs list", + "message": getattr(e, "message", UNKNOWN_REASON), + "name": getattr(e, "name", type(e).__name__), + "request": getattr(e, "request", "jc.start_job_status_loop"), + "source": getattr(e, "source", "jc.start_job_status_loop"), } self.send_comm_message(MESSAGE_TYPE["ERROR"], error) # if job init failed, set the lookup loop var back to False and return @@ -252,7 +251,7 @@ def _lookup_job_status_loop(self) -> None: Run a loop that will look up job info. After running, this spawns a Timer thread on a loop to run itself again. LOOKUP_TIMER_INTERVAL sets the frequency at which the loop runs. """ - all_job_states = self._get_all_job_states() + all_job_states = self.get_all_job_states() if len(all_job_states) == 0 or not self._running_lookup_loop: self.stop_job_status_loop() else: @@ -261,7 +260,7 @@ def _lookup_job_status_loop(self) -> None: ) self._lookup_timer.start() - def _get_all_job_states( + def get_all_job_states( self, req: JobRequest = None, ignore_refresh_flag: bool = False ) -> dict: """ @@ -274,7 +273,7 @@ def _get_all_job_states( self.send_comm_message(MESSAGE_TYPE["STATUS_ALL"], all_job_states) return all_job_states - def _get_job_states_by_cell_id(self, req: JobRequest) -> dict: + def get_job_states_by_cell_id(self, req: JobRequest) -> dict: """ Fetches status of all jobs associated with the given cell ID(s). @@ -303,7 +302,7 @@ def _get_job_states_by_cell_id(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["CELL_JOB_STATUS"], cell_job_states) return cell_job_states - def _get_job_info(self, req: JobRequest) -> dict: + def get_job_info(self, req: JobRequest) -> dict: """ Gets job information for a list of job IDs. 
@@ -332,7 +331,7 @@ def _get_job_info(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["INFO"], job_info) return job_info - def _get_send_job_states(self, job_id_list: list, ts: int = None) -> dict: + def _get_job_states(self, job_id_list: list, ts: int = None) -> dict: """ Retrieves the job states for the supplied job_ids. @@ -362,7 +361,7 @@ def get_job_state(self, job_id: str) -> dict: """ Retrieve the job state for a single job. - This differs from the _get_job_state (underscored version) in that + This differs from the _get_job_states (underscored version) in that it just takes a job_id string, not a JobRequest. :param job_id: the job ID to get the state for @@ -371,9 +370,9 @@ def get_job_state(self, job_id: str) -> dict: :return: dictionary of job states, indexed by job ID :rtype: dict """ - return self._get_send_job_states([job_id]) + return self._get_job_states([job_id]) - def _get_job_states(self, req: JobRequest) -> dict: + def get_job_states(self, req: JobRequest) -> dict: """ Retrieves the job states for the supplied job_ids. @@ -386,7 +385,7 @@ def _get_job_states(self, req: JobRequest) -> dict: :rtype: dict """ job_id_list = self._get_job_ids(req) - return self._get_send_job_states(job_id_list, req.ts) + return self._get_job_states(job_id_list, req.ts) def _modify_job_updates(self, req: JobRequest) -> dict: """ @@ -419,7 +418,7 @@ def _modify_job_updates(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states) return output_states - def _cancel_jobs(self, req: JobRequest) -> dict: + def cancel_jobs(self, req: JobRequest) -> dict: """ Cancel a job or list of jobs. After sending the cancellation request, the job states are refreshed and their new output states returned. 
@@ -437,7 +436,7 @@ def _cancel_jobs(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["STATUS"], cancel_results) return cancel_results - def _retry_jobs(self, req: JobRequest) -> dict: + def retry_jobs(self, req: JobRequest) -> dict: """ Retry a job or list of jobs. @@ -454,7 +453,7 @@ def _retry_jobs(self, req: JobRequest) -> dict: self.send_comm_message(MESSAGE_TYPE["RETRY"], retry_results) return retry_results - def _get_job_logs(self, req: JobRequest) -> dict: + def get_job_logs(self, req: JobRequest) -> dict: """ Fetch the logs for a job or list of jobs. @@ -515,9 +514,6 @@ def send_comm_message(self, msg_type: str, content: dict) -> None: Sends a ipykernel.Comm message to the KBaseJobs channel with the given msg_type and content. These just get encoded into the message itself. """ - if msg_type == MESSAGE_TYPE["STATUS"]: - content["last_checked"] = time.time_ns() - msg = {"msg_type": msg_type, "content": content} self._comm.send(msg) @@ -538,8 +534,8 @@ def send_error_message( { "msg_type": "job_error", "content": { - "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, "request": request data from original incoming comm request, if available, else an arbitrary str/NoneType, + "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, **{any extra error information} } } diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index b535130b9b..0c80738f11 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -1,23 +1,20 @@ +import copy +from datetime import datetime, timedelta, timezone +from typing import List, Tuple + from IPython.display import HTML from jinja2 import Template -from datetime import datetime, timezone, timedelta -import copy -from enum import Enum -from itertools import cycle -from typing import List, Tuple, Union -from 
collections.abc import Iterable + import biokbase.narrative.clients as clients -from .job import ( - Job, - JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, -) -from biokbase.narrative.common import kblogging from biokbase.narrative.app_util import system_variable +from biokbase.narrative.common import kblogging from biokbase.narrative.exception_util import ( - transform_job_exception, JobRequestException, + transform_job_exception, ) +from .job import JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, Job + """ KBase Job Manager @@ -38,57 +35,7 @@ JOBS_MISSING_ERR = "No valid job IDs provided" CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided" - - -class OutputStateErrMsg(Enum): - """ - For errors that go into the STATUS response. - Enum mapping from error names to formattable error messages. - The first formatting argument of each error message must be the job ID - """ - - NOT_FOUND = "Cannot find job with ID %s" - NOT_UPDATED = "Job with ID %s has not been updated since ts %d" - QUERY_EE2_STATES = "A Job.query_ee2_states error occurred for job with ID %s: %s" - CANCEL = "An EE2.cancel_job error occurred for job with ID %s: %s" - - def gen_err_msg(self, its: List[Union[str, int, Iterable]]): - """ - Create a generator for filled in enum values - - :param its: A list of arguments, where each argument - should be either a literal or an iterable of literals - The iterables will then be zipped and used to format the enum values. - The first argument must be the list of job_ids - """ - if not isinstance(its, list): - raise TypeError( - "Argument its must be of type list" - ) - # check number of arguments matches number of %s or %d to be formatted - num_format = self.value.count("%") - if num_format != len(its): - raise ValueError( - f"{self.__class__.__name__}.{self.name} must be formatted with {num_format} argument(s). 
" - f"Received {len(its)} argument(s)" - ) - - for i, e in enumerate(its): - # if element is a literal, convert to iterable of literal - if isinstance(e, str) or isinstance(e, int): - its[i] = cycle([e]) - - def gen(): - for tup in zip(*its): - yield self.value % tup - - return gen() - - def replace_result(self) -> bool: - names = [e.name for e in list(OutputStateErrMsg)] - replace = [True, True, False, False] - ans = dict(zip(names, replace)) - return ans[self.name] +DOES_NOT_EXIST = "does_not_exist" class JobManager: @@ -366,13 +313,9 @@ def _construct_job_output_state_set( output_states[job_id] = job.output_state(fetched_states[job_id]) else: # fetch the current state without updating it - output_states[job_id] = job.output_state(no_refresh=True) - - failed_ids = [job_id for job_id in jobs_to_lookup if job_id not in fetched_states] - if failed_ids: - self.add_errors_to_results( - output_states, failed_ids, OutputStateErrMsg.QUERY_EE2_STATES, error_message - ) + output_states[job_id] = job.output_state({}) + # add an error field with the error message from the failed look up + output_states[job_id]["error"] = error_message return output_states @@ -395,16 +338,9 @@ def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: :return: dictionary of job states, indexed by job ID :rtype: dict """ - job_ids, not_found_ids = self._check_job_list(job_ids) + job_ids, error_ids = self._check_job_list(job_ids) output_states = self._construct_job_output_state_set(job_ids) - not_updated_ids = [] - if ts is not None: - for job_id in output_states: - if self.get_job(job_id).last_updated <= ts: - not_updated_ids.append(job_id) - self.add_errors_to_results(output_states, not_found_ids, OutputStateErrMsg.NOT_FOUND) - self.add_errors_to_results(output_states, not_updated_ids, OutputStateErrMsg.NOT_UPDATED, ts) - return output_states + return self.add_errors_to_results(output_states, error_ids) def get_all_job_states(self, ignore_refresh_flag=False) -> dict: """ @@ -510,7 
+446,7 @@ def get_job_info(self, job_ids: List[str]) -> dict: "job_id": job_id, "job_params": job.params, } - return self.add_errors_to_results(infos, error_ids, OutputStateErrMsg.NOT_FOUND) + return self.add_errors_to_results(infos, error_ids) def get_job_logs( self, @@ -629,7 +565,7 @@ def get_job_logs_for_list( for job_id in job_ids: output[job_id] = self.get_job_logs(job_id, first_line, num_lines, latest) - return self.add_errors_to_results(output, error_ids, OutputStateErrMsg.NOT_FOUND) + return self.add_errors_to_results(output, error_ids) def cancel_jobs(self, job_id_list: List[str]) -> dict: """ @@ -664,9 +600,10 @@ def cancel_jobs(self, job_id_list: List[str]) -> dict: job_states = self._construct_job_output_state_set(job_ids) - self.add_errors_to_results(job_states, list(error_states.keys()), OutputStateErrMsg.CANCEL, list(error_states.values())) - self.add_errors_to_results(job_states, error_ids, OutputStateErrMsg.NOT_FOUND) - return job_states + for job_id in error_states: + job_states[job_id]["error"] = error_states[job_id] + + return self.add_errors_to_results(job_states, error_ids) def _cancel_job(self, job_id: str) -> None: """ @@ -764,49 +701,26 @@ def retry_jobs(self, job_id_list: List[str]) -> dict: ) if "error" in result: results_by_job_id[job_id]["error"] = result["error"] - return self.add_errors_to_results(results_by_job_id, error_ids, OutputStateErrMsg.NOT_FOUND) + return self.add_errors_to_results(results_by_job_id, error_ids) - @staticmethod - def add_errors_to_results( - results: dict, error_ids: List[str], error_enum: OutputStateErrMsg, *extra_its: Tuple - ) -> dict: + def add_errors_to_results(self, results: dict, error_ids: List[str]) -> dict: """ - Add error states to results + Add the generic "not found" error for each job_id in error_ids. :param results: dictionary of job data (output state, info, retry, etc.) 
indexed by job ID :type results: dict :param error_ids: list of IDs that could not be found :type error_ids: List[str] - :param error_enum: an enum instance from JobStateErrMsg - :type error_enum: JobStateErrMsg - :param its: any extra arguments to feed into error_enum.gen_str_fill to format the error message - :type its: list - :return: input results augmented by either extra error dictionaries or errors in existing dictionaries + :return: input results dictionary augmented by a dictionary containing job ID and a short + not found error message for every ID in the error_ids list :rtype: dict """ - # create generator yielding error messages - gen_err_msg = error_enum.gen_err_msg([error_ids] + list(extra_its)) - - for error_id, err_msg in zip(error_ids, gen_err_msg): - # if there's already an error there - if error_id in results and "error" in results[error_id]: - existing_error = results[error_id]["error"] - # concatenate the errors (works recursively) - err_msg = f"{existing_error}\n{err_msg}" - - if error_enum.replace_result(): - results[error_id] = { - "job_id": error_id, - "error": err_msg, - } - else: - if error_id not in results: - raise ValueError(f"Cannot add error because response dict is missing key {error_id}") - results[error_id].update( - {"error": err_msg} - ) - + for error_id in error_ids: + results[error_id] = { + "job_id": error_id, + "error": f"Cannot find job with ID {error_id}", + } return results def modify_job_refresh(self, job_ids: List[str], update_refresh: bool) -> None: diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py index 25b721e5e5..8b9c07e77a 100644 --- a/src/biokbase/narrative/jobs/util.py +++ b/src/biokbase/narrative/jobs/util.py @@ -1,5 +1,5 @@ -import os import json +import os JOB_CONFIG_FILE_PATH_PARTS = [ "kbase-extension", @@ -51,7 +51,7 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): f"job_config.json is missing the '{datatype}' config section" ) missing = [item 
for item in example_list if item not in config[datatype]] - if len(missing): + if missing: raise ValueError( f"job_config.json is missing the following values for {datatype}: " + ", ".join(missing) diff --git a/src/biokbase/narrative/tests/generate_test_results.py b/src/biokbase/narrative/tests/generate_test_results.py index 91fa444d2e..e07221d784 100644 --- a/src/biokbase/narrative/tests/generate_test_results.py +++ b/src/biokbase/narrative/tests/generate_test_results.py @@ -1,19 +1,20 @@ import copy -import sys import os.path -from biokbase.narrative.tests.util import ConfigTests +import sys + from biokbase.narrative.app_util import app_version_tags -from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE from biokbase.narrative.jobs.job import ( + COMPLETED_STATUS, JOB_ATTR_DEFAULTS, OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS, - COMPLETED_STATUS, ) +from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE from biokbase.narrative.tests.job_test_constants import ( BAD_JOBS, - get_test_job, generate_error, + get_test_job, ) +from biokbase.narrative.tests.util import ConfigTests """ generate_test_results.py is used to generate the job message data that the narrative backend produces and that the frontend consumes. It uses data from `ee2_job_test_data_file` and `app_specs_file` and provides expected narrative backend message data, as well as a number of mappings that are used in python tests. 
@@ -30,7 +31,7 @@ TEST_SPECS = {} for tag in app_version_tags: - spec_dict = dict() + spec_dict = {} for spec in spec_list: spec_dict[spec["info"]["id"]] = spec TEST_SPECS[tag] = spec_dict @@ -95,11 +96,10 @@ def _generate_job_output(job_id): def generate_bad_jobs(): - bad_jobs = { + return { job_id: {"job_id": job_id, "error": generate_error(job_id, "not_found")} for job_id in BAD_JOBS } - return bad_jobs def generate_job_output_state(all_jobs): @@ -215,7 +215,7 @@ def generate_job_logs(all_jobs): INVALID_CELL_ID = "invalid_cell_id" TEST_CELL_ID_LIST = list(JOBS_BY_CELL_ID.keys()) + [INVALID_CELL_ID] # mapping expected as output from get_job_states_by_cell_id -TEST_CELL_IDs = {id: list(JOBS_BY_CELL_ID[id]) for id in JOBS_BY_CELL_ID.keys()} +TEST_CELL_IDs = {cell_id: list(JOBS_BY_CELL_ID[cell_id]) for cell_id in JOBS_BY_CELL_ID.keys()} TEST_CELL_IDs[INVALID_CELL_ID] = [] @@ -230,8 +230,8 @@ def generate_job_logs(all_jobs): config.write_json_file(RESPONSE_DATA_FILE, ALL_RESPONSE_DATA) -def main(args=[]): - if len(args) and args[0] == "--force" or not os.path.exists(RESPONSE_DATA_FILE): +def main(args=None): + if args and args[0] == "--force" or not os.path.exists(RESPONSE_DATA_FILE): config.write_json_file(RESPONSE_DATA_FILE, ALL_RESPONSE_DATA) diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index d78adf632b..597c43eec7 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -1,7 +1,8 @@ -from .util import ConfigTests import copy + from biokbase.narrative.jobs.job import TERMINAL_STATUSES +from .util import ConfigTests config = ConfigTests() TEST_JOBS = config.load_json_file(config.get("jobs", "ee2_job_test_data_file")) @@ -95,7 +96,7 @@ def get_test_jobs(job_ids): BATCH_PARENT_CHILDREN = [BATCH_PARENT] + BATCH_CHILDREN JOBS_TERMINALITY = { - id: TEST_JOBS[id]["status"] in TERMINAL_STATUSES for id in TEST_JOBS.keys() + job_id: 
TEST_JOBS[job_id]["status"] in TERMINAL_STATUSES for job_id in TEST_JOBS.keys() } TERMINAL_JOBS = [] diff --git a/src/biokbase/narrative/tests/test_appmanager.py b/src/biokbase/narrative/tests/test_appmanager.py index d3104a47b7..2b2aad3ef5 100644 --- a/src/biokbase/narrative/tests/test_appmanager.py +++ b/src/biokbase/narrative/tests/test_appmanager.py @@ -1,31 +1,31 @@ """ Tests for the app manager. """ +import copy +import io +import os +import sys import unittest from unittest import mock from unittest.mock import MagicMock -from biokbase.narrative.jobs.specmanager import SpecManager -from biokbase.narrative.jobs.appmanager import AppManager, BATCH_APP -from biokbase.narrative.jobs.jobmanager import JobManager -from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE + +from IPython.display import HTML, Javascript + import biokbase.narrative.app_util as app_util +from biokbase.narrative.jobs.appmanager import BATCH_APP, AppManager from biokbase.narrative.jobs.job import Job -from IPython.display import HTML, Javascript -from .narrative_mock.mockclients import ( - get_mock_client, - WSID_STANDARD, -) -import os -import sys -import io -import copy -from .util import ConfigTests +from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE +from biokbase.narrative.jobs.jobmanager import JobManager +from biokbase.narrative.jobs.specmanager import SpecManager from biokbase.narrative.tests.job_test_constants import ( CLIENTS, READS_OBJ_1, READS_OBJ_2, ) +from .narrative_mock.mockclients import WSID_STANDARD, get_mock_client +from .util import ConfigTests + CONFIG = ConfigTests() WS_NAME = CONFIG.get("app_tests", "public_ws_name") SEMANTIC_VER_ERROR = "Semantic versions only apply to released app modules." 
diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 7594fcaab1..0b1e4d245d 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -1,47 +1,46 @@ -import unittest -from unittest import mock import copy import itertools +import sys +import unittest +from contextlib import contextmanager +from io import StringIO +from unittest import mock + from biokbase.execution_engine2.baseclient import ServerError from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state from biokbase.narrative.jobs.job import ( - Job, COMPLETED_STATUS, EXCLUDED_JOB_STATE_FIELDS, - JOB_ATTRS, JOB_ATTR_DEFAULTS, + JOB_ATTRS, + Job, ) from biokbase.narrative.jobs.jobmanager import JOB_INIT_EXCLUDED_JOB_STATE_FIELDS from biokbase.narrative.jobs.specmanager import SpecManager - -from .narrative_mock.mockclients import ( - get_mock_client, - get_failing_mock_client, - MockClients, - assert_obj_method_called, -) -from contextlib import contextmanager -from io import StringIO -import sys - +from biokbase.narrative.tests.generate_test_results import JOBS_BY_CELL_ID from biokbase.narrative.tests.job_test_constants import ( + ACTIVE_JOBS, + ALL_JOBS, + BATCH_CHILDREN, + BATCH_PARENT, + BATCH_RETRY_RUNNING, CLIENTS, - MAX_LOG_LINES, JOB_COMPLETED, JOB_CREATED, JOB_RUNNING, JOB_TERMINATED, - BATCH_PARENT, - BATCH_RETRY_RUNNING, JOBS_TERMINALITY, - ALL_JOBS, + MAX_LOG_LINES, TERMINAL_JOBS, - ACTIVE_JOBS, - BATCH_CHILDREN, get_test_job, ) -from biokbase.narrative.tests.generate_test_results import JOBS_BY_CELL_ID +from .narrative_mock.mockclients import ( + MockClients, + assert_obj_method_called, + get_failing_mock_client, + get_mock_client, +) @contextmanager @@ -70,8 +69,7 @@ def get_test_spec(tag, app_id): def create_job_from_ee2(job_id, extra_data=None, children=None): state = get_test_job(job_id) - job = Job(state, extra_data=extra_data, children=children) - return job + return 
Job(state, extra_data=extra_data, children=children) def create_state_from_ee2(job_id, exclude_fields=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): @@ -92,26 +90,25 @@ def create_attrs_from_ee2(job_id): job_input = state.get("job_input", {}) narr_cell_info = job_input.get("narrative_cell_info", {}) - attrs = dict( - app_id=job_input.get("app_id", JOB_ATTR_DEFAULTS["app_id"]), - app_version=job_input.get("service_ver", JOB_ATTR_DEFAULTS["app_version"]), - batch_id=( + return { + "app_id": job_input.get("app_id", JOB_ATTR_DEFAULTS["app_id"]), + "app_version": job_input.get("service_ver", JOB_ATTR_DEFAULTS["app_version"]), + "batch_id": ( state.get("job_id") if state.get("batch_job", JOB_ATTR_DEFAULTS["batch_job"]) else state.get("batch_id", JOB_ATTR_DEFAULTS["batch_id"]) ), - batch_job=state.get("batch_job", JOB_ATTR_DEFAULTS["batch_job"]), - cell_id=narr_cell_info.get("cell_id", JOB_ATTR_DEFAULTS["cell_id"]), - child_jobs=state.get("child_jobs", JOB_ATTR_DEFAULTS["child_jobs"]), - job_id=state.get("job_id"), - params=job_input.get("params", JOB_ATTR_DEFAULTS["params"]), - retry_ids=state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]), - retry_parent=state.get("retry_parent", JOB_ATTR_DEFAULTS["retry_parent"]), - run_id=narr_cell_info.get("run_id", JOB_ATTR_DEFAULTS["run_id"]), - tag=narr_cell_info.get("tag", JOB_ATTR_DEFAULTS["tag"]), - user=state.get("user", JOB_ATTR_DEFAULTS["user"]), - ) - return attrs + "batch_job": state.get("batch_job", JOB_ATTR_DEFAULTS["batch_job"]), + "cell_id": narr_cell_info.get("cell_id", JOB_ATTR_DEFAULTS["cell_id"]), + "child_jobs": state.get("child_jobs", JOB_ATTR_DEFAULTS["child_jobs"]), + "job_id": state.get("job_id"), + "params": job_input.get("params", JOB_ATTR_DEFAULTS["params"]), + "retry_ids": state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]), + "retry_parent": state.get("retry_parent", JOB_ATTR_DEFAULTS["retry_parent"]), + "run_id": narr_cell_info.get("run_id", JOB_ATTR_DEFAULTS["run_id"]), + "tag": narr_cell_info.get("tag", 
JOB_ATTR_DEFAULTS["tag"]), + "user": state.get("user", JOB_ATTR_DEFAULTS["user"]), + } def get_widget_info(job_id): @@ -166,7 +163,7 @@ def get_all_jobs(return_list=False): if job_id not in jobs: jobs[job_id] = create_job_from_ee2(job_id) if return_list: - jobs = list(jobs.values()) + return list(jobs.values()) return jobs @@ -186,15 +183,19 @@ def check_jobs_equal(self, jobl, jobr): for attr in JOB_ATTRS: self.assertEqual(getattr(jobl, attr), getattr(jobr, attr)) - def check_job_attrs_custom(self, job, exp_attr={}): + def check_job_attrs_custom(self, job, exp_attr=None): + if not exp_attr: + exp_attr = {} attr = dict(JOB_ATTR_DEFAULTS) attr.update(exp_attr) with mock.patch(CLIENTS, get_mock_client): for name, value in attr.items(): self.assertEqual(value, getattr(job, name)) - def check_job_attrs(self, job, job_id, exp_attrs={}, skip_state=False): + def check_job_attrs(self, job, job_id, exp_attrs=None, skip_state=False): # TODO check _acc_state full vs pruned, extra_data + if not exp_attrs: + exp_attrs = {} # Check state() if no special values expected if not exp_attrs and not skip_state: @@ -721,7 +722,7 @@ def test_was_terminal__batch(self): self.assertFalse(batch_job.was_terminal()) def mock_check_job(self_, params): - assert params["job_id"] in BATCH_CHILDREN + self.assertTrue(params["job_id"] in BATCH_CHILDREN) return {"status": COMPLETED_STATUS} with mock.patch.object(MockClients, "check_job", mock_check_job): diff --git a/src/biokbase/narrative/tests/test_job_util.py b/src/biokbase/narrative/tests/test_job_util.py index d0952fb474..656d86f160 100644 --- a/src/biokbase/narrative/tests/test_job_util.py +++ b/src/biokbase/narrative/tests/test_job_util.py @@ -1,6 +1,7 @@ -from biokbase.narrative.jobs.util import load_job_constants import unittest +from biokbase.narrative.jobs.util import load_job_constants + class JobUtilTestCase(unittest.TestCase): def test_load_job_constants__no_file(self): diff --git a/src/biokbase/narrative/tests/test_jobcomm.py
b/src/biokbase/narrative/tests/test_jobcomm.py index f61c1aa9e0..c526e1bebf 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -1,78 +1,72 @@ -import unittest -from unittest import mock -import os +import copy import itertools +import os import re -import copy -import time +import unittest +from unittest import mock -from biokbase.narrative.jobs.jobmanager import ( - JobManager, - OutputStateErrMsg, - JOB_NOT_REG_ERR, - JOB_NOT_BATCH_ERR, - JOBS_MISSING_ERR, +from biokbase.narrative.exception_util import ( + JobRequestException, + NarrativeException, + transform_job_exception, ) from biokbase.narrative.jobs.jobcomm import ( - JobRequest, - JobComm, - exc_to_msg, CELLS_NOT_PROVIDED_ERR, - ONE_INPUT_TYPE_ONLY_ERR, INVALID_REQUEST_ERR, - MISSING_REQUEST_TYPE_ERR, MESSAGE_TYPE, + MISSING_REQUEST_TYPE_ERR, + ONE_INPUT_TYPE_ONLY_ERR, PARAM, + JobComm, + JobRequest, + exc_to_msg, ) -from biokbase.narrative.exception_util import ( - NarrativeException, - JobRequestException, - transform_job_exception, +from biokbase.narrative.jobs.jobmanager import ( + JOB_NOT_BATCH_ERR, + JOB_NOT_REG_ERR, + JOBS_MISSING_ERR, + JobManager, +) +from biokbase.narrative.tests.generate_test_results import ( + ALL_RESPONSE_DATA, + JOBS_BY_CELL_ID, + TEST_CELL_ID_LIST, + TEST_CELL_IDs, ) - -from .util import ConfigTests, validate_job_state from biokbase.narrative.tests.job_test_constants import ( + ACTIVE_JOBS, + ALL_JOBS, + BAD_JOB_ID, + BAD_JOB_ID_2, + BAD_JOBS, + BATCH_CHILDREN, + BATCH_COMPLETED, + BATCH_ERROR_RETRIED, + BATCH_PARENT, + BATCH_PARENT_CHILDREN, + BATCH_RETRY_RUNNING, + BATCH_TERMINATED, + BATCH_TERMINATED_RETRIED, CLIENTS, - TIME_NS, - TEST_EPOCH_NS, - MAX_LOG_LINES, JOB_COMPLETED, JOB_CREATED, - JOB_RUNNING, - JOB_TERMINATED, JOB_ERROR, - BATCH_PARENT, - BATCH_COMPLETED, - BATCH_TERMINATED, - BATCH_TERMINATED_RETRIED, - BATCH_ERROR_RETRIED, - BATCH_RETRY_RUNNING, JOB_NOT_FOUND, - BAD_JOB_ID, - BAD_JOB_ID_2, - 
ALL_JOBS, - BAD_JOBS, - ACTIVE_JOBS, + JOB_RUNNING, + JOB_TERMINATED, + MAX_LOG_LINES, REFRESH_STATE, - BATCH_PARENT_CHILDREN, - BATCH_CHILDREN, generate_error, - get_test_jobs, -) -from biokbase.narrative.tests.generate_test_results import ( - ALL_RESPONSE_DATA, - TEST_CELL_ID_LIST, - TEST_CELL_IDs, - JOBS_BY_CELL_ID, ) -from .narrative_mock.mockcomm import MockComm from .narrative_mock.mockclients import ( - get_mock_client, - get_failing_mock_client, - generate_ee2_error, MockClients, + generate_ee2_error, + get_failing_mock_client, + get_mock_client, ) +from .narrative_mock.mockcomm import MockComm +from .util import ConfigTests, validate_job_state APP_NAME = "The Best App in the World" @@ -128,20 +122,6 @@ def make_comm_msg( return msg -def ts_are_close(t0: int, t1: int, tol: float = 1) -> bool: - """Check if two times, in ns, are "close" - - Args: - t0 (int): time in ns - t1 (int): time in ns - tol (float, optional): tolerated discrepancy between the times, in s. Defaults to 1. - - Returns: - bool: Whether the two times differ by less than the tolerance - """ - return abs(t1 - t0) * 1e-9 <= 1 - - class JobCommTestCase(unittest.TestCase): maxDiff = None @@ -491,7 +471,7 @@ def test_start_job_status_loop__no_jobs_stop_loop(self): self.jm._jobs_by_cell_id = {} self.jm = JobManager() self.assertEqual(self.jm._running_jobs, {}) - # this will trigger a call to _get_all_job_states + # this will trigger a call to get_all_job_states # a message containing all jobs (i.e. 
{}) will be sent out # when it returns 0 jobs, the JobComm will run stop_job_status_loop self.jc.start_job_status_loop() @@ -506,7 +486,6 @@ def test_start_job_status_loop__no_jobs_stop_loop(self): # --------------------- @mock.patch(CLIENTS, get_mock_client) - @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def check_job_output_states( self, output_states=None, @@ -515,7 +494,6 @@ def check_job_output_states( response_type=STATUS, ok_states=None, error_states=None, - last_checked=TEST_EPOCH_NS, ): """ Handle any request that returns a dictionary of job state objects; this @@ -528,7 +506,6 @@ def check_job_output_states( :param params: params for the comm message (opt) :param ok_states: list of job IDs expected to be in the output :param error_states: list of job IDs expected to return a not found error - :param last_checked: ts in ns """ if not params: params = {} @@ -550,11 +527,6 @@ def check_job_output_states( msg, ) - if response_type == STATUS: - self._check_pop_last_checked(output_states, last_checked) - else: - self.assertNotIn("last_checked", output_states) - for job_id, state in output_states.items(): self.assertEqual(ALL_RESPONSE_DATA[STATUS][job_id], state) if job_id in ok_states: @@ -572,7 +544,6 @@ def test_get_all_job_states__ok(self): # ----------------------- # Lookup single job state # ----------------------- - @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_state__1_ok(self): output_states = self.jc.get_job_state(JOB_COMPLETED) self.check_job_output_states( @@ -585,23 +556,9 @@ def test_get_job_state__no_job(self): ): self.jc.get_job_state(None) - def test_lookup_job_state__live_ts(self): - output_states = self.jc.get_job_state(JOB_COMPLETED) - self.assertTrue( - ts_are_close(output_states["last_checked"], time.time_ns()) - ) - # ----------------------- # Lookup select job states # ----------------------- - def _check_pop_last_checked(self, output_states, last_checked=TEST_EPOCH_NS): - self.assertIn("last_checked", output_states) - 
self.assertTrue( - last_checked == output_states["last_checked"] - or ts_are_close(last_checked, output_states["last_checked"]) - ) - del output_states["last_checked"] - def test_get_job_states__job_id__ok(self): self.check_job_output_states( params={JOB_ID: JOB_COMPLETED}, ok_states=[JOB_COMPLETED] @@ -655,7 +612,6 @@ def test_get_job_states__batch_id__not_batch(self): self.check_batch_id__not_batch_test(STATUS) @mock.patch(CLIENTS, get_mock_client) - @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_states__job_id_list__ee2_error(self): exc = Exception("Test exception") exc_message = str(exc) @@ -670,12 +626,10 @@ def mock_check_jobs(params): self.jc._handle_comm_message(req_dict) msg = self.jc._comm.last_message - self._check_pop_last_checked(msg["content"], TEST_EPOCH_NS) - expected = {job_id: copy.deepcopy(ALL_RESPONSE_DATA[STATUS][job_id]) for job_id in ALL_JOBS} for job_id in ACTIVE_JOBS: # add in the ee2_error message - expected[job_id]["error"] = OutputStateErrMsg.QUERY_EE2_STATES.value % (job_id, exc_message) + expected[job_id]["error"] = exc_message self.assertEqual( { @@ -685,79 +639,6 @@ def mock_check_jobs(params): msg, ) - @mock.patch(CLIENTS, get_mock_client) - def test_get_job_states__err(self): - """ - """ - # what FE would say was the last time the jobs were checked - NOW = time.time_ns() - - # mix of terminal and not terminal - not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] - # not terminal - updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] - - # error ids - not_found_ids = [JOB_NOT_FOUND] - - job_ids = not_updated_ids + updated_ids - active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) - - # output_states will be partitioned as not_found_ids - terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) - not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) - updated_active_ids = list(set(updated_ids) & set(active_ids)) # (yes, redundant) - - def mock_check_jobs(params): - """Update 
appropriate job states""" - lookup_ids = params["job_ids"] - self.assertCountEqual(active_ids, lookup_ids) # sanity check - - job_states_ret = get_test_jobs(lookup_ids) - for job_id, job_state in job_states_ret.items(): - # if job was updated, return an updated version - if job_id in updated_active_ids: - job_state["updated"] += 1 - return job_states_ret - - rq = make_comm_msg(STATUS, job_ids + not_found_ids, False, {"ts": NOW}) - with mock.patch.object(MockClients, "check_jobs", side_effect=mock_check_jobs): - output_states = self.jc._handle_comm_message(rq) - - # checks - exp_updated_output_states = { - job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) for job_id in updated_active_ids - } - for job_state in exp_updated_output_states.values(): - job_state["jobState"]["updated"] += 1 - - expected = { - # corresponding to not_found_ids - **{ - job_id: { - "job_id": job_id, - "error": OutputStateErrMsg.NOT_FOUND.value % job_id - } - for job_id in not_found_ids - }, - # corresponding to updated_active_ids - **exp_updated_output_states, - # corresponding to not_updated_active_ids and terminal_ids - **{ - job_id: { - "job_id": job_id, - "error": OutputStateErrMsg.NOT_UPDATED.value % (job_id, NOW) - } - for job_id in not_updated_active_ids + terminal_ids - }, - } - - self._check_pop_last_checked(output_states, NOW) - self.assertEqual( - expected, - output_states - ) - # ----------------------- # get cell job states # ----------------------- @@ -959,19 +840,17 @@ def test_cancel_jobs__job_id_list__all_bad_jobs(self): ) @mock.patch(CLIENTS, get_mock_client) - @mock.patch(TIME_NS, lambda: TEST_EPOCH_NS) def test_cancel_jobs__job_id_list__failure(self): # the mock client will throw an error with BATCH_RETRY_RUNNING job_id_list = [JOB_RUNNING, BATCH_RETRY_RUNNING] req_dict = make_comm_msg(CANCEL, job_id_list, False) output = self.jc._handle_comm_message(req_dict) - self._check_pop_last_checked(output) expected = { JOB_RUNNING: 
ALL_RESPONSE_DATA[STATUS][JOB_RUNNING], BATCH_RETRY_RUNNING: { **ALL_RESPONSE_DATA[STATUS][BATCH_RETRY_RUNNING], - "error": OutputStateErrMsg.CANCEL.value % (BATCH_RETRY_RUNNING, CANCEL + " failed"), + "error": CANCEL + " failed", }, } diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index 5ac28b69de..dc2c36bb05 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -1,76 +1,63 @@ -""" -Tests for job management -""" -import unittest import copy import itertools -from unittest import mock -import re import os -from typing import List, Tuple -import time +import re +import unittest from datetime import datetime +from unittest import mock + from IPython.display import HTML -from biokbase.narrative.jobs.jobmanager import ( - JobManager, - OutputStateErrMsg, - JOB_NOT_REG_ERR, - JOB_NOT_BATCH_ERR, - JOBS_MISSING_ERR, - CELLS_NOT_PROVIDED_ERR, -) +from biokbase.narrative.exception_util import JobRequestException, NarrativeException from biokbase.narrative.jobs.job import ( - Job, EXCLUDED_JOB_STATE_FIELDS, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, + Job, ) from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE -from biokbase.narrative.exception_util import ( - NarrativeException, - JobRequestException, +from biokbase.narrative.jobs.jobmanager import ( + CELLS_NOT_PROVIDED_ERR, + JOB_NOT_BATCH_ERR, + JOB_NOT_REG_ERR, + JOBS_MISSING_ERR, + JobManager, +) +from biokbase.narrative.tests.generate_test_results import ( + ALL_RESPONSE_DATA, + JOBS_BY_CELL_ID, + TEST_CELL_ID_LIST, + TEST_CELL_IDs, ) from biokbase.narrative.tests.job_test_constants import ( + ACTIVE_JOBS, + ALL_JOBS, + BAD_JOB_ID, + BAD_JOBS, + BATCH_CHILDREN, + BATCH_ERROR_RETRIED, + BATCH_PARENT, + BATCH_TERMINATED, + BATCH_TERMINATED_RETRIED, CLIENTS, JOB_COMPLETED, JOB_CREATED, - JOB_RUNNING, - JOB_TERMINATED, JOB_ERROR, - BATCH_PARENT, - BATCH_TERMINATED, - BATCH_TERMINATED_RETRIED, - 
BATCH_ERROR_RETRIED, - BATCH_RETRY_RUNNING, JOB_NOT_FOUND, - BAD_JOB_ID, - ALL_JOBS, - BAD_JOBS, - TERMINAL_JOBS, - ACTIVE_JOBS, + JOB_RUNNING, + JOB_TERMINATED, REFRESH_STATE, - BATCH_CHILDREN, + TERMINAL_JOBS, TEST_JOBS, - TEST_EPOCH_NS, - get_test_job, - get_test_jobs, generate_error, -) - -from biokbase.narrative.tests.generate_test_results import ( - ALL_RESPONSE_DATA, - JOBS_BY_CELL_ID, - TEST_CELL_ID_LIST, - TEST_CELL_IDs, + get_test_job, ) from .narrative_mock.mockclients import ( - get_mock_client, - get_failing_mock_client, - assert_obj_method_called, MockClients, + assert_obj_method_called, + get_failing_mock_client, + get_mock_client, ) - from .util import ConfigTests TERMINAL_IDS = [JOB_COMPLETED, JOB_TERMINATED, JOB_ERROR] @@ -78,6 +65,10 @@ class JobManagerTest(unittest.TestCase): + """ + Tests for job management + """ + @classmethod @mock.patch(CLIENTS, get_mock_client) def setUpClass(cls): @@ -253,12 +244,9 @@ def mock_check_jobs(params): for job_id in ALL_JOBS } - for job_id, meta_exc_msg in zip( - ACTIVE_JOBS, - OutputStateErrMsg.QUERY_EE2_STATES.gen_err_msg([ACTIVE_JOBS, exc_msg]) - ): + for job_id in ACTIVE_JOBS: # expect there to be an error message added - expected[job_id]["error"] = meta_exc_msg + expected[job_id]["error"] = exc_msg self.assertEqual( expected, @@ -719,67 +707,6 @@ def test_get_job_states__empty(self): ): self.jm.get_job_states([]) - @mock.patch(CLIENTS, get_mock_client) - def test_get_job_states__last_updated(self): - """ - Test that only updated jobs return an actual state - and that the rest of the jobs return an error stub state - """ - # what FE would say was the last time the jobs were checked - NOW = time.time_ns() - - # mix of terminal and not terminal - not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] - # not terminal - updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] - - job_ids = not_updated_ids + updated_ids - active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) - - # 
output_states will be partitioned as - terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) - not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) - updated_active_ids = list(set(updated_ids) & set(active_ids)) # (yes, redundant) - - def mock_check_jobs(self_, params): - """Update appropriate job states""" - lookup_ids = params["job_ids"] - self.assertCountEqual(active_ids, lookup_ids) # sanity check - - job_states_ret = get_test_jobs(lookup_ids) - for job_id, job_state in job_states_ret.items(): - # if job was updated, return an updated version - if job_id in updated_active_ids: - job_state["updated"] += 1 - return job_states_ret - - with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): - output_states = self.jm.get_job_states(job_ids, ts=NOW) - - updated_output_states = { - job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) for job_id in updated_active_ids - } - for job_state in updated_output_states.values(): - job_state["jobState"]["updated"] += 1 - - expected = { - # corresponding to updated_active_ids - **updated_output_states, - # corresponding to not_updated_active_ids and terminal_ids - **{ - job_id: { - "job_id": job_id, - "error": OutputStateErrMsg.NOT_UPDATED.value % (job_id, NOW) - } - for job_id in not_updated_active_ids + terminal_ids - } - } - - self.assertEqual( - expected, - output_states - ) - def test_update_batch_job__dne(self): with self.assertRaisesRegex( JobRequestException, f"{JOB_NOT_REG_ERR}: {JOB_NOT_FOUND}" @@ -884,219 +811,6 @@ def test_get_job_info(self): }, ) - @mock.patch(CLIENTS, get_mock_client) - def test_add_errors_to_results__concat_errs__integrated(self): - active_ids = [JOB_CREATED, JOB_RUNNING] - terminal_ids = [JOB_COMPLETED] - job_ids = active_ids + terminal_ids - - check_jobs_err = "Something went wrong in EE2.check_jobs" - check_jobs_exc = RuntimeError(check_jobs_err) - cancel_job_errs = { - job_id: err - for job_id, err in zip( - job_ids, [f"EE2.check_job err {num}" 
for num in ["UNO", "DOS"]] - ) - } - - def mock_check_jobs(self, params): - raise check_jobs_exc - - def mock_cancel_job(self, job_id): - return NarrativeException( - None, cancel_job_errs[job_id], None, None, None - ) - - with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): - with mock.patch.object(JobManager, "_cancel_job", mock_cancel_job): - output_states = self.jm.cancel_jobs(job_ids) - - exp = {job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) for job_id in job_ids} - for job_id in active_ids: - exp[job_id]["error"] = ( - f"A Job.query_ee2_states error occurred for job with ID {job_id}: {check_jobs_err}" - "\n" - f"An EE2.cancel_job error occurred for job with ID {job_id}: {cancel_job_errs[job_id]}" - ) - - self.assertEqual( - exp, - output_states - ) - - def test_add_errors_to_results__concat_errs__unit(self): - job_ids = ALL_JOBS - error_ids = [JOB_RUNNING, JOB_COMPLETED] - output_states = get_test_jobs(job_ids) - - check_jobs_err = "Test check_jobs exception" - cancel_job_errs = ["Test cancel_job exception UNO", "Test cancel_job exception DOS"] - - self.jm.add_errors_to_results( - output_states, error_ids, OutputStateErrMsg.QUERY_EE2_STATES, check_jobs_err - ) - self.jm.add_errors_to_results( - output_states, error_ids, OutputStateErrMsg.CANCEL, cancel_job_errs - ) - - for error_id, cancel_job_err in zip(error_ids, cancel_job_errs): - output_state = output_states[error_id] - self.assertIn("error", output_state) - self.assertEqual( - ( - f"A Job.query_ee2_states error occurred for job with ID {error_id}: {check_jobs_err}" - "\n" - f"An EE2.cancel_job error occurred for job with ID {error_id}: {cancel_job_err}" - ), - output_state["error"] - ) - - def test_add_errors_to_results__cannot_add_err(self): - job_ids = [JOB_RUNNING, JOB_COMPLETED] - error_ids = [JOB_CREATED] - output_states = get_test_jobs(job_ids) - - with self.assertRaisesRegex( - ValueError, f"Cannot add error because response dict is missing key 
{error_ids[0]}" - ): - self.jm.add_errors_to_results( - output_states, error_ids, OutputStateErrMsg.CANCEL, ["Test cancel_job exception`"] - ) - - -class OutputStateErrMsgTest(unittest.TestCase): - """ - Unit tests - """ - - JOB_IDS = [c + str(i) for c, i in zip(list("abc"), range(3))] - ERROR_IDS = JOB_IDS[1:] - CHECK_JOBS_ERR = "ee2.check_jobs rejection" - CANCEL_JOBS_ERR = [ - "ee2.cancel_job rejection UNO", "ee2.cancel_job rejection DOS" - ] - - maxDiff = None - - def get_orig_results(self): - return { - job_id: {"some": "random", "content": "with", "job_id": job_id} - for job_id in self.JOB_IDS - } - - def get_first_orig_result(self): - job_id = self.JOB_IDS[0] - return { - job_id: {"some": "random", "content": "with", "job_id": job_id} - } - - def add_errors_to_results( - self, results: dict, error_ids: List[str], error_enum: OutputStateErrMsg, *extra_its: Tuple - ): - """ - Strongly resembles jm.add_errors_to_results - But a pared down happy path method - """ - gen_err_msg = error_enum.gen_err_msg([error_ids] + list(extra_its)) - - for error_id, err_msg in zip(error_ids, gen_err_msg): - if error_enum.replace_result(): - results[error_id] = { - "job_id": error_id, - "error": err_msg, - } - else: - results[error_id].update( - {"error": err_msg} - ) - return results - - def test__NOT_FOUND(self): - results = self.add_errors_to_results( - self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.NOT_FOUND - ) - self.assertEqual( - { - **self.get_first_orig_result(), - **{ - job_id: { - "job_id": job_id, - "error": f"Cannot find job with ID {job_id}" - } - for job_id in self.ERROR_IDS - } - }, - results - ) - - def test__NOT_UPDATED(self): - results = self.add_errors_to_results( - self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.NOT_UPDATED, TEST_EPOCH_NS - ) - self.assertEqual( - { - **self.get_first_orig_result(), - **{ - job_id: { - "job_id": job_id, - "error": f"Job with ID {job_id} has not been updated since ts {TEST_EPOCH_NS}" - } - for job_id in 
self.ERROR_IDS - } - }, - results - ) - - def test__QUERY_EE2_STATES(self): - results = self.add_errors_to_results( - self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.QUERY_EE2_STATES, self.CHECK_JOBS_ERR - ) - self.assertEqual( - { - **self.get_first_orig_result(), - **{ - job_id: { - "some": "random", "content": "with", "job_id": job_id, - "error": f"A Job.query_ee2_states error occurred for job with ID {job_id}: {self.CHECK_JOBS_ERR}" - } - for job_id in self.ERROR_IDS - } - }, - results - ) - - def test__CANCEL(self): - results = self.add_errors_to_results( - self.get_orig_results(), self.ERROR_IDS, OutputStateErrMsg.CANCEL, self.CANCEL_JOBS_ERR - ) - self.assertEqual( - { - **self.get_first_orig_result(), - **{ - job_id: { - "some": "random", "content": "with", "job_id": job_id, - "error": f"An EE2.cancel_job error occurred for job with ID {job_id}: {cancel_job_err}" - } - for job_id, cancel_job_err in zip(self.ERROR_IDS, self.CANCEL_JOBS_ERR) - } - }, - results - ) - - def test_gen_err_msg__wrong_type_arg(self): - with self.assertRaisesRegex( - TypeError, - "Argument its must be of type list" - ): - OutputStateErrMsg.NOT_FOUND.gen_err_msg(42) - - def test_gen_err_msg__wrong_num_format(self): - with self.assertRaisesRegex( - ValueError, - re.escape("OutputStateErrMsg.NOT_FOUND must be formatted with 1 argument(s). Received 2 argument(s)") - ): - OutputStateErrMsg.NOT_FOUND.gen_err_msg([self.ERROR_IDS, "extra_unused_format_arg"]) - if __name__ == "__main__": unittest.main()