Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove old crab cache code #6833

Merged
merged 3 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@
'python': ['TaskWorker', 'TaskWorker/Actions', 'TaskWorker/DataObjects',
'TaskWorker/Actions/Recurring', 'taskbuffer', 'Publisher', 'TransferInterface']
},
'UserFileCache':
{
'py_modules' : ['ServerUtilities'],
'python': ['UserFileCache']
},
'Publisher':
{
'py_modules': ['ServerUtilities', 'MultiProcessingLog', 'RESTInteractions', 'utils'],
Expand All @@ -60,7 +55,7 @@
'All':
{
'py_modules': [''],
'python': ['TaskWorker', 'CRABInterface', 'UserFileCache', 'CRABClient', 'Publisher']
'python': ['TaskWorker', 'CRABInterface', 'CRABClient', 'Publisher']
}
}

Expand Down Expand Up @@ -146,8 +141,8 @@ def define_the_build(dist, system_name, patch_x=''):
class BuildCommand(Command):
"""Build python modules for a specific system."""
description = \
"Build python modules for the specified system. The two supported systems\n" + \
"\t\t at the moment are 'CRABInterface' and 'UserFileCache'. Use with --force \n" + \
"Build python modules for the specified system. The supported system(s)\n" + \
"\t\t at the moment are 'CRABInterface' . Use with --force \n" + \
"\t\t to ensure a clean build of only the requested parts.\n"
user_options = build.user_options
user_options.append(('system=', 's', 'build the specified system (default: CRABInterface)'))
Expand Down
28 changes: 2 additions & 26 deletions src/python/TaskWorker/Actions/DagmanCreator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@

import WMCore.WMSpec.WMTask
from WMCore.Services.CRIC.CRIC import CRIC
try:
from WMCore.Services.UserFileCache.UserFileCache import UserFileCache
except ImportError:
UserFileCache = None

DAG_HEADER = """

Expand Down Expand Up @@ -1142,8 +1138,8 @@ def executeInternal(self, *args, **kw):
sandboxTarBall = 'sandbox.tar.gz'
debugTarBall = 'debug_files.tar.gz'

# Bootstrap the ISB if we are using S3 and running in the TW
if self.crabserver and 'S3' in kw['task']['tm_cache_url'].upper():
# Bootstrap the ISB if we are running in the TW
if self.crabserver:
username = kw['task']['tm_username']
sandboxName = kw['task']['tm_user_sandbox']
dbgFilesName = kw['task']['tm_debug_files']
Expand All @@ -1161,26 +1157,6 @@ def executeInternal(self, *args, **kw):
except Exception as ex:
self.logger.exception(ex)

# Bootstrap the ISB if we are using UFC
else:
if UserFileCache and kw['task']['tm_cache_url'].find('/crabcache') != -1:
ufc = UserFileCache(mydict={'cert': kw['task']['user_proxy'], 'key': kw['task']['user_proxy'], 'endpoint' : kw['task']['tm_cache_url']})
try:
ufc.download(hashkey=kw['task']['tm_user_sandbox'].split(".")[0], output=sandboxTarBall)
except Exception as ex:
self.logger.exception(ex)
raise TaskWorkerException("The CRAB3 server backend could not download the input sandbox with your code "+\
"from the frontend (crabcache component).\nThis could be a temporary glitch; please try to submit a new task later "+\
"(resubmit will not work) and contact the experts if the error persists.\nError reason: %s" % str(ex)) #TODO url!?
kw['task']['tm_user_sandbox'] = sandboxTarBall

# For an older client (<3.3.1607) this field will be empty and the file will not exist.
if kw['task']['tm_debug_files']:
try:
ufc.download(hashkey=kw['task']['tm_debug_files'].split(".")[0], output=debugTarBall)
except Exception as ex:
self.logger.exception(ex)

# Bootstrap the runtime if it is available.
job_runtime = getLocation('CMSRunAnalysis.tar.gz', 'CRABServer/')
shutil.copy(job_runtime, '.')
Expand Down
20 changes: 5 additions & 15 deletions src/python/TaskWorker/Actions/DryRunUploader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Upload an archive containing all files needed to run the a to the UserFileCache (necessary for crab submit --dryrun.)
Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.)
"""
import os
import json
Expand All @@ -13,7 +13,6 @@
from urllib import urlencode

from WMCore.DataStructs.LumiList import LumiList
from WMCore.Services.UserFileCache.UserFileCache import UserFileCache

from TaskWorker.DataObjects.Result import Result
from TaskWorker.Actions.TaskAction import TaskAction
Expand All @@ -22,7 +21,7 @@

class DryRunUploader(TaskAction):
"""
Upload an archive containing all files needed to run the task to the UserFileCache (necessary for crab submit --dryrun.)
Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.)
"""

def packSandbox(self, inputFiles):
Expand Down Expand Up @@ -51,17 +50,9 @@ def executeInternal(self, *args, **kw):

self.logger.info('Uploading dry run tarball to the user file cache')
t0 = time.time()
if 'S3' in kw['task']['tm_cache_url'].upper():
uploadToS3(crabserver=self.crabserver, filepath='dry-run-sandbox.tar.gz',
objecttype='runtimefiles', taskname=kw['task']['tm_taskname'], logger=self.logger)
result = {'hashkey':'ok'} # a dummy one to keep same semantics as when using UserFileCache
os.remove('dry-run-sandbox.tar.gz')
else:
ufc = UserFileCache(mydict={'cert': kw['task']['user_proxy'], 'key': kw['task']['user_proxy'], 'endpoint': kw['task']['tm_cache_url']})
result = ufc.uploadLog('dry-run-sandbox.tar.gz')
os.remove('dry-run-sandbox.tar.gz')
if 'hashkey' not in result:
raise TaskWorkerException('Failed to upload dry-run-sandbox.tar.gz to the user file cache: ' + str(result))
uploadToS3(crabserver=self.crabserver, filepath='dry-run-sandbox.tar.gz',
objecttype='runtimefiles', taskname=kw['task']['tm_taskname'], logger=self.logger)
os.remove('dry-run-sandbox.tar.gz')
self.logger.info('Uploaded dry run tarball to the user file cache: %s', str(result))
# wait until tarball is available, S3 may take a few seconds for this (ref. issue #6706 )
t1 = time.time()
Expand Down Expand Up @@ -147,4 +138,3 @@ def dump(self, outname):

