Skip to content

Commit

Permalink
Merge pull request #7317 from sfayer/fix_deljob_speed
Browse files Browse the repository at this point in the history
[8.0] Improve performance of job delete/kill/reschedule API
  • Loading branch information
fstagni authored Nov 29, 2023
2 parents 13128a2 + 4a746a3 commit af3b2b3
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 20 deletions.
42 changes: 22 additions & 20 deletions src/DIRAC/Interfaces/API/Dirac.py
Original file line number Diff line number Diff line change
Expand Up @@ -1652,14 +1652,15 @@ def deleteJob(self, jobID):
return ret
jobIDs = ret["Value"]

jobIDsToDelete = []
for jobID in jobIDs:
can_kill = JobStatus.checkJobStateTransition(jobID, JobStatus.KILLED)["OK"]
can_del = JobStatus.checkJobStateTransition(jobID, JobStatus.DELETED)["OK"]
if can_kill or can_del:
jobIDsToDelete.append(jobID)

result = WMSClient(useCertificates=self.useCertificates).deleteJob(jobIDsToDelete)
# Remove any job IDs that can't change to the Killed or Deleted states
filteredJobs = set()
for filterState in (JobStatus.KILLED, JobStatus.DELETED):
filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState)
if not filterRes["OK"]:
return filterRes
filteredJobs.update(filterRes["Value"])

result = WMSClient(useCertificates=self.useCertificates).deleteJob(list(filteredJobs))
if result["OK"]:
if self.jobRepo:
for jID in result["Value"]:
Expand Down Expand Up @@ -1689,11 +1690,11 @@ def rescheduleJob(self, jobID):
return ret
jobIDs = ret["Value"]

jobIDsToReschedule = []
for jobID in jobIDs:
res = JobStatus.checkJobStateTransition(jobID, JobStatus.RESCHEDULED)
if res["OK"]:
jobIDsToReschedule.append(jobID)
# Remove any job IDs that can't change to the rescheduled state
filterRes = JobStatus.filterJobStateTransition(jobIDs, JobStatus.RESCHEDULED)
if not filterRes["OK"]:
return filterRes
jobIDsToReschedule = filterRes["Value"]

result = WMSClient(useCertificates=self.useCertificates).rescheduleJob(jobIDsToReschedule)
if result["OK"]:
Expand Down Expand Up @@ -1724,14 +1725,15 @@ def killJob(self, jobID):
return ret
jobIDs = ret["Value"]

jobIDsToKill = []
for jobID in jobIDs:
can_kill = JobStatus.checkJobStateTransition(jobID, JobStatus.KILLED)["OK"]
can_del = JobStatus.checkJobStateTransition(jobID, JobStatus.DELETED)["OK"]
if can_kill or can_del:
jobIDsToKill.append(jobID)
# Remove any job IDs that can't change to the Killed or Deleted states
filteredJobs = set()
for filterState in (JobStatus.KILLED, JobStatus.DELETED):
filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState)
if not filterRes["OK"]:
return filterRes
filteredJobs.update(filterRes["Value"])

result = WMSClient(useCertificates=self.useCertificates).killJob(jobIDsToKill)
result = WMSClient(useCertificates=self.useCertificates).killJob(list(filteredJobs))
if result["OK"]:
if self.jobRepo:
for jID in result["Value"]:
Expand Down
30 changes: 30 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.Utilities.StateMachine import State, StateMachine
from DIRAC.Core.Utilities.Decorators import deprecated


#:
Expand Down Expand Up @@ -97,6 +98,7 @@ def __init__(self, state):
}


@deprecated("Use filterJobStateTransition instead")
def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonitoringClient=None):
"""Utility to check if a job state transition is allowed"""
if not currentStatus:
Expand Down Expand Up @@ -125,3 +127,31 @@ def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonito
)
return S_ERROR("Job state transition not allowed")
return S_OK()


def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None):
    """Return the job IDs that are allowed to transition to the candidate state.

    The current status of every job is fetched with a single ``getJobsStatus``
    call, and each job's status is then checked against ``JobsStateMachine``.
    Job IDs missing from the ``getJobsStatus`` result are silently dropped.

    :param jobIDs: a single job ID, or a list/tuple/set of job IDs
    :param candidateState: the state the jobs are asked to move to
    :param jobMonitoringClient: optional object providing ``getJobsStatus``;
        a ``JobMonitoringClient`` is instantiated when none is given
    :return: ``S_OK`` wrapping the list of allowed job IDs (in the order they
        were iterated), or the failed ``getJobsStatus`` result unchanged
    """
    # Normalise the input to a list. Tuples and sets are collections of IDs,
    # not a single ID, so they must be expanded rather than wrapped.
    if isinstance(jobIDs, (tuple, set, frozenset)):
        jobIDs = list(jobIDs)
    elif not isinstance(jobIDs, list):
        jobIDs = [jobIDs]

    if not jobMonitoringClient:
        # Imported lazily, as in the original; presumably to avoid a circular
        # import at module load time -- TODO confirm.
        from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient

        jobMonitoringClient = JobMonitoringClient()

    # One bulk query for all jobs instead of one query per job.
    res = jobMonitoringClient.getJobsStatus(jobIDs)
    if not res["OK"]:
        return res
    jobStatuses = res["Value"]

    allowedJobs = []
    for jobID in jobIDs:
        if jobID not in jobStatuses:
            continue
        curState = jobStatuses[jobID]["Status"]
        stateRes = JobsStateMachine(curState).getNextState(candidateState)
        # A successful result is not enough: the state machine must return
        # exactly the requested state for the transition to count as allowed.
        if stateRes["OK"] and stateRes["Value"] == candidateState:
            allowedJobs.append(jobID)
    return S_OK(allowedJobs)

0 comments on commit af3b2b3

Please sign in to comment.