Skip to content

Commit

Permalink
Merge pull request #271 from microbiomedata/265-inplement-cromwell-ru…
Browse files Browse the repository at this point in the history
…nner

265 implement cromwell runner
  • Loading branch information
mbthornton-lbl authored Oct 14, 2024
2 parents 4768863 + ae9b270 commit 74a6c92
Show file tree
Hide file tree
Showing 30 changed files with 669 additions and 164 deletions.
18 changes: 13 additions & 5 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,18 @@ def prepare_and_cache_new_job(self, new_job: WorkflowJob, opid: str, force=False
existing_job = self.find_job_by_opid(opid)
if not existing_job:
new_job.set_opid(opid, force=force)
new_job.done = False
self.job_cache.append(new_job)
return new_job
elif force:
self.job_cache.remove(existing_job)
new_job.set_opid(opid, force=force)
new_job.done = False
self.job_cache.append(new_job)
return new_job



def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
""" Get finished jobs """
successful_jobs = []
Expand Down Expand Up @@ -192,11 +195,16 @@ def process_successful_job(self, job: WorkflowJob) -> Database:
return database


def process_failed_job(self, job) -> None:
def process_failed_job(self, job: WorkflowJob) -> None:
""" Process a failed job """
if job.failed_count < self._MAX_FAILS:
job.failed_count += 1
job.cromwell_submit()
if job.workflow.state.get("failed_count", 0) >= self._MAX_FAILS:
logger.error(f"Job {job.opid} failed {self._MAX_FAILS} times. Skipping.")
return
job.workflow.state["failed_count"] = job.workflow.state.get("failed_count", 0) + 1
job.workflow.state["last_status"] = job.job_status
self.save_checkpoint()
logger.error(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.")
job.job.submit_job()


class RuntimeApiHandler:
Expand All @@ -216,7 +224,7 @@ def get_unclaimed_jobs(self, allowed_workflows)-> List[WorkflowJob]:
job_records = self.runtime_api.list_jobs(filt=filt)

for job in job_records:
jobs.append(WorkflowJob(self.config, job))
jobs.append(WorkflowJob(self.config, workflow_state=job))

return jobs

Expand Down
Loading

0 comments on commit 74a6c92

Please sign in to comment.