Skip to content

Commit

Permalink
fix: job status method (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
jannisspeer authored Mar 23, 2024
1 parent f721f0d commit 0ee1b47
Showing 1 changed file with 107 additions and 66 deletions.
173 changes: 107 additions & 66 deletions snakemake_executor_plugin_htcondor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __post_init__(self):
# access executor specific settings
self.workflow.executor_settings

# jobDir: Directory where the job will tore log, output and error files.
self.jobDir = self.workflow.executor_settings.jobdir

def run_job(self, job: JobExecutorInterface):
Expand Down Expand Up @@ -156,6 +157,13 @@ def run_job(self, job: JobExecutorInterface):
except Exception as e:
raise WorkflowError(f"Failed to submit HTCondor job: {e}")

self.logger.info(
f"Job {job.jobid} submitted to "
"HTCondor Cluster ID {submit_result.cluster()}\n"
f"The logs of the HTCondor job are stored "
f"in {self.jobDir}/{submit_result.cluster()}.log"
)

self.report_job_submission(
SubmittedJobInfo(job=job, external_jobid=submit_result.cluster())
)
Expand All @@ -167,79 +175,112 @@ async def check_active_jobs(

for current_job in active_jobs:
async with self.status_rate_limiter:

# Event types that report an error
error_event_type = [
htcondor.JobEventType.JOB_ABORTED,
htcondor.JobEventType.JOB_HELD,
htcondor.JobEventType.EXECUTABLE_ERROR,
htcondor.JobEventType.REMOTE_ERROR,
]

# Event types that report a success
success_event_type = [htcondor.JobEventType.JOB_TERMINATED]

# Event types that report the job is running/other event is happening
running_event_type = [
htcondor.JobEventType.SUBMIT,
htcondor.JobEventType.EXECUTE,
htcondor.JobEventType.IMAGE_SIZE,
]

warning_event_type = [
htcondor.JobEventType.JOB_EVICTED,
htcondor.JobEventType.JOB_SUSPENDED,
]

# Look in the log file to check the status of the job
logFileName = join(self.jobDir, f"{current_job.external_jobid}.log")
jel = htcondor.JobEventLog(logFileName)

# Get the latest event from the iterator
for event in jel.events(stop_after=0):
latest_event = event

if latest_event.type in error_event_type:
# Job has an error
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}."
)
self.report_job_error(
current_job,
msg=f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}. ",
)
break
elif latest_event.type in running_event_type:
# Job is still running/idle
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}."
# Get the status of the job from HTCondor
try:
schedd = htcondor.Schedd()
job_status = schedd.query(
constraint=f"ClusterId == {current_job.external_jobid}",
projection=[
"ExitBySignal",
"ExitCode",
"ExitSignal",
"JobStatus",
],
)
# Job is not running anymore, look
if not job_status:
job_status = schedd.history(
constraint=f"ClusterId == {current_job.external_jobid}",
projection=[
"ExitBySignal",
"ExitCode",
"ExitSignal",
"JobStatus",
],
)
# Storing the one event from HistoryIterator to list
job_status = [next(job_status)]
except Exception as e:
self.logger.warning(f"Failed to retrieve HTCondor job status: {e}")
# Assuming the job is still running and retry next time
yield current_job
elif latest_event.type in warning_event_type:
# Job has a warning
self.logger.warning(
f"HTCondor job {current_job.external_jobid} has "
"obEventType {latest_event.type}."
)
self.logger.debug(
f"Job {current_job.job.jobid} with HTCondor Cluster ID "
f"{current_job.external_jobid} has status: {job_status}"
)

# Overview of HTCondor job status:
status_dict = {
"1": "Idle",
"2": "Running",
"3": "Removed",
"4": "Completed",
"5": "Held",
"6": "Transferring Output",
"7": "Suspended",
}

# Running/idle jobs
if job_status[0]["JobStatus"] in [1, 2, 6, 7]:
if job_status[0]["JobStatus"] in [7]:
self.logger.warning(
f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} is suspended."
)
yield current_job
elif latest_event.type in success_event_type:
# Job is terminated
# Completed jobs
elif job_status[0]["JobStatus"] in [4]:
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}."
f"Check whether Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} was successful."
)
# Check ExitCode
if job_status[0]["ExitCode"] == 0:
# Job was successful
self.logger.debug(
f"Report Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} success"
)
self.logger.info(
f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} was successful."
)
self.report_job_success(current_job)
else:
self.logger.debug(
f"Report Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} error"
)
self.report_job_error(
current_job,
msg=f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} has "
f" status {status_dict[str(job_status[0]['JobStatus'])]}, "
"but failed with"
f"ExitCode {job_status[0]['ExitCode']}.",
)
# Errored jobs
elif job_status[0]["JobStatus"] in [3, 5]:
self.report_job_error(
current_job,
msg=f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} has "
f"status {status_dict[str(job_status[0]['JobStatus'])]}.",
)
self.report_job_success(current_job)
else:
# Unsupported event type
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}."
"This event type is not supported."
raise WorkflowError(
f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} has "
f"unknown HTCondor job status: {job_status[0]['JobStatus']}"
)
yield current_job

def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
# Cancel all active jobs.
Expand Down

0 comments on commit 0ee1b47

Please sign in to comment.