Skip to content

Commit

Permalink
sweep: DIRACGrid#7603 Perform bulk lookup of job parameters from elas…
Browse files Browse the repository at this point in the history
…ticsearch
  • Loading branch information
fstagni committed May 10, 2024
1 parent 76d15cf commit 2808372
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
17 changes: 17 additions & 0 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,23 @@ def getDoc(self, index: str, docID: str) -> dict:
except RequestError as re:
return S_ERROR(re)

@ifConnected
def getDocs(self, indexFunc, docIDs: list[str]) -> list[dict]:
"""Efficiently retrieve many documents from an index.
:param index: name of the index
:param docIDs: document IDs
"""
sLog.debug(f"Retrieving documents {docIDs}")
docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs]
try:
response = self.client.mget({"docs": docs})
except RequestError as re:
return S_ERROR(re)
else:
results = {int(x["_id"]): x["_source"] if x.get("found") else {} for x in response["docs"]}
return S_OK(results)

@ifConnected
def updateDoc(self, index: str, docID: str, body) -> dict:
"""Update an existing document with a script or partial document
Expand Down
13 changes: 9 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
- setJobParameter()
- deleteJobParameters()
"""
from collections import defaultdict
from typing import Union

from DIRAC import S_ERROR, S_OK
from DIRAC.Core.Base.ElasticDB import ElasticDB
from DIRAC.Core.Utilities import TimeUtilities
Expand Down Expand Up @@ -63,7 +66,7 @@ def _createIndex(self, indexName: str) -> None:
raise RuntimeError(result["Message"])
self.log.always("Index created:", indexName)

def getJobParameters(self, jobID: int, paramList=None) -> dict:
def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict:
"""Get Job Parameters defined for jobID.
Returns a dictionary with the Job Parameters.
If paramList is empty - all the parameters are returned.
Expand All @@ -73,11 +76,13 @@ def getJobParameters(self, jobID: int, paramList=None) -> dict:
:param paramList: list of parameters to be returned (also a string is treated)
:return: dict with all Job Parameter values
"""
if isinstance(jobIDs, int):
jobIDs = [jobIDs]
if isinstance(paramList, str):
paramList = paramList.replace(" ", "").split(",")
self.log.debug(f"JobDB.getParameters: Getting Parameters for job {jobID}")
self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}")

res = self.getDoc(self._indexName(jobID), str(jobID))
res = self.getDocs(self._indexName, str(jobIDs))
if not res["OK"]:
return res
resultDict = res["Value"]
Expand All @@ -86,7 +91,7 @@ def getJobParameters(self, jobID: int, paramList=None) -> dict:
if k not in paramList:
resultDict.pop(k)

return S_OK({jobID: resultDict})
return S_OK(resultDict)

def setJobParameter(self, jobID: int, key: str, value: str) -> dict:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,10 @@ def export_getJobParameters(cls, jobIDs, parName=None):
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
jobIDs = [int(jobID) for jobID in jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parameters.update(res["Value"])
res = cls.elasticJobParametersDB.getJobParameters(jobIDs, parName)
if not res["OK"]:
return res
parameters = res["Value"]

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
Expand Down

0 comments on commit 2808372

Please sign in to comment.