Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add checkS3Object and verify S3 upload with it #8740

Merged
merged 3 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions src/python/CRABInterface/RESTCache.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
""" REST entity to deal with CRAB Cache on S3 """
# for some variables we prefer S3 naming even if not our standard camelCase
# pylint: disable=invalid-name
# yes, we need all those branches
# pylint: disable=too-many-branches
# need to use http GET for many things, so return has to differ
# pylint: disable=inconsistent-return-statements

from __future__ import division
import os
import uuid
Expand All @@ -13,7 +21,8 @@

# CRABServer dependecies here
from CRABInterface.RESTExtensions import authz_login_valid, authz_operator
from CRABInterface.Regexps import RX_SUBRES_CACHE, RX_CACHE_OBJECTTYPE, RX_TASKNAME, RX_USERNAME, RX_TARBALLNAME
from CRABInterface.Regexps import (RX_SUBRES_CACHE, RX_CACHE_OBJECTTYPE, RX_TASKNAME,
RX_USERNAME, RX_TARBALLNAME, RX_PRESIGNED_CLIENT_METHOD)
from ServerUtilities import getUsernameFromTaskname, MeasureTime


Expand Down Expand Up @@ -90,7 +99,7 @@ def __init__(self, app, api, config, mount, extconfig):
self.s3_client = boto3.client('s3', endpoint_url=endpoint, aws_access_key_id=access_key,
aws_secret_access_key=secret_key)

def validate(self, apiobj, method, api, param, safe):
def validate(self, apiobj, method, api, param, safe): # pylint: disable=unused-argument
"""Validating all the input parameter as enforced by the WMCore.REST module"""
authz_login_valid()
if method in ['GET']:
Expand All @@ -99,11 +108,14 @@ def validate(self, apiobj, method, api, param, safe):
validate_str('taskname', param, safe, RX_TASKNAME, optional=True)
validate_str('username', param, safe, RX_USERNAME, optional=True)
validate_str('tarballname', param, safe, RX_TARBALLNAME, optional=True)
validate_str('clientmethod', param, safe, RX_PRESIGNED_CLIENT_METHOD, optional=True)

@restcall
def get(self, subresource, objecttype, taskname, username, tarballname): # pylint: disable=redefined-builtin
def get(self, subresource, objecttype, taskname, username, tarballname, clientmethod): # pylint: disable=redefined-builtin
"""
:arg str subresource: the specific information to be accessed;
:arg str clientmethod: get_object (to retrieve content) (default) or head_object (to check existence)
only meaningful when subresource is 'upload'
"""
authenticatedUserName = cherrypy.request.user['login'] # the username of who's calling
# a bit of code common to 3 subresource's: validate args and prepare the s3_objectKey inside the bucket
Expand All @@ -113,7 +125,10 @@ def get(self, subresource, objecttype, taskname, username, tarballname): # pyli
if objecttype == 'sandbox':
if not tarballname:
raise MissingParameter("tarballname is missing")
ownerName = authenticatedUserName if subresource == 'upload' else username
if subresource == 'upload' or (subresource == 'download' and clientmethod == 'head_object'):
ownerName = authenticatedUserName
else:
ownerName = username
if not ownerName:
raise MissingParameter("username is missing")
# sandbox goes in bucket/username/sandboxes/
Expand Down Expand Up @@ -165,14 +180,16 @@ def get(self, subresource, objecttype, taskname, username, tarballname): # pyli
authz_operator(username=ownerName, group='crab3', role='operator')
if subresource == 'sandbox' and not username:
raise MissingParameter("username is missing")
if not clientmethod:
clientmethod = 'get_object'
# returns a PreSignedUrl to download the file within the expiration time
expiration = 60 * 60 # 1 hour default is good for retries and debugging
if objecttype in ['debugfiles', 'clientlog', 'twlog']:
expiration = 60*60 * 24 * 30 # for logs make url valid as long as we keep files (1 month)
try:
with MeasureTime(self.logger, modulename=__name__, label="get.download.generate_presigned_post") as _:
response = self.s3_client.generate_presigned_url(
'get_object', Params={'Bucket': self.s3_bucket, 'Key': s3_objectKey},
clientmethod, Params={'Bucket': self.s3_bucket, 'Key': s3_objectKey},
ExpiresIn=expiration)
preSignedUrl = response
except ClientError as e:
Expand All @@ -188,7 +205,7 @@ def get(self, subresource, objecttype, taskname, username, tarballname): # pyli
self.s3_client.download_file(self.s3_bucket, s3_objectKey, tempFile)
except ClientError as e:
raise ExecutionError(f"Connection to s3.cern.ch failed:\n{str(e)}") from e
with open(tempFile) as f:
with open(tempFile, 'r', encoding='utf-8') as f:
txt = f.read()
os.remove(tempFile)
return txt
Expand Down
5 changes: 3 additions & 2 deletions src/python/CRABInterface/Regexps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
# (this probably requires to adapt something on Lexicon)
pNameRE = r"(?=.{0,400}$)[a-zA-Z0-9\-_.]+"
lfnParts.update( {'publishname' : pNameRE,
'psethash' : '[a-f0-9]+',
'filename' : '[a-zA-Z0-9\-_\.]'}
'psethash' : r'[a-f0-9]+',
'filename' : r'[a-zA-Z0-9\-_\.]'}
)
## Although the taskname column in the TaskDB accepts tasknames of up to 255
## characters, we limit the taskname to something less than that in order to
Expand Down Expand Up @@ -93,6 +93,7 @@
#subresources of Cache resource
RX_SUBRES_CACHE = re.compile(r"^(upload|download|retrieve|list|used)$")
RX_CACHE_OBJECTTYPE = re.compile(r"^(clientlog|twlog|sandbox|debugfiles|runtimefiles)$")
RX_PRESIGNED_CLIENT_METHOD = re.compile(r"^(get_object|head_object)$")

