From 719532829dd0dc2a16d0c8e38cde9d3a34c5a9b7 Mon Sep 17 00:00:00 2001 From: belforte Date: Fri, 29 Oct 2021 11:49:48 +0200 Subject: [PATCH 1/3] remove code in UserFileCache. for #6776 --- src/python/UserFileCache/RESTBaseAPI.py | 30 --- src/python/UserFileCache/RESTExtensions.py | 195 -------------- src/python/UserFileCache/RESTFile.py | 279 --------------------- src/python/UserFileCache/__init__.py | 3 - 4 files changed, 507 deletions(-) delete mode 100644 src/python/UserFileCache/RESTBaseAPI.py delete mode 100644 src/python/UserFileCache/RESTExtensions.py delete mode 100644 src/python/UserFileCache/RESTFile.py delete mode 100644 src/python/UserFileCache/__init__.py diff --git a/src/python/UserFileCache/RESTBaseAPI.py b/src/python/UserFileCache/RESTBaseAPI.py deleted file mode 100644 index 695956cf54..0000000000 --- a/src/python/UserFileCache/RESTBaseAPI.py +++ /dev/null @@ -1,30 +0,0 @@ -# WMCore dependecies here -from WMCore.REST.Server import RESTApi -from WMCore.REST.Format import JSONFormat - -# CRABServer dependecies here -from UserFileCache.RESTFile import RESTFile, RESTLogFile, RESTInfo -import UserFileCache.RESTExtensions - -# external dependecies here -import os - - -class RESTBaseAPI(RESTApi): - """The UserFileCache REST API module""" - - def __init__(self, app, config, mount): - RESTApi.__init__(self, app, config, mount) - - self.formats = [ ('application/json', JSONFormat()) ] - - if not os.path.exists(config.cachedir) or not os.path.isdir(config.cachedir): - raise Exception("Failing to start because of wrong cache directory '%s'" % config.cachedir) - - if hasattr(config, 'powerusers'): - UserFileCache.RESTExtensions.POWER_USERS_LIST = config.powerusers - if hasattr(config, 'quota_user_limit'): - UserFileCache.RESTExtensions.QUOTA_USER_LIMIT = config.quota_user_limit * 1024 * 1024 - self._add( {'logfile': RESTLogFile(app, self, config, mount), - 'file': RESTFile(app, self, config, mount), - 'info': RESTInfo(app, self, config, mount)} ) diff --git a/src/python/UserFileCache/RESTExtensions.py b/src/python/UserFileCache/RESTExtensions.py deleted file mode 100644 index 32517da9bf..0000000000 --- a/src/python/UserFileCache/RESTExtensions.py +++ /dev/null @@ -1,195 +0,0 @@ -""" -This module aims to contain the method specific to the REST interface. -These are extensions which are not directly contained in WMCore.REST module. -Collecting all here since aren't supposed to be many. 
-""" - -from ServerUtilities import USER_SANDBOX_EXCLUSIONS, NEW_USER_SANDBOX_EXCLUSIONS -from ServerUtilities import FILE_SIZE_LIMIT, FILE_MEMORY_LIMIT - -# WMCore dependecies here -from WMCore.REST.Validation import _validate_one -from WMCore.REST.Error import RESTError, InvalidParameter -from WMCore.Services.UserFileCache.UserFileCache import calculateChecksum -from WMCore.REST.Auth import get_user_info - -# external dependecies here -import tarfile -import hashlib -import cStringIO -import urllib2 -from os import fstat, walk, path, listdir - -# 600MB is the default user quota limit - overwritten in RESTBaseAPI if quota_user_limit is set in the config -QUOTA_USER_LIMIT = 1024*1024*600 -#these users have 10* basic user quota - overwritten in RESTBaseAPI if powerusers is set in the config -POWER_USERS_LIST = [] - -def http_error(msg, code=403): - try: - import requests - err = requests.HTTPError(msg) - err.response.status_code = code - return err - except ImportError: - url = '' # required for urllib2 HTTPError but we should acquire it from elsewhere - return urllib2.HTTPError(url, code, err, None, None) - -###### authz_login_valid is currently duplicatint CRABInterface.RESTExtension . A better solution -###### should be found for authz_* -def authz_login_valid(): - user = get_user_info() - if not user['login']: - err = "You are not allowed to access this resources" - raise http_error(err) - -def authz_operator(username): - """ Check if the the user who is trying to access this resource (i.e.: user['login'], the cert username) is the - same as username. If not check if the user is a CRAB3 operator. {... 'operator': {'group': set(['crab3']) ... in request roles} - If the user is not an operator and is trying to access a file owned by another user than raise - """ - user = get_user_info() - if user['login'] != username and\ - 'crab3' not in user.get('roles', {}).get('operator', {}).get('group', set()): - err = "You are not allowed to access this resource. You need to be a CRAB3 operator in sitedb to access other user's files" - raise http_error(err) - -def file_size(argfile): - """Return the file or cStringIO.StringIO size - - :arg file|cStringIO.StringIO argfile: file object handler or cStringIO.StringIO - :return: size in bytes""" - if isinstance(argfile, file): - return fstat(argfile.fileno()).st_size, True - elif isinstance(argfile, cStringIO.OutputType): - argfile.seek(0, 2) - filesize = argfile.tell() - argfile.seek(0) - return filesize, False - -def list_users(cachedir): - #file are stored in directories like u/username - for name in listdir(cachedir): #iterate over u ... 
- if name == 'lost+found': # skip root-owned file at top mount point - continue - if path.isdir(path.join(cachedir, name)): - for username in listdir(path.join(cachedir, name)): #list all the users under u - yield username - -def list_files(quotapath): - for _, _, filenames in walk(quotapath): - for f in filenames: - yield f - -def get_size(quotapath): - """Check the quotapath directory size; it doesn't include the 4096 bytes taken by each directory - - :arg str quotapath: the directory for which is needed to calculate the quota - :return: bytes taken by the directory""" - totalsize = 0 - for dirpath, _, filenames in walk(quotapath): - for f in filenames: - fp = path.join(dirpath, f) - totalsize += path.getsize(fp) - return totalsize - -def quota_user_free(quotadir, infile): - """Raise an exception if the input file overflow the user quota - - :arg str quotadir: the user path where the file will be written - :arg file|cStringIO.StringIO infile: file object handler or cStringIO.StringIO - :return: Nothing""" - filesize, _ = file_size(infile.file) - quota = get_size(quotadir) - user = get_user_info() - quotaLimit = QUOTA_USER_LIMIT*10 if user['login'] in POWER_USERS_LIST else QUOTA_USER_LIMIT - if filesize + quota > quotaLimit: - excquota = ValueError("User %s has reached quota of %dB: additional file of %dB cannot be uploaded." \ - % (user['login'], quota, filesize)) - raise InvalidParameter("User quota limit reached; cannot upload the file", errobj=excquota, trace='') - -def _check_file(argname, val): - """Check that `argname` `val` is a file - - :arg str argname: name of the argument - :arg file val: the file object - :return: the val if the validation passes.""" - # checking that is a valid file or an input string - # note: the input string is generated on client side just when the input file is empty - filesize = 0 - if not hasattr(val, 'file') or not (isinstance(val.file, file) or isinstance(val.file, cStringIO.OutputType)): - raise InvalidParameter("Incorrect inputfile parameter") - else: - filesize, realfile = file_size(val.file) - if realfile: - if filesize > FILE_SIZE_LIMIT: - raise InvalidParameter("File size is %sB. This is bigger than the maximum allowed size of %sB." % (filesize, FILE_SIZE_LIMIT)) - elif filesize > FILE_MEMORY_LIMIT: - raise InvalidParameter('File too large to be completely loaded into memory.') - - return val - - -def _check_tarfile(argname, val, hashkey, newchecksum): - """Check that `argname` `val` is a tar file and that provided 'hashkey` - matches with the hashkey calculated on the `val`. - - :arg str argname: name of the argument - :arg file val: the file object - :arg str hashkey: the sha256 hexdigest of the file, calculated over the tuple - (name, size, mtime, uname) of all the tarball members - :return: the val if the validation passes.""" - # checking that is a valid file or an input string - # note: the input string is generated on client side just when the input file is empty - _check_file(argname, val) - - digest = None - try: - #This newchecksum param and the if/else branch is there for backward compatibility. - #The parameter, older exclusion and checksum functions should be removed in the future. 
- if newchecksum == 2: - digest = calculateChecksum(val.file, exclude=NEW_USER_SANDBOX_EXCLUSIONS) - elif newchecksum == 1: - digest = calculateChecksum(val.file, exclude=USER_SANDBOX_EXCLUSIONS) - else: - tar = tarfile.open(fileobj=val.file, mode='r') - lsl = [(x.name, int(x.size), int(x.mtime), x.uname) for x in tar.getmembers()] - hasher = hashlib.sha256(str(lsl)) - digest = hasher.hexdigest() - except tarfile.ReadError: - raise InvalidParameter('File is not a .tgz file.') - if not digest or hashkey != digest: - raise ChecksumFailed("Checksums do not match") - return val - -class ChecksumFailed(RESTError): - "Checksum calculation failed, file transfer problem." - http_code = 400 - app_code = 302 - message = "Input file hashkey mismatch" - -def validate_tarfile(argname, param, safe, hashkey, optional=False): - """Validates that an argument is a file and matches the hashkey. - - Checks that an argument named `argname` exists in `param.kwargs` - and it is a tar file which matches the provided hashkey. If - successful the string is copied into `safe.kwargs` and the value - is removed from `param.kwargs`. - - If `optional` is True, the argument is not required to exist in - `param.kwargs`; None is then inserted into `safe.kwargs`. Otherwise - a missing value raises an exception.""" - _validate_one(argname, param, safe, _check_tarfile, optional, safe.kwargs[hashkey], safe.kwargs['newchecksum']) - -def validate_file(argname, param, safe, hashkey, optional=False): - """Validates that an argument is a file and matches the hashkey. - - Checks that an argument named `argname` exists in `param.kwargs` - and it is a tar file which matches the provided hashkey. If - successful the string is copied into `safe.kwargs` and the value - is removed from `param.kwargs`. - - If `optional` is True, the argument is not required to exist in - `param.kwargs`; None is then inserted into `safe.kwargs`. 
Otherwise - a missing value raises an exception.""" - _validate_one(argname, param, safe, _check_file, optional) diff --git a/src/python/UserFileCache/RESTFile.py b/src/python/UserFileCache/RESTFile.py deleted file mode 100644 index c3d665eaa0..0000000000 --- a/src/python/UserFileCache/RESTFile.py +++ /dev/null @@ -1,279 +0,0 @@ -# WMCore dependecies here -from WMCore.REST.Format import RawFormat -from WMCore.REST.Server import RESTEntity, restcall -from WMCore.REST.Validation import validate_str, _validate_one, validate_num -from WMCore.REST.Error import RESTError, InvalidParameter, MissingObject, ExecutionError - -# CRABServer dependecies here -from UserFileCache.__init__ import __version__ -from UserFileCache.RESTExtensions import ChecksumFailed, validate_file, validate_tarfile, authz_login_valid, authz_operator,\ - quota_user_free, get_size, list_files, list_users - -# external dependecies here -import re -import os -import shutil -import tarfile -import hashlib -import cherrypy -from cherrypy.lib.static import serve_file - -# here go the all regex to be used for validation -RX_USERNAME = re.compile(r"^\w+$") #TODO use WMCore regex -RX_HASH = re.compile(r'^[a-f0-9]{64}$') -RX_LOGFILENAME = re.compile(r"^[\w\-.: ]+$") -RX_SUBRES = re.compile(r"^fileinfo|userinfo|powerusers|basicquota|fileremove|listusers|usedspace$") - -def touch(filename): - """Touch the file to keep automated cleanup away - - :arg str filename: the filename path.""" - if os.path.isfile(filename): - os.utime(filename, None) - -def filepath(cachedir, username=None): - # NOTE: if we need to share a file between users (something we do not really want to make default or too easy...) we can: - # - use the group of the user instead of the user name, which can be retrieved from cherrypy.request.user - # - have an extra input parameter group=something (but this wouldn't be transparent when downloading it) - username = username if username else cherrypy.request.user['login'] - return os.path.join(cachedir, username[0], username) - -class RESTFile(RESTEntity): - """The RESTEntity for uploaded and downloaded files""" - - def __init__(self, app, api, config, mount): - RESTEntity.__init__(self, app, api, config, mount) - self.config = config - self.cachedir = config.cachedir - self.overwriteFile = False - - def validate(self, apiobj, method, api, param, safe): - """Validating all the input parameter as enforced by the WMCore.REST module""" - authz_login_valid() - - if method in ['PUT']: - validate_str("hashkey", param, safe, RX_HASH, optional=False) - validate_num("newchecksum", param, safe, optional=True) - validate_tarfile("inputfile", param, safe, 'hashkey', optional=False) - if method in ['GET']: - validate_str("hashkey", param, safe, RX_HASH, optional=False) - validate_str("username", param, safe, RX_USERNAME, optional=True) - if safe.kwargs['username']: - authz_operator(safe.kwargs['username']) - - @restcall - def put(self, inputfile, hashkey, newchecksum=0): - """Allow to upload a tarball file to be written in the local filesystem. - Base path of the local filesystem is configurable. - - The caller needs to be a CMS user with a valid CMS x509 cert/proxy. 
- - :arg file inputfile: file object to be uploaded - :arg str hashkey: the sha256 hexdigest of the file, calculated over the tuple - (name, size, mtime, uname) of all the tarball members - :return: hashkey, name, size of the uploaded file.""" - outfilepath = filepath(self.cachedir) - outfilename = None - result = {'hashkey': hashkey} - - # using the hash of the file to create a subdir and filename - outfilepath = os.path.join(outfilepath, hashkey[0:2]) - outfilename = os.path.join(outfilepath, hashkey) - - if os.path.isfile(outfilename) and not self.overwriteFile: - # we do not want to upload again a file that already exists - touch(outfilename) - result['size'] = os.path.getsize(outfilename) - else: - # check that the user quota is still below limit - quota_user_free(filepath(self.cachedir), inputfile) - - if not os.path.isdir(outfilepath): - os.makedirs(outfilepath) - handlefile = open(outfilename, 'wb') - inputfile.file.seek(0) - shutil.copyfileobj(inputfile.file, handlefile) - handlefile.close() - result['size'] = os.path.getsize(outfilename) - return [result] - - @restcall(formats = [('application/octet-stream', RawFormat())]) - def get(self, hashkey, username): - """Retrieve a file previously uploaded to the local filesystem. - The base path on the local filesystem is configurable. - - The caller needs to be a CMS user with a valid CMS x509 cert/proxy. - - :arg str hashkey: the sha256 hexdigest of the file, calculated over the tuple - (name, size, mtime, uname) of all the tarball members - :return: the raw file""" - filename = None - infilepath = filepath(self.cachedir, username) - - # defining the path/name from the hash of the file - filename = os.path.join(infilepath, hashkey[0:2], hashkey) - - if not os.path.isfile(filename): - raise MissingObject("Not such file") - touch(filename) - return serve_file(filename, "application/octet-stream", "attachment") - -class RESTLogFile(RESTFile): - """The RESTEntity for uploaded and downloaded logs""" - def __init__(self, app, api, config, mount): - RESTFile.__init__(self, app, api, config, mount) - self.overwriteFile = True - - def validate(self, apiobj, method, api, param, safe): - """Validating all the input parameter as enforced by the WMCore.REST module""" - authz_login_valid() - - if method in ['PUT']: - validate_file("inputfile", param, safe, 'hashkey', optional=False) - validate_str("name", param, safe, RX_LOGFILENAME, optional=False) - if method in ['GET']: - validate_str("name", param, safe, RX_LOGFILENAME, optional=False) - validate_str("username", param, safe, RX_USERNAME, optional=True) - if safe.kwargs['username']: - authz_operator(safe.kwargs['username']) - - @restcall - def put(self, inputfile, name): - return RESTFile.put(self, inputfile, name) - - @restcall(formats = [('application/octet-stream', RawFormat())]) - def get(self, name, username): - return RESTFile.get(self, name, username) - - -class RESTInfo(RESTEntity): - """REST entity for workflows and relative subresources""" - - def __init__(self, app, api, config, mount): - RESTEntity.__init__(self, app, api, config, mount) - self.cachedir = config.cachedir - - def validate(self, apiobj, method, api, param, safe): - """Validating all the input parameter as enforced by the WMCore.REST module""" - authz_login_valid() - if method in ['GET']: - validate_str('subresource', param, safe, RX_SUBRES, optional=True) - validate_str("hashkey", param, safe, RX_HASH, optional=True) - validate_num("verbose", param, safe, optional=True) - validate_str("username", param, safe, 
RX_USERNAME, optional=True) - if safe.kwargs['username']: - authz_operator(safe.kwargs['username']) - - @restcall - def get(self, subresource, **kwargs): - """Retrieves the server information, like delegateDN, filecacheurls ... - :arg str subresource: the specific server information to be accessed; - """ - if subresource: - return getattr(RESTInfo, subresource)(self, **kwargs) - else: - return [{"crabcache":"Welcome","version":__version__}] - @restcall - def fileinfo(self, **kwargs): - """Retrieve the file summary information. - - The caller needs to be a CMS user with a valid CMS x509 cert/proxy. - - :arg str hashkey: the sha256 hexdigest of the file, calculated over the tuple - (name, size, mtime, uname) of all the tarball members - :return: hashkey, name, size of the requested file""" - - hashkey = kwargs['hashkey'] - result = {} - filename = None - infilepath = filepath(self.cachedir, kwargs['username']) - - # defining the path/name from the hash of the file - filename = os.path.join(infilepath, hashkey[0:2], hashkey) - result['hashkey'] = hashkey - - if not os.path.isfile(filename): - raise MissingObject("Not such file") - result['exists'] = True - result['size'] = os.path.getsize(filename) - result['accessed'] = os.path.getctime(filename) - result['changed'] = os.path.getctime(filename) - result['modified'] = os.path.getmtime(filename) - touch(filename) - - return [result] - - - @restcall - def fileremove(self, **kwargs): - """Remove the file with the specified hashkey. - - The caller needs to be a CMS user with a valid CMS x509 cert/proxy. Users can only delete their own files - - :arg str hashkey: the sha256 hexdigest of the file, calculated over the tuple - (name, size, mtime, uname) of all the tarball members - """ - hashkey = kwargs['hashkey'] - - infilepath = filepath(self.cachedir) - # defining the path/name from the hash of the file - filename = os.path.join(infilepath, hashkey[0:2], hashkey) - - if not os.path.isfile(filename): - raise MissingObject("Not such file") - - try: - os.remove(filename) - except Exception as ex: - raise ExecutionError("Impossible to remove the file: %s" % str(ex)) - - @restcall - def userinfo(self, **kwargs): - """Retrieve the user summary information. 
- - :arg str username: username for which the informations are retrieved - - :return: quota, list of filenames""" - username = kwargs['username'] - userpath = filepath(self.cachedir, username) - - res = {} - files = list_files(userpath) - if kwargs['verbose']: - files_dict = {} - for file_ in files: - files_dict[file_] = self.fileinfo(hashkey=file_, username=username) - - res["file_list"] = files_dict if kwargs['verbose'] else list(files) - res["used_space"] = [get_size(userpath)] - - yield res - - #inserted by eric obeng summer student - @restcall - def usedspace(self, **kwargs): - """Retrieves only the used space of the user""" - username = kwargs["username"] - userpath = filepath(self.cachedir, username) - yield get_size(userpath) - - @restcall - def listusers(self, **kwargs): - """ Retrieve the list of power users from the config - """ - - return list_users(self.cachedir) - - @restcall - def powerusers(self, **kwargs): - """ Retrieve the list of power users from the config - """ - - return self.config.powerusers - - @restcall - def basicquota(self, **kwargs): - """ Retrieve the basic quota space - """ - - yield {"quota_user_limit" : self.config.quota_user_limit} diff --git a/src/python/UserFileCache/__init__.py b/src/python/UserFileCache/__init__.py deleted file mode 100644 index 42e449a66b..0000000000 --- a/src/python/UserFileCache/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -__version__ = 'development' - -#the __version__ will be automatically changed when building RPMs From a89e390e3b5b5e5c063f6b47775a8ddea271cfc2 Mon Sep 17 00:00:00 2001 From: belforte Date: Fri, 29 Oct 2021 12:23:36 +0200 Subject: [PATCH 2/3] remove reference to UserFileCache in setup.py. For #6776 --- setup.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/setup.py b/setup.py index d7328ddd7f..d5277beda8 100644 --- a/setup.py +++ b/setup.py @@ -47,11 +47,6 @@ 'python': ['TaskWorker', 'TaskWorker/Actions', 'TaskWorker/DataObjects', 'TaskWorker/Actions/Recurring', 'taskbuffer', 'Publisher', 'TransferInterface'] }, - 'UserFileCache': - { - 'py_modules' : ['ServerUtilities'], - 'python': ['UserFileCache'] - }, 'Publisher': { 'py_modules': ['ServerUtilities', 'MultiProcessingLog', 'RESTInteractions', 'utils'], @@ -60,7 +55,7 @@ 'All': { 'py_modules': [''], - 'python': ['TaskWorker', 'CRABInterface', 'UserFileCache', 'CRABClient', 'Publisher'] + 'python': ['TaskWorker', 'CRABInterface', 'CRABClient', 'Publisher'] } } @@ -146,8 +141,8 @@ def define_the_build(dist, system_name, patch_x=''): class BuildCommand(Command): """Build python modules for a specific system.""" description = \ - "Build python modules for the specified system. The two supported systems\n" + \ - "\t\t at the moment are 'CRABInterface' and 'UserFileCache'. Use with --force \n" + \ + "Build python modules for the specified system. The supported system(s)\n" + \ + "\t\t at the moment are 'CRABInterface' . Use with --force \n" + \ "\t\t to ensure a clean build of only the requested parts.\n" user_options = build.user_options user_options.append(('system=', 's', 'build the specified system (default: CRABInterface)')) From e808361847e4310ff25d76c28b81897b8fbdabb8 Mon Sep 17 00:00:00 2001 From: belforte Date: Fri, 29 Oct 2021 12:38:52 +0200 Subject: [PATCH 3/3] remove all code references to UserFileCache. 
For #6776 --- .../TaskWorker/Actions/DagmanCreator.py | 28 ++-------------- .../TaskWorker/Actions/DryRunUploader.py | 20 +++-------- src/python/TaskWorker/Actions/Handler.py | 32 ++++-------------- .../Actions/Recurring/TapeRecallStatus.py | 33 ------------------- 4 files changed, 14 insertions(+), 99 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 16cb6f4c3f..519d6b35f2 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -29,10 +29,6 @@ import WMCore.WMSpec.WMTask from WMCore.Services.CRIC.CRIC import CRIC -try: - from WMCore.Services.UserFileCache.UserFileCache import UserFileCache -except ImportError: - UserFileCache = None DAG_HEADER = """ @@ -1142,8 +1138,8 @@ def executeInternal(self, *args, **kw): sandboxTarBall = 'sandbox.tar.gz' debugTarBall = 'debug_files.tar.gz' - # Bootstrap the ISB if we are using S3 and running in the TW - if self.crabserver and 'S3' in kw['task']['tm_cache_url'].upper(): + # Bootstrap the ISB if we are running in the TW + if self.crabserver: username = kw['task']['tm_username'] sandboxName = kw['task']['tm_user_sandbox'] dbgFilesName = kw['task']['tm_debug_files'] @@ -1161,26 +1157,6 @@ def executeInternal(self, *args, **kw): except Exception as ex: self.logger.exception(ex) - # Bootstrap the ISB if we are using UFC - else: - if UserFileCache and kw['task']['tm_cache_url'].find('/crabcache') != -1: - ufc = UserFileCache(mydict={'cert': kw['task']['user_proxy'], 'key': kw['task']['user_proxy'], 'endpoint' : kw['task']['tm_cache_url']}) - try: - ufc.download(hashkey=kw['task']['tm_user_sandbox'].split(".")[0], output=sandboxTarBall) - except Exception as ex: - self.logger.exception(ex) - raise TaskWorkerException("The CRAB3 server backend could not download the input sandbox with your code "+\ - "from the frontend (crabcache component).\nThis could be a temporary glitch; please try to submit a new task later "+\ - "(resubmit will not work) and contact the experts if the error persists.\nError reason: %s" % str(ex)) #TODO url!? - kw['task']['tm_user_sandbox'] = sandboxTarBall - - # For an older client (<3.3.1607) this field will be empty and the file will not exist. - if kw['task']['tm_debug_files']: - try: - ufc.download(hashkey=kw['task']['tm_debug_files'].split(".")[0], output=debugTarBall) - except Exception as ex: - self.logger.exception(ex) - # Bootstrap the runtime if it is available. job_runtime = getLocation('CMSRunAnalysis.tar.gz', 'CRABServer/') shutil.copy(job_runtime, '.') diff --git a/src/python/TaskWorker/Actions/DryRunUploader.py b/src/python/TaskWorker/Actions/DryRunUploader.py index 9a6ac97e27..4e642fe2a4 100644 --- a/src/python/TaskWorker/Actions/DryRunUploader.py +++ b/src/python/TaskWorker/Actions/DryRunUploader.py @@ -1,5 +1,5 @@ """ -Upload an archive containing all files needed to run the a to the UserFileCache (necessary for crab submit --dryrun.) +Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.) 
""" import os import json @@ -13,7 +13,6 @@ from urllib import urlencode from WMCore.DataStructs.LumiList import LumiList -from WMCore.Services.UserFileCache.UserFileCache import UserFileCache from TaskWorker.DataObjects.Result import Result from TaskWorker.Actions.TaskAction import TaskAction @@ -22,7 +21,7 @@ class DryRunUploader(TaskAction): """ - Upload an archive containing all files needed to run the task to the UserFileCache (necessary for crab submit --dryrun.) + Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.) """ def packSandbox(self, inputFiles): @@ -51,17 +50,9 @@ def executeInternal(self, *args, **kw): self.logger.info('Uploading dry run tarball to the user file cache') t0 = time.time() - if 'S3' in kw['task']['tm_cache_url'].upper(): - uploadToS3(crabserver=self.crabserver, filepath='dry-run-sandbox.tar.gz', - objecttype='runtimefiles', taskname=kw['task']['tm_taskname'], logger=self.logger) - result = {'hashkey':'ok'} # a dummy one to keep same semantics as when using UserFileCache - os.remove('dry-run-sandbox.tar.gz') - else: - ufc = UserFileCache(mydict={'cert': kw['task']['user_proxy'], 'key': kw['task']['user_proxy'], 'endpoint': kw['task']['tm_cache_url']}) - result = ufc.uploadLog('dry-run-sandbox.tar.gz') - os.remove('dry-run-sandbox.tar.gz') - if 'hashkey' not in result: - raise TaskWorkerException('Failed to upload dry-run-sandbox.tar.gz to the user file cache: ' + str(result)) + uploadToS3(crabserver=self.crabserver, filepath='dry-run-sandbox.tar.gz', + objecttype='runtimefiles', taskname=kw['task']['tm_taskname'], logger=self.logger) + os.remove('dry-run-sandbox.tar.gz') self.logger.info('Uploaded dry run tarball to the user file cache: %s', str(result)) # wait until tarball is available, S3 may take a few seconds for this (ref. issue #6706 ) t1 = time.time() @@ -147,4 +138,3 @@ def dump(self, outname): with open(outname, 'wb') as f: json.dump(summary, f) - diff --git a/src/python/TaskWorker/Actions/Handler.py b/src/python/TaskWorker/Actions/Handler.py index 9e06d51094..4787a25a1f 100644 --- a/src/python/TaskWorker/Actions/Handler.py +++ b/src/python/TaskWorker/Actions/Handler.py @@ -6,8 +6,6 @@ import traceback from http.client import HTTPException -from WMCore.Services.UserFileCache.UserFileCache import UserFileCache - from RESTInteractions import CRABRest from RucioUtils import getNativeRucioClient @@ -94,29 +92,13 @@ def executeAction(self, nextinput, work): #TODO: we need to do that also in Worker.py otherwise some messages might only be in the TW file but not in the crabcache. logpath = self.config.TaskWorker.logsDir+'/tasks/%s/%s.log' % (self._task['tm_username'], self.taskname) if os.path.isfile(logpath) and 'user_proxy' in self._task: #the user proxy might not be there if myproxy retrieval failed - cacheurldict = {'endpoint':self._task['tm_cache_url'], 'cert':self._task['user_proxy'], 'key':self._task['user_proxy']} - if 'S3' in self._task['tm_cache_url'].upper(): - # use S3 - try: - uploadToS3(crabserver=self.crabserver, objecttype='twlog', filepath=logpath, - taskname=self.taskname, logger=self.logger) - except Exception as e: - msg = 'Failed to upload logfile to S3 for task %s. 
' % self.taskname - msg += 'Details:\n%s' % str(e) - self.logger.error(msg) - else: - # use old crabcache - try: - ufc = UserFileCache(cacheurldict) - logfilename = self.taskname + '_TaskWorker.log' - ufc.uploadLog(logpath, logfilename) - except HTTPException as hte: - msg = "Failed to upload the logfile to %s for task %s. More details in the http headers and body:\n%s\n%s" % (self._task['tm_cache_url'], self.taskname, hte.headers, hte.result) - self.logger.error(msg) - except Exception: #pylint: disable=broad-except - msg = "Unknown error while uploading the logfile for task %s" % self.taskname - self.logger.exception(msg) #upload logfile of the task to the crabcache - + try: + uploadToS3(crabserver=self.crabserver, objecttype='twlog', filepath=logpath, + taskname=self.taskname, logger=self.logger) + except Exception as e: + msg = 'Failed to upload logfile to S3 for task %s. ' % self.taskname + msg += 'Details:\n%s' % str(e) + self.logger.error(msg) return output diff --git a/src/python/TaskWorker/Actions/Recurring/TapeRecallStatus.py b/src/python/TaskWorker/Actions/Recurring/TapeRecallStatus.py index d5a6886c9d..42b9f3b328 100644 --- a/src/python/TaskWorker/Actions/Recurring/TapeRecallStatus.py +++ b/src/python/TaskWorker/Actions/Recurring/TapeRecallStatus.py @@ -19,33 +19,6 @@ class TapeRecallStatus(BaseRecurringAction): pollingTime = 60*4 # minutes rucioClient = None - def refreshSandbox(self, task): - - from WMCore.Services.UserFileCache.UserFileCache import UserFileCache - ufc = UserFileCache({'cert': task['user_proxy'], 'key': task['user_proxy'], - 'endpoint': task['tm_cache_url'], "pycurl": True}) - sandbox = task['tm_user_sandbox'].replace(".tar.gz", "") - debugFiles = task['tm_debug_files'].replace(".tar.gz", "") - sandboxPath = os.path.join("/tmp", sandbox) - debugFilesPath = os.path.join("/tmp", debugFiles) - try: - ufc.download(sandbox, sandboxPath, task['tm_username']) - ufc.download(debugFiles, debugFilesPath, task['tm_username']) - self.logger.info( - "Successfully touched input and debug sandboxes (%s and %s) of task %s (frontend: %s) using the '%s' username (request_id = %s).", - sandbox, debugFiles, task['tm_taskname'], task['tm_cache_url'], task['tm_username'], task['tm_DDM_reqid']) - except Exception as ex: - msg = "The CRAB3 server backend could not download the input and/or debug sandbox (%s and/or %s) " % ( - sandbox, debugFiles) - msg += "of task %s from the frontend (%s) using the '%s' username (request_id = %s). " % \ - (task['tm_taskname'], task['tm_cache_url'], task['tm_username'], task['tm_DDM_reqid']) - msg += "\nThis could be a temporary glitch, will try again in next occurrence of the recurring action." - msg += "Error reason:\n%s" % str(ex) - self.logger.info(msg) - finally: - if os.path.exists(sandboxPath): os.remove(sandboxPath) - if os.path.exists(debugFilesPath): os.remove(debugFilesPath) - def _execute(self, config, task): # setup logger @@ -110,12 +83,6 @@ def _execute(self, config, task): user_proxy = False self.logger.exception(twe) - if not 'S3' in recallingTask['tm_cache_url'].upper(): - # when using old crabcache had to worry about sandbox purging after 3 days - # Make sure the task sandbox in the crabcache is not deleted until the tape recall is completed - if user_proxy: - self.refreshSandbox(recallingTask) - # Retrieve status of recall request if not self.rucioClient: self.rucioClient = getNativeRucioClient(config=config, logger=self.logger)
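
Reviewer note, not part of the patch series: with the UserFileCache branches removed, the TaskWorker is left with a single upload path through S3. The short Python sketch below illustrates that path under two assumptions: that uploadToS3 is importable from ServerUtilities (its import statement is not visible in these hunks) and that crabserver is a RESTInteractions.CRABRest instance, as in Handler.py; the helper name upload_tw_logfile is illustrative only.

# Minimal sketch (assumptions noted above), mirroring the Handler.py hunk.
import logging

from ServerUtilities import uploadToS3  # assumed import location, not shown in these hunks

def upload_tw_logfile(crabserver, logpath, taskname, logger=None):
    """Push a TaskWorker log file to S3; crabserver is a CRABRest instance."""
    logger = logger or logging.getLogger(__name__)
    try:
        # Same call and keyword arguments used in Handler.executeAction above.
        uploadToS3(crabserver=crabserver, objecttype='twlog', filepath=logpath,
                   taskname=taskname, logger=logger)
    except Exception as e:  # soft-fail and log, as the patched Handler.py does
        logger.error('Failed to upload logfile to S3 for task %s. Details:\n%s',
                     taskname, str(e))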