fix: solved several bugs to obtain first running version (#3)
* fix: receiving and passing the ClusterId

* fix: switched to retrieving the job status from the cluster log

* fix: only read latest job event log entry

* feat: Raising warning for certain job event states

* fix: setting executable and arguments properly

* style: corrected formatting

* style: linting
jannisspeer authored Mar 21, 2024
1 parent 7f62dd4 commit 5cdfdd2
Showing 1 changed file with 84 additions and 50 deletions.
134 changes: 84 additions & 50 deletions snakemake_executor_plugin_htcondor/__init__.py
@@ -72,26 +72,24 @@ def __post_init__(self):
# access executor specific settings
self.workflow.executor_settings

self.jobDir = self.workflow.executor_settings.jobdir

def run_job(self, job: JobExecutorInterface):
# Submitting job to HTCondor

print(job)
print(job.name)
print(job.resources)

# Creating directory to store log, output and error files
jobDir = self.workflow.executor_settings.jobdir
makedirs(jobDir, exist_ok=True)
makedirs(self.jobDir, exist_ok=True)

job_exec = self.get_python_executable()
job_args = self.format_job_exec(job).removeprefix(job_exec + " ")

# Creating submit dictionary which is passed to htcondor.Submit
submit_dict = {
"executable": "/bin/bash",
"arguments": self.format_job_exec(
job
), # using the method from RemoteExecutor
"log": join(jobDir, "$(ClusterId).log"),
"output": join(jobDir, "$(ClusterId).out"),
"error": join(jobDir, "$(ClusterId).err"),
"executable": job_exec,
"arguments": job_args,
"log": join(self.jobDir, "$(ClusterId).log"),
"output": join(self.jobDir, "$(ClusterId).out"),
"error": join(self.jobDir, "$(ClusterId).err"),
"request_cpus": str(job.threads),
}
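For context, the reworked submit dictionary splits the command string from format_job_exec into an executable and its arguments instead of routing everything through /bin/bash. A minimal sketch of that split, assuming the htcondor Python bindings; the interpreter path, command string, and directory are illustrative placeholders, not values taken from the plugin:

import htcondor
from os.path import join

job_exec = "/usr/bin/python3"  # stand-in for get_python_executable()
# stand-in for format_job_exec(job): the full command, starting with the interpreter
job_cmd = job_exec + " -m snakemake --snakefile Snakefile target"
job_args = job_cmd.removeprefix(job_exec + " ")  # str.removeprefix needs Python 3.9+

submit_dict = {
    "executable": job_exec,
    "arguments": job_args,
    "log": join("jobdir", "$(ClusterId).log"),
    "output": join("jobdir", "$(ClusterId).out"),
    "error": join("jobdir", "$(ClusterId).err"),
    "request_cpus": "1",
}
submit_description = htcondor.Submit(submit_dict)  # later handed to schedd.submit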

@@ -146,11 +144,13 @@ def run_job(self, job: JobExecutorInterface):

# Submitting job to HTCondor
try:
clusterID = schedd.submit(submit_description)
submit_result = schedd.submit(submit_description)
except Exception as e:
raise WorkflowError(f"Failed to submit HTCondor job: {e}")

self.report_job_submission(SubmittedJobInfo(job=job, external_jobid=clusterID))
self.report_job_submission(
SubmittedJobInfo(job=job, external_jobid=submit_result.cluster())
)

async def check_active_jobs(
self, active_jobs: List[SubmittedJobInfo]
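A short sketch of the ClusterId handling above, assuming the htcondor Python bindings; the Submit contents are placeholders. schedd.submit returns a SubmitResult object, and its cluster() method yields the integer ClusterId that the plugin stores as external_jobid and uses to name the per-job log files:

import htcondor

schedd = htcondor.Schedd()
submit_description = htcondor.Submit({"executable": "/bin/sleep", "arguments": "10"})

submit_result = schedd.submit(submit_description)  # a SubmitResult, not a plain id
cluster_id = submit_result.cluster()  # integer ClusterId, e.g. 1234
print(cluster_id)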
@@ -159,47 +159,79 @@ async def check_active_jobs(

for current_job in active_jobs:
async with self.status_rate_limiter:
# Get the status of the job
try:
schedd = htcondor.Schedd()
job_status = schedd.query(
constraint=f"ClusterId == {current_job.external_jobid}",
projection=["JobStatus"],

# Event types that report an error
error_event_type = [
htcondor.JobEventType.JOB_ABORTED,
htcondor.JobEventType.JOB_HELD,
htcondor.JobEventType.EXECUTABLE_ERROR,
htcondor.JobEventType.REMOTE_ERROR,
]

# Event types that report a success
success_event_type = [htcondor.JobEventType.JOB_TERMINATED]

# Event types that report the job is running/other event is happening
running_event_type = [
htcondor.JobEventType.SUBMIT,
htcondor.JobEventType.EXECUTE,
htcondor.JobEventType.IMAGE_SIZE,
]

warning_event_type = [
htcondor.JobEventType.JOB_EVICTED,
htcondor.JobEventType.JOB_SUSPENDED,
]

# Look in the log file to check the status of the job
logFileName = join(self.jobDir, f"{current_job.external_jobid}.log")
jel = htcondor.JobEventLog(logFileName)

# Get the latest event from the iterator
for event in jel.events(stop_after=0):
latest_event = event

if latest_event.type in error_event_type:
# Job has an error
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}."
)
self.report_job_error(
current_job,
msg=f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}. ",
)
break
elif latest_event.type in running_event_type:
# Job is still running/idle
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}."
)
except Exception as e:
self.logger.warning(f"Failed to retrieve HTCondor job status: {e}")
# Assuming the job is still running and retry next time
yield current_job
self.logger.debug(
f"HTCondor job {current_job.external_jobid} status: {job_status}"
)

# Overview of HTCondor job status:
# 1: Idle
# 2: Running
# 3: Removed
# 4: Completed
# 5: Held
# 6: Transferring Output
# 7: Suspended

# Running/idle jobs
if job_status[0]["JobStatus"] in [1, 2, 6, 7]:
if job_status[0]["JobStatus"] in [7]:
self.logger.warning(
f"HTCondor job {current_job.external_jobid} is suspended."
)
elif latest_event.type in warning_event_type:
# Job has a warning
self.logger.warning(
f"HTCondor job {current_job.external_jobid} has "
"obEventType {latest_event.type}."
)
yield current_job
# Successful jobs
elif job_status[0]["JobStatus"] in [4]:
elif latest_event.type in success_event_type:
# Job is terminated
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}."
)
self.report_job_success(current_job)
# Errored jobs
elif job_status[0]["JobStatus"] in [3, 5]:
self.report_job_error(current_job)
else:
raise WorkflowError(
f"Unknown HTCondor job status: {job_status[0]['JobStatus']}"
# Unsupported event type
self.logger.debug(
f"HTCondor job {current_job.external_jobid} has "
"JobEventType {latest_event.type}."
"This event type is not supported."
)
yield current_job

def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
# Cancel all active jobs.
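A sketch of the event-log polling used in check_active_jobs above, assuming the htcondor Python bindings; the log path is a placeholder. events(stop_after=0) returns the events currently available without blocking, so the loop leaves latest_event pointing at the newest entry; the sketch additionally guards against a still-empty log with an explicit None check:

import htcondor

jel = htcondor.JobEventLog("jobdir/1234.log")  # placeholder path

latest_event = None
for event in jel.events(stop_after=0):  # stop_after=0: do not wait for new events
    latest_event = event

if latest_event is None:
    print("no events logged yet")
elif latest_event.type == htcondor.JobEventType.JOB_TERMINATED:
    print("job finished")
else:
    print(f"latest event: {latest_event.type}")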
@@ -208,6 +240,8 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
if active_jobs:
schedd = htcondor.Schedd()
job_ids = [current_job.external_jobid for current_job in active_jobs]
# HTCondor requires the full JOB_IDS (ClusterId.ProcId), not the BATCH_NAME
job_ids = [f"ClusterId == {x}.0" for x in job_ids]
self.logger.debug(f"Cancelling HTCondor jobs: {job_ids}")
try:
schedd.act(htcondor.JobAction.Remove, job_ids)
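For reference, a minimal cancellation sketch, assuming the htcondor Python bindings; the job IDs are placeholders. Schedd.act accepts either a list of job IDs in ClusterId.ProcId form or a single constraint expression:

import htcondor

schedd = htcondor.Schedd()

# remove by explicit job IDs (ClusterId.ProcId)
schedd.act(htcondor.JobAction.Remove, ["1234.0", "1235.0"])

# or remove by a constraint expression
schedd.act(htcondor.JobAction.Remove, "ClusterId == 1234")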
