From 004ee488d18e379f899843ac7dd2172e3ee0d94b Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Mon, 13 May 2024 17:34:09 +0200 Subject: [PATCH] sweep: #7603 Perform bulk lookup of job parameters from elasticsearch --- src/DIRAC/Core/LCG/GOCDBClient.py | 8 +++--- src/DIRAC/Core/Utilities/ElasticSearchDB.py | 17 ++++++++++++ .../DB/ElasticJobParametersDB.py | 27 +++++++++++-------- .../Service/JobMonitoringHandler.py | 10 +++---- 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/src/DIRAC/Core/LCG/GOCDBClient.py b/src/DIRAC/Core/LCG/GOCDBClient.py index 2b735a1709d..915d3652c11 100644 --- a/src/DIRAC/Core/LCG/GOCDBClient.py +++ b/src/DIRAC/Core/LCG/GOCDBClient.py @@ -80,14 +80,14 @@ def getStatus(self, granularity, name=None, startDate=None, startingInHours=None .. code-block:: python {'OK': True, - 'Value': {'92569G0 lhcbsrm-kit.gridka.de': { + 'Value': {'92569G0 lhcbdcache-kit-tape.gridka.de': { 'DESCRIPTION': 'Annual site downtime for various major tasks i...', 'FORMATED_END_DATE': '2014-05-27 15:21', 'FORMATED_START_DATE': '2014-05-26 04:00', 'GOCDB_PORTAL_URL': 'https://goc.egi.eu/portal/index.php?Page_Type=Downtime&id=14051', 'HOSTED_BY': 'FZK-LCG2', - 'HOSTNAME': 'lhcbsrm-kit.gridka.de', - 'SERVICE_TYPE': 'SRM.nearline', + 'HOSTNAME': 'lhcbdcache-kit-tape.gridka.de', + 'SERVICE_TYPE': 'wlcg.webdav.tape', 'SEVERITY': 'OUTAGE'}, '99873G0 srm.pic.esSRM': { 'HOSTED_BY': 'pic', @@ -97,7 +97,7 @@ def getStatus(self, granularity, name=None, startDate=None, startingInHours=None 'URL': 'srm.pic.es', 'GOCDB_PORTAL_URL': 'https://goc.egi.eu/portal/index.php?Page_Type=Downtime&id=21303', 'FORMATED_START_DATE': '2016-09-14 06:00', - 'SERVICE_TYPE': 'SRM', + 'SERVICE_TYPE': 'webdav', 'FORMATED_END_DATE': '2016-09-14 15:00', 'DESCRIPTION': 'Outage declared due to network and dCache upgrades'} } diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 53ba7404473..6a632202288 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -272,6 +272,23 @@ def getDoc(self, index: str, docID: str) -> dict: except RequestError as re: return S_ERROR(re) + @ifConnected + def getDocs(self, indexFunc, docIDs: list[str]) -> list[dict]: + """Efficiently retrieve many documents from an index. + + :param index: name of the index + :param docIDs: document IDs + """ + sLog.debug(f"Retrieving documents {docIDs}") + docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs] + try: + response = self.client.mget({"docs": docs}) + except RequestError as re: + return S_ERROR(re) + else: + results = {int(x["_id"]): x["_source"] if x.get("found") else {} for x in response["docs"]} + return S_OK(results) + @ifConnected def updateDoc(self, index: str, docID: str, body) -> dict: """Update an existing document with a script or partial document diff --git a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py index 6c05301d7bb..14d4dec1a36 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py @@ -6,6 +6,8 @@ - setJobParameter() - deleteJobParameters() """ +from typing import Union + from DIRAC import S_ERROR, S_OK from DIRAC.Core.Base.ElasticDB import ElasticDB from DIRAC.Core.Utilities import TimeUtilities @@ -46,7 +48,7 @@ def _indexName(self, jobID: int) -> str: :param jobID: Job ID """ - indexSplit = int(jobID // 1e6) + indexSplit = int(jobID) // 1e6 return f"{self.index_name}_{indexSplit}m" def _createIndex(self, indexName: str) -> None: @@ -63,7 +65,7 @@ def _createIndex(self, indexName: str) -> None: raise RuntimeError(result["Message"]) self.log.always("Index created:", indexName) - def getJobParameters(self, jobID: int, paramList=None) -> dict: + def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict: """Get Job Parameters defined for jobID. Returns a dictionary with the Job Parameters. If paramList is empty - all the parameters are returned. @@ -73,20 +75,23 @@ def getJobParameters(self, jobID: int, paramList=None) -> dict: :param paramList: list of parameters to be returned (also a string is treated) :return: dict with all Job Parameter values """ + if isinstance(jobIDs, int): + jobIDs = [jobIDs] if isinstance(paramList, str): paramList = paramList.replace(" ", "").split(",") - self.log.debug(f"JobDB.getParameters: Getting Parameters for job {jobID}") + self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}") - res = self.getDoc(self._indexName(jobID), str(jobID)) + res = self.getDocs(self._indexName, jobIDs) if not res["OK"]: return res - resultDict = res["Value"] - if paramList: - for k in list(resultDict): - if k not in paramList: - resultDict.pop(k) - - return S_OK({jobID: resultDict}) + result = {} + for job_id, doc in res["Value"].items(): + if paramList: + result[job_id] = {k: v for k, v in doc.items() if k in paramList} + else: + result[job_id] = doc + + return S_OK(result) def setJobParameter(self, jobID: int, key: str, value: str) -> dict: """ diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index 53b54400cff..a5da7a75a1b 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -465,12 +465,10 @@ def export_getJobParameters(cls, jobIDs, parName=None): if not isinstance(jobIDs, list): jobIDs = [jobIDs] jobIDs = [int(jobID) for jobID in jobIDs] - parameters = {} - for jobID in jobIDs: - res = cls.elasticJobParametersDB.getJobParameters(jobID, parName) - if not res["OK"]: - return res - parameters.update(res["Value"]) + res = cls.elasticJobParametersDB.getJobParameters(jobIDs, parName) + if not res["OK"]: + return res + parameters = res["Value"] # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends res = cls.jobDB.getJobParameters(jobIDs, parName)