Skip to content

Commit

Permalink
sweep: #7223 TransformationCleaningAgent: add Clean With RMS option
Browse files Browse the repository at this point in the history
  • Loading branch information
andresailer authored and web-flow committed Oct 11, 2023
1 parent 3d7415f commit b1128e8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import errno
import os
import re
import time
from datetime import datetime, timedelta
from hashlib import md5

# # from DIRAC
from DIRAC import S_ERROR, S_OK
Expand All @@ -24,7 +26,11 @@
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient
from DIRAC.Resources.Storage.StorageElement import StorageElement
Expand Down Expand Up @@ -74,6 +80,7 @@ def __init__(self, *args, **kwargs):
self.logSE = "LogSE"
# # enable/disable execution
self.enableFlag = "True"
self.cleanWithRMS = False

self.dataProcTTypes = ["MCSimulation", "Merge"]
self.dataManipTTypes = ["Replication", "Removal"]
Expand Down Expand Up @@ -113,6 +120,7 @@ def initialize(self):
self.logSE = Operations().getValue("/LogStorage/LogSE", self.logSE)
self.log.info(f"Will remove logs found on storage element: {self.logSE}")

self.cleanWithRMS = self.am_getOption("CleanWithRMS", self.cleanWithRMS)
# # transformation client
self.transClient = TransformationClient()
# # wms client
Expand Down Expand Up @@ -387,6 +395,11 @@ def cleanContent(self, directory):
# Executing with shifter proxy
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
failed = {}
if self.cleanWithRMS:
res = self.__submitRemovalRequests(filesFound, 0)
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true")
return res

for chunkId, filesChunk in enumerate(breakListIntoChunks(filesFound, 500)):
self.log.info("Removing chunk", chunkId)
res = DataManager().removeFile(filesChunk, force=True)
Expand Down Expand Up @@ -567,10 +580,13 @@ def cleanMetadataCatalogFiles(self, transID):
self.log.info("No files found for transID", transID)
return S_OK()

# Executing with shifter proxy
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
res = DataManager().removeFile(fileToRemove, force=True)
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true")
if self.cleanWithRMS:
res = self.__submitRemovalRequests(fileToRemove, transID)
else:
# Executing with shifter proxy
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
res = DataManager().removeFile(fileToRemove, force=True)
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true")

if not res["OK"]:
return res
Expand Down Expand Up @@ -697,3 +713,51 @@ def __removeWMSTasks(self, transJobIDs):
return S_ERROR("Failed to remove all the request from RequestDB")
self.log.info("Successfully removed all the associated failover requests")
return S_OK()

def __submitRemovalRequests(self, lfns, transID=0):
"""Create removal requests for given lfns.
:param list lfns: list of lfns to be removed
:param int transID: transformationID, only used in RequestName
:returns: S_ERROR/S_OK
"""
for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)):
oRequest = Request()
requestName = "TCA_{transID}_{index}_{md5(repr(time.time())).hexdigest()[:5]}"
oRequest.RequestName = requestName
oOperation = Operation()
oOperation.Type = "RemoveFile"
oOperation.TargetSE = "All"
resMeta = self.metadataClient.getFileMetadata(lfnList)
if not resMeta["OK"]:
self.log.error("Cannot get file metadata", resMeta["Message"])
return resMeta
if resMeta["Value"]["Failed"]:
self.log.warning(
"Could not get the file metadata of the following, so skipping them:", resMeta["Value"]["Failed"]
)

for lfn, lfnInfo in resMeta["Value"]["Successful"].items():
rarFile = File()
rarFile.LFN = lfn
rarFile.ChecksumType = "ADLER32"
rarFile.Size = lfnInfo["Size"]
rarFile.Checksum = lfnInfo["Checksum"]
rarFile.GUID = lfnInfo["GUID"]
oOperation.addFile(rarFile)

oRequest.addOperation(oOperation)
isValid = RequestValidator().validate(oRequest)
if not isValid["OK"]:
self.log.error("Request is not valid:", isValid["Message"])
return isValid
result = self.reqClient.putRequest(oRequest)
if not result["OK"]:
self.log.error("Failed to submit Request: ", result["Message"])
return result
self.log.info(
"RemoveFiles request %d submitted for %d LFNs" % (result["Value"], len(resMeta["Value"]["Successful"]))
)

# after the for loop
return S_OK()
4 changes: 4 additions & 0 deletions src/DIRAC/TransformationSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ Agents
# using the transformation owner for cleanup
shifterProxy=

# If enabled, remove files by submitting requests to the RequestManagementSystem
# instead of during the agent run
CleanWithRMS=False

# Which transformation types to clean
# If not filled, transformation types are taken from
# Operations/Transformations/DataManipulation
Expand Down

0 comments on commit b1128e8

Please sign in to comment.