Skip to content

Commit

Permalink
feat: removed external SandboxSE functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Feb 6, 2024
1 parent 80ea404 commit 57f997b
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,8 @@ Some extra options are required to configure this service:
+---------------------------+----------------------------------------------+-----------------------------------------+
| **Name** | **Description** | **Example** |
+---------------------------+----------------------------------------------+-----------------------------------------+
| *Backend* | | Backend = local |
+---------------------------+----------------------------------------------+-----------------------------------------+
| *BasePath* | Base path where the files are stored | BasePath = /opt/dirac/storage/sandboxes |
| | task queues in the system | |
+---------------------------+----------------------------------------------+-----------------------------------------+
| *DelayedExternalDeletion* | Boolean used to define if the external | DelayedExternalDeletion = True |
| | deletion must be done | |
+---------------------------+----------------------------------------------+-----------------------------------------+
| *MaxSandboxSize* | Maximum size of sanbox files expressed in MB | MaxSandboxSize = 10 |
+---------------------------+----------------------------------------------+-----------------------------------------+
| *SandboxPrefix* | Path prefix where sandbox are stored | SandboxPrefix = Sandbox |
+---------------------------+----------------------------------------------+-----------------------------------------+
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ def __getTransferClient(self):
return self.__transferClient
return TransferClient(self.__serviceName, **self.__kwargs)

# Upload sandbox to jobs and pilots

def uploadFilesAsSandboxForJob(self, fileList, jobId, sbType, sizeLimit=0):
"""Upload SB for a job"""
if sbType not in self.__validSandboxTypes:
return S_ERROR(f"Invalid Sandbox type {sbType}")
return self.uploadFilesAsSandbox(fileList, sizeLimit, assignTo={f"Job:{jobId}": sbType})

# Upload generic sandbox

