Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[integration] Simplifying Time module #6091

Merged
merged 5 commits into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/DIRAC/AccountingSystem/Client/Types/BaseAccountingType.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
""" Within this module is defined the class from which all other accounting types are defined
"""

import datetime

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Utilities import Time
from DIRAC.Core.Base.Client import Client
from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient

Expand Down Expand Up @@ -53,7 +55,7 @@ def setStartTime(self, startTime=False):
By default use now
"""
if not startTime:
self.startTime = Time.dateTime()
self.startTime = datetime.datetime.utcnow()
else:
self.startTime = startTime

Expand All @@ -63,15 +65,15 @@ def setEndTime(self, endTime=False):
By default use now
"""
if not endTime:
self.endTime = Time.dateTime()
self.endTime = datetime.datetime.utcnow()
else:
self.endTime = endTime

def setNowAsStartAndEndTime(self):
"""
Set current time as start and end time of the report
"""
self.startTime = Time.dateTime()
self.startTime = datetime.datetime.utcnow()
self.endTime = self.startTime

def setValueByKey(self, key, value):
Expand Down Expand Up @@ -121,11 +123,11 @@ def checkValues(self):
return S_ERROR("Invalid values: %s" % ", ".join(errorList))
if not self.startTime:
return S_ERROR("Start time has not been defined")
if not isinstance(self.startTime, Time._dateTimeType):
if not isinstance(self.startTime, datetime.datetime):
return S_ERROR("Start time is not a datetime object")
if not self.endTime:
return S_ERROR("End time has not been defined")
if not isinstance(self.endTime, Time._dateTimeType):
if not isinstance(self.endTime, datetime.datetime):
return S_ERROR("End time is not a datetime object")
return self.checkRecord()

Expand Down
42 changes: 23 additions & 19 deletions src/DIRAC/AccountingSystem/DB/AccountingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from DIRAC.Core.Base.DB import DB
from DIRAC import S_OK, S_ERROR, gConfig
from DIRAC.Core.Utilities import List, ThreadSafe, Time, DEncode
from DIRAC.Core.Utilities import List, ThreadSafe, DEncode, TimeUtilities
from DIRAC.Core.Utilities.Plotting.TypeLoader import TypeLoader
from DIRAC.Core.Utilities.ThreadPool import ThreadPool

