Cleanup userproxy from rest fix 6931 #6960

Merged: 4 commits, Jan 13, 2022
Changes from all commits
28 changes: 12 additions & 16 deletions src/python/CRABInterface/DataUserWorkflow.py
@@ -45,14 +45,14 @@ def report(self, workflow, userdn, usedbs):
def report2(self, workflow, userdn, usedbs):
return self.workflow.report2(workflow, userdn)

- def logs(self, workflow, howmany, exitcode, jobids, userdn, userproxy=None):
+ def logs(self, workflow, howmany, exitcode, jobids, userdn):
"""Returns the workflow logs PFN. It takes care of the LFN - PFN conversion too.

:arg str workflow: a workflow name
:arg int howmany: the limit on the number of PFN to return
:arg int exitcode: the log has to be of a job ended with this exit_code
:return: a generator of list of logs pfns"""
- return self.workflow.logs(workflow, howmany, exitcode, jobids, userdn, userproxy)
+ return self.workflow.logs(workflow, howmany, exitcode, jobids, userdn)

def logs2(self, workflow, howmany, jobids):
"""Returns information about the workflow log files.
@@ -64,13 +64,13 @@ def logs2(self, workflow, howmany, jobids):
:return: a generator of list of logs pfns"""
return self.workflow.logs2(workflow, howmany, jobids)

- def output(self, workflow, howmany, jobids, userdn, userproxy=None):
+ def output(self, workflow, howmany, jobids, userdn):
"""Returns the workflow output PFN. It takes care of the LFN - PFN conversion too.

:arg str list workflow: a workflow name
:arg int howmany: the limit on the number of PFN to return
:return: a generator of list of output pfns"""
- return self.workflow.output(workflow, howmany, jobids, userdn, userproxy)
+ return self.workflow.output(workflow, howmany, jobids, userdn)

def output2(self, workflow, howmany, jobids):
"""Returns information about the workflow output files.
@@ -144,38 +144,34 @@ def submit(self, *args, **kwargs):

return self.workflow.submit(*args, **kwargs)

- def resubmit(self, workflow, publication, jobids, force, siteblacklist, sitewhitelist, maxjobruntime, maxmemory, numcores, priority, userdn, userproxy=None):
+ def resubmit(self, workflow, publication, jobids, force, siteblacklist, sitewhitelist, maxjobruntime, maxmemory, numcores, priority, userdn):
"""Request to Resubmit a workflow.

:arg str workflow: a workflow name"""
- return self.workflow.resubmit(workflow, publication, jobids, force, siteblacklist, sitewhitelist, maxjobruntime, maxmemory, numcores, priority, userdn, userproxy)
+ return self.workflow.resubmit(workflow, publication, jobids, force, siteblacklist, sitewhitelist, maxjobruntime, maxmemory, numcores, priority, userdn)

- def resubmit2(self, workflow, publication, jobids, siteblacklist, sitewhitelist, maxjobruntime, maxmemory, numcores, priority,
- userproxy=None):
+ def resubmit2(self, workflow, publication, jobids, siteblacklist, sitewhitelist, maxjobruntime, maxmemory, numcores, priority):
"""Request to Resubmit a workflow.

:arg str workflow: a workflow name"""
- return self.workflow.resubmit2(workflow, publication, jobids, siteblacklist, sitewhitelist, maxjobruntime, maxmemory, numcores, priority,
- userproxy)
+ return self.workflow.resubmit2(workflow, publication, jobids, siteblacklist, sitewhitelist, maxjobruntime, maxmemory, numcores, priority)

- def status(self, workflow, userdn, userproxy=None, verbose=False):
+ def status(self, workflow, userdn, verbose=False):
"""Retrieve the status of the workflow

:arg str workflow: a valid workflow name
:arg str userdn: the user dn makind the request
- :arg str userproxy: the user proxy retrieved by `retrieveUserCert`
:return: a generator of workflow states
"""
- return self.workflow.status(workflow, userdn, userproxy)
+ return self.workflow.status(workflow, userdn)

- def kill(self, workflow, force, killwarning, userdn, userproxy=None):
+ def kill(self, workflow, force, killwarning, userdn):
"""Request to Abort a workflow.

:arg str workflow: a workflow name
:arg str force: a flag to know if kill should be brutal
- :arg str userproxy: the user proxy retrieved by `retrieveUserCert`
:arg int force: force to delete the workflows in any case; 0 no, everything else yes"""
- return self.workflow.kill(workflow, force, killwarning, userdn, userproxy)
+ return self.workflow.kill(workflow, force, killwarning, userdn)

def proceed(self, workflow):
"""Continue a task initialized with 'crab submit --dryrun'.
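Note for context: DataUserWorkflow is a thin delegator, so every hunk above is the same two-line pattern: drop userproxy from the method signature and from the forwarded call. A minimal, runnable sketch of that shape (hypothetical FakeWorkflow backend and fabricated PFNs, not the real CRAB classes):

```python
# Sketch of the delegation pattern after the cleanup: no userproxy is
# threaded through; the delegator simply forwards the remaining arguments.

class FakeWorkflow:
    """Stand-in backend returning fabricated log PFNs for illustration."""
    def logs(self, workflow, howmany, exitcode, jobids, userdn):
        return ["/store/logs/%s/job_%d.log" % (workflow, j) for j in jobids]

class DataUserWorkflow:
    def __init__(self, workflow):
        self.workflow = workflow

    def logs(self, workflow, howmany, exitcode, jobids, userdn):
        # Signature and forwarded call stay in lockstep.
        return self.workflow.logs(workflow, howmany, exitcode, jobids, userdn)

duw = DataUserWorkflow(FakeWorkflow())
print(duw.logs("my_task", howmany=10, exitcode=0, jobids=[1, 2],
               userdn="/DC=ch/CN=someuser"))
```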
14 changes: 7 additions & 7 deletions src/python/CRABInterface/DataWorkflow.py
@@ -94,7 +94,7 @@ def submit(self, workflow, activity, jobtype, jobsw, jobarch, use_parent, second
sitewhitelist, splitalgo, algoargs, cachefilename, cacheurl, addoutputfiles,
username, userdn, savelogsflag, publication, publishname, publishname2, asyncdest, dbsurl, publishdbsurl, vorole, vogroup, tfileoutfiles, edmoutfiles,
runs, lumis, totalunits, adduserfiles, oneEventMode=False, maxjobruntime=None, numcores=None, maxmemory=None, priority=None, lfn=None,
- ignorelocality=None, saveoutput=None, faillimit=10, userfiles=None, userproxy=None, asourl=None, asodb=None, scriptexe=None, scriptargs=None,
+ ignorelocality=None, saveoutput=None, faillimit=10, userfiles=None, asourl=None, asodb=None, scriptexe=None, scriptargs=None,
scheddname=None, extrajdl=None, collector=None, dryrun=False, publishgroupname=False, nonvaliddata=False, inputdata=None, primarydataset=None,
debugfilename=None, submitipaddr=None, ignoreglobalblacklist=False):
"""Perform the workflow injection
@@ -250,9 +250,9 @@ def publicationStatusWrapper(self, workflow, asourl, asodb, username, publicatio
publicationInfo['status'] = {'disabled': []}
return publicationInfo

- @conn_handler(services=['servercert'])
+ @conn_handler(services=[])
def resubmit2(self, workflow, publication, jobids, siteblacklist, sitewhitelist, maxjobruntime, maxmemory,
- numcores, priority, userproxy):
+ numcores, priority):
"""Request to reprocess what the workflow hasn't finished to reprocess.
This needs to create a new workflow in the same campaign
"""
@@ -323,7 +323,7 @@ def resubmit2(self, workflow, publication, jobids, siteblacklist, sitewhitelist,
## Here we can add a check on the publication status of the documents
## corresponding to the job ids in resubmitjobids and jobids. So far the
## publication resubmission will resubmit all the failed publications.
- self.resubmitPublication(asourl, asodb, userproxy, workflow)
+ self.resubmitPublication(asourl, asodb, workflow)
return [{'result': retmsg}]
else:
self.logger.info("Jobs to resubmit: %s", jobids)
@@ -370,7 +370,7 @@ def resubmit2(self, workflow, publication, jobids, siteblacklist, sitewhitelist,
self.api.modify(self.Task.SetStatusTask_sql, status = newstate, command = newcommand, taskname = [workflow])
return [{'result': retmsg}]

- def status(self, workflow, userdn, userproxy=None):
+ def status(self, workflow, userdn):
"""Retrieve the status of the workflow.

:arg str workflow: a valid workflow name
@@ -379,7 +379,7 @@ def status(self, workflow, userdn):


@conn_handler(services=['centralconfig'])
- def kill(self, workflow, force, jobids, killwarning, userdn, userproxy=None):
+ def kill(self, workflow, force, jobids, killwarning, userdn):
"""Request to Abort a workflow.

:arg str workflow: a workflow name"""
@@ -428,7 +428,7 @@ def proceed(self, workflow):

return [{'result': 'ok'}]

- def resubmitPublication(self, asourl, asodb, proxy, taskname):
+ def resubmitPublication(self, asourl, asodb, taskname):

return self.resubmitOraclePublication(taskname)

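The decorator change above (services=['servercert'] to services=[]) relies on conn_handler in Utilities.py, shown later in this diff: with an empty services list the wrapper sets up nothing and just calls through. A simplified, runnable model of that behavior (hypothetical refresh hook, not the real caching logic):

```python
# Simplified model of conn_handler: services=[] makes the wrapper a
# pass-through, while 'centralconfig' would trigger a cache refresh first.
def conn_handler(services):
    def wrap(func):
        def wrapped_func(self, *args, **kwargs):
            if 'centralconfig' in services:
                self.refresh_central_config()  # hypothetical refresh hook
            return func(self, *args, **kwargs)
        return wrapped_func
    return wrap

class Demo:
    def refresh_central_config(self):
        print("refreshing centralconfig cache")

    @conn_handler(services=[])
    def resubmit2(self, workflow):
        return [{'result': 'resubmit requested for %s' % workflow}]

print(Demo().resubmit2("my_task"))  # no refresh output: nothing to set up
```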
6 changes: 3 additions & 3 deletions src/python/CRABInterface/HTCondorDataWorkflow.py
@@ -170,8 +170,8 @@ def report2(self, workflow, userdn):
yield res

@throttle.make_throttled()
- @conn_handler(services=['centralconfig', 'servercert'])
- def status(self, workflow, userdn, userproxy=None):
+ @conn_handler(services=['centralconfig'])
+ def status(self, workflow, userdn):
"""Retrieve the status of the workflow.

:arg str workflow: a valid workflow name
@@ -477,7 +477,7 @@ def taskWebStatus(self, task_ad, statusResult):
fp.close()
hbuf.close()

- @conn_handler(services=['servercert'])
+ @conn_handler(services=[])
def publicationStatus(self, workflow, asourl, asodb, user):
"""Here is what basically the function return, a dict called publicationInfo in the subcalls:
publicationInfo['status']: something like {'publishing': 0, 'publication_failed': 0, 'not_published': 0, 'published': 5}.
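status keeps its stacked decorators here; only the services list shrinks. One generic Python point worth recalling when reading @throttle.make_throttled() stacked over @conn_handler(...): decorators apply bottom-up, so connection handling wraps the method first and throttling wraps the result. A toy illustration (placeholder decorators, not the real throttle implementation):

```python
# Placeholder decorators: the one closest to the function wraps it first,
# so a call passes through the throttle before the connection setup.
def make_throttled(func):
    def inner(*args, **kwargs):
        print("throttle check")
        return func(*args, **kwargs)
    return inner

def conn_handler_demo(func):
    def inner(*args, **kwargs):
        print("connection setup")
        return func(*args, **kwargs)
    return inner

@make_throttled
@conn_handler_demo
def status(workflow):
    return "status of %s" % workflow

print(status("my_task"))  # prints: throttle check, then connection setup
```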
8 changes: 2 additions & 6 deletions src/python/CRABInterface/RESTBaseAPI.py
@@ -37,10 +37,6 @@ def __init__(self, app, config, mount):

self.formats = [ ('application/json', JSONFormat()) ]

- status, serverdn = getstatusoutput('openssl x509 -noout -subject -in %s | cut -f2- -d\ ' % config.serverhostcert)
- if status is not 0:
- raise ExecutionError("Internal issue when retrieving crabserver service DN.")
-
extconfig = ConfigCache(centralconfig=getCentralConfig(extconfigurl=config.extconfigurl, mode=config.mode),
cachetime=mktime(gmtime()))

@@ -49,12 +45,12 @@ def __init__(self, app, config, mount):
DataWorkflow.globalinit(dbapi=self, credpath=config.credpath, centralcfg=extconfig, config=config)
DataFileMetadata.globalinit(dbapi=self, config=config)
RESTTask.globalinit(centralcfg=extconfig)
- globalinit(config.serverhostkey, config.serverhostcert, serverdn, config.credpath)
+ globalinit(config.credpath)

## TODO need a check to verify the format depending on the resource
## the RESTFileMetadata has the specifc requirement of getting xml reports
self._add( {'workflow': RESTUserWorkflow(app, self, config, mount, extconfig),
- 'info': RESTServerInfo(app, self, config, mount, serverdn, extconfig),
+ 'info': RESTServerInfo(app, self, config, mount, extconfig),
'filemetadata': RESTFileMetadata(app, self, config, mount),
'workflowdb': RESTWorkerWorkflow(app, self, config, mount),
'task': RESTTask(app, self, config, mount),
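The net effect in RESTBaseAPI is that startup no longer shells out to openssl to learn the service DN; only the credential path survives. A condensed sketch of the trimmed wiring (hypothetical Config stand-in, not the full REST entity classes):

```python
# Condensed sketch: globalinit takes just the credential path, and
# RESTServerInfo is built without a serverdn argument.
credServerPath = None

def globalinit(credpath):
    global credServerPath
    credServerPath = credpath

class RESTServerInfo:
    def __init__(self, centralcfg):  # serverdn parameter removed
        self.centralcfg = centralcfg

class Config:  # hypothetical stand-in for the real server config
    credpath = "/data/srv/creds"

config = Config()
globalinit(config.credpath)  # was: globalinit(key, cert, serverdn, credpath)
info = RESTServerInfo(centralcfg={})
print(credServerPath)
```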
3 changes: 1 addition & 2 deletions src/python/CRABInterface/RESTServerInfo.py
@@ -15,10 +15,9 @@
class RESTServerInfo(RESTEntity):
"""REST entity for workflows and relative subresources"""

- def __init__(self, app, api, config, mount, serverdn, centralcfg):
+ def __init__(self, app, api, config, mount, centralcfg):
RESTEntity.__init__(self, app, api, config, mount)
self.centralcfg = centralcfg
- self.serverdn = serverdn
self.logger = logging.getLogger("CRABLogger:RESTServerInfo")
#used by the client to get the url where to update the cache (cacheSSL)

56 changes: 5 additions & 51 deletions src/python/CRABInterface/Utilities.py
@@ -14,8 +14,6 @@
from WMCore.WMFactory import WMFactory
from WMCore.REST.Error import ExecutionError, InvalidParameter
from WMCore.Services.CRIC.CRIC import CRIC
- from WMCore.Credential.SimpleMyProxy import SimpleMyProxy, MyProxyException
- from WMCore.Credential.Proxy import Proxy
from WMCore.Services.pycurl_manager import ResponseHeader

from Utils.Utilities import encodeUnicodeToBytes
@@ -29,9 +27,6 @@
ConfigCache = namedtuple("ConfigCache", ["cachetime", "centralconfig"])

#These parameters are set in the globalinit (called in RESTBaseAPI)
- serverCert = None
- serverKey = None
- serverDN = None
credServerPath = None

def getDBinstance(config, namespace, name):
@@ -41,13 +36,13 @@ def getDBinstance(config, namespace, name):
backend = 'Oracle'

#factory = WMFactory(name = 'TaskQuery', namespace = 'Databases.TaskDB.%s.Task' % backend)
- factory = WMFactory(name = name, namespace = 'Databases.%s.%s.%s' % (namespace, backend, name))
+ factory = WMFactory(name=name, namespace='Databases.%s.%s.%s' % (namespace, backend, name))

return factory.loadObject( name )

- def globalinit(serverkey, servercert, serverdn, credpath):
- global serverCert, serverKey, serverDN, credServerPath # pylint: disable=global-statement
- serverCert, serverKey, serverDN, credServerPath = servercert, serverkey, serverdn, credpath
+ def globalinit(credpath):
+ global credServerPath # pylint: disable=global-statement
+ credServerPath = credpath

def execute_command(command, logger, timeout):
"""
@@ -172,8 +167,7 @@ def conn_handler(services):
as CRIC, WMStats monitoring

arg str list services: list of string telling which service connections
- should be started; currently availables are
- 'monitor' and 'asomonitor'.
+ should be started
"""
def wrap(func):
def wrapped_func(*args, **kwargs):
@@ -182,46 +176,6 @@ def wrapped_func(*args, **kwargs):
args[0].allPNNNames = CMSSitesCache(sites=CRIC().getAllPhEDExNodeNames(), cachetime=mktime(gmtime()))
if 'centralconfig' in services and (not args[0].centralcfg.centralconfig or (args[0].centralcfg.cachetime+1800 < mktime(gmtime()))):
args[0].centralcfg = ConfigCache(centralconfig=getCentralConfig(extconfigurl=args[0].config.extconfigurl, mode=args[0].config.mode), cachetime=mktime(gmtime()))
- if 'servercert' in services:
- args[0].serverCert = serverCert
- args[0].serverKey = serverKey
return func(*args, **kwargs)
return wrapped_func
return wrap
-
- def retrieveUserCert(func):
- def wrapped_func(*args, **kwargs):
- logger = logging.getLogger("CRABLogger.Utils")
- myproxyserver = "myproxy.cern.ch"
- defaultDelegation = {'logger': logger,
- 'proxyValidity': '192:00',
- 'min_time_left': 36000,
- 'server_key': serverKey,
- 'server_cert': serverCert,}
- mypclient = SimpleMyProxy(defaultDelegation)
- userproxy = None
- userhash = sha1(encodeUnicodeToBytes(kwargs['userdn'])).hexdigest()
- if serverDN:
- try:
- userproxy = mypclient.logonRenewMyProxy(username=userhash, myproxyserver=myproxyserver, myproxyport=7512)
- except MyProxyException as me:
- # Unsure if this works in standalone mode...
- cherrypy.log(str(me))
- cherrypy.log(str(serverKey))
- cherrypy.log(str(serverCert))
- invalidp = InvalidParameter("Impossible to retrieve proxy from %s for %s and hash %s" %
- (myproxyserver, kwargs['userdn'], userhash))
- setattr(invalidp, 'trace', str(me))
- raise invalidp
-
- else:
- if not re.match(RX_CERT, userproxy):
- raise InvalidParameter("Retrieved malformed proxy from %s for %s and hash %s" %
- (myproxyserver, kwargs['userdn'], userhash))
- else:
- proxy = Proxy(defaultDelegation)
- userproxy = proxy.getProxyFilename()
- kwargs['userproxy'] = userproxy
- out = func(*args, **kwargs)
- return out
- return wrapped_func
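The deleted retrieveUserCert is the key to the whole PR: it was a decorator that fetched a MyProxy credential and injected it as kwargs['userproxy'] before calling the wrapped method, which is why every method in the chain had to carry a userproxy=None parameter. With the decorator gone, the parameter has no remaining caller and can be dropped everywhere. A stripped-down model of that injection pattern (hypothetical fetch_proxy, no real MyProxy calls):

```python
# Stripped-down model of the removed pattern: a decorator that injects a
# keyword argument forces that parameter onto every wrapped signature.
def retrieve_user_cert(func):
    def wrapped_func(*args, **kwargs):
        kwargs['userproxy'] = fetch_proxy(kwargs['userdn'])  # injected kwarg
        return func(*args, **kwargs)
    return wrapped_func

def fetch_proxy(userdn):
    # Placeholder for the MyProxy logon; returns a fake proxy file path.
    return "/tmp/proxy-%d" % (abs(hash(userdn)) % 10000)

@retrieve_user_cert
def status(workflow, userdn=None, userproxy=None):
    return {"workflow": workflow, "userproxy": userproxy}

print(status("my_task", userdn="/DC=ch/CN=someuser"))
```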