Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9.0] Job parameters: use only OpenSearch #7292

Merged
merged 2 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions dirac.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,20 @@ Systems
##END
}
}
WorkloadManagementSystem
{
Databases
{
ElasticJobParametersDB
{
# Host of OpenSearch instance
Host=host.some.where

# index name (default is "job_parameters")
index_name=a_different_name
}
}
}
}
Resources
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,7 @@ SandboxMetadataDB
TaskQueueDB
The TaskQueueDB is used to organize jobs requirements into task queues, for easier matching.

All the DBs above are MySQL DBs, and should be installed using the :ref:`system administrator console <system-admin-console>`.

The JobDB MySQL table *JobParameters* can be replaced by an JobParameters backend built in Elastic/OpenSearch.
To enable it, set the following flag::

/Operations/[Defaults | Setup]/Services/JobMonitoring/useESForJobParametersFlag=True

If you decide to make use of this Elastic/OpenSearch backend for storing job parameters, you would be in charge of setting
the index policies, as Job Parameters stored in Elastic/OpenSearch are not deleted together with the jobs.
All the DBs above are MySQL DBs with the only exception of the Elastic/OpenSearch backend for storing job parameters.


Services
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/Core/Base/ElasticDB.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
""" ElasticDB is a base class used to connect an Elasticsearch database and manages queries.
"""
from DIRAC.ConfigurationSystem.Client.Utilities import getElasticDBParameters
from DIRAC.Core.Base.DIRACDB import DIRACDB
from DIRAC.Core.Utilities.ElasticSearchDB import ElasticSearchDB
from DIRAC.ConfigurationSystem.Client.Utilities import getElasticDBParameters


class ElasticDB(DIRACDB, ElasticSearchDB):
Expand Down
29 changes: 10 additions & 19 deletions src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
optimizer instances and associated actions are performed there.
"""

from DIRAC import S_OK, S_ERROR, exit as dExit

from DIRAC import S_ERROR, S_OK
from DIRAC import exit as dExit
from DIRAC.AccountingSystem.Client.Types.Job import Job as AccountingJob
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB

Expand Down Expand Up @@ -48,17 +46,12 @@ def initialize(self, jobDB=None, logDB=None):
if not self.jobDB.isValid():
dExit(1)

useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
self.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
self.elasticJobParametersDB = result["Value"]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that try/except for RuntimeError that was there before, really no longer needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then it's ObjectLoader().loadObject() that should catch that. Do you agree?

Copy link
Contributor

@andresailer andresailer Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the throwing comes from calling the c'tor of ElasticJobParametersDB in this line here, doesn't it?

        self.elasticJobParametersDB = result["Value"]()
                                                     ^^

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. But, anyway, this is initialize() and it's OK to see the exception, if it happens.


self.logDB = JobLoggingDB() if logDB is None else logDB

Expand Down Expand Up @@ -244,9 +237,7 @@ def setJobParam(self, job, reportName, value):
return S_OK()

self.log.debug(f"setJobParameter({job},'{reportName}','{value}')")
if self.elasticJobParametersDB:
return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value)
return self.jobDB.setJobParameter(job, reportName, value)
return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value)

#############################################################################
def setFailedJob(self, job, msg, classAdJob=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,6 @@ def getAttribute(self, name):
def getAttributes(self, nameList=None):
return self.__cacheDict("att", self.__jobState.getAttributes, nameList)

# JobParameters --- REMOVED

# Optimizer params

def setOptParameter(self, name, value):
Expand Down
20 changes: 6 additions & 14 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
""" Module containing a front-end to the ElasticSearch-based ElasticJobParametersDB.
This is a drop-in replacement for MySQL-based table JobDB.JobParameters.

The reason for switching to a ES-based JobParameters lies in the extended searching
capabilities of ES.
This results in higher traceability for DIRAC jobs.

The following class methods are provided for public usage
- getJobParameters()
- setJobParameter()
- deleteJobParameters()
"""
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals
from DIRAC.Core.Base.ElasticDB import ElasticDB
from DIRAC.Core.Utilities import TimeUtilities


mapping = {
"properties": {
"JobID": {"type": "long"},
Expand All @@ -38,24 +32,22 @@ class ElasticJobParametersDB(ElasticDB):
def __init__(self, parentLogger=None):
"""Standard Constructor"""

try:
indexPrefix = CSGlobals.getSetup().lower()
self.fullname = "WorkloadManagement/ElasticJobParametersDB"
self.index_name = self.getCSOption("index_name", "job_parameters")
fstagni marked this conversation as resolved.
Show resolved Hide resolved

try:
# Connecting to the ES cluster
super().__init__("WorkloadManagement/ElasticJobParametersDB", indexPrefix, parentLogger=parentLogger)
super().__init__(self.fullname, self.index_name, parentLogger=parentLogger)
except Exception as ex:
self.log.error("Can't connect to ElasticJobParametersDB", repr(ex))
raise RuntimeError("Can't connect to ElasticJobParametersDB") from ex

self.indexName_base = f"{self.getIndexPrefix()}_elasticjobparameters_index"

def _indexName(self, jobID: int) -> str:
"""construct the index name

