From 965384a9f94269875c08dd305ce91b4a7ad6b7cd Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 18 Jun 2024 14:58:41 +0200 Subject: [PATCH 1/7] fix: from ElasticJobParametersDB to JobParametersDB --- .../JobParametersDashboard.ndjson | 0 dirac.cfg | 2 +- .../Systems/WorkloadManagement/index.rst | 4 ++-- .../FrameworkSystem/Client/ComponentInstaller.py | 2 +- ...ElasticJobParametersDB.py => JobParametersDB.py} | 13 ++++++------- .../Service/JobMonitoringHandler.py | 2 +- .../Service/JobStateUpdateHandler.py | 4 ++-- .../Service/WMSAdministratorHandler.py | 2 +- .../Utilities/JobStatusUtility.py | 7 +++---- ...icJobParametersDB.py => Test_JobParametersDB.py} | 6 +++--- tests/Integration/all_integration_server_tests.sh | 2 +- .../ESJobMonitoring/test_scripts/query.py | 4 ++-- .../ESJobMonitoring/test_scripts/update.py | 4 ++-- 13 files changed, 25 insertions(+), 27 deletions(-) rename dashboards/{ElasticJobParameters => JobParameters}/JobParametersDashboard.ndjson (100%) rename src/DIRAC/WorkloadManagementSystem/DB/{ElasticJobParametersDB.py => JobParametersDB.py} (94%) rename tests/Integration/WorkloadManagementSystem/{Test_ElasticJobParametersDB.py => Test_JobParametersDB.py} (96%) diff --git a/dashboards/ElasticJobParameters/JobParametersDashboard.ndjson b/dashboards/JobParameters/JobParametersDashboard.ndjson similarity index 100% rename from dashboards/ElasticJobParameters/JobParametersDashboard.ndjson rename to dashboards/JobParameters/JobParametersDashboard.ndjson diff --git a/dirac.cfg b/dirac.cfg index f57fae99d25..1afcb93626c 100644 --- a/dirac.cfg +++ b/dirac.cfg @@ -573,7 +573,7 @@ Systems { Databases { - ElasticJobParametersDB + JobParametersDB { # Host of OpenSearch instance Host=host.some.where diff --git a/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst b/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst index 9a4bc096769..7ec3701e700 100644 --- a/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst +++ b/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst @@ -99,6 +99,6 @@ It is based on layered architecture and is based on DIRAC architecture: * SandboxMetadataDB SandboxMetadataDB class is a front-end to the metadata for sandboxes. - * ElasticJobParametersDB - ElasticJobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters. + * JobParametersDB + JobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters. It is used in most of the WMS components and is based on Elastic/OpenSearch. diff --git a/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py b/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py index 77671052030..e6217033d6e 100644 --- a/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py +++ b/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py @@ -2108,7 +2108,7 @@ def getAvailableESDatabases(self, extensions): Result should be something like:: {'MonitoringDB': {'Type': 'ES', 'System': 'Monitoring', 'Extension': ''}, - 'ElasticJobParametersDB': {'Type': 'ES', 'System': 'WorkloadManagement', 'Extension': ''}} + 'JobParametersDB': {'Type': 'ES', 'System': 'WorkloadManagement', 'Extension': ''}} :param list extensions: list of DIRAC extensions :return: dict of ES DBs diff --git a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py similarity index 94% rename from src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py rename to src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py index cdd32156d4a..6e5b0a5d964 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py @@ -1,4 +1,4 @@ -""" Module containing a front-end to the ElasticSearch-based ElasticJobParametersDB. +""" Module containing a front-end to the ElasticSearch-based JobParametersDB. This is a drop-in replacement for MySQL-based table JobDB.JobParameters. The following class methods are provided for public usage @@ -6,7 +6,6 @@ - setJobParameter() - deleteJobParameters() """ -from typing import Union from DIRAC import S_ERROR, S_OK from DIRAC.Core.Base.ElasticDB import ElasticDB @@ -34,7 +33,7 @@ } -class ElasticJobParametersDB(ElasticDB): +class JobParametersDB(ElasticDB): def __init__(self, parentLogger=None): """Standard Constructor""" @@ -45,7 +44,7 @@ def __init__(self, parentLogger=None): # Connecting to the ES cluster super().__init__(self.fullname, self.index_name, parentLogger=parentLogger) except Exception as ex: - raise RuntimeError("Can't connect to ElasticJobParametersDB") from ex + RuntimeError("Can't connect to JobParameters index") from ex self.addIndexTemplate("elasticjobparametersdb", index_patterns=[f"{self.index_name}_*"], mapping=mapping) def _indexName(self, jobID: int) -> str: @@ -70,7 +69,7 @@ def _createIndex(self, indexName: str) -> None: raise RuntimeError(result["Message"]) self.log.always("Index created:", indexName) - def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict: + def getJobParameters(self, jobIDs: int | list[int], vo: str, paramList=None) -> dict: """Get Job Parameters defined for jobID. Returns a dictionary with the Job Parameters. If paramList is empty - all the parameters are returned. @@ -100,7 +99,7 @@ def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dic def setJobParameter(self, jobID: int, key: str, value: str) -> dict: """ - Inserts data into ElasticJobParametersDB index + Inserts data into JobParametersDB index :param self: self reference :param jobID: Job ID @@ -128,7 +127,7 @@ def setJobParameter(self, jobID: int, key: str, value: str) -> dict: def setJobParameters(self, jobID: int, parameters: list) -> dict: """ - Inserts data into ElasticJobParametersDB index using bulk indexing + Inserts data into JobParametersDB index using bulk indexing :param self: self reference :param jobID: Job ID diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index 9c4ceb17607..79147021bb3 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -34,7 +34,7 @@ def initializeHandler(cls, svcInfoDict): return S_ERROR(f"Can't connect to DB: {excp}") result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" + "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" ) if not result["OK"]: return result diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py index 88173cb70ba..6b8c979a71c 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py @@ -38,7 +38,7 @@ def initializeHandler(cls, svcInfoDict): return S_ERROR(f"Can't connect to DB: {excp}") result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" + "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" ) if not result["OK"]: return result @@ -198,7 +198,7 @@ def export_setJobParameters(cls, jobID, parameters): """ result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters) if not result["OK"]: - cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"]) + cls.log.error("Failed to add Job Parameters to JobParametersDB", result["Message"]) return result diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index 096472ecbb0..63037702c7f 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -22,7 +22,7 @@ def initializeHandler(cls, svcInfoDict): return S_ERROR(f"Can't connect to DB: {excp!r}") result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" + "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" ) if not result["OK"]: return result diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py index 4726632dfdb..906b2728929 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py @@ -6,13 +6,12 @@ from typing import TYPE_CHECKING, Any from DIRAC import S_ERROR, S_OK, gLogger -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.Utilities import TimeUtilities from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.WorkloadManagementSystem.Client import JobStatus if TYPE_CHECKING: - from DIRAC.WorkloadManagementSystem.DB.ElasticJobParametersDB import ElasticJobParametersDB + from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB @@ -22,7 +21,7 @@ def __init__( self, jobDB: JobDB = None, jobLoggingDB: JobLoggingDB = None, - elasticJobParametersDB: ElasticJobParametersDB = None, + elasticJobParametersDB: JobParametersDB = None, ) -> None: """ :raises: RuntimeError, AttributeError @@ -56,7 +55,7 @@ def __init__( if not self.elasticJobParametersDB: result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" + "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" ) if not result["OK"]: raise AttributeError(result["Message"]) diff --git a/tests/Integration/WorkloadManagementSystem/Test_ElasticJobParametersDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py similarity index 96% rename from tests/Integration/WorkloadManagementSystem/Test_ElasticJobParametersDB.py rename to tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py index 90da7bf9a2d..389f2754a91 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_ElasticJobParametersDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py @@ -1,4 +1,4 @@ -""" This tests only need the ElasticJobParametersDB, and connects directly to it +""" This tests only need the JobParametersDB, and connects directly to it """ import time @@ -8,13 +8,13 @@ DIRAC.initialize() # Initialize configuration from DIRAC import gLogger -from DIRAC.WorkloadManagementSystem.DB.ElasticJobParametersDB import ElasticJobParametersDB +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB # Add a time delay to allow updating the modified index before querying it. SLEEP_DELAY = 2 gLogger.setLevel("DEBUG") -elasticJobParametersDB = ElasticJobParametersDB() +elasticJobParametersDB = JobParametersDB() def test_setAndGetJobFromDB(): diff --git a/tests/Integration/all_integration_server_tests.sh b/tests/Integration/all_integration_server_tests.sh index ce8d0329012..0f8a585a7d2 100644 --- a/tests/Integration/all_integration_server_tests.sh +++ b/tests/Integration/all_integration_server_tests.sh @@ -49,7 +49,7 @@ echo -e "*** $(date -u) **** WMS TESTS ****\n" pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobLoggingDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_TaskQueueDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) -pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_ElasticJobParametersDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) +pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobParametersDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_PilotAgentsDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_SandboxMetadataDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) # pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobCleaningAgent.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) diff --git a/tests/Performance/ESJobMonitoring/test_scripts/query.py b/tests/Performance/ESJobMonitoring/test_scripts/query.py index 33610c6b09e..34d544dbd14 100644 --- a/tests/Performance/ESJobMonitoring/test_scripts/query.py +++ b/tests/Performance/ESJobMonitoring/test_scripts/query.py @@ -6,12 +6,12 @@ import random import time -from DIRAC.WorkloadManagementSystem.DB.ElasticJobParametersDB import ElasticJobParametersDB +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB class Transaction: def __init__(self): - self.elasticJobParametersDB = ElasticJobParametersDB() + self.elasticJobParametersDB = JobParametersDB() self.custom_timers = {} def run(self): diff --git a/tests/Performance/ESJobMonitoring/test_scripts/update.py b/tests/Performance/ESJobMonitoring/test_scripts/update.py index ae65c41b5d8..588f22d7a8a 100644 --- a/tests/Performance/ESJobMonitoring/test_scripts/update.py +++ b/tests/Performance/ESJobMonitoring/test_scripts/update.py @@ -7,7 +7,7 @@ import string import time -from DIRAC.WorkloadManagementSystem.DB.ElasticJobParametersDB import ElasticJobParametersDB +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB def random_generator(size=6, chars=string.ascii_letters): @@ -16,7 +16,7 @@ def random_generator(size=6, chars=string.ascii_letters): class Transaction: def __init__(self): - self.elasticJobParametersDB = ElasticJobParametersDB() + self.elasticJobParametersDB = JobParametersDB() self.custom_timers = {} def run(self): From e63f234be2cf361f580aa1f325853835a1e0bdda Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 18 Jun 2024 14:59:14 +0200 Subject: [PATCH 2/7] fix: remove .0 from index name --- src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py index 6e5b0a5d964..19e22f14bb4 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py @@ -52,7 +52,7 @@ def _indexName(self, jobID: int) -> str: :param jobID: Job ID """ - indexSplit = int(jobID) // 1e6 + indexSplit = int(int(jobID) // 1e6) return f"{self.index_name}_{indexSplit}m" def _createIndex(self, indexName: str) -> None: From 4f39be6591d3249db0a29a17f860bd50cfcdbae9 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 18 Jun 2024 17:04:11 +0200 Subject: [PATCH 3/7] feat: added vo to index names --- src/DIRAC/Core/Utilities/ElasticSearchDB.py | 4 +- .../DB/JobParametersDB.py | 34 ++++----- .../Service/JobMonitoringHandler.py | 31 ++++---- .../Service/JobStateUpdateHandler.py | 58 +++++++-------- .../Service/TornadoJobStateUpdateHandler.py | 3 +- .../Service/WMSAdministratorHandler.py | 23 +++--- .../Utilities/JobStatusUtility.py | 4 +- .../Test_JobParametersDB.py | 70 +++++++++---------- 8 files changed, 114 insertions(+), 113 deletions(-) diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 6455ec07c30..f65b4bb29c7 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -298,14 +298,14 @@ def getDoc(self, index: str, docID: str) -> dict: return S_ERROR(re) @ifConnected - def getDocs(self, indexFunc, docIDs: list[str]) -> list[dict]: + def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]: """Efficiently retrieve many documents from an index. :param index: name of the index :param docIDs: document IDs """ sLog.debug(f"Retrieving documents {docIDs}") - docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs] + docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs] try: response = self.client.mget({"docs": docs}) except RequestError as re: diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py index 19e22f14bb4..c36ea010f71 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py @@ -47,13 +47,13 @@ def __init__(self, parentLogger=None): RuntimeError("Can't connect to JobParameters index") from ex self.addIndexTemplate("elasticjobparametersdb", index_patterns=[f"{self.index_name}_*"], mapping=mapping) - def _indexName(self, jobID: int) -> str: + def _indexName(self, jobID: int, vo: str) -> str: """construct the index name :param jobID: Job ID """ indexSplit = int(int(jobID) // 1e6) - return f"{self.index_name}_{indexSplit}m" + return f"{self.index_name}_{vo}_{indexSplit}m" def _createIndex(self, indexName: str) -> None: """Create a new index if needed @@ -85,7 +85,7 @@ def getJobParameters(self, jobIDs: int | list[int], vo: str, paramList=None) -> paramList = paramList.replace(" ", "").split(",") self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}") - res = self.getDocs(self._indexName, jobIDs) + res = self.getDocs(self._indexName, jobIDs, vo) if not res["OK"]: return res result = {} @@ -97,7 +97,7 @@ def getJobParameters(self, jobIDs: int | list[int], vo: str, paramList=None) -> return S_OK(result) - def setJobParameter(self, jobID: int, key: str, value: str) -> dict: + def setJobParameter(self, jobID: int, key: str, value: str, vo: str) -> dict: """ Inserts data into JobParametersDB index @@ -114,18 +114,18 @@ def setJobParameter(self, jobID: int, key: str, value: str) -> dict: # The _id in ES can't exceed 512 bytes, this is a ES hard-coded limitation. # If a record with this jobID update and add parameter, otherwise create a new record - if self.existsDoc(self._indexName(jobID), docID=str(jobID)): + if self.existsDoc(self._indexName(jobID, vo), docID=str(jobID)): self.log.debug("A document for this job already exists, it will now be updated") - result = self.updateDoc(index=self._indexName(jobID), docID=str(jobID), body={"doc": data}) + result = self.updateDoc(index=self._indexName(jobID, vo), docID=str(jobID), body={"doc": data}) else: self.log.debug("No document has this job id, creating a new document for this job") - self._createIndex(self._indexName(jobID)) - result = self.index(indexName=self._indexName(jobID), body=data, docID=str(jobID)) + self._createIndex(self._indexName(jobID, vo)) + result = self.index(indexName=self._indexName(jobID, vo), body=data, docID=str(jobID)) if not result["OK"]: self.log.error("Couldn't insert or update data", result["Message"]) return result - def setJobParameters(self, jobID: int, parameters: list) -> dict: + def setJobParameters(self, jobID: int, parameters: list, vo: str) -> dict: """ Inserts data into JobParametersDB index using bulk indexing @@ -134,24 +134,24 @@ def setJobParameters(self, jobID: int, parameters: list) -> dict: :param parameters: list of tuples (name, value) pairs :returns: S_OK/S_ERROR as result of indexing """ - self.log.debug("Inserting parameters", f"in {self._indexName(jobID)}: for job {jobID}: {parameters}") + self.log.debug("Inserting parameters", f"in {self._indexName(jobID, vo)}: for job {jobID}: {parameters}") parametersDict = dict(parameters) parametersDict["JobID"] = jobID parametersDict["timestamp"] = int(TimeUtilities.toEpochMilliSeconds()) - if self.existsDoc(self._indexName(jobID), docID=str(jobID)): + if self.existsDoc(self._indexName(jobID, vo), docID=str(jobID)): self.log.debug("A document for this job already exists, it will now be updated") - result = self.updateDoc(index=self._indexName(jobID), docID=str(jobID), body={"doc": parametersDict}) + result = self.updateDoc(index=self._indexName(jobID, vo), docID=str(jobID), body={"doc": parametersDict}) else: self.log.debug("Creating a new document for this job") - self._createIndex(self._indexName(jobID)) - result = self.index(self._indexName(jobID), body=parametersDict, docID=str(jobID)) + self._createIndex(self._indexName(jobID, vo)) + result = self.index(self._indexName(jobID, vo), body=parametersDict, docID=str(jobID)) if not result["OK"]: self.log.error("Couldn't insert or update data", result["Message"]) return result - def deleteJobParameters(self, jobID: int, paramList=None) -> dict: + def deleteJobParameters(self, jobID: int, paramList=None, vo: str = "") -> dict: """Deletes Job Parameters defined for jobID. Returns a dictionary with the Job Parameters. If paramList is empty - all the parameters for the job are removed @@ -168,13 +168,13 @@ def deleteJobParameters(self, jobID: int, paramList=None) -> dict: if not paramList: # Deleting the whole record self.log.debug("Deleting record of job {jobID}") - result = self.deleteDoc(self._indexName(jobID), docID=str(jobID)) + result = self.deleteDoc(self._indexName(jobID, vo), docID=str(jobID)) else: # Deleting the specific parameters self.log.debug(f"JobDB.getParameters: Deleting Parameters {paramList} for job {jobID}") for paramName in paramList: result = self.updateDoc( - index=self._indexName(jobID), + index=self._indexName(jobID, vo), docID=str(jobID), body={"script": "ctx._source.remove('" + paramName + "')"}, ) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index 79147021bb3..6e730f3f63e 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -4,15 +4,17 @@ The following methods are available in the Service interface """ -from DIRAC import S_OK, S_ERROR -from DIRAC.Core.DISET.RequestHandler import RequestHandler import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities +from DIRAC import S_ERROR, S_OK +from DIRAC.ConfigurationSystem.Client.Helpers import Registry +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.JEncode import strToIntDict from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient -from DIRAC.WorkloadManagementSystem.Service.JobPolicy import JobPolicy, RIGHT_GET_INFO +from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, JobPolicy class JobMonitoringHandlerMixin: @@ -33,9 +35,7 @@ def initializeHandler(cls, svcInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp}") - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" - ) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") if not result["OK"]: return result cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log) @@ -43,6 +43,10 @@ def initializeHandler(cls, svcInfoDict): cls.pilotManager = PilotManagerClient() return S_OK() + def initializeRequest(self): + credDict = self.getRemoteCredentials() + self.vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) + @classmethod def parseSelectors(cls, selectDict=None): """Parse selectors before DB query @@ -410,14 +414,13 @@ def export_getJobStats(cls, attribute, selectDict): ############################################################################## types_getJobParameter = [[str, int], str] - @classmethod @ignoreEncodeWarning - def export_getJobParameter(cls, jobID, parName): + def export_getJobParameter(self, jobID, parName): """ :param str/int jobID: one single Job ID :param str parName: one single parameter name """ - res = cls.elasticJobParametersDB.getJobParameters(int(jobID), [parName]) + res = self.elasticJobParametersDB.getJobParameters(int(jobID), self.vo, [parName]) if not res["OK"]: return res return S_OK(res["Value"].get(int(jobID), {})) @@ -432,9 +435,8 @@ def export_getJobOptParameters(cls, jobID): ############################################################################## types_getJobParameters = [[str, int, list]] - @classmethod @ignoreEncodeWarning - def export_getJobParameters(cls, jobIDs, parName=None): + def export_getJobParameters(self, jobIDs, parName=None): """ :param str/int/list jobIDs: one single job ID or a list of them :param str parName: one single parameter name, a list or None (meaning all of them) @@ -442,13 +444,13 @@ def export_getJobParameters(cls, jobIDs, parName=None): if not isinstance(jobIDs, list): jobIDs = [jobIDs] jobIDs = [int(jobID) for jobID in jobIDs] - res = cls.elasticJobParametersDB.getJobParameters(jobIDs, parName) + res = self.elasticJobParametersDB.getJobParameters(jobIDs, self.vo, parName) if not res["OK"]: return res parameters = res["Value"] # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends - res = cls.jobDB.getJobParameters(jobIDs, parName) + res = self.jobDB.getJobParameters(jobIDs, parName) if not res["OK"]: return res parametersM = res["Value"] @@ -524,4 +526,5 @@ def export_getInputData(cls, jobID): class JobMonitoringHandler(JobMonitoringHandlerMixin, RequestHandler): - pass + def initialize(self): + return self.initializeRequest() diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py index 6b8c979a71c..a528b63c4b3 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py @@ -9,7 +9,8 @@ import time -from DIRAC import S_OK, S_ERROR +from DIRAC import S_ERROR, S_OK +from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader @@ -37,9 +38,7 @@ def initializeHandler(cls, svcInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp}") - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" - ) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") if not result["OK"]: return result cls.elasticJobParametersDB = result["Value"]() @@ -48,6 +47,11 @@ def initializeHandler(cls, svcInfoDict): return S_OK() + + def initializeRequest(self): + credDict = self.getRemoteCredentials() + self.vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) + ########################################################################### types_updateJobFromStager = [[str, int], str] @@ -154,20 +158,17 @@ def export_setJobApplicationStatus(cls, jobID, appStatus, source="Unknown"): ########################################################################### types_setJobParameter = [[str, int], str, str] - @classmethod - def export_setJobParameter(cls, jobID, name, value): + def export_setJobParameter(self, jobID, name, value): """Set arbitrary parameter specified by name/value pair for job specified by its JobId """ - - return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member + return self.elasticJobParametersDB.setJobParameter(int(jobID), name, value, vo=self.vo) # pylint: disable=no-member ########################################################################### types_setJobsParameter = [dict] - @classmethod @ignoreEncodeWarning - def export_setJobsParameter(cls, jobsParameterDict): + def export_setJobsParameter(self, jobsParameterDict): """Set arbitrary parameter specified by name/value pair for job specified by its JobId """ @@ -175,11 +176,11 @@ def export_setJobsParameter(cls, jobsParameterDict): message = "" for jobID in jobsParameterDict: - res = cls.elasticJobParametersDB.setJobParameter( - int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1]) + res = self.elasticJobParametersDB.setJobParameter( + int(jobID), key=str(jobsParameterDict[jobID][0]), value=str(jobsParameterDict[jobID][1]), vo=self.vo ) if not res["OK"]: - cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"]) + self.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"]) failed = True message = res["Message"] @@ -190,36 +191,34 @@ def export_setJobsParameter(cls, jobsParameterDict): ########################################################################### types_setJobParameters = [[str, int], list] - @classmethod @ignoreEncodeWarning - def export_setJobParameters(cls, jobID, parameters): + def export_setJobParameters(self, jobID, parameters): """Set arbitrary parameters specified by a list of name/value pairs for job specified by its JobId """ - result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters) + result = self.elasticJobParametersDB.setJobParameters(int(jobID), parameters=parameters, vo=self.vo) if not result["OK"]: - cls.log.error("Failed to add Job Parameters to JobParametersDB", result["Message"]) + self.log.error("Failed to add Job Parameters to JobParametersDB", result["Message"]) return result ########################################################################### types_sendHeartBeat = [[str, int], dict, dict] - @classmethod - def export_sendHeartBeat(cls, jobID, dynamicData, staticData): + def export_sendHeartBeat(self, jobID, dynamicData, staticData): """Send a heart beat sign of life for a job jobID""" - result = cls.jobDB.setHeartBeatData(int(jobID), dynamicData) + result = self.jobDB.setHeartBeatData(int(jobID), dynamicData) if not result["OK"]: - cls.log.warn("Failed to set the heart beat data", f"for job {jobID} ") + self.log.warn("Failed to set the heart beat data", f"for job {jobID} ") for key, value in staticData.items(): - result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value) + result = self.elasticJobParametersDB.setJobParameter(int(jobID), key, value, vo=self.vo) if not result["OK"]: - cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"]) + self.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"]) # Restore the Running status if necessary - result = cls.jobDB.getJobAttributes(jobID, ["Status"]) + result = self.jobDB.getJobAttributes(jobID, ["Status"]) if not result["OK"]: return result @@ -228,21 +227,22 @@ def export_sendHeartBeat(cls, jobID, dynamicData, staticData): status = result["Value"]["Status"] if status in (JobStatus.STALLED, JobStatus.MATCHED): - result = cls.jobDB.setJobAttribute(jobID=jobID, attrName="Status", attrValue=JobStatus.RUNNING, update=True) + result = self.jobDB.setJobAttribute(jobID=jobID, attrName="Status", attrValue=JobStatus.RUNNING, update=True) if not result["OK"]: - cls.log.warn("Failed to restore the job status to Running") + self.log.warn("Failed to restore the job status to Running") jobMessageDict = {} - result = cls.jobDB.getJobCommand(int(jobID)) + result = self.jobDB.getJobCommand(int(jobID)) if result["OK"]: jobMessageDict = result["Value"] if jobMessageDict: for key in jobMessageDict: - result = cls.jobDB.setJobCommandStatus(int(jobID), key, "Sent") + result = self.jobDB.setJobCommandStatus(int(jobID), key, "Sent") return S_OK(jobMessageDict) class JobStateUpdateHandler(JobStateUpdateHandlerMixin, RequestHandler): - pass + def initialize(self): + return self.initializeRequest() diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoJobStateUpdateHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoJobStateUpdateHandler.py index 4ce8c1e6703..a8b73613e16 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/TornadoJobStateUpdateHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoJobStateUpdateHandler.py @@ -12,4 +12,5 @@ class TornadoJobStateUpdateHandler(JobStateUpdateHandlerMixin, TornadoService): - pass + def initializeRequest(self): + return JobStateUpdateHandlerMixin.initializeRequest(self) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index 63037702c7f..84a375b30c3 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -1,11 +1,11 @@ """ This is a DIRAC WMS administrator interface. """ -from DIRAC import S_OK, S_ERROR - +from DIRAC import S_ERROR, S_OK +from DIRAC.ConfigurationSystem.Client.Helpers import Registry +from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient @@ -21,9 +21,7 @@ def initializeHandler(cls, svcInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp!r}") - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" - ) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") if not result["OK"]: return result cls.elasticJobParametersDB = result["Value"]() @@ -176,12 +174,13 @@ def export_getJobPilotOutput(self, jobID): pilotReference = "" # Get the pilot grid reference first from the job parameters - if self.elasticJobParametersDB: - res = self.elasticJobParametersDB.getJobParameters(int(jobID), "Pilot_Reference") - if not res["OK"]: - return res - if res["Value"].get(int(jobID)): - pilotReference = res["Value"][int(jobID)]["Pilot_Reference"] + credDict = self.getRemoteCredentials() + vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) + res = self.elasticJobParametersDB.getJobParameters(int(jobID), vo=vo, parNameList=["Pilot_Reference"]) + if not res["OK"]: + return res + if res["Value"].get(int(jobID)): + pilotReference = res["Value"][int(jobID)]["Pilot_Reference"] if not pilotReference: res = self.jobDB.getJobParameter(int(jobID), "Pilot_Reference") diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py index 906b2728929..1440d86bd1a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py @@ -54,9 +54,7 @@ def __init__( raise if not self.elasticJobParametersDB: - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" - ) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") if not result["OK"]: raise AttributeError(result["Message"]) self.elasticJobParametersDB = result["Value"](parentLogger=self.log) diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py index 389f2754a91..68feee3f55f 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py @@ -18,134 +18,134 @@ def test_setAndGetJobFromDB(): - res = elasticJobParametersDB.setJobParameter(100, "DIRAC", "dirac@cern") + res = elasticJobParametersDB.setJobParameter(100, "DIRAC", "dirac@cern", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(100) + res = elasticJobParametersDB.getJobParameters(100, "vo") assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern" # update it - res = elasticJobParametersDB.setJobParameter(100, "DIRAC", "dirac@cern.cern") + res = elasticJobParametersDB.setJobParameter(100, "DIRAC", "dirac@cern.cern", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(100) + res = elasticJobParametersDB.getJobParameters(100, "vo") assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" - res = elasticJobParametersDB.getJobParameters(100, ["DIRAC"]) + res = elasticJobParametersDB.getJobParameters(100, "vo", ["DIRAC"]) assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" - res = elasticJobParametersDB.getJobParameters(100, "DIRAC") + res = elasticJobParametersDB.getJobParameters(100, "vo", "DIRAC") assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" # add one - res = elasticJobParametersDB.setJobParameter(100, "someKey", "someValue") + res = elasticJobParametersDB.setJobParameter(100, "someKey", "someValue", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) # now search - res = elasticJobParametersDB.getJobParameters(100) + res = elasticJobParametersDB.getJobParameters(100, "vo") assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" assert res["Value"][100]["someKey"] == "someValue" - res = elasticJobParametersDB.getJobParameters(100, ["DIRAC", "someKey"]) + res = elasticJobParametersDB.getJobParameters(100, "vo", ["DIRAC", "someKey"]) assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" assert res["Value"][100]["someKey"] == "someValue" - res = elasticJobParametersDB.getJobParameters(100, "DIRAC, someKey") + res = elasticJobParametersDB.getJobParameters(100, "vo", "DIRAC, someKey") assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" assert res["Value"][100]["someKey"] == "someValue" # another one + search - res = elasticJobParametersDB.setJobParameter(100, "someOtherKey", "someOtherValue") + res = elasticJobParametersDB.setJobParameter(100, "someOtherKey", "someOtherValue", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(100) + res = elasticJobParametersDB.getJobParameters(100, "vo") assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" assert res["Value"][100]["someKey"] == "someValue" assert res["Value"][100]["someOtherKey"] == "someOtherValue" - res = elasticJobParametersDB.getJobParameters(100, ["DIRAC", "someKey", "someOtherKey"]) + res = elasticJobParametersDB.getJobParameters(100, "vo", ["DIRAC", "someKey", "someOtherKey"]) assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" assert res["Value"][100]["someKey"] == "someValue" assert res["Value"][100]["someOtherKey"] == "someOtherValue" # another job - res = elasticJobParametersDB.setJobParameter(101, "DIRAC", "dirac@cern") + res = elasticJobParametersDB.setJobParameter(101, "DIRAC", "dirac@cern", "vo") assert res["OK"] - res = elasticJobParametersDB.setJobParameter(101, "key101", "value101") + res = elasticJobParametersDB.setJobParameter(101, "key101", "value101", "vo") assert res["OK"] - res = elasticJobParametersDB.setJobParameter(101, "someKey", "value101") + res = elasticJobParametersDB.setJobParameter(101, "someKey", "value101", "vo") assert res["OK"] - res = elasticJobParametersDB.setJobParameter(101, "key101", "someValue") + res = elasticJobParametersDB.setJobParameter(101, "key101", "someValue", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(100) + res = elasticJobParametersDB.getJobParameters(100, "vo") assert res["OK"] assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" assert res["Value"][100]["someKey"] == "someValue" assert res["Value"][100]["someOtherKey"] == "someOtherValue" assert len(res["Value"]) == 1 assert len(res["Value"][100]) == 5 # Added two extra because of the new timestamp and ID field in the mapping - res = elasticJobParametersDB.getJobParameters(101) + res = elasticJobParametersDB.getJobParameters(101, "vo") assert res["OK"] assert res["Value"][101]["DIRAC"] == "dirac@cern" assert res["Value"][101]["key101"] == "someValue" assert res["Value"][101]["someKey"] == "value101" assert len(res["Value"]) == 1 assert len(res["Value"][101]) == 5 # Same thing as with doc 100 - res = elasticJobParametersDB.setJobParameters(101, [("k", "v"), ("k1", "v1"), ("k2", "v2")]) + res = elasticJobParametersDB.setJobParameters(101, [("k", "v"), ("k1", "v1"), ("k2", "v2")], vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(101) + res = elasticJobParametersDB.getJobParameters(101, "vo") assert res["OK"] assert res["Value"][101]["DIRAC"] == "dirac@cern" assert res["Value"][101]["k"] == "v" assert res["Value"][101]["k2"] == "v2" # another job with jobID > 1000000 - res = elasticJobParametersDB.setJobParameters(1010000, [("k", "v"), ("k1", "v1"), ("k2", "v2")]) + res = elasticJobParametersDB.setJobParameters(1010000, [("k", "v"), ("k1", "v1"), ("k2", "v2")], vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(1010000) + res = elasticJobParametersDB.getJobParameters(1010000, "vo") assert res["Value"][1010000]["k"] == "v" assert res["Value"][1010000]["k2"] == "v2" # deleting - res = elasticJobParametersDB.deleteJobParameters(100) + res = elasticJobParametersDB.deleteJobParameters(100, vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(100) + res = elasticJobParametersDB.getJobParameters(100, "vo") assert res["OK"] assert len(res["Value"][100]) == 0 - res = elasticJobParametersDB.deleteJobParameters(101, "someKey") + res = elasticJobParametersDB.deleteJobParameters(101, "someKey", vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(101) + res = elasticJobParametersDB.getJobParameters(101, "vo") assert res["OK"] assert len(res["Value"][101]) == 7 - res = elasticJobParametersDB.deleteJobParameters(101, "someKey, key101") # someKey is already deleted + res = elasticJobParametersDB.deleteJobParameters(101, "someKey, key101", vo="vo") # someKey is already deleted assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(101) + res = elasticJobParametersDB.getJobParameters(101, "vo") assert res["OK"] assert len(res["Value"][101]) == 6 - res = elasticJobParametersDB.deleteJobParameters(101, "nonExistingKey") + res = elasticJobParametersDB.deleteJobParameters(101, "nonExistingKey", vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(101) + res = elasticJobParametersDB.getJobParameters(101, "vo") assert res["OK"] assert len(res["Value"][101]) == 6 - res = elasticJobParametersDB.deleteJobParameters(1010000) + res = elasticJobParametersDB.deleteJobParameters(1010000, vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) - res = elasticJobParametersDB.getJobParameters(1010000) + res = elasticJobParametersDB.getJobParameters(1010000, "vo") assert res["OK"] assert len(res["Value"][1010000]) == 0 @@ -153,7 +153,7 @@ def test_setAndGetJobFromDB(): res = elasticJobParametersDB.deleteIndex("job_parameters") assert res["OK"] assert res["Value"] == "Nothing to delete" - res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(100)) + res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(100, vo="vo")) assert res["OK"] - res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(1010000)) + res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(1010000, vo="vo")) assert res["OK"] From 5df84f201ab09cc696b01fa88afbbbd7521e07c8 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Thu, 27 Jun 2024 13:42:51 +0200 Subject: [PATCH 4/7] fix: simply removed commented test --- .../Test_Client_WMS.py | 99 ------------------- 1 file changed, 99 deletions(-) diff --git a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py index 5214c8f06d4..c3939f4c7f2 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py +++ b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py @@ -155,70 +155,6 @@ def test_submitJob(jobType: str, inputData: dict[str, int], expectedSite: str) - jobID = result["Value"] print(f"Submitted job {jobID}") - # try: - # # Wait for the optimizers to run - # startingTime = time.time() - # while time.time() - startingTime < 7 * 60: - # result = jobMonitoringClient.getJobsStates(jobID) - # assert result["OK"], result["Message"] - # if result["Value"][jobID]["Status"] in (JobStatus.WAITING, JobStatus.FAILED): - # break - # time.sleep(5) - # print(f"Lookup time: {time.time() - startingTime}s") - - # # Check if the optimizers ran correctly - # print(result["Value"][jobID]) - # assert result["Value"][jobID]["Status"] == JobStatus.WAITING - # # FIXME: flaky. This has to do with the CachedJobState.commitChanges() - # # assert result["Value"][jobID]["MinorStatus"] == JobMinorStatus.PILOT_AGENT_SUBMISSION - # # assert result["Value"][jobID]["ApplicationStatus"] == "Unknown" - - # res = jobMonitoringClient.getJobJDL(jobID, False) - # assert res["OK"], res["Message"] - # print(f"Job description: {res['Value']}") - # jobDescription = ClassAd(res["Value"]) - - # # Check that the JDL contains some fields - # assert jobDescription.lookupAttribute("Owner") is True - # assert jobDescription.lookupAttribute("OwnerGroup") is True - # assert jobDescription.lookupAttribute("CPUTime") is True - # assert jobDescription.lookupAttribute("Priority") is True - # assert jobDescription.lookupAttribute("JobID") is True - - # res = jobMonitoringClient.getJobSite(jobID) - # assert res["OK"], res["Message"] - # assert res["Value"] == expectedSite - - # resourceDescription = { - # "OwnerGroup": jobDescription.getAttributeString("OwnerGroup"), - # "VirtualOrganization": jobDescription.getAttributeString("VirtualOrganization"), - # "CPUTime": jobDescription.getAttributeInt("CPUTime"), - # "DIRACVersion": "pippo", - # "GridCE": "some.grid.ce.org", - # "ReleaseVersion": "blabla", - # "PilotInfoReportedFlag": "True", - # "PilotBenchmark": "anotherPilot", - # "Site": "DIRAC.Jenkins.ch", - # } - - # # Request job until ours is picked up or request returns S_ERROR - # while True: - # time.sleep(1) - # res = matcherClient.requestJob(resourceDescription) - # print(f"Matcher result: {res}") - # if not res["OK"] or res["Value"]["JobID"] == jobID: - # break - - # # Assert that our job has been selected by the matcher - # assert res["OK"], res["Message"] - # assert res["Value"]["JobID"] == jobID - - # # Check that the job has been putten in the MATCHED status - # result = jobMonitoringClient.getJobsStates(jobID) - # assert result["OK"], result["Message"] - # assert result["Value"][jobID]["Status"] == JobStatus.MATCHED - # assert result["Value"][jobID]["MinorStatus"] == "Assigned" - # assert result["Value"][jobID]["ApplicationStatus"] == "Unknown" # finally: # Remove the file from storage element and file catalog for lfn in lfns: @@ -252,26 +188,6 @@ def test_submitJob_parametricJob() -> None: jobNames = [res["Value"][jobID]["JobName"] for jobID in res["Value"]] assert set(jobNames) == {f"parametric_helloWorld_{nJob}" for nJob in range(3)} - # # Wait for the optimizers to run - # startingTime = time.time() - # while time.time() - startingTime < 7 * 60: - # result = jobMonitoringClient.getJobsStates(jobIDList) - # assert result["OK"], result["Message"] - # jobsAreNoLongerInChecking = True - # for jobID in jobIDList: - # if result["Value"][jobID]["Status"] == JobStatus.CHECKING: - # jobsAreNoLongerInChecking = False - # if jobsAreNoLongerInChecking: - # break - - # time.sleep(5) - # print(f"Lookup time: {time.time() - startingTime}s") - - # for jobID in jobIDList: - # assert result["Value"][jobID]["Status"] == JobStatus.WAITING - # FIXME: flaky. This has to do with the CachedJobState.commitChanges() - # assert result["Value"][jobID]["MinorStatus"] == JobMinorStatus.PILOT_AGENT_SUBMISSION - # assert result["Value"][jobID]["ApplicationStatus"] == "Unknown" finally: jobManagerClient.removeJob(jobIDList) @@ -286,21 +202,6 @@ def test_WMSClient_rescheduleJob() -> None: jobID = result["Value"] try: - # Wait for the optimizers to run - # startingTime = time.time() - # while time.time() - startingTime < 7 * 60: - # result = jobMonitoringClient.getJobsStates(jobID) - # assert result["OK"], result["Message"] - # if result["Value"][jobID]["Status"] in (JobStatus.WAITING, JobStatus.FAILED): - # break - # time.sleep(5) - # print(f"Lookup time: {time.time() - startingTime}s") - - # # Check if the optimizers ran correctly - # assert result["Value"][jobID]["Status"] == JobStatus.WAITING - # # FIXME: flaky. This has to do with the CachedJobState.commitChanges() - # # assert result["Value"][jobID]["MinorStatus"] == JobMinorStatus.PILOT_AGENT_SUBMISSION - # # assert result["Value"][jobID]["ApplicationStatus"] == "Unknown" res = jobMonitoringClient.getJobJDL(jobID, False) assert res["OK"], res["Message"] From 59c382c323637219af26346967a7c0e21540cd8d Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Thu, 27 Jun 2024 13:51:05 +0200 Subject: [PATCH 5/7] fix: do not add JobStatus to JobParameters --- .../Utilities/JobStatusUtility.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py index 1440d86bd1a..ebdd0e76546 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py @@ -11,7 +11,6 @@ from DIRAC.WorkloadManagementSystem.Client import JobStatus if TYPE_CHECKING: - from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB @@ -21,7 +20,6 @@ def __init__( self, jobDB: JobDB = None, jobLoggingDB: JobLoggingDB = None, - elasticJobParametersDB: JobParametersDB = None, ) -> None: """ :raises: RuntimeError, AttributeError @@ -31,7 +29,6 @@ def __init__( self.jobDB = jobDB self.jobLoggingDB = jobLoggingDB - self.elasticJobParametersDB = elasticJobParametersDB if not self.jobDB: try: @@ -53,11 +50,6 @@ def __init__( self.log.error("Can't connect to the JobLoggingDB") raise - if not self.elasticJobParametersDB: - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") - if not result["OK"]: - raise AttributeError(result["Message"]) - self.elasticJobParametersDB = result["Value"](parentLogger=self.log) def setJobStatus( self, jobID: int, status=None, minorStatus=None, appStatus=None, source=None, dateTime=None, force=False @@ -154,10 +146,7 @@ def setJobStatusBulk(self, jobID: int, statusDict: dict, force: bool = False): result = self.jobDB.setJobAttributes(jobID, attrNames, attrValues, update=True, force=True) if not result["OK"]: return result - if self.elasticJobParametersDB: - result = self.elasticJobParametersDB.setJobParameter(int(jobID), "Status", status) - if not result["OK"]: - return result + # Update start and end time if needed if not endTime and newEndTime: log.debug("Set job end time", endTime) From 5395cf2cc44a24fc973f71480fe5d30a0a13f80f Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Thu, 27 Jun 2024 15:01:02 +0200 Subject: [PATCH 6/7] fix: removed deprecated methods --- .../Client/JobState/JobState.py | 19 ++++-- .../WorkloadManagementSystem/DB/JobDB.py | 65 +------------------ .../Service/JobMonitoringHandler.py | 2 +- .../Service/JobStateUpdateHandler.py | 9 +-- .../Utilities/JobStatusUtility.py | 1 - .../Utilities/test/Test_JobStatusUtility.py | 6 +- .../Test_Client_WMS.py | 3 +- 7 files changed, 24 insertions(+), 81 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py index e26f646ceb6..7a80d9f17bb 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py @@ -1,13 +1,18 @@ """ This object is a wrapper for setting and getting jobs states """ -from DIRAC import gLogger, S_OK, S_ERROR -from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest +from DIRAC import S_ERROR, S_OK, gLogger from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB -from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, singleValueDefFields, multiValueDefFields -from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, RIGHT_RESCHEDULE -from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_RESET, RIGHT_CHANGE_STATUS +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB +from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueDefFields, singleValueDefFields +from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( + RIGHT_CHANGE_STATUS, + RIGHT_GET_INFO, + RIGHT_RESCHEDULE, + RIGHT_RESET, +) from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility @@ -21,6 +26,7 @@ def reset(self): self.jobDB = None self.logDB = None self.tqDB = None + self.jobParametersDB = None __db = DBHold() @@ -32,6 +38,7 @@ def checkDBAccess(cls): JobState.__db.jobDB = JobDB() JobState.__db.logDB = JobLoggingDB() JobState.__db.tqDB = TaskQueueDB() + JobState.__db.jpDB = JobParametersDB() def __init__(self, jid): self.__jid = jid @@ -112,7 +119,7 @@ def commitCache(self, initialState, cache, jobLog): return result if data["jobp"]: - result = self.__retryFunction(5, JobState.__db.jobDB.setJobParameters, (self.__jid, data["jobp"])) + result = self.__retryFunction(5, JobState.__db.jpDB.setJobParameters, (self.__jid, data["jobp"])) if not result["OK"]: return result diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 191a776c5a6..d0de65e6652 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -19,6 +19,7 @@ from DIRAC.Core.Base.DB import DB from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd +from DIRAC.Core.Utilities.Decorators import deprecated from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus @@ -264,6 +265,7 @@ def getJobAttribute(self, jobID, attribute): return S_OK(result["Value"].get(attribute)) ############################################################################# + @deprecated("Use JobParametersDB instead") def getJobParameter(self, jobID, parameter): """Get the given parameter of a job specified by its jobID""" @@ -668,50 +670,6 @@ def setStartExecTime(self, jobID, startDate=None): req = f"UPDATE Jobs SET StartExecTime={startDate} WHERE JobID={jobID} AND StartExecTime IS NULL" return self._update(req) - ############################################################################# - def setJobParameter(self, jobID, key, value): - """Set a parameter specified by name,value pair for the job JobID""" - - ret = self._escapeString(key) - if not ret["OK"]: - return ret - e_key = ret["Value"] - ret = self._escapeString(value) - if not ret["OK"]: - return ret - e_value = ret["Value"] - - cmd = "REPLACE JobParameters (JobID,Name,Value) VALUES (%d,%s,%s)" % (int(jobID), e_key, e_value) - return self._update(cmd) - - ############################################################################# - def setJobParameters(self, jobID, parameters): - """Set parameters specified by a list of name/value pairs for the job JobID - - :param int jobID: Job ID - :param list parameters: list of tuples (name, value) pairs - - :return: S_OK/S_ERROR - """ - - if not parameters: - return S_OK() - - insertValueList = [] - for name, value in parameters: - ret = self._escapeString(name) - if not ret["OK"]: - return ret - e_name = ret["Value"] - ret = self._escapeString(value) - if not ret["OK"]: - return ret - e_value = ret["Value"] - insertValueList.append(f"({jobID},{e_name},{e_value})") - - cmd = f"REPLACE JobParameters (JobID,Name,Value) VALUES {', '.join(insertValueList)}" - return self._update(cmd) - ############################################################################# def setJobOptParameter(self, jobID, name, value): """Set an optimzer parameter specified by name,value pair for the job JobID""" @@ -780,16 +738,6 @@ def setAtticJobParameter(self, jobID, key, value, rescheduleCounter): ) return self._update(cmd) - ############################################################################# - def __setInitialJobParameters(self, classadJob, jobID): - """Set initial job parameters as was defined in the Classad""" - - # Extract initital job parameters - parameters = {} - if classadJob.lookupAttribute("Parameters"): - parameters = classadJob.getDictionaryFromSubJDL("Parameters") - return self.setJobParameters(jobID, list(parameters.items())) - ############################################################################# def setJobJDL(self, jobID, jdl=None, originalJDL=None): """Insert JDL's for job specified by jobID""" @@ -969,11 +917,6 @@ def insertNewJobIntoDB( if not result["OK"]: return result - # Setting the Job parameters - result = self.__setInitialJobParameters(classAdJob, jobID) - if not result["OK"]: - return result - # Looking for the Input Data inputData = [] if classAdJob.lookupAttribute("InputData"): @@ -1205,10 +1148,6 @@ def rescheduleJob(self, jobID): if not result["OK"]: return result - result = self.__setInitialJobParameters(classAdJob, jobID) - if not result["OK"]: - return result - result = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()), force=True) if not result["OK"]: return result diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index 6e730f3f63e..f97667fd460 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -46,7 +46,7 @@ def initializeHandler(cls, svcInfoDict): def initializeRequest(self): credDict = self.getRemoteCredentials() self.vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) - + @classmethod def parseSelectors(cls, selectDict=None): """Parse selectors before DB query diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py index a528b63c4b3..281161c722c 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py @@ -43,11 +43,10 @@ def initializeHandler(cls, svcInfoDict): return result cls.elasticJobParametersDB = result["Value"]() - cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB, cls.elasticJobParametersDB) + cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB) return S_OK() - def initializeRequest(self): credDict = self.getRemoteCredentials() self.vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) @@ -162,7 +161,7 @@ def export_setJobParameter(self, jobID, name, value): """Set arbitrary parameter specified by name/value pair for job specified by its JobId """ - return self.elasticJobParametersDB.setJobParameter(int(jobID), name, value, vo=self.vo) # pylint: disable=no-member + return self.elasticJobParametersDB.setJobParameter(int(jobID), name, value, vo=self.vo) ########################################################################### types_setJobsParameter = [dict] @@ -227,7 +226,9 @@ def export_sendHeartBeat(self, jobID, dynamicData, staticData): status = result["Value"]["Status"] if status in (JobStatus.STALLED, JobStatus.MATCHED): - result = self.jobDB.setJobAttribute(jobID=jobID, attrName="Status", attrValue=JobStatus.RUNNING, update=True) + result = self.jobDB.setJobAttribute( + jobID=jobID, attrName="Status", attrValue=JobStatus.RUNNING, update=True + ) if not result["OK"]: self.log.warn("Failed to restore the job status to Running") diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py index ebdd0e76546..7ff43246afe 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py @@ -50,7 +50,6 @@ def __init__( self.log.error("Can't connect to the JobLoggingDB") raise - def setJobStatus( self, jobID: int, status=None, minorStatus=None, appStatus=None, source=None, dateTime=None, force=False ): diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobStatusUtility.py index 1205beed32c..913ab1c038a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobStatusUtility.py @@ -216,11 +216,7 @@ def test__setJobStatusBulk( jobLoggingDB_mock = MagicMock() jobLoggingDB_mock.getWMSTimeStamps.return_value = jobLoggingDB_getWMSTimeStamps_rv - esJobParameters_mock = MagicMock() - esJobParameters_mock.getJobAttributes.return_value = jobDB_getJobAttributes_rv - esJobParameters_mock.setJobAttributes.return_value = jobDB_setJobAttributes_rv - - jsu = JobStatusUtility(jobDB_mock, jobLoggingDB_mock, esJobParameters_mock) + jsu = JobStatusUtility(jobDB_mock, jobLoggingDB_mock) # Act res = jsu.setJobStatusBulk(1, statusDict_in, force) diff --git a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py index c3939f4c7f2..a392bd54756 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py +++ b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py @@ -202,7 +202,6 @@ def test_WMSClient_rescheduleJob() -> None: jobID = result["Value"] try: - res = jobMonitoringClient.getJobJDL(jobID, False) assert res["OK"], res["Message"] print(f"Job description: {res['Value']}") @@ -320,6 +319,8 @@ def test_JobStateUpdateAndJobMonitoring() -> None: assert res["OK"], res["Message"] assert res["Value"][jobID]["Status"] == JobStatus.RUNNING + time.sleep(3) + res = jobMonitoringClient.getJobParameters(jobID, ["par1", "par2"]) assert res["OK"], res["Message"] assert res["Value"] == {jobID: {"par1": "par1Value", "par2": "par2Value"}} From b1db3018d1f68076c747bdf6ccfd3984e670d36f Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 20 Aug 2024 15:35:18 +0200 Subject: [PATCH 7/7] fix: JobState does not need to push JobParameters --- .../Client/JobState/JobState.py | 10 +--------- .../WorkloadManagementSystem/DB/JobParametersDB.py | 4 ++-- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py index 7a80d9f17bb..8280b6abe4a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py @@ -5,7 +5,6 @@ from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB -from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueDefFields, singleValueDefFields from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( RIGHT_CHANGE_STATUS, @@ -26,7 +25,6 @@ def reset(self): self.jobDB = None self.logDB = None self.tqDB = None - self.jobParametersDB = None __db = DBHold() @@ -38,7 +36,6 @@ def checkDBAccess(cls): JobState.__db.jobDB = JobDB() JobState.__db.logDB = JobLoggingDB() JobState.__db.tqDB = TaskQueueDB() - JobState.__db.jpDB = JobParametersDB() def __init__(self, jid): self.__jid = jid @@ -103,7 +100,7 @@ def commitCache(self, initialState, cache, jobLog): return S_OK(False) gLogger.verbose(f"Job {self.__jid}: About to execute trace. Current state {initialState}") - data = {"att": [], "jobp": [], "optp": []} + data = {"att": [], "optp": []} for key in cache: for dk in data: if key.find(f"{dk}.") == 0: @@ -118,11 +115,6 @@ def commitCache(self, initialState, cache, jobLog): if not result["OK"]: return result - if data["jobp"]: - result = self.__retryFunction(5, JobState.__db.jpDB.setJobParameters, (self.__jid, data["jobp"])) - if not result["OK"]: - return result - for k, v in data["optp"]: result = self.__retryFunction(5, JobState.__db.jobDB.setJobOptParameter, (self.__jid, k, v)) if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py index c36ea010f71..84b9d9afbde 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py @@ -43,8 +43,8 @@ def __init__(self, parentLogger=None): try: # Connecting to the ES cluster super().__init__(self.fullname, self.index_name, parentLogger=parentLogger) - except Exception as ex: - RuntimeError("Can't connect to JobParameters index") from ex + except Exception: + RuntimeError("Can't connect to JobParameters index") self.addIndexTemplate("elasticjobparametersdb", index_patterns=[f"{self.index_name}_*"], mapping=mapping) def _indexName(self, jobID: int, vo: str) -> str: