diff --git a/dirac.cfg b/dirac.cfg index 34eafa866ea..3d0bb61e89b 100644 --- a/dirac.cfg +++ b/dirac.cfg @@ -569,6 +569,20 @@ Systems ##END } } + WorkloadManagementSystem + { + Databases + { + ElasticJobParametersDB + { + # Host of OpenSearch instance + Host=host.some.where + + # index name (default is "job_parameters") + index_name=a_different_name + } + } + } } Resources { diff --git a/docs/source/AdministratorGuide/Systems/WorkloadManagement/architecture.rst b/docs/source/AdministratorGuide/Systems/WorkloadManagement/architecture.rst index e3b56fe06dd..034e0d00495 100644 --- a/docs/source/AdministratorGuide/Systems/WorkloadManagement/architecture.rst +++ b/docs/source/AdministratorGuide/Systems/WorkloadManagement/architecture.rst @@ -25,15 +25,7 @@ SandboxMetadataDB TaskQueueDB The TaskQueueDB is used to organize jobs requirements into task queues, for easier matching. -All the DBs above are MySQL DBs, and should be installed using the :ref:`system administrator console `. - - The JobDB MySQL table *JobParameters* can be replaced by an JobParameters backend built in Elastic/OpenSearch. - To enable it, set the following flag:: - - /Operations/[Defaults | Setup]/Services/JobMonitoring/useESForJobParametersFlag=True - - If you decide to make use of this Elastic/OpenSearch backend for storing job parameters, you would be in charge of setting - the index policies, as Job Parameters stored in Elastic/OpenSearch are not deleted together with the jobs. +All the DBs above are MySQL DBs with the only exception of the Elastic/OpenSearch backend for storing job parameters. Services diff --git a/src/DIRAC/Core/Base/ElasticDB.py b/src/DIRAC/Core/Base/ElasticDB.py index 7675129bc0f..6a053af84f0 100644 --- a/src/DIRAC/Core/Base/ElasticDB.py +++ b/src/DIRAC/Core/Base/ElasticDB.py @@ -1,8 +1,8 @@ """ ElasticDB is a base class used to connect an Elasticsearch database and manages queries. """ +from DIRAC.ConfigurationSystem.Client.Utilities import getElasticDBParameters from DIRAC.Core.Base.DIRACDB import DIRACDB from DIRAC.Core.Utilities.ElasticSearchDB import ElasticSearchDB -from DIRAC.ConfigurationSystem.Client.Utilities import getElasticDBParameters class ElasticDB(DIRACDB, ElasticSearchDB): diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py b/src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py index 9e2b477dcbc..d423d7d3d10 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py @@ -4,15 +4,13 @@ optimizer instances and associated actions are performed there. """ -from DIRAC import S_OK, S_ERROR, exit as dExit - +from DIRAC import S_ERROR, S_OK +from DIRAC import exit as dExit from DIRAC.AccountingSystem.Client.Types.Job import Job as AccountingJob from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations -from DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB @@ -48,17 +46,12 @@ def initialize(self, jobDB=None, logDB=None): if not self.jobDB.isValid(): dExit(1) - useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False) - if useESForJobParametersFlag: - try: - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" - ) - if not result["OK"]: - return result - self.elasticJobParametersDB = result["Value"]() - except RuntimeError as excp: - return S_ERROR(f"Can't connect to DB: {excp}") + result = ObjectLoader().loadObject( + "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" + ) + if not result["OK"]: + return result + self.elasticJobParametersDB = result["Value"]() self.logDB = JobLoggingDB() if logDB is None else logDB @@ -244,9 +237,7 @@ def setJobParam(self, job, reportName, value): return S_OK() self.log.debug(f"setJobParameter({job},'{reportName}','{value}')") - if self.elasticJobParametersDB: - return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value) - return self.jobDB.setJobParameter(job, reportName, value) + return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value) ############################################################################# def setFailedJob(self, job, msg, classAdJob=None): diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobState/CachedJobState.py b/src/DIRAC/WorkloadManagementSystem/Client/JobState/CachedJobState.py index 5a14f8d9a1c..63f2c34b0ea 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobState/CachedJobState.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobState/CachedJobState.py @@ -334,8 +334,6 @@ def getAttribute(self, name): def getAttributes(self, nameList=None): return self.__cacheDict("att", self.__jobState.getAttributes, nameList) - # JobParameters --- REMOVED - # Optimizer params def setOptParameter(self, name, value): diff --git a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py index a9f3c654f33..6c05301d7bb 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py @@ -1,21 +1,15 @@ """ Module containing a front-end to the ElasticSearch-based ElasticJobParametersDB. This is a drop-in replacement for MySQL-based table JobDB.JobParameters. - The reason for switching to a ES-based JobParameters lies in the extended searching - capabilities of ES. - This results in higher traceability for DIRAC jobs. - The following class methods are provided for public usage - getJobParameters() - setJobParameter() - deleteJobParameters() """ from DIRAC import S_ERROR, S_OK -from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals from DIRAC.Core.Base.ElasticDB import ElasticDB from DIRAC.Core.Utilities import TimeUtilities - mapping = { "properties": { "JobID": {"type": "long"}, @@ -38,24 +32,22 @@ class ElasticJobParametersDB(ElasticDB): def __init__(self, parentLogger=None): """Standard Constructor""" - try: - indexPrefix = CSGlobals.getSetup().lower() + self.fullname = "WorkloadManagement/ElasticJobParametersDB" + self.index_name = self.getCSOption("index_name", "job_parameters") + try: # Connecting to the ES cluster - super().__init__("WorkloadManagement/ElasticJobParametersDB", indexPrefix, parentLogger=parentLogger) + super().__init__(self.fullname, self.index_name, parentLogger=parentLogger) except Exception as ex: - self.log.error("Can't connect to ElasticJobParametersDB", repr(ex)) raise RuntimeError("Can't connect to ElasticJobParametersDB") from ex - self.indexName_base = f"{self.getIndexPrefix()}_elasticjobparameters_index" - def _indexName(self, jobID: int) -> str: """construct the index name :param jobID: Job ID """ - indexSplit = int(jobID) // 1e6 - return f"{self.indexName_base}_{indexSplit}m" + indexSplit = int(jobID // 1e6) + return f"{self.index_name}_{indexSplit}m" def _createIndex(self, indexName: str) -> None: """Create a new index if needed diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 32729e2a8d3..191a776c5a6 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -1062,11 +1062,10 @@ def removeJobFromDB(self, jobIDs): if not result["OK"]: failedTablesList.append(table) - result = S_OK() if failedTablesList: - result = S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})") + return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})") - return result + return S_OK() ############################################################################# def rescheduleJob(self, jobID): diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index 41f75b76eae..71cc90b7ea8 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -35,18 +35,12 @@ def initializeHandler(cls, svcInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp}") - cls.elasticJobParametersDB = None - useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False) - if useESForJobParametersFlag: - try: - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" - ) - if not result["OK"]: - return result - cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log) - except RuntimeError as excp: - return S_ERROR(f"Can't connect to DB: {excp}") + result = ObjectLoader().loadObject( + "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" + ) + if not result["OK"]: + return result + cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log) cls.pilotManager = PilotManagerClient() return S_OK() @@ -446,14 +440,7 @@ def export_getJobParameter(cls, jobID, parName): :param str/int jobID: one single Job ID :param str parName: one single parameter name """ - if cls.elasticJobParametersDB: - res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName]) - if not res["OK"]: - return res - if res["Value"].get(int(jobID)): - return S_OK(res["Value"][int(jobID)]) - - res = cls.jobDB.getJobParameters(jobID, [parName]) + res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName]) if not res["OK"]: return res return S_OK(res["Value"].get(int(jobID), {})) @@ -475,34 +462,31 @@ def export_getJobParameters(cls, jobIDs, parName=None): :param str/int/list jobIDs: one single job ID or a list of them :param str parName: one single parameter name, a list or None (meaning all of them) """ - if cls.elasticJobParametersDB: - if not isinstance(jobIDs, list): - jobIDs = [jobIDs] - parameters = {} - for jobID in jobIDs: - res = cls.elasticJobParametersDB.getJobParameters(jobID, parName) - if not res["OK"]: - return res - parameters.update(res["Value"]) - - # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends - res = cls.jobDB.getJobParameters(jobIDs, parName) + if not isinstance(jobIDs, list): + jobIDs = [jobIDs] + parameters = {} + for jobID in jobIDs: + res = cls.elasticJobParametersDB.getJobParameters(jobID, parName) if not res["OK"]: return res - parametersM = res["Value"] - - # and now combine - final = dict(parametersM) - # if job in JobDB, update with parameters from ES if any - for jobID in final: - final[jobID].update(parameters.get(jobID, {})) - # if job in ES and not in JobDB, take ES - for jobID in parameters: - if jobID not in final: - final[jobID] = parameters[jobID] - return S_OK(final) - - return cls.jobDB.getJobParameters(jobIDs, parName) + parameters.update(res["Value"]) + + # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends + res = cls.jobDB.getJobParameters(jobIDs, parName) + if not res["OK"]: + return res + parametersM = res["Value"] + + # and now combine + final = dict(parametersM) + # if job in JobDB, update with parameters from ES if any + for jobID in final: + final[jobID].update(parameters.get(jobID, {})) + # if job in ES and not in JobDB, take ES + for jobID in parameters: + if jobID not in final: + final[jobID] = parameters[jobID] + return S_OK(final) ############################################################################## types_getAtticJobParameters = [int] diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py index 868b6f926d7..a8a67b6c301 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py @@ -12,7 +12,6 @@ from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility @@ -37,17 +36,12 @@ def initializeHandler(cls, svcInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp}") - cls.elasticJobParametersDB = None - if Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False): - try: - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" - ) - if not result["OK"]: - return result - cls.elasticJobParametersDB = result["Value"]() - except RuntimeError as excp: - return S_ERROR(f"Can't connect to DB: {excp}") + result = ObjectLoader().loadObject( + "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" + ) + if not result["OK"]: + return result + cls.elasticJobParametersDB = result["Value"]() cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB, cls.elasticJobParametersDB) @@ -165,10 +159,7 @@ def export_setJobParameter(cls, jobID, name, value): for job specified by its JobId """ - if cls.elasticJobParametersDB: - return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member - - return cls.jobDB.setJobParameter(int(jobID), name, value) + return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member ########################################################################### types_setJobsParameter = [dict] @@ -182,23 +173,13 @@ def export_setJobsParameter(cls, jobsParameterDict): failed = False for jobID in jobsParameterDict: - if cls.elasticJobParametersDB: - res = cls.elasticJobParametersDB.setJobParameter( - int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1]) - ) - if not res["OK"]: - cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"]) - failed = True - message = res["Message"] - - else: - res = cls.jobDB.setJobParameter( - jobID, str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1]) - ) - if not res["OK"]: - cls.log.error("Failed to add Job Parameter to MySQL", res["Message"]) - failed = True - message = res["Message"] + res = cls.elasticJobParametersDB.setJobParameter( + int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1]) + ) + if not res["OK"]: + cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"]) + failed = True + message = res["Message"] if failed: return S_ERROR(message) @@ -213,14 +194,9 @@ def export_setJobParameters(cls, jobID, parameters): """Set arbitrary parameters specified by a list of name/value pairs for job specified by its JobId """ - if cls.elasticJobParametersDB: - result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters) - if not result["OK"]: - cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"]) - else: - result = cls.jobDB.setJobParameters(int(jobID), parameters) - if not result["OK"]: - cls.log.error("Failed to add Job Parameters to MySQL", result["Message"]) + result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters) + if not result["OK"]: + cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"]) return result @@ -235,15 +211,10 @@ def export_sendHeartBeat(cls, jobID, dynamicData, staticData): if not result["OK"]: cls.log.warn("Failed to set the heart beat data", f"for job {jobID} ") - if cls.elasticJobParametersDB: - for key, value in staticData.items(): - result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value) - if not result["OK"]: - cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"]) - else: - result = cls.jobDB.setJobParameters(int(jobID), list(staticData.items())) + for key, value in staticData.items(): + result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value) if not result["OK"]: - cls.log.error("Failed to add Job Parameters to MySQL", result["Message"]) + cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"]) # Restore the Running status if necessary result = cls.jobDB.getJobAttributes(jobID, ["Status"]) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index e64c88efd3f..096472ecbb0 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -5,7 +5,6 @@ from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient @@ -22,18 +21,12 @@ def initializeHandler(cls, svcInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp!r}") - cls.elasticJobParametersDB = None - useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False) - if useESForJobParametersFlag: - try: - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" - ) - if not result["OK"]: - return result - cls.elasticJobParametersDB = result["Value"]() - except RuntimeError as excp: - return S_ERROR(f"Can't connect to DB: {excp!r}") + result = ObjectLoader().loadObject( + "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" + ) + if not result["OK"]: + return result + cls.elasticJobParametersDB = result["Value"]() cls.pilotManager = PilotManagerClient() diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py index a3e9e00e53d..4726632dfdb 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py @@ -55,17 +55,12 @@ def __init__( raise if not self.elasticJobParametersDB: - if Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False): - try: - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" - ) - if not result["OK"]: - raise AttributeError(result["Message"]) - self.elasticJobParametersDB = result["Value"](parentLogger=self.log) - except RuntimeError: - self.log.error("Can't connect to the JobLoggingDB") - raise + result = ObjectLoader().loadObject( + "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" + ) + if not result["OK"]: + raise AttributeError(result["Message"]) + self.elasticJobParametersDB = result["Value"](parentLogger=self.log) def setJobStatus( self, jobID: int, status=None, minorStatus=None, appStatus=None, source=None, dateTime=None, force=False diff --git a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py index ac0ba6bb94e..5214c8f06d4 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py +++ b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py @@ -508,7 +508,6 @@ def test_JobStateUpdateAndJobMonitoringMultiple(lfn: str) -> None: res = jobMonitoringClient.getJobTypes() assert res["OK"], res["Message"] - assert sorted(res["Value"]) == sorted(types) res = jobMonitoringClient.getApplicationStates() assert res["OK"], res["Message"] diff --git a/tests/Integration/WorkloadManagementSystem/Test_ElasticJobParametersDB.py b/tests/Integration/WorkloadManagementSystem/Test_ElasticJobParametersDB.py index 10f8372ff40..90da7bf9a2d 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_ElasticJobParametersDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_ElasticJobParametersDB.py @@ -150,7 +150,7 @@ def test_setAndGetJobFromDB(): assert len(res["Value"][1010000]) == 0 # delete the indexes - res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB.indexName_base) + res = elasticJobParametersDB.deleteIndex("job_parameters") assert res["OK"] assert res["Value"] == "Nothing to delete" res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(100)) diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py index 767afc2d912..3dacf06c512 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py @@ -301,24 +301,6 @@ def test_heartBeatLogging(jobDB): assert not res["Value"], str(res) -def test_getJobParameters(jobDB): - res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") - assert res["OK"], res["Message"] - jobID = res["JobID"] - - res = jobDB.getJobParameters(jobID) - assert res["OK"], res["Message"] - assert res["Value"] == {}, res["Value"] - - res = jobDB.getJobParameters([jobID]) - assert res["OK"], res["Message"] - assert res["Value"] == {}, res["Value"] - - res = jobDB.getJobParameters(jobID, "not") - assert res["OK"], res["Message"] - assert res["Value"] == {}, res["Value"] - - def test_setJobsMajorStatus(jobDB): res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo") assert res["OK"], res["Message"] diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobParameters_MySQLandES.py b/tests/Integration/WorkloadManagementSystem/Test_JobParameters_MySQLandES.py deleted file mode 100644 index 2d1087d47de..00000000000 --- a/tests/Integration/WorkloadManagementSystem/Test_JobParameters_MySQLandES.py +++ /dev/null @@ -1,213 +0,0 @@ -#!/usr/bin/env python -""" Test specific of JobParameters with and without the flag in for ES backend - - flag in /Operations/[]/Services/JobMonitoring/useESForJobParametersFlag -""" - -# pylint: disable=wrong-import-position, missing-docstring - -import os -import time - -import DIRAC - -DIRAC.initialize() # Initialize configuration - -from DIRAC.ConfigurationSystem.Client.CSAPI import CSAPI -from DIRAC.Interfaces.API.Dirac import Dirac - -# sut -from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient -from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient - -from DIRAC.tests.Utilities.WMS import helloWorldJob - -jobMonitoringClient = JobMonitoringClient() -jobStateUpdateClient = JobStateUpdateClient() - - -def createJob(): - job = helloWorldJob() - - res = Dirac().submitJob(job) - assert res["OK"], res["Message"] - jobID = int(res["Value"]) - return jobID - - -def updateFlag(): - # Here now setting the flag as the following inside /Operations/Defaults: - # in Operations/Defaults/Services/JobMonitoring/useESForJobParametersFlag - - csAPI = CSAPI() - - res = csAPI.createSection("Operations/Defaults/Services/") - if not res["OK"]: - print(res["Message"]) - exit(1) - - res = csAPI.createSection("Operations/Defaults/Services/JobMonitoring/") - if not res["OK"]: - print(res["Message"]) - exit(1) - csAPI.setOption("Operations/Defaults/Services/JobMonitoring/useESForJobParametersFlag", True) - - csAPI.commit() - - # Now we need to restart the services for the new configuration to be picked up - - time.sleep(5) - - os.system("dirac-restart-component WorkloadManagement JobMonitoring") - os.system("dirac-restart-component WorkloadManagement JobStateUpdate") - os.system("dirac-restart-component WorkloadManagement JobManager") - os.system("dirac-restart-component Tornado Tornado") - - time.sleep(5) - - -def _checkWithRetries(fcn, args, expected): - for _ in range(3): - res = fcn(*args) - assert res["OK"], res["Message"] - if res["Value"] == expected: - return - time.sleep(1) - assert res["Value"] == expected, "Failed to call %s after 3 retries" - - -def test_MySQLandES_jobParameters(): - """a basic put - remove test, changing the flag in between""" - - # First, create a job - jobID = createJob() - - # Use the MySQL backend - - res = jobStateUpdateClient.setJobParameter(jobID, "ParName-fromMySQL", "ParValue-fromMySQL") - assert res["OK"], res["Message"] - _checkWithRetries( - jobMonitoringClient.getJobParameter, - (jobID, "ParName-fromMySQL"), - {"ParName-fromMySQL": "ParValue-fromMySQL"}, - ) - - res = jobMonitoringClient.getJobParameters(jobID) # This will be looked up in MySQL only - assert res["OK"], res["Message"] - assert isinstance(res["Value"], dict), res["Value"] - assert res["Value"] == {jobID: {"ParName-fromMySQL": "ParValue-fromMySQL"}}, res["Value"] - - res = jobMonitoringClient.getJobOwner(jobID) - assert res["OK"], res["Message"] - assert res["Value"] == "adminusername", res["Value"] - - res = jobStateUpdateClient.setJobsParameter({jobID: ["SomeStatus", "Waiting"]}) - assert res["OK"], res["Message"] - _checkWithRetries( - jobMonitoringClient.getJobParameters, - (jobID,), - {jobID: {"ParName-fromMySQL": "ParValue-fromMySQL", "SomeStatus": "Waiting"}}, - ) - - res = jobMonitoringClient.getJobAttributes(jobID) - assert res["OK"], res["Message"] - - # changing to use the ES flag - updateFlag() - # So now we are using the ES backend - - # This will still be in MySQL, but first it will look if it's in ES - _checkWithRetries( - jobMonitoringClient.getJobParameter, (jobID, "ParName-fromMySQL"), {"ParName-fromMySQL": "ParValue-fromMySQL"} - ) - - # Now we insert (in ES) - res = jobStateUpdateClient.setJobParameter(jobID, "ParName-fromES", "ParValue-fromES") - time.sleep(2) # sleep to give time to ES to index - assert res["OK"], res["Message"] - - res = jobMonitoringClient.getJobParameter(jobID, "ParName-fromES") # This will be in ES - assert res["OK"], res["Message"] - assert res["Value"]["ParName-fromES"] == "ParValue-fromES", res["Value"] - - res = jobMonitoringClient.getJobOwner(jobID) - assert res["OK"], res["Message"] - assert res["Value"] == "adminusername", res["Value"] - - # These parameters will be looked up in MySQL and in ES, and combined - res = jobMonitoringClient.getJobParameters(jobID) - assert res["OK"], res["Message"] - assert res["Value"] == { - jobID: { - "JobID": jobID, - "ParName-fromMySQL": "ParValue-fromMySQL", - "SomeStatus": "Waiting", - "ParName-fromES": "ParValue-fromES", - "timestamp": res["Value"][jobID]["timestamp"], - } - }, res["Value"] - - # Do it again - res = jobMonitoringClient.getJobParameters(jobID) - assert res["OK"], res["Message"] - assert res["Value"] == { - jobID: { - "JobID": jobID, - "ParName-fromMySQL": "ParValue-fromMySQL", - "SomeStatus": "Waiting", - "ParName-fromES": "ParValue-fromES", - "timestamp": res["Value"][jobID]["timestamp"], - } - }, res["Value"] - - # this is updating an existing parameter, but in practice it will be in ES only, - # while in MySQL the old status "Waiting" will stay - res = jobStateUpdateClient.setJobsParameter({jobID: ["SomeStatus", "Matched"]}) - time.sleep(2) # sleep to give time to ES to index - assert res["OK"], res["Message"] - - res = jobMonitoringClient.getJobParameters(jobID) - assert res["OK"], res["Message"] - assert res["Value"][jobID]["SomeStatus"] == "Matched", res["Value"] - - # again updating the same parameter - res = jobStateUpdateClient.setJobsParameter({jobID: ["SomeStatus", "Running"]}) - time.sleep(2) # sleep to give time to ES to index - assert res["OK"], res["Message"] - - res = jobMonitoringClient.getJobParameters(jobID) - assert res["OK"], res["Message"] - assert res["Value"][jobID]["SomeStatus"] == "Running", res["Value"] - - # Now we create a second job - secondJobID = createJob() - - res = jobMonitoringClient.getJobParameter(secondJobID, "ParName-fromMySQL") - assert res["OK"], res["Message"] - - # Now we insert (in ES) - res = jobStateUpdateClient.setJobParameter(secondJobID, "ParName-fromES-2", "ParValue-fromES-2") - time.sleep(2) # sleep to give time to ES to index - assert res["OK"], res["Message"] - - res = jobMonitoringClient.getJobParameter(secondJobID, "ParName-fromES-2") # This will be in ES - assert res["OK"], res["Message"] - assert res["Value"]["ParName-fromES-2"] == "ParValue-fromES-2", res["Value"] - - # These parameters will be looked up in MySQL and in ES, and combined - res = jobMonitoringClient.getJobParameters([jobID, secondJobID]) - assert res["OK"], res["Message"] - - assert res["Value"][jobID]["ParName-fromMySQL"] == "ParValue-fromMySQL", res["Value"] - assert res["Value"][jobID]["SomeStatus"] == "Running", res["Value"] - assert res["Value"][jobID]["ParName-fromES"] == "ParValue-fromES", res["Value"] - - assert res["Value"][secondJobID]["ParName-fromES-2"] == "ParValue-fromES-2", res["Value"] - - # These parameters will be looked up in MySQL and in ES, and combined - res = jobMonitoringClient.getJobParameters([jobID, secondJobID], "SomeStatus") - assert res["OK"], res["Message"] - assert res["Value"][jobID]["SomeStatus"] == "Running", res["Value"] - - res = jobMonitoringClient.getJobAttributes(jobID) # these will still be all in MySQL - assert res["OK"], res["Message"] diff --git a/tests/Integration/all_integration_server_tests.sh b/tests/Integration/all_integration_server_tests.sh index 57cd8a4c02e..ce8d0329012 100644 --- a/tests/Integration/all_integration_server_tests.sh +++ b/tests/Integration/all_integration_server_tests.sh @@ -50,7 +50,6 @@ pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_J pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobLoggingDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_TaskQueueDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_ElasticJobParametersDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) -pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobParameters_MySQLandES.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_PilotAgentsDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_SandboxMetadataDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) # pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobCleaningAgent.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" ))