fix: job status method #6

Merged
merged 8 commits into from Mar 23, 2024

Changes from all commits
173 changes: 107 additions & 66 deletions snakemake_executor_plugin_htcondor/__init__.py
@@ -72,6 +72,7 @@ def __post_init__(self):
# access executor specific settings
self.workflow.executor_settings

# jobDir: Directory where the job will store log, output, and error files.
self.jobDir = self.workflow.executor_settings.jobdir

def run_job(self, job: JobExecutorInterface):
@@ -156,6 +157,13 @@ def run_job(self, job: JobExecutorInterface):
except Exception as e:
raise WorkflowError(f"Failed to submit HTCondor job: {e}")

self.logger.info(
f"Job {job.jobid} submitted to "
"HTCondor Cluster ID {submit_result.cluster()}\n"
f"The logs of the HTCondor job are stored "
f"in {self.jobDir}/{submit_result.cluster()}.log"
)

self.report_job_submission(
SubmittedJobInfo(job=job, external_jobid=submit_result.cluster())
)
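
A side note on the message above: the per-cluster HTCondor event log ends up under jobDir as <cluster>.log. If you want to inspect that file outside of the plugin, a minimal sketch with the htcondor Python bindings could look like this; the job directory and cluster ID are made-up values, and events(stop_after=0) only returns events already written rather than waiting for new ones.

import htcondor
from os.path import join

job_dir = "/path/to/jobdir"  # hypothetical: whatever jobdir is set to in the executor settings
cluster_id = 1234            # hypothetical: the Cluster ID reported at submission

# Open the per-cluster event log that HTCondor writes for this submission.
jel = htcondor.JobEventLog(join(job_dir, f"{cluster_id}.log"))

# Iterate over the events recorded so far (stop_after=0 means do not block for new ones).
for event in jel.events(stop_after=0):
    print(event.cluster, event.proc, event.type)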
@@ -167,79 +175,112 @@ async def check_active_jobs(

for current_job in active_jobs:
async with self.status_rate_limiter:

# Event types that report an error
error_event_type = [
htcondor.JobEventType.JOB_ABORTED,
htcondor.JobEventType.JOB_HELD,
htcondor.JobEventType.EXECUTABLE_ERROR,
htcondor.JobEventType.REMOTE_ERROR,
]

# Event types that report a success
success_event_type = [htcondor.JobEventType.JOB_TERMINATED]

# Event types that report the job is running or that another routine event occurred
running_event_type = [
htcondor.JobEventType.SUBMIT,
htcondor.JobEventType.EXECUTE,
htcondor.JobEventType.IMAGE_SIZE,
]

warning_event_type = [
htcondor.JobEventType.JOB_EVICTED,
htcondor.JobEventType.JOB_SUSPENDED,
]

# Look in the log file to check the status of the job
logFileName = join(self.jobDir, f"{current_job.external_jobid}.log")
jel = htcondor.JobEventLog(logFileName)

# Get the latest event from the iterator
for event in jel.events(stop_after=0):
latest_event = event

if latest_event.type in error_event_type:
# Job has an error
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}."
)
self.report_job_error(
current_job,
msg=f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}. ",
)
break
elif latest_event.type in running_event_type:
# Job is still running/idle
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
f"JobEventType {latest_event.type}."
)
# Get the status of the job from HTCondor
try:
schedd = htcondor.Schedd()
job_status = schedd.query(
constraint=f"ClusterId == {current_job.external_jobid}",
projection=[
"ExitBySignal",
"ExitCode",
"ExitSignal",
"JobStatus",
],
)
# Job is not running anymore, look in the history
if not job_status:
job_status = schedd.history(
constraint=f"ClusterId == {current_job.external_jobid}",
projection=[
"ExitBySignal",
"ExitCode",
"ExitSignal",
"JobStatus",
],
)
# Storing the one entry from the HistoryIterator in a list
job_status = [next(job_status)]
except Exception as e:
self.logger.warning(f"Failed to retrieve HTCondor job status: {e}")
# Assuming the job is still running and retry next time
yield current_job
elif latest_event.type in warning_event_type:
# Job has a warning
self.logger.warning(
f"HTCondor job {current_job.external_jobid} has "
"obEventType {latest_event.type}."
)
self.logger.debug(
f"Job {current_job.job.jobid} with HTCondor Cluster ID "
f"{current_job.external_jobid} has status: {job_status}"
)

# Overview of HTCondor job status:
status_dict = {
"1": "Idle",
"2": "Running",
"3": "Removed",
"4": "Completed",
"5": "Held",
"6": "Transferring Output",
"7": "Suspended",
}

# Running/idle jobs
if job_status[0]["JobStatus"] in [1, 2, 6, 7]:
if job_status[0]["JobStatus"] in [7]:
self.logger.warning(
f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} is suspended."
)
yield current_job
elif latest_event.type in success_event_type:
# Job is terminated
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
f"JobEventType {latest_event.type}."
)
# Completed jobs
elif job_status[0]["JobStatus"] in [4]:
self.logger.debug(
f"Check whether Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} was successful."
)
# Check ExitCode
if job_status[0]["ExitCode"] == 0:
# Job was successful
self.logger.debug(
f"Report Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} success"
)
self.logger.info(
f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} was successful."
)
self.report_job_success(current_job)
else:
self.logger.debug(
f"Report Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} error"
)
self.report_job_error(
current_job,
msg=f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} has "
f" status {status_dict[str(job_status[0]['JobStatus'])]}, "
"but failed with"
f"ExitCode {job_status[0]['ExitCode']}.",
)
# Errored jobs
elif job_status[0]["JobStatus"] in [3, 5]:
self.report_job_error(
current_job,
msg=f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} has "
f"status {status_dict[str(job_status[0]['JobStatus'])]}.",
)
self.report_job_success(current_job)
else:
# Unsupported event type
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
f"JobEventType {latest_event.type}. "
"This event type is not supported."
)
raise WorkflowError(
f"Job {current_job.job.jobid} with "
"HTCondor Cluster ID "
f"{current_job.external_jobid} has "
f"unknown HTCondor job status: {job_status[0]['JobStatus']}"
)
yield current_job
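
For reference, the status lookup used above can be exercised on its own: query the schedd for the cluster first and, if the job has already left the queue, fall back to the schedd history. The sketch below is only an illustration under assumptions, namely a reachable local schedd and a made-up cluster ID, with the same projection and status codes as in this diff.

import htcondor

cluster_id = 1234  # hypothetical cluster ID
projection = ["ExitBySignal", "ExitCode", "ExitSignal", "JobStatus"]

schedd = htcondor.Schedd()
# Jobs still in the queue are visible to query().
ads = schedd.query(constraint=f"ClusterId == {cluster_id}", projection=projection)
if not ads:
    # Finished jobs have moved to the history; take the single matching ad.
    ads = [next(schedd.history(constraint=f"ClusterId == {cluster_id}", projection=projection))]

# Human-readable names for the numeric JobStatus codes.
status_names = {
    1: "Idle", 2: "Running", 3: "Removed", 4: "Completed",
    5: "Held", 6: "Transferring Output", 7: "Suspended",
}
code = ads[0]["JobStatus"]
print(f"JobStatus {code}: {status_names.get(code, 'Unknown')}")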

def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
# Cancel all active jobs.