From ff8f2d2747c150121ba8aea9a2ff2196752bd669 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Fri, 1 Nov 2024 12:22:03 +0100 Subject: [PATCH] feat: adding possibility to bulk insert in JobLoggingDB --- 1 | 26 +++++++ .../DB/JobLoggingDB.py | 67 ++++++++++++++----- .../Service/JobManagerHandler.py | 25 +++++-- .../Test_JobLoggingDB.py | 19 ++++++ 4 files changed, 115 insertions(+), 22 deletions(-) create mode 100644 1 diff --git a/1 b/1 new file mode 100644 index 00000000000..5fb92aea2e1 --- /dev/null +++ b/1 @@ -0,0 +1,26 @@ +# This is a combination of 2 commits. +# This is the 1st commit message: + +feat: adding possibility to bulk insert in JobLoggingDB + +# This is the commit message #2: + + +# Please enter the commit message for your changes. Lines starting +# with '#' will be ignored, and an empty message aborts the commit. +# +# Date: Fri Nov 1 12:22:03 2024 +0100 +# +# interactive rebase in progress; onto 23a864aa1 +# Last commands done (3 commands done): +# pick f4e154966 feat: adding possibility to bulk insert in JobLoggingDB +# squash 8486c57ad f +# Next command to do (1 remaining command): +# pick 0ce73e189 fix: prevent corner case +# You are currently rebasing branch '80_bulkInsertJobLoggingDB' on '23a864aa1'. +# +# Changes to be committed: +# modified: src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py +# modified: src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +# modified: tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py +# diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py index 277103ba4cb..3ed38f9bd7e 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py @@ -7,11 +7,10 @@ getWMSTimeStamps() """ import datetime -import time -from DIRAC import S_OK, S_ERROR -from DIRAC.Core.Utilities import TimeUtilities +from DIRAC import S_ERROR, S_OK from DIRAC.Core.Base.DB import DB +from DIRAC.Core.Utilities import TimeUtilities MAGIC_EPOC_NUMBER = 1270000000 @@ -56,32 +55,66 @@ def addLoggingRecord( event = f"status/minor/app={status}/{minorStatus}/{applicationStatus}" self.log.info("Adding record for job ", str(jobID) + ": '" + event + "' from " + source) - try: + def _get_date(date): + # We need to specify that timezone is UTC because otherwise timestamp + # assumes local time while we mean UTC. if not date: - # Make the UTC datetime string and float - _date = datetime.datetime.utcnow() + # Make the UTC datetime + return datetime.datetime.utcnow() elif isinstance(date, str): # The date is provided as a string in UTC - _date = TimeUtilities.fromString(date) + return TimeUtilities.fromString(date) elif isinstance(date, datetime.datetime): - _date = date + return date + else: + raise Exception("Incorrect date for the logging record") + + try: + if isinstance(date, list): + _date = [] + for d in date: + try: + _date.append(_get_date(d)) + except Exception: + self.log.exception("Exception while date evaluation") + _date.append(datetime.datetime.utcnow()) else: - self.log.error("Incorrect date for the logging record") - _date = datetime.datetime.utcnow() + _date = _get_date(date) except Exception: self.log.exception("Exception while date evaluation") - _date = datetime.datetime.utcnow() - - # We need to specify that timezone is UTC because otherwise timestamp - # assumes local time while we mean UTC. - epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER + _date = [datetime.datetime.utcnow()] cmd = ( "INSERT INTO LoggingInfo (JobId, Status, MinorStatus, ApplicationStatus, " - + "StatusTime, StatusTimeOrder, StatusSource) VALUES (%d,'%s','%s','%s','%s',%f,'%s')" - % (int(jobID), status, minorStatus, applicationStatus[:255], str(_date), epoc, source[:32]) + + "StatusTime, StatusTimeOrder, StatusSource) VALUES " ) + # if JobID is a list, make a bulk insert + if isinstance(jobID, list): + for i, _ in enumerate(jobID): + epoc = _date[i].replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER + cmd = cmd + "(%d,'%s','%s','%s','%s',%f,'%s')," % ( + int(jobID[i]), + status[i], + minorStatus[i], + applicationStatus[:255], + str(_date[i]), + epoc, + source[:32], + ) + cmd = cmd[:-1] + else: # else make a single insert + epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER + cmd = cmd + "(%d,'%s','%s','%s','%s',%f,'%s')" % ( + int(jobID), + status, + minorStatus, + applicationStatus[:255], + str(_date), + epoc, + source[:32], + ) + return self._update(cmd) ############################################################################# diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index f6f56837e95..10f6f62281e 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -177,6 +177,9 @@ def export_submitJob(self, jobDesc): jobDescList = [jobDesc] jobIDList = [] + statusList = [] + minorStatusList = [] + timeStampList = [] if parametricJob: initialStatus = JobStatus.SUBMITTING @@ -199,13 +202,25 @@ def export_submitJob(self, jobDesc): return result jobID = result["JobID"] - self.log.info(f'Job added to the JobDB", "{jobID} for {self.ownerDN}/{self.ownerGroup}') - - self.jobLoggingDB.addLoggingRecord( - jobID, result["Status"], result["MinorStatus"], date=result["TimeStamp"], source="JobManager" - ) + self.log.info(f"Job added to the JobDB", f"{jobID} for {self.ownerDN}/{self.ownerGroup}") jobIDList.append(jobID) + statusList.append(result["Status"]) + minorStatusList.append(result["MinorStatus"]) + timeStampList.append(result["TimeStamp"]) + + # insert records in logging DB + + # For parametric jobs I can insert logging records in a bulk + if parametricJob and len(set(jobIDList)) == len(jobIDList): + result = self.jobLoggingDB.addLoggingRecord( + jobIDList, statusList, minorStatusList, date=timeStampList, source="JobManager" + ) + else: + for i, _ in enumerate(jobIDList): + result = self.jobLoggingDB.addLoggingRecord( + jobIDList[i], statusList[i], minorStatusList[i], date=timeStampList[i], source="JobManager" + ) # Set persistency flag retVal = gProxyManager.getUserPersistence(self.ownerDN, self.ownerGroup) diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py index de3445925cf..f7771bf743b 100755 --- a/tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py @@ -42,8 +42,27 @@ def test_JobStatus(jobLoggingDB: JobLoggingDB): result = jobLoggingDB.getJobLoggingInfo(1) assert result["OK"] is True, result["Message"] + print(result) result = jobLoggingDB.getWMSTimeStamps(1) assert result["OK"] is True, result["Message"] + now = datetime.datetime.utcnow() + result = jobLoggingDB.addLoggingRecord( + [2, 3, 4, 5], + status=["testing", "testing", "testing", "testing"], + minorStatus=["mn", "mn", "mn", "mn"], + date=[now, now, now, now], + source="Unittest", + ) + assert result["OK"] is True, result["Message"] + + result = jobLoggingDB.getJobLoggingInfo(2) + assert result["OK"] is True, result["Message"] + assert result["Value"][-1] == ("testing", "mn", "mn", str(now), "Unittest") + + result = jobLoggingDB.getJobLoggingInfo(5) + assert result["OK"] is True, result["Message"] + assert result["Value"][-1] == ("testing", "mn", "mn", str(now), "Unittest") + jobLoggingDB.deleteJob(1)