with open(outname, 'wb') as f:
json.dump(summary, f)

32 changes: 7 additions & 25 deletions src/python/TaskWorker/Actions/Handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import traceback
from http.client import HTTPException

from WMCore.Services.UserFileCache.UserFileCache import UserFileCache

from RESTInteractions import CRABRest
from RucioUtils import getNativeRucioClient

Expand Down Expand Up @@ -94,29 +92,13 @@ def executeAction(self, nextinput, work):
#TODO: we need to do that also in Worker.py otherwise some messages might only be in the TW file but not in the crabcache.
logpath = self.config.TaskWorker.logsDir+'/tasks/%s/%s.log' % (self._task['tm_username'], self.taskname)
if os.path.isfile(logpath) and 'user_proxy' in self._task: #the user proxy might not be there if myproxy retrieval failed
cacheurldict = {'endpoint':self._task['tm_cache_url'], 'cert':self._task['user_proxy'], 'key':self._task['user_proxy']}
if 'S3' in self._task['tm_cache_url'].upper():
# use S3
try:
uploadToS3(crabserver=self.crabserver, objecttype='twlog', filepath=logpath,
taskname=self.taskname, logger=self.logger)
except Exception as e:
msg = 'Failed to upload logfile to S3 for task %s. ' % self.taskname
msg += 'Details:\n%s' % str(e)
self.logger.error(msg)
else:
# use old crabcache
try:
ufc = UserFileCache(cacheurldict)
logfilename = self.taskname + '_TaskWorker.log'
ufc.uploadLog(logpath, logfilename)
except HTTPException as hte:
msg = "Failed to upload the logfile to %s for task %s. More details in the http headers and body:\n%s\n%s" % (self._task['tm_cache_url'], self.taskname, hte.headers, hte.result)
self.logger.error(msg)
except Exception: #pylint: disable=broad-except
msg = "Unknown error while uploading the logfile for task %s" % self.taskname
self.logger.exception(msg) #upload logfile of the task to the crabcache

try:
uploadToS3(crabserver=self.crabserver, objecttype='twlog', filepath=logpath,
taskname=self.taskname, logger=self.logger)
except Exception as e:
msg = 'Failed to upload logfile to S3 for task %s. ' % self.taskname
msg += 'Details:\n%s' % str(e)
self.logger.error(msg)
return output


Expand Down
33 changes: 0 additions & 33 deletions src/python/TaskWorker/Actions/Recurring/TapeRecallStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,6 @@ class TapeRecallStatus(BaseRecurringAction):
pollingTime = 60*4 # minutes
rucioClient = None

def refreshSandbox(self, task):

from WMCore.Services.UserFileCache.UserFileCache import UserFileCache
ufc = UserFileCache({'cert': task['user_proxy'], 'key': task['user_proxy'],
'endpoint': task['tm_cache_url'], "pycurl": True})
sandbox = task['tm_user_sandbox'].replace(".tar.gz", "")
debugFiles = task['tm_debug_files'].replace(".tar.gz", "")
sandboxPath = os.path.join("/tmp", sandbox)
debugFilesPath = os.path.join("/tmp", debugFiles)
try:
ufc.download(sandbox, sandboxPath, task['tm_username'])
ufc.download(debugFiles, debugFilesPath, task['tm_username'])
self.logger.info(
"Successfully touched input and debug sandboxes (%s and %s) of task %s (frontend: %s) using the '%s' username (request_id = %s).",
sandbox, debugFiles, task['tm_taskname'], task['tm_cache_url'], task['tm_username'], task['tm_DDM_reqid'])
except Exception as ex:
msg = "The CRAB3 server backend could not download the input and/or debug sandbox (%s and/or %s) " % (
sandbox, debugFiles)
msg += "of task %s from the frontend (%s) using the '%s' username (request_id = %s). " % \
(task['tm_taskname'], task['tm_cache_url'], task['tm_username'], task['tm_DDM_reqid'])
msg += "\nThis could be a temporary glitch, will try again in next occurrence of the recurring action."
msg += "Error reason:\n%s" % str(ex)
self.logger.info(msg)
finally:
if os.path.exists(sandboxPath): os.remove(sandboxPath)
if os.path.exists(debugFilesPath): os.remove(debugFilesPath)

def _execute(self, config, task):

# setup logger
Expand Down Expand Up @@ -110,12 +83,6 @@ def _execute(self, config, task):
user_proxy = False
self.logger.exception(twe)

if not 'S3' in recallingTask['tm_cache_url'].upper():
# when using old crabcache had to worry about sandbox purging after 3 days
# Make sure the task sandbox in the crabcache is not deleted until the tape recall is completed
if user_proxy:
self.refreshSandbox(recallingTask)

# Retrieve status of recall request
if not self.rucioClient:
self.rucioClient = getNativeRucioClient(config=config, logger=self.logger)
Expand Down
30 changes: 0 additions & 30 deletions src/python/UserFileCache/RESTBaseAPI.py

This file was deleted.

Loading