From 0ee1b474879e032bc9a53b120aafbf08bdb605af Mon Sep 17 00:00:00 2001 From: Jannis Speer Date: Sat, 23 Mar 2024 22:49:30 +0100 Subject: [PATCH] fix: job status method (#6) --- .../__init__.py | 173 +++++++++++------- 1 file changed, 107 insertions(+), 66 deletions(-) diff --git a/snakemake_executor_plugin_htcondor/__init__.py b/snakemake_executor_plugin_htcondor/__init__.py index 852d884..8e85ab5 100644 --- a/snakemake_executor_plugin_htcondor/__init__.py +++ b/snakemake_executor_plugin_htcondor/__init__.py @@ -72,6 +72,7 @@ def __post_init__(self): # access executor specific settings self.workflow.executor_settings + # jobDir: Directory where the job will store log, output and error files. self.jobDir = self.workflow.executor_settings.jobdir def run_job(self, job: JobExecutorInterface): @@ -156,6 +157,13 @@ def run_job(self, job: JobExecutorInterface): except Exception as e: raise WorkflowError(f"Failed to submit HTCondor job: {e}") + self.logger.info( + f"Job {job.jobid} submitted to " + "HTCondor Cluster ID {submit_result.cluster()}\n" + f"The logs of the HTCondor job are stored " + f"in {self.jobDir}/{submit_result.cluster()}.log" + ) + self.report_job_submission( SubmittedJobInfo(job=job, external_jobid=submit_result.cluster()) ) @@ -167,79 +175,112 @@ async def check_active_jobs( for current_job in active_jobs: async with self.status_rate_limiter: - - # Event types that report an error - error_event_type = [ - htcondor.JobEventType.JOB_ABORTED, - htcondor.JobEventType.JOB_HELD, - htcondor.JobEventType.EXECUTABLE_ERROR, - htcondor.JobEventType.REMOTE_ERROR, - ] - - # Event types that report a success - success_event_type = [htcondor.JobEventType.JOB_TERMINATED] - - # Event types that report the job is running/other event is happening - running_event_type = [ - htcondor.JobEventType.SUBMIT, - htcondor.JobEventType.EXECUTE, - htcondor.JobEventType.IMAGE_SIZE, - ] - - warning_event_type = [ - htcondor.JobEventType.JOB_EVICTED, - 
htcondor.JobEventType.JOB_SUSPENDED, - ] - - # Look in the log file to check the status of the job - logFileName = join(self.jobDir, f"{current_job.external_jobid}.log") - jel = htcondor.JobEventLog(logFileName) - - # Get the latest event from the iterator - for event in jel.events(stop_after=0): - latest_event = event - - if latest_event.type in error_event_type: - # Job has an error - self.logger.debug( - f"HTCondor job {current_job.external_jobid} has " - "JobEventType {latest_event.type}." - ) - self.report_job_error( - current_job, - msg=f"HTCondor job {current_job.external_jobid} has " - "JobEventType {latest_event.type}. ", - ) - break - elif latest_event.type in running_event_type: - # Job is still running/idle - self.logger.debug( - f"HTCondor job {current_job.external_jobid} has " - "JobEventType {latest_event.type}." + # Get the status of the job from HTCondor + try: + schedd = htcondor.Schedd() + job_status = schedd.query( + constraint=f"ClusterId == {current_job.external_jobid}", + projection=[ + "ExitBySignal", + "ExitCode", + "ExitSignal", + "JobStatus", + ], ) + # Job is not running anymore, look it up in the job history + if not job_status: + job_status = schedd.history( + constraint=f"ClusterId == {current_job.external_jobid}", + projection=[ + "ExitBySignal", + "ExitCode", + "ExitSignal", + "JobStatus", + ], + ) + # Storing the one event from HistoryIterator to list + job_status = [next(job_status)] + except Exception as e: + self.logger.warning(f"Failed to retrieve HTCondor job status: {e}") + # Assuming the job is still running and retry next time yield current_job - elif latest_event.type in warning_event_type: - # Job has a warning - self.logger.warning( - f"HTCondor job {current_job.external_jobid} has " - "obEventType {latest_event.type}." 
- ) + self.logger.debug( + f"Job {current_job.job.jobid} with HTCondor Cluster ID " + f"{current_job.external_jobid} has status: {job_status}" + ) + + # Overview of HTCondor job status: + status_dict = { + "1": "Idle", + "2": "Running", + "3": "Removed", + "4": "Completed", + "5": "Held", + "6": "Transferring Output", + "7": "Suspended", + } + + # Running/idle jobs + if job_status[0]["JobStatus"] in [1, 2, 6, 7]: + if job_status[0]["JobStatus"] in [7]: + self.logger.warning( + f"Job {current_job.job.jobid} with " + "HTCondor Cluster ID " + f"{current_job.external_jobid} is suspended." + ) yield current_job - elif latest_event.type in success_event_type: - # Job is terminated + # Completed jobs + elif job_status[0]["JobStatus"] in [4]: self.logger.debug( - f"HTCondor job {current_job.external_jobid} has " - "JobEventType {latest_event.type}." + f"Check whether Job {current_job.job.jobid} with " + "HTCondor Cluster ID " + f"{current_job.external_jobid} was successful." + ) + # Check ExitCode + if job_status[0]["ExitCode"] == 0: + # Job was successful + self.logger.debug( + f"Report Job {current_job.job.jobid} with " + "HTCondor Cluster ID " + f"{current_job.external_jobid} success" + ) + self.logger.info( + f"Job {current_job.job.jobid} with " + "HTCondor Cluster ID " + f"{current_job.external_jobid} was successful." 
+ ) + self.report_job_success(current_job) + else: + self.logger.debug( + f"Report Job {current_job.job.jobid} with " + "HTCondor Cluster ID " + f"{current_job.external_jobid} error" + ) + self.report_job_error( + current_job, + msg=f"Job {current_job.job.jobid} with " + "HTCondor Cluster ID " + f"{current_job.external_jobid} has " + f" status {status_dict[str(job_status[0]['JobStatus'])]}, " + "but failed with" + f"ExitCode {job_status[0]['ExitCode']}.", + ) + # Errored jobs + elif job_status[0]["JobStatus"] in [3, 5]: + self.report_job_error( + current_job, + msg=f"Job {current_job.job.jobid} with " + "HTCondor Cluster ID " + f"{current_job.external_jobid} has " + f"status {status_dict[str(job_status[0]['JobStatus'])]}.", ) - self.report_job_success(current_job) else: - # Unsupported event type - self.logger.debug( - f"HTCondor job {current_job.external_jobid} has " - "JobEventType {latest_event.type}." - "This event type is not supported." + raise WorkflowError( + f"Job {current_job.job.jobid} with " + "HTCondor Cluster ID " + f"{current_job.external_jobid} has " + f"unknown HTCondor job status: {job_status[0]['JobStatus']}" ) - yield current_job def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]): # Cancel all active jobs.