diff --git a/src/DIRAC/Interfaces/API/Dirac.py b/src/DIRAC/Interfaces/API/Dirac.py index a0b2d061665..3d5a37d4444 100755 --- a/src/DIRAC/Interfaces/API/Dirac.py +++ b/src/DIRAC/Interfaces/API/Dirac.py @@ -1652,14 +1652,15 @@ def deleteJob(self, jobID): return ret jobIDs = ret["Value"] - jobIDsToDelete = [] - for jobID in jobIDs: - can_kill = JobStatus.checkJobStateTransition(jobID, JobStatus.KILLED)["OK"] - can_del = JobStatus.checkJobStateTransition(jobID, JobStatus.DELETED)["OK"] - if can_kill or can_del: - jobIDsToDelete.append(jobID) - - result = WMSClient(useCertificates=self.useCertificates).deleteJob(jobIDsToDelete) + # Remove any job IDs that can't change to the Killed or Deleted states + filteredJobs = set() + for filterState in (JobStatus.KILLED, JobStatus.DELETED): + filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState) + if not filterRes["OK"]: + return filterRes + filteredJobs.update(filterRes["Value"]) + + result = WMSClient(useCertificates=self.useCertificates).deleteJob(list(filteredJobs)) if result["OK"]: if self.jobRepo: for jID in result["Value"]: @@ -1689,11 +1690,11 @@ def rescheduleJob(self, jobID): return ret jobIDs = ret["Value"] - jobIDsToReschedule = [] - for jobID in jobIDs: - res = JobStatus.checkJobStateTransition(jobID, JobStatus.RESCHEDULED) - if res["OK"]: - jobIDsToReschedule.append(jobID) + # Remove any job IDs that can't change to the rescheduled state + filterRes = JobStatus.filterJobStateTransition(jobIDs, JobStatus.RESCHEDULED) + if not filterRes["OK"]: + return filterRes + jobIDsToReschedule = filterRes["Value"] result = WMSClient(useCertificates=self.useCertificates).rescheduleJob(jobIDsToReschedule) if result["OK"]: @@ -1724,14 +1725,15 @@ def killJob(self, jobID): return ret jobIDs = ret["Value"] - jobIDsToKill = [] - for jobID in jobIDs: - can_kill = JobStatus.checkJobStateTransition(jobID, JobStatus.KILLED)["OK"] - can_del = JobStatus.checkJobStateTransition(jobID, JobStatus.DELETED)["OK"] - if can_kill or can_del: - jobIDsToKill.append(jobID) + # Remove any job IDs that can't change to the Killed or Deleted states + filteredJobs = set() + for filterState in (JobStatus.KILLED, JobStatus.DELETED): + filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState) + if not filterRes["OK"]: + return filterRes + filteredJobs.update(filterRes["Value"]) - result = WMSClient(useCertificates=self.useCertificates).killJob(jobIDsToKill) + result = WMSClient(useCertificates=self.useCertificates).killJob(list(filteredJobs)) if result["OK"]: if self.jobRepo: for jID in result["Value"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py b/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py index f8c912cda2a..2fac939997b 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py @@ -4,6 +4,7 @@ from DIRAC import gLogger, S_OK, S_ERROR from DIRAC.Core.Utilities.StateMachine import State, StateMachine +from DIRAC.Core.Utilities.Decorators import deprecated #: @@ -97,6 +98,7 @@ def __init__(self, state): } +@deprecated("Use filterJobStateTransition instead") def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonitoringClient=None): """Utility to check if a job state transition is allowed""" if not currentStatus: @@ -125,3 +127,31 @@ def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonito ) return S_ERROR("Job state transition not allowed") return S_OK() + + +def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None): + """Given a list of jobIDs, return a list that are allowed to transition + to the given candidate state. + """ + allowedJobs = [] + + if not isinstance(jobIDs, list): + jobIDs = [jobIDs] + + if not jobMonitoringClient: + from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient + + jobMonitoringClient = JobMonitoringClient() + + res = jobMonitoringClient.getJobsStatus(jobIDs) + if not res["OK"]: + return res + + for jobID in jobIDs: + if jobID in res["Value"]: + curState = res["Value"][jobID]["Status"] + stateRes = JobsStateMachine(curState).getNextState(candidateState) + if stateRes["OK"]: + if stateRes["Value"] == candidateState: + allowedJobs.append(jobID) + return S_OK(allowedJobs)