Expand Down Expand Up @@ -49,9 +49,9 @@ def __init__(self, name="Accounting/AccountingDB", readOnly=False, parentLogger=
self.__loadCatalogFromDB()

self.__compactTime = datetime.time(hour=2, minute=random.randint(0, 59), second=random.randint(0, 59))
lcd = Time.dateTime()
lcd = datetime.datetime.utcnow()
lcd.replace(hour=self.__compactTime.hour + 1, minute=0, second=0)
self.__lastCompactionEpoch = Time.toEpoch(lcd)
self.__lastCompactionEpoch = TimeUtilities.toEpoch(lcd)

self.__registerTypes()

Expand All @@ -69,14 +69,14 @@ def autoCompactDB(self):

def __periodicAutoCompactDB(self):
while self.autoCompact:
nct = Time.dateTime()
nct = datetime.datetime.utcnow()
if nct.hour >= self.__compactTime.hour:
nct = nct + datetime.timedelta(days=1)
nct = nct.replace(
hour=self.__compactTime.hour, minute=self.__compactTime.minute, second=self.__compactTime.second
)
self.log.info("Next db compaction", "will be at %s" % nct)
sleepTime = Time.toEpoch(nct) - Time.toEpoch()
sleepTime = TimeUtilities.toEpoch(nct) - TimeUtilities.toEpoch()
time.sleep(sleepTime)
self.compactBuckets()

Expand Down Expand Up @@ -173,7 +173,7 @@ def loadPendingRecords(self):
gSynchro.unlock()
self.log.info("[PENDING] Loading pending records for insertion")
pending = 0
now = Time.toEpoch()
now = TimeUtilities.toEpoch()
recordsPerSlot = self.getCSOption("RecordsPerSlot", 100)
for typeName in self.dbCatalog:
self.log.info("[PENDING] Checking %s" % typeName)
Expand Down Expand Up @@ -534,7 +534,7 @@ def calculateBuckets(self, typeName, startTime, endTime, nowEpoch=False):
the proportional part for each bucket
"""
if not nowEpoch:
nowEpoch = int(Time.toEpoch(Time.dateTime()))
nowEpoch = int(TimeUtilities.toEpoch())
bucketTimeLength = self.calculateBucketLengthForTime(typeName, nowEpoch, startTime)
currentBucketStart = startTime - startTime % bucketTimeLength
if startTime == endTime:
Expand Down Expand Up @@ -566,7 +566,7 @@ def insertRecordBundleThroughQueue(self, recordsToQueue):
if self.__readOnly:
return S_ERROR("ReadOnly mode enabled. No modification allowed")
recordsToProcess = []
now = Time.toEpoch()
now = TimeUtilities.toEpoch()
for record in recordsToQueue:
typeName, startTime, endTime, valuesList = record
result = self.__insertInQueueTable(typeName, startTime, endTime, valuesList)
Expand All @@ -585,7 +585,8 @@ def insertRecordThroughQueue(self, typeName, startTime, endTime, valuesList):
return S_ERROR("ReadOnly mode enabled. No modification allowed")
self.log.info(
"Adding record to queue",
"for type %s\n [%s -> %s]" % (typeName, Time.fromEpoch(startTime), Time.fromEpoch(endTime)),
"for type %s\n [%s -> %s]"
% (typeName, TimeUtilities.fromEpoch(startTime), TimeUtilities.fromEpoch(endTime)),
)
if typeName not in self.dbCatalog:
return S_ERROR("Type %s has not been defined in the db" % typeName)
Expand Down Expand Up @@ -618,7 +619,9 @@ def insertRecordDirectly(self, typeName, startTime, endTime, valuesList):
if self.__readOnly:
return S_ERROR("ReadOnly mode enabled. No modification allowed")
self.log.info(
"Adding record", "for type %s\n [%s -> %s]" % (typeName, Time.fromEpoch(startTime), Time.fromEpoch(endTime))
"Adding record",
"for type %s\n [%s -> %s]"
% (typeName, TimeUtilities.fromEpoch(startTime), TimeUtilities.fromEpoch(endTime)),
)
if typeName not in self.dbCatalog:
return S_ERROR("Type %s has not been defined in the db" % typeName)
Expand Down Expand Up @@ -667,7 +670,8 @@ def deleteRecord(self, typeName, startTime, endTime, valuesList):

self.log.info(
"Deleting record",
"for type %s\n [%s -> %s]" % (typeName, Time.fromEpoch(startTime), Time.fromEpoch(endTime)),
"for type %s\n [%s -> %s]"
% (typeName, TimeUtilities.fromEpoch(startTime), TimeUtilities.fromEpoch(endTime)),
)
sqlValues = []
sqlValues.extend(valuesList)
Expand Down Expand Up @@ -950,7 +954,7 @@ def retrieveBucketedData(
)
if not retVal["OK"]:
return retVal
nowEpoch = Time.toEpoch(Time.dateTime())
nowEpoch = TimeUtilities.toEpoch()
bucketTimeLength = self.calculateBucketLengthForTime(typeName, nowEpoch, startTime)
startTime = startTime - startTime % bucketTimeLength
result = self.__queryType(
Expand Down Expand Up @@ -1121,7 +1125,7 @@ def compactBuckets(self, typeFilter=False):
else:
self.__compactBucketsForType(typeName)
self.log.info("[COMPACT] Compaction finished")
self.__lastCompactionEpoch = int(Time.toEpoch())
self.__lastCompactionEpoch = int(TimeUtilities.toEpoch())
gSynchro.lock()
try:
if self.__doingCompaction:
Expand Down Expand Up @@ -1169,7 +1173,7 @@ def __compactBucketsForType(self, typeName):
"""
Compact all buckets for a given type
"""
nowEpoch = Time.toEpoch()
nowEpoch = TimeUtilities.toEpoch()
# retVal = self.__startTransaction(connObj)
# if not retVal[ 'OK' ]:
# return retVal
Expand All @@ -1181,7 +1185,7 @@ def __compactBucketsForType(self, typeName):
nextBucketLength = self.dbBucketsLength[typeName][bPos + 1][1]
self.log.info(
"[COMPACT] Compacting data newer that %s with bucket size %s"
% (Time.fromEpoch(timeLimit), bucketLength)
% (TimeUtilities.fromEpoch(timeLimit), bucketLength)
)
# Retrieve the data
retVal = self.__selectForCompactBuckets(typeName, timeLimit, bucketLength, nextBucketLength)
Expand Down Expand Up @@ -1218,15 +1222,15 @@ def __slowCompactBucketsForType(self, typeName):
"""
Compact all buckets for a given type
"""
nowEpoch = Time.toEpoch()
nowEpoch = TimeUtilities.toEpoch()
for bPos in range(len(self.dbBucketsLength[typeName]) - 1):
self.log.info("[COMPACT] Query %d of %d" % (bPos, len(self.dbBucketsLength[typeName]) - 1))
secondsLimit = self.dbBucketsLength[typeName][bPos][0]
bucketLength = self.dbBucketsLength[typeName][bPos][1]
timeLimit = (nowEpoch - nowEpoch % bucketLength) - secondsLimit
self.log.info(
"[COMPACT] Compacting data newer that %s with bucket size %s for %s"
% (Time.fromEpoch(timeLimit), bucketLength, typeName)
% (TimeUtilities.fromEpoch(timeLimit), bucketLength, typeName)
)
querySize = 10000
previousRecordsSelected = querySize
Expand All @@ -1235,7 +1239,7 @@ def __slowCompactBucketsForType(self, typeName):
# Retrieve the data
self.log.info(
"[COMPACT] Retrieving buckets to compact newer that %s with size %s"
% (Time.fromEpoch(timeLimit), bucketLength)
% (TimeUtilities.fromEpoch(timeLimit), bucketLength)
)
roundStartTime = time.time()
result = self.__selectIndividualForCompactBuckets(typeName, timeLimit, bucketLength, querySize)
Expand Down Expand Up @@ -1402,7 +1406,7 @@ def regenerateBuckets(self, typeName):
sqlQueries = []
dateInclusiveConditions = []
countedField = "`%s`.`%s`" % (rawTableName, self.dbCatalog[typeName]["keys"][0])
lastTime = Time.toEpoch()
lastTime = TimeUtilities.toEpoch()
# Iterate for all ranges
for iRange, iValue in enumerate(self.dbBucketsLength[typeName]):
bucketTimeSpan = iValue[0]
Expand Down
18 changes: 9 additions & 9 deletions src/DIRAC/AccountingSystem/Service/DataStoreHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from DIRAC.AccountingSystem.DB.MultiAccountingDB import MultiAccountingDB
from DIRAC.ConfigurationSystem.Client import PathFinder
from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption
from DIRAC.Core.Utilities import Time
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
from DIRAC.Core.Base.Client import Client

Expand Down Expand Up @@ -133,8 +133,8 @@ def export_commit(self, typeName, startTime, endTime, valuesList):
Add a record for a type
"""
setup = self.serviceInfoDict["clientSetup"]
startTime = int(Time.toEpoch(startTime))
endTime = int(Time.toEpoch(endTime))
startTime = int(TimeUtilities.toEpoch(startTime))
endTime = int(TimeUtilities.toEpoch(endTime))
return self.__acDB.insertRecordThroughQueue( # pylint: disable=no-member
setup, typeName, startTime, endTime, valuesList
)
Expand All @@ -159,8 +159,8 @@ def export_commitRegisters(self, entriesList):
return S_ERROR("Unexpected type in report")
records = []
for entry in entriesList:
startTime = int(Time.toEpoch(entry[1]))
endTime = int(Time.toEpoch(entry[2]))
startTime = int(TimeUtilities.toEpoch(entry[1]))
endTime = int(TimeUtilities.toEpoch(entry[2]))
self.log.debug("inserting", entry)
records.append((setup, entry[0], startTime, endTime, entry[3]))
return self.__acDB.insertRecordBundleThroughQueue(records)
Expand All @@ -186,8 +186,8 @@ def export_remove(self, typeName, startTime, endTime, valuesList):
Remove a record for a type
"""
setup = self.serviceInfoDict["clientSetup"]
startTime = int(Time.toEpoch(startTime))
endTime = int(Time.toEpoch(endTime))
startTime = int(TimeUtilities.toEpoch(startTime))
endTime = int(TimeUtilities.toEpoch(endTime))
return self.__acDB.deleteRecord(setup, typeName, startTime, endTime, valuesList) # pylint: disable=no-member

types_removeRegisters = [list]
Expand All @@ -206,8 +206,8 @@ def export_removeRegisters(self, entriesList):
return S_ERROR("%s field in the records should be %s" % (i, expectedTypes[i]))
ok = 0
for entry in entriesList:
startTime = int(Time.toEpoch(entry[1]))
endTime = int(Time.toEpoch(entry[2]))
startTime = int(TimeUtilities.toEpoch(entry[1]))
endTime = int(TimeUtilities.toEpoch(entry[2]))
record = entry[3]
result = self.__acDB.deleteRecord(setup, entry[0], startTime, endTime, record) # pylint: disable=no-member
if not result["OK"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from DIRAC import S_OK, S_ERROR, rootPath, gConfig, gLogger
from DIRAC.Core.Utilities.File import mkDir
from DIRAC.Core.Utilities import Time
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.AccountingSystem.DB.MultiAccountingDB import MultiAccountingDB
from DIRAC.Core.Utilities.Plotting import gDataCache
from DIRAC.AccountingSystem.private.MainReporter import MainReporter
Expand Down Expand Up @@ -76,13 +76,13 @@ def __checkPlotRequest(self, reportRequest):
return S_ERROR("Value Error")
if lastSeconds < 3600:
return S_ERROR("lastSeconds must be more than 3600")
now = Time.dateTime()
now = datetime.datetime.utcnow()
reportRequest["endTime"] = now
reportRequest["startTime"] = now - datetime.timedelta(seconds=lastSeconds)
else:
# if enddate is not there, just set it to now
if not reportRequest.get("endTime", False):
reportRequest["endTime"] = Time.dateTime()
reportRequest["endTime"] = datetime.datetime.utcnow()
# Check keys
for key in self.__reportRequestDict:
if key not in reportRequest:
Expand All @@ -94,7 +94,7 @@ def __checkPlotRequest(self, reportRequest):
% (key, str(type(reportRequest[key])), str(self.__reportRequestDict[key]))
)
if key in ("startTime", "endTime"):
reportRequest[key] = int(Time.toEpoch(reportRequest[key]))
reportRequest[key] = int(TimeUtilities.toEpoch(reportRequest[key]))

return S_OK(reportRequest)

Expand Down
4 changes: 2 additions & 2 deletions src/DIRAC/AccountingSystem/private/DBUtils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
""" Class that collects utilities used in Accounting and Monitoring systems
"""
from DIRAC.Core.Utilities import Time
from DIRAC.Core.Utilities import TimeUtilities


class DBUtils(object):
Expand Down Expand Up @@ -63,7 +63,7 @@ def _getBins(self, typeName, startTime, endTime):
return self._acDB.calculateBuckets(self._setup, typeName, startTime, endTime)

def _getBucketLengthForTime(self, typeName, momentEpoch):
nowEpoch = Time.toEpoch()
nowEpoch = TimeUtilities.toEpoch()
return self._acDB.calculateBucketLengthForTime(self._setup, typeName, nowEpoch, momentEpoch)

def _spanToGranularity(self, granularity, bucketsData):
Expand Down
8 changes: 6 additions & 2 deletions src/DIRAC/ConfigurationSystem/Client/CSAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

Most of these functions can only be done by administrators
"""

import datetime

from DIRAC import gLogger, gConfig, S_OK, S_ERROR
from DIRAC.ConfigurationSystem.Client.ConfigurationClient import ConfigurationClient
from DIRAC.Core.Utilities import List, Time
from DIRAC.Core.Utilities import List
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error
from DIRAC.Core.Security import Locations
from DIRAC.ConfigurationSystem.private.Modificator import Modificator
Expand Down Expand Up @@ -100,7 +103,8 @@ def initialize(self):
self.__rpcClient = ConfigurationClient(url=gConfig.getValue("/DIRAC/Configuration/MasterServer", ""))
self.__csMod = Modificator(
self.__rpcClient,
"%s - %s - %s" % (self.__userGroup, self.__userDN, Time.dateTime().strftime("%Y-%m-%d %H:%M:%S")),
"%s - %s - %s"
% (self.__userGroup, self.__userDN, datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")),
)
retVal = self.downloadCSData()
if not retVal["OK"]:
Expand Down
7 changes: 4 additions & 3 deletions src/DIRAC/ConfigurationSystem/private/ConfigurationData.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import zipfile
import _thread
import time
import datetime
import DIRAC

from diraccfg import CFG
from DIRAC.Core.Utilities.File import mkDir
from DIRAC.Core.Utilities import List, Time
from DIRAC.Core.Utilities import List
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
from DIRAC.Core.Utilities.LockRing import LockRing
from DIRAC.FrameworkSystem.Client.Logger import gLogger
Expand Down Expand Up @@ -198,7 +199,7 @@ def deleteOptionInCFG(self, path, cfg=False):
self.sync()

def generateNewVersion(self):
self.setVersion(Time.toString())
self.setVersion(str(datetime.datetime.utcnow()))
self.sync()
gLogger.info("Generated new version %s" % self.getVersion())

Expand Down Expand Up @@ -336,7 +337,7 @@ def dumpRemoteCFGToFile(self, fileName):
def __backupCurrentConfiguration(self, backupName):
configurationFilename = "%s.cfg" % self.getName()
configurationFile = os.path.join(DIRAC.rootPath, "etc", configurationFilename)
today = Time.date()
today = datetime.datetime.utcnow().date()
backupPath = os.path.join(self.getBackupDir(), str(today.year), "%02d" % today.month)
mkDir(backupPath)
backupFile = os.path.join(backupPath, configurationFilename.replace(".cfg", ".%s.zip" % backupName))
Expand Down
Loading