From edc332118a48f0b1da3d53992b50cb15c054dcdd Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 11:31:15 -0800 Subject: [PATCH] update get_finished_jobs to use last_status --- .../workflow_automation/watch_nmdc.py | 8 +- tests/conftest.py | 6 +- ..._state.json => agent_state_1_failure.json} | 4 +- ...job_state.json => failed_job_state_2.json} | 0 tests/test_watch_nmdc.py | 97 +++++++++++-------- 5 files changed, 65 insertions(+), 50 deletions(-) rename tests/fixtures/{initial_state.json => agent_state_1_failure.json} (99%) rename tests/fixtures/{failed_job_state.json => failed_job_state_2.json} (100%) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 439479bd..ce5bc06e 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -189,16 +189,16 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: failed_jobs = [] for job in self.job_cache: if not job.done: - status = job.job_status - if status == "Succeeded" and job.opid: + last_status = job.workflow.last_status + if last_status == "Succeeded" and job.opid: successful_jobs.append(job) - elif status == "Failed" and job.opid: + elif last_status == "Failed" and job.opid: failed_jobs.append(job) if successful_jobs: logger.info(f"Found {len(successful_jobs)} successful jobs.") if failed_jobs: logger.info(f"Found {len(failed_jobs)} failed jobs.") - return (successful_jobs, failed_jobs) + return successful_jobs, failed_jobs def process_successful_job(self, job: WorkflowJob) -> Database: """ Process a successful job and return a Database object """ diff --git a/tests/conftest.py b/tests/conftest.py index b14164d5..8a0187c4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -123,10 +123,10 @@ def site_config(site_config_file): return SiteConfig(site_config_file) @fixture -def initial_state_file(fixtures_dir, tmp_path): - state_file = fixtures_dir / "initial_state.json" +def initial_state_file_1_failure(fixtures_dir, tmp_path): + state_file = fixtures_dir / "agent_state_1_failure.json" # make a working copy in tmp_path - copied_state_file = tmp_path / "initial_state.json" + copied_state_file = tmp_path / "agent_state_1_failure.json" shutil.copy(state_file, copied_state_file) return copied_state_file diff --git a/tests/fixtures/initial_state.json b/tests/fixtures/agent_state_1_failure.json similarity index 99% rename from tests/fixtures/initial_state.json rename to tests/fixtures/agent_state_1_failure.json index aa7dfc62..a56e1cba 100644 --- a/tests/fixtures/initial_state.json +++ b/tests/fixtures/agent_state_1_failure.json @@ -1,12 +1,12 @@ { "jobs": [ { - "type": "MAGs: v1.3.10", + "type": "MAGs: v1.3.12", "cromwell_jobid": "9492a397-eb30-472b-9d3b-abc123456789", "nmdc_jobid": "nmdc:66cf64b6-7462-11ef-8b84-abc123456789", "conf": { "git_repo": "https://github.com/microbiomedata/metaMAGs", - "release": "v1.3.10", + "release": "v1.3.12", "wdl": "mbin_nmdc.wdl", "activity_id": "nmdc:wfmag-11-g7msr323.1", "activity_set": "mags_activity_set", diff --git a/tests/fixtures/failed_job_state.json b/tests/fixtures/failed_job_state_2.json similarity index 100% rename from tests/fixtures/failed_job_state.json rename to tests/fixtures/failed_job_state_2.json diff --git a/tests/test_watch_nmdc.py b/tests/test_watch_nmdc.py index 902bb2ee..438ba2ac 100644 --- a/tests/test_watch_nmdc.py +++ b/tests/test_watch_nmdc.py @@ -21,10 +21,10 @@ # FileHandler init tests -def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_path): +def test_file_handler_init_from_state_file(site_config, initial_state_file_1_failure, tmp_path): copy_state_file = tmp_path / "copy_state.json" - shutil.copy(initial_state_file, copy_state_file) - fh = FileHandler(site_config, initial_state_file) + shutil.copy(initial_state_file_1_failure, copy_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) assert fh assert fh.state_file assert isinstance(fh.state_file, PosixPath) @@ -35,7 +35,7 @@ def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_ assert not fh.state_file # test setter - fh.state_file = initial_state_file + fh.state_file = initial_state_file_1_failure assert fh.state_file assert fh.state_file.exists() assert fh.state_file.is_file() @@ -48,9 +48,9 @@ def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_ assert fh.state_file.is_file() -def test_file_handler_init_from_config_agent_state(site_config, initial_state_file, tmp_path): +def test_file_handler_init_from_config_agent_state(site_config, initial_state_file_1_failure, tmp_path): with patch("nmdc_automation.config.siteconfig.SiteConfig.agent_state", new_callable=PropertyMock) as mock_agent_state: - mock_agent_state.return_value = initial_state_file + mock_agent_state.return_value = initial_state_file_1_failure fh = FileHandler(site_config) assert fh assert fh.state_file @@ -76,8 +76,8 @@ def test_file_handler_init_default_state(site_config): assert fh2.state_file.exists() -def test_file_handler_read_state(site_config, initial_state_file): - fh = FileHandler(site_config, initial_state_file) +def test_file_handler_read_state(site_config, initial_state_file_1_failure): + fh = FileHandler(site_config, initial_state_file_1_failure) state = fh.read_state() assert state assert isinstance(state, dict) @@ -86,8 +86,8 @@ def test_file_handler_read_state(site_config, initial_state_file): assert len(state.get("jobs")) == 1 -def test_file_handler_write_state(site_config, initial_state_file, fixtures_dir): - fh = FileHandler(site_config, initial_state_file) +def test_file_handler_write_state(site_config, initial_state_file_1_failure, fixtures_dir): + fh = FileHandler(site_config, initial_state_file_1_failure) state = fh.read_state() assert state # add new job @@ -106,7 +106,7 @@ def test_file_handler_write_state(site_config, initial_state_file, fixtures_dir) fh.write_state(state) -def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_dir): +def test_file_handler_get_output_path(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange was_informed_by = "nmdc:1234" workflow_execution_id = "nmdc:56789" @@ -116,7 +116,7 @@ def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_ expected_output_path = site_config.data_dir / Path(was_informed_by) / Path(workflow_execution_id) - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) # Act output_path = fh.get_output_path(mock_job) @@ -127,7 +127,7 @@ def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_ assert output_path == expected_output_path -def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_file, fixtures_dir, tmp_path): +def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_file_1_failure, fixtures_dir, tmp_path): # Arrange was_informed_by = "nmdc:1234" workflow_execution_id = "nmdc:56789" @@ -141,7 +141,7 @@ def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_fi # patch config.data_dir with patch("nmdc_automation.config.siteconfig.SiteConfig.data_dir", new_callable=PropertyMock) as mock_data_dir: mock_data_dir.return_value = tmp_path - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) # Act metadata_path = fh.write_metadata_if_not_exists(mock_job) @@ -153,18 +153,18 @@ def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_fi # JobManager tests -def test_job_manager_init(site_config, initial_state_file): +def test_job_manager_init(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) assert jm assert jm.file_handler assert jm.file_handler.state_file -def test_job_manager_restore_from_state(site_config, initial_state_file): +def test_job_manager_restore_from_state(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh, init_cache=False) # Act jm.restore_from_state() @@ -174,10 +174,14 @@ def test_job_manager_restore_from_state(site_config, initial_state_file): assert len(jm.job_cache) == 1 assert isinstance(jm.job_cache[0], WorkflowJob) + # job has been cached - get new workflow jobs from state should not return any + new_jobs = jm.get_new_workflow_jobs_from_state() + assert not new_jobs -def test_job_manager_job_checkpoint(site_config, initial_state_file): + +def test_job_manager_job_checkpoint(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Act data = jm.job_checkpoint() @@ -189,9 +193,9 @@ def test_job_manager_job_checkpoint(site_config, initial_state_file): assert len(data.get("jobs")) == 1 -def test_job_manager_save_checkpoint(site_config, initial_state_file): +def test_job_manager_save_checkpoint(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Act jm.save_checkpoint() @@ -202,9 +206,9 @@ def test_job_manager_save_checkpoint(site_config, initial_state_file): # cleanup fh.state_file.unlink() -def test_job_manager_find_job_by_opid(site_config, initial_state_file): +def test_job_manager_find_job_by_opid(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Act job = jm.find_job_by_opid("nmdc:test-opid") @@ -215,9 +219,9 @@ def test_job_manager_find_job_by_opid(site_config, initial_state_file): assert not job.done -def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file, fixtures_dir): +def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) new_job_state = json.load(open(fixtures_dir / "new_state_job.json")) assert new_job_state @@ -234,9 +238,9 @@ def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file, jm.job_cache = [] -def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_file, fixtures_dir): +def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) #already has an opid new_job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) @@ -261,12 +265,10 @@ def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_ assert job2.opid == opid - - -def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures_dir): +def test_job_manager_get_finished_jobs(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - initial state has 1 failure and is not done - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Add a finished job: finished job is not done, but has a last_status of Succeeded @@ -278,7 +280,7 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures assert len(jm.job_cache) == 2 # add a failed job - failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json")) + failed_job_state = json.load(open(fixtures_dir / "failed_job_state_2.json")) assert failed_job_state failed_job = WorkflowJob(site_config, failed_job_state) assert failed_job.job_status == "Failed" @@ -308,16 +310,14 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures jm.job_cache = [] -def test_job_manager_process_successful_job(site_config, initial_state_file, fixtures_dir): +def test_job_manager_process_successful_job(site_config, initial_state_file_1_failure, fixtures_dir): # mock job.job.get_job_metadata - use fixture cromwell/succeded_metadata.json job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json")) with patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.get_job_metadata") as mock_get_metadata: mock_get_metadata.return_value = job_metadata - - # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) new_job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) assert new_job_state @@ -334,11 +334,25 @@ def test_job_manager_process_successful_job(site_config, initial_state_file, fix jm.job_cache = [] -def test_job_manager_process_failed_job(site_config, initial_state_file, fixtures_dir): +def test_job_manager_process_failed_job_1_failure(site_config, initial_state_file_1_failure, fixtures_dir): + # Arrange + fh = FileHandler(site_config, initial_state_file_1_failure) + jm = JobManager(site_config, fh) + # job handler should initialize the job_cache from the state file by default + assert jm.job_cache + assert isinstance(jm.job_cache, list) + assert len(jm.job_cache) == 1 + + successful_jobs, failed_jobs = jm.get_finished_jobs() + assert not successful_jobs + assert failed_jobs + + +def test_job_manager_process_failed_job_2_failures(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) - failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json")) + failed_job_state = json.load(open(fixtures_dir / "failed_job_state_2.json")) assert failed_job_state failed_job = WorkflowJob(site_config, failed_job_state) jm.job_cache.append(failed_job) @@ -346,6 +360,7 @@ def test_job_manager_process_failed_job(site_config, initial_state_file, fixture jm.process_failed_job(failed_job) # Assert assert failed_job.done + assert failed_job.job_status == "Failed" @fixture @@ -376,7 +391,7 @@ def test_claim_jobs(mock_submit, site_config_file, site_config, fixtures_dir): assert unclaimed_wfj.job_status -def test_runtime_manager_get_unclaimed_jobs(site_config, initial_state_file, fixtures_dir): +def test_runtime_manager_get_unclaimed_jobs(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange rt = RuntimeApiHandler(site_config) # Act