diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 53ba7404473..6a632202288 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -272,6 +272,23 @@ def getDoc(self, index: str, docID: str) -> dict: except RequestError as re: return S_ERROR(re) + @ifConnected + def getDocs(self, indexFunc, docIDs: list[str]) -> list[dict]: + """Efficiently retrieve many documents from an index. + + :param index: name of the index + :param docIDs: document IDs + """ + sLog.debug(f"Retrieving documents {docIDs}") + docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs] + try: + response = self.client.mget({"docs": docs}) + except RequestError as re: + return S_ERROR(re) + else: + results = {int(x["_id"]): x["_source"] if x.get("found") else {} for x in response["docs"]} + return S_OK(results) + @ifConnected def updateDoc(self, index: str, docID: str, body) -> dict: """Update an existing document with a script or partial document diff --git a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py index e8358dc17b1..06ba89eb91f 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py @@ -10,6 +10,9 @@ - setJobParameter() - deleteJobParameters() """ +from collections import defaultdict +from typing import Union + from DIRAC import S_ERROR, S_OK from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals from DIRAC.Core.Base.ElasticDB import ElasticDB @@ -82,7 +85,7 @@ def _createIndex(self, indexName: str) -> None: raise RuntimeError(result["Message"]) self.log.always("Index created:", indexName) - def getJobParameters(self, jobID: int, paramList=None) -> dict: + def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict: """Get Job Parameters defined for jobID. Returns a dictionary with the Job Parameters. If paramList is empty - all the parameters are returned. @@ -92,51 +95,30 @@ def getJobParameters(self, jobID: int, paramList=None) -> dict: :param paramList: list of parameters to be returned (also a string is treated) :return: dict with all Job Parameter values """ + if isinstance(jobIDs, int): + jobIDs = [jobIDs] if isinstance(paramList, str): paramList = paramList.replace(" ", "").split(",") - self.log.debug(f"JobDB.getParameters: Getting Parameters for job {jobID}") - resultDict = {} - inNewIndex = self.existsDoc(self._indexName(jobID), jobID) - inOldIndex = self._isInOldIndex(self.oldIndexName, jobID) - # Case 1: the parameters are stored in both indices - if inNewIndex and inOldIndex: - # First we get the parameters from the old index - self.log.debug( - f"A document with JobID {jobID} was found in the old index {self.oldIndexName} and in the new index {self._indexName(jobID)}" - ) - resultDict = self._searchInOldIndex(jobID, paramList) + self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}") - # Now we get the parameters from the new index - res = self.getDoc(self._indexName(jobID), str(jobID)) - if not res["OK"]: - self.log.error("Could not retrieve the data from the new index!", res["Message"]) - else: - for key in res["Value"]: - if paramList and key not in paramList: - continue - # Add new parameters or overwrite the old ones - resultDict[key] = res["Value"][key] + # First search the old index + result = self._searchInOldIndex(jobIDs, paramList) - # Case 2: only in the old index - elif inOldIndex: - self.log.debug(f"A document with JobID {jobID} was found in the old index {self.oldIndexName}") - resultDict = self._searchInOldIndex(jobID, paramList) + # Next search the new index + res = self.getDocs(self._indexName, jobIDs) + if not res["OK"]: + return res - # Case 3: only in the new index - else: - self.log.debug( - f"The searched parameters with JobID {jobID} exists in the new index {self._indexName(jobID)}" - ) - res = self.getDoc(self._indexName(jobID), str(jobID)) - if not res["OK"]: - return res - resultDict = res["Value"] + # Update result, preferring parameters from the new index + for job_id, doc in res["Value"].items(): + if job_id not in result: + result[job_id] = {} if paramList: - for k in list(resultDict): - if k not in paramList: - resultDict.pop(k) + result[job_id] |= {k: v for k, v in doc.items() if k in paramList} + else: + result[job_id] |= doc - return S_OK({jobID: resultDict}) + return S_OK(result) def setJobParameter(self, jobID: int, key: str, value: str) -> dict: """ @@ -247,7 +229,7 @@ def _isInOldIndex(self, old_index: str, jobID: int) -> bool: except (RequestError, NotFoundError): return False - def _searchInOldIndex(self, jobID: int, paramList: list) -> bool: + def _searchInOldIndex(self, jobIDs: list[int], paramList: list): """Searches for a document with this jobID in the old index""" if paramList: if isinstance(paramList, str): @@ -255,28 +237,20 @@ def _searchInOldIndex(self, jobID: int, paramList: list) -> bool: else: paramList = [] - resultDict = {} - - # the following should be equivalent to - # { - # "query": { - # "bool": { - # "filter": { # no scoring - # "term": {"JobID": jobID} # term level query, does not pass through the analyzer - # } - # } - # } - # } - - s = self.dslSearch.query("bool", filter=self._Q("term", JobID=jobID)) + s = self.dslSearch.query("bool", filter=self._Q("terms", JobID=jobIDs)) res = s.scan() - for hit in res: - pname = hit.Name - if paramList and pname not in paramList: - continue - resultDict[pname] = hit.Value + resultDict = {} + try: + for hit in res: + jobID = int(hit.JobID) + pname = hit.Name + if paramList and pname not in paramList: + continue + resultDict.setdefault(jobID, {})[pname] = hit.Value + except NotFoundError: + pass return resultDict def _deleteInOldIndex(self, jobID: int, paramList: list) -> dict: diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index d7365634314..4ff0f9d60fb 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -483,12 +483,10 @@ def export_getJobParameters(cls, jobIDs, parName=None): if cls.elasticJobParametersDB: if not isinstance(jobIDs, list): jobIDs = [jobIDs] - parameters = {} - for jobID in jobIDs: - res = cls.elasticJobParametersDB.getJobParameters(jobID, parName) - if not res["OK"]: - return res - parameters.update(res["Value"]) + res = cls.elasticJobParametersDB.getJobParameters(jobIDs, parName) + if not res["OK"]: + return res + parameters = res["Value"] # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends res = cls.jobDB.getJobParameters(jobIDs, parName)