From fa4ea85606717b2c6e97fc965b2c42cea8e45dc0 Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Mon, 4 Apr 2022 12:56:58 -0700 Subject: [PATCH] WIP WIP WIP WIP updating docs WIP --- docs/design/job_architecture.md | 240 ++++---- .../static/kbase/config/job_config.json | 3 +- src/biokbase/narrative/jobs/job.py | 106 ++-- src/biokbase/narrative/jobs/jobcomm.py | 266 ++++++--- src/biokbase/narrative/jobs/jobmanager.py | 551 +++++++++++++----- src/biokbase/narrative/jobs/util.py | 6 +- .../narrative/tests/generate_test_results.py | 13 +- .../narrative/tests/job_test_constants.py | 3 + src/biokbase/narrative/tests/test_job.py | 2 + src/biokbase/narrative/tests/test_jobcomm.py | 34 ++ .../narrative/tests/test_jobmanager.py | 17 + 11 files changed, 869 insertions(+), 372 deletions(-) diff --git a/docs/design/job_architecture.md b/docs/design/job_architecture.md index 56dc5b660d..43154bc89d 100644 --- a/docs/design/job_architecture.md +++ b/docs/design/job_architecture.md @@ -38,16 +38,6 @@ These messages are sent to the `JobCommChannel` on the front end, to get process * `JOB_ID_LIST` - an array of job IDs OR * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) -`START_UPDATE` - request the status for a job or jobs, but start an update cycle so that it's continually requested. - * `JOB_ID` - a string, the job id OR - * `JOB_ID_LIST` - an array of job IDs OR - * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) - -`STOP_UPDATE` - signal that the front end doesn't need any more updates for the specified job(s), so stop sending them for each loop cycle. Doesn't actually end the job, only requests for updates. 
- * `JOB_ID` - a string, the job id OR - * `JOB_ID_LIST` - an array of job IDs OR - * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) - `INFO` - request information about the job(s), specifically app id, spec, input parameters and (if finished) outputs * `JOB_ID` - a string, the job id OR * `JOB_ID_LIST` - an array of job IDs OR @@ -56,6 +46,7 @@ These messages are sent to the `JobCommChannel` on the front end, to get process `CANCEL` - request that the server cancel the running job(s) * `JOB_ID` - a string, the job id OR * `JOB_ID_LIST` - an array of job IDs + * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) `RETRY` - request that the server rerun a job or set of jobs * `JOB_ID` - a string, the job id OR @@ -68,6 +59,18 @@ These messages are sent to the `JobCommChannel` on the front end, to get process * `num_lines` - the number of lines to request (will get back up to that many if there aren't more) (optional) * `latest` - true if requesting just the latest set of logs (optional) +### To be deprecated -- do not use + +`START_UPDATE` - request the status for a job or jobs, but start an update cycle so that it's continually requested. + * `JOB_ID` - a string, the job id OR + * `JOB_ID_LIST` - an array of job IDs OR + * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) + +`STOP_UPDATE` - signal that the front end doesn't need any more updates for the specified job(s), so stop sending them for each loop cycle. Doesn't actually end the job, only requests for updates. + * `JOB_ID` - a string, the job id OR + * `JOB_ID_LIST` - an array of job IDs OR + * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) + ### Usage Example The comm channel is used through the main Bus object that's instantiated through the global `Runtime` object. That needs to be included in the `define` statement for all AMD modules. 
The bus is then used with its `emit` function (you have the bus *emit* a message to its listeners), and any inputs are passed along with it. @@ -182,7 +185,6 @@ Note that both of these create events that get bound to the DOM, and when the wi ## Kernel Comm Channel On the kernel side, a complementary comm channel is used. This is set up in the `biokbase.narrative.jobs.jobcomm.JobComm` class. On Narrative load, page reload, or kernel restart, this is initialized to handle any messages sent to the kernel. The same controlled vocabulary of terms is used for message types and job parameters as for the frontend. - Note that these are autogenerated by the frontend `JobCommChannel` object, using the `Jupyter.kernel.comm` package. The actual message that the JobComm sees in the kernel has this format: @@ -212,7 +214,8 @@ print(PARAM["JOB_ID_LIST"]) # prints 'job_id_list' For simplicity below, it is assumed that constants have been created from the key/value pairs in MESSAGE_TYPE and PARAM. -An example of a job status message sent from the frontend to the kernel: +An example of a job status message sent from the frontend to the kernel, requesting a job status update on a job. 
+ ```json { "msg_id": "some string", @@ -224,45 +227,50 @@ An example of a job status message sent from the frontend to the kernel: } } ``` + Messages expected by the narrative backend: -`STATUS` - request job status +#### `STATUS` - request job status; responds with `STATUS` * `JOB_ID` - a string, the job id OR * `JOB_ID_LIST` - an array of job IDs OR * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) + * `TS` (optional) - an int indicating to send back only jobs that have been updated since this ns epoch -`STATUS_ALL` - request the status of all jobs registered in the job manager +#### `STATUS_ALL` - request the status of all jobs registered in the job manager; responds with `STATUS` -`START_UPDATE` - request updating job(s) during the update thread, responds with `STATUS` +#### `START_UPDATE` - request updating job(s) during the update thread; responds with `STATUS` * `JOB_ID` - a string, the job id OR * `JOB_ID_LIST` - an array of job IDs OR * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) -`STOP_UPDATE` - request halting update for job(s) during the update thread, responds with `STATUS` +#### `STOP_UPDATE` - request halting update for job(s) during the update thread, responds with `STATUS` * `JOB_ID` - a string, the job id OR * `JOB_ID_LIST` - an array of job IDs OR * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) -`INFO` - request general information about job(s) +#### `INFO` - request general information about job(s); responds with `INFO` * `JOB_ID` - a string, the job id OR * `JOB_ID_LIST` - an array of job IDs OR * `BATCH_ID` - a batch parent (make the request for all jobs in the batch) -`LOGS` - request job log information +#### `LOGS` - request job log information; responds with `LOGS` * `JOB_ID` - string OR * `JOB_ID_LIST` - array of strings * `first_line` - int >= 0, ignored if `latest` is `true` * `num_lines` - int > 0 * `latest` - boolean, `true` if requesting just the latest 
logs -`CANCEL` - cancel a job or list of jobs; responds with `STATUS` +#### `CANCEL` - cancel a job or list of jobs; responds with `STATUS` * `JOB_ID` - string OR * `JOB_ID_LIST` - array of strings -`RETRY` - retry a job or list of jobs, responds with `RETRY` +#### `RETRY` - retry a job or list of jobs; responds with `RETRY` * `JOB_ID` - string OR * `JOB_ID_LIST` - array of strings +#### `CELL_JOB_STATUS` - fetch status of jobs associated with cell ID(s); responds with `CELL_JOB_STATUS` +* `CELL_ID_LIST` + ## Messages sent from the kernel to the browser These are all caught by the `JobCommChannel` on the browser side, then parsed and sent as the bus messages described above. Like other kernel messages, they have a `msg_type` field, and a `content` field containing data meant for the frontend to use. They have a rough structure like this: @@ -286,6 +294,7 @@ For example: "msg_type": "job_status", "content": { "example_job_id": { + "job_id": "example_job_id", "jobState": { "job_id": "example_job_id", "status": "running", @@ -305,14 +314,14 @@ By design, these should only be seen by the `JobCommChannel` instance, then sent The backend bundles together multiple messages of the same type in an object indexed by key (usually job or cell ID). In nearly all cases, the frontend then separates out the data and sends it out on individual channels for each job or cell. 
### `ERROR` -A general job comm error, capturing most errors that get thrown by the kernel +A general job comm error message, capturing exceptions that get thrown by the kernel **content** (this varies, but usually includes the below) - * `raw_request` - the original request message that wound up in an error - * `source` - request type or method that triggered the error + * `source` - request type, arbitrary string, or null, meant to indicate what triggered the error + * `request` - request data, arbitrary string, or null, meant to indicate the comm request that triggered the error Error messages usually also contain data about the error itself: - * `name` - the error type - * `message` - description of the issue + * `name` - the exception type name + * `message` - the exception message **bus** `ERROR` @@ -327,24 +336,32 @@ Dictionary with key(s) job ID and value dictionaries with the following structur * `job_params` - the unstructured set of parameters sent to the execution engine * `batch_id` - id of batch container job -i.e. +In case of error, the response has instead the keys: + * `job_id` + * `error` - brief message explaining the issue + +Sample response JSON: ```json { "job_id_1": { - "app_id": ..., - "app_name": ..., + "app_id": string, + "app_name": string, "job_id": "job_id_1", "job_params": ..., "batch_id": "some_batch_id", }, - "job_id_2": { ...contents... } + "job_id_2": { + ... + }, + ..., + "error_id_1": { + "job_id": "error_id_1", + "error": string + }, + ... } ``` -In case of error, the response has the keys: - * `job_id` - * `error` - brief message explaining the issue - **bus** The frontend splits out the job info and distributes it out by job ID. @@ -352,14 +369,23 @@ The frontend splits out the job info and distributes it out by job ID. The current job state. This one is probably most common. 
**content** -Dictionary with key(s) job ID and value dictionaries with the following structure: - * `jobState` - see **Data Structures** below for details (it's big and shouldn't be repeated all over this document). Regarding error states: non-existent jobs have the status `does_not_exist`, and when the job state cannot be retrieved from EE2 the status `ee2_error` is used - * `outputWidgetInfo` - the parameters to send to output widgets, only available for a completed job +Dictionaries generated from `job.output_state()` with key(s) generally being + * "job_id" with the value being a dictionary with the following structure: + * `job_id` + * `jobState` - see **Data Structures** below for details (it's big and shouldn't be repeated all over this document). + * `outputWidgetInfo` - the parameters to send to output widgets, only available for a completed job + * `error` (optional) - an error added in case both the most recent state and the error are desired + * `last_checked` (optional) - ns epoch that tells the frontend when this `STATUS` request was completed so the frontend can use it to make the next `STATUS` request + +In case of error, the response can have instead the keys: + * `job_id` + * `error` - brief message explaining the issue Sample response JSON: ```json { "job_id_1": { + "job_id": "job_id_1", "jobState": { "job_id": "job_id_1", "status": "running", @@ -367,14 +393,27 @@ Sample response JSON: }, "outputWidgetInfo": null, // only available for completed jobs }, - "job_id_2": { ...contents... } + "job_id_2": { + "job_id": "job_id_2", + "error": "Cannot find job with ID job_id_2" + }, + "job_id_3": { + "job_id": "job_id_3", + "jobState": { + "job_id": "job_id_3", + "status": "completed", + ... + }, + "outputWidgetInfo": { + ... 
+ }, + "error": , + }, + ..., + "last_checked": 1647888392311312353 // optional } ``` -In case of error, the response has the keys: - * `job_id` - * `error` - brief message explaining the issue - **bus** Job status data is split out into individual jobs by the frontend and distributed. @@ -382,15 +421,19 @@ Job status data is split out into individual jobs by the frontend and distribute ### `STATUS_ALL` The set of all job states for all running jobs, or at least the set that should be updated (those that are complete and not requested by the front end are not included - if a job is sitting in an error or finished state, it doesn't need ot have its app cell updated) -**content** - all of the below are included, but the top-level keys are all job id strings, e.g.: +**content** +Dictionary similar to `STATUS` response. All of the below are included, but the top-level keys are all job id strings, e.g.: + * `job_id` + * `jobState` - the job state (see the **Data Structures** section below for details + * `outputWidgetInfo` - the parameters to send to output widgets, only available for a completed job + +Sample response JSON: ```json { "job_id_1": { ...contents... }, "job_id_2": { ...contents... } } ``` - * `jobState` - the job state (see the **Data Structures** section below for details - * `outputWidgetInfo` - the parameters to send to output widgets, only available for a completed job **bus** - see `STATUS` @@ -415,7 +458,6 @@ In case of error, the response has the keys: The most common log error encountered is that logs are not found -- this can occur if the job has not yet started running or the job was terminated whilst it was still in the job queue. 
**bus** `LOGS` - Log data is split out into individual jobs and sent to `job_id` (see above) @@ -424,6 +466,18 @@ Sent when one or more jobs are retried **content** Dictionary with key(s) original job ID and value dictionaries with the following structure: +Where the dict values corresponding to "job" or "retry" are the same data structures as for `job_status` +Outer keys: + * `job_id` - string, ID of the job that was retried (the retry parent) + * `job` - string, the job state object of that job + * `retry_id` - string, ID of the new job + * `retry` - string, the job state object of the new job that was launched + +In case of error, the response has instead the keys: + * `job_id` + * `error` - brief message explaining the issue + +Sample response JSON: ```json { "job_id_1": { @@ -435,28 +489,18 @@ Dictionary with key(s) original job ID and value dictionaries with the following "job_id_2": { "job_id": "job_id_2", "job": {"jobState": {"job_id": "job_id_2", "status": status, ...} ...}, - "error": "..." + "error": "..." // from EE2 }, ... 
- job_id_x: { - "job": {"jobState": {"job_id": "job_id_x", "status": "does_not_exist"}}, + "error_id_1": { + "job_id": "error_id_1", "error": "does_not_exist" } } ``` -Where the dict values corresponding to "job" or "retry" are the same data structures as for `job_status` -Outer keys: - * `job_id` - string, ID of the job that was retried (the retry parent) - * `job` - string, the job state object of that job - * `retry_id` - string, ID of the new job - * `retry` - string, the job state object of the new job that was launched -In case of error, the response has the keys: - * `job_id` - * `error` - brief message explaining the issue **bus** `RETRY` - Retry data is split out into individual jobs and sent to `job_id` @@ -536,7 +580,7 @@ JobComm().start_update_loop ## Data Structures ### Job state #### EE2 State -In kernel, as retrieved from EE2.check_job +In kernel, as retrieved from `EE2.check_job` (described by example) ```json { @@ -584,54 +628,38 @@ In kernel, as retrieved from EE2.check_job } ``` #### BE Output State -As sent to browser, includes cell info and run info +As generated by `job.output_state()` and sent to browser, includes cell info and run info ``` { - outputWidgetInfo: (if not finished, None, else...) 
job.get_viewer_params result - jobState: { - job_id: string, - status: string, - created: epoch ms, - updated: epoch ms, - queued: optional - epoch ms, - finished: optional - epoch ms, - terminated_code: optional - int, - tag: string (release, beta, dev), - parent_job_id: optional - string or null, - run_id: string, - cell_id: string, - errormsg: optional - string, - error (optional): { - code: int, - name: string, - message: string (should be for the user to read), - error: string, (likely a stacktrace) - }, - error_code: optional - int - created: 1583863267000, - batch_id: str, - batch_job: bool, - child_jobs: array, - retry_ids: array, - retry_parent: str - } -} -``` - -When an error occurs while preparing the job state, the output states will have the formats -```json -{ - "job_id_0": { - "jobState": {"job_id": "job_id_0", "status": "does_not_exist"} - }, - "job_id_1": { - "jobState": { - "job_id": "job_id_1", - "status": "ee2_error", - "updated": 1234567890, // the current epoch time - ... // cached job data - } - }, - ... 
+ job_id: string, + jobState: { + job_id: string, + status: string, + created: epoch ms, + updated: epoch ms, + queued: optional - epoch ms, + finished: optional - epoch ms, + terminated_code: optional - int, + tag: string (release, beta, dev), + parent_job_id: optional - string or null, + run_id: string, + cell_id: string, + errormsg: optional - string, + error (optional): { + code: int, + name: string, + message: string (should be for the user to read), + error: string, (likely a stacktrace) + }, + error_code: optional - int + created: 1583863267000, + batch_id: str, + batch_job: bool, + child_jobs: array, + retry_ids: array, + retry_parent: str + } + outputWidgetInfo: if not finished, None, else job.get_viewer_params result,4 + error: optional - string } ``` diff --git a/kbase-extension/static/kbase/config/job_config.json b/kbase-extension/static/kbase/config/job_config.json index bc8f7b4d74..f0198a7eba 100644 --- a/kbase-extension/static/kbase/config/job_config.json +++ b/kbase-extension/static/kbase/config/job_config.json @@ -4,7 +4,8 @@ "CELL_ID": "cell_id", "CELL_ID_LIST": "cell_id_list", "JOB_ID": "job_id", - "JOB_ID_LIST": "job_id_list" + "JOB_ID_LIST": "job_id_list", + "TS": "ts" }, "message_types": { "CANCEL": "cancel_job", diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index fb16b9014b..6d7e0f7b3e 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -25,9 +25,7 @@ "scheduler_type", "scheduler_id", ] - EXCLUDED_JOB_STATE_FIELDS = JOB_INIT_EXCLUDED_JOB_STATE_FIELDS + ["job_input"] - OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS = EXCLUDED_JOB_STATE_FIELDS + ["user", "wsid"] EXTRA_JOB_STATE_FIELDS = ["batch_id", "child_jobs"] @@ -100,7 +98,7 @@ def __init__(self, ee2_state, extra_data=None, children=None): if ee2_state.get("job_id") is None: raise ValueError("Cannot create a job without a job ID!") - self._acc_state = ee2_state + self._update_state(ee2_state) self.extra_data = extra_data 
# verify parent-children relationship @@ -325,20 +323,30 @@ def _update_state(self, state: dict) -> None: """ given a state data structure (as emitted by ee2), update the stored state in the job object """ - if state: + if not isinstance(state, dict): + raise TypeError("state must be a dict") + # Check job_id match + if self._acc_state: if "job_id" in state and state["job_id"] != self.job_id: raise ValueError( f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}" ) - state = copy.deepcopy(state) - if self._acc_state is None: - self._acc_state = state - else: - self._acc_state.update(state) + # Check if there would be no change in updating + # i.e., if state <= self._acc_state + if self._acc_state is not None: + if {**self._acc_state, **state} == self._acc_state: + return + + state = copy.deepcopy(state) + if self._acc_state is None: + self._acc_state = state + else: + self._acc_state.update(state) + self.last_updated = time.time_ns() - def state(self, force_refresh=False): + def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ Queries the job service to see the state of the current job. """ @@ -347,7 +355,7 @@ def state(self, force_refresh=False): state = self.query_ee2_state(self.job_id, init=False) self._update_state(state) - return self._internal_state(JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + return self._internal_state(exclude) def _internal_state(self, exclude=None): """Wrapper for self._acc_state""" @@ -355,39 +363,55 @@ def _internal_state(self, exclude=None): self._trim_ee2_state(state, exclude) return state - def output_state(self, state=None) -> dict: + def output_state(self, state=None, no_refresh=False) -> dict: """ - :param state: can be queried individually from ee2/cache with self.state(), - but sometimes want it to be queried in bulk from ee2 upstream - :return: dict, with structure - - { - outputWidgetInfo: (if not finished, None, else...) 
job.get_viewer_params result - jobState: { - job_id: string, - status: string, - created: epoch ms, - updated: epoch ms, - queued: optional - epoch ms, - finished: optional - epoc ms, - terminated_code: optional - int, - tag: string (release, beta, dev), - parent_job_id: optional - string or null, - run_id: string, - cell_id: string, - errormsg: optional - string, - error (optional): { - code: int, - name: string, - message: string (should be for the user to read), - error: string, (likely a stacktrace) - }, - error_code: optional - int - } - } + :param state: Supplied when the state is queried beforehand from EE2 in bulk, + or when it is retrieved from a cache. If not supplied, must be + queried with self.state() or self._internal_state() + :return: dict - with structure generally like (not accounting for error modes): + { + "job_id": string, + "jobState": { + "status": string - enum, + "created": epoch ms, + "updated": epoch ms, + "queued": epoch ms, + "running": epoch ms, + "finished": epoch ms, + "batch_job": bool, + "job_output": { + "version": string, + "result": [ + { + "obj_ref": string, + "report_name": string, + "report_ref": string, + } + ], + "id": string + }, + "batch_id": string, + "child_jobs": list, + "retry_ids": list, + "retry_count": int, + "job_id": string, + "created": epoch ms + }, + "outputWidgetInfo": { # None if not finished + "name": string, + "tag": string - (release, beta, dev), + "params": { + "wsName": string, + "obj_ref": string, + "report_name": string, + "report_ref": string + "report_window_line_height": string + } + } + } """ if not state: - state = self.state() + state = self._internal_state() if no_refresh else self.state() else: self._update_state(state) state = self._internal_state() diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index 5e5dbc378c..a7c39d3718 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -1,6 +1,7 @@ import copy 
import threading from typing import List, Union +import time from ipykernel.comm import Comm from biokbase.narrative.jobs.util import load_job_constants from biokbase.narrative.jobs.jobmanager import JobManager @@ -16,7 +17,8 @@ JOBS_NOT_PROVIDED_ERR = "job_id_list not provided" CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided" BATCH_NOT_PROVIDED_ERR = "batch_id not provided" -ONE_INPUT_TYPE_ONLY_ERR = "Please provide one of job_id, job_id_list, or batch_id" +NO_INPUT_TYPE_ERR = "Please provide one of job_id, job_id_list, or batch_id" +ONE_INPUT_TYPE_ONLY_ERR = "Please provide at most one of job_id, job_id_list, or batch_id" INVALID_REQUEST_ERR = "Improperly formatted job channel message!" MISSING_REQUEST_TYPE_ERR = "Missing request type in job channel message!" @@ -28,58 +30,54 @@ class JobRequest(object): """ A small wrapper for job comm channel request data. This generally comes in the form of a packet from the kernel Comm object. - It's expected to be a dict of the format: + It is expected to be a dict of the format: { content: { 'comm_id': , 'data': { - 'request_type': effectively, the function requested. + 'request_type': job function requested 'job_id': optional 'job_id_list': optional + 'batch_id': optional } } } - This little wrapper fills 2 roles: + This little wrapper fills some roles: 1. It validates that the packet is correct and has some expected values. - 2. If there's a job_id field, it makes sure that it's real (by asking the - JobManager - avoids a bunch of duplicate validation code) - - Each JobRequest has at most one of a job_id or job_id_list. If the job comm - channel request data is received with a job_id_list, it may be split up - into multiple JobRequests. Likewise, if the job comm channel request data - is received with a job_id, a JobRequest may be created with a job_id_list - containing that job_id + 2. 
It provides any job ID information with guardrails Provides the following attributes: raw_request dict the unedited request received by the job comm msg_id str unique message id request_type str the function to perform. This isn't - strictly controlled here, but by JobComm._handle_comm_message. + strictly controlled here, but by + JobComm._handle_comm_message. rq_data dict the actual data of the request. Contains the request - type and other parameters specific to the function to be performed + type and other parameters specific to the function + to be performed - Optional: + The IDs of the job(s) to perform the function on (optional): job_id str job_id_list list(str) batch_id str - - The IDs of the job(s) to perform the function on. - """ + INPUT_TYPES = [PARAM["JOB_ID"], PARAM["JOB_ID_LIST"], PARAM["BATCH_ID"]] + def __init__(self, rq: dict): - self.raw_request = copy.deepcopy(rq) + rq = copy.deepcopy(rq) + self.raw_request = rq self.msg_id = rq.get("msg_id") # might be useful later? self.rq_data = rq.get("content", {}).get("data") - if self.rq_data is None: + if not self.rq_data: raise JobRequestException(INVALID_REQUEST_ERR) self.request_type = self.rq_data.get("request_type") - if self.request_type is None: + if not self.request_type: raise JobRequestException(MISSING_REQUEST_TYPE_ERR) input_type_count = 0 - for input_type in [PARAM["JOB_ID"], PARAM["JOB_ID_LIST"], PARAM["BATCH_ID"]]: + for input_type in self.INPUT_TYPES: if input_type in self.rq_data: input_type_count += 1 if input_type_count > 1: @@ -105,6 +103,9 @@ def batch_id(self): return self.rq_data[PARAM["BATCH_ID"]] raise JobRequestException(BATCH_NOT_PROVIDED_ERR) + def has_job_ids(self): + return any([input_type in self.rq_data for input_type in self.INPUT_TYPES]) + def has_batch_id(self): return PARAM["BATCH_ID"] in self.rq_data @@ -114,6 +115,11 @@ def cell_id_list(self): return self.rq_data[PARAM["CELL_ID_LIST"]] raise JobRequestException(CELLS_NOT_PROVIDED_ERR) + @property + def ts(self): + 
"""This param is completely optional""" + return self.rq_data.get(PARAM["TS"]) + class JobComm: """ @@ -181,14 +187,21 @@ def __init__(self): MESSAGE_TYPE["STOP_UPDATE"]: self._modify_job_updates, } - def _get_job_ids(self, req: JobRequest = None): + def _get_job_ids(self, req: JobRequest = None) -> List[str]: + """ + Extract the job IDs from a job request object + + :param req: the job request to take the IDs from, defaults to None + :type req: JobRequest, optional + + :return: list of job IDs + :rtype: List[str] + """ + if not req.has_job_ids(): + raise JobRequestException(NO_INPUT_TYPE_ERR) if req.has_batch_id(): return self._jm.update_batch_job(req.batch_id) - - try: - return req.job_id_list - except Exception as ex: - raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex + return req.job_id_list def start_job_status_loop( self, @@ -208,11 +221,12 @@ def start_job_status_loop( self._jm.initialize_jobs(cell_list) except Exception as e: error = { - "error": "Unable to get initial jobs list", + "source": getattr(e, "source", "jc.start_job_status_loop"), + "request": getattr(e, "request", "jc.start_job_status_loop"), + "name": getattr(e, "name", type(e).__name__), "message": getattr(e, "message", UNKNOWN_REASON), + "error": "Unable to get initial jobs list", "code": getattr(e, "code", -1), - "source": getattr(e, "source", "jobmanager"), - "name": getattr(e, "name", type(e).__name__), } self.send_comm_message(MESSAGE_TYPE["ERROR"], error) # if job init failed, set the lookup loop var back to False and return @@ -260,9 +274,12 @@ def _get_all_job_states( def _get_job_states_by_cell_id(self, req: JobRequest = None) -> dict: """ - Fetches status of all jobs associated with the given cell ID(s) - :param req: a JobRequest with the cell_id_list of interest - :returns: dict in the form + Fetches status of all jobs associated with the given cell ID(s). 
+ + :param req: job request object with the cell_id_list param set, defaults to None + :type req: JobRequest, optional + + :return: dictionary in the form { "jobs": { # dict with job IDs as keys and job states as values @@ -276,6 +293,7 @@ def _get_job_states_by_cell_id(self, req: JobRequest = None) -> dict: "cell_two": [ ... ], } } + :rtype: dict """ cell_job_states = self._jm.get_job_states_by_cell_id( cell_id_list=req.cell_id_list @@ -285,41 +303,86 @@ def _get_job_states_by_cell_id(self, req: JobRequest = None) -> dict: def _get_job_info(self, req: JobRequest) -> dict: """ - Look up job info. This is just some high-level generic information about the running - job, including the app id, name, and job parameters. - :param req: a JobRequest with the job_id_list of interest - :returns: a dict keyed with job IDs and with values of dicts with the following keys: - - app_id - str - module/name, - - app_name - str - name of the app as it shows up in the Narrative interface - - batch_id - str - the batch parent ID (if appropriate) - - job_id - str - just re-reporting the id string - - job_params - dict - the params that were passed to that particular job + Gets job information for a list of job IDs. 
+ + Job info for a given job ID is in the form: + { + "app_id": string in the form "/", + "app_name": string, + "job_id": string, + "job_params": dictionary, + "batch_id": string | None, + } + + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } + + :param req: job request with a list of job IDs + :type req: JobRequest + :return: dictionary containing job info for each input job, indexed by job ID + :rtype: dict """ job_id_list = self._get_job_ids(req) job_info = self._jm.get_job_info(job_id_list) self.send_comm_message(MESSAGE_TYPE["INFO"], job_info) return job_info - def __get_job_states(self, job_id_list) -> dict: + def _get_send_job_states(self, job_id_list: list, ts: int = None) -> dict: """ - Look up job states. + Retrieves the job states for the supplied job_ids. + + TODO: update as required + If the ts parameter is present, only jobs that have been updated since that time are returned. + + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } + + :param job_id_list: job IDs to retrieve job states for + :type job_id_list: list + :param ts: timestamp, in the format generated by time.time_ns(); defaults to None + :type ts: int, optional - Returns a dictionary of job state information indexed by job ID. + :return: dictionary of job states, indexed by job ID + :rtype: dict """ - output_states = self._jm.get_job_states(job_id_list) + output_states = self._jm.get_job_states(job_id_list, ts) + # output_states["last_checked"] = time.time_ns() self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states) return output_states def get_job_state(self, job_id: str) -> dict: """ + Retrieve the job state for a single job. + This differs from the _get_job_state (underscored version) in that it just takes a job_id string, not a JobRequest. 
+ + :param job_id: the job ID to get the state for + :type job_id: string + + :return: dictionary of job states, indexed by job ID + :rtype: dict """ - return self.__get_job_states([job_id]) + return self._get_send_job_states([job_id]) def _get_job_states(self, req: JobRequest) -> dict: + """ + Retrieves the job states for the supplied job_ids. + + :param req: job request with a list of job IDs + :type req: JobRequest + + :return: dictionary of job states, indexed by job ID + :rtype: dict + """ job_id_list = self._get_job_ids(req) - return self.__get_job_states(job_id_list) + return self._get_send_job_states(job_id_list, req.ts) def _modify_job_updates(self, req: JobRequest) -> dict: """ @@ -354,11 +417,16 @@ def _modify_job_updates(self, req: JobRequest) -> dict: def _cancel_jobs(self, req: JobRequest) -> dict: """ - This cancels a running job. - If there are no valid jobs, this raises a JobRequestException. - If there's an error while attempting to cancel, this raises a NarrativeError. - In the end, after a successful cancel, this finishes up by fetching and returning the - job state with the new status. + Cancel a job or list of jobs. After sending the cancellation request, the job states + are refreshed and their new output states returned. + + See JobManager.cancel_jobs() for more details. + + :param req: job request containing job ID or list of job IDs to be cancelled + :type req: JobRequest + + :return: job output states, indexed by job ID + :rtype: dict """ job_id_list = self._get_job_ids(req) cancel_results = self._jm.cancel_jobs(job_id_list) @@ -366,6 +434,17 @@ def _cancel_jobs(self, req: JobRequest) -> dict: return cancel_results def _retry_jobs(self, req: JobRequest) -> dict: + """ + Retry a job or list of jobs. + + See JobManager.retry_jobs() for more details. 
+ + :param req: job request containing job ID or list of job IDs to be retried + :type req: JobRequest + + :return: job retry data, indexed by job ID + :rtype: dict + """ job_id_list = self._get_job_ids(req) retry_results = self._jm.retry_jobs(job_id_list) self.send_comm_message(MESSAGE_TYPE["RETRY"], retry_results) @@ -373,7 +452,15 @@ def _retry_jobs(self, req: JobRequest) -> dict: def _get_job_logs(self, req: JobRequest) -> dict: """ - This returns a set of job logs based on the info in the request. + Fetch the logs for a job or list of jobs. + + See JobManager.get_job_logs_for_list() for more details. + + :param req: job request containing job ID or list of job IDs to fetch logs for + :type req: JobRequest + + :return: job log data, indexed by job ID + :rtype: dict """ job_id_list = self._get_job_ids(req) log_output = self._jm.get_job_logs_for_list( @@ -387,7 +474,8 @@ def _get_job_logs(self, req: JobRequest) -> dict: def _handle_comm_message(self, msg: dict) -> dict: """ - Handles comm messages that come in from the other end of the KBaseJobs channel. + Handle incoming messages on the KBaseJobs channel. + Messages get translated into one or more JobRequest objects, which are then passed to the right handler, based on the request. @@ -395,6 +483,15 @@ def _handle_comm_message(self, msg: dict) -> dict: Any unknown request is returned over the channel with message type 'job_error', and a JobRequestException is raised. + + :param msg: incoming comm message + :type msg: dict + + :raises JobRequestException: if the message type is not recognised + + :return: result of running the appropriate method; generally this is a dictionary + of job data indexed by job ID. + :rtype: dict """ with exc_to_msg(msg): request = JobRequest(msg) @@ -421,34 +518,37 @@ def send_error_message( self, req: Union[JobRequest, dict, str], content: dict = None ) -> None: """ - Sends a comm message over the KBaseJobs channel as an error. 
This will have msg_type set to - ERROR ('job_error'), and include the original request in the message content as - "source". - - req can be the original request message or its JobRequest form. - Since the latter is made from the former, they have the same information. - It can also be a string or None if this context manager is invoked outside of a JC request - - This sends a packet that looks like: + Wrapper for self.send_comm_message generally resulting in a message like: { - request: the original JobRequest data object, function params, or function name - source: the function request that spawned the error - other fields about the error, dependent on the content. + "msg_type": "job_error", + "content": { + "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, + "request": request data from original incoming comm request, if available, else an arbitrary str/NoneType, + **{any extra error information} + } } + where this method computes "msg_type" and "content" and passes it to self.send_comm_message + + :param req: Can be the original comm request message or its JobRequest instantiation. + Since the latter is made from the former, they have the same information. 
+ It can also be a string or None if this method is invoked outside of a JobComm request + :type req: JobRequest, dict, str, or NoneType + :param content: Additional error information + :type content: dict, optional """ error_content = {} if isinstance(req, JobRequest): - error_content["request"] = req.rq_data error_content["source"] = req.request_type + error_content["request"] = req.rq_data elif isinstance(req, dict): data = req.get("content", {}).get("data", {}) - error_content["request"] = data error_content["source"] = data.get("request_type") + error_content["request"] = data elif isinstance(req, str) or req is None: - error_content["request"] = req error_content["source"] = req + error_content["request"] = req - if content is not None: + if content: error_content.update(content) self.send_comm_message(MESSAGE_TYPE["ERROR"], error_content) @@ -456,7 +556,8 @@ def send_error_message( class exc_to_msg: """ - This is a context manager to wrap around JC code + This is a context manager to wrap around JobComm code in order to catch any exception, + send it back as a comm error messages, and then re-raise that exception """ jc = JobComm() @@ -476,17 +577,18 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, exc_tb): """ - If exception is caught, will send job comm message in this format + If an exception is caught during execution in the JobComm code, + this will send back a comm error message like: { - "msg_type": ERROR, + "msg_type": "job_error", "content": { - "source": "request_type", - "job_id": "0123456789abcdef", # or job_id_list. 
optional and mutually exclusive - "name": "ValueError", - "message": "Something happened", - #---------- Below, NarrativeException only ----------- - "code": -1, - "error": "Unable to complete this request" + "source": request type from original incoming comm request, if available, else an arbitrary str/NoneType, + "request": request data from original incoming comm request, if available, else an arbitrary str/NoneType, + "name": exception name, # e.g., ValueError + "message": exception message, # e.g. "Something specifically went wrong!" + #---------- Below, for NarrativeException only ----------- + "error": exception error attribute, # e.g. "Unable to complete this request" + "code": exception code attribute, # e.g., -1 } } Will then re-raise exception diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index a3a0fa8e5d..5f19ec049b 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -2,11 +2,13 @@ from jinja2 import Template from datetime import datetime, timezone, timedelta import copy +from enum import Enum +from itertools import cycle from typing import List, Tuple +from collections.abc import Iterable import biokbase.narrative.clients as clients from .job import ( Job, - EXCLUDED_JOB_STATE_FIELDS, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, ) from biokbase.narrative.common import kblogging @@ -36,7 +38,47 @@ JOBS_MISSING_ERR = "No valid job IDs provided" CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided" -DOES_NOT_EXIST = "does_not_exist" + + +class JobStateErrMsg(Enum): + """ + For errors that go into the STATUS response. + Enum mapping from error names to formattable error messages. 
+    The first formatting argument of each error message must be the job ID
+    """
+
+    DOES_NOT_EXIST = "Cannot find job with ID %s"
+    NOT_UPDATED = "Job with ID %s has not been updated since ts %d"
+    QUERY_EE2_STATES_ERROR = "A Job.query_ee2_states error occurred for job with ID %s: %s"
+    CANCEL_ERROR = "An EE2.cancel_job error occurred for job with ID %s: %s"
+
+    def gen_err_msg(self, its: List):
+        """
+        Create a generator for filled in enum values
+
+        :param its: A list of arguments, where each argument should be iterable.
+            (If any argument is not iterable, convert it to so.)
+            The iterables will then be zipped and used to format the enum values.
+            The first argument must be the list of job_ids
+        """
+        # check number of arguments matches number of %s or %d to be formatted
+        num_format = self.value.count("%")
+        if num_format != len(its):
+            raise ValueError(f"{self.__class__.__name__}.{self.name} must be formatted with {num_format} arguments")
+
+        for i, e in enumerate(its):
+            # if element is not iterable (or is a str), convert to so
+            if isinstance(e, str) or not isinstance(e, Iterable):
+                its[i] = cycle([e])
+
+        for tup in zip(*its):
+            yield self.value % tup
+
+    def replace_result(self) -> bool:
+        names = [e.name for e in list(JobStateErrMsg)]
+        replace = [True, True, False, False]
+        ans = dict(zip(names, replace))
+        return ans[self.name]
 
 
 class JobManager(object):
@@ -49,9 +91,9 @@ class JobManager(object):
     __instance = None
 
     # keys: job_id, values: { refresh = 1/0, job = Job object }
-    _running_jobs = dict()
+    _running_jobs = {}
     # keys: cell_id, values: set(job_1_id, job_2_id, job_3_id)
-    _jobs_by_cell_id = dict()
+    _jobs_by_cell_id = {}
 
     _log = kblogging.get_logger(__name__)
 
@@ -74,12 +116,19 @@ def _reorder_parents_children(states: dict) -> dict:
 
     def _check_job_list(self, input_ids: List[str] = []) -> Tuple[List[str], List[str]]:
         """
-        Deduplicates the input job list, maintaining insertion order
+        Deduplicates the input job list, maintaining insertion order.
Any jobs not present in self._running_jobs are added to an error list - :param input_ids: a list of putative job IDs - :return results: tuple with items "job_ids", containing valid IDs; - and "error_ids", for jobs that the narrative backend does not know about + :param input_ids: list of putative job IDs, defaults to [] + :type input_ids: List[str], optional + + :raises JobRequestException: if the input_ids parameter is not a list or + or if there are no valid job IDs supplied + + :return: tuple with items + job_ids - valid job IDs + error_ids - jobs that the narrative backend does not know about + :rtype: Tuple[List[str], List[str]] """ if not isinstance(input_ids, list): raise JobRequestException(f"{JOBS_TYPE_ERR}: {input_ids}") @@ -103,10 +152,10 @@ def register_new_job(self, job: Job, refresh: bool = None) -> None: Registers a new Job with the manager and stores the job locally. This should only be invoked when a new Job gets started. - Parameters: - ----------- - job : biokbase.narrative.jobs.job.Job object - The new Job that was started. + :param job: a Job object for the new job that was started + :type job: Job + :param refresh: whether or not the job should be refreshed, defaults to None + :type refresh: bool, optional """ kblogging.log_event(self._log, "register_new_job", {"job_id": job.job_id}) @@ -127,14 +176,19 @@ def initialize_jobs(self, cell_ids: List[str] = None) -> None: """ Initializes this JobManager. This is expected to be run by a running Narrative, and naturally linked to a workspace. - So it does the following steps. + It runs the following steps: 1. gets the current workspace ID from app_util.system_variable('workspace_id') - 2. get list of jobs with that ws id from ee2 (also gets tag, cell_id, run_id) + 2. get job state data on all jobs with that ws id from ee2 3. initialize the Job objects and add them to the running jobs list 4. start the status lookup loop. 
+ + :param cell_ids: list of cell IDs to filter the existing jobs for, defaults to None + :type cell_ids: List[str], optional + + :raises NarrativeException: if the call to ee2 fails """ ws_id = system_variable("workspace_id") - job_states = dict() + job_states = {} kblogging.log_event(self._log, "JobManager.initialize_jobs", {"ws_id": ws_id}) try: job_states = clients.get("execution_engine2").check_workspace_jobs( @@ -149,7 +203,7 @@ def initialize_jobs(self, cell_ids: List[str] = None) -> None: new_e = transform_job_exception(e, "Unable to initialize jobs") raise new_e - self._running_jobs = dict() + self._running_jobs = {} job_states = self._reorder_parents_children(job_states) for job_state in job_states.values(): child_jobs = None @@ -170,22 +224,22 @@ def initialize_jobs(self, cell_ids: List[str] = None) -> None: self.register_new_job(job, refresh) - def _create_jobs(self, job_ids) -> dict: + def _create_jobs(self, job_ids: List[str]) -> dict: """ + Given a list of job IDs, creates job objects for them and populates the _running_jobs dictionary. TODO: error handling - Given a list of job IDs, creates job objects for them and populates the _running_jobs dictionary + + :param job_ids: job IDs to create job objects for + :type job_ids: List[str] + + :return: dictionary of job states indexed by job ID + :rtype: dict """ job_ids = [job_id for job_id in job_ids if job_id not in self._running_jobs] if not len(job_ids): return {} - job_states = clients.get("execution_engine2").check_jobs( - { - "job_ids": job_ids, - "exclude_fields": JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, - "return_list": 0, - } - ) + job_states = Job.query_ee2_states(job_ids, init=True) for job_state in job_states.values(): # do not set new jobs to be automatically refreshed - if the front end wants them # refreshed, it'll make a request. 
@@ -193,10 +247,17 @@ def _create_jobs(self, job_ids) -> dict: return job_states - def get_job(self, job_id): + def get_job(self, job_id: str) -> Job: """ - Returns a Job with the given job_id. - Raises a JobRequestException if not found. + Retrieve a job from the Job Manager's _running_jobs index. + + :param job_id: the job ID to be retrieved + :type job_id: str + + :raises JobRequestException: if the job cannot be found + + :return: Job object corresponding to that job ID + :rtype: Job """ if job_id not in self._running_jobs: raise JobRequestException(JOB_NOT_REG_ERR, job_id) @@ -206,9 +267,48 @@ def _construct_job_output_state_set( self, job_ids: List[str], states: dict = None ) -> dict: """ + Precondition: job_ids already validated + Builds a set of job states for the list of job ids. - :param job_ids: list of job IDs (may be empty) - :param states: dict, where each value is a state is from EE2 + The output will look like: + { + "job_id_0": { # dict generated by job.output_state() + "job_id": "job_id_0": + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": { + ... + } + }, + "job_id_1": { # dict generated by job.output_state() + "job_id": "job_id_1": + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": None + }, + ..., + "ee2_error_id_0": { # dict generated by job.output_state() with EE2 error message added + "job_id": "ee2_error_id_0": + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": ..., + "error": + }, + ... + } + + :param job_ids: list of job IDs + :type job_ids: List[str] + :param states: dict of job state data from EE2, indexed by job ID, defaults to None + :type states: dict, optional + + :raises JobRequestException: if job_ids is not a list + + :return: dict containing the output_state for each job, indexed by job ID. 
+ :rtype: dict """ if not isinstance(job_ids, list): raise JobRequestException("job_ids must be a list") @@ -216,8 +316,8 @@ def _construct_job_output_state_set( if not len(job_ids): return {} - output_states = dict() - jobs_to_lookup = list() + output_states = {} + jobs_to_lookup = [] # Fetch from cache of terminated jobs, where available. # These are already post-processed and ready to return. @@ -231,17 +331,11 @@ def _construct_job_output_state_set( else: jobs_to_lookup.append(job_id) - fetched_states = dict() + fetched_states = {} # Get the rest of states direct from EE2. if len(jobs_to_lookup): try: - fetched_states = clients.get("execution_engine2").check_jobs( - { - "job_ids": jobs_to_lookup, - "exclude_fields": EXCLUDED_JOB_STATE_FIELDS, - "return_list": 0, - } - ) + fetched_states = Job.query_ee2_states(jobs_to_lookup, init=False) except Exception as e: error_message = str(e) kblogging.log_event( @@ -259,16 +353,78 @@ def _construct_job_output_state_set( output_states[job_id] = job.output_state(fetched_states[job_id]) else: # fetch the current state without updating it - output_states[job_id] = job.output_state({}) - # add an error field with the error message from the failed look up - output_states[job_id]["error"] = error_message + output_states[job_id] = job.output_state(no_refresh=True) + + failed_ids = [job_id for job_id in jobs_to_lookup if job_id not in fetched_states] + if failed_ids: + self.add_errors_to_results( + output_states, failed_ids, JobStateErrMsg.QUERY_EE2_STATES_ERROR, error_message + ) return output_states - def get_job_states(self, job_ids: List[str]) -> dict: - job_ids, error_ids = self._check_job_list(job_ids) + def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: + """ + Retrieves the job states for the supplied job_ids, with the option to + replace any jobs that have not been updated since ts with a short stub + + (Omitting some error states from job.output_state()) the output is generally like: + { + "job_id_0": 
{ # dict generated by job.output_state() + "job_id": "job_id_0": + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": { + ... + } + }, + "job_id_1": { # dict generated by job.output_state() + "job_id": "job_id_1": + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": None + }, + ..., + "ee2_error_id_0": { # dict generated by job.output_state() with EE2 error message added + "job_id": "ee2_error_id_0": + "jobState": { # modified job state from EE2 + ... + }, + "outputWidgetInfo": ..., + "error": + }, + ..., + "nonexistent_error_id_0": { # jobs that cannot be found in the `_running_jobs` index + "job_id": "nonexistent_error_id_0", + "error": "Cannot find job with ID nonexistent_error_id_0" + }, + ... + "not_updated_id_0": { # jobs that have not been updated since ts + "job_id": "not_updated_id_0", + "error": "Job with ID not_updated_id_0 has not been updated since ts 123085709827" + } + } + + :param job_ids: job IDs to retrieve job state data for + :type job_ids: List[str] + :param ts: timestamp (as generated by time.time_ns()) to filter the jobs, defaults to None + :type ts: int, optional + + :return: dictionary of job states, indexed by job ID + :rtype: dict + """ + job_ids, does_not_exist_ids = self._check_job_list(job_ids) output_states = self._construct_job_output_state_set(job_ids) - return self.add_errors_to_results(output_states, error_ids) + not_updated_ids = [] + if ts is not None: + for job_id in output_states: + if self.get_job(job_id).last_updated <= ts: + not_updated_ids.append(job_id) + self.add_errors_to_results(output_states, does_not_exist_ids, JobStateErrMsg.DOES_NOT_EXIST) + self.add_errors_to_results(output_states, not_updated_ids, JobStateErrMsg.NOT_UPDATED, ts) + return output_states def get_all_job_states(self, ignore_refresh_flag=False) -> dict: """ @@ -276,26 +432,35 @@ def get_all_job_states(self, ignore_refresh_flag=False) -> dict: If ignore_refresh_flag is True, then returns states for 
all jobs this JobManager knows about (i.e. all jobs associated with the workspace). - This returns them all as a dictionary, keyed on the job id. - :param ignore_refresh_flag: boolean - if True, ignore the usual refresh state of the job. - Even if the job is stopped, or completed, fetch and return its state from the service. + :param ignore_refresh_flag: if True, ignore the refresh state of the job -- return the state + regardless of whether the job is stopped or completed. Defaults to False. + :type ignore_refresh_flag: bool, optional + + :return: dictionary of job states, indexed by job ID + :rtype: dict """ - jobs_to_lookup = list() + jobs_to_lookup = [] # grab the list of running job ids, so we don't run into update-while-iterating problems. - for job_id in self._running_jobs.keys(): + for job_id in self._running_jobs: if self._running_jobs[job_id]["refresh"] or ignore_refresh_flag: jobs_to_lookup.append(job_id) if len(jobs_to_lookup) > 0: return self._construct_job_output_state_set(jobs_to_lookup) - return dict() + return {} def _get_job_ids_by_cell_id(self, cell_id_list: List[str] = None) -> tuple: """ - Finds jobs with a cell_id in cell_id_list - Mappings of job ID to cell ID are added when new jobs are registered - Returns a list of job IDs and a mapping of cell IDs to the list of - job IDs associated with the cell. + Finds jobs with a cell_id in cell_id_list. + Mappings of job ID to cell ID are added when new jobs are registered. 
+ + :param cell_id_list: cell IDs to retrieve job state data for + :type cell_id_list: List[str] + + :return: tuple with two components: + job_id_list: list of job IDs associated with the cell IDs supplied + cell_to_job_mapping: mapping of cell IDs to the list of job IDs associated with the cell + :rtype: tuple """ if not cell_id_list: raise JobRequestException(CELLS_NOT_PROVIDED_ERR) @@ -310,9 +475,15 @@ def _get_job_ids_by_cell_id(self, cell_id_list: List[str] = None) -> tuple: def get_job_states_by_cell_id(self, cell_id_list: List[str] = None) -> dict: """ - Fetch job states for jobs with a cell_id in cell_id_list - Returns a dictionary of job states keyed by job ID and a mapping of - cell IDs to the list of job IDs associated with the cell. + Retrieves the job states for jobs associated with the cell_id_list supplied. + + :param cell_id_list: cell IDs to retrieve job state data for + :type cell_id_list: List[str] + + :return: dictionary with two keys: + 'jobs': job states, indexed by job ID + 'mapping': mapping of cell IDs to the list of job IDs associated with the cell + :rtype: dict """ (jobs_to_lookup, cell_to_job_mapping) = self._get_job_ids_by_cell_id( cell_id_list @@ -325,19 +496,31 @@ def get_job_states_by_cell_id(self, cell_id_list: List[str] = None) -> dict: def get_job_info(self, job_ids: List[str]) -> dict: """ - Sends the info over the comm channel as these packets: + Gets job information for a list of job IDs. + + Job info for a given job ID is in the form: + { + "app_id": string in the form "/", + "app_name": string, + "job_id": string, + "job_params": dictionary, + "batch_id": string | None, + } + + Jobs that cannot be found in the `_running_jobs` index will return { - app_id: module/name, - app_name: random string, - job_id: string, - job_params: dictionary, - batch_id: string, + "job_id": string, + "error": "Cannot find job with ID " } - Will set packet to the generic job not found message if job_id doesn't exist. 
+ + :param job_ids: job IDs to retrieve job info for + :type job_ids: List[str] + :return: job info for each job, indexed by job ID + :rtype: dict """ job_ids, error_ids = self._check_job_list(job_ids) - infos = dict() + infos = {} for job_id in job_ids: job = self.get_job(job_id) infos[job_id] = { @@ -347,7 +530,7 @@ def get_job_info(self, job_ids: List[str]) -> dict: "job_id": job_id, "job_params": job.params, } - return self.add_errors_to_results(infos, error_ids) + return self.add_errors_to_results(infos, error_ids, JobStateErrMsg.DOES_NOT_EXIST) def get_job_logs( self, @@ -357,32 +540,47 @@ def get_job_logs( latest: bool = False, ) -> dict: """ - :param job_id: str - the job id from the execution engine - :param first_line: int - the first line to be requested by the log. 0-indexed. If < 0, + Retrieves job logs for the job ID supplied. + + Jobs logs for a given job ID are in the form: + { + "job_id": string, + "batch_id": string | None, + "first": int - the first line returned, + "latest": bool - whether the latest lines were returned, + "max_lines": int - the number of logs lines currently available for that job, + "lines": list - the lines themselves, fresh from the server; these are dicts in the form + "line" - the log line string + "is_error" - either 0 or 1 + } + + If there is an error when retrieving logs (e.g. the job has yet to start or + it is a batch job and does not generate logs), the return structure will be: + { + "job_id": string + "batch_id": string | None + "error": string - error message + } + + :param job_id: the job id from the execution engine + :type job_id: str + :param first_line: the first line to be requested by the log. 0-indexed. If < 0, this will be set to 0 + :type first_line: int, defaults to 0 :param num_lines: int - the maximum number of lines to return. if < 0, will be reset to 0. if None, then will not be considered, and just return all the lines. 
- :param latest: bool - if True, will only return the most recent max_lines - of logs. This overrides the first_line parameter if set to True. If the call made is - get_job_logs(id, first_line=0, num_lines=5, latest=True), and there are 100 - log lines available, then lines 96-100 will be returned. - :returns: dict with keys: - job_id: string - batch_id: string | None - first: int - the first line returned - latest: bool - whether the latest lines were returned - max_lines: int - the number of logs lines currently available for that job - lines: list - the lines themselves, fresh from the server. These are all tiny dicts with keys - "line" - the log line string - "is_error" - either 0 or 1 + :type num_lines: int, defaults to None. + :param latest: if True, will only return the most recent max_lines + of logs. If set to True, overrides the first_line parameter; e.g. for the call + + get_job_logs(id, first_line=0, num_lines=5, latest=True) + + if there are 100 log lines available, then lines 96-100 will be returned. + :type latest: boolean, defaults to False. - If there is an error when retrieving logs (e.g. the job - has yet to start or it is a batch job and does not generate - logs), the return structure will be: - job_id: string - batch_id: string | None - error: string - error message + :return: job log data for each job, indexed by job ID + :rtype: dict """ job = self.get_job(job_id) @@ -426,6 +624,24 @@ def get_job_logs_for_list( ) -> dict: """ Fetch the logs for a list of jobs. Note that the parameters supplied are applied to all jobs. 
+ + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } + + :param job_id_list: list of jobs to fetch logs for + :type job_id_list: List[str] + :param first_line: the first line to be returned, defaults to 0 + :type first_line: int, optional + :param num_lines: number of lines to be returned, defaults to None + :type num_lines: int, optional + :param latest: whether to return the latest log lines; only relevant if num_lines is set. Defaults to False + :type latest: bool, optional + + :return: job log data indexed by job ID; see get_job_logs for details + :rtype: dict """ job_ids, error_ids = self._check_job_list(job_id_list) @@ -433,23 +649,33 @@ def get_job_logs_for_list( for job_id in job_ids: output[job_id] = self.get_job_logs(job_id, first_line, num_lines, latest) - return self.add_errors_to_results(output, error_ids) + return self.add_errors_to_results(output, error_ids, JobStateErrMsg.DOES_NOT_EXIST) def cancel_jobs(self, job_id_list: List[str]) -> dict: """ - Cancel a list of running jobs, placing them in a canceled state - Does NOT delete the jobs. - If the job_ids are not present or are not found in the Narrative, - a JobRequestException is raised. + Cancel a list of jobs and return their new state. After sending the cancellation + request, the job states are refreshed and their new output states returned. 
- Results are returned as a dict of job status objects keyed by job id + Jobs that trigger an error when cancelled will return + { + "job_id": string, + "error": + } + + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } - :param job_id_list: list of strs - :return job_states: dict with keys job IDs and values job state objects + :param job_id_list: job IDs to cancel + :type job_id_list: List[str] + :return: job output states, indexed by job ID + :rtype: dict """ job_ids, error_ids = self._check_job_list(job_id_list) - error_states = dict() + error_states = {} for job_id in job_ids: if not self.get_job(job_id).was_terminal(): error = self._cancel_job(job_id) @@ -457,12 +683,21 @@ def cancel_jobs(self, job_id_list: List[str]) -> dict: error_states[job_id] = error.message job_states = self._construct_job_output_state_set(job_ids) - for job_id in error_states: - job_states[job_id]["error"] = error_states[job_id] - return self.add_errors_to_results(job_states, error_ids) + self.add_errors_to_results(job_states, list(error_states.keys()), JobStateErrMsg.CANCEL_ERROR, list(error_states.values())) + self.add_errors_to_results(job_states, error_ids, JobStateErrMsg.DOES_NOT_EXIST) + return job_states def _cancel_job(self, job_id: str) -> None: + """ + Cancel a single job. If an error occurs during cancellation, that error is converted + into a NarrativeException and returned to the caller. + + :param job_id: job ID to be cancelled + :type job_id: str + :return: if present, the exception raised when trying to cancel the job + :rtype: NarrativeException | None + """ # Stop updating the job status while we try to cancel. 
# Set the job to a special state of 'canceling' while we're doing the cancel is_refreshing = self._running_jobs[job_id].get("refresh", False) @@ -479,26 +714,44 @@ def _cancel_job(self, job_id: str) -> None: def retry_jobs(self, job_id_list: List[str]) -> dict: """ - Returns - [ - { - "job_id": job_id, - "job": {"state": {"job_id": job_id, "status": status, ...} ...}, - "retry_id": retry_id, - "retry": {"state": {"job_id": retry_id, "status": status, ...} ...} + Retry a list of job IDs, returning job output states for the jobs to be retried + and the new jobs created by the retry command. + + Retry data for a given job ID is in the form: + { + "job_id": "job_id_1", + "job": { # i.e. a job.output_state() object + "jobState": {"job_id": "job_id_1", "status": status, ...} + ... }, - { - "job": {"state": {"job_id": job_id, "status": status, ...} ...}, - "error": "..." + "retry_id": "retry_id_1", + "retry": { # i.e. a job.output_state() object + "jobState": {"job_id": "retry_id_1", "status": status, ...} + ... } - ... - { - "job": {"state": {"job_id": job_id, "status": DOES_NOT_EXIST}}, - "error": f"Cannot find job with ID {job_id}", - } - ] - where the innermost dictionaries are job states from ee2 and are within the - job states from job.output_state() + } + + If the job cannot be retried (e.g. it is a batch job or the user doesn't have permissions), + the error message from ee2 will be returned: + { + "job_id": string, + "job": { "jobState": { ... }, ... 
}, + "error": "Cannot retry a batch parent job", # from ee2 + } + + Jobs that cannot be found in the `_running_jobs` index will return + { + "job_id": string, + "error": "Cannot find job with ID " + } + + :param job_id_list: list of job IDs + :type job_id_list: List[str] + + :raises NarrativeException: if EE2 returns an error from the retry request + + :return: job retry data indexed by job ID + :rtype: dict """ job_ids, error_ids = self._check_job_list(job_id_list) try: @@ -506,7 +759,8 @@ def retry_jobs(self, job_id_list: List[str]) -> dict: {"job_ids": job_ids} ) except Exception as e: - raise transform_job_exception(e, "Unable to retry job(s)") + raise transform_job_exception(e, "Unable to retry job(s)") from e + # for each retry result, refresh the state of the retried and new jobs orig_ids = [result["job_id"] for result in retry_results] retry_ids = [ @@ -516,30 +770,61 @@ def retry_jobs(self, job_id_list: List[str]) -> dict: retry_states = self._construct_job_output_state_set( retry_ids, self._create_jobs(retry_ids) # add to self._running_jobs index ) - job_states = {**orig_states, **retry_states} results_by_job_id = {} # fill in the job state details for result in retry_results: job_id = result["job_id"] - results_by_job_id[job_id] = {"job_id": job_id, "job": job_states[job_id]} + results_by_job_id[job_id] = {"job_id": job_id, "job": orig_states[job_id]} if "retry_id" in result: retry_id = result["retry_id"] - results_by_job_id[job_id]["retry_id"] = retry_id - results_by_job_id[job_id]["retry"] = job_states[retry_id] + results_by_job_id[job_id].update( + {"retry_id": retry_id, "retry": retry_states[retry_id]} + ) if "error" in result: results_by_job_id[job_id]["error"] = result["error"] - return self.add_errors_to_results(results_by_job_id, error_ids) + return self.add_errors_to_results(results_by_job_id, error_ids, JobStateErrMsg.DOES_NOT_EXIST) - def add_errors_to_results(self, results: dict, error_ids: List[str]) -> dict: - """ - Add the generic "not 
found" error for each job_id in error_ids
+    def add_errors_to_results(
+        self, results: dict, error_ids: List[str], error_enum: JobStateErrMsg, *extra_its: Tuple
+    ) -> dict:
         """
-        for error_id in error_ids:
-            results[error_id] = {
-                "job_id": error_id,
-                "error": f"Cannot find job with ID {error_id}",
-            }
+        Add error states to results
+
+        :param results: dictionary of job data (output state, info, retry, etc.) indexed by job ID
+        :type results: dict
+        :param error_ids: list of IDs that could not be found
+        :type error_ids: List[str]
+        :param error_enum: an enum instance from JobStateErrMsg
+        :type error_enum: JobStateErrMsg
+        :param extra_its: any extra arguments to feed into error_enum.gen_err_msg to format the error message
+        :type extra_its: list
+
+        :return: input results augmented by either extra error dictionaries or errors in existing dictionaries
+        :rtype: dict
+        """
+        # create generator yielding error messages
+        gen_err_msg = error_enum.gen_err_msg([error_ids] + list(extra_its))
+
+        for error_id, err_msg in zip(error_ids, gen_err_msg):
+            # if there's already an error there
+            if error_id in results and "error" in results[error_id]:
+                existing_error = results[error_id]["error"]
+                # concatenate the errors (works recursively)
+                err_msg = f"{existing_error}\n{err_msg}"
+
+            if error_enum.replace_result():
+                results[error_id] = {
+                    "job_id": error_id,
+                    "error": err_msg,
+                }
+            else:
+                if error_id not in results:
+                    raise ValueError(f"Cannot add error because response dict is missing key {error_id}")
+                results[error_id].update(
+                    {"error": err_msg}
+                )
+
         return results
 
     def modify_job_refresh(self, job_ids: List[str], update_refresh: bool) -> None:
diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py
index 0174485a9c..d1752dd307 100644
--- a/src/biokbase/narrative/jobs/util.py
+++ b/src/biokbase/narrative/jobs/util.py
@@ -15,8 +15,8 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS):
     Load the job-related terms that
are shared by front- and back ends. """ full_path = [os.environ["NARRATIVE_DIR"]] + relative_path_to_file - config_json = open(os.path.join(*full_path)).read() - config = json.loads(config_json) + with open(os.path.join(*full_path)) as fh: + config = json.load(fh) REQUIRED = { "message_types": [ "CANCEL", @@ -31,7 +31,7 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): "ERROR", "RUN_STATUS", ], - "params": ["BATCH_ID", "CELL_ID_LIST", "JOB_ID", "JOB_ID_LIST"], + "params": ["BATCH_ID", "CELL_ID_LIST", "JOB_ID", "JOB_ID_LIST", "TS"], } # ensure we have all the required message type and param names diff --git a/src/biokbase/narrative/tests/generate_test_results.py b/src/biokbase/narrative/tests/generate_test_results.py index 91fa444d2e..e6ac2e6a1f 100644 --- a/src/biokbase/narrative/tests/generate_test_results.py +++ b/src/biokbase/narrative/tests/generate_test_results.py @@ -16,9 +16,13 @@ ) """ -generate_test_results.py is used to generate the job message data that the narrative backend produces and that the frontend consumes. It uses data from `ee2_job_test_data_file` and `app_specs_file` and provides expected narrative backend message data, as well as a number of mappings that are used in python tests. +generate_test_results.py is used to generate the job message data that the narrative backend produces and that the frontend consumes. +It uses data from `ee2_job_test_data_file` and `app_specs_file` and provides expected narrative backend message data, +as well as a number of mappings that are used in python tests. -The narrative backend message data is written as JSON to the `response_data_file`, indexed by message type and job ID. By default, if `response_data_file` does not exist when `generate_test_results.py` is run, job message data will be saved there. The `response_data_file` can also be generated by running `generate_test_results.py` with the `--force` argument. 
+The narrative backend message data is written as JSON to the `response_data_file`, indexed by message type and job ID. +By default, if `response_data_file` does not exist when `generate_test_results.py` is run, job message data will be saved there. +The `response_data_file` can also be generated by running `generate_test_results.py` with the `--force` argument. """ @@ -70,7 +74,7 @@ def generate_mappings(all_jobs): def _generate_job_output(job_id): state = get_test_job(job_id) - widget_info = state.get("widget_info", None) + widget_info = state.get("widget_info", {}) state.update( { @@ -88,9 +92,6 @@ def _generate_job_output(job_id): if state["status"] != COMPLETED_STATUS: return {"job_id": job_id, "jobState": state, "outputWidgetInfo": widget_info} - if not widget_info: - widget_info = {} - return {"job_id": job_id, "jobState": state, "outputWidgetInfo": widget_info} diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index 68bd9b2a51..1691563eb8 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -36,6 +36,9 @@ def get_test_job(job_id): CLIENTS = "biokbase.narrative.clients.get" +TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" + +TEST_TIME = 42 # arbitrary epoch ns MAX_LOG_LINES = 10 diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 7c582ed0ed..f3e9e96c3e 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -385,6 +385,8 @@ def test_job_update__no_state(self): self.assertFalse(job.was_terminal()) job._update_state(None) self.assertFalse(job.was_terminal()) + job._update_state({}) + self.assertFalse(job.was_terminal()) @mock.patch(CLIENTS, get_mock_client) def test_job_update__invalid_job_id(self): diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index 
84d1baefab..02de257fcb 100644
--- a/src/biokbase/narrative/tests/test_jobcomm.py
+++ b/src/biokbase/narrative/tests/test_jobcomm.py
@@ -4,9 +4,11 @@
 import itertools
 import re
 import copy
+import time
 from biokbase.narrative.jobs.jobmanager import (
     JobManager,
+    JobStateErrMsg,
     JOB_NOT_REG_ERR,
     JOB_NOT_BATCH_ERR,
     JOBS_TYPE_ERR,
@@ -35,6 +37,8 @@ from .util import ConfigTests, validate_job_state
 from biokbase.narrative.tests.job_test_constants import (
     CLIENTS,
+    TIME_NS,
+    TEST_TIME,
     MAX_LOG_LINES,
     JOB_COMPLETED,
     JOB_CREATED,
@@ -134,6 +138,20 @@ def get_app_data(*args):
     return {"info": {"name": APP_NAME}}
 
 
+def ts_are_close(t0: int, t1: int, tol: float = 1) -> bool:
+    """Check if two times, in ns, are "close"
+
+    Args:
+        t0 (int): time in ns
+        t1 (int): time in ns
+        tol (float, optional): tolerated discrepancy between the times, in s. Defaults to 1.
+
+    Returns:
+        bool: Whether the two times differ by less than the tolerance
+    """
+    return abs(t1 - t0) * 1e-9 <= tol
+
+
 class JobCommTestCase(unittest.TestCase):
     maxDiff = None
 
@@ -478,6 +496,7 @@ def test_start_job_status_loop__no_jobs_stop_loop(self):
     # ---------------------
 
     @mock.patch(CLIENTS, get_mock_client)
+    @mock.patch(TIME_NS, lambda: TEST_TIME)
     def check_job_output_states(
         self,
         output_states=None,
@@ -486,6 +505,7 @@ def check_job_output_states(
         response_type=STATUS,
         ok_states=[],
         error_states=[],
+        last_checked=TEST_TIME,
     ):
         """
         Handle any request that returns a dictionary of job state objects; this
@@ -498,6 +518,7 @@ def check_job_output_states(
         :param params: params for the comm message (opt)
         :param ok_states: list of job IDs expected to be in the output
         :param error_states: list of job IDs expected to return a not found error
+        :param last_checked: ts in ns
         """
         if output_states is None:
             req_dict = make_comm_msg(request_type, params, False)
@@ -512,6 +533,12 @@ def check_job_output_states(
             msg,
         )
 
+        if response_type == STATUS:
+            self.assertIn("last_checked", output_states)
+            
self.assertEqual(last_checked, output_states.pop("last_checked")) + else: + self.assertNotIn("last_checked", output_states) + for job_id, state in output_states.items(): self.assertEqual(ALL_RESPONSE_DATA[STATUS][job_id], state) if job_id in ok_states: @@ -529,6 +556,7 @@ def test_get_all_job_states__ok(self): # ----------------------- # Lookup single job state # ----------------------- + @mock.patch(TIME_NS, lambda: TEST_TIME) def test_get_job_state__1_ok(self): output_states = self.jc.get_job_state(JOB_COMPLETED) self.check_job_output_states( @@ -541,6 +569,12 @@ def test_get_job_state__no_job(self): ): self.jc.get_job_state(None) + def test_lookup_job_state__live_ts(self): + output_states = self.jc.lookup_job_state(JOB_COMPLETED) + self.assertTrue( + ts_are_close(output_states["last_checked"], time.time_ns()) + ) + # ----------------------- # Lookup select job states # ----------------------- diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index a6735b01dd..fe8f4539fb 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -640,6 +640,23 @@ def test_get_job_states__empty(self): ): self.jm.get_job_states([]) + def test_lookup_job_states__last_updated(self): + not_updated = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED] + updated = [JOB_RUNNING, BATCH_PARENT, BATCH_RETRY_RUNNING] + job_ids = not_updated + updated + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + jobs = [] + for job_id in job_ids: + jobs.append(Job(job_id)) + + + def mock_query_ee2_states(lookup_ids, init): + self.assertCountEqual(active_ids, lookup_ids) + + + + def test_update_batch_job__dne(self): with self.assertRaisesRegex( JobRequestException, f"{JOB_NOT_REG_ERR}: {JOB_NOT_FOUND}"