Skip to content

Commit

Permalink
sweep: DIRACGrid#7866 adding possibility to bulk insert in JobLoggingDB
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Nov 20, 2024
1 parent bc5b5fb commit 1168ca0
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 27 deletions.
46 changes: 40 additions & 6 deletions src/DIRAC/Core/Utilities/MySQL.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,20 +529,19 @@ def __del__(self):
except Exception:
pass

def _except(self, methodName, x, err, cmd="", print=True):
def _except(self, methodName, x, err, cmd="", debug=True):
"""
print MySQL error or exception
return S_ERROR with Exception
"""

try:
raise x
except MySQLdb.Error as e:
if print:
if debug:
self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", "%d: %s" % (e.args[0], e.args[1]))
return S_ERROR(DErrno.EMYSQL, "%s: ( %d: %s )" % (err, e.args[0], e.args[1]))
except Exception as e:
if print:
if debug:
self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", repr(e))
return S_ERROR(DErrno.EMYSQL, f"{err}: ({repr(e)})")

Expand Down Expand Up @@ -757,8 +756,8 @@ def _update(self, cmd, *, conn=None, debug=True):
:param debug: print or not the errors
return S_OK with number of updated registers upon success
return S_ERROR upon error
:return: S_OK with number of updated registers upon success.
S_ERROR upon error.
"""

self.log.debug(f"_update: {self._safeCmd(cmd)}")
Expand Down Expand Up @@ -786,6 +785,41 @@ def _update(self, cmd, *, conn=None, debug=True):

return retDict

@captureOptimizerTraces
def _updatemany(self, cmd, data, *, conn=None, debug=True):
"""execute MySQL updatemany command
:param debug: print or not the errors
:return: S_OK with number of updated registers upon success.
S_ERROR upon error.
"""

self.log.debug(f"_updatemany: {self._safeCmd(cmd)}")
if conn:
connection = conn
else:
retDict = self._getConnection()
if not retDict["OK"]:
return retDict
connection = retDict["Value"]

try:
cursor = connection.cursor()
res = cursor.executemany(cmd, data)
retDict = S_OK(res)
if cursor.lastrowid:
retDict["lastRowId"] = cursor.lastrowid
except Exception as x:
retDict = self._except("_updatemany", x, "Execution failed.", cmd, debug)

try:
cursor.close()
except Exception:
pass

return retDict

def _transaction(self, cmdList, conn=None):
"""dummy transaction support
Expand Down
6 changes: 6 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ def setJobAttribute(self, jobID, attrName, attrValue, update=False, myDate=None,
:return: S_OK/S_ERROR
"""

if not jobID:
return S_OK()

if attrName not in self.jobAttributeNames:
return S_ERROR(EWMSJMAN, "Request to set non-existing job attribute")

Expand Down Expand Up @@ -506,6 +509,9 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No
:return: S_OK/S_ERROR
"""

if not jobID:
return S_OK()

jobIDList = jobID
if not isinstance(jobID, (list, tuple)):
jobIDList = [jobID]
Expand Down
58 changes: 43 additions & 15 deletions src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,61 @@ def addLoggingRecord(
event = f"status/minor/app={status}/{minorStatus}/{applicationStatus}"
self.log.info("Adding record for job ", str(jobID) + ": '" + event + "' from " + source)

try:
def _get_date(date):
# We need to specify that timezone is UTC because otherwise timestamp
# assumes local time while we mean UTC.
if not date:
# Make the UTC datetime string and float
_date = datetime.datetime.utcnow()
# Make the UTC datetime
return datetime.datetime.utcnow()
elif isinstance(date, str):
# The date is provided as a string in UTC
_date = TimeUtilities.fromString(date)
return TimeUtilities.fromString(date)
elif isinstance(date, datetime.datetime):
_date = date
return date
else:
raise Exception("Incorrect date for the logging record")

try:
if isinstance(date, list):
_date = []
for d in date:
try:
_date.append(_get_date(d))
except Exception:
self.log.exception("Exception while date evaluation")
_date.append(datetime.datetime.utcnow())
else:
self.log.error("Incorrect date for the logging record")
_date = datetime.datetime.utcnow()
_date = _get_date(date)
except Exception:
self.log.exception("Exception while date evaluation")
_date = datetime.datetime.utcnow()

# We need to specify that timezone is UTC because otherwise timestamp
# assumes local time while we mean UTC.
epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
_date = [datetime.datetime.utcnow()]

cmd = (
"INSERT INTO LoggingInfo (JobId, Status, MinorStatus, ApplicationStatus, "
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES (%d,'%s','%s','%s','%s',%f,'%s')"
% (int(jobID), status, minorStatus, applicationStatus[:255], str(_date), epoc, source[:32])
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES "
)

return self._update(cmd)
if not isinstance(jobID, list):
jobID = [jobID]

if isinstance(status, str):
status = [status] * len(jobID)
if isinstance(minorStatus, str):
minorStatus = [minorStatus] * len(jobID)
if isinstance(applicationStatus, str):
applicationStatus = [applicationStatus[:255]] * len(jobID)
if isinstance(_date, datetime.datetime):
_date = [_date] * len(jobID)

epocs = []
for dt in _date:
epoc = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
epocs.append(epoc)
cmd = cmd + "(%s, %s, %s, %s, %s, %s, %s)"
data = list(
zip(jobID, status, minorStatus, applicationStatus, _date, epocs, [source[:32]] * len(jobID), strict=True)
)
return self._updatemany(cmd, data)

#############################################################################
def getJobLoggingInfo(self, jobID):
Expand Down
27 changes: 21 additions & 6 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.StorageManagementSystem.Client.StorageManagerClient import StorageManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
RIGHT_KILL,
Expand Down Expand Up @@ -169,6 +169,9 @@ def export_submitJob(self, jobDesc):
jobDescList = [jobDesc]

jobIDList = []
statusList = []
minorStatusList = []
timeStampList = []

if parametricJob:
initialStatus = JobStatus.SUBMITTING
Expand Down Expand Up @@ -206,13 +209,25 @@ def export_submitJob(self, jobDesc):
return result

jobID = result["JobID"]
self.log.info(f'Job added to the JobDB", "{jobID} for {self.owner}/{self.ownerGroup}')

self.jobLoggingDB.addLoggingRecord(
jobID, result["Status"], result["MinorStatus"], date=result["TimeStamp"], source="JobManager"
)
self.log.info(f"Job added to the JobDB", f"{jobID} for {self.owner}/{self.ownerGroup}")

jobIDList.append(jobID)
statusList.append(result["Status"])
minorStatusList.append(result["MinorStatus"])
timeStampList.append(result["TimeStamp"])

# insert records in logging DB

# For parametric jobs I can insert logging records in a bulk
if parametricJob and len(set(jobIDList)) == len(jobIDList):
result = self.jobLoggingDB.addLoggingRecord(
jobIDList, statusList, minorStatusList, date=timeStampList, source="JobManager"
)
else:
for jobID, status, minorStatus, timeStamp in zip(jobIDList, statusList, minorStatusList, timeStampList):
result = self.jobLoggingDB.addLoggingRecord(
jobID, status, minorStatus, date=timeStamp, source="JobManager"
)

if parametricJob:
result = S_OK(jobIDList)
Expand Down
18 changes: 18 additions & 0 deletions tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,22 @@ def test_JobStatus(jobLoggingDB: JobLoggingDB):
result = jobLoggingDB.getWMSTimeStamps(1)
assert result["OK"] is True, result["Message"]

now = datetime.datetime.utcnow()
result = jobLoggingDB.addLoggingRecord(
[2, 3, 4, 5],
status=["testing", "testing", "testing", "testing"],
minorStatus=["mn", "mn", "mn", "mn"],
date=[now, now, now, now],
source="Unittest",
)
assert result["OK"] is True, result["Message"]

result = jobLoggingDB.getJobLoggingInfo(2)
assert result["OK"] is True, result["Message"]
assert result["Value"][-1][0:3] == ("testing", "mn", "Unknown")

result = jobLoggingDB.getJobLoggingInfo(5)
assert result["OK"] is True, result["Message"]
assert result["Value"][-1][0:3] == ("testing", "mn", "Unknown")

jobLoggingDB.deleteJob(1)

0 comments on commit 1168ca0

Please sign in to comment.