diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 0096310c..5894df91 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -150,15 +150,18 @@ def prepare_and_cache_new_job(self, new_job: WorkflowJob, opid: str, force=False existing_job = self.find_job_by_opid(opid) if not existing_job: new_job.set_opid(opid, force=force) + new_job.done = False self.job_cache.append(new_job) return new_job elif force: self.job_cache.remove(existing_job) new_job.set_opid(opid, force=force) + new_job.done = False self.job_cache.append(new_job) return new_job + def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: """ Get finished jobs """ successful_jobs = [] @@ -192,11 +195,16 @@ def process_successful_job(self, job: WorkflowJob) -> Database: return database - def process_failed_job(self, job) -> None: + def process_failed_job(self, job: WorkflowJob) -> None: """ Process a failed job """ - if job.failed_count < self._MAX_FAILS: - job.failed_count += 1 - job.cromwell_submit() + if job.workflow.state.get("failed_count", 0) >= self._MAX_FAILS: + logger.error(f"Job {job.opid} failed {self._MAX_FAILS} times. Skipping.") + return + job.workflow.state["failed_count"] = job.workflow.state.get("failed_count", 0) + 1 + job.workflow.state["last_status"] = job.job_status + self.save_checkpoint() + logger.error(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.") + job.job.submit_job() class RuntimeApiHandler: @@ -216,7 +224,7 @@ def get_unclaimed_jobs(self, allowed_workflows)-> List[WorkflowJob]: job_records = self.runtime_api.list_jobs(filt=filt) for job in job_records: - jobs.append(WorkflowJob(self.config, job)) + jobs.append(WorkflowJob(self.config, workflow_state=job)) return jobs diff --git a/nmdc_automation/workflow_automation/wfutils.py b/nmdc_automation/workflow_automation/wfutils.py index 1b81cc4c..86c67912 100755 --- a/nmdc_automation/workflow_automation/wfutils.py +++ b/nmdc_automation/workflow_automation/wfutils.py @@ -1,105 +1,240 @@ #!/usr/bin/env python -from abc import ABC, abstractmethod -import os +import hashlib import json -import tempfile import logging +import os import re -import hashlib -from typing import Any, Dict, List, Optional, Union -from pathlib import Path import shutil +import tempfile +from abc import ABC, abstractmethod +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +import pytz +import requests from nmdc_automation.config import SiteConfig -from nmdc_automation.workflow_automation.models import DataObject, workflow_process_factory +from nmdc_automation.workflow_automation.models import DataObject DEFAULT_MAX_RETRIES = 2 + class JobRunnerABC(ABC): + """Abstract base class for job runners""" @abstractmethod def submit_job(self) -> str: + """ Submit a job """ pass @abstractmethod def get_job_status(self) -> str: + """ Get the status of a job """ pass @abstractmethod def get_job_metadata(self) -> Dict[str, Any]: + """ Get metadata for a job """ pass @property @abstractmethod def job_id(self) -> Optional[str]: + """ Get the job id """ pass @property @abstractmethod def outputs(self) -> Dict[str, str]: + """ Get the outputs """ pass @property @abstractmethod def metadata(self) -> Dict[str, Any]: + """ Get the metadata """ pass @property @abstractmethod def max_retries(self) -> int: + """ Get the maximum number of retries """ pass - class 
CromwellRunner(JobRunnerABC):
-
-    def __init__(self, site_config: SiteConfig, workflow: "WorkflowStateManager", job_metadata: Dict[str,
-    Any] = None, max_retries: int = DEFAULT_MAX_RETRIES):
+    """Job runner for Cromwell"""
+    LABEL_SUBMITTER_VALUE = "nmdcda"
+    LABEL_PARAMETERS = ["release", "wdl", "git_repo"]
+    NO_SUBMIT_STATES = ["Submitted",  # job is already submitted but not running
+                        "Running",  # job is already running
+                        "Failed",  # job failed
+                        "Succeeded",  # job succeeded
+                        "Aborted",  # job was aborted and did not finish
+                        "Aborting",  # job is in the process of being aborted
+                        "On Hold",  # job is on hold and not running. It can be manually resumed later
+                        ]
+
+    def __init__(self, site_config: SiteConfig, workflow: "WorkflowStateManager", job_metadata: Dict[str, Any] = None,
+                 max_retries: int = DEFAULT_MAX_RETRIES, dry_run: bool = False) -> None:
+        """
+        Create a Cromwell job runner.
+        :param site_config: SiteConfig object
+        :param workflow: WorkflowStateManager object
+        :param job_metadata: metadata for the job
+        :param max_retries: maximum number of retries for a job
+        :param dry_run: if True, do not submit the job
+        """
         self.config = site_config
+        if not isinstance(workflow, WorkflowStateManager):
+            raise ValueError("workflow must be a WorkflowStateManager object")
         self.workflow = workflow
         self.service_url = self.config.cromwell_url
         self._metadata = {}
         if job_metadata:
             self._metadata = job_metadata
         self._max_retries = max_retries
-
-
-    def submit_job(self) -> str:
-        # TODO: implement
-        pass
+        self.dry_run = dry_run
+
+    def _generate_workflow_inputs(self) -> Dict[str, str]:
+        """ Generate inputs for the job runner from the workflow state """
+        inputs = {}
+        prefix = self.workflow.input_prefix
+        for input_key, input_val in self.workflow.inputs.items():
+            # special case for resource
+            if input_val == "{resource}":
+                input_val = self.config.resource
+            inputs[f"{prefix}.{input_key}"] = input_val
+        return inputs
+
+    def _generate_workflow_labels(self) -> Dict[str, str]:
+        """ Generate labels for the job runner from the workflow state """
+        labels = {param: self.workflow.config[param] for param in self.LABEL_PARAMETERS}
+        labels["submitter"] = self.LABEL_SUBMITTER_VALUE
+        # some Cromwell-specific labels
+        labels["pipeline_version"] = self.workflow.config["release"]
+        labels["pipeline"] = self.workflow.config["wdl"]
+        labels["activity_id"] = self.workflow.workflow_execution_id
+        labels["opid"] = self.workflow.opid
+        return labels
+
+    def generate_submission_files(self) -> Dict[str, Any]:
+        """ Generate the files needed for a Cromwell job submission """
+        files = {}
+        try:
+            wdl_file = self.workflow.fetch_release_file(self.workflow.config["wdl"], suffix=".wdl")
+            bundle_file = self.workflow.fetch_release_file("bundle.zip", suffix=".zip")
+            files = {"workflowSource": open(wdl_file, "rb"), "workflowDependencies": open(bundle_file, "rb"),
+                     "workflowInputs": open(_json_tmp(self._generate_workflow_inputs()), "rb"),
+                     "labels": open(_json_tmp(self._generate_workflow_labels()), "rb"), }
+        except Exception as e:
+            logging.error(f"Failed to generate submission files: {e}")
+            self._cleanup_files(list(files.values()))
+            raise e
+        return files
+
+    def _cleanup_files(self, files: List[Union[tempfile.NamedTemporaryFile, tempfile.SpooledTemporaryFile]]):
+        """Safely closes and removes files."""
+        for file in files:
+            try:
+                file.close()
+                os.unlink(file.name)
+            except Exception as e:
+                logging.error(f"Failed to cleanup file: {e}")
+
+    def submit_job(self, force: bool = False) -> Optional[str]:
+        """
+
Submit a job to Cromwell. Update the workflow state with the job id and status. + :param force: if True, submit the job even if it is in a state that does not require submission + :return: the job id + """ + status = self.get_job_status() + if status in self.NO_SUBMIT_STATES and not force: + logging.info(f"Job {self.job_id} in state {status}, skipping submission") + return + cleanup_files = [] + try: + files = self.generate_submission_files() + cleanup_files = list(files.values()) + if not self.dry_run: + response = requests.post(self.service_url, files=files) + response.raise_for_status() + self.metadata = response.json() + self.job_id = self.metadata["id"] + logging.info(f"Submitted job {self.job_id}") + else: + logging.info(f"Dry run: skipping job submission") + self.job_id = "dry_run" + + logging.info(f"Job {self.job_id} submitted") + start_time = datetime.now(pytz.utc).isoformat() + # update workflow state + self.workflow.done = False + self.workflow.update_state({"start": start_time}) + self.workflow.update_state({"cromwell_jobid": self.job_id}) + self.workflow.update_state({"last_status": "Submitted"}) + return self.job_id + except Exception as e: + logging.error(f"Failed to submit job: {e}") + raise e + finally: + self._cleanup_files(cleanup_files) def get_job_status(self) -> str: - # TODO: implement - return "Pending" + """ Get the status of a job from Cromwell """ + if not self.job_id: + return "Unknown" + status_url = f"{self.service_url}/{self.job_id}/status" + response = requests.get(status_url) + response.raise_for_status() + return response.json().get("status", "Unknown") def get_job_metadata(self) -> Dict[str, Any]: - raise NotImplementedError - # TODO: implement + """ Get metadata for a job from Cromwell """ + metadata_url = f"{self.service_url}/{self.job_id}/metadata" + response = requests.get(metadata_url) + response.raise_for_status() + metadata = response.json() + # update cached metadata + self.metadata = metadata + return metadata @property def job_id(self) -> Optional[str]: - return self.metadata.get("id", None) + """ Get the job id from the metadata """ + return self.metadata.get("id", None) + + @job_id.setter + def job_id(self, job_id: str): + """ Set the job id in the metadata """ + self.metadata["id"] = job_id @property def outputs(self) -> Dict[str, str]: - return self.metadata.get("outputs", {}) + """ Get the outputs from the metadata """ + return self.metadata.get("outputs", {}) @property def metadata(self) -> Dict[str, Any]: - return self._metadata + """ Get the metadata """ + return self._metadata @metadata.setter def metadata(self, metadata: Dict[str, Any]): - self._metadata = metadata + """ Set the metadata """ + self._metadata = metadata @property def max_retries(self) -> int: - return self._max_retries - + return self._max_retries class WorkflowStateManager: + CHUNK_SIZE = 1000000 # 1 MB + GIT_RELEASES_PATH = "/releases/download" + def __init__(self, state: Dict[str, Any] = None, opid: str = None): if state is None: state = {} @@ -109,7 +244,6 @@ def __init__(self, state: Dict[str, Any] = None, opid: str = None): if opid: self.cached_state["opid"] = opid - def update_state(self, state: Dict[str, Any]): self.cached_state.update(state) @@ -156,7 +290,11 @@ def input_prefix(self) -> Optional[str]: return self.config.get("input_prefix", None) @property - def nmdc_jobid(self)-> Optional[str]: + def inputs(self) -> Dict[str, str]: + return self.config.get("inputs", {}) + + @property + def nmdc_jobid(self) -> Optional[str]: # different keys in state file vs 
database record
         return self.cached_state.get("nmdc_jobid", self.cached_state.get("id", None))
@@ -168,11 +306,70 @@ def job_runner_id(self) -> Optional[str]:
         if job_runner_id in self.cached_state:
             return self.cached_state[job_runner_id]
+    @property
+    def opid(self) -> Optional[str]:
+        return self.cached_state.get("opid", None)
+
+    @opid.setter
+    def opid(self, opid: str):
+        if self.opid:
+            raise ValueError("opid already set in job state")
+        self.cached_state["opid"] = opid
+
+    def fetch_release_file(self, filename: str, suffix: str = None) -> str:
+        """
+        Download a release file from the Git repository and save it as a temporary file.
+        Note: the temporary file is not deleted automatically.
+        """
+        url = self._build_release_url(filename)
+        logging.debug(f"Fetching release file from URL: {url}")
+        # download the file as a stream to handle large files
+        response = requests.get(url, stream=True)
+        try:
+            response.raise_for_status()
+            # create a named temporary file
+            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp_file:
+                self._write_stream_to_file(response, tmp_file)
+                return tmp_file.name
+        finally:
+            response.close()
+
+    def _build_release_url(self, filename: str) -> str:
+        """Build the URL for a release file in the Git repository."""
+        release = self.config["release"]
+        base_url = self.config["git_repo"].rstrip("/")
+        return f"{base_url}{self.GIT_RELEASES_PATH}/{release}/{filename}"
+
+    def _write_stream_to_file(self, response: requests.Response, file: tempfile.NamedTemporaryFile) -> None:
+        """Write a stream from a requests response to a file."""
+        try:
+            for chunk in response.iter_content(chunk_size=self.CHUNK_SIZE):
+                if chunk:
+                    file.write(chunk)
+            file.flush()
+        except Exception as e:
+            # clean up the temporary file
+            Path(file.name).unlink(missing_ok=True)
+            logging.error(f"Error writing stream to file: {e}")
+            raise e
+
+
 class WorkflowJob:
+    """
+    A class to manage a Workflow's job state and execution, including submission, status, and output. A WorkflowJob
+    combines a SiteConfig object, a WorkflowStateManager object, and a JobRunner object to manage the job state and
+    execution, and to propagate job results back to the workflow state and ultimately to the database.
+ A WorkflowJob object is created with: + - a SiteConfig object + - a workflow state dictionary + - a job metadata dictionary + - an optional operation id (opid) + - an optional JobRunnerABC object (default is CromwellRunner) + + + """ def __init__(self, site_config: SiteConfig, workflow_state: Dict[str, Any] = None, - job_metadata: Dict['str', Any] = None, opid: str = None, job_runner: JobRunnerABC = None - )-> None: + job_metadata: Dict['str', Any] = None, opid: str = None, job_runner: JobRunnerABC = None) -> None: self.site_config = site_config self.workflow = WorkflowStateManager(workflow_state, opid) # default to CromwellRunner if no job_runner is provided @@ -181,28 +378,36 @@ def __init__(self, site_config: SiteConfig, workflow_state: Dict[str, Any] = Non self.job = job_runner # Properties to access the site config, job state, and job runner attributes - # getter and setter props for job state opid @property - def opid(self) -> str: + def opid(self) -> Optional[str]: + """ Get the operation id """ return self.workflow.state.get("opid", None) def set_opid(self, opid: str, force: bool = False): + """ Set the operation id """ if self.opid and not force: raise ValueError("opid already set in job state") self.workflow.update_state({"opid": opid}) @property def done(self) -> Optional[bool]: + """ Get the done state of the job """ return self.workflow.state.get("done", None) @done.setter def done(self, done: bool): + """ Set the done state of the job """ self.workflow.update_state({"done": done}) - @property def job_status(self) -> str: + """ + Get the status of the job. If the job has not been submitted, return "Unsubmitted". + If the job has failed and the number of retries has been exceeded, return "Failed". + Otherwise, return the status from the job runner. + """ status = None + # extend this list as needed for other job runners job_id_keys = ["cromwell_jobid"] failed_count = self.workflow.state.get("failed_count", 0) # if none of the job id keys are in the workflow state, it is unsubmitted @@ -218,57 +423,54 @@ def job_status(self) -> str: self.workflow.update_state({"last_status": status}) return status - @property def workflow_execution_id(self) -> Optional[str]: + """ Get the workflow execution id """ return self.workflow.workflow_execution_id - @property - def cromwell_url(self) -> str: - return self.site_config.cromwell_url - @property def data_dir(self) -> str: + """ Get the data directory """ return self.site_config.data_dir @property def execution_resource(self) -> str: + """ Get the execution resource (e.g., NERSC-Perlmutter) """ return self.site_config.resource @property def url_root(self) -> str: + """ Get the URL root """ return self.site_config.url_root @property def was_informed_by(self) -> str: + """ get the was_informed_by ID value """ return self.workflow.was_informed_by @property def as_workflow_execution_dict(self) -> Dict[str, Any]: + """ + Create a dictionary representation of the basic workflow execution attributes for a WorkflowJob. 
+ """ # for forward compatibility we need to strip Activity from the type normalized_type = self.workflow.workflow_execution_type.replace("Activity", "") - base_dict = { - "id": self.workflow_execution_id, - "type": normalized_type, - "name": self.workflow.workflow_execution_name, - "git_url": self.workflow.config["git_repo"], - "execution_resource": self.execution_resource, - "was_informed_by": self.was_informed_by, + base_dict = {"id": self.workflow_execution_id, "type": normalized_type, + "name": self.workflow.workflow_execution_name, "git_url": self.workflow.config["git_repo"], + "execution_resource": self.execution_resource, "was_informed_by": self.was_informed_by, "has_input": [dobj["id"] for dobj in self.workflow.config["input_data_objects"]], - "started_at_time": self.workflow.state.get("start"), - "ended_at_time": self.workflow.state.get("end"), - "version": self.workflow.config["release"], - } + "started_at_time": self.workflow.state.get("start"), "ended_at_time": self.workflow.state.get("end"), + "version": self.workflow.config["release"], } return base_dict - def make_data_objects(self, output_dir: Union[str, Path] = None)-> List[DataObject]: + def make_data_objects(self, output_dir: Union[str, Path] = None) -> List[DataObject]: """ Create DataObject objects for each output of the job. """ data_objects = [] - for output_spec in self.workflow.data_outputs: # specs are defined in the workflow.yaml file under Outputs + for output_spec in self.workflow.data_outputs: # specs are defined in the workflow.yaml file under Outputs output_key = f"{self.workflow.input_prefix}.{output_spec['output']}" if output_key not in self.job.outputs: if output_spec.get("optional"): @@ -280,7 +482,6 @@ def make_data_objects(self, output_dir: Union[str, Path] = None)-> List[DataObje # get the full path to the output file from the job_runner output_file_path = Path(self.job.outputs[output_key]) - md5_sum = _md5(output_file_path) file_url = f"{self.url_root}/{self.was_informed_by}/{self.workflow_execution_id}/{output_file_path.name}" @@ -295,22 +496,19 @@ def make_data_objects(self, output_dir: Union[str, Path] = None)-> List[DataObje # create a DataObject object data_object = DataObject( - id = output_spec["id"], - name=output_file_path.name, - type="nmdc:DataObject", - url=file_url, - data_object_type=output_spec["data_object_type"], - md5_checksum=md5_sum, - description=output_spec["description"], - was_generated_by=self.workflow_execution_id, - ) + id=output_spec["id"], name=output_file_path.name, type="nmdc:DataObject", url=file_url, + data_object_type=output_spec["data_object_type"], md5_checksum=md5_sum, + description=output_spec["description"], was_generated_by=self.workflow_execution_id, ) data_objects.append(data_object) return data_objects def make_workflow_execution_record(self, data_objects: List[DataObject]) -> Dict[str, Any]: """ - Create a workflow execution record for the job + Create a workflow execution record for the job. This record includes the basic workflow execution attributes + and the data objects generated by the job. Additional workflow-specific attributes can be defined in the + workflow execution template and read from a job's output files. + The data objects are added to the record as a list of IDs in the "has_output" key. 
""" wf_dict = self.as_workflow_execution_dict wf_dict["has_output"] = [dobj.id for dobj in data_objects] @@ -352,9 +550,5 @@ def _json_tmp(data): return fname -def jprint(obj): - print(json.dumps(obj, indent=2)) - - def _md5(file): return hashlib.md5(open(file, "rb").read()).hexdigest() diff --git a/tests/conftest.py b/tests/conftest.py index ac83981a..b8a89714 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,7 @@ from pymongo import MongoClient from pathlib import Path from pytest import fixture +import requests_mock import shutil from time import time from unittest.mock import Mock @@ -96,3 +97,63 @@ def initial_state_file(fixtures_dir, tmp_path): copied_state_file = tmp_path / "initial_state.json" shutil.copy(state_file, copied_state_file) return copied_state_file + + +# Sample Cromwell API responses +CROMWELL_SUCCESS_RESPONSE = { + "id": "cromwell-job-id-12345", + "status": "Succeeded", + "outputs": { + "output_file": "/path/to/output.txt" + } +} + +CROMWELL_FAIL_RESPONSE = { + "id": "cromwell-job-id-54321", + "status": "Failed", + "failures": [ + {"message": "Error processing job"} + ] +} + +JOB_SUBMIT_RESPONSE = { + "id": "cromwell-workflow-id", + "status": "Submitted", + "submission": "2024-10-13T12:34:56.789Z", + "workflowName": "workflow_name", + "workflowRoot": "gs://path/to/workflow/root", + "metadataSource": "Unarchived", + "outputs": {}, + "labels": { + "label1": "value1", + "label2": "value2" + }, + "parentWorkflowId": None, + "rootWorkflowId": "cromwell-root-id" +} + +@fixture +def mock_cromwell_api(fixtures_dir): + successful_job_metadata = json.load(open(fixtures_dir / 'cromwell/succeeded_metadata.json')) + with requests_mock.Mocker() as m: + # Mock the Cromwell submit job endpoint + m.post('http://localhost:8088/api/workflows/v1', json=JOB_SUBMIT_RESPONSE, status_code=201) + + # Mock Cromwell status check endpoint + m.get( + 'http://localhost:8088/api/workflows/v1/cromwell-job-id-12345/status', json={ + "id": "cromwell-job-id-12345", + "status": "Succeeded" + } + ) + + # Mock Cromwell failure scenario + m.get('http://localhost:8088/api/workflows/v1/cromwell-job-id-54321/status', json=CROMWELL_FAIL_RESPONSE) + + # Mock Cromwell metadata endpoint + m.get( + 'http://localhost:8088/api/workflows/v1/cromwell-job-id-12345/metadata', + json=successful_job_metadata + ) + + yield m \ No newline at end of file diff --git a/tests/fixtures/cromwell_metadata.json b/tests/fixtures/cromwell/succeeded_metadata.json similarity index 97% rename from tests/fixtures/cromwell_metadata.json rename to tests/fixtures/cromwell/succeeded_metadata.json index 2a6f1171..f6c4c3e5 100644 --- a/tests/fixtures/cromwell_metadata.json +++ b/tests/fixtures/cromwell/succeeded_metadata.json @@ -1,5 +1,5 @@ { - "id": "34b41f4a-fe50-4c00-bb60-444104b4c024", + "id": "cromwell-job-id-12345", "status": "Succeeded", "start": "2023-09-01T10:00:00.000Z", "end": "2023-09-01T12:00:00.000Z", diff --git a/tests/fixtures/data_object_set2.json b/tests/fixtures/data_object_set2.json deleted file mode 100644 index 2e855058..00000000 --- a/tests/fixtures/data_object_set2.json +++ /dev/null @@ -1,32 +0,0 @@ -[ - { - "id" : "nmdc:dobj-11-qcstats2", - "name" : "nmdc_wfrqc-11-test001.2_filterStats.txt", - "description" : "Reads QC summary for nmdc:wfrqc-11-metag1.2", - "file_size_bytes" : 123456, - "md5_checksum" : "7172cd332a734e002c88b35827acd991", - "data_object_type" : "QC Statistics", - "url" : "https://data.microbiomedata.org", - "type" : "nmdc:DataObject" -}, -{ - "id" : "nmdc:dobj-11-qcinfo2", - "name" : 
"nmdc_wfrqc-11-test001.2_readsQC.info", - "description" : "Read filtering info for nmdc:wfrqc-11-metag1.2", - "file_size_bytes" : 123456, - "md5_checksum" : "d3812377eb0a57c9f2bdea5692d157fb", - "data_object_type" : "Read Filtering Info File", - "url" : "https://data.microbiomedata.org", - "type" : "nmdc:DataObject" -}, -{ - "id" : "nmdc:dobj-11-filteredreads2", - "name" : "nmdc_wfrqc-11-test001.2_filtered.fastq.gz", - "description" : "Reads QC for nmdc:wfrqc-11-metag1.2", - "file_size_bytes" : 123456, - "md5_checksum" : "fafb41665d8e00654ac0fbf80adc1b87", - "data_object_type" : "Filtered Sequencing Reads", - "url" : "https://data.microbiomedata.org", - "type" : "nmdc:DataObject" -} -] diff --git a/tests/fixtures/db_utils.py b/tests/fixtures/db_utils.py index 9e9500a0..385e5063 100644 --- a/tests/fixtures/db_utils.py +++ b/tests/fixtures/db_utils.py @@ -26,7 +26,8 @@ def load_fixture(test_db, fn, col=None, reset=False, version=None): col = fn.split("/")[-1].split(".")[0] if reset: test_db[col].delete_many({}) - data = read_json(fn) + fixture_path = FIXTURE_DIR / Path('nmdc_db') / Path(fn) + data = json.load(open(fixture_path)) logging.debug("Loading %d recs into %s" % (len(data), col)) if len(data) > 0: if version: diff --git a/tests/fixtures/mags_job_metadata.json b/tests/fixtures/mags_job_metadata.json index ed705cff..552aeac8 100644 --- a/tests/fixtures/mags_job_metadata.json +++ b/tests/fixtures/mags_job_metadata.json @@ -942,9 +942,9 @@ "nmdc_mags.low": "tests/test_pscratch/nmdc_mags/nmdc_wfmag-12-fxwdrv82.1_bins.lowDepth.fa", "nmdc_mags.final_stats_json": "tests/test_pscratch/nmdc_mags/nmdc_wfmag-12-fxwdrv82.1_mags_stats.json" }, - "workflowRoot": "/pscratch/sd/n/nmdcda/cromwell-executions/nmdc_mags/29628c3e-8850-4210-927a-1d4258fa35d1", + "workflowRoot": "/pscratch/sd/n/nmdcda/cromwell-executions/nmdc_mags/cromwell-000-000-000-000", "actualWorkflowLanguage": "WDL", - "id": "29628c3e-8850-4210-927a-1d4258fa35d1", + "id": "cromwell-000-000-000-000", "inputs": { "checkm_db": "/refdata/checkM_DB/checkm_data_2015_01_16", "map_file": null, diff --git a/tests/fixtures/mags_workflow_record.json b/tests/fixtures/models/mags_analysis_record.json similarity index 100% rename from tests/fixtures/mags_workflow_record.json rename to tests/fixtures/models/mags_analysis_record.json diff --git a/tests/fixtures/metagenome_annotation_record.json b/tests/fixtures/models/metagenome_annotation_record.json similarity index 100% rename from tests/fixtures/metagenome_annotation_record.json rename to tests/fixtures/models/metagenome_annotation_record.json diff --git a/tests/fixtures/metagenome_assembly_record.json b/tests/fixtures/models/metagenome_assembly_record.json similarity index 100% rename from tests/fixtures/metagenome_assembly_record.json rename to tests/fixtures/models/metagenome_assembly_record.json diff --git a/tests/fixtures/metatranscriptome_annotation_record.json b/tests/fixtures/models/metatranscriptome_annotation_record.json similarity index 100% rename from tests/fixtures/metatranscriptome_annotation_record.json rename to tests/fixtures/models/metatranscriptome_annotation_record.json diff --git a/tests/fixtures/metatranscriptome_assembly_record.json b/tests/fixtures/models/metatranscriptome_assembly_record.json similarity index 100% rename from tests/fixtures/metatranscriptome_assembly_record.json rename to tests/fixtures/models/metatranscriptome_assembly_record.json diff --git a/tests/fixtures/metatranscriptome_expression_analysis_record.json 
b/tests/fixtures/models/metatranscriptome_expression_analysis_record.json similarity index 100% rename from tests/fixtures/metatranscriptome_expression_analysis_record.json rename to tests/fixtures/models/metatranscriptome_expression_analysis_record.json diff --git a/tests/fixtures/nucleotide_sequencing_record.json b/tests/fixtures/models/nucleotide_sequencing_record.json similarity index 100% rename from tests/fixtures/nucleotide_sequencing_record.json rename to tests/fixtures/models/nucleotide_sequencing_record.json diff --git a/tests/fixtures/read_based_taxonomy_analysis_record.json b/tests/fixtures/models/read_based_taxonomy_analysis_record.json similarity index 100% rename from tests/fixtures/read_based_taxonomy_analysis_record.json rename to tests/fixtures/models/read_based_taxonomy_analysis_record.json diff --git a/tests/fixtures/read_qc_analysis_record.json b/tests/fixtures/models/read_qc_analysis_record.json similarity index 100% rename from tests/fixtures/read_qc_analysis_record.json rename to tests/fixtures/models/read_qc_analysis_record.json diff --git a/tests/fixtures/unsubmitted_job_record.json b/tests/fixtures/nmdc_api/unsubmitted_job.json similarity index 100% rename from tests/fixtures/unsubmitted_job_record.json rename to tests/fixtures/nmdc_api/unsubmitted_job.json diff --git a/tests/fixtures/data_generation_set.json b/tests/fixtures/nmdc_db/data_generation_set.json similarity index 100% rename from tests/fixtures/data_generation_set.json rename to tests/fixtures/nmdc_db/data_generation_set.json diff --git a/tests/fixtures/data_object_set.json b/tests/fixtures/nmdc_db/data_object_set.json similarity index 100% rename from tests/fixtures/data_object_set.json rename to tests/fixtures/nmdc_db/data_object_set.json diff --git a/tests/fixtures/jobs.json b/tests/fixtures/nmdc_db/jobs.json similarity index 100% rename from tests/fixtures/jobs.json rename to tests/fixtures/nmdc_db/jobs.json diff --git a/tests/fixtures/metagenome_annotation.json b/tests/fixtures/nmdc_db/metagenome_annotation.json similarity index 100% rename from tests/fixtures/metagenome_annotation.json rename to tests/fixtures/nmdc_db/metagenome_annotation.json diff --git a/tests/fixtures/metagenome_assembly.json b/tests/fixtures/nmdc_db/metagenome_assembly.json similarity index 100% rename from tests/fixtures/metagenome_assembly.json rename to tests/fixtures/nmdc_db/metagenome_assembly.json diff --git a/tests/fixtures/metatranscriptome_annotation.json b/tests/fixtures/nmdc_db/metatranscriptome_annotation.json similarity index 100% rename from tests/fixtures/metatranscriptome_annotation.json rename to tests/fixtures/nmdc_db/metatranscriptome_annotation.json diff --git a/tests/fixtures/metatranscriptome_assembly.json b/tests/fixtures/nmdc_db/metatranscriptome_assembly.json similarity index 100% rename from tests/fixtures/metatranscriptome_assembly.json rename to tests/fixtures/nmdc_db/metatranscriptome_assembly.json diff --git a/tests/fixtures/read_qc_analysis.json b/tests/fixtures/nmdc_db/read_qc_analysis.json similarity index 100% rename from tests/fixtures/read_qc_analysis.json rename to tests/fixtures/nmdc_db/read_qc_analysis.json diff --git a/tests/fixtures/workflow_execution_set.json b/tests/fixtures/nmdc_db/workflow_execution_set.json similarity index 100% rename from tests/fixtures/workflow_execution_set.json rename to tests/fixtures/nmdc_db/workflow_execution_set.json diff --git a/tests/fixtures/read_qc_analysis2.json b/tests/fixtures/read_qc_analysis2.json deleted file mode 100644 index 
4f7a5495..00000000 --- a/tests/fixtures/read_qc_analysis2.json +++ /dev/null @@ -1,24 +0,0 @@ -[ - { - "has_input": [ - "nmdc:22afa3d49b73eaec2e9787a6b88fbdc3" - ], - "git_url": "https://github.com/microbiomedata/ReadsQC", - "version": "v1.1.8", - "has_output": [ - "nmdc:f107af0a000ec0b90e157fc09473c337v2", - "nmdc:71528f677698dd6657ea7ddcc3105184v2" - ], - "was_informed_by": "nmdc:omprc-11-nhy4pz43", - "input_read_count": 102766230, - "output_read_bases": 15235297055, - "id": "nmdc:fdefb3fa15098906cf788f5cadf17bb3v2", - "execution_resource": "NERSC-Cori", - "input_read_bases": 15517700730, - "name": "Read QC Activity for nmdc:mga0vx38", - "output_read_count": 101660590, - "started_at_time": "2021-08-05T14:48:51+00:00", - "type": "nmdc:ReadQCAnalysis", - "ended_at_time": "2021-09-15T10:13:20+00:00" - } -] diff --git a/tests/test_models.py b/tests/test_models.py index c0096dc1..1d9aa4ec 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,5 +1,8 @@ """ Test cases for the models module. """ +import json + from bson import ObjectId +from pathlib import Path from pytest import mark from nmdc_automation.workflow_automation.models import( DataObject, @@ -12,10 +15,10 @@ from nmdc_automation.workflow_automation.workflows import load_workflow_configs from tests.fixtures import db_utils -def test_workflow_process_factory(): +def test_workflow_process_factory(fixtures_dir): """ Test the workflow_process_factory function. """ record_types = { - "nmdc:MagsAnalysis": "mags_record.json", + "nmdc:MagsAnalysis": "mags_analysis_record.json", "nmdc:MetagenomeAnnotation": "metagenome_annotation_record.json", "nmdc:MetagenomeAssembly": "metagenome_assembly_record.json", "nmdc:MetatranscriptomeAnnotation": "metatranscriptome_annotation_record.json", @@ -26,12 +29,12 @@ def test_workflow_process_factory(): "nmdc:ReadQcAnalysis": "read_qc_analysis_record.json", } for record_type, record_file in record_types.items(): - record = db_utils.read_json(record_file) + record = json.load(open(fixtures_dir / f"models/{record_file}")) wfe = workflow_process_factory(record) assert wfe.type == record_type -def test_workflow_process_factory_mags_with_mags_list(): - record = db_utils.read_json("mags_workflow_record.json") +def test_workflow_process_factory_mags_with_mags_list(fixtures_dir): + record = json.load(open(fixtures_dir / "models/mags_analysis_record.json")) mga = workflow_process_factory(record) assert mga.type == "nmdc:MagsAnalysis" @@ -48,7 +51,7 @@ def test_process_factory_with_db_record(): assert wfe.type == "nmdc:NucleotideSequencing" @mark.parametrize("record_file, record_type", [ - ("mags_record.json", "nmdc:MagsAnalysis"), + ("mags_analysis_record.json", "nmdc:MagsAnalysis"), ("metagenome_annotation_record.json", "nmdc:MetagenomeAnnotation"), ("metagenome_assembly_record.json", "nmdc:MetagenomeAssembly"), ("metatranscriptome_annotation_record.json", "nmdc:MetatranscriptomeAnnotation"), @@ -58,7 +61,7 @@ def test_process_factory_with_db_record(): ("read_based_taxonomy_analysis_record.json", "nmdc:ReadBasedTaxonomyAnalysis"), ("read_qc_analysis_record.json", "nmdc:ReadQcAnalysis"), ]) -def test_workflow_process_node(workflows_config_dir,record_file, record_type): +def test_workflow_process_node(workflows_config_dir,record_file, record_type, fixtures_dir): """ Test the WorkflowProcessNode class. 
""" # load all workflows for both metagenome and metatranscriptome wfs = load_workflow_configs(workflows_config_dir / "workflows.yaml") @@ -72,15 +75,16 @@ def test_workflow_process_node(workflows_config_dir,record_file, record_type): assert wfs_for_type wf = wfs_for_type[0] - record = db_utils.read_json(record_file) + record = json.load(open(fixtures_dir / f"models/{record_file}")) wfn = WorkflowProcessNode(record, wf) assert wfn.process.type == record_type -def test_data_object_creation_from_records(): +def test_data_object_creation_from_records(fixtures_dir): """ Test the creation of DataObject objects from records. """ - records = db_utils.read_json("data_object_set.json") + records_path = fixtures_dir / Path('nmdc_db/data_object_set.json') + records = json.load(open(records_path)) for record in records: data_obj = DataObject(**record) assert data_obj.type == "nmdc:DataObject" @@ -92,21 +96,29 @@ def test_data_object_creation_from_records(): assert data_obj_dict == record -def test_data_object_creation_from_db_records(test_db): +def test_data_object_creation_from_db_records(test_db, fixtures_dir): db_utils.reset_db(test_db) - db_utils.read_json("data_object_set.json") + db_utils.load_fixture(test_db, "data_object_set.json") + # db_utils.read_json("data_object_set.json") db_records = test_db["data_object_set"].find() db_records = list(db_records) + assert db_records for db_record in db_records: data_obj = DataObject(**db_record) assert data_obj.type == "nmdc:DataObject" assert data_obj.id == db_record["id"] assert data_obj.name == db_record["name"] assert data_obj.data_object_type == db_record["data_object_type"] - assert data_obj.data_object_format == db_record["data_object_format"] + assert data_obj.description == db_record["description"] + assert data_obj.url == db_record["url"] + assert data_obj.file_size_bytes == db_record.get("file_size_bytes") + assert data_obj.md5_checksum == db_record["md5_checksum"] data_obj_dict = data_obj.as_dict() + # The db record will have an _id field that is not in the data object + _id = db_record.pop("_id") + assert _id assert data_obj_dict == db_record @@ -131,8 +143,8 @@ def test_job_output_creation(): job_output = JobOutput(**output) -def test_job_creation(): - job_record = db_utils.read_json("unsubmitted_job_record.json") +def test_job_creation(fixtures_dir): + job_record = json.load(open(fixtures_dir / "nmdc_api/unsubmitted_job.json")) job = Job(**job_record) assert job.id == job_record["id"] assert isinstance(job.workflow, JobWorkflow) diff --git a/tests/test_watch_nmdc.py b/tests/test_watch_nmdc.py index 2c790608..98ea78a5 100644 --- a/tests/test_watch_nmdc.py +++ b/tests/test_watch_nmdc.py @@ -1,7 +1,11 @@ import copy import json from pathlib import PosixPath, Path + +import pytest from pytest import fixture +from unittest import mock +import requests_mock import shutil from unittest.mock import patch, PropertyMock, Mock @@ -9,7 +13,8 @@ from nmdc_automation.workflow_automation.watch_nmdc import ( Watcher, FileHandler, - JobManager + JobManager, + RuntimeApiHandler, ) from nmdc_automation.workflow_automation.wfutils import WorkflowJob from tests.fixtures import db_utils @@ -229,6 +234,35 @@ def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file, jm.job_cache = [] +def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_file, fixtures_dir): + # Arrange + fh = FileHandler(site_config, initial_state_file) + jm = JobManager(site_config, fh) + #already has an opid + new_job_state = 
json.load(open(fixtures_dir / "mags_workflow_state.json")) + assert new_job_state + new_job = WorkflowJob(site_config, new_job_state) + # Act + opid = "nmdc:test-opid-1" + job = jm.prepare_and_cache_new_job(new_job, opid, force=True) + # Assert + assert job + assert isinstance(job, WorkflowJob) + assert job.opid == opid + assert not job.done + assert job in jm.job_cache + # resubmit the job without force it will return None + job2 = jm.prepare_and_cache_new_job(job, opid) + assert not job2 + # try again with force + job2 = jm.prepare_and_cache_new_job(job, opid, force=True) + assert job2 + assert isinstance(job2, WorkflowJob) + assert job2.opid == opid + + + + def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures_dir): # Arrange - initial state has 1 failure and is not done fh = FileHandler(site_config, initial_state_file) @@ -279,20 +313,56 @@ def test_job_manager_process_successful_job(site_config, initial_state_file, fix jm.job_cache = [] +def test_job_manager_process_failed_job(site_config, initial_state_file, fixtures_dir): + # Arrange + fh = FileHandler(site_config, initial_state_file) + jm = JobManager(site_config, fh) + failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json")) + assert failed_job_state + failed_job = WorkflowJob(site_config, failed_job_state) + jm.job_cache.append(failed_job) + # Act + jm.process_failed_job(failed_job) + # Assert + assert failed_job.done + + @fixture def mock_runtime_api_handler(site_config, mock_api): pass - -def test_claim_jobs(site_config_file, site_config, fixtures_dir): +@mock.patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.submit_job") +def test_claim_jobs(mock_submit, site_config_file, site_config, fixtures_dir): # Arrange - with (patch("nmdc_automation.workflow_automation.watch_nmdc.RuntimeApiHandler.claim_job") as mock_claim_job): + mock_submit.return_value = {"id": "nmdc:1234", "detail": {"id": "nmdc:1234"}} + with patch( + "nmdc_automation.workflow_automation.watch_nmdc.RuntimeApiHandler.claim_job" + ) as mock_claim_job, requests_mock.Mocker() as m: mock_claim_job.return_value = {"id": "nmdc:1234", "detail": {"id": "nmdc:1234"}} - job_record = json.load(open(fixtures_dir / "mags_job_metadata.json")) - unclaimed_wfj = WorkflowJob(site_config, job_record) + job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + # remove the opid + job_state.pop("opid") + unclaimed_wfj = WorkflowJob(site_config, workflow_state=job_state) + + # mock the status URL response + status_url = f"http://localhost:8088/api/workflows/v1/{unclaimed_wfj.job.job_id}/status" + m.get(status_url, json={"id": "nmdc:1234", "status": "Succeeded"}) + w = Watcher(site_config_file) w.claim_jobs(unclaimed_jobs=[unclaimed_wfj]) + # Assert + assert unclaimed_wfj.job_status + + +def test_runtime_manager_get_unclaimed_jobs(site_config, initial_state_file, fixtures_dir): + # Arrange + rt = RuntimeApiHandler(site_config) + # Act + unclaimed_jobs = rt.get_unclaimed_jobs(rt.config.allowed_workflows) + # Assert + assert unclaimed_jobs + def test_reclaim_job(requests_mock, site_config_file, mock_api): requests_mock.real_http = True diff --git a/tests/test_wfutils.py b/tests/test_wfutils.py index 7e0ff4ea..fde4942c 100644 --- a/tests/test_wfutils.py +++ b/tests/test_wfutils.py @@ -1,11 +1,19 @@ from nmdc_automation.workflow_automation.wfutils import ( CromwellRunner, WorkflowJob, - WorkflowStateManager + WorkflowStateManager, + _json_tmp, ) from nmdc_automation.workflow_automation.models import DataObject, 
workflow_process_factory from nmdc_schema.nmdc import MagsAnalysis, EukEval +import io import json +import os +import pytest +import requests +import tempfile +from unittest import mock + def test_workflow_job(site_config, fixtures_dir): @@ -21,10 +29,53 @@ def test_cromwell_job_runner(site_config, fixtures_dir): # load cromwell metadata job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json")) job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + state_manager = WorkflowStateManager(job_state) + job_runner = CromwellRunner(site_config, state_manager, job_metadata) + assert job_runner + + +def test_cromwell_job_runner_get_job_status(site_config, fixtures_dir, mock_cromwell_api): + # load cromwell metadata + job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json")) + job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + # successful job from the test fixtures + job_state['cromwell_jobid'] = "cromwell-job-id-12345" + job_metadata['id'] = "cromwell-job-id-12345" + + state_manager = WorkflowStateManager(job_state) + job_runner = CromwellRunner(site_config, state_manager, job_metadata) + status = job_runner.get_job_status() + assert status + assert status == "Succeeded" + + # failed job from the test fixtures + job_state['cromwell_jobid'] = "cromwell-job-id-54321" + job_metadata['id'] = "cromwell-job-id-54321" + state_manager = WorkflowStateManager(job_state) + job_runner = CromwellRunner(site_config, state_manager, job_metadata) + status = job_runner.get_job_status() + assert status + assert status == "Failed" + + +def test_cromwell_job_runner_get_job_metadata(site_config, fixtures_dir, mock_cromwell_api): + # load cromwell metadata + job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json")) + job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + # successful job from the test fixtures + job_state['cromwell_jobid'] = "cromwell-job-id-12345" + job_metadata['id'] = "cromwell-job-id-12345" + + state_manager = WorkflowStateManager(job_state) + job_runner = CromwellRunner(site_config, state_manager, job_metadata) + metadata = job_runner.get_job_metadata() + assert metadata + assert metadata['id'] == "cromwell-job-id-12345" + # check that the metadata is cached + assert job_runner.metadata == metadata + - job_runner = CromwellRunner(site_config, job_state, job_metadata) - assert job_runner def test_workflow_job_as_workflow_execution_dict(site_config, fixtures_dir): @@ -37,7 +88,7 @@ def test_workflow_job_as_workflow_execution_dict(site_config, fixtures_dir): assert wfe_dict -def test_state_manager(fixtures_dir): +def test_workflow_state_manager(fixtures_dir): mags_job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) state = WorkflowStateManager(mags_job_state) @@ -47,8 +98,176 @@ def test_state_manager(fixtures_dir): assert state.was_informed_by == mags_job_state['conf']['was_informed_by'] +# Mock response content +MOCK_FILE_CONTENT = b"Test file content" +MOCK_CHUNK_SIZE = 1024 # Assume the CHUNK_SIZE is 1024 in your class + +@mock.patch('requests.get') +def test_workflow_manager_fetch_release_file_success(mock_get, fixtures_dir): + mock_response = mock.Mock() + mock_response.iter_content = mock.Mock( + return_value=[MOCK_FILE_CONTENT[i:i + MOCK_CHUNK_SIZE] + for i in range(0, len(MOCK_FILE_CONTENT), MOCK_CHUNK_SIZE)] + ) + mock_response.status_code = 200 + mock_get.return_value = mock_response + + # Test the function + initial_state = json.load(open(fixtures_dir / 
"mags_workflow_state.json")) + state = WorkflowStateManager(initial_state) + + file_path = state.fetch_release_file("test_file", ".txt") + print(f"File path: {file_path}") + + assert file_path + assert os.path.exists(file_path), f"File not found at {file_path}" + with open(file_path, 'rb') as f: + assert f.read() == MOCK_FILE_CONTENT + + os.remove(file_path) + + +@mock.patch('requests.get') +def test_workflow_manager_fetch_release_file_failed_download(mock_get, fixtures_dir): + # Mock a failed request + mock_response = mock.Mock() + mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError("404 Client Error: Not Found") + mock_get.return_value = mock_response + + # Test the function + initial_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + state = WorkflowStateManager(initial_state) + + with pytest.raises(requests.exceptions.HTTPError): + state.fetch_release_file("test_file", ".txt") + + # Check that the file was not created + assert not os.path.exists("test_file.txt") + + +@mock.patch('requests.get') +def test_workflow_manager_fetch_release_file_failed_write(mock_get, fixtures_dir): + # Mock the response + mock_response = mock.Mock() + mock_response.iter_content = mock.Mock( + return_value=[MOCK_FILE_CONTENT[i:i + MOCK_CHUNK_SIZE] + for i in range(0, len(MOCK_FILE_CONTENT), MOCK_CHUNK_SIZE)] + ) + mock_response.status_code = 200 + mock_get.return_value = mock_response + + # Patch the tempfile.mkstemp function to raise an exception during file creation + with mock.patch('tempfile.NamedTemporaryFile', side_effect=OSError("Failed to create file")): + # Test the function + initial_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + state = WorkflowStateManager(initial_state) + + with pytest.raises(OSError): + state.fetch_release_file("test_file", ".txt") + + # Check that the file was not created + assert not os.path.exists("test_file.txt") + + +def test_cromwell_runner_setup_inputs_and_labels(site_config, fixtures_dir): + job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + workflow = WorkflowStateManager(job_state) + runner = CromwellRunner(site_config, workflow) + inputs = runner._generate_workflow_inputs() + assert inputs + # we expect the inputs to be a key-value dict with URLs as values + for key, value in inputs.items(): + if key.endswith("file"): + assert value.startswith("http") + + labels = runner._generate_workflow_labels() + assert labels + assert labels['submitter'] == "nmdcda" + assert labels['git_repo'].startswith("https://github.com/microbiomedata") + assert labels['pipeline'] == labels['wdl'] + + +@mock.patch("nmdc_automation.workflow_automation.wfutils.WorkflowStateManager.fetch_release_file") +def test_cromwell_runner_generate_submission_files( mock_fetch_release_file, site_config, fixtures_dir): + mock_fetch_release_file.side_effect = [ + '/tmp/test_workflow.wdl', + '/tmp/test_bundle.zip', + ] + job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + assert job_state + workflow = WorkflowStateManager(job_state) + + # Now mock 'open' for the workflow submission files + with mock.patch("builtins.open", new_callable=mock.mock_open) as mock_open: + mock_open.side_effect = [ + io.BytesIO(b"mock wdl file content"), # workflowSource file + io.BytesIO(b"mock bundle file content"), # workflowDependencies file + io.BytesIO(b"mock workflow inputs"), # workflowInputs file + io.BytesIO(b"mock labels") # labels file + ] + runner = CromwellRunner(site_config, workflow) + submission_files = 
runner.generate_submission_files() + assert submission_files + assert "workflowSource" in submission_files + assert "workflowDependencies" in submission_files + assert "workflowInputs" in submission_files + assert "labels" in submission_files + + # check that the files were written + assert mock_open.call_count == 4 + mock_open.assert_any_call("/tmp/test_workflow.wdl", 'rb') + mock_open.assert_any_call("/tmp/test_bundle.zip", 'rb') + + +@mock.patch("nmdc_automation.workflow_automation.wfutils.WorkflowStateManager.fetch_release_file") +@mock.patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner._cleanup_files") +def test_cromwell_runner_generate_submission_files_exception(mock_cleanup_files, mock_fetch_release_file, + site_config, fixtures_dir): + # Mock file fetching + mock_fetch_release_file.side_effect = [ + '/tmp/test_workflow.wdl', # First file fetch is successful + '/tmp/test_bundle.zip', # Second file fetch is successful + ] + job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + assert job_state + workflow = WorkflowStateManager(job_state) + + # Now mock 'open' for the workflow submission files + with mock.patch("builtins.open", new_callable=mock.mock_open) as mock_open: + mock_open.side_effect = [ + io.BytesIO(b"mock wdl file content"), # workflowSource file + io.BytesIO(b"mock bundle file content"), # workflowDependencies file + OSError("Failed to open file"), # workflowInputs file + io.BytesIO(b"mock labels") # labels file + ] + runner = CromwellRunner(site_config, workflow) + with pytest.raises(OSError): + runner.generate_submission_files() + # Check that the cleanup function was called + mock_cleanup_files.assert_called_once() + + +@mock.patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.generate_submission_files") +def test_cromwell_job_runner_submit_job_new_job(mock_generate_submission_files, site_config, fixtures_dir, mock_cromwell_api): + mock_generate_submission_files.return_value = { + "workflowSource": "workflowSource", + "workflowDependencies": "workflowDependencies", + "workflowInputs": "workflowInputs", + "labels": "labels" + } + # A new workflow job that has not been submitted - it has a workflow state + # but no job metadata + wf_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + wf_state['last_status'] = None # simulate a job that has not been submitted + wf_state['cromwell_jobid'] = None # simulate a job that has not been submitted + wf_state['done'] = False # simulate a job that has not been submitted + + wf_state_manager = WorkflowStateManager(wf_state) + job_runner = CromwellRunner(site_config, wf_state_manager) + job_runner.submit_job() + + def test_workflow_job_data_objects_and_execution_record_mags(site_config, fixtures_dir, tmp_path): - # Note: test working dir must be the root of the project for this to work job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json")) workflow_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) job = WorkflowJob(site_config, workflow_state, job_metadata) @@ -77,15 +296,11 @@ def test_workflow_job_data_objects_and_execution_record_mags(site_config, fixtur assert isinstance(wfe.binned_contig_num, int) + + def test_workflow_job_from_database_job_record(site_config, fixtures_dir): - job_rec = json.load(open(fixtures_dir / "unsubmitted_job_record.json")) + job_rec = json.load(open(fixtures_dir / "nmdc_api/unsubmitted_job.json")) assert job_rec job = WorkflowJob(site_config, job_rec) assert job assert job.workflow.nmdc_jobid == 
job_rec['id'] - - - - - -
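
Note on the new dry_run flag: the added tests mock submission but do not drive CromwellRunner.submit_job() with dry_run=True end to end. Below is a minimal sketch of such a test; it is an illustration under stated assumptions rather than part of this diff. It reuses the site_config and fixtures_dir fixtures from conftest.py and mocks generate_submission_files, so no release files are fetched and no HTTP request is made.

# Sketch (assumption-based, not part of this diff): exercising the dry_run path of
# CromwellRunner.submit_job using the existing test fixtures.
import json
from unittest import mock

from nmdc_automation.workflow_automation.wfutils import CromwellRunner, WorkflowStateManager


@mock.patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.generate_submission_files")
def test_cromwell_job_runner_submit_job_dry_run(mock_gen_files, site_config, fixtures_dir):
    # no submission files are generated, so nothing is fetched or opened
    mock_gen_files.return_value = {}
    wf_state = json.load(open(fixtures_dir / "mags_workflow_state.json"))
    wf_state["last_status"] = None      # simulate a job that has not been submitted
    wf_state["cromwell_jobid"] = None

    state = WorkflowStateManager(wf_state)
    runner = CromwellRunner(site_config, state, dry_run=True)

    job_id = runner.submit_job()

    # dry_run skips the POST to Cromwell but still records a placeholder id
    # and updates the workflow state the same way a real submission would
    assert job_id == "dry_run"
    assert state.state["cromwell_jobid"] == "dry_run"
    assert state.state["last_status"] == "Submitted"
    assert "start" in state.state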
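
Similarly, the retry cap that process_failed_job now enforces through workflow.state["failed_count"] can be illustrated with a small sketch. This is an assumption-based example, not part of this diff: jm and failed_job are set up as in test_job_manager_process_failed_job, and CromwellRunner.submit_job is mocked so resubmission never reaches Cromwell.

# Sketch (assumption-based): the failed_count bookkeeping added to JobManager.process_failed_job.
import json
from unittest import mock

from nmdc_automation.workflow_automation.watch_nmdc import FileHandler, JobManager
from nmdc_automation.workflow_automation.wfutils import WorkflowJob


@mock.patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.submit_job")
def test_process_failed_job_retry_cap(mock_submit, site_config, initial_state_file, fixtures_dir):
    fh = FileHandler(site_config, initial_state_file)
    jm = JobManager(site_config, fh)
    failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json"))
    failed_job = WorkflowJob(site_config, failed_job_state)
    jm.job_cache.append(failed_job)

    failed_job.workflow.state["failed_count"] = 0
    for _ in range(jm._MAX_FAILS):
        # below the cap: failed_count is incremented, the state file is
        # checkpointed, and the job is resubmitted via the (mocked) job runner
        jm.process_failed_job(failed_job)
    assert failed_job.workflow.state["failed_count"] == jm._MAX_FAILS

    # at the cap: the job is skipped and the count no longer changes
    jm.process_failed_job(failed_job)
    assert failed_job.workflow.state["failed_count"] == jm._MAX_FAILS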