Skip to content

Commit

Permalink
feat: adding possibility to bulk insert in JobLoggingDB
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Nov 4, 2024
1 parent 67d06bf commit dc30f94
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 22 deletions.
67 changes: 50 additions & 17 deletions src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
getWMSTimeStamps()
"""
import datetime
import time

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC import S_ERROR, S_OK
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Utilities import TimeUtilities

MAGIC_EPOC_NUMBER = 1270000000

Expand Down Expand Up @@ -56,32 +55,66 @@ def addLoggingRecord(
event = f"status/minor/app={status}/{minorStatus}/{applicationStatus}"
self.log.info("Adding record for job ", str(jobID) + ": '" + event + "' from " + source)

try:
def _get_date(date):
# We need to specify that timezone is UTC because otherwise timestamp
# assumes local time while we mean UTC.
if not date:
# Make the UTC datetime string and float
_date = datetime.datetime.utcnow()
# Make the UTC datetime
return datetime.datetime.utcnow()
elif isinstance(date, str):
# The date is provided as a string in UTC
_date = TimeUtilities.fromString(date)
return TimeUtilities.fromString(date)
elif isinstance(date, datetime.datetime):
_date = date
return date
else:
raise Exception("Incorrect date for the logging record")

try:
if isinstance(date, list):
_date = []
for d in date:
try:
_date.append(_get_date(d))
except Exception:
self.log.exception("Exception while date evaluation")
_date.append(datetime.datetime.utcnow())
else:
self.log.error("Incorrect date for the logging record")
_date = datetime.datetime.utcnow()
_date = _get_date(date)
except Exception:
self.log.exception("Exception while date evaluation")
_date = datetime.datetime.utcnow()

# We need to specify that timezone is UTC because otherwise timestamp
# assumes local time while we mean UTC.
epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
_date = [datetime.datetime.utcnow()]

cmd = (
"INSERT INTO LoggingInfo (JobId, Status, MinorStatus, ApplicationStatus, "
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES (%d,'%s','%s','%s','%s',%f,'%s')"
% (int(jobID), status, minorStatus, applicationStatus[:255], str(_date), epoc, source[:32])
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES "
)

# if JobID is a list, make a bulk insert
if isinstance(jobID, list):
for i, _ in enumerate(jobID):
epoc = _date[i].replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
cmd = cmd + "(%d,'%s','%s','%s','%s',%f,'%s')," % (
int(jobID[i]),
status[i],
minorStatus[i],
applicationStatus[:255],
str(_date[i]),
epoc,
source[:32],
)
cmd = cmd[:-1]
else: # else make a single insert
epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
cmd = cmd + "(%d,'%s','%s','%s','%s',%f,'%s')" % (
int(jobID),
status,
minorStatus,
applicationStatus[:255],
str(_date),
epoc,
source[:32],
)

return self._update(cmd)

#############################################################################
Expand Down
27 changes: 22 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def export_submitJob(self, jobDesc):
jobDescList = [jobDesc]

jobIDList = []
statusList = []
minorStatusList = []
timeStampList = []

if parametricJob:
initialStatus = JobStatus.SUBMITTING
Expand All @@ -199,13 +202,27 @@ def export_submitJob(self, jobDesc):
return result

jobID = result["JobID"]
self.log.info(f'Job added to the JobDB", "{jobID} for {self.ownerDN}/{self.ownerGroup}')

self.jobLoggingDB.addLoggingRecord(
jobID, result["Status"], result["MinorStatus"], date=result["TimeStamp"], source="JobManager"
)
self.log.info(f"Job added to the JobDB", f"{jobID} for {self.ownerDN}/{self.ownerGroup}")

jobIDList.append(jobID)
statusList.append(result["Status"])
minorStatusList.append(result["MinorStatus"])
timeStampList.append(result["TimeStamp"])

# insert records in logging DB

# For parametric jobs I can insert logging records in a bulk
if parametricJob and len(set(jobIDList)) == len(jobIDList):
print("in bulk")
result = self.jobLoggingDB.addLoggingRecord(
jobIDList, statusList, minorStatusList, date=timeStampList, source="JobManager"
)
else:
for i, _ in enumerate(jobIDList):
print(f"{i}, {jobIDList[i]}, {statusList[i]}, {minorStatusList[i]}, {timeStampList[i]}")
result = self.jobLoggingDB.addLoggingRecord(
jobIDList[i], statusList[i], minorStatusList[i], date=timeStampList[i], source="JobManager"
)

# Set persistency flag
retVal = gProxyManager.getUserPersistence(self.ownerDN, self.ownerGroup)
Expand Down
10 changes: 10 additions & 0 deletions tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,14 @@ def test_JobStatus(jobLoggingDB: JobLoggingDB):
result = jobLoggingDB.getWMSTimeStamps(1)
assert result["OK"] is True, result["Message"]

now = datetime.datetime.utcnow()
result = jobLoggingDB.addLoggingRecord(
[2, 3, 4, 5],
status=["testing", "testing", "testing", "testing"],
minorStatus=["mn", "mn", "mn", "mn"],
date=[now, now, now, now],
source="Unittest",
)
assert result["OK"] is True, result["Message"]

jobLoggingDB.deleteJob(1)

0 comments on commit dc30f94

Please sign in to comment.