#worker workflow
RX_WORKER_NAME = re.compile(r"^[A-Za-z0-9\-\._%]{1,100}$")
Expand Down
59 changes: 58 additions & 1 deletion src/python/ServerUtilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ def __exit__(self, a, b, c):
# downloadFromS3 to retrieve an object into a file
# retrieveFromS3 to retrieve an object as JSON to use in the code
# uploadToS3 to upload a file into an S3 object
# checkS3Object to check if an object is present in S3
# getDownloadUrlFromS3 to obtain a PreSigned URL to access an existing object in S3
# this can be used e.g. to share access to logs
# Common signature is: (crabserver, filepath, objecttype, taskname, username, tarballname, logger)
Expand All @@ -721,6 +722,42 @@ def downloadFromS3(crabserver=None, filepath=None, objecttype=None, taskname=Non
tarballname=tarballname, logger=logger)
downloadFromS3ViaPSU(filepath=filepath, preSignedUrl=preSignedUrl, logger=logger)

def checkS3Object(crabserver=None, objecttype=None, username=None, tarballname=None,
taskname=None, logger=None):
"""
Check if file exist in S3.

:param crabserver: CRABRest object, points to CRAB Server to use
:type crabserver: RESTInteractions.CRABRest
:param objecttype: the kind of object to dowbload: clientlog|twlog|sandbox|debugfiles|runtimefiles
:type objecttype: str
:param username: the username this sandbox belongs to, in case objecttype=sandbox
:type username: str
:param tarballname: for sandbox, taskname is not used but tarballname is needed
:type tarballname: str
:param taskname: for e.g. logfiles, taskanem is needed
:type taskname: str

:return: None, but raise exception if wget is exit with non-zero.
"""
preSignedUrl = getDownloadUrlFromS3(crabserver=crabserver, objecttype=objecttype,
username=username, tarballname=tarballname,
taskname=taskname, clientmethod='head_object',
logger=logger)
downloadCommand = ''
if os.getenv('CRAB_useGoCurl'):
raise NotImplementedError('HEAD with gocurl is not implemented')
downloadCommand += ' wget -Sq -O /dev/null --method=HEAD'
downloadCommand += ' "%s"' % preSignedUrl

with subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as downloadProcess:
logger.debug("Will execute:\n%s", downloadCommand)
stdout, stderr = downloadProcess.communicate()
exitcode = downloadProcess.returncode
logger.debug('exitcode: %s\nstdout: %s', exitcode, stdout)

if exitcode != 0:
raise Exception('Download command %s failed. stderr is:\n%s' % (downloadCommand, stderr))

def retrieveFromS3(crabserver=None, objecttype=None, taskname=None,
username=None, tarballname=None, logger=None):
Expand Down Expand Up @@ -794,11 +831,29 @@ def uploadToS3(crabserver=None, filepath=None, objecttype=None, taskname=None,
uploadToS3ViaPSU(filepath=filepath, preSignedUrlFields=preSignedUrl, logger=logger)
except Exception as e:
raise Exception('Upload to S3 failed\n%s' % str(e))
logger.debug('upload to S3 of %s %s completed', objecttype, filepath)
logger.debug('make sure that object is ready for retrieve') # for large files it may take up to minutes
waitTime = 0.
waitStep = 1 # seconds
while True:
time.sleep(waitStep)
waitTime += waitStep / 60. # minutes
try:
checkS3Object(crabserver=crabserver, objecttype=objecttype, taskname=taskname,
username=username, tarballname=tarballname, logger=logger)
except Exception: # pylint: disable=broad-except
if waitTime > 10 :
raise Exception("Object not available in S3 after 10 minutes. Something badly wrong")
waitStep = 2 * waitStep
logger.debug('not ready yet, wait another %s seconds', waitStep)
continue
break
logger.debug('%s %s successfully uploaded to S3', objecttype, filepath)


def getDownloadUrlFromS3(crabserver=None, objecttype=None, taskname=None,
username=None, tarballname=None, logger=None):
username=None, tarballname=None, clientmethod=None,
logger=None):
"""
obtains a PreSigned URL to access an existing object in S3
:param crabserver: a RESTInteraction/CRABRest object : points to CRAB Server to use
Expand All @@ -816,6 +871,8 @@ def getDownloadUrlFromS3(crabserver=None, objecttype=None, taskname=None,
dataDict['username'] = username
if tarballname:
dataDict['tarballname'] = tarballname
if clientmethod:
dataDict['clientmethod'] = clientmethod
data = encodeRequest(dataDict)
try:
# calls to restServer alway return a 3-ple ({'result':a-list}, HTTPcode, HTTPreason)
Expand Down