Skip to content

Commit

Permalink
Merge pull request #5322 from fstagni/v7r2-fixes58
Browse files Browse the repository at this point in the history
[v7r2] Limit ES client to 7.13
  • Loading branch information
fstagni authored Aug 11, 2021
2 parents 41d51b3 + b85c7ca commit 893f60a
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 20 deletions.
1 change: 1 addition & 0 deletions environment-py3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies:
- certifi
- cmreshandler >1.0.0b4
- docutils
- elasticsearch <7.14
- elasticsearch-dsl
- future
- gitpython >=2.1.0
Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies:
- cmreshandler >1.0.0b4
- cachetools <4
- docutils
- elasticsearch <7.14
- elasticsearch-dsl ~=6.3.1
- fts-rest
- future
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ certifi
coverage
docutils
diraccfg
elasticsearch<7.14
elasticsearch-dsl~=6.3.1
CMRESHandler>=1.0.0b4
funcsigs
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ server =
# (it just installs into site-packages)
# arc
CMRESHandler
elasticsearch
elasticsearch <7.14
elasticsearch_dsl
GitPython
ldap3
Expand Down
38 changes: 23 additions & 15 deletions src/DIRAC/AccountingSystem/DB/AccountingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ def loadPendingRecords(self):
emptySlots = min(100, emptySlots)
sqlTableName = _getTableName("in", typeName)
sqlFields = ['id'] + self.dbCatalog[typeName]['typeFields']
sqlCond = "WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s" % self.getWaitingRecordsLifeTime(
sqlCond = (
"WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s"
% self.getWaitingRecordsLifeTime()
)
result = self._query("SELECT %s FROM `%s` %s ORDER BY id ASC LIMIT %d" % (
", ".join(["`%s`" % f for f in sqlFields]), sqlTableName, sqlCond, emptySlots * recordsPerSlot))
Expand Down Expand Up @@ -410,7 +412,10 @@ def getRegisteredTypes(self):
"""
Get list of registered types
"""
retVal = self._query("SELECT `name`, `keyFields`, `valueFields`, `bucketsLength` FROM `%s`" % self.catalogTableName)
retVal = self._query(
"SELECT `name`, `keyFields`, `valueFields`, `bucketsLength` FROM `%s`"
% self.catalogTableName
)
if not retVal['OK']:
return retVal
typesList = []
Expand Down Expand Up @@ -1042,10 +1047,13 @@ def __queryType(
if groupFields:
try:
groupFields[0] % tuple(groupFields[1])
# We can have the case when we have multiple grouping and the fields in the select do not match the group by conditions
# We can have the case when we have multiple grouping and the fields
# in the select do not match the group by conditions
# for example: selectFields = ('%s, %s, %s, SUM(%s)', ['Site', 'startTime', 'bucketLength', 'entriesInBucket'])
# groupFields = ('%s, %s', ['startTime', 'Site'])
# in this case the correct query must be: select Site, startTime, bucketlength, sum(entriesInBucket) from xxxx where yyy Group by Site, startTime, bucketlength
# in this case the correct query must be:
# select Site, startTime, bucketlength, sum(entriesInBucket)
# from xxxx where yyy Group by Site, startTime, bucketlength
#
# When we have multiple grouping then we must have all the fields in Group by. This is from mysql 5.7.
# We have fields which are not in the groupFields and it is in selectFields
Expand Down Expand Up @@ -1230,7 +1238,7 @@ def __compactBucketsForType(self, typeName):
Compact all buckets for a given type
"""
nowEpoch = Time.toEpoch()
#retVal = self.__startTransaction( connObj )
# retVal = self.__startTransaction(connObj)
# if not retVal[ 'OK' ]:
# return retVal
for bPos in range(len(self.dbBucketsLength[typeName]) - 1):
Expand All @@ -1245,15 +1253,15 @@ def __compactBucketsForType(self, typeName):
# Retrieve the data
retVal = self.__selectForCompactBuckets(typeName, timeLimit, bucketLength, nextBucketLength)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return retVal
bucketsData = retVal['Value']
self.log.info("[COMPACT] Got %d records to compact" % len(bucketsData))
if len(bucketsData) == 0:
continue
retVal = self.__deleteForCompactBuckets(typeName, timeLimit, bucketLength)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction( connObj )
return retVal
self.log.info(
"[COMPACT] Compacting %s records %s seconds size for %s" %
Expand All @@ -1265,7 +1273,7 @@ def __compactBucketsForType(self, typeName):
valuesList = record[:-2]
retVal = self.__splitInBuckets(typeName, startTime, endTime, valuesList)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction( connObj )
self.log.error("[COMPACT] Error while compacting data for record", "%s: %s" % (typeName, retVal['Value']))
self.log.info("[COMPACT] Finished compaction %d of %d" % (bPos, len(self.dbBucketsLength[typeName]) - 1))
# return self.__commitTransaction( connObj )
Expand Down Expand Up @@ -1297,7 +1305,7 @@ def __slowCompactBucketsForType(self, typeName):
result = self.__selectIndividualForCompactBuckets(typeName, timeLimit, bucketLength,
querySize)
if not result['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction( connObj )
return result
bucketsData = result['Value']
previousRecordsSelected = len(bucketsData)
Expand All @@ -1310,7 +1318,7 @@ def __slowCompactBucketsForType(self, typeName):

result = self.__deleteIndividualForCompactBuckets(typeName, bucketsData)
if not result['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return result
bucketsData = result['Value']
deleteEndTime = time.time()
Expand All @@ -1329,7 +1337,7 @@ def __slowCompactBucketsForType(self, typeName):
self.log.info("[COMPACT] Records compacted (took %.2f secs, %.2f secs/bucket)" %
(insertElapsedTime, insertElapsedTime / len(bucketsData)))
self.log.info("[COMPACT] Finished compaction %d of %d" % (bPos, len(self.dbBucketsLength[typeName]) - 1))
# return self.__commitTransaction( connObj )
# return self.__commitTransaction(connObj)
return S_OK()

def __selectIndividualForCompactBuckets(self, typeName, timeLimit, bucketLength, querySize, connObj=False):
Expand Down Expand Up @@ -1416,7 +1424,7 @@ def regenerateBuckets(self, typeName):
self.__deleteRecordsOlderThanDataTimespan(typeName)
self.log.info("[REBUCKET] Done deleting old records")
rawTableName = _getTableName("type", typeName)
#retVal = self.__startTransaction( connObj )
# retVal = self.__startTransaction(connObj)
# if not retVal[ 'OK' ]:
# return retVal
self.log.info("[REBUCKET] Deleting buckets for %s" % typeName)
Expand Down Expand Up @@ -1509,7 +1517,7 @@ def regenerateBuckets(self, typeName):
retVal = self._query(sqlQuery)
if not retVal['OK']:
self.log.error("[REBUCKET] Can't retrieve data for rebucketing", retVal['Message'])
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return retVal
rawData = retVal['Value']
self.log.info("[REBUCKET] Retrieved %s records" % len(rawData))
Expand All @@ -1523,7 +1531,7 @@ def regenerateBuckets(self, typeName):
values = entry[2:]
retVal = self.__splitInBuckets(typeName, startT, endT, values)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return retVal
rebucketedRecords += 1
if rebucketedRecords % 1000 == 0:
Expand All @@ -1534,7 +1542,7 @@ def regenerateBuckets(self, typeName):
expectedEnd = str(datetime.timedelta(seconds=int((numRecords - rebucketedRecords) / blockAvg)))
self.log.info("[REBUCKET] Rebucketed %.2f%% %s (%.2f r/s block %.2f r/s query | ETA %s )..." %
(perDone, typeName, blockAvg, queryAvg, expectedEnd))
# return self.__commitTransaction( connObj )
# return self.__commitTransaction(connObj)
return S_OK()

def __startTransaction(self, connObj):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,9 @@ def getInstallations(self, argss):
+ " "
+ "Name".center(20)
+ " "
+ "Module".center(20)
+ "DIRACModule".center(20)
+ " "
+ "System".center(16)
+ "DIRACSystem".center(16)
+ " "
+ "Type".center(12)
+ " "
Expand Down Expand Up @@ -558,9 +558,9 @@ def getInstallations(self, argss):
+ "|"
+ installation["Instance"].center(20)
+ "|"
+ installation["Component"]["Module"].center(20)
+ installation["Component"]["DIRACModule"].center(20)
+ "|"
+ installation["Component"]["System"].center(16)
+ installation["Component"]["DIRACSystem"].center(16)
+ "|"
+ installation["Component"]["Type"].center(12)
+ "|"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def initialize(self):
self.backends = self.am_getOption("Backends", "Accounting").replace(' ', '').split(',')
messageQueue = self.am_getOption("MessageQueue", "dirac.wmshistory")

self.log.info("Committing to %s backend" % ' and '.join(self.backends))

self.datastores = {} # For storing the clients to Accounting and Monitoring

if 'Accounting' in self.backends:
Expand Down

0 comments on commit 893f60a

Please sign in to comment.