diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 6cadcb2509c..f52618817a3 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -11,33 +11,26 @@ * *CompressJDLs*: Enable compression of JDLs when they are stored in the database, default *False*. """ -import base64 -import zlib import datetime - import operator from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSiteTier from DIRAC.Core.Base.DB import DB from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd -from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, convertToReturnValue -from DIRAC.Core.Utilities.DErrno import EWMSSUBM, EWMSJMAN, cmpError -from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError +from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus -from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest -from DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import ( checkAndAddOwner, - fixJDL, checkAndPrepareJob, - createJDLWithInitialStatus, compressJDL, + createJDLWithInitialStatus, extractJDL, + fixJDL, ) @@ -891,6 +884,7 @@ def insertNewJobIntoDB( ownerGroup, initialStatus=JobStatus.RECEIVED, initialMinorStatus="Job accepted", + vo=None, ): """Insert the initial JDL into the Job database, Do initial JDL crosscheck, @@ -903,11 +897,17 @@ def insertNewJobIntoDB( :param str initialMinorStatus: optional initial minor job status :return: new job ID """ + # Workaround for the case when a custom version of dirac would be + # calling this method + if not vo: + vo = getVOForGroup(ownerGroup) + jobAttrs = { "LastUpdateTime": str(datetime.datetime.utcnow()), "SubmissionTime": str(datetime.datetime.utcnow()), "Owner": owner, "OwnerGroup": ownerGroup, + "VO": vo, } result = checkAndAddOwner(jdl, owner, ownerGroup) @@ -1193,6 +1193,8 @@ def rescheduleJob(self, jobID): jobAttrs["RescheduleTime"] = str(datetime.datetime.utcnow()) + jobAttrs["VO"] = getVOForGroup(resultDict["OwnerGroup"]) + reqJDL = classAdReq.asJDL() classAdJob.insertAttributeInt("JobRequirements", reqJDL) diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql index 721eb27418c..a54e48f585c 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql @@ -42,6 +42,7 @@ CREATE TABLE `Jobs` ( `JobName` VARCHAR(128) NOT NULL DEFAULT 'Unknown', `Owner` VARCHAR(64) NOT NULL DEFAULT 'Unknown', `OwnerGroup` VARCHAR(128) NOT NULL DEFAULT 'Unknown', + `VO` VARCHAR(32) NOT NULL, `SubmissionTime` DATETIME DEFAULT NULL, `RescheduleTime` DATETIME DEFAULT NULL, `LastUpdateTime` DATETIME DEFAULT NULL, diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index 639a7c2fd71..bd19dc21f26 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -203,6 +203,7 @@ def export_submitJob(self, jobDesc): self.ownerGroup, initialStatus=initialStatus, initialMinorStatus=initialMinorStatus, + vo=getVOForGroup(self.ownerGroup), ) if not result["OK"]: return result diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py index c1330432e54..c818660c83b 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py @@ -78,11 +78,11 @@ def test_isValid(jobDB: JobDB): assert jobDB.isValid() -def test_insertNewJobIntoDB(jobDB): +def test_insertNewJobIntoDB(jobDB: JobDB): """Test the insertNewJobIntoDB method""" # Act - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") # Assert assert res["OK"], res["Message"] @@ -105,9 +105,9 @@ def test_insertNewJobIntoDB(jobDB): assert res["Value"] == {} -def test_removeJobFromDB(jobDB): +def test_removeJobFromDB(jobDB: JobDB): # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID = int(res["JobID"]) @@ -122,7 +122,7 @@ def test_getJobJDL_original(jobDB: JobDB): """Test of the getJobJDL method with the original parameter set to True""" # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID = int(res["JobID"]) @@ -139,7 +139,7 @@ def test_getJobJDL_nonOriginal(jobDB: JobDB): """Test of the getJobJDL method with the original parameter set to True""" # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID = int(res["JobID"]) @@ -191,13 +191,13 @@ def test_getJobJDL_nonOriginal(jobDB: JobDB): ) -def test_getJobsAttributes(jobDB): +def test_getJobsAttributes(jobDB: JobDB): # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID_1 = int(res["JobID"]) - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID_2 = int(res["JobID"]) @@ -214,7 +214,7 @@ def test_getJobsAttributes(jobDB): def test_rescheduleJob(jobDB): # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID = res["JobID"] @@ -261,7 +261,7 @@ def test_getCounters(jobDB): def test_heartBeatLogging(jobDB): - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID = res["JobID"] @@ -303,7 +303,7 @@ def test_heartBeatLogging(jobDB): def test_getJobParameters(jobDB): - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID = res["JobID"] @@ -321,10 +321,10 @@ def test_getJobParameters(jobDB): def test_setJobsMajorStatus(jobDB): - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID_1 = res["JobID"] - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID_2 = res["JobID"] @@ -367,10 +367,10 @@ def test_setJobsMajorStatus(jobDB): def test_attributes(jobDB): - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID_1 = res["JobID"] - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] jobID_2 = res["JobID"]