Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sweep:integration] feat (DM): add a protocol option to getReplicas method family #7633

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/DIRAC/DataManagementSystem/Client/DataManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,10 +1558,16 @@ def put(self, lfn, fileName, diracSE, path=None):
# File catalog methods
#

def getActiveReplicas(self, lfns, getUrl=True, diskOnly=False, preferDisk=False):
def getActiveReplicas(self, lfns, getUrl=True, diskOnly=False, preferDisk=False, protocol=None):
"""Get all the replicas for the SEs which are in Active status for reading."""
return self.getReplicas(
lfns, allStatus=False, getUrl=getUrl, diskOnly=diskOnly, preferDisk=preferDisk, active=True
lfns,
allStatus=False,
getUrl=getUrl,
diskOnly=diskOnly,
preferDisk=preferDisk,
active=True,
protocol=protocol,
)

def __filterTapeReplicas(self, replicaDict, diskOnly=False):
Expand Down Expand Up @@ -1666,12 +1672,17 @@ def __checkSEStatus(self, se, status="Read"):
"""returns the value of a certain SE status flag (access or other)"""
return StorageElement(se, vo=self.voName).status().get(status, False)

def getReplicas(self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferDisk=False, active=False):
def getReplicas(
self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferDisk=False, active=False, protocol=None
):
"""get replicas from catalogue and filter if requested
Warning: all filters are independent, hence active and preferDisk should be set if using forJobs
"""
catalogReplicas = {}
failed = {}
if not protocol:
protocol = self.registrationProtocol

for lfnChunk in breakListIntoChunks(lfns, 1000):
res = self.fileCatalog.getReplicas(lfnChunk, allStatus=allStatus)
if res["OK"]:
Expand All @@ -1692,9 +1703,7 @@ def getReplicas(self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferD

for se in se_lfn:
seObj = StorageElement(se, vo=self.voName)
succPfn = (
seObj.getURL(se_lfn[se], protocol=self.registrationProtocol).get("Value", {}).get("Successful", {})
)
succPfn = seObj.getURL(se_lfn[se], protocol=protocol).get("Value", {}).get("Successful", {})
for lfn in succPfn:
catalogReplicas[lfn][se] = succPfn[lfn]

Expand All @@ -1705,10 +1714,10 @@ def getReplicas(self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferD
self.__filterTapeReplicas(result, diskOnly=diskOnly)
return S_OK(result)

def getReplicasForJobs(self, lfns, allStatus=False, getUrl=True, diskOnly=False):
def getReplicasForJobs(self, lfns, allStatus=False, getUrl=True, diskOnly=False, protocol=None):
"""get replicas useful for jobs"""
# Call getReplicas with no filter and enforce filters in this method
result = self.getReplicas(lfns, allStatus=allStatus, getUrl=getUrl)
result = self.getReplicas(lfns, allStatus=allStatus, getUrl=getUrl, protocol=protocol)
if not result["OK"]:
return result
replicaDict = result["Value"]
Expand Down
Loading