diff --git a/nmdc_automation/run_process/run_workflows.py b/nmdc_automation/run_process/run_workflows.py index e0e771d0..bed06251 100644 --- a/nmdc_automation/run_process/run_workflows.py +++ b/nmdc_automation/run_process/run_workflows.py @@ -26,7 +26,7 @@ def watcher(ctx, site_configuration_file): level=logging_level, format="%(asctime)s %(levelname)s: %(message)s" ) logger = logging.getLogger(__name__) - logger.info(f"Config file: {site_configuration_file}") + logger.info(f"Initializing Watcher: config file: {site_configuration_file}") ctx.obj = Watcher(site_configuration_file) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index b11308cc..439479bd 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -35,7 +35,7 @@ def __init__(self, config: SiteConfig, state_file: Union[str, Path] = None): self._state_file = None # set state file if state_file: - logger.info(f"Using state file: {state_file}") + logger.info(f"Initializing FileHandler with state file: {state_file}") self._state_file = Path(state_file) elif self.config.agent_state: logger.info(f"Using state file from config: {self.config.agent_state}") @@ -64,7 +64,6 @@ def state_file(self, value) -> None: def read_state(self) -> Optional[Dict[str, Any]]: """ Read the state file and return the data """ - logging.info(f"Reading state from {self.state_file}") with open(self.state_file, "r") as f: state = loads(f.read()) return state @@ -137,7 +136,7 @@ def restore_from_state(self) -> None: """ Restore jobs from state data """ new_jobs = self.get_new_workflow_jobs_from_state() if new_jobs: - logger.info(f"Restoring {len(new_jobs)} jobs from state.") + logger.info(f"Adding {len(new_jobs)} new jobs from state file.") self.job_cache.extend(new_jobs) def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]: @@ -151,10 +150,10 @@ def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]: # already in cache continue wf_job = WorkflowJob(self.config, workflow_state=job) - logger.debug(f"New workflow job: {wf_job.opid} from state.") + logger.info(f"New Job from State: {wf_job.workflow_execution_id}, {wf_job.workflow.nmdc_jobid}") + logger.info(f"Last Status: {wf_job.workflow.last_status}") job_cache_ids.append(wf_job.opid) wf_job_list.append(wf_job) - logging.info(f"Restored {len(wf_job_list)} jobs from state") return wf_job_list def find_job_by_opid(self, opid) -> Optional[WorkflowJob]: diff --git a/nmdc_automation/workflow_automation/wfutils.py b/nmdc_automation/workflow_automation/wfutils.py index 56ed0786..4f4d5bcc 100755 --- a/nmdc_automation/workflow_automation/wfutils.py +++ b/nmdc_automation/workflow_automation/wfutils.py @@ -186,11 +186,12 @@ def submit_job(self, force: bool = False) -> Optional[str]: def get_job_status(self) -> str: """ Get the status of a job from Cromwell """ - if not self.job_id: + if not self.workflow.cromwell_jobid: return "Unknown" - status_url = f"{self.service_url}/{self.job_id}/status" + status_url = f"{self.service_url}/{self.workflow.cromwell_jobid}/status" # There can be a delay between submitting a job and it # being available in Cromwell so handle 404 errors + logging.debug(f"Getting job status from {status_url}") try: response = requests.get(status_url) response.raise_for_status() @@ -200,11 +201,6 @@ def get_job_status(self) -> str: return "Unknown" raise e - - response = requests.get(status_url) - response.raise_for_status() - return response.json().get("status", "Unknown") - def get_job_metadata(self) -> Dict[str, Any]: """ Get metadata for a job from Cromwell """ metadata_url = f"{self.service_url}/{self.job_id}/metadata" @@ -270,6 +266,18 @@ def config(self) -> Dict[str, Any]: # for backward compatibility we need to check for both keys return self.cached_state.get("conf", self.cached_state.get("config", {})) + @property + def last_status(self) -> Optional[str]: + return self.cached_state.get("last_status", None) + + @property + def nmdc_jobid(self) -> Optional[str]: + return self.cached_state.get("nmdc_jobid", None) + + @property + def cromwell_jobid(self) -> Optional[str]: + return self.cached_state.get("cromwell_jobid", None) + @property def execution_template(self) -> Dict[str, str]: # for backward compatibility we need to check for both keys