def uploadFilesAsSandbox(self, fileList, sizeLimit=0, assignTo=None):
Expand Down
8 changes: 0 additions & 8 deletions src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,8 @@ Services
Port = 9196
LocalSE = ProductionSandboxSE
MaxThreads = 200
toClientMaxThreads = 100
Backend = local
MaxSandboxSizeMiB = 10
SandboxPrefix = Sandbox
BasePath = /opt/dirac/storage/sandboxes
DelayedExternalDeletion = True
# If true, uploads the sandbox via diracx on an S3 storage
UseDiracXBackend = False
Authorization
Expand All @@ -160,12 +156,8 @@ Services
Protocol = https
LocalSE = ProductionSandboxSE
MaxThreads = 200
toClientMaxThreads = 100
Backend = local
MaxSandboxSizeMiB = 10
SandboxPrefix = Sandbox
BasePath = /opt/dirac/storage/sandboxes
DelayedExternalDeletion = True
Authorization
{
Default = authenticated
Expand Down
186 changes: 31 additions & 155 deletions src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,21 @@
"""
import hashlib
import os
import requests
import tempfile
import threading
import time

from DIRAC import S_ERROR, S_OK, gLogger, gConfig
import requests
from diracx.client.models import SandboxInfo

from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Security import Locations, Properties, X509Certificate
from DIRAC.Core.Utilities.File import mkDir
from DIRAC.Core.Security import Properties
from DIRAC.Core.Utilities.File import getGlobbedTotalSize, mkDir
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.DataManagementSystem.Service.StorageElementHandler import getDiskSpace
from DIRAC.FrameworkSystem.Utilities.diracx import TheImpersonator
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.Core.Utilities.File import getGlobbedTotalSize

from diracx.client.models import SandboxInfo


class SandboxStoreHandlerMixin:
Expand All @@ -52,34 +45,24 @@ def initializeHandler(cls, serviceInfoDict):
return S_OK()

def initializeRequest(self):
self.__backend = self.getCSOption("Backend", "local")
self.__localSEName = self.getCSOption("LocalSE", "SandboxSE")
self._useDiracXBackend = self.getCSOption("UseDiracXBackend", False)
self._maxUploadBytes = self.getCSOption("MaxSandboxSizeMiB", 10) * 1048576
if self.__backend.lower() == "local" or self.__backend == self.__localSEName:
self.__useLocalStorage = True
self.__seNameToUse = self.__localSEName
else:
self.__useLocalStorage = False
self.__externalSEName = self.__backend
self.__seNameToUse = self.__backend
# Execute the purge once every 1000 calls
SandboxStoreHandler.__purgeCount += 1
if SandboxStoreHandler.__purgeCount > self.getCSOption("QueriesBeforePurge", 1000):
if SandboxStoreHandler.__purgeCount > 1000:
SandboxStoreHandler.__purgeCount = 0
if SandboxStoreHandler.__purgeCount == 0:
threading.Thread(target=self.purgeUnusedSandboxes).start()

def __getSandboxPath(self, md5):
"""Generate the sandbox path"""
# prefix = self.getCSOption( "SandboxPrefix", "SandBox" )
prefix = "SandBox"
credDict = self.getRemoteCredentials()
if Properties.JOB_SHARING in credDict["properties"]:
idField = credDict["group"]
else:
idField = f"{credDict['username']}.{credDict['group']}"
pathItems = ["/", prefix, idField[0], idField]
pathItems = ["/", "SandBox", idField[0], idField]
pathItems.extend([md5[0:3], md5[3:6], md5])
return os.path.join(*pathItems)

Expand Down Expand Up @@ -162,28 +145,21 @@ def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""):
return S_OK(res.pfn)

sbPath = self.__getSandboxPath(f"{aHash}.{extension}")
# Generate the location
result = self.__generateLocation(sbPath)
if not result["OK"]:
return result
seName, sePFN = result["Value"]

result = self.sandboxDB.getSandboxId(seName, sePFN, credDict["username"], credDict["group"])
result = self.sandboxDB.getSandboxId(self.__localSEName, sbPath, credDict["username"], credDict["group"])
if result["OK"]:
gLogger.info("Sandbox already exists. Skipping upload")
if fileHelper:
fileHelper.markAsTransferred()
sbURL = f"SB:{seName}|{sePFN}"
sbURL = f"SB:{self.__localSEName}|sbPath"
assignTo = {key: [(sbURL, assignTo[key])] for key in assignTo}
result = self.export_assignSandboxesToEntities(assignTo)
if not result["OK"]:
return result
return S_OK(sbURL)

if self.__useLocalStorage:
hdPath = self.__sbToHDPath(sbPath)
else:
hdPath = False
hdPath = self.__sbToHDPath(sbPath)

# Write to local file

if fileHelper:
Expand Down Expand Up @@ -217,29 +193,21 @@ def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""):
self.__secureUnlinkFile(hdPath)
gLogger.error("Hashes don't match! Client defined hash is different with received data hash!")
return S_ERROR("Hashes don't match!")
# If using remote storage, copy there!
if not self.__useLocalStorage:
gLogger.info("Uploading sandbox to external storage")
result = self.__copyToExternalSE(hdPath, sbPath)
self.__secureUnlinkFile(hdPath)
if not result["OK"]:
return result
sbPath = result["Value"][1]
# Register!
gLogger.info("Registering sandbox in the DB with", f"SB:{self.__seNameToUse}|{sbPath}")
gLogger.info("Registering sandbox in the DB with", f"SB:{self.__localSEName}|{sbPath}")
fSize = getGlobbedTotalSize(hdPath)
result = self.sandboxDB.registerAndGetSandbox(
credDict["username"],
credDict["group"],
self.__seNameToUse,
self.__localSEName,
sbPath,
fSize,
)
if not result["OK"]:
self.__secureUnlinkFile(hdPath)
return result

sbURL = f"SB:{self.__seNameToUse}|{sbPath}"
sbURL = f"SB:{self.__localSEName}|{sbPath}"
assignTo = {key: [(sbURL, assignTo[key])] for key in assignTo}
result = self.export_assignSandboxesToEntities(assignTo)
if not result["OK"]:
Expand All @@ -266,19 +234,14 @@ def transfer_bulkFromClient(self, fileId, token, _fileSize, fileHelper):
extension = fileId[fileId.find(".tar") + 1 :]
sbPath = f"{self.__getSandboxPath(fileHelper.getHash())}.{extension}"
gLogger.info("Sandbox path will be", sbPath)
# Generate the location
result = self.__generateLocation(sbPath)
if not result["OK"]:
return result
seName, sePFN = result["Value"]
# Register in DB
credDict = self.getRemoteCredentials()
result = self.sandboxDB.getSandboxId(seName, sePFN, credDict["username"], credDict["group"])
result = self.sandboxDB.getSandboxId(self.__localSEName, sbPath, credDict["username"], credDict["group"])
if result["OK"]:
return S_OK(f"SB:{seName}|{sePFN}")
return S_OK(f"SB:{self.__localSEName}|{sbPath}")

result = self.sandboxDB.registerAndGetSandbox(
credDict["username"], credDict["group"], seName, sePFN, fileHelper.getTransferedBytes()
credDict["username"], credDict["group"], self.__localSEName, sbPath, fileHelper.getTransferedBytes()
)
if not result["OK"]:
self.__secureUnlinkFile(tmpFilePath)
Expand All @@ -296,28 +259,7 @@ def transfer_bulkFromClient(self, fileId, token, _fileSize, fileHelper):

# Unlink temporal file if it's there
self.__secureUnlinkFile(tmpFilePath)
return S_OK(f"SB:{seName}|{sePFN}")

def __generateLocation(self, sbPath):
"""
Generate the location string
"""
if self.__useLocalStorage:
return S_OK((self.__localSEName, sbPath))
# It's external storage
storageElement = StorageElement(self.__externalSEName)
res = storageElement.isValid()
if not res["OK"]:
errStr = "Failed to instantiate destination StorageElement"
gLogger.error(errStr, self.__externalSEName)
return S_ERROR(errStr)
result = storageElement.getURL(sbPath)
if not result["OK"] or sbPath not in result["Value"]["Successful"]:
errStr = "Failed to generate PFN"
gLogger.error(errStr, self.__externalSEName)
return S_ERROR(errStr)
destPfn = result["Value"]["Successful"][sbPath]
return S_OK((self.__externalSEName, destPfn))
return S_OK(f"SB:{self.__localSEName}|{sbPath}")

def __sbToHDPath(self, sbPath):
while sbPath and sbPath[0] == "/":
Expand Down Expand Up @@ -364,46 +306,22 @@ def __secureUnlinkFile(self, filePath):
return True

def __moveToFinalLocation(self, localFilePath, sbPath):
if self.__useLocalStorage:
hdFilePath = self.__sbToHDPath(sbPath)
result = S_OK((self.__localSEName, sbPath))
if os.path.isfile(hdFilePath):
gLogger.info("There was already a sandbox with that name, skipping copy", sbPath)
else:
hdDirPath = os.path.dirname(hdFilePath)
mkDir(hdDirPath)
try:
os.rename(localFilePath, hdFilePath)
except OSError as e:
errMsg = "Cannot move temporal file to final path"
gLogger.error(errMsg, repr(e).replace(",)", ")"))
result = S_ERROR(errMsg)
hdFilePath = self.__sbToHDPath(sbPath)
result = S_OK((self.__localSEName, sbPath))
if os.path.isfile(hdFilePath):
gLogger.info("There was already a sandbox with that name, skipping copy", sbPath)
else:
result = self.__copyToExternalSE(localFilePath, sbPath)
hdDirPath = os.path.dirname(hdFilePath)
mkDir(hdDirPath)
try:
os.rename(localFilePath, hdFilePath)
except OSError as e:
errMsg = "Cannot move temporal file to final path"
gLogger.error(errMsg, repr(e).replace(",)", ")"))
result = S_ERROR(errMsg)

return result

def __copyToExternalSE(self, localFilePath, sbPath):
"""
Copy uploaded file to external SE
"""
try:
dm = DataManager()
result = dm.put(sbPath, localFilePath, self.__externalSEName)
if not result["OK"]:
return result
if "Successful" not in result["Value"]:
gLogger.verbose("Oops, no successful transfers there", str(result))
return S_ERROR("RM returned OK to the action but no successful transfers were there")
okTrans = result["Value"]["Successful"]
if sbPath not in okTrans:
gLogger.verbose("Ooops, SB transfer wasn't in the successful ones", str(result))
return S_ERROR("RM returned OK to the action but SB transfer wasn't in the successful ones")
return S_OK((self.__externalSEName, okTrans[sbPath]))
except Exception as e:
gLogger.error("Error while moving sandbox to SE", f"{repr(e).replace(',)', ')')}")
return S_ERROR("Error while moving sandbox to SE")

##################
# Assigning sbs to jobs

Expand Down Expand Up @@ -572,8 +490,6 @@ def __purgeSandbox(self, sbId, SEName, SEPFN):

def __deleteSandboxFromBackend(self, SEName, SEPFN):
gLogger.info("Purging sandbox", f"SB:{SEName}|{SEPFN}")
if SEName != self.__localSEName:
return self.__deleteSandboxFromExternalBackend(SEName, SEPFN)
hdPath = self.__sbToHDPath(SEPFN)
try:
if not os.path.isfile(hdPath):
Expand Down Expand Up @@ -601,46 +517,6 @@ def __deleteSandboxFromBackend(self, SEName, SEPFN):
break
return S_OK()

def __deleteSandboxFromExternalBackend(self, SEName, SEPFN):
if self.getCSOption("DelayedExternalDeletion", True):
gLogger.info("Setting deletion request")
try:
# We need the hostDN used in order to pass these credentials to the
# SandboxStoreDB..
hostCertLocation, _ = Locations.getHostCertificateAndKeyLocation()
hostCert = X509Certificate.X509Certificate()
hostCert.loadFromFile(hostCertLocation)
hostDN = hostCert.getSubjectDN().get("Value")

# use the host authentication to fetch the data
result = self.sandboxDB.getSandboxOwner(SEName, SEPFN, hostDN, "hosts")
if not result["OK"]:
return result
owner, _ownerDN, ownerGroup = result["Value"]

request = Request()
request.RequestName = f"RemoteSBDeletion:{SEName}|{SEPFN}:{time.time()}"
request.Owner = owner
request.OwnerGroup = ownerGroup
physicalRemoval = Operation()
physicalRemoval.Type = "PhysicalRemoval"
physicalRemoval.TargetSE = SEName
fileToRemove = File()
fileToRemove.PFN = SEPFN
physicalRemoval.addFile(fileToRemove)
request.addOperation(physicalRemoval)
return ReqClient().putRequest(request)
except Exception as e:
gLogger.exception("Exception while setting deletion request")
return S_ERROR(f"Cannot set deletion request: {e}")
else:
gLogger.info("Deleting external Sandbox")
try:
return StorageElement(SEName).removeFile(SEPFN)
except Exception:
gLogger.exception("RM raised an exception while trying to delete a remote sandbox")
return S_ERROR("RM raised an exception while trying to delete a remote sandbox")


class SandboxStoreHandler(SandboxStoreHandlerMixin, RequestHandler):
def initialize(self):
Expand Down
Loading

0 comments on commit 57f997b

Please sign in to comment.