Skip to content

Commit

Permalink
feat: for JobParameters, only use Elastic/OpenSearch
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Apr 17, 2024
1 parent 0f463fd commit 504630a
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 381 deletions.
14 changes: 14 additions & 0 deletions dirac.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,20 @@ Systems
##END
}
}
WorkloadManagementSystem
{
Databases
{
ElasticJobParametersDB
{
# Host of OpenSearch instance
Host=host.some.where

# index name (default is "job_parameters")
index_name=a_different_name
}
}
}
}
Resources
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,7 @@ SandboxMetadataDB
TaskQueueDB
The TaskQueueDB is used to organize jobs requirements into task queues, for easier matching.

All the DBs above are MySQL DBs, and should be installed using the :ref:`system administrator console <system-admin-console>`.

The JobDB MySQL table *JobParameters* can be replaced by an JobParameters backend built in Elastic/OpenSearch.
To enable it, set the following flag::

/Operations/[Defaults | Setup]/Services/JobMonitoring/useESForJobParametersFlag=True

If you decide to make use of this Elastic/OpenSearch backend for storing job parameters, you would be in charge of setting
the index policies, as Job Parameters stored in Elastic/OpenSearch are not deleted together with the jobs.
All the DBs above are MySQL DBs with the only exception of the Elastic/OpenSearch backend for storing job parameters.


Services
Expand Down
21 changes: 7 additions & 14 deletions src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,12 @@ def initialize(self, jobDB=None, logDB=None):
if not self.jobDB.isValid():
dExit(1)

useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
self.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
self.elasticJobParametersDB = result["Value"]()

self.logDB = JobLoggingDB() if logDB is None else logDB

Expand Down Expand Up @@ -244,9 +239,7 @@ def setJobParam(self, job, reportName, value):
return S_OK()

self.log.debug(f"setJobParameter({job},'{reportName}','{value}')")
if self.elasticJobParametersDB:
return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value)
return self.jobDB.setJobParameter(job, reportName, value)
return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value)

#############################################################################
def setFailedJob(self, job, msg, classAdJob=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,6 @@ def getAttribute(self, name):
def getAttributes(self, nameList=None):
return self.__cacheDict("att", self.__jobState.getAttributes, nameList)

# JobParameters --- REMOVED

# Optimizer params

def setOptParameter(self, name, value):
Expand Down
5 changes: 2 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,11 +1062,10 @@ def removeJobFromDB(self, jobIDs):
if not result["OK"]:
failedTablesList.append(table)

result = S_OK()
if failedTablesList:
result = S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")
return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")

return result
return S_OK()

#############################################################################
def rescheduleJob(self, jobID):
Expand Down
76 changes: 30 additions & 46 deletions src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

cls.elasticJobParametersDB = None
useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log)
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log)

cls.pilotManager = PilotManagerClient()
return S_OK()
Expand Down Expand Up @@ -446,14 +440,7 @@ def export_getJobParameter(cls, jobID, parName):
:param str/int jobID: one single Job ID
:param str parName: one single parameter name
"""
if cls.elasticJobParametersDB:
res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName])
if not res["OK"]:
return res
if res["Value"].get(int(jobID)):
return S_OK(res["Value"][int(jobID)])

res = cls.jobDB.getJobParameters(jobID, [parName])
res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName])
if not res["OK"]:
return res
return S_OK(res["Value"].get(int(jobID), {}))
Expand All @@ -475,34 +462,31 @@ def export_getJobParameters(cls, jobIDs, parName=None):
:param str/int/list jobIDs: one single job ID or a list of them
:param str parName: one single parameter name, a list or None (meaning all of them)
"""
if cls.elasticJobParametersDB:
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)

return cls.jobDB.getJobParameters(jobIDs, parName)
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)

##############################################################################
types_getAtticJobParameters = [int]
Expand Down
69 changes: 20 additions & 49 deletions src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility

Expand All @@ -37,17 +36,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

cls.elasticJobParametersDB = None
if Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False):
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()

cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB, cls.elasticJobParametersDB)

Expand Down Expand Up @@ -165,10 +159,7 @@ def export_setJobParameter(cls, jobID, name, value):
for job specified by its JobId
"""

if cls.elasticJobParametersDB:
return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member

return cls.jobDB.setJobParameter(int(jobID), name, value)
return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member

###########################################################################
types_setJobsParameter = [dict]
Expand All @@ -182,23 +173,13 @@ def export_setJobsParameter(cls, jobsParameterDict):
failed = False

for jobID in jobsParameterDict:
if cls.elasticJobParametersDB:
res = cls.elasticJobParametersDB.setJobParameter(
int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"])
failed = True
message = res["Message"]

else:
res = cls.jobDB.setJobParameter(
jobID, str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to MySQL", res["Message"])
failed = True
message = res["Message"]
res = cls.elasticJobParametersDB.setJobParameter(
int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"])
failed = True
message = res["Message"]

if failed:
return S_ERROR(message)
Expand All @@ -213,14 +194,9 @@ def export_setJobParameters(cls, jobID, parameters):
"""Set arbitrary parameters specified by a list of name/value pairs
for job specified by its JobId
"""
if cls.elasticJobParametersDB:
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"])
else:
result = cls.jobDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to MySQL", result["Message"])
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"])

return result

Expand All @@ -235,15 +211,10 @@ def export_sendHeartBeat(cls, jobID, dynamicData, staticData):
if not result["OK"]:
cls.log.warn("Failed to set the heart beat data", f"for job {jobID} ")

if cls.elasticJobParametersDB:
for key, value in staticData.items():
result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"])
else:
result = cls.jobDB.setJobParameters(int(jobID), list(staticData.items()))
for key, value in staticData.items():
result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to MySQL", result["Message"])
cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"])

# Restore the Running status if necessary
result = cls.jobDB.getJobAttributes(jobID, ["Status"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient

Expand All @@ -22,18 +21,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp!r}")

cls.elasticJobParametersDB = None
useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp!r}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()

cls.pilotManager = PilotManagerClient()

Expand Down
17 changes: 6 additions & 11 deletions src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,12 @@ def __init__(
raise

if not self.elasticJobParametersDB:
if Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False):
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
raise AttributeError(result["Message"])
self.elasticJobParametersDB = result["Value"](parentLogger=self.log)
except RuntimeError:
self.log.error("Can't connect to the JobLoggingDB")
raise
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
raise AttributeError(result["Message"])
self.elasticJobParametersDB = result["Value"](parentLogger=self.log)

def setJobStatus(
self, jobID: int, status=None, minorStatus=None, appStatus=None, source=None, dateTime=None, force=False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ def test_JobStateUpdateAndJobMonitoringMultiple(lfn: str) -> None:

res = jobMonitoringClient.getJobTypes()
assert res["OK"], res["Message"]
assert sorted(res["Value"]) == sorted(types)

res = jobMonitoringClient.getApplicationStates()
assert res["OK"], res["Message"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def test_setAndGetJobFromDB():
assert len(res["Value"][1010000]) == 0

# delete the indexes
res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB.indexName_base)
res = elasticJobParametersDB.deleteIndex("job_parameters")
assert res["OK"]
assert res["Value"] == "Nothing to delete"
res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(100))
Expand Down
Loading

0 comments on commit 504630a

Please sign in to comment.