:param jobID: Job ID
"""
indexSplit = int(jobID) // 1e6
return f"{self.indexName_base}_{indexSplit}m"
indexSplit = int(jobID // 1e6)
return f"{self.index_name}_{indexSplit}m"

def _createIndex(self, indexName: str) -> None:
"""Create a new index if needed
Expand Down
5 changes: 2 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,11 +1062,10 @@ def removeJobFromDB(self, jobIDs):
if not result["OK"]:
failedTablesList.append(table)

result = S_OK()
if failedTablesList:
result = S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")
return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")

return result
return S_OK()

#############################################################################
def rescheduleJob(self, jobID):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

cls.elasticJobParametersDB = None
useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log)
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
fstagni marked this conversation as resolved.
Show resolved Hide resolved
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log)

cls.pilotManager = PilotManagerClient()
return S_OK()
Expand Down Expand Up @@ -446,14 +440,7 @@ def export_getJobParameter(cls, jobID, parName):
:param str/int jobID: one single Job ID
:param str parName: one single parameter name
"""
if cls.elasticJobParametersDB:
res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName])
if not res["OK"]:
return res
if res["Value"].get(int(jobID)):
return S_OK(res["Value"][int(jobID)])

res = cls.jobDB.getJobParameters(jobID, [parName])
res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName])
if not res["OK"]:
return res
return S_OK(res["Value"].get(int(jobID), {}))
Expand All @@ -475,34 +462,31 @@ def export_getJobParameters(cls, jobIDs, parName=None):
:param str/int/list jobIDs: one single job ID or a list of them
:param str parName: one single parameter name, a list or None (meaning all of them)
"""
if cls.elasticJobParametersDB:
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)

return cls.jobDB.getJobParameters(jobIDs, parName)
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)

##############################################################################
types_getAtticJobParameters = [int]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility

Expand All @@ -37,17 +36,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

cls.elasticJobParametersDB = None
if Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False):
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()

cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB, cls.elasticJobParametersDB)

Expand Down Expand Up @@ -165,10 +159,7 @@ def export_setJobParameter(cls, jobID, name, value):
for job specified by its JobId
"""

if cls.elasticJobParametersDB:
return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member

return cls.jobDB.setJobParameter(int(jobID), name, value)
return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member

###########################################################################
types_setJobsParameter = [dict]
Expand All @@ -182,23 +173,13 @@ def export_setJobsParameter(cls, jobsParameterDict):
failed = False

for jobID in jobsParameterDict:
if cls.elasticJobParametersDB:
res = cls.elasticJobParametersDB.setJobParameter(
int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"])
failed = True
message = res["Message"]

else:
res = cls.jobDB.setJobParameter(
jobID, str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to MySQL", res["Message"])
failed = True
message = res["Message"]
res = cls.elasticJobParametersDB.setJobParameter(
int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"])
failed = True
message = res["Message"]

if failed:
return S_ERROR(message)
Expand All @@ -213,14 +194,9 @@ def export_setJobParameters(cls, jobID, parameters):
"""Set arbitrary parameters specified by a list of name/value pairs
for job specified by its JobId
"""
if cls.elasticJobParametersDB:
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"])
else:
result = cls.jobDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to MySQL", result["Message"])
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"])

return result

Expand All @@ -235,15 +211,10 @@ def export_sendHeartBeat(cls, jobID, dynamicData, staticData):
if not result["OK"]:
cls.log.warn("Failed to set the heart beat data", f"for job {jobID} ")

if cls.elasticJobParametersDB:
for key, value in staticData.items():
result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"])
else:
result = cls.jobDB.setJobParameters(int(jobID), list(staticData.items()))
for key, value in staticData.items():
result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to MySQL", result["Message"])
cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"])

# Restore the Running status if necessary
result = cls.jobDB.getJobAttributes(jobID, ["Status"])
Expand Down
Loading
Loading