Skip to content

Commit

Permalink
just formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Aug 10, 2021
1 parent 29a95e4 commit bf27e8c
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions src/DIRAC/AccountingSystem/DB/AccountingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ def loadPendingRecords(self):
emptySlots = min(100, emptySlots)
sqlTableName = _getTableName("in", typeName)
sqlFields = ['id'] + self.dbCatalog[typeName]['typeFields']
sqlCond = "WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s" % self.getWaitingRecordsLifeTime(
sqlCond = (
"WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s"
% self.getWaitingRecordsLifeTime()
)
result = self._query("SELECT %s FROM `%s` %s ORDER BY id ASC LIMIT %d" % (
", ".join(["`%s`" % f for f in sqlFields]), sqlTableName, sqlCond, emptySlots * recordsPerSlot))
Expand Down Expand Up @@ -410,7 +412,10 @@ def getRegisteredTypes(self):
"""
Get list of registered types
"""
retVal = self._query("SELECT `name`, `keyFields`, `valueFields`, `bucketsLength` FROM `%s`" % self.catalogTableName)
retVal = self._query(
"SELECT `name`, `keyFields`, `valueFields`, `bucketsLength` FROM `%s`"
% self.catalogTableName
)
if not retVal['OK']:
return retVal
typesList = []
Expand Down Expand Up @@ -1042,10 +1047,13 @@ def __queryType(
if groupFields:
try:
groupFields[0] % tuple(groupFields[1])
# We can have the case when we have multiple grouping and the fields in the select does not much the group by conditions
# We can have the case when we have multiple grouping and the fields
# in the select does not much the group by conditions
# for example: selectFields = ('%s, %s, %s, SUM(%s)', ['Site', 'startTime', 'bucketLength', 'entriesInBucket'])
# groupFields = ('%s, %s', ['startTime', 'Site'])
# in this case the correct query must be: select Site, startTime, bucketlength, sum(entriesInBucket) from xxxx where yyy Group by Site, startTime, bucketlength
# in this case the correct query must be:
# select Site, startTime, bucketlength, sum(entriesInBucket)
# from xxxx where yyy Group by Site, startTime, bucketlength
#
# When we have multiple grouping then we must have all the fields in Group by. This is from mysql 5.7.
# We have fields which are not in the groupFields and it is in selectFields
Expand Down Expand Up @@ -1230,7 +1238,7 @@ def __compactBucketsForType(self, typeName):
Compact all buckets for a given type
"""
nowEpoch = Time.toEpoch()
#retVal = self.__startTransaction( connObj )
# retVal = self.__startTransaction(connObj)
# if not retVal[ 'OK' ]:
# return retVal
for bPos in range(len(self.dbBucketsLength[typeName]) - 1):
Expand All @@ -1245,15 +1253,15 @@ def __compactBucketsForType(self, typeName):
# Retrieve the data
retVal = self.__selectForCompactBuckets(typeName, timeLimit, bucketLength, nextBucketLength)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return retVal
bucketsData = retVal['Value']
self.log.info("[COMPACT] Got %d records to compact" % len(bucketsData))
if len(bucketsData) == 0:
continue
retVal = self.__deleteForCompactBuckets(typeName, timeLimit, bucketLength)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction( connObj )
return retVal
self.log.info(
"[COMPACT] Compacting %s records %s seconds size for %s" %
Expand All @@ -1265,7 +1273,7 @@ def __compactBucketsForType(self, typeName):
valuesList = record[:-2]
retVal = self.__splitInBuckets(typeName, startTime, endTime, valuesList)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction( connObj )
self.log.error("[COMPACT] Error while compacting data for record", "%s: %s" % (typeName, retVal['Value']))
self.log.info("[COMPACT] Finished compaction %d of %d" % (bPos, len(self.dbBucketsLength[typeName]) - 1))
# return self.__commitTransaction( connObj )
Expand Down Expand Up @@ -1297,7 +1305,7 @@ def __slowCompactBucketsForType(self, typeName):
result = self.__selectIndividualForCompactBuckets(typeName, timeLimit, bucketLength,
querySize)
if not result['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction( connObj )
return result
bucketsData = result['Value']
previousRecordsSelected = len(bucketsData)
Expand All @@ -1310,7 +1318,7 @@ def __slowCompactBucketsForType(self, typeName):

result = self.__deleteIndividualForCompactBuckets(typeName, bucketsData)
if not result['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return result
bucketsData = result['Value']
deleteEndTime = time.time()
Expand All @@ -1329,7 +1337,7 @@ def __slowCompactBucketsForType(self, typeName):
self.log.info("[COMPACT] Records compacted (took %.2f secs, %.2f secs/bucket)" %
(insertElapsedTime, insertElapsedTime / len(bucketsData)))
self.log.info("[COMPACT] Finised compaction %d of %d" % (bPos, len(self.dbBucketsLength[typeName]) - 1))
# return self.__commitTransaction( connObj )
# return self.__commitTransaction(connObj)
return S_OK()

def __selectIndividualForCompactBuckets(self, typeName, timeLimit, bucketLength, querySize, connObj=False):
Expand Down Expand Up @@ -1416,7 +1424,7 @@ def regenerateBuckets(self, typeName):
self.__deleteRecordsOlderThanDataTimespan(typeName)
self.log.info("[REBUCKET] Done deleting old records")
rawTableName = _getTableName("type", typeName)
#retVal = self.__startTransaction( connObj )
# retVal = self.__startTransaction(connObj)
# if not retVal[ 'OK' ]:
# return retVal
self.log.info("[REBUCKET] Deleting buckets for %s" % typeName)
Expand Down Expand Up @@ -1509,7 +1517,7 @@ def regenerateBuckets(self, typeName):
retVal = self._query(sqlQuery)
if not retVal['OK']:
self.log.error("[REBUCKET] Can't retrieve data for rebucketing", retVal['Message'])
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return retVal
rawData = retVal['Value']
self.log.info("[REBUCKET] Retrieved %s records" % len(rawData))
Expand All @@ -1523,7 +1531,7 @@ def regenerateBuckets(self, typeName):
values = entry[2:]
retVal = self.__splitInBuckets(typeName, startT, endT, values)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return retVal
rebucketedRecords += 1
if rebucketedRecords % 1000 == 0:
Expand All @@ -1534,7 +1542,7 @@ def regenerateBuckets(self, typeName):
expectedEnd = str(datetime.timedelta(seconds=int((numRecords - rebucketedRecords) / blockAvg)))
self.log.info("[REBUCKET] Rebucketed %.2f%% %s (%.2f r/s block %.2f r/s query | ETA %s )..." %
(perDone, typeName, blockAvg, queryAvg, expectedEnd))
# return self.__commitTransaction( connObj )
# return self.__commitTransaction(connObj)
return S_OK()

def __startTransaction(self, connObj):
Expand Down

0 comments on commit bf27e8c

Please sign in to comment.