Skip to content

Commit

Permalink
feat: adding possibility to bulk insert in JobLoggingDB
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Nov 4, 2024
1 parent 67d06bf commit ff8f2d2
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 22 deletions.
26 changes: 26 additions & 0 deletions 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# This is a combination of 2 commits.
# This is the 1st commit message:

feat: adding possibility to bulk insert in JobLoggingDB

# This is the commit message #2:


# Please enter the commit message for your changes. Lines starting
# with '#' will be ignored, and an empty message aborts the commit.
#
# Date: Fri Nov 1 12:22:03 2024 +0100
#
# interactive rebase in progress; onto 23a864aa1
# Last commands done (3 commands done):
# pick f4e154966 feat: adding possibility to bulk insert in JobLoggingDB
# squash 8486c57ad f
# Next command to do (1 remaining command):
# pick 0ce73e189 fix: prevent corner case
# You are currently rebasing branch '80_bulkInsertJobLoggingDB' on '23a864aa1'.
#
# Changes to be committed:
# modified: src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
# modified: src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
# modified: tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py
#
67 changes: 50 additions & 17 deletions src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
getWMSTimeStamps()
"""
import datetime
import time

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC import S_ERROR, S_OK
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Utilities import TimeUtilities

MAGIC_EPOC_NUMBER = 1270000000

Expand Down Expand Up @@ -56,32 +55,66 @@ def addLoggingRecord(
event = f"status/minor/app={status}/{minorStatus}/{applicationStatus}"
self.log.info("Adding record for job ", str(jobID) + ": '" + event + "' from " + source)

try:
def _get_date(date):
# We need to specify that timezone is UTC because otherwise timestamp
# assumes local time while we mean UTC.
if not date:
# Make the UTC datetime string and float
_date = datetime.datetime.utcnow()
# Make the UTC datetime
return datetime.datetime.utcnow()
elif isinstance(date, str):
# The date is provided as a string in UTC
_date = TimeUtilities.fromString(date)
return TimeUtilities.fromString(date)
elif isinstance(date, datetime.datetime):
_date = date
return date
else:
raise Exception("Incorrect date for the logging record")

try:
if isinstance(date, list):
_date = []
for d in date:
try:
_date.append(_get_date(d))
except Exception:
self.log.exception("Exception while date evaluation")
_date.append(datetime.datetime.utcnow())
else:
self.log.error("Incorrect date for the logging record")
_date = datetime.datetime.utcnow()
_date = _get_date(date)
except Exception:
self.log.exception("Exception while date evaluation")
_date = datetime.datetime.utcnow()

# We need to specify that timezone is UTC because otherwise timestamp
# assumes local time while we mean UTC.
epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
_date = [datetime.datetime.utcnow()]

cmd = (
"INSERT INTO LoggingInfo (JobId, Status, MinorStatus, ApplicationStatus, "
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES (%d,'%s','%s','%s','%s',%f,'%s')"
% (int(jobID), status, minorStatus, applicationStatus[:255], str(_date), epoc, source[:32])
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES "
)

# If jobID is a list, build a single multi-row INSERT (bulk insert)
if isinstance(jobID, list):
for i, _ in enumerate(jobID):
epoc = _date[i].replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
cmd = cmd + "(%d,'%s','%s','%s','%s',%f,'%s')," % (
int(jobID[i]),
status[i],
minorStatus[i],
applicationStatus[:255],
str(_date[i]),
epoc,
source[:32],
)
cmd = cmd[:-1]
else: # else make a single insert
epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
cmd = cmd + "(%d,'%s','%s','%s','%s',%f,'%s')" % (
int(jobID),
status,
minorStatus,
applicationStatus[:255],
str(_date),
epoc,
source[:32],
)

return self._update(cmd)

#############################################################################
Expand Down
25 changes: 20 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def export_submitJob(self, jobDesc):
jobDescList = [jobDesc]

jobIDList = []
statusList = []
minorStatusList = []
timeStampList = []

if parametricJob:
initialStatus = JobStatus.SUBMITTING
Expand All @@ -199,13 +202,25 @@ def export_submitJob(self, jobDesc):
return result

jobID = result["JobID"]
self.log.info(f'Job added to the JobDB", "{jobID} for {self.ownerDN}/{self.ownerGroup}')

self.jobLoggingDB.addLoggingRecord(
jobID, result["Status"], result["MinorStatus"], date=result["TimeStamp"], source="JobManager"
)
self.log.info(f"Job added to the JobDB", f"{jobID} for {self.ownerDN}/{self.ownerGroup}")

jobIDList.append(jobID)
statusList.append(result["Status"])
minorStatusList.append(result["MinorStatus"])
timeStampList.append(result["TimeStamp"])

# insert records in logging DB

# For parametric jobs, the logging records can be inserted in bulk with a single addLoggingRecord call
if parametricJob and len(set(jobIDList)) == len(jobIDList):
result = self.jobLoggingDB.addLoggingRecord(
jobIDList, statusList, minorStatusList, date=timeStampList, source="JobManager"
)
else:
for i, _ in enumerate(jobIDList):
result = self.jobLoggingDB.addLoggingRecord(
jobIDList[i], statusList[i], minorStatusList[i], date=timeStampList[i], source="JobManager"
)

# Set persistency flag
retVal = gProxyManager.getUserPersistence(self.ownerDN, self.ownerGroup)
Expand Down
19 changes: 19 additions & 0 deletions tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,27 @@ def test_JobStatus(jobLoggingDB: JobLoggingDB):

result = jobLoggingDB.getJobLoggingInfo(1)
assert result["OK"] is True, result["Message"]
print(result)

result = jobLoggingDB.getWMSTimeStamps(1)
assert result["OK"] is True, result["Message"]

now = datetime.datetime.utcnow()
result = jobLoggingDB.addLoggingRecord(
[2, 3, 4, 5],
status=["testing", "testing", "testing", "testing"],
minorStatus=["mn", "mn", "mn", "mn"],
date=[now, now, now, now],
source="Unittest",
)
assert result["OK"] is True, result["Message"]

result = jobLoggingDB.getJobLoggingInfo(2)
assert result["OK"] is True, result["Message"]
assert result["Value"][-1] == ("testing", "mn", "mn", str(now), "Unittest")

result = jobLoggingDB.getJobLoggingInfo(5)
assert result["OK"] is True, result["Message"]
assert result["Value"][-1] == ("testing", "mn", "mn", str(now), "Unittest")

jobLoggingDB.deleteJob(1)

0 comments on commit ff8f2d2

Please sign in to comment.