diff --git a/environment-py3.yml b/environment-py3.yml index 0d220ab5270..36a04625788 100644 --- a/environment-py3.yml +++ b/environment-py3.yml @@ -16,6 +16,7 @@ dependencies: - certifi - cmreshandler >1.0.0b4 - docutils + - elasticsearch <7.14 - elasticsearch-dsl - future - gitpython >=2.1.0 diff --git a/environment.yml b/environment.yml index 6b486b6d04b..5cd02f87ef2 100644 --- a/environment.yml +++ b/environment.yml @@ -14,6 +14,7 @@ dependencies: - cmreshandler >1.0.0b4 - cachetools <4 - docutils + - elasticsearch <7.14 - elasticsearch-dsl ~=6.3.1 - fts-rest - future diff --git a/requirements.txt b/requirements.txt index 5cfbf3c04fb..7ae47e48981 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,6 +15,7 @@ certifi coverage docutils diraccfg +elasticsearch<7.14 elasticsearch-dsl~=6.3.1 CMRESHandler>=1.0.0b4 funcsigs diff --git a/setup.cfg b/setup.cfg index d9162961a4f..77a033370c9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -64,7 +64,7 @@ server = # (it just installs into site-packages) # arc CMRESHandler - elasticsearch + elasticsearch <7.14 elasticsearch_dsl GitPython ldap3 diff --git a/src/DIRAC/AccountingSystem/DB/AccountingDB.py b/src/DIRAC/AccountingSystem/DB/AccountingDB.py index 2252f405e25..5865f6f9c80 100644 --- a/src/DIRAC/AccountingSystem/DB/AccountingDB.py +++ b/src/DIRAC/AccountingSystem/DB/AccountingDB.py @@ -209,7 +209,9 @@ def loadPendingRecords(self): emptySlots = min(100, emptySlots) sqlTableName = _getTableName("in", typeName) sqlFields = ['id'] + self.dbCatalog[typeName]['typeFields'] - sqlCond = "WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s" % self.getWaitingRecordsLifeTime( + sqlCond = ( + "WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s" + % self.getWaitingRecordsLifeTime() ) result = self._query("SELECT %s FROM `%s` %s ORDER BY id ASC LIMIT %d" % ( ", ".join(["`%s`" % f for f in sqlFields]), sqlTableName, sqlCond, emptySlots * recordsPerSlot)) @@ -410,7 +412,10 @@ def getRegisteredTypes(self): """ Get list of registered types """ - retVal = self._query("SELECT `name`, `keyFields`, `valueFields`, `bucketsLength` FROM `%s`" % self.catalogTableName) + retVal = self._query( + "SELECT `name`, `keyFields`, `valueFields`, `bucketsLength` FROM `%s`" + % self.catalogTableName + ) if not retVal['OK']: return retVal typesList = [] @@ -1042,10 +1047,13 @@ def __queryType( if groupFields: try: groupFields[0] % tuple(groupFields[1]) - # We can have the case when we have multiple grouping and the fields in the select does not much the group by conditions + # We can have the case when we have multiple grouping and the fields + # in the select does not much the group by conditions # for example: selectFields = ('%s, %s, %s, SUM(%s)', ['Site', 'startTime', 'bucketLength', 'entriesInBucket']) # groupFields = ('%s, %s', ['startTime', 'Site']) - # in this case the correct query must be: select Site, startTime, bucketlength, sum(entriesInBucket) from xxxx where yyy Group by Site, startTime, bucketlength + # in this case the correct query must be: + # select Site, startTime, bucketlength, sum(entriesInBucket) + # from xxxx where yyy Group by Site, startTime, bucketlength # # When we have multiple grouping then we must have all the fields in Group by. This is from mysql 5.7. # We have fields which are not in the groupFields and it is in selectFields @@ -1230,7 +1238,7 @@ def __compactBucketsForType(self, typeName): Compact all buckets for a given type """ nowEpoch = Time.toEpoch() - #retVal = self.__startTransaction( connObj ) + # retVal = self.__startTransaction(connObj) # if not retVal[ 'OK' ]: # return retVal for bPos in range(len(self.dbBucketsLength[typeName]) - 1): @@ -1245,7 +1253,7 @@ def __compactBucketsForType(self, typeName): # Retrieve the data retVal = self.__selectForCompactBuckets(typeName, timeLimit, bucketLength, nextBucketLength) if not retVal['OK']: - #self.__rollbackTransaction( connObj ) + # self.__rollbackTransaction(connObj) return retVal bucketsData = retVal['Value'] self.log.info("[COMPACT] Got %d records to compact" % len(bucketsData)) @@ -1253,7 +1261,7 @@ def __compactBucketsForType(self, typeName): continue retVal = self.__deleteForCompactBuckets(typeName, timeLimit, bucketLength) if not retVal['OK']: - #self.__rollbackTransaction( connObj ) + # self.__rollbackTransaction( connObj ) return retVal self.log.info( "[COMPACT] Compacting %s records %s seconds size for %s" % @@ -1265,7 +1273,7 @@ def __compactBucketsForType(self, typeName): valuesList = record[:-2] retVal = self.__splitInBuckets(typeName, startTime, endTime, valuesList) if not retVal['OK']: - #self.__rollbackTransaction( connObj ) + # self.__rollbackTransaction( connObj ) self.log.error("[COMPACT] Error while compacting data for record", "%s: %s" % (typeName, retVal['Value'])) self.log.info("[COMPACT] Finished compaction %d of %d" % (bPos, len(self.dbBucketsLength[typeName]) - 1)) # return self.__commitTransaction( connObj ) @@ -1297,7 +1305,7 @@ def __slowCompactBucketsForType(self, typeName): result = self.__selectIndividualForCompactBuckets(typeName, timeLimit, bucketLength, querySize) if not result['OK']: - #self.__rollbackTransaction( connObj ) + # self.__rollbackTransaction( connObj ) return result bucketsData = result['Value'] previousRecordsSelected = len(bucketsData) @@ -1310,7 +1318,7 @@ def __slowCompactBucketsForType(self, typeName): result = self.__deleteIndividualForCompactBuckets(typeName, bucketsData) if not result['OK']: - #self.__rollbackTransaction( connObj ) + # self.__rollbackTransaction(connObj) return result bucketsData = result['Value'] deleteEndTime = time.time() @@ -1329,7 +1337,7 @@ def __slowCompactBucketsForType(self, typeName): self.log.info("[COMPACT] Records compacted (took %.2f secs, %.2f secs/bucket)" % (insertElapsedTime, insertElapsedTime / len(bucketsData))) self.log.info("[COMPACT] Finised compaction %d of %d" % (bPos, len(self.dbBucketsLength[typeName]) - 1)) - # return self.__commitTransaction( connObj ) + # return self.__commitTransaction(connObj) return S_OK() def __selectIndividualForCompactBuckets(self, typeName, timeLimit, bucketLength, querySize, connObj=False): @@ -1416,7 +1424,7 @@ def regenerateBuckets(self, typeName): self.__deleteRecordsOlderThanDataTimespan(typeName) self.log.info("[REBUCKET] Done deleting old records") rawTableName = _getTableName("type", typeName) - #retVal = self.__startTransaction( connObj ) + # retVal = self.__startTransaction(connObj) # if not retVal[ 'OK' ]: # return retVal self.log.info("[REBUCKET] Deleting buckets for %s" % typeName) @@ -1509,7 +1517,7 @@ def regenerateBuckets(self, typeName): retVal = self._query(sqlQuery) if not retVal['OK']: self.log.error("[REBUCKET] Can't retrieve data for rebucketing", retVal['Message']) - #self.__rollbackTransaction( connObj ) + # self.__rollbackTransaction(connObj) return retVal rawData = retVal['Value'] self.log.info("[REBUCKET] Retrieved %s records" % len(rawData)) @@ -1523,7 +1531,7 @@ def regenerateBuckets(self, typeName): values = entry[2:] retVal = self.__splitInBuckets(typeName, startT, endT, values) if not retVal['OK']: - #self.__rollbackTransaction( connObj ) + # self.__rollbackTransaction(connObj) return retVal rebucketedRecords += 1 if rebucketedRecords % 1000 == 0: @@ -1534,7 +1542,7 @@ def regenerateBuckets(self, typeName): expectedEnd = str(datetime.timedelta(seconds=int((numRecords - rebucketedRecords) / blockAvg))) self.log.info("[REBUCKET] Rebucketed %.2f%% %s (%.2f r/s block %.2f r/s query | ETA %s )..." % (perDone, typeName, blockAvg, queryAvg, expectedEnd)) - # return self.__commitTransaction( connObj ) + # return self.__commitTransaction(connObj) return S_OK() def __startTransaction(self, connObj): diff --git a/src/DIRAC/FrameworkSystem/Client/SystemAdministratorClientCLI.py b/src/DIRAC/FrameworkSystem/Client/SystemAdministratorClientCLI.py index d4eedca6e76..9a5835f150c 100644 --- a/src/DIRAC/FrameworkSystem/Client/SystemAdministratorClientCLI.py +++ b/src/DIRAC/FrameworkSystem/Client/SystemAdministratorClientCLI.py @@ -515,9 +515,9 @@ def getInstallations(self, argss): + " " + "Name".center(20) + " " - + "Module".center(20) + + "DIRACModule".center(20) + " " - + "System".center(16) + + "DIRACSystem".center(16) + " " + "Type".center(12) + " " @@ -558,9 +558,9 @@ def getInstallations(self, argss): + "|" + installation["Instance"].center(20) + "|" - + installation["Component"]["Module"].center(20) + + installation["Component"]["DIRACModule"].center(20) + "|" - + installation["Component"]["System"].center(16) + + installation["Component"]["DIRACSystem"].center(16) + "|" + installation["Component"]["Type"].center(12) + "|" diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py index bfb4d0c01c9..aada3e9ae17 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py @@ -52,6 +52,8 @@ def initialize(self): self.backends = self.am_getOption("Backends", "Accounting").replace(' ', '').split(',') messageQueue = self.am_getOption("MessageQueue", "dirac.wmshistory") + self.log.info("Committing to %s backend" % 'and '.join(self.backends)) + self.datastores = {} # For storing the clients to Accounting and Monitoring if 'Accounting' in self.backends: