Skip to content

Commit

Permalink
Merge pull request #7328 from sfayer/cherry-pick-2-af3b2b349-integration
Browse files Browse the repository at this point in the history
[sweep:integration] Improve performance of job delete/kill/reschedule API
  • Loading branch information
fstagni authored Nov 29, 2023
2 parents c2c9477 + 6562a06 commit a23e350
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 19 deletions.
40 changes: 21 additions & 19 deletions src/DIRAC/Interfaces/API/Dirac.py
Original file line number Diff line number Diff line change
Expand Up @@ -1449,14 +1449,15 @@ def deleteJob(self, jobID):
return ret
jobIDs = ret["Value"]

jobIDsToDelete = []
for jobID in jobIDs:
can_kill = JobStatus.checkJobStateTransition(jobID, JobStatus.KILLED)["OK"]
can_del = JobStatus.checkJobStateTransition(jobID, JobStatus.DELETED)["OK"]
if can_kill or can_del:
jobIDsToDelete.append(jobID)
# Remove any job IDs that can't change to the Killed or Deleted states
filteredJobs = set()
for filterState in (JobStatus.KILLED, JobStatus.DELETED):
filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState)
if not filterRes["OK"]:
return filterRes
filteredJobs.update(filterRes["Value"])

return WMSClient(useCertificates=self.useCertificates).deleteJob(jobIDsToDelete)
return WMSClient(useCertificates=self.useCertificates).deleteJob(list(filteredJobs))

#############################################################################

Expand All @@ -1481,11 +1482,11 @@ def rescheduleJob(self, jobID):
return ret
jobIDs = ret["Value"]

jobIDsToReschedule = []
for jobID in jobIDs:
res = JobStatus.checkJobStateTransition(jobID, JobStatus.RESCHEDULED)
if res["OK"]:
jobIDsToReschedule.append(jobID)
# Remove any job IDs that can't change to the rescheduled state
filterRes = JobStatus.filterJobStateTransition(jobIDs, JobStatus.RESCHEDULED)
if not filterRes["OK"]:
return filterRes
jobIDsToReschedule = filterRes["Value"]

return WMSClient(useCertificates=self.useCertificates).rescheduleJob(jobIDsToReschedule)

Expand All @@ -1508,14 +1509,15 @@ def killJob(self, jobID):
return ret
jobIDs = ret["Value"]

jobIDsToKill = []
for jobID in jobIDs:
can_kill = JobStatus.checkJobStateTransition(jobID, JobStatus.KILLED)["OK"]
can_del = JobStatus.checkJobStateTransition(jobID, JobStatus.DELETED)["OK"]
if can_kill or can_del:
jobIDsToKill.append(jobID)
# Remove any job IDs that can't change to the Killed or Deleted states
filteredJobs = set()
for filterState in (JobStatus.KILLED, JobStatus.DELETED):
filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState)
if not filterRes["OK"]:
return filterRes
filteredJobs.update(filterRes["Value"])

return WMSClient(useCertificates=self.useCertificates).killJob(jobIDsToKill)
return WMSClient(useCertificates=self.useCertificates).killJob(list(filteredJobs))

#############################################################################

Expand Down
30 changes: 30 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.Utilities.StateMachine import State, StateMachine
from DIRAC.Core.Utilities.Decorators import deprecated


#:
Expand Down Expand Up @@ -97,6 +98,7 @@ def __init__(self, state):
}


@deprecated("Use filterJobStateTransition instead")
def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonitoringClient=None):
"""Utility to check if a job state transition is allowed"""
if not currentStatus:
Expand Down Expand Up @@ -125,3 +127,31 @@ def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonito
)
return S_ERROR("Job state transition not allowed")
return S_OK()


def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None):
"""Given a list of jobIDs, return a list that are allowed to transition
to the given candidate state.
"""
allowedJobs = []

if not isinstance(jobIDs, list):
jobIDs = [jobIDs]

if not jobMonitoringClient:
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient

jobMonitoringClient = JobMonitoringClient()

res = jobMonitoringClient.getJobsStatus(jobIDs)
if not res["OK"]:
return res

for jobID in jobIDs:
if jobID in res["Value"]:
curState = res["Value"][jobID]["Status"]
stateRes = JobsStateMachine(curState).getNextState(candidateState)
if stateRes["OK"]:
if stateRes["Value"] == candidateState:
allowedJobs.append(jobID)
return S_OK(allowedJobs)

0 comments on commit a23e350

Please sign in to comment.