Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v7r2] Limit ES client to 7.13 #5322

Merged
merged 4 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment-py3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies:
- certifi
- cmreshandler >1.0.0b4
- docutils
- elasticsearch <7.14
- elasticsearch-dsl
- future
- gitpython >=2.1.0
Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies:
- cmreshandler >1.0.0b4
- cachetools <4
- docutils
- elasticsearch <7.14
- elasticsearch-dsl ~=6.3.1
- fts-rest
- future
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ certifi
coverage
docutils
diraccfg
elasticsearch<7.14
elasticsearch-dsl~=6.3.1
CMRESHandler>=1.0.0b4
funcsigs
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ server =
# (it just installs into site-packages)
# arc
CMRESHandler
elasticsearch
elasticsearch <7.14
elasticsearch_dsl
GitPython
ldap3
Expand Down
38 changes: 23 additions & 15 deletions src/DIRAC/AccountingSystem/DB/AccountingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ def loadPendingRecords(self):
emptySlots = min(100, emptySlots)
sqlTableName = _getTableName("in", typeName)
sqlFields = ['id'] + self.dbCatalog[typeName]['typeFields']
sqlCond = "WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s" % self.getWaitingRecordsLifeTime(
sqlCond = (
"WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s"
% self.getWaitingRecordsLifeTime()
)
result = self._query("SELECT %s FROM `%s` %s ORDER BY id ASC LIMIT %d" % (
", ".join(["`%s`" % f for f in sqlFields]), sqlTableName, sqlCond, emptySlots * recordsPerSlot))
Expand Down Expand Up @@ -410,7 +412,10 @@ def getRegisteredTypes(self):
"""
Get list of registered types
"""
retVal = self._query("SELECT `name`, `keyFields`, `valueFields`, `bucketsLength` FROM `%s`" % self.catalogTableName)
retVal = self._query(
"SELECT `name`, `keyFields`, `valueFields`, `bucketsLength` FROM `%s`"
% self.catalogTableName
)
if not retVal['OK']:
return retVal
typesList = []
Expand Down Expand Up @@ -1042,10 +1047,13 @@ def __queryType(
if groupFields:
try:
groupFields[0] % tuple(groupFields[1])
# We can have the case when we have multiple grouping and the fields in the select does not much the group by conditions
# We can have the case when we have multiple grouping and the fields
# in the select does not much the group by conditions
# for example: selectFields = ('%s, %s, %s, SUM(%s)', ['Site', 'startTime', 'bucketLength', 'entriesInBucket'])
# groupFields = ('%s, %s', ['startTime', 'Site'])
# in this case the correct query must be: select Site, startTime, bucketlength, sum(entriesInBucket) from xxxx where yyy Group by Site, startTime, bucketlength
# in this case the correct query must be:
# select Site, startTime, bucketlength, sum(entriesInBucket)
# from xxxx where yyy Group by Site, startTime, bucketlength
#
# When we have multiple grouping then we must have all the fields in Group by. This is from mysql 5.7.
# We have fields which are not in the groupFields and it is in selectFields
Expand Down Expand Up @@ -1230,7 +1238,7 @@ def __compactBucketsForType(self, typeName):
Compact all buckets for a given type
"""
nowEpoch = Time.toEpoch()
#retVal = self.__startTransaction( connObj )
# retVal = self.__startTransaction(connObj)
# if not retVal[ 'OK' ]:
# return retVal
for bPos in range(len(self.dbBucketsLength[typeName]) - 1):
Expand All @@ -1245,15 +1253,15 @@ def __compactBucketsForType(self, typeName):
# Retrieve the data
retVal = self.__selectForCompactBuckets(typeName, timeLimit, bucketLength, nextBucketLength)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return retVal
bucketsData = retVal['Value']
self.log.info("[COMPACT] Got %d records to compact" % len(bucketsData))
if len(bucketsData) == 0:
continue
retVal = self.__deleteForCompactBuckets(typeName, timeLimit, bucketLength)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction( connObj )
return retVal
self.log.info(
"[COMPACT] Compacting %s records %s seconds size for %s" %
Expand All @@ -1265,7 +1273,7 @@ def __compactBucketsForType(self, typeName):
valuesList = record[:-2]
retVal = self.__splitInBuckets(typeName, startTime, endTime, valuesList)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction( connObj )
self.log.error("[COMPACT] Error while compacting data for record", "%s: %s" % (typeName, retVal['Value']))
self.log.info("[COMPACT] Finished compaction %d of %d" % (bPos, len(self.dbBucketsLength[typeName]) - 1))
# return self.__commitTransaction( connObj )
Expand Down Expand Up @@ -1297,7 +1305,7 @@ def __slowCompactBucketsForType(self, typeName):
result = self.__selectIndividualForCompactBuckets(typeName, timeLimit, bucketLength,
querySize)
if not result['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction( connObj )
return result
bucketsData = result['Value']
previousRecordsSelected = len(bucketsData)
Expand All @@ -1310,7 +1318,7 @@ def __slowCompactBucketsForType(self, typeName):

result = self.__deleteIndividualForCompactBuckets(typeName, bucketsData)
if not result['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return result
bucketsData = result['Value']
deleteEndTime = time.time()
Expand All @@ -1329,7 +1337,7 @@ def __slowCompactBucketsForType(self, typeName):
self.log.info("[COMPACT] Records compacted (took %.2f secs, %.2f secs/bucket)" %
(insertElapsedTime, insertElapsedTime / len(bucketsData)))
self.log.info("[COMPACT] Finised compaction %d of %d" % (bPos, len(self.dbBucketsLength[typeName]) - 1))
# return self.__commitTransaction( connObj )
# return self.__commitTransaction(connObj)
return S_OK()

def __selectIndividualForCompactBuckets(self, typeName, timeLimit, bucketLength, querySize, connObj=False):
Expand Down Expand Up @@ -1416,7 +1424,7 @@ def regenerateBuckets(self, typeName):
self.__deleteRecordsOlderThanDataTimespan(typeName)
self.log.info("[REBUCKET] Done deleting old records")
rawTableName = _getTableName("type", typeName)
#retVal = self.__startTransaction( connObj )
# retVal = self.__startTransaction(connObj)
# if not retVal[ 'OK' ]:
# return retVal
self.log.info("[REBUCKET] Deleting buckets for %s" % typeName)
Expand Down Expand Up @@ -1509,7 +1517,7 @@ def regenerateBuckets(self, typeName):
retVal = self._query(sqlQuery)
if not retVal['OK']:
self.log.error("[REBUCKET] Can't retrieve data for rebucketing", retVal['Message'])
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return retVal
rawData = retVal['Value']
self.log.info("[REBUCKET] Retrieved %s records" % len(rawData))
Expand All @@ -1523,7 +1531,7 @@ def regenerateBuckets(self, typeName):
values = entry[2:]
retVal = self.__splitInBuckets(typeName, startT, endT, values)
if not retVal['OK']:
#self.__rollbackTransaction( connObj )
# self.__rollbackTransaction(connObj)
return retVal
rebucketedRecords += 1
if rebucketedRecords % 1000 == 0:
Expand All @@ -1534,7 +1542,7 @@ def regenerateBuckets(self, typeName):
expectedEnd = str(datetime.timedelta(seconds=int((numRecords - rebucketedRecords) / blockAvg)))
self.log.info("[REBUCKET] Rebucketed %.2f%% %s (%.2f r/s block %.2f r/s query | ETA %s )..." %
(perDone, typeName, blockAvg, queryAvg, expectedEnd))
# return self.__commitTransaction( connObj )
# return self.__commitTransaction(connObj)
return S_OK()

def __startTransaction(self, connObj):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,9 @@ def getInstallations(self, argss):
+ " "
+ "Name".center(20)
+ " "
+ "Module".center(20)
+ "DIRACModule".center(20)
+ " "
+ "System".center(16)
+ "DIRACSystem".center(16)
+ " "
+ "Type".center(12)
+ " "
Expand Down Expand Up @@ -558,9 +558,9 @@ def getInstallations(self, argss):
+ "|"
+ installation["Instance"].center(20)
+ "|"
+ installation["Component"]["Module"].center(20)
+ installation["Component"]["DIRACModule"].center(20)
+ "|"
+ installation["Component"]["System"].center(16)
+ installation["Component"]["DIRACSystem"].center(16)
+ "|"
+ installation["Component"]["Type"].center(12)
+ "|"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def initialize(self):
self.backends = self.am_getOption("Backends", "Accounting").replace(' ', '').split(',')
messageQueue = self.am_getOption("MessageQueue", "dirac.wmshistory")

self.log.info("Committing to %s backend" % 'and '.join(self.backends))

self.datastores = {} # For storing the clients to Accounting and Monitoring

if 'Accounting' in self.backends:
Expand Down