Use S3 for cache. Partial solution for #4971 (#4983)
* allow use of S3 to upload sandboxes, for #4971;
  log upload still to be dealt with

  these are heavy changes across the client and will need extensive validation

* remove the --wait option from resubmit

* stop using getUrl and remove it
belforte authored Apr 9, 2021
1 parent e864413 commit fdf6477
Showing 20 changed files with 112 additions and 120 deletions.
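
The pattern repeated in every file below: callers used to build '/crabserver/<instance>/<resource>' paths by hand with getUrl() and pass full URIs to get/post/delete; now the DB instance is handed to the REST client once, via setDbInstance(), and callers address server APIs by short name. A minimal runnable sketch of the two styles — StubCrabRest is hypothetical, standing in for whatever CRABClient.Emulator.getEmulator('rest') returns:

class StubCrabRest(object):
    """Hypothetical stand-in for the client returned by Emulator('rest')."""

    def __init__(self, hostname):
        self.hostname = hostname
        self.dbInstance = None

    def setDbInstance(self, dbInstance):
        # after this commit the DB instance ('prod', 'preprod', ...) is set once, here
        self.dbInstance = dbInstance

    def get(self, api, data=None):
        # the '/crabserver/<instance>/<api>' path is now assembled inside the client
        uri = '/crabserver/%s/%s' % (self.dbInstance, api)
        print('GET https://%s%s data=%s' % (self.hostname, uri, data))
        return {'result': [{}]}, 200, 'OK'

crabserver = StubCrabRest('cmsweb.cern.ch')
crabserver.setDbInstance('prod')
# before: server.get(getUrl('prod', resource='task'), data=...)
# after:
crabserver.get(api='task', data={'subresource': 'search', 'workflow': 'mytask'})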
21 changes: 6 additions & 15 deletions src/python/CRABClient/ClientUtilities.py
@@ -181,13 +181,6 @@ def getColumn(dictresult, columnName):
else:
return value

def getUrl(dbInstance='prod', resource=None):
"""
Retrieve the url depending on the resource we are accessing and the DB instance.
"""
url = '/crabserver/' + dbInstance + '/' + resource if resource else '/crabserver/' + dbInstance
return url

def uploadlogfile(logger, proxyfilename, logfilename=None, logpath=None, instance='prod', serverurl=None, username=None):
## WMCore dependencies. Moved here to minimize dependencies in the bootstrap script
from WMCore.Services.UserFileCache.UserFileCache import UserFileCache
@@ -228,15 +221,14 @@ def uploadlogfile(logger, proxyfilename, logfilename=None, logpath=None, instanc
logger.debug('No proxy was given')
doupload = False

baseurl = getUrl(dbInstance=instance)
if doupload:
# uploadLog is executed directly from crab main script, does not inherit from SubCommand
# so it needs its own RESTServer instantiation
# so it needs its own REST server instantiation
restClass = CRABClient.Emulator.getEmulator('rest')
RESTServer = restClass(url=serverurl, localcert=proxyfilename, localkey=proxyfilename,
crabserver = restClass(hostname=serverurl, localcert=proxyfilename, localkey=proxyfilename,
retry=2, logger=logger, verbose=False, version=__version__,
userAgent='CRABClient')
cacheurl = server_info(RESTServer=RESTServer, uriNoApi=baseurl, subresource='backendurls')
cacheurl = server_info(crabserver=crabserver, subresource='backendurls')
# Encode in ascii because old pycurl present in old CMSSW versions
# doesn't support unicode.
cacheurl = cacheurl['cacheSSL'].encode('ascii')
@@ -653,15 +645,14 @@ def validateSubmitOptions(options, args):
#If anyone has a better solution please go on, otherwise live with that one :) :)
from CRABClient import __version__

def server_info(RESTServer=None, uriNoApi='crabserver/prod', subresource=None):
def server_info(crabserver=None, subresource=None):
"""
Get relevant information about the server
"""

uri = uriNoApi + '/info'
api = 'info'
requestdict = {'subresource': subresource} if subresource else {}

dictresult, dummyStatus, dummyReason = RESTServer.get(uri, requestdict)
dictresult, dummyStatus, dummyReason = crabserver.get(api, requestdict)

return dictresult['result'][0]

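
How the slimmed-down server_info() is used by uploadlogfile() above, as a self-contained sketch; FakeCrabServer and the URL it returns are made up, while the call shapes come from the diff:

class FakeCrabServer(object):
    """Hypothetical stub; the real client comes from Emulator('rest')."""

    def get(self, api, data=None):
        assert api == 'info'  # server_info() always targets the 'info' API now
        return {'result': [{'cacheSSL': 'https://example.invalid/S3/cache'}]}, 200, 'OK'

def server_info(crabserver=None, subresource=None):
    """Get relevant information about the server (signature as refactored above)."""
    api = 'info'
    requestdict = {'subresource': subresource} if subresource else {}
    dictresult, dummyStatus, dummyReason = crabserver.get(api, requestdict)
    return dictresult['result'][0]

backends = server_info(crabserver=FakeCrabServer(), subresource='backendurls')
cacheurl = backends['cacheSSL'].encode('ascii')  # ascii for old pycurl, as in the diff
print(cacheurl)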
23 changes: 12 additions & 11 deletions src/python/CRABClient/Commands/SubCommand.py
@@ -17,7 +17,7 @@
from CRABClient.ClientUtilities import colors
from CRABClient.CRABOptParser import CRABCmdOptParser
from CRABClient.CredentialInteractions import CredentialInteractions
from CRABClient.ClientUtilities import loadCache, getWorkArea, server_info, createWorkArea, getUrl
from CRABClient.ClientUtilities import loadCache, getWorkArea, server_info, createWorkArea
from CRABClient.ClientExceptions import ConfigurationException, MissingOptionException, EnvironmentException, CachefileNotFoundException
from CRABClient.ClientMapping import renamedParams, commandsConfiguration, configParametersInfo, getParamDefaultValue

@@ -349,32 +349,34 @@ def __init__(self, logger, cmdargs=None, disable_interspersed_args=False):
# Finally, delegate the proxy to myproxy server.
self.handleVomsProxy(proxyOptsSetPlace)

# only if this command talks to the REST we create a RESTServer object to communicate with CRABServer
# only if this command talks to the REST we create a CRABRest object to communicate with CRABServer
# and check/update credentials on myproxy
# this is usually the first time that a call to the server is made, so this is where Emulator('rest') is initialized
# arguments to Emulator('rest') call must match those for HTTPRequest.__init__ in RESTInteractions.py
#server = CRABClient.Emulator.getEmulator('rest')(url=serverurl, localcert=proxyfilename, localkey=proxyfilename,
# version=__version__, retry=2, logger=logger)
if self.cmdconf['requiresREST']:
restClass = CRABClient.Emulator.getEmulator('rest')
self.RESTServer = restClass(url=self.serverurl, localcert=self.proxyfilename, localkey=self.proxyfilename,
crabRest = CRABClient.Emulator.getEmulator('rest')
self.crabserver = crabRest(hostname=self.serverurl, localcert=self.proxyfilename, localkey=self.proxyfilename,
retry=2, logger=self.logger, verbose=False, version=__version__,
userAgent='CRABClient')
self.crabserver.setDbInstance(self.instance)
self.handleMyProxy()

# Validate the command options
self.validateOptions()
self.validateOptions()

# Log user command and options used, for debugging purposes.
self.logger.debug('Command use: %s' % self.name)
self.logger.debug('Options use: %s' % cmdargs)
if self.cmdconf['requiresREST']:
self.checkversion(getUrl(self.instance))
self.uri = getUrl(self.instance, resource='workflow')
self.checkversion()
self.defaultApi = 'workflow'
self.logger.debug("Instance is %s" %(self.instance))
self.logger.debug("Server base url is %s" %(self.serverurl))
if self.cmdconf['requiresREST']:
self.logger.debug("Command url %s" %(self.uri))
self.logger.debug("Command api %s" %(self.defaultApi))


def serverInstance(self):
@@ -414,8 +416,8 @@ def serverInstance(self):
return self.dbInstance, self.restHost


def checkversion(self, uriNoApi=None):
compatibleVersions = server_info(RESTServer=self.RESTServer, uriNoApi=uriNoApi, subresource='version')
def checkversion(self):
compatibleVersions = server_info(crabserver=self.crabserver, subresource='version')
for item in compatibleVersions:
if re.match(item, __version__):
self.logger.debug("CRABClient version: %s" % (__version__))
@@ -461,9 +463,8 @@ def handleMyProxy(self):
return

if not self.options.proxy:
uriNoApi = getUrl(self.instance)
# Get the DN of the task workers from the server.
all_task_workers_dns = server_info(self.RESTServer, uriNoApi=uriNoApi, subresource='delegatedn')
all_task_workers_dns = server_info(self.crabserver, subresource='delegatedn')
for serverdn in all_task_workers_dns['services']:
self.proxy.setServerDN(serverdn)
self.proxy.setMyProxyServer('myproxy.cern.ch')
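
The per-command wiring, condensed into a runnable sketch: one client object, bound to a DB instance once, and a default API name recorded instead of a full URI. StubRest and its canned responses are hypothetical; names and call shapes follow the diff:

class StubRest(object):
    """Hypothetical stand-in for the Emulator('rest') client."""

    def __init__(self, hostname, **kwargs):
        self.hostname = hostname
        self.dbInstance = None

    def setDbInstance(self, dbInstance):
        self.dbInstance = dbInstance

    def get(self, api, data=None):
        # canned answers for the two server_info() calls made during command setup
        if data and data.get('subresource') == 'version':
            return {'result': [['3.*']]}, 200, 'OK'
        return {'result': [{'services': ['/DC=ch/CN=tw.example.invalid']}]}, 200, 'OK'

crabserver = StubRest('cmsweb.cern.ch', retry=2, verbose=False)
crabserver.setDbInstance('prod')   # replaces threading uriNoApi through every helper
defaultApi = 'workflow'            # commands now record an API name, not a command URL

compatible, _, _ = crabserver.get(api='info', data={'subresource': 'version'})
dns, _, _ = crabserver.get(api='info', data={'subresource': 'delegatedn'})
print('%s / %s' % (compatible['result'][0], dns['result'][0]['services']))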
3 changes: 1 addition & 2 deletions src/python/CRABClient/Commands/createmyproxy.py
@@ -31,8 +31,7 @@ def __call__(self):

# need an X509 proxy in order to talk with CRABServer to get list of myproxy authorized retrievers
proxy.proxyInfo = proxy.createNewVomsProxy(timeLeftThreshold=720)
baseurl = '/crabserver/prod'
alldns = server_info(RESTServer=self.RESTServer, uriNoApi=baseurl, subresource='delegatedn')
alldns = server_info(crabserver=self.crabserver, subresource='delegatedn')
for serverdn in alldns['services']:
proxy.defaultDelegation['serverDN'] = serverdn
proxy.defaultDelegation['myProxySvr'] = 'myproxy.cern.ch'
10 changes: 4 additions & 6 deletions src/python/CRABClient/Commands/getcommand.py
@@ -7,7 +7,7 @@
from CRABClient.Commands.SubCommand import SubCommand
from CRABClient.ClientExceptions import ConfigurationException, RESTCommunicationException,\
ClientException
from CRABClient.ClientUtilities import validateJobids, colors, getUrl
from CRABClient.ClientUtilities import validateJobids, colors
from CRABClient.UserUtilities import getMutedStatusInfo

class getcommand(SubCommand):
@@ -31,9 +31,8 @@ def __call__(self, **argv): # pylint: disable=arguments-differ

transferFlag = 'unknown'
inputlist = {'subresource': 'search', 'workflow': self.cachedinfo['RequestName']}
server = self.RESTServer
uri = getUrl(self.instance, resource='task')
dictresult, status, _ = server.get(uri, data=inputlist)
server = self.crabserver
dictresult, status, _ = server.get(api='task', data=inputlist)
self.logger.debug('Server result: %s' % dictresult)
splitting = None
if status == 200:
@@ -84,8 +83,7 @@ def __call__(self, **argv): # pylint: disable=arguments-differ
if getattr(self.options, 'jobids', None):
self.logger.debug('Retrieving jobs %s' % self.options.jobids)
inputlist.extend(self.options.jobids)
server = self.RESTServer
dictresult, status, reason = server.get(self.uri, data=urllib.urlencode(inputlist))
dictresult, status, reason = server.get(api=self.defaultApi, data=urllib.urlencode(inputlist))
self.logger.debug('Server result: %s' % dictresult)

if status != 200:
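
In the second get above, inputlist is a list of key/value pairs, so urllib.urlencode preserves the repeated jobids key, one entry per requested job. A runnable sketch with illustrative keys and values (the py3 import fallback is only so the snippet runs outside CMSSW):

try:
    from urllib import urlencode           # Python 2, as in the codebase
except ImportError:
    from urllib.parse import urlencode     # Python 3, for running the sketch

inputlist = [('workflow', 'mytask'), ('subresource', 'logs'), ('limit', '10')]
inputlist.extend([('jobids', '1'), ('jobids', '3')])   # hypothetical job ids
print(urlencode(inputlist))
# workflow=mytask&subresource=logs&limit=10&jobids=1&jobids=3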
11 changes: 4 additions & 7 deletions src/python/CRABClient/Commands/getlog.py
@@ -3,7 +3,7 @@

from httplib import HTTPException

from CRABClient.ClientUtilities import colors, getUrl, validateJobids, getColumn
from CRABClient.ClientUtilities import colors, validateJobids, getColumn
from CRABClient.UserUtilities import getFileFromURL
from CRABClient.Commands.getcommand import getcommand
from CRABClient.ClientExceptions import RESTCommunicationException, MissingOptionException
@@ -25,12 +25,9 @@ def __call__(self): # pylint: disable=arguments-differ
if self.options.short:
taskname = self.cachedinfo['RequestName']
inputlist = {'subresource': 'search', 'workflow': taskname}
server = self.RESTServer
uriNoApi = getUrl(self.instance)
uri = getUrl(self.instance, resource='task')
webdir = getProxiedWebDir(RESTServer=self.RESTServer, task=taskname, uriNoApi=uriNoApi, logFunction=self.logger.debug)
#webdir = getProxiedWebDir(taskname, self.serverurl, uri, self.proxyfilename, self.logger.debug)
dictresult, status, reason = server.get(uri, data=inputlist)
server = self.crabserver
webdir = getProxiedWebDir(crabserver=self.crabserver, task=taskname, logFunction=self.logger.debug)
dictresult, status, reason = server.get(api='task', data=inputlist)
if not webdir:
webdir = dictresult['result'][0]
self.logger.info('Server result: %s' % webdir)
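
The --short path now resolves the webdir with a simple preference order: the proxied webdir from getProxiedWebDir() if there is one, otherwise the value recorded in the task DB. A runnable sketch with stub values:

def pick_webdir(proxied_webdir, dictresult):
    # getProxiedWebDir(crabserver=..., task=..., logFunction=...) may return None
    if proxied_webdir:
        return proxied_webdir
    return dictresult['result'][0]   # fall back to the task-DB value from the 'task' API

print(pick_webdir(None, {'result': ['https://example.invalid/scheduler/task']}))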
4 changes: 2 additions & 2 deletions src/python/CRABClient/Commands/kill.py
@@ -13,14 +13,14 @@ class kill(SubCommand):
visible = True

def __call__(self):
server = self.RESTServer
server = self.crabserver

self.logger.debug("Killing task %s" % self.cachedinfo['RequestName'])
inputs = {'workflow' : self.cachedinfo['RequestName']}
if self.options.killwarning:
inputs.update({'killwarning' : b64encode(self.options.killwarning)})

dictresult, status, reason = server.delete(self.uri, data=urllib.urlencode(inputs))
dictresult, status, reason = server.delete(api=self.defaultApi, data=urllib.urlencode(inputs))
self.logger.debug("Result: %s" % dictresult)

if status != 200:
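
kill is the one command going through delete; the payload keeps its shape, with the optional warning base64-encoded before urlencoding. A runnable sketch (task name and warning text are made up; the encode()/decode() shim only makes the py2 idiom run on py3 too):

from base64 import b64encode

try:
    from urllib import urlencode
except ImportError:
    from urllib.parse import urlencode

inputs = {'workflow': 'mytask'}                     # hypothetical task name
killwarning = 'killed: wrong input dataset'
inputs.update({'killwarning': b64encode(killwarning.encode()).decode()})
print(urlencode(inputs))
# then: dictresult, status, reason = server.delete(api='workflow', data=urlencode(inputs))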
10 changes: 3 additions & 7 deletions src/python/CRABClient/Commands/preparelocal.py
@@ -8,7 +8,6 @@
from ServerUtilities import getProxiedWebDir, getColumn

import CRABClient.Emulator
from CRABClient.ClientUtilities import getUrl
from CRABClient.UserUtilities import getFileFromURL
from CRABClient.Commands.SubCommand import SubCommand
from CRABClient.ClientExceptions import ClientException
@@ -62,10 +61,8 @@ def getInputFiles(self):

#Get task status from the task DB
self.logger.debug("Getting status from he DB")
uriNoApi = getUrl(self.instance)
uri = getUrl(self.instance, resource='task')
server = self.RESTServer
crabDBInfo, _, _ = server.get(uri, data={'subresource': 'search', 'workflow': taskname})
server = self.crabserver
crabDBInfo, _, _ = server.get(api='task', data={'subresource': 'search', 'workflow': taskname})
status = getColumn(crabDBInfo, 'tm_task_status')
self.destination = getColumn(crabDBInfo, 'tm_asyncdest')

@@ -78,8 +75,7 @@ def getInputFiles(self):
with tarfile.open('dry-run-sandbox.tar.gz') as tf:
tf.extractall()
elif status == 'SUBMITTED':
#webdir = getProxiedWebDir(taskname, self.serverurl, uri, self.proxyfilename, self.logger.debug)
webdir = getProxiedWebDir(RESTServer=self.RESTServer, task=taskname, uriNoApi=uriNoApi,
webdir = getProxiedWebDir(crabserver=self.crabserver, task=taskname,
logFunction=self.logger.debug)
if not webdir:
webdir = getColumn(crabDBInfo, 'tm_user_webdir')
4 changes: 2 additions & 2 deletions src/python/CRABClient/Commands/proceed.py
@@ -12,7 +12,7 @@ def __init__(self, logger, cmdargs=None):
SubCommand.__init__(self, logger, cmdargs)

def __call__(self):
server = self.RESTServer
server = self.crabserver

msg = "Continuing submission of task %s" % (self.cachedinfo['RequestName'])
self.logger.debug(msg)
@@ -21,7 +21,7 @@ def __call__(self):

self.logger.info("Sending the request to the server")
self.logger.debug("Submitting %s " % str(request))
result, status, reason = server.post(self.uri, data=urllib.urlencode(request))
result, status, reason = server.post(api=self.defaultApi, data=urllib.urlencode(request))
self.logger.debug("Result: %s" % (result))
if status != 200:
msg = "Problem continuing task submission:\ninput:%s\noutput:%s\nreason:%s" \
12 changes: 5 additions & 7 deletions src/python/CRABClient/Commands/purge.py
@@ -5,7 +5,7 @@
from WMCore.Services.UserFileCache.UserFileCache import UserFileCache

from CRABClient.Commands.SubCommand import SubCommand
from CRABClient.ClientUtilities import colors, server_info, getUrl
from CRABClient.ClientUtilities import colors, server_info
from CRABClient.ClientExceptions import ConfigurationException, ConfigException

from ServerUtilities import getColumn
@@ -21,9 +21,8 @@ def __call__(self):

self.logger.info('Getting the tarball hash key')
inputlist = {'subresource': 'search', 'workflow': self.cachedinfo['RequestName']}
server = self.RESTServer
uri = getUrl(self.instance, resource='task')
dictresult, _, _ = server.get(uri, data=inputlist)
server = self.crabserver
dictresult, _, _ = server.get(api='task', data=inputlist)

tm_user_sandbox = getColumn(dictresult, 'tm_user_sandbox')
hashkey = tm_user_sandbox.replace(".tar.gz", "")
@@ -36,7 +35,7 @@ def __call__(self):
noSchedd = True

self.logger.info('Checking task status')
dictresult, _, _ = server.get(self.uri, data={'workflow': self.cachedinfo['RequestName'], 'verbose': 0})
dictresult, _, _ = server.get(api=self.defaultApi, data={'workflow': self.cachedinfo['RequestName'], 'verbose': 0})
status = dictresult['result'][0]['status']
self.logger.info('Task status: %s' % status)
accepstate = ['SUBMITFAILED', 'KILLED', 'FINISHED', 'FAILED', 'KILLFAILED', 'COMPLETED']
@@ -49,8 +48,7 @@ def __call__(self):
scheddresult = {}
gsisshdict = {}
if not self.options.scheddonly or noSchedd:
baseurl = getUrl(self.instance)
cacheurl = server_info(RESTServer=server, uriNoApi=baseurl, subresource='backendurls')
cacheurl = server_info(crabserver=server, subresource='backendurls')
cacheurl = cacheurl['cacheSSL']
cacheurldict = {'endpoint': cacheurl, 'pycurl': True}

4 changes: 2 additions & 2 deletions src/python/CRABClient/Commands/report.py
Original file line number Diff line number Diff line change
@@ -281,12 +281,12 @@ def collectReportData(self):
"""
reportData = {}

server = self.RESTServer
server = self.crabserver

self.logger.debug('Looking up report for task %s' % self.cachedinfo['RequestName'])

# Query server for information from the taskdb, input/output file metadata from metadatadb
dictresult, status, _ = server.get(self.uri, data={'workflow': self.cachedinfo['RequestName'], 'subresource': 'report2'})
dictresult, status, _ = server.get(api=self.defaultApi, data={'workflow': self.cachedinfo['RequestName'], 'subresource': 'report2'})

self.logger.debug("Result: %s" % dictresult)
self.logger.info("Running crab status first to fetch necessary information.")
4 changes: 2 additions & 2 deletions src/python/CRABClient/Commands/request_type.py
Original file line number Diff line number Diff line change
@@ -8,10 +8,10 @@ class request_type(SubCommand):

def __call__(self):

server = self.RESTServer
server = self.crabserver

self.logger.debug('Looking up type for task %s' % self.cachedinfo['RequestName'])
dictresult, status, reason = server.get(self.uri, # pylint: disable=unused-variable
dictresult, status, reason = server.get(api=self.defaultApi, # pylint: disable=unused-variable
data={'workflow': self.cachedinfo['RequestName'], 'subresource': 'type'})
self.logger.debug('Task type %s' % dictresult['result'][0])
return dictresult['result'][0]
23 changes: 6 additions & 17 deletions src/python/CRABClient/Commands/resubmit.py
@@ -7,7 +7,7 @@
from CRABClient.Commands.SubCommand import SubCommand
from CRABClient.ClientExceptions import ConfigurationException
from CRABClient.UserUtilities import getMutedStatusInfo, getColumn
from CRABClient.ClientUtilities import validateJobids, checkStatusLoop, colors, getUrl
from CRABClient.ClientUtilities import validateJobids, colors

class resubmit(SubCommand):
"""
@@ -66,21 +66,17 @@ def __call__(self):
configreq_encoded = self._encodeRequest(configreq)
self.logger.debug("Encoded resubmit request: %s" % (configreq_encoded))

dictresult, _, _ = self.RESTServer.post(self.uri, data=configreq_encoded)
dictresult, _, _ = self.crabserver.post(api=self.defaultApi, data=configreq_encoded)
self.logger.debug("Result: %s" % (dictresult))
self.logger.info("Resubmit request sent to the server.")
if dictresult['result'][0]['result'] != 'ok':
msg = "Server responded with: '%s'" % (dictresult['result'][0]['result'])
self.logger.info(msg)
returndict = {'status': 'FAILED'}
else:
if not self.options.wait:
msg = "Please use ' crab status ' to check how the resubmission process proceeds."
msg += "\nNotice it may take a couple of minutes for the resubmission to get fully processed."
self.logger.info(msg)
else:
targetTaskStatus = 'SUBMITTED'
checkStatusLoop(self.logger, self.server, self.uri, self.cachedinfo['RequestName'], targetTaskStatus, self.name)
msg = "Please use ' crab status ' to check how the resubmission process proceeds."
msg += "\nNotice it may take a couple of minutes for the resubmission to get fully processed."
self.logger.info(msg)
returndict = {'status': 'SUCCESS'}

return returndict
@@ -244,12 +240,6 @@ def setOptions(self):
help="Set the priority of this task compared to other tasks you own; tasks default to 10." + \
" Tasks with higher priority values run first. This does not change your share compared to other users.")

self.parser.add_option('--wait',
dest='wait',
default=False,
action='store_true',
help="DEPRECATED")

self.parser.add_option('--force',
dest='force',
default=False,
@@ -270,8 +260,7 @@ def validateOptions(self):
"""
SubCommand.validateOptions(self)

uri = getUrl(self.instance, resource='task')
crabDBInfo, _, _ = self.RESTServer.get(uri, data={'subresource': 'search', 'workflow': self.cachedinfo['RequestName']})
crabDBInfo, _, _ = self.crabserver.get(api='task', data={'subresource': 'search', 'workflow': self.cachedinfo['RequestName']})
self.splitting = getColumn(crabDBInfo, 'tm_split_algo')

if self.options.publication:
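
With --wait and checkStatusLoop gone, resubmit's post-request handling collapses to a single branch. A runnable sketch, with the dictresult shape taken from the diff:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('resubmit').info

def handle_resubmit_result(dictresult):
    # dictresult comes back from crabserver.post(api='workflow', data=...)
    if dictresult['result'][0]['result'] != 'ok':
        log("Server responded with: '%s'" % dictresult['result'][0]['result'])
        return {'status': 'FAILED'}
    log("Please use ' crab status ' to check how the resubmission process proceeds.")
    return {'status': 'SUCCESS'}

print(handle_resubmit_result({'result': [{'result': 'ok'}]}))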