From 5cdfdd2937206d46d2f615ac5a3c4ae20d440907 Mon Sep 17 00:00:00 2001
From: Jannis Speer
Date: Thu, 21 Mar 2024 21:46:09 +0100
Subject: [PATCH] fix: solved several bugs to obtain first running version (#3)

* fix: receiving and passing the ClusterId
* fix: switched to retrieving the job status from the cluster log
* fix: only read latest job event log entry
* feat: Raising warning for certain job event states
* fix: setting executable and arguments properly
* style: corrected formatting
* style: linting
---
 .../__init__.py | 134 +++++++++++-------
 1 file changed, 84 insertions(+), 50 deletions(-)

diff --git a/snakemake_executor_plugin_htcondor/__init__.py b/snakemake_executor_plugin_htcondor/__init__.py
index 2290980..c9379ad 100644
--- a/snakemake_executor_plugin_htcondor/__init__.py
+++ b/snakemake_executor_plugin_htcondor/__init__.py
@@ -72,26 +72,24 @@ def __post_init__(self):
         # access executor specific settings
         self.workflow.executor_settings
 
+        self.jobDir = self.workflow.executor_settings.jobdir
+
     def run_job(self, job: JobExecutorInterface):
         # Submitting job to HTCondor
 
-        print(job)
-        print(job.name)
-        print(job.resources)
-
         # Creating directory to store log, output and error files
-        jobDir = self.workflow.executor_settings.jobdir
-        makedirs(jobDir, exist_ok=True)
+        makedirs(self.jobDir, exist_ok=True)
+
+        job_exec = self.get_python_executable()
+        job_args = self.format_job_exec(job).removeprefix(job_exec + " ")
 
         # Creating submit dictionary which is passed to htcondor.Submit
         submit_dict = {
-            "executable": "/bin/bash",
-            "arguments": self.format_job_exec(
-                job
-            ),  # using the method from RemoteExecutor
-            "log": join(jobDir, "$(ClusterId).log"),
-            "output": join(jobDir, "$(ClusterId).out"),
-            "error": join(jobDir, "$(ClusterId).err"),
+            "executable": job_exec,
+            "arguments": job_args,
+            "log": join(self.jobDir, "$(ClusterId).log"),
+            "output": join(self.jobDir, "$(ClusterId).out"),
+            "error": join(self.jobDir, "$(ClusterId).err"),
             "request_cpus": str(job.threads),
         }
 
@@ -146,11 +144,13 @@ def run_job(self, job: JobExecutorInterface):
 
         # Submitting job to HTCondor
         try:
-            clusterID = schedd.submit(submit_description)
+            submit_result = schedd.submit(submit_description)
         except Exception as e:
             raise WorkflowError(f"Failed to submit HTCondor job: {e}")
 
-        self.report_job_submission(SubmittedJobInfo(job=job, external_jobid=clusterID))
+        self.report_job_submission(
+            SubmittedJobInfo(job=job, external_jobid=submit_result.cluster())
+        )
 
     async def check_active_jobs(
         self, active_jobs: List[SubmittedJobInfo]
@@ -159,47 +159,79 @@ async def check_active_jobs(
 
         for current_job in active_jobs:
             async with self.status_rate_limiter:
-                # Get the status of the job
-                try:
-                    schedd = htcondor.Schedd()
-                    job_status = schedd.query(
-                        constraint=f"ClusterId == {current_job.external_jobid}",
-                        projection=["JobStatus"],
+
+                # Event types that report an error
+                error_event_type = [
+                    htcondor.JobEventType.JOB_ABORTED,
+                    htcondor.JobEventType.JOB_HELD,
+                    htcondor.JobEventType.EXECUTABLE_ERROR,
+                    htcondor.JobEventType.REMOTE_ERROR,
+                ]
+
+                # Event types that report a success
+                success_event_type = [htcondor.JobEventType.JOB_TERMINATED]
+
+                # Event types that report the job is running/other event is happening
+                running_event_type = [
+                    htcondor.JobEventType.SUBMIT,
+                    htcondor.JobEventType.EXECUTE,
+                    htcondor.JobEventType.IMAGE_SIZE,
+                ]
+
+                # Event types that report a warning
+                warning_event_type = [
+                    htcondor.JobEventType.JOB_EVICTED,
+                    htcondor.JobEventType.JOB_SUSPENDED,
+                ]
+
+                # Look in the log file to check the status of the job
+                logFileName = join(self.jobDir, f"{current_job.external_jobid}.log")
+                jel = htcondor.JobEventLog(logFileName)
+
+                # Get the latest event from the iterator
+                for event in jel.events(stop_after=0):
+                    latest_event = event
+
+                if latest_event.type in error_event_type:
+                    # Job has an error
+                    self.logger.debug(
+                        f"HTCondor job {current_job.external_jobid} has "
+                        f"JobEventType {latest_event.type}."
+                    )
+                    self.report_job_error(
+                        current_job,
+                        msg=f"HTCondor job {current_job.external_jobid} has "
+                        f"JobEventType {latest_event.type}.",
+                    )
+                    break
+                elif latest_event.type in running_event_type:
+                    # Job is still running/idle
+                    self.logger.debug(
+                        f"HTCondor job {current_job.external_jobid} has "
+                        f"JobEventType {latest_event.type}."
                     )
-                except Exception as e:
-                    self.logger.warning(f"Failed to retrieve HTCondor job status: {e}")
-                    # Assuming the job is still running and retry next time
                     yield current_job
-                self.logger.debug(
-                    f"HTCondor job {current_job.external_jobid} status: {job_status}"
-                )
-
-                # Overview of HTCondor job status:
-                # 1: Idle
-                # 2: Running
-                # 3: Removed
-                # 4: Completed
-                # 5: Held
-                # 6: Transferring Output
-                # 7: Suspended
-
-                # Running/idle jobs
-                if job_status[0]["JobStatus"] in [1, 2, 6, 7]:
-                    if job_status[0]["JobStatus"] in [7]:
-                        self.logger.warning(
-                            f"HTCondor job {current_job.external_jobid} is suspended."
-                        )
+                elif latest_event.type in warning_event_type:
+                    # Job has a warning
+                    self.logger.warning(
+                        f"HTCondor job {current_job.external_jobid} has "
+                        f"JobEventType {latest_event.type}."
+                    )
                     yield current_job
-                # Successful jobs
-                elif job_status[0]["JobStatus"] in [4]:
+                elif latest_event.type in success_event_type:
+                    # Job is terminated
+                    self.logger.debug(
+                        f"HTCondor job {current_job.external_jobid} has "
+                        f"JobEventType {latest_event.type}."
+                    )
                     self.report_job_success(current_job)
-                # Errored jobs
-                elif job_status[0]["JobStatus"] in [3, 5]:
-                    self.report_job_error(current_job)
                 else:
-                    raise WorkflowError(
-                        f"Unknown HTCondor job status: {job_status[0]['JobStatus']}"
+                    # Unsupported event type
+                    self.logger.debug(
+                        f"HTCondor job {current_job.external_jobid} has "
+                        f"JobEventType {latest_event.type}. "
+                        "This event type is not supported."
                    )
+                    yield current_job
 
     def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
         # Cancel all active jobs.
@@ -208,6 +240,8 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
         if active_jobs:
             schedd = htcondor.Schedd()
             job_ids = [current_job.external_jobid for current_job in active_jobs]
+            # HTCondor expects full job IDs (ClusterId.ProcId), not bare cluster IDs
+            job_ids = [f"{x}.0" for x in job_ids]
             self.logger.debug(f"Cancelling HTCondor jobs: {job_ids}")
             try:
                 schedd.act(htcondor.JobAction.Remove, job_ids)
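
For reference, the submission flow that run_job() implements after this patch can be reproduced standalone with the htcondor Python bindings. This is an illustrative sketch, not part of the patch: the /bin/sleep executable, its arguments, and the jobs/ directory are placeholders, and an accessible local schedd is assumed.

    from os import makedirs
    from os.path import join

    import htcondor

    job_dir = "jobs"  # placeholder for the configured jobdir
    makedirs(job_dir, exist_ok=True)

    # $(ClusterId) is expanded by HTCondor at submit time, so every
    # cluster gets its own log/output/error files, as in run_job().
    submit_description = htcondor.Submit(
        {
            "executable": "/bin/sleep",
            "arguments": "10",
            "log": join(job_dir, "$(ClusterId).log"),
            "output": join(job_dir, "$(ClusterId).out"),
            "error": join(job_dir, "$(ClusterId).err"),
            "request_cpus": "1",
        }
    )

    schedd = htcondor.Schedd()
    submit_result = schedd.submit(submit_description)
    # cluster() returns the new ClusterId, which the plugin stores as the
    # job's external_jobid and later uses to locate <ClusterId>.log.
    print(submit_result.cluster())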
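
The event-log polling that check_active_jobs() switches to can likewise be exercised on its own. Again a sketch under assumptions: the jobs/1234.log path is hypothetical, and the classification below is a condensed version of the event-type lists in the patch.

    import htcondor

    ERROR_EVENTS = {
        htcondor.JobEventType.JOB_ABORTED,
        htcondor.JobEventType.JOB_HELD,
        htcondor.JobEventType.EXECUTABLE_ERROR,
        htcondor.JobEventType.REMOTE_ERROR,
    }
    SUCCESS_EVENTS = {htcondor.JobEventType.JOB_TERMINATED}

    jel = htcondor.JobEventLog("jobs/1234.log")

    # stop_after=0 makes events() return only the events already in the
    # log instead of blocking for new ones; keep the last event seen.
    latest_event = None
    for event in jel.events(stop_after=0):
        latest_event = event

    if latest_event is None:
        print("no events yet")
    elif latest_event.type in ERROR_EVENTS:
        print(f"job failed with event {latest_event.type}")
    elif latest_event.type in SUCCESS_EVENTS:
        print(f"job finished: {latest_event.type}")
    else:
        print(f"job still active; latest event is {latest_event.type}")

Reading the status from the per-job event log also keeps working after a job has left the queue (completed jobs disappear from schedd.query() results), which is presumably why the patch drops the earlier query-based check.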