Skip to content

Commit

Permalink
feat: for JobParameters, only use Elastic/OpenSearch
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Nov 14, 2023
1 parent 66fac1e commit 47fa1fd
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 380 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,7 @@ SandboxMetadataDB
TaskQueueDB
The TaskQueueDB is used to organize jobs requirements into task queues, for easier matching.

All the DBs above are MySQL DBs, and should be installed using the :ref:`system administrator console <system-admin-console>`.

The JobDB MySQL table *JobParameters* can be replaced by an JobParameters backend built in Elastic/OpenSearch.
To enable it, set the following flag::

/Operations/[Defaults | Setup]/Services/JobMonitoring/useESForJobParametersFlag=True

If you decide to make use of this Elastic/OpenSearch backend for storing job parameters, you would be in charge of setting
the index policies, as Job Parameters stored in Elastic/OpenSearch are not deleted together with the jobs.
All the DBs above are MySQL DBs with the only exception of the Elastic/OpenSearch backend for storing job parameters.


Services
Expand Down
21 changes: 7 additions & 14 deletions src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,12 @@ def initialize(self, jobDB=None, logDB=None):
if not self.jobDB.isValid():
dExit(1)

useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
self.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
self.elasticJobParametersDB = result["Value"]()

self.logDB = JobLoggingDB() if logDB is None else logDB

Expand Down Expand Up @@ -244,9 +239,7 @@ def setJobParam(self, job, reportName, value):
return S_OK()

self.log.debug(f"setJobParameter({job},'{reportName}','{value}')")
if self.elasticJobParametersDB:
return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value)
return self.jobDB.setJobParameter(job, reportName, value)
return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value)

#############################################################################
def setFailedJob(self, job, msg, classAdJob=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,6 @@ def getAttribute(self, name):
def getAttributes(self, nameList=None):
return self.__cacheDict("att", self.__jobState.getAttributes, nameList)

# JobParameters --- REMOVED

# Optimizer params

def setOptParameter(self, name, value):
Expand Down
5 changes: 2 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,11 +1062,10 @@ def removeJobFromDB(self, jobIDs):
if not result["OK"]:
failedTablesList.append(table)

result = S_OK()
if failedTablesList:
result = S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")
return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")

return result
return S_OK()

#############################################################################
def rescheduleJob(self, jobID):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ def export_rescheduleJob(self, jobIDs):
)
for jobID in validJobList:
self.taskQueueDB.deleteJob(jobID)
# gJobDB.deleteJobFromQueue(jobID)
result = self.jobDB.rescheduleJob(jobID)
self.log.debug(str(result))
if not result["OK"]:
Expand Down
76 changes: 30 additions & 46 deletions src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

cls.elasticJobParametersDB = None
useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log)
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log)

cls.pilotManager = PilotManagerClient()
return S_OK()
Expand Down Expand Up @@ -446,14 +440,7 @@ def export_getJobParameter(cls, jobID, parName):
:param str/int jobID: one single Job ID
:param str parName: one single parameter name
"""
if cls.elasticJobParametersDB:
res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName])
if not res["OK"]:
return res
if res["Value"].get(int(jobID)):
return S_OK(res["Value"][int(jobID)])

res = cls.jobDB.getJobParameters(jobID, [parName])
res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName])
if not res["OK"]:
return res
return S_OK(res["Value"].get(int(jobID), {}))
Expand All @@ -475,34 +462,31 @@ def export_getJobParameters(cls, jobIDs, parName=None):
:param str/int/list jobIDs: one single job ID or a list of them
:param str parName: one single parameter name, a list or None (meaning all of them)
"""
if cls.elasticJobParametersDB:
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)

return cls.jobDB.getJobParameters(jobIDs, parName)
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)

##############################################################################
types_getAtticJobParameters = [int]
Expand Down
69 changes: 20 additions & 49 deletions src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility

Expand All @@ -37,17 +36,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

cls.elasticJobParametersDB = None
if Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False):
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()

cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB, cls.elasticJobParametersDB)

Expand Down Expand Up @@ -165,10 +159,7 @@ def export_setJobParameter(cls, jobID, name, value):
for job specified by its JobId
"""

if cls.elasticJobParametersDB:
return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member

return cls.jobDB.setJobParameter(int(jobID), name, value)
return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member

###########################################################################
types_setJobsParameter = [dict]
Expand All @@ -182,23 +173,13 @@ def export_setJobsParameter(cls, jobsParameterDict):
failed = False

for jobID in jobsParameterDict:
if cls.elasticJobParametersDB:
res = cls.elasticJobParametersDB.setJobParameter(
int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"])
failed = True
message = res["Message"]

else:
res = cls.jobDB.setJobParameter(
jobID, str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to MySQL", res["Message"])
failed = True
message = res["Message"]
res = cls.elasticJobParametersDB.setJobParameter(
int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"])
failed = True
message = res["Message"]

if failed:
return S_ERROR(message)
Expand All @@ -213,14 +194,9 @@ def export_setJobParameters(cls, jobID, parameters):
"""Set arbitrary parameters specified by a list of name/value pairs
for job specified by its JobId
"""
if cls.elasticJobParametersDB:
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"])
else:
result = cls.jobDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to MySQL", result["Message"])
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"])

return result

Expand All @@ -235,15 +211,10 @@ def export_sendHeartBeat(cls, jobID, dynamicData, staticData):
if not result["OK"]:
cls.log.warn("Failed to set the heart beat data", f"for job {jobID} ")

if cls.elasticJobParametersDB:
for key, value in staticData.items():
result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"])
else:
result = cls.jobDB.setJobParameters(int(jobID), list(staticData.items()))
for key, value in staticData.items():
result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to MySQL", result["Message"])
cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"])

# Restore the Running status if necessary
result = cls.jobDB.getJobAttributes(jobID, ["Status"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient

Expand All @@ -22,18 +21,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp!r}")

cls.elasticJobParametersDB = None
useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp!r}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()

cls.pilotManager = PilotManagerClient()

Expand Down
17 changes: 6 additions & 11 deletions src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,12 @@ def __init__(
raise

if not self.elasticJobParametersDB:
if Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False):
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
raise AttributeError(result["Message"])
self.elasticJobParametersDB = result["Value"](parentLogger=self.log)
except RuntimeError:
self.log.error("Can't connect to the JobLoggingDB")
raise
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
raise AttributeError(result["Message"])
self.elasticJobParametersDB = result["Value"](parentLogger=self.log)

def setJobStatus(
self, jobID: int, status=None, minorStatus=None, appStatus=None, source=None, dateTime=None, force=False
Expand Down
18 changes: 0 additions & 18 deletions tests/Integration/WorkloadManagementSystem/Test_JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,24 +301,6 @@ def test_heartBeatLogging(jobDB):
assert not res["Value"], str(res)


def test_getJobParameters(jobDB):
res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo")
assert res["OK"], res["Message"]
jobID = res["JobID"]

res = jobDB.getJobParameters(jobID)
assert res["OK"], res["Message"]
assert res["Value"] == {}, res["Value"]

res = jobDB.getJobParameters([jobID])
assert res["OK"], res["Message"]
assert res["Value"] == {}, res["Value"]

res = jobDB.getJobParameters(jobID, "not")
assert res["OK"], res["Message"]
assert res["Value"] == {}, res["Value"]


def test_setJobsMajorStatus(jobDB):
res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup", vo="vo")
assert res["OK"], res["Message"]
Expand Down
Loading

0 comments on commit 47fa1fd

Please sign in to comment.