diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py b/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py index 2fac939997b..28bc9a75a5e 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py @@ -6,6 +6,8 @@ from DIRAC.Core.Utilities.StateMachine import State, StateMachine from DIRAC.Core.Utilities.Decorators import deprecated +from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient + #: SUBMITTING = "Submitting" @@ -129,7 +131,7 @@ def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonito return S_OK() -def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None): +def filterJobStateTransition(jobIDs, candidateState): """Given a list of jobIDs, return a list that are allowed to transition to the given candidate state. """ @@ -138,12 +140,7 @@ def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None): if not isinstance(jobIDs, list): jobIDs = [jobIDs] - if not jobMonitoringClient: - from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient - - jobMonitoringClient = JobMonitoringClient() - - res = jobMonitoringClient.getJobsStatus(jobIDs) + res = JobMonitoringClient().getJobsStatus(jobIDs) if not res["OK"]: return res diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index 534eff5e248..abf908782b9 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -22,6 +22,7 @@ from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager from DIRAC.StorageManagementSystem.Client.StorageManagerClient import StorageManagerClient +from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( RIGHT_DELETE, @@ -305,6 +306,9 @@ def __getJobList(jobInput): :return : a list of int job IDs """ + if not jobInput: + return [] + if isinstance(jobInput, int): return [jobInput] if isinstance(jobInput, str): @@ -485,55 +489,44 @@ def __killJob(self, jobID, sendKillCommand=True): return S_OK() - def __kill_delete_jobs(self, jobIDList, right): + def _kill_delete_jobs(self, jobIDList, right): """Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary :param list jobIDList: job IDs - :param str right: right + :param str right: RIGHT_KILL or RIGHT_DELETE :return: S_OK()/S_ERROR() """ jobList = self.__getJobList(jobIDList) if not jobList: - return S_ERROR("Invalid job specification: " + str(jobIDList)) + self.log.warn("No jobs specified") + return S_OK([]) validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(jobList, right) badIDs = [] + killJobList = [] + deleteJobList = [] if validJobList: - # Get job status to see what is to be killed or deleted - result = self.jobDB.getJobsAttributes(validJobList, ["Status"]) + # Get the jobs allowed to transition to the Killed state + filterRes = filterJobStateTransition(validJobList, JobStatus.KILLED) + if not filterRes["OK"]: + return filterRes + killJobList.extend(filterRes["Value"]) + + if not right == RIGHT_KILL: + # Get the jobs allowed to transition to the Deleted state + filterRes = filterJobStateTransition(validJobList, JobStatus.DELETED) + if not filterRes["OK"]: + return filterRes + deleteJobList.extend(filterRes["Value"]) + + # Look for jobs that are in the Staging state to send kill signal to the stager + result = self.jobDB.getJobsAttributes(killJobList, ["Status"]) if not result["OK"]: return result - killJobList = [] - deleteJobList = [] - markKilledJobList = [] - stagingJobList = [] - for jobID, sDict in result["Value"].items(): - if sDict["Status"] in (JobStatus.RUNNING, JobStatus.MATCHED, JobStatus.STALLED): - killJobList.append(jobID) - elif sDict["Status"] in ( - JobStatus.SUBMITTING, - JobStatus.RECEIVED, - JobStatus.CHECKING, - JobStatus.WAITING, - JobStatus.RESCHEDULED, - JobStatus.DONE, - JobStatus.FAILED, - JobStatus.KILLED, - ): - if not right == RIGHT_KILL: - deleteJobList.append(jobID) - else: - markKilledJobList.append(jobID) - if sDict["Status"] in [JobStatus.STAGING]: - stagingJobList.append(jobID) - - for jobID in markKilledJobList: - result = self.__killJob(jobID, sendKillCommand=False) - if not result["OK"]: - badIDs.append(jobID) + stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING] for jobID in killJobList: result = self.__killJob(jobID) @@ -562,7 +555,8 @@ def __kill_delete_jobs(self, jobIDList, right): result["FailedJobIDs"] = badIDs return result - result = S_OK(validJobList) + jobsList = killJobList if right == RIGHT_KILL else deleteJobList + result = S_OK(jobsList) result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired() if invalidJobList: @@ -581,7 +575,7 @@ def export_deleteJob(self, jobIDs): :return: S_OK/S_ERROR """ - return self.__kill_delete_jobs(jobIDs, RIGHT_DELETE) + return self._kill_delete_jobs(jobIDs, RIGHT_DELETE) ########################################################################### types_killJob = [] @@ -594,7 +588,7 @@ def export_killJob(self, jobIDs): :return: S_OK/S_ERROR """ - return self.__kill_delete_jobs(jobIDs, RIGHT_KILL) + return self._kill_delete_jobs(jobIDs, RIGHT_KILL) ########################################################################### types_resetJob = [] diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobManager.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobManager.py new file mode 100644 index 00000000000..fb1b5f64552 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobManager.py @@ -0,0 +1,58 @@ +""" unit test (pytest) of JobManager service +""" + +from unittest.mock import MagicMock +import pytest + +from DIRAC import gLogger + +gLogger.setLevel("DEBUG") + +from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( + RIGHT_DELETE, + RIGHT_KILL, +) + +# sut +from DIRAC.WorkloadManagementSystem.Service.JobManagerHandler import JobManagerHandlerMixin + +# mocks +jobPolicy_mock = MagicMock() +jobDB_mock = MagicMock() +jobDB_mock.getJobsAttributes.return_value = {"OK": True, "Value": {}} + + +@pytest.mark.parametrize( + "jobIDs_list, right, lists, filteredJobsList, expected_res, expected_value", + [ + ([], RIGHT_KILL, ([], [], [], []), [], True, []), + ([], RIGHT_DELETE, ([], [], [], []), [], True, []), + (1, RIGHT_KILL, ([], [], [], []), [], True, []), + (1, RIGHT_KILL, ([1], [], [], []), [], True, []), + ([1, 2], RIGHT_KILL, ([], [], [], []), [], True, []), + ([1, 2], RIGHT_KILL, ([1], [], [], []), [], True, []), + (1, RIGHT_KILL, ([1], [], [], []), [1], True, [1]), + ([1, 2], RIGHT_KILL, ([1], [], [], []), [1], True, [1]), + ([1, 2], RIGHT_KILL, ([1], [2], [], []), [1], True, [1]), + ([1, 2], RIGHT_KILL, ([1], [2], [], []), [], True, []), + ([1, 2], RIGHT_KILL, ([1, 2], [], [], []), [1, 2], True, [1, 2]), + ], +) +def test___kill_delete_jobs(mocker, jobIDs_list, right, lists, filteredJobsList, expected_res, expected_value): + mocker.patch( + "DIRAC.WorkloadManagementSystem.Service.JobManagerHandler.filterJobStateTransition", + return_value={"OK": True, "Value": filteredJobsList}, + ) + + JobManagerHandlerMixin.log = gLogger + JobManagerHandlerMixin.jobPolicy = jobPolicy_mock + JobManagerHandlerMixin.jobDB = jobDB_mock + JobManagerHandlerMixin.taskQueueDB = MagicMock() + + jobPolicy_mock.evaluateJobRights.return_value = lists + + jm = JobManagerHandlerMixin() + + res = jm._kill_delete_jobs(jobIDs_list, right) + assert res["OK"] == expected_res + assert res["Value"] == expected_value