diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 53ba7404473..6a632202288 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -272,6 +272,23 @@ def getDoc(self, index: str, docID: str) -> dict: except RequestError as re: return S_ERROR(re) + @ifConnected + def getDocs(self, indexFunc, docIDs: list[str]) -> list[dict]: + """Efficiently retrieve many documents from an index. + + :param index: name of the index + :param docIDs: document IDs + """ + sLog.debug(f"Retrieving documents {docIDs}") + docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs] + try: + response = self.client.mget({"docs": docs}) + except RequestError as re: + return S_ERROR(re) + else: + results = {int(x["_id"]): x["_source"] if x.get("found") else {} for x in response["docs"]} + return S_OK(results) + @ifConnected def updateDoc(self, index: str, docID: str, body) -> dict: """Update an existing document with a script or partial document diff --git a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py index 6c05301d7bb..016ba896731 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py @@ -6,6 +6,9 @@ - setJobParameter() - deleteJobParameters() """ +from collections import defaultdict +from typing import Union + from DIRAC import S_ERROR, S_OK from DIRAC.Core.Base.ElasticDB import ElasticDB from DIRAC.Core.Utilities import TimeUtilities @@ -63,7 +66,7 @@ def _createIndex(self, indexName: str) -> None: raise RuntimeError(result["Message"]) self.log.always("Index created:", indexName) - def getJobParameters(self, jobID: int, paramList=None) -> dict: + def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict: """Get Job Parameters defined for jobID. Returns a dictionary with the Job Parameters. If paramList is empty - all the parameters are returned. @@ -73,11 +76,13 @@ def getJobParameters(self, jobID: int, paramList=None) -> dict: :param paramList: list of parameters to be returned (also a string is treated) :return: dict with all Job Parameter values """ + if isinstance(jobIDs, int): + jobIDs = [jobIDs] if isinstance(paramList, str): paramList = paramList.replace(" ", "").split(",") - self.log.debug(f"JobDB.getParameters: Getting Parameters for job {jobID}") + self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}") - res = self.getDoc(self._indexName(jobID), str(jobID)) + res = self.getDocs(self._indexName, str(jobIDs)) if not res["OK"]: return res resultDict = res["Value"] @@ -86,7 +91,7 @@ def getJobParameters(self, jobID: int, paramList=None) -> dict: if k not in paramList: resultDict.pop(k) - return S_OK({jobID: resultDict}) + return S_OK(resultDict) def setJobParameter(self, jobID: int, key: str, value: str) -> dict: """ diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index 53b54400cff..a5da7a75a1b 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -465,12 +465,10 @@ def export_getJobParameters(cls, jobIDs, parName=None): if not isinstance(jobIDs, list): jobIDs = [jobIDs] jobIDs = [int(jobID) for jobID in jobIDs] - parameters = {} - for jobID in jobIDs: - res = cls.elasticJobParametersDB.getJobParameters(jobID, parName) - if not res["OK"]: - return res - parameters.update(res["Value"]) + res = cls.elasticJobParametersDB.getJobParameters(jobIDs, parName) + if not res["OK"]: + return res + parameters = res["Value"] # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends res = cls.jobDB.getJobParameters(jobIDs, parName)