From 517b1e28821c94e625fc2be036d52c2d1ce91e36 Mon Sep 17 00:00:00 2001
From: Stefano Belforte
Date: Mon, 21 Oct 2024 18:53:32 +0200
Subject: [PATCH 1/2] remove debug_files handling in TW. Do it in scheduler

---
 scripts/AdjustSites.py                         | 63 +++++++++++++++++--
 .../TaskWorker/Actions/DagmanCreator.py        | 51 +--------------
 2 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/scripts/AdjustSites.py b/scripts/AdjustSites.py
index f98f41688d..2d233a6712 100644
--- a/scripts/AdjustSites.py
+++ b/scripts/AdjustSites.py
@@ -22,6 +22,7 @@
 import glob
 import shutil
 import logging
+import tarfile
 from urllib.parse import urlencode
 import traceback
 from datetime import datetime
@@ -399,7 +400,7 @@ def checkTaskInfo(taskDict, ad):
         printLog('Exiting AdjustSites because this dagman does not match task information in TASKS DB')
         sys.exit(3)
 
-def getSandbox(taskDict, crabserver):
+def getSandbox(taskDict, crabserver, logger):
     """
     Getting user sandbox (sandbox.tar.gz) from S3.
     It will not redownload sandbox if file exists.
@@ -409,6 +410,8 @@ def getSandbox(taskDict, crabserver):
     :type taskDict: dict
     :param crabserver: CRABRest object to talk with RESTCache API
     :type crabserver: RESTInteractions.CRABRest
+    :param logger: downloadFromS3 requires a logger !
+    :type logger: logging.logger object
     """
     sandboxTarBall = 'sandbox.tar.gz'
     sandboxTarBallTmp = sandboxTarBall + '_tmp'
@@ -416,9 +419,6 @@
         printLog('sandbox.tar.gz already exist. Do nothing.')
         return
 
-    # init logger require by downloadFromS3
-    logger = setupStreamLogger()
-
     # get info
     username = getColumn(taskDict, 'tm_username')
     sandboxName = getColumn(taskDict, 'tm_user_sandbox')
@@ -434,6 +434,54 @@
                      "(resubmit will not work) and contact the experts if the error persists.\nError reason: %s", str(ex))
         sys.exit(4)
 
+def getDebugFiles(taskDict, crabserver, logger):
+    """
+    Ops mon (crabserver/ui) needs access to files from the debug_files.tar.gz tarball.
+    Retrieve and expand debug_files.tar.gz from S3 in here for http access.
+    It will not redownload tarball if file exists.
+    This function has side effects: debug_files.tar.gz(_tmp) and debug directory
+    are created in current directory.
+    :param taskDict: task info returned from REST.
+    :type taskDict: dict
+    :param crabserver: CRABRest object to talk with RESTCache API
+    :type crabserver: RESTInteractions.CRABRest
+    :param logger: downloadFromS3 requires a logger !
+    :type logger: logging.logger object
+    """
+    debugTarball = 'debug_files.tar.gz'
+    if os.path.exists(debugTarball):
+        printLog('debug_files.tar.gz already exists. Do nothing.')
+        return
+
+    # get info
+    username = getColumn(taskDict, 'tm_username')
+    sandboxName = getColumn(taskDict, 'tm_debug_files')
+
+    # download
+    debugTarballTmp = debugTarball + '_tmp'
+    try:
+        downloadFromS3(crabserver=crabserver, objecttype='sandbox', username=username,
+                       tarballname=sandboxName, filepath=debugTarballTmp, logger=logger)
+        shutil.move(debugTarballTmp, debugTarball)
+    except Exception as ex:  # pylint: disable=broad-except
+        logger.exception("CRAB server backend could not download the tarball with debug files " + \
+                         "from S3, ops monitor will not work. Exception:\n %s", str(ex))
+        return
+
+    # extract files
+    try:
+        with tarfile.open(name=debugTarball, mode='r') as debugTar:
+            debugTar.extractall()
+    except Exception as ex:  # pylint: disable=broad-except
+        logger.exception("CRAB server backend could not expand the tarball with debug files, " + \
+                         "ops monitor will not work. Exception:\n %s", str(ex))
+        return
+
+    # Change permissions of extracted files to allow Ops mon to read them.
+    for _, _, filenames in os.walk('debug'):
+        for f in filenames:
+            os.chmod('debug/' + f, 0o644)
+
 
 def main():
     """
@@ -474,8 +522,11 @@ def main():
     # check task status
     checkTaskInfo(taskDict=dictresult, ad=ad)
 
-    # get sandbox
-    getSandbox(taskDict=dictresult, crabserver=crabserver)
+    # init logger required by downloadFromS3
+    logger = setupStreamLogger()
+    # get sandboxes
+    getSandbox(taskDict=dictresult, crabserver=crabserver, logger=logger)
+    getDebugFiles(taskDict=dictresult, crabserver=crabserver, logger=logger)
 
     # is this the first time this script runs for this task ? (it runs at each resubmit as well !)
     if not os.path.exists('WEB_DIR'):
diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py
index 3a347d514e..6e098b9a12 100644
--- a/src/python/TaskWorker/Actions/DagmanCreator.py
+++ b/src/python/TaskWorker/Actions/DagmanCreator.py
@@ -1117,43 +1117,6 @@ def getBlacklistMsg():
 
         return info, splitterResult, subdags, dagSpecs
 
-
-    def extractMonitorFiles(self, inputFiles, **kw):
-        """
-        Ops mon needs access to some files from the debug_files.tar.gz or sandbox.tar.gz.
-        tarball.
-        If an older client is used, the files are in the sandbox, the newer client (3.3.1607)
-        separates them into a debug_file tarball to allow sandbox recycling and not break the ops mon.
-        The files are extracted here to the debug folder to be later sent to the schedd.
-
-        Modifies inputFiles list by appending the debug folder if the extraction succeeds.
-        """
-
-        tarFileName = 'sandbox.tar.gz' if not os.path.isfile('debug_files.tar.gz') else 'debug_files.tar.gz'
-        try:
-            debugTar = tarfile.open(tarFileName)
-            debugTar.extract('debug/crabConfig.py')
-            debugTar.extract('debug/originalPSet.py')
-            scriptExeName = kw['task'].get('tm_scriptexe')
-            if scriptExeName != None:
-                debugTar.extract(scriptExeName)
-                shutil.copy(scriptExeName, 'debug/' + scriptExeName)
-            debugTar.close()
-
-            inputFiles.append('debug')
-
-            # Change permissions of extracted files to allow Ops mon to read them.
-            for _, _, filenames in os.walk('debug'):
-                for f in filenames:
-                    os.chmod('debug/' + f, 0o644)
-        except Exception as ex:  # pylint: disable=broad-except
-            self.logger.exception(ex)
-            self.uploadWarning("Extracting files from %s failed, ops monitor will not work." % tarFileName, \
-                               kw['task']['user_proxy'], kw['task']['tm_taskname'])
-
-        return
-
-
     def getHighPrioUsers(self, userProxy, workflow, egroups):
         # Import needed because the DagmanCreator module is also imported in the schedd,
         # where there is no ldap available. This function however is only called
@@ -1193,16 +1156,14 @@ def executeInternal(self, *args, **kw):
         shutil.copy(bootstrap_location, '.')
         shutil.copy(adjust_location, '.')
 
-        # amke sure we have InputSandBox and debug files
+        # make sure we have InputSandBox
         sandboxTarBall = 'sandbox.tar.gz'
-        debugTarBall = 'debug_files.tar.gz'
 
         # Bootstrap the ISB if we are running in the TW
         if self.crabserver:
             username = kw['task']['tm_username']
             taskname = kw['task']['tm_taskname']
             sandboxName = kw['task']['tm_user_sandbox']
-            dbgFilesName = kw['task']['tm_debug_files']
             self.logger.debug(f"Checking if sandbox file is available: {sandboxName}")
             try:
                 checkS3Object(crabserver=self.crabserver, objecttype='sandbox', taskname=taskname,
@@ -1213,12 +1174,6 @@ def executeInternal(self, *args, **kw):
                     "from S3.\nThis could be a temporary glitch; please try to submit a new task later " + \
                     "(resubmit will not work) and contact the experts if the error persists." + \
                     f"\nError reason: {ex}") from ex
-            # still need this download, until we change AdjustSites.py to do it there
-            try:
-                downloadFromS3(crabserver=self.crabserver, objecttype='sandbox', username=username,
-                               tarballname=dbgFilesName, filepath=debugTarBall, logger=self.logger)
-            except Exception as ex:  # pylint: disable=broad-except
-                self.logger.exception(ex)
 
         # Bootstrap the runtime if it is available.
         job_runtime = getLocation('CMSRunAnalysis.tar.gz', 'CRABServer/')
         shutil.copy(job_runtime, '.')
@@ -1234,8 +1189,6 @@ def executeInternal(self, *args, **kw):
                       'AdjustSites.py', 'site.ad.json', 'datadiscovery.pkl', 'taskinformation.pkl',
                       'taskworkerconfig.pkl', 'run_and_lumis.tar.gz', 'input_files.tar.gz']
 
-        self.extractMonitorFiles(inputFiles, **kw)
-
         if os.path.exists("CMSRunAnalysis.tar.gz"):
             inputFiles.append("CMSRunAnalysis.tar.gz")
         if os.path.exists("TaskManagerRun.tar.gz"):
@@ -1243,8 +1196,6 @@ def executeInternal(self, *args, **kw):
         if kw['task']['tm_input_dataset']:
             inputFiles.append("input_dataset_lumis.json")
             inputFiles.append("input_dataset_duplicate_lumis.json")
-        if kw['task']['tm_debug_files']:
-            inputFiles.append("debug_files.tar.gz")
 
         info, splitterResult, subdags, dagSpecs = self.createSubdag(*args, **kw)
 

From 21b3fc27265ca0ba0dd7b8ea96bda94be82b371b Mon Sep 17 00:00:00 2001
From: Stefano Belforte
Date: Mon, 21 Oct 2024 22:39:28 +0200
Subject: [PATCH 2/2] pylint and some cleanup

---
 .../TaskWorker/Actions/DagmanCreator.py | 168 +++++++++---------
 1 file changed, 84 insertions(+), 84 deletions(-)

diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py
index 6e098b9a12..1590207d64 100644
--- a/src/python/TaskWorker/Actions/DagmanCreator.py
+++ b/src/python/TaskWorker/Actions/DagmanCreator.py
@@ -4,6 +4,10 @@
 Generates the condor submit files and the master DAG.
 """
 # pylint: disable=invalid-name  # have a lot of snake_case varaibles here from "old times"
+# also.. yeah.. this is a long and somewhat complex file, but we are not going to break it now
+# pylint: disable=too-many-locals, too-many-branches, too-many-statements, too-many-lines, too-many-arguments
+# there is just one very long line in the HTCondor JDL template
+# pylint: disable=line-too-long
 
 import os
 import re
@@ -17,7 +21,7 @@
 from ast import literal_eval
 
 from ServerUtilities import MAX_DISK_SPACE, MAX_IDLE_JOBS, MAX_POST_JOBS, TASKLIFETIME
-from ServerUtilities import getLock, downloadFromS3, checkS3Object, uploadToS3
+from ServerUtilities import getLock, checkS3Object, uploadToS3
 
 import TaskWorker.DataObjects.Result
 from TaskWorker.Actions.TaskAction import TaskAction
@@ -26,6 +30,7 @@
 from CMSGroupMapper import get_egroup_users
 
 import WMCore.WMSpec.WMTask
+from WMCore import Lexicon
 from WMCore.Services.CRIC.CRIC import CRIC
 from WMCore.WMRuntime.Tools.Scram import ARCH_TO_OS, SCRAM_TO_ARCH
 
@@ -128,7 +133,8 @@
 
 WhenToTransferOutput = ON_EXIT_OR_EVICT
 +SpoolOnEvict = false
-# Keep job in the queue upon completion long enough for the postJob to run, allowing the monitoring script to fetch the postJob status and job exit-code updated by the postJob
+# Keep job in the queue upon completion long enough for the postJob to run,
+# allowing the monitoring script to fetch the postJob status and job exit-code updated by the postJob
 LeaveJobInQueue = ifThenElse((JobStatus=?=4 || JobStatus=?=3) && (time() - EnteredCurrentStatus < 30 * 60*60), true, false)
 
 universe = vanilla
@@ -189,12 +195,16 @@
 
 
 def getCreateTimestamp(taskname):
+    """ name says it all """
     return "_".join(taskname.split(":")[:1])
 
 
 def makeLFNPrefixes(task):
-    ## Once we don't care anymore about backward compatibility with crab server < 3.3.1511
-    ## we can uncomment the 1st line below and remove the next 6 lines.
+    """
+    create LFNs for output files both on /store/temp and on final destination
+    Once we don't care anymore about backward compatibility with crab server < 3.3.1511
+    we can uncomment the 1st line below and remove the next 6 lines.
+    """
     #primaryds = task['tm_primary_dataset']
     if task['tm_primary_dataset']:
         primaryds = task['tm_primary_dataset']
@@ -211,7 +221,7 @@ def makeLFNPrefixes(task):
     hash_input = hash_input.encode('utf-8')
     pset_hash = hashlib.sha1(hash_input).hexdigest()
     user = task['tm_username']
-    tmp_user = "%s.%s" % (user, pset_hash)
+    tmp_user = f"{user}.{pset_hash}"
     publish_info = task['tm_publish_name'].rsplit('-', 1)  #publish_info[0] is the publishname or the taskname
     timestamp = getCreateTimestamp(task['tm_taskname'])
     splitlfn = lfn.split('/')
@@ -231,7 +241,6 @@ def validateLFNs(path, outputFiles):
     :param outputFiles: list of strings: the filenames to be published (w/o the jobId, i.e. out.root not out_1.root)
     :return: nothing if all OK. If LFN is not valid Lexicon raises an AssertionError exception
     """
-    from WMCore import Lexicon
     # fake values to get proper LFN length, actual numbers chance job by job
     jobId = '10000'  # current max is 10k jobs per task
     dirCounter = '0001'  # need to be same length as 'counter' used later in makeDagSpecs
     for origFile in outputFiles:
         info = origFile.rsplit(".", 1)
         if len(info) == 2:  # filename ends with ., put jobId before the dot
-            fileName = "%s_%s.%s" % (info[0], jobId, info[1])
+            fileName = f"{info[0]}_{jobId}.{info[1]}"
         else:
-            fileName = "%s_%s" % (origFile, jobId)
+            fileName = f"{origFile}_{jobId}"
         testLfn = os.path.join(path, dirCounter, fileName)
         Lexicon.lfn(testLfn)  # will raise if testLfn is not a valid lfn
         # since Lexicon does not have lenght check, do it manually here.
         if len(testLfn) > 500:
-            msg = "\nYour task specifies an output LFN %d-char long " % len(testLfn)
+            msg = f"\nYour task specifies an output LFN {len(testLfn)}-char long "
             msg += "\n which exceeds maximum length of 500"
             msg += "\n and therefore can not be handled in our DataBase"
             raise SubmissionRefusedException(msg)
@@ -258,7 +267,6 @@ def validateUserLFNs(path, outputFiles):
     :param outputFiles: list of strings: the filenames to be published (w/o the jobId, i.e. out.root not out_1.root)
     :return: nothing if all OK. If LFN is not valid Lexicon raises an AssertionError exception
     """
-    from WMCore import Lexicon
     # fake values to get proper LFN length, actual numbers chance job by job
     jobId = '10000'  # current max is 10k jobs per task
     dirCounter = '0001'  # need to be same length as 'counter' used later in makeDagSpecs
     for origFile in outputFiles:
         info = origFile.rsplit(".", 1)
         if len(info) == 2:  # filename ends with ., put jobId before the dot
-            fileName = "%s_%s.%s" % (info[0], jobId, info[1])
+            fileName = f"{info[0]}_{jobId}.{info[1]}"
         else:
-            fileName = "%s_%s" % (origFile, jobId)
+            fileName = f"{origFile}_{jobId}"
         testLfn = os.path.join(path, dirCounter, fileName)
         Lexicon.userLfn(testLfn)  # will raise if testLfn is not a valid lfn
         # since Lexicon does not have lenght check, do it manually here.
         if len(testLfn) > 500:
-            msg = "\nYour task specifies an output LFN %d-char long " % len(testLfn)
+            msg = f"\nYour task specifies an output LFN {len(testLfn)}-char long "
             msg += "\n which exceeds maximum length of 500"
             msg += "\n and therefore can not be handled in our DataBase"
             raise SubmissionRefusedException(msg)
-    return
 
 def transform_strings(data):
     """
@@ -293,7 +300,7 @@ def transform_strings(data):
                'required_arch', 'resthost', 'dbinstance', 'submitter_ip_addr', \
                'task_lifetime_days', 'task_endtime', 'maxproberuntime', 'maxtailruntime':
         val = data.get(var, None)
-        if val == None:
+        if val is None:
             info[var] = 'undefined'
         else:
             info[var] = json.dumps(val)
@@ -306,7 +313,7 @@ def transform_strings(data):
 
     for var in 'siteblacklist', 'sitewhitelist', 'addoutputfiles', 'tfileoutfiles', 'edmoutfiles':
         val = data[var]
-        if val == None:
+        if val is None:
             info[var] = "{}"
         else:
             info[var] = "{" + json.dumps(val)[1:-1] + "}"
@@ -332,15 +339,13 @@ def transform_strings(data):
 
     return info
 
 
-def getLocation(default_name, checkout_location):
+def getLocation(default_name):
     """
     Get the location of the runtime code (job wrapper, postjob, anything executed
    on the schedd and on the worker node)
     First check if the files are present in the current working directory
     Then check if CRABTASKWORKER_ROOT is in the environment and use that location
    (that viariable is set by the taskworker init script. In the prod source script we use "export CRABTASKWORKER_ROOT")
-    Finally, check if the CRAB3_CHECKOUT variable is set. That option is interesting for developer who
-    can use this to point to their github repository. (Marco: we need to check this)
     """
     loc = default_name
     if not os.path.exists(loc):
@@ -349,9 +354,7 @@ def getLocation(default_name, checkout_location):
             fname = os.path.join(os.environ['CRABTASKWORKER_ROOT'], path, loc)
             if os.path.exists(fname):
                 return fname
-        if 'CRAB3_CHECKOUT' not in os.environ:
-            raise Exception("Unable to locate %s" % loc)
-        loc = os.path.join(os.environ['CRAB3_CHECKOUT'], checkout_location, loc)
+        raise Exception(f"Unable to locate {loc}")  # pylint: disable=broad-exception-raised
     loc = os.path.abspath(loc)
     return loc
 
@@ -363,17 +366,19 @@ class DagmanCreator(TaskAction):
     """
 
     def __init__(self, config, crabserver, procnum=-1, rucioClient=None):
+        """ need a comment line here """
         TaskAction.__init__(self, config, crabserver, procnum)
         self.rucioClient = rucioClient
 
     def populateGlideinMatching(self, info):
+        """ actually simply set the required arch """
         scram_arch = info['tm_job_arch']
         # Set defaults
         info['required_arch'] = "X86_64"
         # The following regex matches a scram arch into four groups
         # for example el9_amd64_gcc10 is matched as (el)(9)_(amd64)_(gcc10)
         # later, only the third group is returned, the one corresponding to the arch.
-        m = re.match("([a-z]+)(\d+)_(\w+)_(\w+)", scram_arch)
+        m = re.match(r"([a-z]+)(\d+)_(\w+)_(\w+)", scram_arch)
         if m:
             _, _, arch, _ = m.groups()
             if arch not in SCRAM_TO_ARCH:
@@ -392,12 +397,11 @@ def getDashboardTaskType(self, task):
         return task['tm_activity']
 
     def isHammerCloud(self, task):
-        if task['tm_activity'] and 'HC' in task['tm_activity'].upper():
-            return True
-        else:
-            return False
+        " name says it all "
+        return task['tm_activity'] and 'HC' in task['tm_activity'].upper()
 
     def setCMS_WMTool(self, task):
+        " for reporting to MONIT "
         if self.isHammerCloud(task):
             WMTool = 'HammerCloud'
         else:
@@ -405,6 +409,7 @@ def setCMS_WMTool(self, task):
         return WMTool
 
     def setCMS_TaskType(self, task):
+        " for reporting to MONIT "
         if self.isHammerCloud(task):
             taskType = task['tm_activity']
         else:
@@ -415,6 +420,7 @@ def setCMS_TaskType(self, task):
         return taskType
 
     def setCMS_Type(self, task):
+        " for reporting to MONIT "
         if self.isHammerCloud(task):
             cms_type = 'Test'
         else:
@@ -494,7 +500,6 @@ def makeJobSubmit(self, task):
 
         self.populateGlideinMatching(info)
 
-        # TODO: pass through these correctly.
         info['runs'] = []
         info['lumis'] = []
         info['saveoutput'] = 1 if info['tm_transfer_outputs'] == 'T' else 0  # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py
 
@@ -539,11 +544,11 @@ def makeJobSubmit(self, task):
             info['additional_environment_options'] += 'CRAB_RUNTIME_TARBALL=local'
             info['additional_input_file'] += ", CMSRunAnalysis.tar.gz"
         else:
-            raise TaskWorkerException("Cannot find CMSRunAnalysis.tar.gz inside the cwd: %s" % os.getcwd())
+            raise TaskWorkerException(f"Cannot find CMSRunAnalysis.tar.gz inside the cwd: {os.getcwd()}")
         if os.path.exists("TaskManagerRun.tar.gz"):
             info['additional_environment_options'] += ' CRAB_TASKMANAGER_TARBALL=local'
         else:
-            raise TaskWorkerException("Cannot find TaskManagerRun.tar.gz inside the cwd: %s" % os.getcwd())
+            raise TaskWorkerException(f"Cannot find TaskManagerRun.tar.gz inside the cwd: {os.getcwd()}")
         info['additional_input_file'] += ", sandbox.tar.gz"  # it will be present on SPOOL_DIR after dab_bootstrap
         info['additional_input_file'] += ", run_and_lumis.tar.gz"
         info['additional_input_file'] += ", input_files.tar.gz"
@@ -572,13 +577,14 @@ def getPreScriptDefer(self, task, jobid):
                 releaseTimeout = int(ej.split('=')[1])
 
         if slowJobRelease:
-            prescriptDeferString = 'DEFER 4 %s' % (jobid * releaseTimeout)
+            prescriptDeferString = f"DEFER 4 {jobid * releaseTimeout}"
         else:
            prescriptDeferString = ''
        return prescriptDeferString
 
     def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasites, outfiles, startjobid, parent=None, stage='conventional'):
+        """ need a comment line here """
         dagSpecs = []
         i = startjobid
         temp_dest, dest = makeLFNPrefixes(task)
@@ -593,7 +599,7 @@ def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasite
         except AssertionError as ex:
             msg = "\nYour task specifies an output LFN which fails validation in"
             msg += "\n WMCore/Lexicon and therefore can not be handled in our DataBase"
-            msg += "\nError detail: %s" % (str(ex))
+            msg += f"\nError detail: {ex}"
             raise SubmissionRefusedException(msg) from ex
         groupid = len(siteinfo['group_sites'])
         siteinfo['group_sites'][groupid] = list(availablesites)
@@ -620,23 +626,21 @@ def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasite
             if parent is None or parent == "":
                 count = str(i)
             else:
-                count = '{parent}-{i}'.format(parent=parent, i=i)
+                count = f"{parent}-{i}"
             siteinfo[count] = groupid
 
             remoteOutputFiles = []
             localOutputFiles = []
             for origFile in outfiles:
                 info = origFile.rsplit(".", 1)
                 if len(info) == 2:
-                    fileName = "%s_%s.%s" % (info[0], count, info[1])
+                    fileName = f"{info[0]}_{count}.{info[1]}"
                 else:
-                    fileName = "%s_%s" % (origFile, count)
-                remoteOutputFiles.append("%s" % fileName)
-                localOutputFiles.append("%s=%s" % (origFile, fileName))
+                    fileName = f"{origFile}_{count}"
+                remoteOutputFiles.append(fileName)
+                localOutputFiles.append(f"{origFile}={fileName}")
             remoteOutputFilesStr = " ".join(remoteOutputFiles)
             localOutputFiles = ", ".join(localOutputFiles)
-            # no need to use // in the next line, thanks to integer formatting with `%d`
-            # see: https://docs.python.org/3/library/string.html#formatstrings
-            counter = "%04d" % (i / 1000)
+            counter = f"{(i // 1000):04d}"    # counter=0000 for i<999, 1 for 1000 200
 
             # info['faillimit'] = 100
@@ -1118,9 +1120,7 @@
 
         return info, splitterResult, subdags, dagSpecs
 
     def getHighPrioUsers(self, userProxy, workflow, egroups):
-        # Import needed because the DagmanCreator module is also imported in the schedd,
-        # where there is no ldap available. This function however is only called
-        # in the TW (where ldap is installed) during submission.
+        """ get the list of high priority users """
         highPrioUsers = set()
 
         try:
@@ -1129,23 +1129,22 @@ def getHighPrioUsers(self, userProxy, workflow, egroups):
         except Exception as ex:  # pylint: disable=broad-except
             msg = "Error when getting the high priority users list." \
                   " Will ignore the high priority list and continue normally." \
-                  " Error reason: %s" % str(ex)
+                  f" Error reason: {ex}"
             self.uploadWarning(msg, userProxy, workflow)
             return []
         return highPrioUsers
 
 
     def executeInternal(self, *args, **kw):
-        # So, the filename becomes http:// -- and doesn't really work. Hardcoding the analysis wrapper.
-        #transform_location = getLocation(kw['task']['tm_transformation'], 'CAFUtilities/src/python/transformation/CMSRunAnalysis/')
-        transform_location = getLocation('CMSRunAnalysis.sh', 'CRABServer/scripts/')
-        cmscp_location = getLocation('cmscp.py', 'CRABServer/scripts/')
-        cmscpsh_location = getLocation('cmscp.sh', 'CRABServer/scripts/')
-        gwms_location = getLocation('gWMS-CMSRunAnalysis.sh', 'CRABServer/scripts/')
-        env_location = getLocation('submit_env.sh', 'CRABServer/scripts/')
-        dag_bootstrap_location = getLocation('dag_bootstrap_startup.sh', 'CRABServer/scripts/')
-        bootstrap_location = getLocation("dag_bootstrap.sh", "CRABServer/scripts/")
-        adjust_location = getLocation("AdjustSites.py", "CRABServer/scripts/")
+        """ all real work is done here """
+        transform_location = getLocation('CMSRunAnalysis.sh')
+        cmscp_location = getLocation('cmscp.py')
+        cmscpsh_location = getLocation('cmscp.sh')
+        gwms_location = getLocation('gWMS-CMSRunAnalysis.sh')
+        env_location = getLocation('submit_env.sh')
+        dag_bootstrap_location = getLocation('dag_bootstrap_startup.sh')
+        bootstrap_location = getLocation("dag_bootstrap.sh")
+        adjust_location = getLocation("AdjustSites.py")
 
         shutil.copy(transform_location, '.')
         shutil.copy(cmscp_location, '.')
@@ -1176,9 +1175,9 @@ def executeInternal(self, *args, **kw):
                 f"\nError reason: {ex}") from ex
 
         # Bootstrap the runtime if it is available.
-        job_runtime = getLocation('CMSRunAnalysis.tar.gz', 'CRABServer/')
+        job_runtime = getLocation('CMSRunAnalysis.tar.gz')
         shutil.copy(job_runtime, '.')
-        task_runtime = getLocation('TaskManagerRun.tar.gz', 'CRABServer/')
+        task_runtime = getLocation('TaskManagerRun.tar.gz')
         shutil.copy(task_runtime, '.')
 
         kw['task']['resthost'] = self.crabserver.server['host']
@@ -1205,6 +1204,7 @@ def executeInternal(self, *args, **kw):
 
 
     def execute(self, *args, **kw):
+        """ entry point called by Handler """
         cwd = os.getcwd()
         try:
             os.chdir(kw['tempDir'])
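
For reference, the scheduler-side handling that this series moves into AdjustSites.py can be condensed to a few lines. The sketch below is illustrative only and is not part of either commit: the wrapper name fetchAndExpandDebugFiles is invented for the example, while downloadFromS3 (imported from ServerUtilities, as the patches show), the 'sandbox' object type, the debug_files.tar.gz name and the debug/ permission fix-up all come from the changes above.

    # Illustrative sketch, not part of the patch: condensed scheduler-side flow
    # equivalent to AdjustSites.py getDebugFiles(), with error handling omitted.
    import os
    import shutil
    import tarfile

    from ServerUtilities import downloadFromS3  # same helper used by AdjustSites.py

    def fetchAndExpandDebugFiles(crabserver, username, tarballName, logger):
        """ Download debug_files.tar.gz from S3 and expand it for the ops monitor. """
        tarball = 'debug_files.tar.gz'
        if os.path.exists(tarball):
            return  # already downloaded, e.g. this is a resubmission
        # download to a temporary name first, then rename once complete
        downloadFromS3(crabserver=crabserver, objecttype='sandbox', username=username,
                       tarballname=tarballName, filepath=tarball + '_tmp', logger=logger)
        shutil.move(tarball + '_tmp', tarball)
        with tarfile.open(tarball, 'r') as tar:
            tar.extractall()
        # extracted files must be readable by the ops monitor (crabserver/ui)
        for _, _, filenames in os.walk('debug'):
            for f in filenames:
                os.chmod(os.path.join('debug', f), 0o644)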