From 90570d7924869fee8cf7cad9ce92589c15a53a9b Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 13:38:33 +0200 Subject: [PATCH 01/42] test14 --- cicd/gitlab/env/test14 | 4 ++++ cicd/gitlab/parseEnv.sh | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 cicd/gitlab/env/test14 diff --git a/cicd/gitlab/env/test14 b/cicd/gitlab/env/test14 new file mode 100644 index 0000000000..86ad85a125 --- /dev/null +++ b/cicd/gitlab/env/test14 @@ -0,0 +1,4 @@ +KUBECONTEXT=cmsweb-test14 +Environment=crab-dev-tw07 +REST_Instance=test14 +CRABClient_version=prod diff --git a/cicd/gitlab/parseEnv.sh b/cicd/gitlab/parseEnv.sh index f1fb49a316..50a155bab2 100755 --- a/cicd/gitlab/parseEnv.sh +++ b/cicd/gitlab/parseEnv.sh @@ -12,7 +12,7 @@ SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) TAG="${1}" # validate tag -REGEX_DEV_TAG='^pypi-(preprod|test2|test11|test12|test1)-.*' +REGEX_DEV_TAG='^pypi-(preprod|test2|test11|test12|test1|test14)-.*' REGEX_RELEASE_TAG='^v3\.[0-9]{6}.*' if [[ $TAG =~ $REGEX_DEV_TAG ]]; then # Do not quote regexp variable here IFS='-' read -ra TMPSTR <<< "${TAG}" From 03fa08c80a850d19ddd3ef430052cc9637f1a2af Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 21:53:47 +0200 Subject: [PATCH 02/42] test14 (client) --- cicd/gitlab/env/test14 | 2 +- src/python/ServerUtilities.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cicd/gitlab/env/test14 b/cicd/gitlab/env/test14 index 86ad85a125..2948d9fdfe 100644 --- a/cicd/gitlab/env/test14 +++ b/cicd/gitlab/env/test14 @@ -1,4 +1,4 @@ KUBECONTEXT=cmsweb-test14 Environment=crab-dev-tw07 REST_Instance=test14 -CRABClient_version=prod +CRABClient_version=GH diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index 05797268b4..73e85c4ef7 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -56,6 +56,7 @@ 'test6': {'restHost': 'cmsweb-test6.cern.ch', 'dbInstance': 'dev'}, 'test11': {'restHost': 'cmsweb-test11.cern.ch', 'dbInstance': 'devtwo'}, 'test12': {'restHost': 'cmsweb-test12.cern.ch', 'dbInstance': 'devthree'}, + 'test14': {'restHost': 'cmsweb-test14.cern.ch', 'dbInstance': 'tseethon'}, 'stefanovm': {'restHost': 'stefanovm.cern.ch', 'dbInstance': 'dev'}, 'stefanovm2': {'restHost': 'stefanovm2.cern.ch', 'dbInstance': 'dev'}, 'other': {'restHost': None, 'dbInstance': None}, From 1044f49395face9f74dd515c9ec1cbfabe25b997 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 14:10:21 +0200 Subject: [PATCH 03/42] WIP: purge dry run --- .../Databases/TaskDB/Oracle/Task/Task.py | 2 - .../TaskWorker/Actions/DryRunUploader.py | 140 ------------------ src/python/TaskWorker/Actions/Handler.py | 5 +- .../TaskWorker/Actions/StageoutCheck.py | 6 - 4 files changed, 1 insertion(+), 152 deletions(-) delete mode 100644 src/python/TaskWorker/Actions/DryRunUploader.py diff --git a/src/python/Databases/TaskDB/Oracle/Task/Task.py b/src/python/Databases/TaskDB/Oracle/Task/Task.py index cb53ebe965..33ab452a8a 100644 --- a/src/python/Databases/TaskDB/Oracle/Task/Task.py +++ b/src/python/Databases/TaskDB/Oracle/Task/Task.py @@ -160,8 +160,6 @@ class Task(object): UpdateWebUrl_sql = """UPDATE tasks SET tm_user_webdir = :webdirurl \ WHERE tm_taskname = :workflow""" - SetDryRun_sql = "UPDATE tasks set tm_dry_run = :dry_run WHERE tm_taskname = :taskname" - #UpdateSchedd_sql UpdateSchedd_sql = """UPDATE tasks SET tm_schedd = :scheddname \ WHERE tm_taskname = :workflow""" diff --git 
a/src/python/TaskWorker/Actions/DryRunUploader.py b/src/python/TaskWorker/Actions/DryRunUploader.py deleted file mode 100644 index 16698729f7..0000000000 --- a/src/python/TaskWorker/Actions/DryRunUploader.py +++ /dev/null @@ -1,140 +0,0 @@ -""" -Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.) -""" -import os -import json -import tarfile -import time - -import sys -if sys.version_info >= (3, 0): - from urllib.parse import urlencode # pylint: disable=no-name-in-module -if sys.version_info < (3, 0): - from urllib import urlencode - -from WMCore.DataStructs.LumiList import LumiList - -from TaskWorker.DataObjects.Result import Result -from TaskWorker.Actions.TaskAction import TaskAction -from TaskWorker.WorkerExceptions import TaskWorkerException -from ServerUtilities import uploadToS3, downloadFromS3 - -class DryRunUploader(TaskAction): - """ - Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.) - """ - - def packSandbox(self, inputFiles): - dryRunSandbox = tarfile.open('dry-run-sandbox.tar.gz', 'w:gz') - for f in inputFiles: - self.logger.debug('Adding %s to dry run tarball', f) - dryRunSandbox.add(f, recursive=True) - - dryRunSandbox.close() - - def executeInternal(self, *args, **kw): - inputFiles = args[0][2] - splitterResult = args[0][3][0] - - cwd = os.getcwd() - try: - os.chdir(kw['tempDir']) - splittingSummary = SplittingSummary(kw['task']['tm_split_algo']) - for jobgroup in splitterResult: - jobs = jobgroup.getJobs() - splittingSummary.addJobs(jobs) - splittingSummary.dump('splitting-summary.json') - inputFiles.append('splitting-summary.json') - - self.packSandbox(inputFiles) - - self.logger.info('Uploading dry run tarball to the user file cache') - t0 = time.time() - uploadToS3(crabserver=self.crabserver, filepath='dry-run-sandbox.tar.gz', - objecttype='runtimefiles', taskname=kw['task']['tm_taskname'], logger=self.logger) - os.remove('dry-run-sandbox.tar.gz') - self.logger.info('Uploaded dry run tarball to the user file cache') - # wait until tarball is available, S3 may take a few seconds for this (ref. 
issue #6706 ) - t1 = time.time() - lt1 = time.strftime("%H:%M:%S", time.localtime(t1)) - uploadTime = t1-t0 - self.logger.debug('runtimefiles upload took %s secs and completed at %s', uploadTime, lt1) - self.logger.debug('check if tarball is available') - tarballOK = False - while not tarballOK: - try: - self.logger.debug('download tarball to /dev/null') - downloadFromS3(crabserver=self.crabserver, filepath='/dev/null', objecttype='runtimefiles', - taskname=kw['task']['tm_taskname'], logger=self.logger) - self.logger.debug('OK, it worked') - tarballOK = True - except Exception as e: - self.logger.debug('runtimefiles tarball not ready yet') - self.logger.debug('Exception was raised: %s', e) - self.logger.debug('Sleep 5 sec') - time.sleep(5) - update = {'workflow': kw['task']['tm_taskname'], 'subresource': 'state', 'status': 'UPLOADED'} - self.logger.debug('Updating task status: %s', str(update)) - self.crabserver.post(api='workflowdb', data=urlencode(update)) - - finally: - os.chdir(cwd) - - return Result(task=kw['task'], result=args[0]) - - def execute(self, *args, **kw): - try: - return self.executeInternal(*args, **kw) - except Exception as e: - msg = "Failed to upload dry run tarball for %s; '%s'" % (kw['task']['tm_taskname'], str(e)) - raise TaskWorkerException(msg) - -class SplittingSummary(object): - """ - Class which calculates some summary data about the splitting results. - """ - - def __init__(self, algo): - self.algo = algo - self.lumisPerJob = [] - self.eventsPerJob = [] - self.filesPerJob = [] - - def addJobs(self, jobs): - if self.algo == 'FileBased': - self.lumisPerJob += [sum([x.get('lumiCount', 0) for x in job['input_files']]) for job in jobs] - self.eventsPerJob += [sum([x['events'] for x in job['input_files']]) for job in jobs] - self.filesPerJob += [len(job['input_files']) for job in jobs] - elif self.algo == 'EventBased': - self.lumisPerJob += [job['mask']['LastLumi'] - job['mask']['FirstLumi'] for job in jobs] - self.eventsPerJob += [job['mask']['LastEvent'] - job['mask']['FirstEvent'] for job in jobs] - else: - for job in jobs: - avgEventsPerLumi = sum([f['avgEvtsPerLumi'] for f in job['input_files']])/float(len(job['input_files'])) - lumis = LumiList(compactList=job['mask']['runAndLumis']) - self.lumisPerJob.append(len(lumis.getLumis())) - self.eventsPerJob.append(avgEventsPerLumi * self.lumisPerJob[-1]) - - def dump(self, outname): - """ - Save splitting summary to a json file. 
- """ - - summary = {'algo': self.algo, - 'total_jobs': len(self.lumisPerJob), - 'total_lumis': sum(self.lumisPerJob), - 'total_events': sum(self.eventsPerJob), - 'max_lumis': max(self.lumisPerJob), - 'max_events': max(self.eventsPerJob), - 'avg_lumis': sum(self.lumisPerJob)/float(len(self.lumisPerJob)), - 'avg_events': sum(self.eventsPerJob)/float(len(self.eventsPerJob)), - 'min_lumis': min(self.lumisPerJob), - 'min_events': min(self.eventsPerJob)} - if len(self.filesPerJob) > 0: - summary.update({'total_files': sum(self.filesPerJob), - 'max_files': max(self.filesPerJob), - 'avg_files': sum(self.filesPerJob)/float(len(self.filesPerJob)), - 'min_files': min(self.filesPerJob)}) - - with open(outname, 'w') as f: - json.dump(summary, f) diff --git a/src/python/TaskWorker/Actions/Handler.py b/src/python/TaskWorker/Actions/Handler.py index 6f30a42ac7..21c09731a8 100644 --- a/src/python/TaskWorker/Actions/Handler.py +++ b/src/python/TaskWorker/Actions/Handler.py @@ -198,10 +198,7 @@ def handleNewTask(resthost, dbInstance, config, task, procnum, *args, **kwargs): handler.addWork(MakeFakeFileSet(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(Splitter(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(DagmanCreator(config=config, crabserver=crabserver, procnum=procnum, rucioClient=rucioClient)) - if task['tm_dry_run'] == 'T': - handler.addWork(DryRunUploader(config=config, crabserver=crabserver, procnum=procnum)) - else: - handler.addWork(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum)) + handler.addWork(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum)) return handler.actionWork(args, kwargs) diff --git a/src/python/TaskWorker/Actions/StageoutCheck.py b/src/python/TaskWorker/Actions/StageoutCheck.py index 77dc56f050..c4ae79eba7 100644 --- a/src/python/TaskWorker/Actions/StageoutCheck.py +++ b/src/python/TaskWorker/Actions/StageoutCheck.py @@ -82,12 +82,6 @@ def execute(self, *args, **kw): msg += "because user specified not transfer any output/log files." self.logger.info(msg) return - # Do not need to check if it is dryrun. - if self.task['tm_dry_run'] == 'T': - msg = "Will not check possibility to write to destination site." - msg += "User specified dryrun option." 
- self.logger.info(msg) - return self.workflow = self.task['tm_taskname'] self.proxy = self.task['user_proxy'] From f52f2f40311d8093eec6976fdc419dd709a8b17a Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 16:57:38 +0200 Subject: [PATCH 04/42] sandbox at schedd instead --- scripts/AdjustSites.py | 46 +++++++++++++++++-- .../TaskWorker/Actions/DagmanCreator.py | 10 ++-- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/scripts/AdjustSites.py b/scripts/AdjustSites.py index 12869e33bc..c21974bb2f 100644 --- a/scripts/AdjustSites.py +++ b/scripts/AdjustSites.py @@ -14,6 +14,7 @@ import time import glob import shutil +import logging from urllib.parse import urlencode import traceback from datetime import datetime @@ -23,8 +24,17 @@ import htcondor from RESTInteractions import CRABRest -from ServerUtilities import getProxiedWebDir, getColumn - +from ServerUtilities import getProxiedWebDir, getColumn, downloadFromS3 + +def setupStreamLogger(): + logHandler = logging.StreamHandler() + logFormatter = logging.Formatter( + "%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s") + logHandler.setFormatter(logFormatter) + logger = logging.getLogger('AdjustSites') + logger.addHandler(logging.StreamHandler()) + logger.setLevel(logging.DEBUG) + return logger def printLog(msg): """ Utility function to print the timestamp in the log. Can be replaced @@ -395,11 +405,39 @@ def checkTaskInfo(crabserver, ad): sys.exit(3) +def getSandbox(crabserver, ad, logger): + """ + """ + + task = ad['CRAB_ReqName'] + data = {'subresource': 'search', 'workflow': task} + + try: + dictresult, _, _ = crabserver.get(api='task', data=data) + except HTTPException as hte: + printLog(traceback.format_exc()) + printLog(hte.headers) + printLog(hte.result) + sys.exit(2) + + username = getColumn(dictresult, 'tm_username') + sandboxName = getColumn(dictresult, 'tm_user_sandbox') + sandboxTarBall = 'sandbox.tar.gz' + try: + downloadFromS3(crabserver=crabserver, objecttype='sandbox', username=username, + tarballname=sandboxName, filepath=sandboxTarBall, logger=logger) + except Exception as ex: + logger.exception("The CRAB server backend could not download the input sandbox with your code " + \ + "from S3.\nThis could be a temporary glitch; please try to submit a new task later " + \ + "(resubmit will not work) and contact the experts if the error persists.\nError reason: %s" % str(ex)) + + def main(): """ Need a doc string here. """ setupLog() + logger = setupStreamLogger() if '_CONDOR_JOB_AD' not in os.environ or not os.path.exists(os.environ["_CONDOR_JOB_AD"]): printLog("Exiting AdjustSites since _CONDOR_JOB_AD is not in the environment or does not exist") @@ -422,6 +460,9 @@ def main(): time.sleep(60) # give TW time to update taskDB #8411 checkTaskInfo(crabserver, ad) + # get sandbox + getSandbox(crabserver, ad, logger) + # is this the first time this script runs for this task ? (it runs at each resubmit as well !) 
if not os.path.exists('WEB_DIR'): makeWebDir(ad) @@ -503,4 +544,3 @@ def main(): if __name__ == '__main__': main() - diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index c2d3a4fc5b..8f98adaedb 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -1204,8 +1204,10 @@ def executeInternal(self, *args, **kw): sandboxName = kw['task']['tm_user_sandbox'] dbgFilesName = kw['task']['tm_debug_files'] try: - downloadFromS3(crabserver=self.crabserver, objecttype='sandbox', username=username, - tarballname=sandboxName, filepath=sandboxTarBall, logger=self.logger) + self.logger.debug(f"Skip download sandbox.tar.gz: {sandboxName}") + # TODO: need function to check if sandbox exist instead + #downloadFromS3(crabserver=self.crabserver, objecttype='sandbox', username=username, + # tarballname=sandboxName, filepath=sandboxTarBall, logger=self.logger) kw['task']['tm_user_sandbox'] = sandboxTarBall except Exception as ex: raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \ @@ -1233,8 +1235,8 @@ def executeInternal(self, *args, **kw): self.extractMonitorFiles(inputFiles, **kw) - if kw['task'].get('tm_user_sandbox') == 'sandbox.tar.gz': - inputFiles.append('sandbox.tar.gz') + #if kw['task'].get('tm_user_sandbox') == 'sandbox.tar.gz': + # inputFiles.append('sandbox.tar.gz') if os.path.exists("CMSRunAnalysis.tar.gz"): inputFiles.append("CMSRunAnalysis.tar.gz") if os.path.exists("TaskManagerRun.tar.gz"): From 3f3c9a476d8e468a6258dff4dcff8ec4892c43a8 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 17:03:16 +0200 Subject: [PATCH 05/42] Revert "WIP: purge dry run" This reverts commit dd9e76a5ef46dc8a5ca3bcf49fc724eaa090f17b. --- .../Databases/TaskDB/Oracle/Task/Task.py | 2 + .../TaskWorker/Actions/DryRunUploader.py | 140 ++++++++++++++++++ src/python/TaskWorker/Actions/Handler.py | 5 +- .../TaskWorker/Actions/StageoutCheck.py | 6 + 4 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 src/python/TaskWorker/Actions/DryRunUploader.py diff --git a/src/python/Databases/TaskDB/Oracle/Task/Task.py b/src/python/Databases/TaskDB/Oracle/Task/Task.py index 33ab452a8a..cb53ebe965 100644 --- a/src/python/Databases/TaskDB/Oracle/Task/Task.py +++ b/src/python/Databases/TaskDB/Oracle/Task/Task.py @@ -160,6 +160,8 @@ class Task(object): UpdateWebUrl_sql = """UPDATE tasks SET tm_user_webdir = :webdirurl \ WHERE tm_taskname = :workflow""" + SetDryRun_sql = "UPDATE tasks set tm_dry_run = :dry_run WHERE tm_taskname = :taskname" + #UpdateSchedd_sql UpdateSchedd_sql = """UPDATE tasks SET tm_schedd = :scheddname \ WHERE tm_taskname = :workflow""" diff --git a/src/python/TaskWorker/Actions/DryRunUploader.py b/src/python/TaskWorker/Actions/DryRunUploader.py new file mode 100644 index 0000000000..16698729f7 --- /dev/null +++ b/src/python/TaskWorker/Actions/DryRunUploader.py @@ -0,0 +1,140 @@ +""" +Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.) 
+""" +import os +import json +import tarfile +import time + +import sys +if sys.version_info >= (3, 0): + from urllib.parse import urlencode # pylint: disable=no-name-in-module +if sys.version_info < (3, 0): + from urllib import urlencode + +from WMCore.DataStructs.LumiList import LumiList + +from TaskWorker.DataObjects.Result import Result +from TaskWorker.Actions.TaskAction import TaskAction +from TaskWorker.WorkerExceptions import TaskWorkerException +from ServerUtilities import uploadToS3, downloadFromS3 + +class DryRunUploader(TaskAction): + """ + Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.) + """ + + def packSandbox(self, inputFiles): + dryRunSandbox = tarfile.open('dry-run-sandbox.tar.gz', 'w:gz') + for f in inputFiles: + self.logger.debug('Adding %s to dry run tarball', f) + dryRunSandbox.add(f, recursive=True) + + dryRunSandbox.close() + + def executeInternal(self, *args, **kw): + inputFiles = args[0][2] + splitterResult = args[0][3][0] + + cwd = os.getcwd() + try: + os.chdir(kw['tempDir']) + splittingSummary = SplittingSummary(kw['task']['tm_split_algo']) + for jobgroup in splitterResult: + jobs = jobgroup.getJobs() + splittingSummary.addJobs(jobs) + splittingSummary.dump('splitting-summary.json') + inputFiles.append('splitting-summary.json') + + self.packSandbox(inputFiles) + + self.logger.info('Uploading dry run tarball to the user file cache') + t0 = time.time() + uploadToS3(crabserver=self.crabserver, filepath='dry-run-sandbox.tar.gz', + objecttype='runtimefiles', taskname=kw['task']['tm_taskname'], logger=self.logger) + os.remove('dry-run-sandbox.tar.gz') + self.logger.info('Uploaded dry run tarball to the user file cache') + # wait until tarball is available, S3 may take a few seconds for this (ref. issue #6706 ) + t1 = time.time() + lt1 = time.strftime("%H:%M:%S", time.localtime(t1)) + uploadTime = t1-t0 + self.logger.debug('runtimefiles upload took %s secs and completed at %s', uploadTime, lt1) + self.logger.debug('check if tarball is available') + tarballOK = False + while not tarballOK: + try: + self.logger.debug('download tarball to /dev/null') + downloadFromS3(crabserver=self.crabserver, filepath='/dev/null', objecttype='runtimefiles', + taskname=kw['task']['tm_taskname'], logger=self.logger) + self.logger.debug('OK, it worked') + tarballOK = True + except Exception as e: + self.logger.debug('runtimefiles tarball not ready yet') + self.logger.debug('Exception was raised: %s', e) + self.logger.debug('Sleep 5 sec') + time.sleep(5) + update = {'workflow': kw['task']['tm_taskname'], 'subresource': 'state', 'status': 'UPLOADED'} + self.logger.debug('Updating task status: %s', str(update)) + self.crabserver.post(api='workflowdb', data=urlencode(update)) + + finally: + os.chdir(cwd) + + return Result(task=kw['task'], result=args[0]) + + def execute(self, *args, **kw): + try: + return self.executeInternal(*args, **kw) + except Exception as e: + msg = "Failed to upload dry run tarball for %s; '%s'" % (kw['task']['tm_taskname'], str(e)) + raise TaskWorkerException(msg) + +class SplittingSummary(object): + """ + Class which calculates some summary data about the splitting results. 
+ """ + + def __init__(self, algo): + self.algo = algo + self.lumisPerJob = [] + self.eventsPerJob = [] + self.filesPerJob = [] + + def addJobs(self, jobs): + if self.algo == 'FileBased': + self.lumisPerJob += [sum([x.get('lumiCount', 0) for x in job['input_files']]) for job in jobs] + self.eventsPerJob += [sum([x['events'] for x in job['input_files']]) for job in jobs] + self.filesPerJob += [len(job['input_files']) for job in jobs] + elif self.algo == 'EventBased': + self.lumisPerJob += [job['mask']['LastLumi'] - job['mask']['FirstLumi'] for job in jobs] + self.eventsPerJob += [job['mask']['LastEvent'] - job['mask']['FirstEvent'] for job in jobs] + else: + for job in jobs: + avgEventsPerLumi = sum([f['avgEvtsPerLumi'] for f in job['input_files']])/float(len(job['input_files'])) + lumis = LumiList(compactList=job['mask']['runAndLumis']) + self.lumisPerJob.append(len(lumis.getLumis())) + self.eventsPerJob.append(avgEventsPerLumi * self.lumisPerJob[-1]) + + def dump(self, outname): + """ + Save splitting summary to a json file. + """ + + summary = {'algo': self.algo, + 'total_jobs': len(self.lumisPerJob), + 'total_lumis': sum(self.lumisPerJob), + 'total_events': sum(self.eventsPerJob), + 'max_lumis': max(self.lumisPerJob), + 'max_events': max(self.eventsPerJob), + 'avg_lumis': sum(self.lumisPerJob)/float(len(self.lumisPerJob)), + 'avg_events': sum(self.eventsPerJob)/float(len(self.eventsPerJob)), + 'min_lumis': min(self.lumisPerJob), + 'min_events': min(self.eventsPerJob)} + if len(self.filesPerJob) > 0: + summary.update({'total_files': sum(self.filesPerJob), + 'max_files': max(self.filesPerJob), + 'avg_files': sum(self.filesPerJob)/float(len(self.filesPerJob)), + 'min_files': min(self.filesPerJob)}) + + with open(outname, 'w') as f: + json.dump(summary, f) diff --git a/src/python/TaskWorker/Actions/Handler.py b/src/python/TaskWorker/Actions/Handler.py index 21c09731a8..6f30a42ac7 100644 --- a/src/python/TaskWorker/Actions/Handler.py +++ b/src/python/TaskWorker/Actions/Handler.py @@ -198,7 +198,10 @@ def handleNewTask(resthost, dbInstance, config, task, procnum, *args, **kwargs): handler.addWork(MakeFakeFileSet(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(Splitter(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(DagmanCreator(config=config, crabserver=crabserver, procnum=procnum, rucioClient=rucioClient)) - handler.addWork(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum)) + if task['tm_dry_run'] == 'T': + handler.addWork(DryRunUploader(config=config, crabserver=crabserver, procnum=procnum)) + else: + handler.addWork(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum)) return handler.actionWork(args, kwargs) diff --git a/src/python/TaskWorker/Actions/StageoutCheck.py b/src/python/TaskWorker/Actions/StageoutCheck.py index c4ae79eba7..77dc56f050 100644 --- a/src/python/TaskWorker/Actions/StageoutCheck.py +++ b/src/python/TaskWorker/Actions/StageoutCheck.py @@ -82,6 +82,12 @@ def execute(self, *args, **kw): msg += "because user specified not transfer any output/log files." self.logger.info(msg) return + # Do not need to check if it is dryrun. + if self.task['tm_dry_run'] == 'T': + msg = "Will not check possibility to write to destination site." + msg += "User specified dryrun option." 
+ self.logger.info(msg) + return self.workflow = self.task['tm_taskname'] self.proxy = self.task['user_proxy'] From ea75f6bb6a9325fc02f5343cae8d79219765582e Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 17:51:59 +0200 Subject: [PATCH 06/42] fix logger --- scripts/AdjustSites.py | 20 ++++++++++---------- scripts/dag_bootstrap_startup.sh | 6 +++++- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/scripts/AdjustSites.py b/scripts/AdjustSites.py index c21974bb2f..1f8ad9c8f8 100644 --- a/scripts/AdjustSites.py +++ b/scripts/AdjustSites.py @@ -26,23 +26,22 @@ from RESTInteractions import CRABRest from ServerUtilities import getProxiedWebDir, getColumn, downloadFromS3 +def printLog(msg): + """ Utility function to print the timestamp in the log. Can be replaced + with anything (e.g.: logging.info if we decided to set up a logger here) + """ + print("%s: %s" % (datetime.utcnow(), msg)) + def setupStreamLogger(): logHandler = logging.StreamHandler() logFormatter = logging.Formatter( "%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s") logHandler.setFormatter(logFormatter) - logger = logging.getLogger('AdjustSites') - logger.addHandler(logging.StreamHandler()) + logger = logging.getLogger('AdjustSites') # hardcode + logger.addHandler(logHandler) logger.setLevel(logging.DEBUG) return logger -def printLog(msg): - """ Utility function to print the timestamp in the log. Can be replaced - with anything (e.g.: logging.info if we decided to set up a logger here) - """ - print("%s: %s" % (datetime.utcnow(), msg)) - - def adjustPostScriptExitStatus(resubmitJobIds, filename): """ Edit the DAG .nodes.log file changing the POST script exit code from 0|2 to 1 @@ -430,6 +429,7 @@ def getSandbox(crabserver, ad, logger): logger.exception("The CRAB server backend could not download the input sandbox with your code " + \ "from S3.\nThis could be a temporary glitch; please try to submit a new task later " + \ "(resubmit will not work) and contact the experts if the error persists.\nError reason: %s" % str(ex)) + sys.exit(4) def main(): @@ -458,8 +458,8 @@ def main(): crabserver.setDbInstance(dbInstance) time.sleep(60) # give TW time to update taskDB #8411 + # check task status checkTaskInfo(crabserver, ad) - # get sandbox getSandbox(crabserver, ad, logger) diff --git a/scripts/dag_bootstrap_startup.sh b/scripts/dag_bootstrap_startup.sh index 0868864753..bf913a6447 100755 --- a/scripts/dag_bootstrap_startup.sh +++ b/scripts/dag_bootstrap_startup.sh @@ -158,11 +158,15 @@ if [ -e AdjustSites.py ]; then elif [ $ret -eq 2 ]; then echo "Error: Cannot get data from REST Interface" >&2 condor_qedit $CONDOR_ID DagmanHoldReason "'Cannot get data from REST Interface.'" - exit 1 + exit 1 elif [ $ret -eq 3 ]; then echo "Error: this dagman does not match task information in TASKS DB" >&2 condor_qedit $CONDOR_ID DagmanHoldReason "'This dagman does not match task information in TASKS DB'" exit 1 + elif [ $ret -eq 4 ]; then + echo "Error: Failed to get user sandbox from S3." >&2 + condor_qedit $CONDOR_ID DagmanHoldReason "'Failed to get user sandbox from S3.'" + exit 1 fi else echo "Error: AdjustSites.py does not exist." 
>&2 From 3a2490e18f1db204333fe62aa2d952393172cf51 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 17:58:19 +0200 Subject: [PATCH 07/42] move logger function inside getSandbox --- scripts/AdjustSites.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/scripts/AdjustSites.py b/scripts/AdjustSites.py index 1f8ad9c8f8..ea9d5fe9ef 100644 --- a/scripts/AdjustSites.py +++ b/scripts/AdjustSites.py @@ -407,6 +407,13 @@ def checkTaskInfo(crabserver, ad): def getSandbox(crabserver, ad, logger): """ """ + sandboxTarBall = 'sandbox.tar.gz' + if os.path.exists(sandboxTarBall): + printLog('sandbox.tar.gz already exist. Do nothing.') + return + + # init logger require by downloadFromS3 + logger = setupStreamLogger() task = ad['CRAB_ReqName'] data = {'subresource': 'search', 'workflow': task} @@ -421,7 +428,6 @@ def getSandbox(crabserver, ad, logger): username = getColumn(dictresult, 'tm_username') sandboxName = getColumn(dictresult, 'tm_user_sandbox') - sandboxTarBall = 'sandbox.tar.gz' try: downloadFromS3(crabserver=crabserver, objecttype='sandbox', username=username, tarballname=sandboxName, filepath=sandboxTarBall, logger=logger) @@ -437,7 +443,6 @@ def main(): Need a doc string here. """ setupLog() - logger = setupStreamLogger() if '_CONDOR_JOB_AD' not in os.environ or not os.path.exists(os.environ["_CONDOR_JOB_AD"]): printLog("Exiting AdjustSites since _CONDOR_JOB_AD is not in the environment or does not exist") @@ -461,7 +466,7 @@ def main(): # check task status checkTaskInfo(crabserver, ad) # get sandbox - getSandbox(crabserver, ad, logger) + getSandbox(crabserver, ad) # is this the first time this script runs for this task ? (it runs at each resubmit as well !) if not os.path.exists('WEB_DIR'): From 419366e2528afe185ddf5b2809ebb9280b027833 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 18:01:27 +0200 Subject: [PATCH 08/42] fix getSandBox function --- scripts/AdjustSites.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/AdjustSites.py b/scripts/AdjustSites.py index ea9d5fe9ef..d9fce78f27 100644 --- a/scripts/AdjustSites.py +++ b/scripts/AdjustSites.py @@ -404,7 +404,7 @@ def checkTaskInfo(crabserver, ad): sys.exit(3) -def getSandbox(crabserver, ad, logger): +def getSandbox(crabserver, ad): """ """ sandboxTarBall = 'sandbox.tar.gz' From 7df7f6e8c18443433d42edec018e09e2fffde7a0 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 18:53:36 +0200 Subject: [PATCH 09/42] make downloadFromS3 to be sure that file is not broken --- src/python/ServerUtilities.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index 73e85c4ef7..ec17948220 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -24,6 +24,7 @@ import traceback import subprocess import contextlib +import shutil if sys.version_info >= (3, 0): from http.client import HTTPException # Python 3 and Python 2 in modern CMSSW @@ -948,6 +949,7 @@ def downloadFromS3ViaPSU(filepath=None, preSignedUrl=None, logger=None): if not preSignedUrl: raise Exception("mandatory preSignedUrl argument missing") + tmpPath = filepath + '_tmp' downloadCommand = '' # CRAB_useGoCurl env. variable is used to define how download from S3 command should be executed. 
@@ -955,10 +957,10 @@ def downloadFromS3ViaPSU(filepath=None, preSignedUrl=None, logger=None):
     # The same variable is also used inside CRABClient, we should keep name changes (if any) synchronized
     if os.getenv('CRAB_useGoCurl'):
         downloadCommand += '/cvmfs/cms.cern.ch/cmsmon/gocurl -verbose 2 -method GET'
-        downloadCommand += ' -out "%s"' % filepath
+        downloadCommand += ' -out "%s"' % tmpPath
         downloadCommand += ' -url "%s"' % preSignedUrl
     else:
-        downloadCommand += 'wget -Sq -O %s ' % filepath
+        downloadCommand += 'wget -Sq -O %s ' % tmpPath
         downloadCommand += ' "%s"' % preSignedUrl
 
     downloadProcess = subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
@@ -969,6 +971,8 @@ def downloadFromS3ViaPSU(filepath=None, preSignedUrl=None, logger=None):
 
     if exitcode != 0:
         raise Exception('Download command %s failed. stderr is:\n%s' % (downloadCommand, stderr))
+    logger.debug('Move downloaded file to destination "%s"', filepath)
+    shutil.move(tmpPath, filepath)
     if not os.path.exists(filepath):
         raise Exception("Download failure with %s. File %s was not created" % (downloadCommand, filepath))
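Patch 09 above implements the classic download-to-temp-then-rename pattern: wget/gocurl write to `<filepath>_tmp`, and only a completed download is moved onto the real path, so a consumer can never pick up a half-written sandbox. A minimal standalone sketch of the pattern (an illustration, not code from this series; `fetch` stands in for any downloader callable):

```python
import os

def download_atomically(fetch, filepath):
    """Download via `fetch` into a temporary path, then rename into place."""
    tmpPath = filepath + '_tmp'
    fetch(tmpPath)               # may raise, leaving at most a partial _tmp file
    # os.replace is an atomic rename on POSIX when both paths are on the same
    # filesystem, so `filepath` either does not exist or is a complete file.
    os.replace(tmpPath, filepath)
```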
From 039bb0d5859c56b78ff3dc921186a59455febee9 Mon Sep 17 00:00:00 2001
From: Thanayut Seethongchuen
Date: Fri, 9 Aug 2024 19:39:25 +0200
Subject: [PATCH 10/42] merge into single rest call for task info

---
 scripts/AdjustSites.py | 80 ++++++++++++++++++++++++------------------
 1 file changed, 46 insertions(+), 34 deletions(-)

diff --git a/scripts/AdjustSites.py b/scripts/AdjustSites.py
index d9fce78f27..a3b37a5c7d 100644
--- a/scripts/AdjustSites.py
+++ b/scripts/AdjustSites.py
@@ -1,4 +1,7 @@
-""" This script is called by dag_bootstrap_startup.sh when the job is (re)submitted and:
+"""
+This script is called by dag_bootstrap_startup.sh when the job is (re)submitted.
+It not only adjusts sites (blacklist/whitelist) but also:
+    - It downloads the sandbox from S3 (if it does not exist).
     - It creates the webdir if necessary
     - It updates both the webdir and the proxied version of it on the REST task db
     - For resubmission: adjust the exit codes of the PJ in the RunJobs.dag.nodes.log files and
@@ -371,28 +374,24 @@ def setupLog():
     os.close(logfd)
 
 
-def checkTaskInfo(crabserver, ad):
+def checkTaskInfo(taskDict, ad):
     """
-    Function checks that given task is registered in the database with status SUBMITTED and with the
-    same clusterId and schedd name in the database as in the condor ads where it is currently running.
-    In case above condition is not met, script immediately terminates
+    Function checks that the given task is registered in the database with status
+    SUBMITTED and with the same clusterId and schedd name in the database as in
+    the condor ads where it is currently running.
+    In case the above condition is not met, the script immediately terminates.
+
+    :param taskDict: task info returned from REST.
+    :type taskDict: dict
+    :param ad: the condor job ClassAd (key-value pairs)
+    :type ad: dict
     """
-    task = ad['CRAB_ReqName']
     clusterIdOnSchedd = ad['ClusterId']
-    data = {'subresource': 'search', 'workflow': task}
-
-    try:
-        dictresult, _, _ = crabserver.get(api='task', data=data)
-    except HTTPException as hte:
-        printLog(traceback.format_exc())
-        printLog(hte.headers)
-        printLog(hte.result)
-        sys.exit(2)
 
-    taskStatusOnDB = getColumn(dictresult, 'tm_task_status')
-    clusteridOnDB = getColumn(dictresult, 'clusterid')
-    scheddOnDB = getColumn(dictresult, 'tm_schedd')
+    taskStatusOnDB = getColumn(taskDict, 'tm_task_status')
+    clusteridOnDB = getColumn(taskDict, 'clusterid')
+    scheddOnDB = getColumn(taskDict, 'tm_schedd')
 
     scheddName = os.environ['schedd_name']
 
@@ -404,8 +403,18 @@ def checkTaskInfo(crabserver, ad):
     sys.exit(3)
 
 
-def getSandbox(crabserver, ad):
+def getSandbox(taskDict, crabserver):
     """
+    Get the user sandbox (sandbox.tar.gz) from S3. It will not redownload the
+    sandbox if the file already exists.
+
+    This function has a side effect: sandbox.tar.gz(_tmp) files are created in
+    the current directory.
+
+    :param taskDict: task info returned from REST.
+    :type taskDict: dict
+    :param crabserver: CRABRest object to talk with the RESTCache API
+    :type crabserver: RESTInteractions.CRABRest
     """
     sandboxTarBall = 'sandbox.tar.gz'
     if os.path.exists(sandboxTarBall):
@@ -415,19 +424,11 @@ def getSandbox(taskDict, crabserver):
     # init logger require by downloadFromS3
     logger = setupStreamLogger()
 
-    task = ad['CRAB_ReqName']
-    data = {'subresource': 'search', 'workflow': task}
-
-    try:
-        dictresult, _, _ = crabserver.get(api='task', data=data)
-    except HTTPException as hte:
-        printLog(traceback.format_exc())
-        printLog(hte.headers)
-        printLog(hte.result)
-        sys.exit(2)
-
-    username = getColumn(dictresult, 'tm_username')
-    sandboxName = getColumn(dictresult, 'tm_user_sandbox')
+    # get info
+    username = getColumn(taskDict, 'tm_username')
+    sandboxName = getColumn(taskDict, 'tm_user_sandbox')
 
+    # download
     try:
         downloadFromS3(crabserver=crabserver, objecttype='sandbox', username=username,
                        tarballname=sandboxName, filepath=sandboxTarBall, logger=logger)
@@ -448,7 +455,6 @@ def main():
     with open(os.environ['_CONDOR_JOB_AD']) as fd:
         ad = classad.parseOne(fd)
     printLog("Parsed ad: %s" % ad)
-
     # instantiate a server object to talk with crabserver
     host = ad['CRAB_RestHost']
     dbInstance = ad['CRAB_DbInstance']
@@ -463,10 +469,22 @@ def main():
     crabserver.setDbInstance(dbInstance)
 
     time.sleep(60)  # give TW time to update taskDB #8411
+
+    # get task info
+    task = ad['CRAB_ReqName']
+    data = {'subresource': 'search', 'workflow': task}
+    try:
+        dictresult, _, _ = crabserver.get(api='task', data=data)
+    except HTTPException as hte:
+        printLog(traceback.format_exc())
+        printLog(hte.headers)
+        printLog(hte.result)
+        sys.exit(2)
+
-    checkTaskInfo(crabserver, ad)
+    # check task status
+    checkTaskInfo(taskDict=dictresult, ad=ad)
     # get sandbox
-    getSandbox(crabserver, ad)
+    getSandbox(taskDict=dictresult, crabserver=crabserver)
 
     # is this the first time this script runs for this task ? (it runs at each resubmit as well !)
if not os.path.exists('WEB_DIR'): From c830e317a77e15dadf4cfb9fd7302cd3e06ed00b Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 19:47:10 +0200 Subject: [PATCH 11/42] more print and beautify --- scripts/AdjustSites.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/scripts/AdjustSites.py b/scripts/AdjustSites.py index a3b37a5c7d..e12d58f198 100644 --- a/scripts/AdjustSites.py +++ b/scripts/AdjustSites.py @@ -36,6 +36,12 @@ def printLog(msg): print("%s: %s" % (datetime.utcnow(), msg)) def setupStreamLogger(): + """ + Setup logger object with stream handler. Needed by `downloadFromS3()`. + + :returns: stream logger object + :rtype: logging.Logger + """ logHandler = logging.StreamHandler() logFormatter = logging.Formatter( "%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s") @@ -453,7 +459,7 @@ def main(): with open(os.environ['_CONDOR_JOB_AD']) as fd: ad = classad.parseOne(fd) - printLog("Parsed ad: %s" % ad) + printLog("Parsed ad: %s\n" % ad) # instantiate a server object to talk with crabserver host = ad['CRAB_RestHost'] @@ -462,6 +468,7 @@ def main(): crabserver = CRABRest(host, cert, cert, retry=3, userAgent='CRABSchedd') crabserver.setDbInstance(dbInstance) + printLog("Sleeping 60 seconds to give TW time to update taskDB") time.sleep(60) # give TW time to update taskDB #8411 # get task info From 3d7039d74df33308e92dfc5462af0b92ec577d6f Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 19:54:33 +0200 Subject: [PATCH 12/42] unbuffer? --- scripts/dag_bootstrap_startup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/dag_bootstrap_startup.sh b/scripts/dag_bootstrap_startup.sh index bf913a6447..c84ce62000 100755 --- a/scripts/dag_bootstrap_startup.sh +++ b/scripts/dag_bootstrap_startup.sh @@ -149,7 +149,7 @@ cp $_CONDOR_JOB_AD ./_CONDOR_JOB_AD if [ -e AdjustSites.py ]; then export schedd_name=`condor_config_val schedd_name` echo "Execute AdjustSites.py ..." - python3 AdjustSites.py + PYTHONUNBUFFERED=1 python3 AdjustSites.py ret=$? if [ $ret -eq 1 ]; then echo "Error: AdjustSites.py failed to update the webdir." >&2 From 4d5ebd06cb6758eaff68d1b36279f8e3f3fc1c3f Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 19:59:33 +0200 Subject: [PATCH 13/42] some comment --- scripts/dag_bootstrap_startup.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/dag_bootstrap_startup.sh b/scripts/dag_bootstrap_startup.sh index c84ce62000..abb93a86f8 100755 --- a/scripts/dag_bootstrap_startup.sh +++ b/scripts/dag_bootstrap_startup.sh @@ -149,6 +149,7 @@ cp $_CONDOR_JOB_AD ./_CONDOR_JOB_AD if [ -e AdjustSites.py ]; then export schedd_name=`condor_config_val schedd_name` echo "Execute AdjustSites.py ..." + # need unbffered otherwise it will got weird looking log. PYTHONUNBUFFERED=1 python3 AdjustSites.py ret=$? 
if [ $ret -eq 1 ]; then From ea4841292968abf7cf346b9771f57fdcf87f775c Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 20:12:44 +0200 Subject: [PATCH 14/42] revert downloadFromS3 back to master (/dev/null => /dev/null_tmp) --- src/python/ServerUtilities.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index ec17948220..73e85c4ef7 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -24,7 +24,6 @@ import traceback import subprocess import contextlib -import shutil if sys.version_info >= (3, 0): from http.client import HTTPException # Python 3 and Python 2 in modern CMSSW @@ -949,7 +948,6 @@ def downloadFromS3ViaPSU(filepath=None, preSignedUrl=None, logger=None): if not preSignedUrl: raise Exception("mandatory preSignedUrl argument missing") - tmpPath = filepath + '_tmp' downloadCommand = '' # CRAB_useGoCurl env. variable is used to define how download from S3 command should be executed. @@ -957,10 +955,10 @@ def downloadFromS3ViaPSU(filepath=None, preSignedUrl=None, logger=None): # The same variable is also used inside CRABClient, we should keep name changes (if any) synchronized if os.getenv('CRAB_useGoCurl'): downloadCommand += '/cvmfs/cms.cern.ch/cmsmon/gocurl -verbose 2 -method GET' - downloadCommand += ' -out "%s"' % tmpPath + downloadCommand += ' -out "%s"' % filepath downloadCommand += ' -url "%s"' % preSignedUrl else: - downloadCommand += 'wget -Sq -O %s ' % tmpPath + downloadCommand += 'wget -Sq -O %s ' % filepath downloadCommand += ' "%s"' % preSignedUrl downloadProcess = subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -971,8 +969,6 @@ def downloadFromS3ViaPSU(filepath=None, preSignedUrl=None, logger=None): if exitcode != 0: raise Exception('Download command %s failed. stderr is:\n%s' % (downloadCommand, stderr)) - logger.debug('Move downloaded file to destination "%s"', filepath) - shutil.move(tmpPath, filepath) if not os.path.exists(filepath): raise Exception("Download failure with %s. File %s was not created" % (downloadCommand, filepath)) From d86d45b327517ca18c7cff36d6e5fed56a500c01 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 20:14:38 +0200 Subject: [PATCH 15/42] make donwload sandbox robust at adjustsites instead --- scripts/AdjustSites.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/AdjustSites.py b/scripts/AdjustSites.py index e12d58f198..cf7a6e35cb 100644 --- a/scripts/AdjustSites.py +++ b/scripts/AdjustSites.py @@ -423,6 +423,7 @@ def getSandbox(taskDict, crabserver): :type crabserver: RESTInteractions.CRABRest """ sandboxTarBall = 'sandbox.tar.gz' + sandboxTarBallTmp = sandboxTarBall + '_tmp' if os.path.exists(sandboxTarBall): printLog('sandbox.tar.gz already exist. 
Do nothing.') return @@ -437,7 +438,8 @@ def getSandbox(taskDict, crabserver): # download try: downloadFromS3(crabserver=crabserver, objecttype='sandbox', username=username, - tarballname=sandboxName, filepath=sandboxTarBall, logger=logger) + tarballname=sandboxName, filepath=sandboxTarBallTmp, logger=logger) + shutil.move(sandboxTarBallTmp, sandboxTarBall) except Exception as ex: logger.exception("The CRAB server backend could not download the input sandbox with your code " + \ "from S3.\nThis could be a temporary glitch; please try to submit a new task later " + \ From 7bc3d0ffe095516eed0cec462dd4e1eadf110d45 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 20:21:43 +0200 Subject: [PATCH 16/42] disable submit --dryrun --- src/python/CRABInterface/RESTUserWorkflow.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/python/CRABInterface/RESTUserWorkflow.py b/src/python/CRABInterface/RESTUserWorkflow.py index 2323891b08..03c27e393e 100644 --- a/src/python/CRABInterface/RESTUserWorkflow.py +++ b/src/python/CRABInterface/RESTUserWorkflow.py @@ -426,6 +426,8 @@ def validate(self, apiobj, method, api, param, safe): #pylint: disable=unused-ar validate_str("collector", param, safe, RX_COLLECTOR, optional=True) validate_strlist("extrajdl", param, safe, RX_SCRIPTARGS) validate_num("dryrun", param, safe, optional=True) + if safe.kwargs["dryrun"] == 1: + raise InvalidParameter("Submit with --dryrun is deprecate. Submit task and run \"crab preparelocal\" instead.") validate_num("ignoreglobalblacklist", param, safe, optional=True) validate_num("partialdataset", param, safe, optional=True) validate_num("requireaccelerator", param, safe, optional=True) From 1d5a356c70e7efaa0a774a1639cb2a2809a5a2c0 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 20:22:47 +0200 Subject: [PATCH 17/42] fix wording --- src/python/CRABInterface/RESTUserWorkflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/CRABInterface/RESTUserWorkflow.py b/src/python/CRABInterface/RESTUserWorkflow.py index 03c27e393e..0a5be01449 100644 --- a/src/python/CRABInterface/RESTUserWorkflow.py +++ b/src/python/CRABInterface/RESTUserWorkflow.py @@ -427,7 +427,7 @@ def validate(self, apiobj, method, api, param, safe): #pylint: disable=unused-ar validate_strlist("extrajdl", param, safe, RX_SCRIPTARGS) validate_num("dryrun", param, safe, optional=True) if safe.kwargs["dryrun"] == 1: - raise InvalidParameter("Submit with --dryrun is deprecate. Submit task and run \"crab preparelocal\" instead.") + raise InvalidParameter("Submit with --dryrun is deprecate. 
Submit normally and run \"crab preparelocal\" to run test job locally.")
         validate_num("ignoreglobalblacklist", param, safe, optional=True)
         validate_num("partialdataset", param, safe, optional=True)
         validate_num("requireaccelerator", param, safe, optional=True)

From 00d950f259563d43dffbeee48b5b0d43f290362d Mon Sep 17 00:00:00 2001
From: Thanayut Seethongchuen
Date: Fri, 9 Aug 2024 20:23:39 +0200
Subject: [PATCH 18/42] fix wording2

---
 src/python/CRABInterface/RESTUserWorkflow.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/python/CRABInterface/RESTUserWorkflow.py b/src/python/CRABInterface/RESTUserWorkflow.py
index 0a5be01449..4dfa13c8ac 100644
--- a/src/python/CRABInterface/RESTUserWorkflow.py
+++ b/src/python/CRABInterface/RESTUserWorkflow.py
@@ -427,7 +427,7 @@ def validate(self, apiobj, method, api, param, safe): #pylint: disable=unused-ar
         validate_strlist("extrajdl", param, safe, RX_SCRIPTARGS)
         validate_num("dryrun", param, safe, optional=True)
         if safe.kwargs["dryrun"] == 1:
-            raise InvalidParameter("Submit with --dryrun is deprecate. Submit normally and run \"crab preparelocal\" to run test job locally.")
+            raise InvalidParameter("Submit with --dryrun is deprecated. Submit normally and run \"crab preparelocal\" to run the job locally.")
         validate_num("ignoreglobalblacklist", param, safe, optional=True)
         validate_num("partialdataset", param, safe, optional=True)
         validate_num("requireaccelerator", param, safe, optional=True)
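Patch 19 below stops the TaskWorker from downloading the sandbox and instead only verifies that it is retrievable from S3 (the schedd downloads it later via AdjustSites.py). A hedged sketch of such an availability probe, using only the stdlib and the ranged-GET idea the later patches settle on (the presigned URL is a placeholder for what `getDownloadUrlFromS3` returns; this is not code from the series):

```python
import urllib.request
import urllib.error

def objectIsAvailable(preSignedUrl, timeout=30):
    """Return True if the presigned URL serves data, without fetching the whole object."""
    # ask for a single byte; S3 answers 206 Partial Content if the key exists
    req = urllib.request.Request(preSignedUrl, headers={'Range': 'bytes=0-0'})
    try:
        with urllib.request.urlopen(req, timeout=timeout):
            return True
    except urllib.error.HTTPError:  # e.g. 404 Not Found for a missing key
        return False
```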
From 82756cce5bac41e48e6b280c8e64163d3668d13b Mon Sep 17 00:00:00 2001
From: Thanayut Seethongchuen
Date: Fri, 9 Aug 2024 20:38:13 +0200
Subject: [PATCH 19/42] check if sandbox exists

---
 src/python/TaskWorker/Actions/DagmanCreator.py | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py
index 8f98adaedb..87731fcd9e 100644
--- a/src/python/TaskWorker/Actions/DagmanCreator.py
+++ b/src/python/TaskWorker/Actions/DagmanCreator.py
@@ -17,7 +17,7 @@
 from ast import literal_eval
 
 from ServerUtilities import MAX_DISK_SPACE, MAX_IDLE_JOBS, MAX_POST_JOBS, TASKLIFETIME
-from ServerUtilities import getLock, downloadFromS3
+from ServerUtilities import getLock, downloadFromS3, getDownloadUrlFromS3
 
 import TaskWorker.DataObjects.Result
 from TaskWorker.Actions.TaskAction import TaskAction
@@ -1204,10 +1204,9 @@ def executeInternal(self, *args, **kw):
         sandboxName = kw['task']['tm_user_sandbox']
         dbgFilesName = kw['task']['tm_debug_files']
         try:
-            self.logger.debug(f"Skip download sandbox.tar.gz: {sandboxName}")
-            # TODO: need function to check if sandbox exist instead
-            #downloadFromS3(crabserver=self.crabserver, objecttype='sandbox', username=username,
-            #               tarballname=sandboxName, filepath=sandboxTarBall, logger=self.logger)
+            self.logger.debug(f"Checking if sandbox file is available: {sandboxName}")
+            getDownloadUrlFromS3(crabserver=self.crabserver, objecttype='sandbox',
+                                 username=username, tarballname=sandboxName+'temp___', logger=self.logger)
             kw['task']['tm_user_sandbox'] = sandboxTarBall
         except Exception as ex:
             raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \
@@ -1234,8 +1233,6 @@ def executeInternal(self, *args, **kw):
 
         self.extractMonitorFiles(inputFiles, **kw)
 
-        #if kw['task'].get('tm_user_sandbox') == 'sandbox.tar.gz':
-        #    inputFiles.append('sandbox.tar.gz')
         if os.path.exists("CMSRunAnalysis.tar.gz"):
             inputFiles.append("CMSRunAnalysis.tar.gz")
         if os.path.exists("TaskManagerRun.tar.gz"):
             inputFiles.append("TaskManagerRun.tar.gz")

From 99c7496bbf5d88c63b3642e180aa9a3b9847e606 Mon Sep 17 00:00:00 2001
From: Thanayut Seethongchuen
Date: Fri, 9 Aug 2024 20:40:51 +0200
Subject: [PATCH 20/42] debug

---
 src/python/TaskWorker/Actions/DagmanCreator.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py
index 87731fcd9e..3795487e7a 100644
--- a/src/python/TaskWorker/Actions/DagmanCreator.py
+++ b/src/python/TaskWorker/Actions/DagmanCreator.py
@@ -1205,8 +1205,9 @@ def executeInternal(self, *args, **kw):
         dbgFilesName = kw['task']['tm_debug_files']
         try:
             self.logger.debug(f"Checking if sandbox file is available: {sandboxName}")
-            getDownloadUrlFromS3(crabserver=self.crabserver, objecttype='sandbox',
+            x = getDownloadUrlFromS3(crabserver=self.crabserver, objecttype='sandbox',
                  username=username, tarballname=sandboxName+'temp___', logger=self.logger)
+            self.logger.debug(f"11111111111111111111111111\n {x}")
             kw['task']['tm_user_sandbox'] = sandboxTarBall
         except Exception as ex:
             raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \

From 8ca7082d4da39fd9ed2d98dbaff7ba5ccd0a7ce7 Mon Sep 17 00:00:00 2001
From: Thanayut Seethongchuen
Date: Fri, 9 Aug 2024 20:49:01 +0200
Subject: [PATCH 21/42] S3HeadObject

---
 src/python/ServerUtilities.py                 | 29 +++++++++++++++++++
 .../TaskWorker/Actions/DagmanCreator.py       |  7 ++---
 2 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py
index 73e85c4ef7..b0966b3ade 100644
--- a/src/python/ServerUtilities.py
+++ b/src/python/ServerUtilities.py
@@ -751,6 +751,35 @@ def downloadFromS3(crabserver=None, filepath=None, objecttype=None, taskname=Non
                                         tarballname=tarballname, logger=logger)
     downloadFromS3ViaPSU(filepath=filepath, preSignedUrl=preSignedUrl, logger=logger)
 
+def S3HeadObject(crabserver=None, objecttype=None, username=None, tarballname=None,
+                 logger=None):
+    """
+    one call to make a 2-step operation:
+    obtains a preSignedUrl from crabserver RESTCache and use it to check if file exist
+    :param crabserver: a RESTInteraction/CRABRest object : points to CRAB Server to use
+    :param objecttype: string : the kind of object to dowbload: clientlog|twlog|sandbox|debugfiles|runtimefiles
+    :param username: string : the username this sandbox belongs to, in case objecttype=sandbox
+    :param tarballname: string : for sandbox, taskname is not used but tarballname is needed
+    :return: nothing. Raises an exception in case of error
+    """
+    preSignedUrl = getDownloadUrlFromS3(crabserver=crabserver, objecttype=objecttype,
+                                        username=username,
+                                        tarballname=tarballname, logger=logger)
+    downloadCommand = ''
+    if os.getenv('CRAB_useGoCurl'):
+        raise NotImplementedError('HEAD with gocurl is not implemented')
+
+    downloadCommand += 'wget -Sq --method=HEAD'
+    downloadCommand += ' "%s"' % preSignedUrl
+
+    with subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as downloadProcess:
+        logger.debug("Will execute:\n%s", downloadCommand)
+        stdout, stderr = downloadProcess.communicate()
+        exitcode = downloadProcess.returncode
+        logger.debug('exitcode: %s\nstdout: %s', exitcode, stdout)
+
+    if exitcode != 0:
+        raise Exception('Download command %s failed. 
stderr is:\n%s' % (downloadCommand, stderr)) def retrieveFromS3(crabserver=None, objecttype=None, taskname=None, username=None, tarballname=None, logger=None): diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 3795487e7a..9cc277878f 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -17,7 +17,7 @@ from ast import literal_eval from ServerUtilities import MAX_DISK_SPACE, MAX_IDLE_JOBS, MAX_POST_JOBS, TASKLIFETIME -from ServerUtilities import getLock, downloadFromS3, getDownloadUrlFromS3 +from ServerUtilities import getLock, downloadFromS3, S3HeadObject import TaskWorker.DataObjects.Result from TaskWorker.Actions.TaskAction import TaskAction @@ -1205,9 +1205,8 @@ def executeInternal(self, *args, **kw): dbgFilesName = kw['task']['tm_debug_files'] try: self.logger.debug(f"Checking if sandbox file is available: {sandboxName}") - x = getDownloadUrlFromS3(crabserver=self.crabserver, objecttype='sandbox', - username=username, tarballname=sandboxName+'temp___', logger=self.logger) - self.logger.debug(f"11111111111111111111111111\n {x}") + S3HeadObject(crabserver=self.crabserver, objecttype='sandbox', + username=username, tarballname=sandboxName+'temp___', logger=self.logger) kw['task']['tm_user_sandbox'] = sandboxTarBall except Exception as ex: raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \ From 47eb7b4406e3f01c7a4c68f3d269c92cd6386d42 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 20:51:50 +0200 Subject: [PATCH 22/42] fix sandboxName --- src/python/TaskWorker/Actions/DagmanCreator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 9cc277878f..1f6971e832 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -1206,7 +1206,7 @@ def executeInternal(self, *args, **kw): try: self.logger.debug(f"Checking if sandbox file is available: {sandboxName}") S3HeadObject(crabserver=self.crabserver, objecttype='sandbox', - username=username, tarballname=sandboxName+'temp___', logger=self.logger) + username=username, tarballname=sandboxName, logger=self.logger) kw['task']['tm_user_sandbox'] = sandboxTarBall except Exception as ex: raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \ From 93aaf611f93037bdc215ab2db04b8fb16d5f7a68 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 21:01:11 +0200 Subject: [PATCH 23/42] test header range with wget --- src/python/ServerUtilities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index b0966b3ade..354c410d2e 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -769,7 +769,7 @@ def S3HeadObject(crabserver=None, objecttype=None, username=None, tarballname=No if os.getenv('CRAB_useGoCurl'): raise NotImplementedError('HEAD with gocurl is not implemented') - downloadCommand += 'wget -Sq --method=HEAD' + downloadCommand += 'wget -Sq --header="Range: bytes=0-0"' downloadCommand += ' "%s"' % preSignedUrl with subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as downloadProcess: From b9ccf4f43fcae9c6871a1c425d34a6a9447ef031 Mon Sep 17 00:00:00 2001 From: 
Thanayut Seethongchuen
Date: Fri, 9 Aug 2024 21:15:04 +0200
Subject: [PATCH 24/42] rename

---
 src/python/ServerUtilities.py                 | 23 ++++++++++++-------
 .../TaskWorker/Actions/DagmanCreator.py       |  6 ++---
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py
index 354c410d2e..eb5d5bd6ca 100644
--- a/src/python/ServerUtilities.py
+++ b/src/python/ServerUtilities.py
@@ -751,16 +751,23 @@ def downloadFromS3(crabserver=None, filepath=None, objecttype=None, taskname=Non
                                         tarballname=tarballname, logger=logger)
     downloadFromS3ViaPSU(filepath=filepath, preSignedUrl=preSignedUrl, logger=logger)
 
-def S3HeadObject(crabserver=None, objecttype=None, username=None, tarballname=None,
+def checkS3Object(crabserver=None, objecttype=None, username=None, tarballname=None,
                  logger=None):
     """
-    one call to make a 2-step operation:
-    obtains a preSignedUrl from crabserver RESTCache and use it to check if file exist
-    :param crabserver: a RESTInteraction/CRABRest object : points to CRAB Server to use
-    :param objecttype: string : the kind of object to dowbload: clientlog|twlog|sandbox|debugfiles|runtimefiles
-    :param username: string : the username this sandbox belongs to, in case objecttype=sandbox
-    :param tarballname: string : for sandbox, taskname is not used but tarballname is needed
-    :return: nothing. Raises an exception in case of error
+    Check if file exists in S3. Raises an exception if wget exits non-zero.
+    Note that a presigned url from the GetObject API cannot be used with the HeadObject API.
+    So, use `--header="Range: bytes=0-0"` to fetch zero bytes instead.
+
+    :param crabserver: CRABRest object, points to CRAB Server to use
+    :type crabserver: RESTInteractions.CRABRest
+    :param objecttype: the kind of object to download: clientlog|twlog|sandbox|debugfiles|runtimefiles
+    :type objecttype: str
+    :param username: the username this sandbox belongs to, in case objecttype=sandbox
+    :type username: str
+    :param tarballname: for sandbox, taskname is not used but tarballname is needed
+    :type tarballname: str
+
+    :return: None, but raises an exception if wget exits non-zero. 
""" preSignedUrl = getDownloadUrlFromS3(crabserver=crabserver, objecttype=objecttype, username=username, diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 1f6971e832..c659c64178 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -17,7 +17,7 @@ from ast import literal_eval from ServerUtilities import MAX_DISK_SPACE, MAX_IDLE_JOBS, MAX_POST_JOBS, TASKLIFETIME -from ServerUtilities import getLock, downloadFromS3, S3HeadObject +from ServerUtilities import getLock, downloadFromS3, checkS3Object import TaskWorker.DataObjects.Result from TaskWorker.Actions.TaskAction import TaskAction @@ -1205,8 +1205,8 @@ def executeInternal(self, *args, **kw): dbgFilesName = kw['task']['tm_debug_files'] try: self.logger.debug(f"Checking if sandbox file is available: {sandboxName}") - S3HeadObject(crabserver=self.crabserver, objecttype='sandbox', - username=username, tarballname=sandboxName, logger=self.logger) + checkS3Object(crabserver=self.crabserver, objecttype='sandbox', + username=username, tarballname=sandboxName, logger=self.logger) kw['task']['tm_user_sandbox'] = sandboxTarBall except Exception as ex: raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \ From 1dd582490291ce1a048d79b2dc416ccb70a43cb3 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 21:15:42 +0200 Subject: [PATCH 25/42] break --- src/python/TaskWorker/Actions/DagmanCreator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index c659c64178..6e7cad1ec8 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -1206,7 +1206,7 @@ def executeInternal(self, *args, **kw): try: self.logger.debug(f"Checking if sandbox file is available: {sandboxName}") checkS3Object(crabserver=self.crabserver, objecttype='sandbox', - username=username, tarballname=sandboxName, logger=self.logger) + username=username, tarballname=sandboxName+'__test', logger=self.logger) kw['task']['tm_user_sandbox'] = sandboxTarBall except Exception as ex: raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \ From 1f40a30746654ceef25add7e633edb9a0011f460 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 21:39:42 +0200 Subject: [PATCH 26/42] check s3 object --- src/python/ServerUtilities.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index eb5d5bd6ca..eea4760480 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -755,8 +755,10 @@ def checkS3Object(crabserver=None, objecttype=None, username=None, tarballname=N logger=None): """ Check if file exist in S3. Raise exception if wget is exit with non-zero. + Usually, you will see stderr with http response `404 Not Found` if file does not exists. Note that presigned url from GetObject API could not use by HeadObject API. - So, use `--header="Range: bytes=0-0"` to fetch zero bytes instead.o + Use `` to fetch few bytes instead. 
+ :param crabserver: CRABRest object, points to CRAB Server to use :type crabserver: RESTInteractions.CRABRest :param objecttype: the kind of object to download: clientlog|twlog|sandbox|debugfiles|runtimefiles :type objecttype: str :param username: the username this sandbox belongs to, in case objecttype=sandbox :type username: str :param tarballname: for sandbox, taskname is not used but tarballname is needed :type tarballname: str :return: None, but raises an exception if wget exits non-zero. """ preSignedUrl = getDownloadUrlFromS3(crabserver=crabserver, objecttype=objecttype, username=username, tarballname=tarballname, logger=logger) downloadCommand = '' if os.getenv('CRAB_useGoCurl'): raise NotImplementedError('HEAD with gocurl is not implemented') - downloadCommand += 'wget -Sq --header="Range: bytes=0-0"' + downloadCommand += 'wget -Sq -O -' downloadCommand += ' "%s"' % preSignedUrl + downloadCommand += ' | head -c1000 > /dev/null' with subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as downloadProcess: logger.debug("Will execute:\n%s", downloadCommand) From 963a904bf5b6d6bea55b1364f8616b5d0ab62d60 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 21:45:19 +0200 Subject: [PATCH 27/42] try to run inside bash --- src/python/ServerUtilities.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index eea4760480..76830a3650 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -778,9 +778,10 @@ def checkS3Object(crabserver=None, objecttype=None, username=None, tarballname=N if os.getenv('CRAB_useGoCurl'): raise NotImplementedError('HEAD with gocurl is not implemented') - downloadCommand += 'wget -Sq -O -' - downloadCommand += ' "%s"' % preSignedUrl - downloadCommand += ' | head -c1000 > /dev/null' + downloadCommand += 'bash -c "set -o pipefail;' + downloadCommand += ' wget -Sq -O -' + downloadCommand += ' \\"%s\\"' % preSignedUrl + downloadCommand += ' | head -c1000 > /dev/null"' with subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as downloadProcess: logger.debug("Will execute:\n%s", downloadCommand) From 02fb98c0b28da8233aa5785534731dc188b6a8a1 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 21:46:13 +0200 Subject: [PATCH 28/42] Revert "break" This reverts commit d8f170fd11e0e78eaaefb31c1cbf14f857f5181a. --- src/python/TaskWorker/Actions/DagmanCreator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 6e7cad1ec8..c659c64178 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -1206,7 +1206,7 @@ def executeInternal(self, *args, **kw): try: self.logger.debug(f"Checking if sandbox file is available: {sandboxName}") checkS3Object(crabserver=self.crabserver, objecttype='sandbox', - username=username, tarballname=sandboxName+'__test', logger=self.logger) + username=username, tarballname=sandboxName, logger=self.logger) kw['task']['tm_user_sandbox'] = sandboxTarBall except Exception as ex: raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \ From 325d8b08e8ab14568fec609f85f2da004f6e2da5 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 21:48:48 +0200 Subject: [PATCH 29/42] update comment --- src/python/ServerUtilities.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index 76830a3650..eb4b9f7ddd 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -757,8 +757,9 @@ def checkS3Object(crabserver=None, objecttype=None, username=None, tarballname=N logger=None): """ Check if a file exists in S3.
Raise an exception if wget exits non-zero. Usually, you will see an http response `404 Not Found` on stderr if the file does not exist. Note that a presigned url from the GetObject API cannot be used with the HeadObject API. - Use `head -c1000` to fetch a few bytes instead. - + Use 'head -c1000' to fetch a few bytes instead, and wrap the pipeline inside bash + with `set -o pipefail` to make it exit early, so the exit code from wget can + propagate back to Popen properly. :param crabserver: CRABRest object, points to CRAB Server to use :type crabserver: RESTInteractions.CRABRest From 34894d3bb82a004b5cf8cab3627d4275b243b677 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 21:51:49 +0200 Subject: [PATCH 30/42] comment a bit --- src/python/TaskWorker/Actions/DagmanCreator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index c659c64178..c2ea9af925 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -1209,7 +1209,7 @@ def executeInternal(self, *args, **kw): username=username, tarballname=sandboxName, logger=self.logger) kw['task']['tm_user_sandbox'] = sandboxTarBall except Exception as ex: - raise TaskWorkerException("The CRAB server backend could not download the input sandbox with your code " + \ + raise TaskWorkerException("The CRAB server backend could not find the input sandbox with your code " + \ "from S3.\nThis could be a temporary glitch; please try to submit a new task later " + \ "(resubmit will not work) and contact the experts if the error persists.\nError reason: %s" % str(ex)) from ex try: From 6345a6edf190fac0e0909c3fdcd12b2b6f65ef90 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 9 Aug 2024 22:41:15 +0200 Subject: [PATCH 31/42] can get presigned with head_object api --- src/python/CRABInterface/RESTCache.py | 10 +++++++--- src/python/CRABInterface/Regexps.py | 1 + src/python/ServerUtilities.py | 17 +++++++++-------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/python/CRABInterface/RESTCache.py b/src/python/CRABInterface/RESTCache.py index 1660eaba0f..72fd084af0 100644 --- a/src/python/CRABInterface/RESTCache.py +++ b/src/python/CRABInterface/RESTCache.py @@ -13,7 +13,8 @@ # CRABServer dependecies here from CRABInterface.RESTExtensions import authz_login_valid, authz_operator -from CRABInterface.Regexps import RX_SUBRES_CACHE, RX_CACHE_OBJECTTYPE, RX_TASKNAME, RX_USERNAME, RX_TARBALLNAME +from CRABInterface.Regexps import (RX_SUBRES_CACHE, RX_CACHE_OBJECTTYPE, RX_TASKNAME, + RX_USERNAME, RX_TARBALLNAME, RX_PRESIGNED_CLIENT_METHOD) from ServerUtilities import getUsernameFromTaskname, MeasureTime @@ -99,9 +100,10 @@ def validate(self, apiobj, method, api, param, safe): validate_str('taskname', param, safe, RX_TASKNAME, optional=True) validate_str('username', param, safe, RX_USERNAME, optional=True) validate_str('tarballname', param, safe, RX_TARBALLNAME, optional=True) + validate_str('clientmethod', param, safe, RX_PRESIGNED_CLIENT_METHOD, optional=True) @restcall - def get(self, subresource, objecttype, taskname, username, tarballname): # pylint: disable=redefined-builtin + def get(self, subresource, objecttype, taskname, username, tarballname, clientmethod): # pylint: disable=redefined-builtin """ :arg str subresource: the specific information to be accessed; """ @@ -165,6 +167,8 @@
authz_operator(username=ownerName, group='crab3', role='operator') if subresource == 'sandbox' and not username: raise MissingParameter("username is missing") + if not clientmethod: + clientmethod = 'get_object' # returns a PreSignedUrl to download the file within the expiration time expiration = 60 * 60 # 1 hour default is good for retries and debugging if objecttype in ['debugfiles', 'clientlog', 'twlog']: @@ -172,7 +176,7 @@ try: with MeasureTime(self.logger, modulename=__name__, label="get.download.generate_presigned_post") as _: response = self.s3_client.generate_presigned_url( - 'get_object', Params={'Bucket': self.s3_bucket, 'Key': s3_objectKey}, + clientmethod, Params={'Bucket': self.s3_bucket, 'Key': s3_objectKey}, ExpiresIn=expiration) preSignedUrl = response except ClientError as e: diff --git a/src/python/CRABInterface/Regexps.py b/src/python/CRABInterface/Regexps.py index 035628f99b..721a033a3e 100644 --- a/src/python/CRABInterface/Regexps.py +++ b/src/python/CRABInterface/Regexps.py @@ -93,6 +93,7 @@ #subresources of Cache resource RX_SUBRES_CACHE = re.compile(r"^(upload|download|retrieve|list|used)$") RX_CACHE_OBJECTTYPE = re.compile(r"^(clientlog|twlog|sandbox|debugfiles|runtimefiles)$") +RX_PRESIGNED_CLIENT_METHOD = re.compile(r"^(get_object|head_object)$") #worker workflow RX_WORKER_NAME = re.compile(r"^[A-Za-z0-9\-\._%]{1,100}$") diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index eb4b9f7ddd..6a30c01b3c 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -773,16 +773,14 @@ def checkS3Object(crabserver=None, objecttype=None, username=None, tarballname=N :return: None, but raises an exception if wget exits non-zero.
""" preSignedUrl = getDownloadUrlFromS3(crabserver=crabserver, objecttype=objecttype, - username=username, - tarballname=tarballname, logger=logger) + username=username, tarballname=tarballname, + clientmethod='head_object', + logger=logger) downloadCommand = '' if os.getenv('CRAB_useGoCurl'): raise NotImplementedError('HEAD with gocurl is not implemented') - - downloadCommand += 'bash -c "set -o pipefail;' - downloadCommand += ' wget -Sq -O -' - downloadCommand += ' \\"%s\\"' % preSignedUrl - downloadCommand += ' | head -c1000 > /dev/null"' + downloadCommand += ' wget -Sq -O /dev/null --method=HEAD' + downloadCommand += ' "%s"' % preSignedUrl with subprocess.Popen(downloadCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as downloadProcess: logger.debug("Will execute:\n%s", downloadCommand) @@ -869,7 +867,8 @@ def uploadToS3(crabserver=None, filepath=None, objecttype=None, taskname=None, def getDownloadUrlFromS3(crabserver=None, objecttype=None, taskname=None, - username=None, tarballname=None, logger=None): + username=None, tarballname=None, clientmethod=None, + logger=None): """ obtains a PreSigned URL to access an existing object in S3 :param crabserver: a RESTInteraction/CRABRest object : points to CRAB Server to use @@ -887,6 +886,8 @@ def getDownloadUrlFromS3(crabserver=None, objecttype=None, taskname=None, dataDict['username'] = username if tarballname: dataDict['tarballname'] = tarballname + if clientmethod: + dataDict['clientmethod'] = clientmethod data = encodeRequest(dataDict) try: # calls to restServer alway return a 3-ple ({'result':a-list}, HTTPcode, HTTPreason) From 57840ae90d14777015b44a4850bd26ce6bd66f36 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Sat, 10 Aug 2024 00:10:44 +0200 Subject: [PATCH 32/42] explicit transfer sandbox.tar.gz --- src/python/TaskWorker/Actions/DagmanSubmitter.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py index 5d48d14c83..faa7a6e375 100644 --- a/src/python/TaskWorker/Actions/DagmanSubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py @@ -398,7 +398,7 @@ def executeInternal(self, info, inputFiles, **kwargs): os.chdir(kwargs['tempDir']) info['start_time'] = task['tm_start_time'] - info['inputFilesString'] = ", ".join(inputFiles + ['subdag.jdl']) + info['inputFilesString'] = ", ".join(inputFiles + ['sandbox.tar.gz', 'subdag.jdl']) outputFiles = ["RunJobs.dag.dagman.out", "RunJobs.dag.rescue.001"] info['outputFilesString'] = ", ".join(outputFiles) arg = "RunJobs.dag" @@ -442,6 +442,8 @@ def executeInternal(self, info, inputFiles, **kwargs): self.logger.debug("Finally submitting to the schedd") if address: try: + import pdb; + pdb.set_trace() self.clusterId = self.submitDirect(schedd, 'dag_bootstrap_startup.sh', arg, info) except Exception as submissionError: msg = f"Something went wrong: {submissionError} \n" @@ -542,6 +544,8 @@ def submitDirect(self, schedd, cmd, arg, info): #pylint: disable=R0201 htcondor.param['DELEGATE_FULL_JOB_GSI_CREDENTIALS'] = 'true' htcondor.param['DELEGATE_JOB_GSI_CREDENTIALS_LIFETIME'] = '0' try: + import pdb; + pdb.set_trace() submitResult = schedd.submit(description=jobJDL, count=1, spool=True) clusterId = submitResult.cluster() numProcs = submitResult.num_procs() From 954b0d3e921f0038369180721f70ee63707a42ce Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Sat, 10 Aug 2024 00:16:35 +0200 Subject: [PATCH 33/42] remove pdb --- 
src/python/TaskWorker/Actions/DagmanSubmitter.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py index faa7a6e375..d6c2a47aea 100644 --- a/src/python/TaskWorker/Actions/DagmanSubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py @@ -442,8 +442,6 @@ def executeInternal(self, info, inputFiles, **kwargs): self.logger.debug("Finally submitting to the schedd") if address: try: - import pdb; - pdb.set_trace() self.clusterId = self.submitDirect(schedd, 'dag_bootstrap_startup.sh', arg, info) except Exception as submissionError: msg = f"Something went wrong: {submissionError} \n" @@ -544,8 +542,6 @@ def submitDirect(self, schedd, cmd, arg, info): #pylint: disable=R0201 htcondor.param['DELEGATE_FULL_JOB_GSI_CREDENTIALS'] = 'true' htcondor.param['DELEGATE_JOB_GSI_CREDENTIALS_LIFETIME'] = '0' try: - import pdb; - pdb.set_trace() submitResult = schedd.submit(description=jobJDL, count=1, spool=True) clusterId = submitResult.cluster() numProcs = submitResult.num_procs() From e51ee37bb913457563291bfd5d7df1c161e51d98 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Mon, 12 Aug 2024 12:16:36 +0200 Subject: [PATCH 34/42] sandbox --- src/python/TaskWorker/Actions/DagmanCreator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index c2ea9af925..fa43c3fa9e 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -541,8 +541,8 @@ def makeJobSubmit(self, task): info['additional_environment_options'] += ' CRAB_TASKMANAGER_TARBALL=local' else: raise TaskWorkerException("Cannot find TaskManagerRun.tar.gz inside the cwd: %s" % os.getcwd()) - if os.path.exists("sandbox.tar.gz"): - info['additional_input_file'] += ", sandbox.tar.gz" + + info['additional_input_file'] += ", sandbox.tar.gz" # will be available at dag bootstrap time info['additional_input_file'] += ", run_and_lumis.tar.gz" info['additional_input_file'] += ", input_files.tar.gz" info['additional_input_file'] += ", submit_env.sh" From 990baab5cc22b906b05c2faa629e2b1a61a72ca4 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Mon, 12 Aug 2024 15:27:22 +0200 Subject: [PATCH 35/42] Revert "explicit transfer sandbox.tar.gz" This reverts commit 57840ae90d14777015b44a4850bd26ce6bd66f36. 
--- src/python/TaskWorker/Actions/DagmanSubmitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py index d6c2a47aea..5d48d14c83 100644 --- a/src/python/TaskWorker/Actions/DagmanSubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py @@ -398,7 +398,7 @@ def executeInternal(self, info, inputFiles, **kwargs): os.chdir(kwargs['tempDir']) info['start_time'] = task['tm_start_time'] - info['inputFilesString'] = ", ".join(inputFiles + ['sandbox.tar.gz', 'subdag.jdl']) + info['inputFilesString'] = ", ".join(inputFiles + ['subdag.jdl']) outputFiles = ["RunJobs.dag.dagman.out", "RunJobs.dag.rescue.001"] info['outputFilesString'] = ", ".join(outputFiles) arg = "RunJobs.dag" From 001727ed2ed350f35b0e7aef9ff605289d6ab617 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Wed, 14 Aug 2024 15:01:29 +0200 Subject: [PATCH 36/42] also upload inputFiles.tar.gz to s3 --- src/python/TaskWorker/Actions/DagmanCreator.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index fa43c3fa9e..fb830329e3 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -17,7 +17,7 @@ from ast import literal_eval from ServerUtilities import MAX_DISK_SPACE, MAX_IDLE_JOBS, MAX_POST_JOBS, TASKLIFETIME -from ServerUtilities import getLock, downloadFromS3, checkS3Object +from ServerUtilities import getLock, downloadFromS3, checkS3Object, uploadToS3 import TaskWorker.DataObjects.Result from TaskWorker.Actions.TaskAction import TaskAction @@ -727,6 +727,12 @@ def prepareLocal(self, dagSpecs, info, kw, inputFiles, subdags): finally: tf.close() + # also upload InputFiles.tar.gz to s3 + task = kw['task']['tm_taskname'] + uploadToS3(crabserver=self.crabserver, filepath='InputFiles.tar.gz', + objecttype='runtimefiles', taskname=task, + logger=self.logger) + def createSubdag(self, splitterResult, **kwargs): startjobid = kwargs.get('startjobid', 0) From 73bd9eca1e139c5dd9514617f296b43aace391cf Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Thu, 15 Aug 2024 11:39:31 +0200 Subject: [PATCH 37/42] stop dryrun --- src/python/TaskWorker/Actions/Handler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/python/TaskWorker/Actions/Handler.py b/src/python/TaskWorker/Actions/Handler.py index 6f30a42ac7..b52313c3fa 100644 --- a/src/python/TaskWorker/Actions/Handler.py +++ b/src/python/TaskWorker/Actions/Handler.py @@ -198,9 +198,7 @@ def handleNewTask(resthost, dbInstance, config, task, procnum, *args, **kwargs): handler.addWork(MakeFakeFileSet(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(Splitter(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(DagmanCreator(config=config, crabserver=crabserver, procnum=procnum, rucioClient=rucioClient)) - if task['tm_dry_run'] == 'T': - handler.addWork(DryRunUploader(config=config, crabserver=crabserver, procnum=procnum)) - else: + if not task['tm_dry_run'] == 'T': handler.addWork(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum)) return handler.actionWork(args, kwargs) From 340178d2e8017cb58e73d2cdadc861421cd5ddfa Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Thu, 15 Aug 2024 11:54:32 +0200 Subject: [PATCH 38/42] add comment --- src/python/TaskWorker/Actions/DagmanCreator.py | 4 ++++ 1 
file changed, 4 insertions(+) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index fb830329e3..50a46f696c 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -728,6 +728,10 @@ def prepareLocal(self, dagSpecs, info, kw, inputFiles, subdags): tf.close() # also upload InputFiles.tar.gz to s3 + # Wa: Now (2024) I am still not sure if we need to poll the uploaded file + # until it is available (see #6706). If this is still the case, use the + # implementation from the old DryRunUploader action. + # https://github.com/dmwm/CRABServer/blob/9b4679d14bb19ccc7373d56c20631eea34d80a69/src/python/TaskWorker/Actions/DryRunUploader.py#L61-L75 task = kw['task']['tm_taskname'] uploadToS3(crabserver=self.crabserver, filepath='InputFiles.tar.gz', objecttype='runtimefiles', taskname=task, From 1c8ad910929cb0a62c5a6a83081744c385538f8f Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Thu, 15 Aug 2024 11:55:07 +0200 Subject: [PATCH 39/42] dryrun action (only set status to UPLOADED) --- src/python/TaskWorker/Actions/DryRun.py | 27 ++++ .../TaskWorker/Actions/DryRunUploader.py | 140 ------------------ src/python/TaskWorker/Actions/Handler.py | 2 + 3 files changed, 29 insertions(+), 140 deletions(-) create mode 100644 src/python/TaskWorker/Actions/DryRun.py delete mode 100644 src/python/TaskWorker/Actions/DryRunUploader.py diff --git a/src/python/TaskWorker/Actions/DryRun.py b/src/python/TaskWorker/Actions/DryRun.py new file mode 100644 index 0000000000..4d98956718 --- /dev/null +++ b/src/python/TaskWorker/Actions/DryRun.py @@ -0,0 +1,27 @@ +""" +DryRun action: mark the task as UPLOADED instead of submitting it to the schedd. +""" +from urllib.parse import urlencode + +from TaskWorker.DataObjects.Result import Result +from TaskWorker.Actions.TaskAction import TaskAction +from TaskWorker.WorkerExceptions import TaskWorkerException + + +class DryRun(TaskAction): + """ + Set the task status to UPLOADED instead of submitting to the schedd (for crab submit --dryrun; DagmanCreator has already uploaded the runtime files to S3). + """ + def executeInternal(self, *args, **kw): + task = kw['task'] + update = {'workflow': task['tm_taskname'], 'subresource': 'state', 'status': 'UPLOADED'} + self.logger.debug('Updating task status: %s', str(update)) + self.crabserver.post(api='workflowdb', data=urlencode(update)) + return Result(task=task, result=args[0]) + + def execute(self, *args, **kw): + try: + return self.executeInternal(*args, **kw) + except Exception as e: + msg = "Failed to run DryRun action for %s; '%s'" % (kw['task']['tm_taskname'], str(e)) + raise TaskWorkerException(msg) from e diff --git a/src/python/TaskWorker/Actions/DryRunUploader.py b/src/python/TaskWorker/Actions/DryRunUploader.py deleted file mode 100644 index 16698729f7..0000000000 --- a/src/python/TaskWorker/Actions/DryRunUploader.py +++ /dev/null @@ -1,140 +0,0 @@ -""" -Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.)
-""" -import os -import json -import tarfile -import time - -import sys -if sys.version_info >= (3, 0): - from urllib.parse import urlencode # pylint: disable=no-name-in-module -if sys.version_info < (3, 0): - from urllib import urlencode - -from WMCore.DataStructs.LumiList import LumiList - -from TaskWorker.DataObjects.Result import Result -from TaskWorker.Actions.TaskAction import TaskAction -from TaskWorker.WorkerExceptions import TaskWorkerException -from ServerUtilities import uploadToS3, downloadFromS3 - -class DryRunUploader(TaskAction): - """ - Upload an archive containing all files needed to run the task to the Cache (necessary for crab submit --dryrun.) - """ - - def packSandbox(self, inputFiles): - dryRunSandbox = tarfile.open('dry-run-sandbox.tar.gz', 'w:gz') - for f in inputFiles: - self.logger.debug('Adding %s to dry run tarball', f) - dryRunSandbox.add(f, recursive=True) - - dryRunSandbox.close() - - def executeInternal(self, *args, **kw): - inputFiles = args[0][2] - splitterResult = args[0][3][0] - - cwd = os.getcwd() - try: - os.chdir(kw['tempDir']) - splittingSummary = SplittingSummary(kw['task']['tm_split_algo']) - for jobgroup in splitterResult: - jobs = jobgroup.getJobs() - splittingSummary.addJobs(jobs) - splittingSummary.dump('splitting-summary.json') - inputFiles.append('splitting-summary.json') - - self.packSandbox(inputFiles) - - self.logger.info('Uploading dry run tarball to the user file cache') - t0 = time.time() - uploadToS3(crabserver=self.crabserver, filepath='dry-run-sandbox.tar.gz', - objecttype='runtimefiles', taskname=kw['task']['tm_taskname'], logger=self.logger) - os.remove('dry-run-sandbox.tar.gz') - self.logger.info('Uploaded dry run tarball to the user file cache') - # wait until tarball is available, S3 may take a few seconds for this (ref. issue #6706 ) - t1 = time.time() - lt1 = time.strftime("%H:%M:%S", time.localtime(t1)) - uploadTime = t1-t0 - self.logger.debug('runtimefiles upload took %s secs and completed at %s', uploadTime, lt1) - self.logger.debug('check if tarball is available') - tarballOK = False - while not tarballOK: - try: - self.logger.debug('download tarball to /dev/null') - downloadFromS3(crabserver=self.crabserver, filepath='/dev/null', objecttype='runtimefiles', - taskname=kw['task']['tm_taskname'], logger=self.logger) - self.logger.debug('OK, it worked') - tarballOK = True - except Exception as e: - self.logger.debug('runtimefiles tarball not ready yet') - self.logger.debug('Exception was raised: %s', e) - self.logger.debug('Sleep 5 sec') - time.sleep(5) - update = {'workflow': kw['task']['tm_taskname'], 'subresource': 'state', 'status': 'UPLOADED'} - self.logger.debug('Updating task status: %s', str(update)) - self.crabserver.post(api='workflowdb', data=urlencode(update)) - - finally: - os.chdir(cwd) - - return Result(task=kw['task'], result=args[0]) - - def execute(self, *args, **kw): - try: - return self.executeInternal(*args, **kw) - except Exception as e: - msg = "Failed to upload dry run tarball for %s; '%s'" % (kw['task']['tm_taskname'], str(e)) - raise TaskWorkerException(msg) - -class SplittingSummary(object): - """ - Class which calculates some summary data about the splitting results. 
- """ - - def __init__(self, algo): - self.algo = algo - self.lumisPerJob = [] - self.eventsPerJob = [] - self.filesPerJob = [] - - def addJobs(self, jobs): - if self.algo == 'FileBased': - self.lumisPerJob += [sum([x.get('lumiCount', 0) for x in job['input_files']]) for job in jobs] - self.eventsPerJob += [sum([x['events'] for x in job['input_files']]) for job in jobs] - self.filesPerJob += [len(job['input_files']) for job in jobs] - elif self.algo == 'EventBased': - self.lumisPerJob += [job['mask']['LastLumi'] - job['mask']['FirstLumi'] for job in jobs] - self.eventsPerJob += [job['mask']['LastEvent'] - job['mask']['FirstEvent'] for job in jobs] - else: - for job in jobs: - avgEventsPerLumi = sum([f['avgEvtsPerLumi'] for f in job['input_files']])/float(len(job['input_files'])) - lumis = LumiList(compactList=job['mask']['runAndLumis']) - self.lumisPerJob.append(len(lumis.getLumis())) - self.eventsPerJob.append(avgEventsPerLumi * self.lumisPerJob[-1]) - - def dump(self, outname): - """ - Save splitting summary to a json file. - """ - - summary = {'algo': self.algo, - 'total_jobs': len(self.lumisPerJob), - 'total_lumis': sum(self.lumisPerJob), - 'total_events': sum(self.eventsPerJob), - 'max_lumis': max(self.lumisPerJob), - 'max_events': max(self.eventsPerJob), - 'avg_lumis': sum(self.lumisPerJob)/float(len(self.lumisPerJob)), - 'avg_events': sum(self.eventsPerJob)/float(len(self.eventsPerJob)), - 'min_lumis': min(self.lumisPerJob), - 'min_events': min(self.eventsPerJob)} - if len(self.filesPerJob) > 0: - summary.update({'total_files': sum(self.filesPerJob), - 'max_files': max(self.filesPerJob), - 'avg_files': sum(self.filesPerJob)/float(len(self.filesPerJob)), - 'min_files': min(self.filesPerJob)}) - - with open(outname, 'w') as f: - json.dump(summary, f) diff --git a/src/python/TaskWorker/Actions/Handler.py b/src/python/TaskWorker/Actions/Handler.py index b52313c3fa..ceef9c4dde 100644 --- a/src/python/TaskWorker/Actions/Handler.py +++ b/src/python/TaskWorker/Actions/Handler.py @@ -199,6 +199,8 @@ def handleNewTask(resthost, dbInstance, config, task, procnum, *args, **kwargs): handler.addWork(Splitter(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(DagmanCreator(config=config, crabserver=crabserver, procnum=procnum, rucioClient=rucioClient)) if not task['tm_dry_run'] == 'T': + handler.addWork(DryRun(config=config, crabserver=crabserver, procnum=procnum)) + else: handler.addWork(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum)) return handler.actionWork(args, kwargs) From 42a4b91650d08203ae85e398cc6cc39de3c154ae Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Thu, 15 Aug 2024 12:48:43 +0200 Subject: [PATCH 40/42] fix import --- src/python/TaskWorker/Actions/Handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/Handler.py b/src/python/TaskWorker/Actions/Handler.py index ceef9c4dde..d696997795 100644 --- a/src/python/TaskWorker/Actions/Handler.py +++ b/src/python/TaskWorker/Actions/Handler.py @@ -18,7 +18,7 @@ from TaskWorker.Actions.MyProxyLogon import MyProxyLogon from TaskWorker.Actions.DagmanCreator import DagmanCreator from TaskWorker.Actions.StageoutCheck import StageoutCheck -from TaskWorker.Actions.DryRunUploader import DryRunUploader +from TaskWorker.Actions.DryRun import DryRun from TaskWorker.Actions.MakeFakeFileSet import MakeFakeFileSet from TaskWorker.Actions.DagmanSubmitter import DagmanSubmitter from TaskWorker.Actions.DBSDataDiscovery import DBSDataDiscovery From 
c7a22d95d258e238de5a7f36bbf12bca1c026657 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Thu, 15 Aug 2024 12:51:27 +0200 Subject: [PATCH 41/42] enable dryrun back --- src/python/CRABInterface/RESTUserWorkflow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/python/CRABInterface/RESTUserWorkflow.py b/src/python/CRABInterface/RESTUserWorkflow.py index 4dfa13c8ac..2323891b08 100644 --- a/src/python/CRABInterface/RESTUserWorkflow.py +++ b/src/python/CRABInterface/RESTUserWorkflow.py @@ -426,8 +426,6 @@ def validate(self, apiobj, method, api, param, safe): #pylint: disable=unused-ar validate_str("collector", param, safe, RX_COLLECTOR, optional=True) validate_strlist("extrajdl", param, safe, RX_SCRIPTARGS) validate_num("dryrun", param, safe, optional=True) - if safe.kwargs["dryrun"] == 1: - raise InvalidParameter("Submit with --dryrun is deprecated. Submit normally and run \"crab preparelocal\" to run the job locally.") validate_num("ignoreglobalblacklist", param, safe, optional=True) validate_num("partialdataset", param, safe, optional=True) validate_num("requireaccelerator", param, safe, optional=True) From 2d44321d1c687cb6c4d5f9dc63cf58813ecce2d6 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Thu, 15 Aug 2024 13:26:48 +0200 Subject: [PATCH 42/42] fix logic --- src/python/TaskWorker/Actions/Handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/Handler.py b/src/python/TaskWorker/Actions/Handler.py index d696997795..8837d7ab74 100644 --- a/src/python/TaskWorker/Actions/Handler.py +++ b/src/python/TaskWorker/Actions/Handler.py @@ -198,7 +198,7 @@ def handleNewTask(resthost, dbInstance, config, task, procnum, *args, **kwargs): handler.addWork(MakeFakeFileSet(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(Splitter(config=config, crabserver=crabserver, procnum=procnum)) handler.addWork(DagmanCreator(config=config, crabserver=crabserver, procnum=procnum, rucioClient=rucioClient)) - if not task['tm_dry_run'] == 'T': + if task['tm_dry_run'] == 'T': handler.addWork(DryRun(config=config, crabserver=crabserver, procnum=procnum)) else: handler.addWork(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum))
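
For reference, the existence check that PATCH 24 through PATCH 31 converge on can be sketched in isolation as follows. This is a minimal illustration, assuming a URL already presigned for S3's head_object API (as returned by getDownloadUrlFromS3 with clientmethod='head_object' after PATCH 31); the helper name check_object_exists and the use of RuntimeError are illustrative, not part of the CRABServer code.

    import logging
    import subprocess

    def check_object_exists(presigned_url, logger):
        """Raise RuntimeError if an HTTP HEAD on the presigned URL fails."""
        # wget exits non-zero on a 404, which is how a missing object surfaces;
        # --method=HEAD transfers no body, mirroring the command built in PATCH 31.
        command = 'wget -Sq -O /dev/null --method=HEAD "%s"' % presigned_url
        logger.debug("Will execute:\n%s", command)
        with subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, shell=True) as process:
            _, stderr = process.communicate()
        if process.returncode != 0:
            raise RuntimeError("object check failed (exit %d): %s"
                               % (process.returncode, stderr.decode(errors='replace')))

    if __name__ == '__main__':
        logging.basicConfig(level=logging.DEBUG)
        # hypothetical URL, for illustration only
        check_object_exists('https://s3.cern.ch/bucket/sandbox.tar.gz?X-Amz-Signature=...',
                            logging.getLogger())

With a URL presigned for head_object there is no need for the Range-header or the bash/pipefail workarounds tried in PATCH 24 through PATCH 27: wget's own exit code carries the result, and a missing sandbox surfaces in DagmanCreator as the TaskWorkerException with the message reworded in PATCH 30.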