From e0260024b75b3398aac7e03c4fc2f6b06d92e74f Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 15 Dec 2016 14:11:34 +1100 Subject: [PATCH 01/20] git ignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 3645a572a8..1df5afed00 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ __pycache__ /docker/Dockerfile /docker/toil-*.tar.gz /src/toil/version.py +.project +.pydevproject From 1fe188a44a4262f868a0de65ee36b5be93144dcb Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Tue, 15 Nov 2016 14:44:10 +1100 Subject: [PATCH 02/20] decouple concrete batch systems from common.py --- .project | 17 ++++ .pydevproject | 11 +++ src/toil/batchSystems/abstractBatchSystem.py | 11 ++- src/toil/batchSystems/mesos/batchSystem.py | 6 ++ src/toil/batchSystems/options.py | 63 +++++++++++++ src/toil/batchSystems/parasol.py | 8 ++ src/toil/batchSystems/registry.py | 80 ++++++++++++++++ src/toil/batchSystems/singleMachine.py | 4 + src/toil/common.py | 98 +++++++------------- 9 files changed, 230 insertions(+), 68 deletions(-) create mode 100644 .project create mode 100644 .pydevproject create mode 100644 src/toil/batchSystems/options.py create mode 100644 src/toil/batchSystems/registry.py diff --git a/.project b/.project new file mode 100644 index 0000000000..f68e6d2440 --- /dev/null +++ b/.project @@ -0,0 +1,17 @@ + + + toil + + + + + + org.python.pydev.PyDevBuilder + + + + + + org.python.pydev.pythonNature + + diff --git a/.pydevproject b/.pydevproject new file mode 100644 index 0000000000..8e3be93aaf --- /dev/null +++ b/.pydevproject @@ -0,0 +1,11 @@ + + +python 2.7 +python + +/${PROJECT_DIR_NAME}/src + + +/Users/thomas.e/site-packages + + diff --git a/src/toil/batchSystems/abstractBatchSystem.py b/src/toil/batchSystems/abstractBatchSystem.py index 689e62f483..98fe133060 100644 --- a/src/toil/batchSystems/abstractBatchSystem.py +++ b/src/toil/batchSystems/abstractBatchSystem.py @@ -184,6 +184,16 @@ def getRescueBatchJobFrequency(cls): raise NotImplementedError() + @staticmethod + def setOptions(setOption): + """ + Process command line or configuration options relevant to this batch system + + :param common.Config: the Config object + """ + pass + + class BatchSystemSupport(AbstractBatchSystem): """ Partial implementation of AbstractBatchSystem, support methods. @@ -311,7 +321,6 @@ def workerCleanup(info): and workflowDirContents in ([], [cacheDirName(info.workflowID)])): shutil.rmtree(workflowDir) - class NodeInfo(namedtuple("_NodeInfo", "cores memory workers")): """ The cores attribute is a floating point value between 0 (all cores idle) and 1 (all cores diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index 2fb68f8747..c6348bd9ef 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -572,6 +572,12 @@ def executorLost(self, driver, executorId, slaveId, status): log.warning("Executor '%s' lost.", executorId) + @staticmethod + def setOptions(config): + super.setOptions(config) + config.setOptions.setOption("mesosMasterAddress", None, None, 'localhost:5050') + + def toMiB(n): return n / 1024 / 1024 diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py new file mode 100644 index 0000000000..987bac7e9d --- /dev/null +++ b/src/toil/batchSystems/options.py @@ -0,0 +1,63 @@ +# Copyright (C) 2015-2016 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# + +from registry import batchSystemFactoryFor, defaultBatchSystem, uniqueNames + + +def __parasolOptions(addOptionFn): + addOptionFn("--parasolCommand", dest="parasolCommand", default=None, + help="The name or path of the parasol program. Will be looked up on PATH " + "unless it starts with a slashdefault=%s" % 'parasol') + addOptionFn("--parasolMaxBatches", dest="parasolMaxBatches", default=None, + help="Maximum number of job batches the Parasol batch is allowed to create. One " + "batch is created for jobs with a a unique set of resource requirements. " + "default=%i" % 1000) + +def __singleMachineOptions(addOptionFn): + addOptionFn("--scale", dest="scale", default=None, + help=("A scaling factor to change the value of all submitted tasks's submitted cores. " + "Used in singleMachine batch system. default=%s" % 1)) + +def __mesosOptions(addOptionFn): + addOptionFn("--mesosMaster", dest="mesosMasterAddress", default=None, + help=("The host and port of the Mesos master separated by colon. default=%s" % 'localhost:5050')) + +__OPTIONS = [ + __parasolOptions, + __singleMachineOptions, + __mesosOptions + ] + +__options = list(__OPTIONS) + +def addOptionsDefinition(optionsDefinition): + __options.append(optionsDefinition) + + +def setOptions(config, setOption): + batchSystem = config.batchSystem + + factory = batchSystemFactoryFor(batchSystem) + batchSystem = factory() + + batchSystem.setOptions(setOption) + +def addOptions(addOptionFn): + + addOptionFn("--batchSystem", dest="batchSystem", default=defaultBatchSystem(), + help=("The type of batch system to run the job(s) with, currently can be one " + "of %s'. default=%s" % (', '.join(uniqueNames()), defaultBatchSystem()))) + + for o in __options: + o(addOptionFn) diff --git a/src/toil/batchSystems/parasol.py b/src/toil/batchSystems/parasol.py index 9299f1ed18..39e3b98cf7 100644 --- a/src/toil/batchSystems/parasol.py +++ b/src/toil/batchSystems/parasol.py @@ -368,3 +368,11 @@ def shutdown(self): for results in self.resultsFiles.values(): os.remove(results) os.rmdir(self.parasolResultsDir) + + + @staticmethod + def setOptions(setOption): + from toil.common import iC + setOption("parasolCommand", None, None, 'parasol') + setOption("parasolMaxBatches", int, iC(1), 10000) + diff --git a/src/toil/batchSystems/registry.py b/src/toil/batchSystems/registry.py new file mode 100644 index 0000000000..c9b71c105e --- /dev/null +++ b/src/toil/batchSystems/registry.py @@ -0,0 +1,80 @@ +# Copyright (C) 2015-2016 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# + +def _gridengineBatchSystemFactory(): + from toil.batchSystems.gridengine import GridengineBatchSystem + return GridengineBatchSystem + +def _parasolBatchSystemFactory(): + from toil.batchSystems.parasol import ParasolBatchSystem + return ParasolBatchSystem + +def _lsfBatchSystemFactory(): + from toil.batchSystems.lsf import LSFBatchSystem + return LSFBatchSystem + +def _singleMachineBatchSystemFactory(): + from toil.batchSystems.singleMachine import SingleMachineBatchSystem + return SingleMachineBatchSystem + +def _mesosBatchSystemFactory(): + from toil.batchSystems.mesos.batchSystem import MesosBatchSystem + return MesosBatchSystem + +def _slurmBatchSystemFactory(): + from toil.batchSystems.slurm import SlurmBatchSystem + return SlurmBatchSystem + + +_DEFAULT_REGISTRY = { + 'parasol' : _parasolBatchSystemFactory, + 'singleMachine' : _singleMachineBatchSystemFactory, + 'single_machine' : _singleMachineBatchSystemFactory, + 'gridEngine' : _gridengineBatchSystemFactory, + 'gridengine' : _gridengineBatchSystemFactory, + 'lsf' : _lsfBatchSystemFactory, + 'LSF' : _lsfBatchSystemFactory, + 'mesos' : _mesosBatchSystemFactory, + 'Mesos' : _mesosBatchSystemFactory, + 'slurm' : _slurmBatchSystemFactory, + 'Slurm' : _slurmBatchSystemFactory + } + +_UNIQUE_NAME = { + 'parasol', + 'singleMachine', + 'gridEngine', + 'LSF', + 'Mesos', + 'Slurm' + } + +_batchSystemRegistry = _DEFAULT_REGISTRY.copy() +_batchSystemNames = set(_UNIQUE_NAME) + +def addBatchSystemFactory(key, batchSystemFactory): + _batchSystemNames.add(key) + _batchSystemRegistry[key] = batchSystemFactory + +def batchSystemFactoryFor(batchSystem): + return _batchSystemRegistry[batchSystem ] + +def defaultBatchSystem(): + return 'singleMachine' + +def uniqueNames(): + return list(_batchSystemNames) + +def batchSystems(): + list(set(_batchSystemRegistry.values())) diff --git a/src/toil/batchSystems/singleMachine.py b/src/toil/batchSystems/singleMachine.py index 2229452702..056ab08802 100644 --- a/src/toil/batchSystems/singleMachine.py +++ b/src/toil/batchSystems/singleMachine.py @@ -291,6 +291,10 @@ def getRescueBatchJobFrequency(cls): """ return 5400 + @staticmethod + def setOptions(setOption): + setOption("scale", default=1) + class Info(object): # Can't use namedtuple here since killIntended needs to be mutable def __init__(self, startTime, popen, killIntended): diff --git a/src/toil/common.py b/src/toil/common.py index bc8d41c720..af8ddf6e96 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -30,6 +30,8 @@ from toil import logProcessContext from toil.lib.bioio import addLoggingOptions, getLogLevelString, setLoggingFromOptions from toil.realtimeLogger import RealtimeLogger +from toil.batchSystems.options import setOptions as setBatchOptions +from toil.batchSystems.options import addOptions as addBatchOptions logger = logging.getLogger(__name__) @@ -64,12 +66,8 @@ def __init__(self): #Restarting the workflow options self.restart = False - #Batch system options + #Batch system common options self.batchSystem = "singleMachine" - self.scale = 1 - self.mesosMasterAddress = 'localhost:5050' - self.parasolCommand = "parasol" - self.parasolMaxBatches = 10000 self.environment = {} #Autoscaling options @@ -125,10 +123,13 @@ def setOptions(self, options): """ from bd2k.util.humanize import human2bytes #This import is used to convert #from human readable quantites to integers - def setOption(varName, parsingFn=None, checkFn=None): + def setOption(varName, parsingFn=None, checkFn=None, default=None): #If options object has the option "varName" specified #then set the "varName" attrib to this value in the config object x = getattr(options, varName, None) + if x is None: + x = default + if x is not None: if parsingFn is not None: x = parsingFn(x) @@ -143,20 +144,6 @@ def setOption(varName, parsingFn=None, checkFn=None): # Function to parse integer from string expressed in different formats h2b = lambda x : human2bytes(str(x)) - def iC(minValue, maxValue=sys.maxint): - # Returns function that checks if a given int is in the given half-open interval - assert isinstance(minValue, int) and isinstance(maxValue, int) - return lambda x: minValue <= x < maxValue - - def fC(minValue, maxValue=None): - # Returns function that checks if a given float is in the given half-open interval - assert isinstance(minValue, float) - if maxValue is None: - return lambda x: minValue <= x - else: - assert isinstance(maxValue, float) - return lambda x: minValue <= x < maxValue - def parseJobStore(s): name, rest = Toil.parseLocator(s) if name == 'file': @@ -194,13 +181,10 @@ def parseJobStore(s): #Batch system options setOption("batchSystem") - setOption("scale", float, fC(0.0)) - setOption("mesosMasterAddress") - setOption("parasolCommand") - setOption("parasolMaxBatches", int, iC(1)) + setBatchOptions(self, setOption) setOption("environment", parseSetEnv) - + #Autoscaling options setOption("provisioner") setOption("nodeType") @@ -327,21 +311,7 @@ def _addOptions(addGroupFn, config): addOptionFn = addGroupFn("toil options for specifying the batch system", "Allows the specification of the batch system, and arguments to the batch system/big batch system (see below).") - addOptionFn("--batchSystem", dest="batchSystem", default=None, - help=("The type of batch system to run the job(s) with, currently can be one " - "of singleMachine, parasol, gridEngine, lsf or mesos'. default=%s" % config.batchSystem)) - addOptionFn("--scale", dest="scale", default=None, - help=("A scaling factor to change the value of all submitted tasks's submitted cores. " - "Used in singleMachine batch system. default=%s" % config.scale)) - addOptionFn("--mesosMaster", dest="mesosMasterAddress", default=None, - help=("The host and port of the Mesos master separated by colon. default=%s" % config.mesosMasterAddress)) - addOptionFn("--parasolCommand", dest="parasolCommand", default=None, - help="The name or path of the parasol program. Will be looked up on PATH " - "unless it starts with a slashdefault=%s" % config.parasolCommand) - addOptionFn("--parasolMaxBatches", dest="parasolMaxBatches", default=None, - help="Maximum number of job batches the Parasol batch is allowed to create. One " - "batch is created for jobs with a a unique set of resource requirements. " - "default=%i" % config.parasolMaxBatches) + addBatchOptions(addOptionFn) # #Auto scaling options @@ -765,33 +735,12 @@ def createBatchSystem(config): maxMemory=config.maxMemory, maxDisk=config.maxDisk) - if config.batchSystem == 'parasol': - from toil.batchSystems.parasol import ParasolBatchSystem - batchSystemClass = ParasolBatchSystem - - elif config.batchSystem == 'single_machine' or config.batchSystem == 'singleMachine': - from toil.batchSystems.singleMachine import SingleMachineBatchSystem - batchSystemClass = SingleMachineBatchSystem - - elif config.batchSystem == 'gridengine' or config.batchSystem == 'gridEngine': - from toil.batchSystems.gridengine import GridengineBatchSystem - batchSystemClass = GridengineBatchSystem - - elif config.batchSystem == 'lsf' or config.batchSystem == 'LSF': - from toil.batchSystems.lsf import LSFBatchSystem - batchSystemClass = LSFBatchSystem - - elif config.batchSystem == 'mesos' or config.batchSystem == 'Mesos': - from toil.batchSystems.mesos.batchSystem import MesosBatchSystem - batchSystemClass = MesosBatchSystem - - kwargs['masterAddress'] = config.mesosMasterAddress - - elif config.batchSystem == 'slurm' or config.batchSystem == 'Slurm': - from toil.batchSystems.slurm import SlurmBatchSystem - batchSystemClass = SlurmBatchSystem - - else: + from toil.batchSystems.registry import batchSystemFactoryFor + + try: + factory = batchSystemFactoryFor(config.batchSystem) + batchSystemClass = factory() + except: raise RuntimeError('Unrecognised batch system: %s' % config.batchSystem) if not config.disableCaching and not batchSystemClass.supportsWorkerCleanup(): @@ -1035,6 +984,21 @@ def parseSetEnv(l): return d +def iC(minValue, maxValue=sys.maxint): + # Returns function that checks if a given int is in the given half-open interval + assert isinstance(minValue, int) and isinstance(maxValue, int) + return lambda x: minValue <= x < maxValue + +def fC(minValue, maxValue=None): + # Returns function that checks if a given float is in the given half-open interval + assert isinstance(minValue, float) + if maxValue is None: + return lambda x: minValue <= x + else: + assert isinstance(maxValue, float) + return lambda x: minValue <= x < maxValue + + def cacheDirName(workflowID): """ :return: Name of the cache directory. From 74397a197ffa3ece98f5549aa94da7c2f74373fd Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 17 Nov 2016 09:34:23 +1100 Subject: [PATCH 03/20] Fix underscores --- src/toil/batchSystems/options.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index 987bac7e9d..d0ff3c1293 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -15,7 +15,7 @@ from registry import batchSystemFactoryFor, defaultBatchSystem, uniqueNames -def __parasolOptions(addOptionFn): +def _parasolOptions(addOptionFn): addOptionFn("--parasolCommand", dest="parasolCommand", default=None, help="The name or path of the parasol program. Will be looked up on PATH " "unless it starts with a slashdefault=%s" % 'parasol') @@ -24,25 +24,25 @@ def __parasolOptions(addOptionFn): "batch is created for jobs with a a unique set of resource requirements. " "default=%i" % 1000) -def __singleMachineOptions(addOptionFn): +def _singleMachineOptions(addOptionFn): addOptionFn("--scale", dest="scale", default=None, help=("A scaling factor to change the value of all submitted tasks's submitted cores. " "Used in singleMachine batch system. default=%s" % 1)) -def __mesosOptions(addOptionFn): +def _mesosOptions(addOptionFn): addOptionFn("--mesosMaster", dest="mesosMasterAddress", default=None, help=("The host and port of the Mesos master separated by colon. default=%s" % 'localhost:5050')) -__OPTIONS = [ - __parasolOptions, - __singleMachineOptions, - __mesosOptions +_OPTIONS = [ + _parasolOptions, + _singleMachineOptions, + _mesosOptions ] -__options = list(__OPTIONS) +_options = list(_OPTIONS) def addOptionsDefinition(optionsDefinition): - __options.append(optionsDefinition) + _options.append(optionsDefinition) def setOptions(config, setOption): @@ -59,5 +59,7 @@ def addOptions(addOptionFn): help=("The type of batch system to run the job(s) with, currently can be one " "of %s'. default=%s" % (', '.join(uniqueNames()), defaultBatchSystem()))) - for o in __options: + for o in _options: o(addOptionFn) + + From 63bcf44c3062d579f5581a33d52645a1da338a18 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Fri, 9 Dec 2016 10:32:23 +1100 Subject: [PATCH 04/20] cleanup unnecessary files --- ._.project | Bin 0 -> 4096 bytes ._.pydevproject | Bin 0 -> 4096 bytes .gitignore | 6 ++++++ .project | 17 ----------------- .pydevproject | 11 ----------- 5 files changed, 6 insertions(+), 28 deletions(-) create mode 100644 ._.project create mode 100644 ._.pydevproject delete mode 100644 .project delete mode 100644 .pydevproject diff --git a/._.project b/._.project new file mode 100644 index 0000000000000000000000000000000000000000..d2f4feaaded43baa62a232725aceb32bbb372829 GIT binary patch literal 4096 zcmZQz6=P>$Vqox1Ojhs@R)|o50+1L3ClDJkFz{^v(m+1nBL)UWIUt(=a103vTK7-O z52Ax%22eRanifVNA1W@IoS&Bh0KGXW AbpQYW literal 0 HcmV?d00001 diff --git a/._.pydevproject b/._.pydevproject new file mode 100644 index 0000000000000000000000000000000000000000..43a97db451db414234b8867f2e5398224f3aa3ed GIT binary patch literal 4096 zcmZQz6=P>$Vqox1Ojhs@R)|o50+1L3ClDJkFz{^v(m+1nBL)UWIUt(=a103vTK7*Y z5Tb)&22eRanifVNA1W@IoS&Bh0PH9! AcmMzZ literal 0 HcmV?d00001 diff --git a/.gitignore b/.gitignore index 1df5afed00..137b491ba2 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,12 @@ __pycache__ /docs/generated_rst/ /docker/Dockerfile /docker/toil-*.tar.gz +<<<<<<< HEAD /src/toil/version.py .project .pydevproject +======= +.project +.pydevproject +.gitignore +>>>>>>> ca66e03... cleanup unnecessary files diff --git a/.project b/.project deleted file mode 100644 index f68e6d2440..0000000000 --- a/.project +++ /dev/null @@ -1,17 +0,0 @@ - - - toil - - - - - - org.python.pydev.PyDevBuilder - - - - - - org.python.pydev.pythonNature - - diff --git a/.pydevproject b/.pydevproject deleted file mode 100644 index 8e3be93aaf..0000000000 --- a/.pydevproject +++ /dev/null @@ -1,11 +0,0 @@ - - -python 2.7 -python - -/${PROJECT_DIR_NAME}/src - - -/Users/thomas.e/site-packages - - From 1c25313407a2f56ef1416f939ddb66f60d14a0c8 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Wed, 14 Dec 2016 10:46:25 +1100 Subject: [PATCH 05/20] Fix passing of environment variables to slurm jobs --- src/toil/batchSystems/slurm.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 60295753e3..34101a5c64 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -183,10 +183,14 @@ def prepareSbatch(self, cpu, mem, jobID): sbatch_line = ['sbatch', '-Q', '-J', 'toil_job_{}'.format(jobID)] if self.boss.environment: + comma = '' + ex = '--export=' for k, v in self.boss.environment.iteritems(): quoted_value = quote(os.environ[k] if v is None else v) - sbatch_line.append('--export={}={}'.format(k, quoted_value)) - + ex = ex + ('{}{}={}'.format(comma, k, quoted_value)) + comma = ',' + sbatch_line.append(ex) + if mem is not None: # memory passed in is in bytes, but slurm expects megabytes sbatch_line.append('--mem={}'.format(int(mem) / 2 ** 20)) From ed850faa453bc8789aa6896a924754f8edf3674e Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Wed, 14 Dec 2016 11:26:28 +1100 Subject: [PATCH 06/20] slurm batch system handles case where accounting is not active --- src/toil/batchSystems/slurm.py | 59 +++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 34101a5c64..9b90bdbdc9 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -222,9 +222,24 @@ def sbatch(self, sbatch_line): except OSError as e: logger.error("sbatch command failed") raise e - + def getJobExitCode(self, slurmJobID): logger.debug("Getting exit code for slurm job %d", slurmJobID) + + state, rc = self._getJobExitCodeFromSacct(self, slurmJobID) + + if rc == -999: + state, rc = self._getJobDetailsFromScontrol(slurmJobID) + + logger.debug("s job state is %s", state) + # If Job is in a running state, return None to indicate we don't have an update + if state in ('PENDING', 'RUNNING', 'CONFIGURING', 'COMPLETING', 'RESIZING', 'SUSPENDED'): + return None + + return rc + + def _getJobExitCodeFromSacct(self, slurmJobID): + logger.debug("Getting exit code for slurm job %d", slurmJobID) # SLURM job exit codes are obtained by running sacct. args = ['sacct', '-n', # no header @@ -234,20 +249,54 @@ def getJobExitCode(self, slurmJobID): '-S', '1970-01-01'] # override start time limit process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) for line in process.stdout: + + if line == 'SLURM accounting storage is disabled': + return (None, -999) + values = line.strip().split('|') if len(values) < 2: continue state, exitcode = values logger.debug("sacct job state is %s", state) # If Job is in a running state, return None to indicate we don't have an update - if state in ('PENDING', 'RUNNING', 'CONFIGURING', 'COMPLETING', 'RESIZING', 'SUSPENDED'): - return None status, _ = exitcode.split(':') logger.debug("sacct exit code is %s, returning status %s", exitcode, status) - return int(status) + return (state, int(status)) + logger.debug("Did not find exit code for job in sacct output") - return None + return (None, None) + + def _getJobDetailsFromScontrol(self, slurmJobID): + args = ['scontrol', + 'show', + 'job', + str(slurmJobID)] + process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + job = dict() + for line in process.stdout: + values = line.strip().split() + + if len(values)>0 and values[0] == 'slurm_load_jobs': + return None + + for v in values: + bits = v.split('=') + job[bits[0]] = bits[1] + state = job['JobState'] + try: + exitcode = job['ExitCode'] + if exitcode is not None: + status, _ = exitcode.split(':') + logger.debug("scontrol exit code is %s, returning status %s", exitcode, status) + rc = int(status) + else: + rc = None + except KeyError: + rc = None + + return (state, rc) class SlurmBatchSystem(BatchSystemSupport): """ From 2c6dcf1380fa245db21ef25601b2842308951e22 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Fri, 9 Dec 2016 10:35:48 +1100 Subject: [PATCH 07/20] Clean up --- ._.project | Bin 4096 -> 0 bytes ._.pydevproject | Bin 4096 -> 0 bytes .gitignore | 4 ++++ 3 files changed, 4 insertions(+) delete mode 100644 ._.project delete mode 100644 ._.pydevproject diff --git a/._.project b/._.project deleted file mode 100644 index d2f4feaaded43baa62a232725aceb32bbb372829..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4096 zcmZQz6=P>$Vqox1Ojhs@R)|o50+1L3ClDJkFz{^v(m+1nBL)UWIUt(=a103vTK7-O z52Ax%22eRanifVNA1W@IoS&Bh0KGXW AbpQYW diff --git a/._.pydevproject b/._.pydevproject deleted file mode 100644 index 43a97db451db414234b8867f2e5398224f3aa3ed..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4096 zcmZQz6=P>$Vqox1Ojhs@R)|o50+1L3ClDJkFz{^v(m+1nBL)UWIUt(=a103vTK7*Y z5Tb)&22eRanifVNA1W@IoS&Bh0PH9! AcmMzZ diff --git a/.gitignore b/.gitignore index 137b491ba2..0d45a7f417 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,8 @@ __pycache__ .project .pydevproject .gitignore +<<<<<<< HEAD >>>>>>> ca66e03... cleanup unnecessary files +======= +._* +>>>>>>> 27fd6a5... Clean up From 118d2f2c4b8dec409a3cf0aeaba09dab4ef5cd97 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Fri, 9 Dec 2016 10:37:27 +1100 Subject: [PATCH 08/20] Clean up From bbb0cda0c0efa2c8a424c867e1bf7cb6de6238dd Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Tue, 13 Dec 2016 16:25:15 +1100 Subject: [PATCH 09/20] change AbstractBatchSystem.setOptions to a classmethod --- src/toil/batchSystems/abstractBatchSystem.py | 7 ++++--- src/toil/batchSystems/mesos/batchSystem.py | 4 ++-- src/toil/batchSystems/options.py | 1 + src/toil/batchSystems/parasol.py | 4 ++-- src/toil/batchSystems/singleMachine.py | 4 ++-- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/toil/batchSystems/abstractBatchSystem.py b/src/toil/batchSystems/abstractBatchSystem.py index 98fe133060..c0dabf553a 100644 --- a/src/toil/batchSystems/abstractBatchSystem.py +++ b/src/toil/batchSystems/abstractBatchSystem.py @@ -184,10 +184,11 @@ def getRescueBatchJobFrequency(cls): raise NotImplementedError() - @staticmethod - def setOptions(setOption): + @classmethod + def setOptions(cls, setOption): """ - Process command line or configuration options relevant to this batch system + Process command line or configuration options relevant to this batch system. + The :param common.Config: the Config object """ diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index c6348bd9ef..0dc973bc67 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -572,8 +572,8 @@ def executorLost(self, driver, executorId, slaveId, status): log.warning("Executor '%s' lost.", executorId) - @staticmethod - def setOptions(config): + @classmethod + def setOptions(cl, config): super.setOptions(config) config.setOptions.setOption("mesosMasterAddress", None, None, 'localhost:5050') diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index d0ff3c1293..3db918c2ba 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -33,6 +33,7 @@ def _mesosOptions(addOptionFn): addOptionFn("--mesosMaster", dest="mesosMasterAddress", default=None, help=("The host and port of the Mesos master separated by colon. default=%s" % 'localhost:5050')) +# Built in batch systems that have options _OPTIONS = [ _parasolOptions, _singleMachineOptions, diff --git a/src/toil/batchSystems/parasol.py b/src/toil/batchSystems/parasol.py index 39e3b98cf7..ef61645180 100644 --- a/src/toil/batchSystems/parasol.py +++ b/src/toil/batchSystems/parasol.py @@ -370,8 +370,8 @@ def shutdown(self): os.rmdir(self.parasolResultsDir) - @staticmethod - def setOptions(setOption): + @classmethod + def setOptions(cls, setOption): from toil.common import iC setOption("parasolCommand", None, None, 'parasol') setOption("parasolMaxBatches", int, iC(1), 10000) diff --git a/src/toil/batchSystems/singleMachine.py b/src/toil/batchSystems/singleMachine.py index 056ab08802..3a471011b5 100644 --- a/src/toil/batchSystems/singleMachine.py +++ b/src/toil/batchSystems/singleMachine.py @@ -291,8 +291,8 @@ def getRescueBatchJobFrequency(cls): """ return 5400 - @staticmethod - def setOptions(setOption): + @classmethod + def setOptions(cls, setOption): setOption("scale", default=1) class Info(object): From 99b4c87604155385b56c481ac67d546749ca2436 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 15 Dec 2016 15:19:48 +1100 Subject: [PATCH 10/20] better comment in AbstractBatchSystem.setOptions --- src/toil/batchSystems/abstractBatchSystem.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/toil/batchSystems/abstractBatchSystem.py b/src/toil/batchSystems/abstractBatchSystem.py index c0dabf553a..87ec98a1bf 100644 --- a/src/toil/batchSystems/abstractBatchSystem.py +++ b/src/toil/batchSystems/abstractBatchSystem.py @@ -190,7 +190,8 @@ def setOptions(cls, setOption): Process command line or configuration options relevant to this batch system. The - :param common.Config: the Config object + :param setOption: A function with signature setOption(varName, parsingFn=None, checkFn=None, default=None) + used to update run configuration """ pass From 1af94db0546c2227f98c8d93300f4cea244c3cc2 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 15 Dec 2016 16:12:51 +1100 Subject: [PATCH 11/20] Cleanup prior to submitting PR --- .gitignore | 12 ------ src/toil/batchSystems/slurm.py | 69 ++++------------------------------ 2 files changed, 8 insertions(+), 73 deletions(-) diff --git a/.gitignore b/.gitignore index 0d45a7f417..3645a572a8 100644 --- a/.gitignore +++ b/.gitignore @@ -13,16 +13,4 @@ __pycache__ /docs/generated_rst/ /docker/Dockerfile /docker/toil-*.tar.gz -<<<<<<< HEAD /src/toil/version.py -.project -.pydevproject -======= -.project -.pydevproject -.gitignore -<<<<<<< HEAD ->>>>>>> ca66e03... cleanup unnecessary files -======= -._* ->>>>>>> 27fd6a5... Clean up diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 9b90bdbdc9..f58c64e2dc 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -183,14 +183,10 @@ def prepareSbatch(self, cpu, mem, jobID): sbatch_line = ['sbatch', '-Q', '-J', 'toil_job_{}'.format(jobID)] if self.boss.environment: - comma = '' - ex = '--export=' for k, v in self.boss.environment.iteritems(): quoted_value = quote(os.environ[k] if v is None else v) - ex = ex + ('{}{}={}'.format(comma, k, quoted_value)) - comma = ',' - sbatch_line.append(ex) - + sbatch_line.append('--export={}={}'.format(k, quoted_value)) + if mem is not None: # memory passed in is in bytes, but slurm expects megabytes sbatch_line.append('--mem={}'.format(int(mem) / 2 ** 20)) @@ -222,23 +218,8 @@ def sbatch(self, sbatch_line): except OSError as e: logger.error("sbatch command failed") raise e - - def getJobExitCode(self, slurmJobID): - logger.debug("Getting exit code for slurm job %d", slurmJobID) - - state, rc = self._getJobExitCodeFromSacct(self, slurmJobID) - - if rc == -999: - state, rc = self._getJobDetailsFromScontrol(slurmJobID) - - logger.debug("s job state is %s", state) - # If Job is in a running state, return None to indicate we don't have an update - if state in ('PENDING', 'RUNNING', 'CONFIGURING', 'COMPLETING', 'RESIZING', 'SUSPENDED'): - return None - return rc - - def _getJobExitCodeFromSacct(self, slurmJobID): + def getJobExitCode(self, slurmJobID): logger.debug("Getting exit code for slurm job %d", slurmJobID) # SLURM job exit codes are obtained by running sacct. args = ['sacct', @@ -249,54 +230,20 @@ def _getJobExitCodeFromSacct(self, slurmJobID): '-S', '1970-01-01'] # override start time limit process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) for line in process.stdout: - - if line == 'SLURM accounting storage is disabled': - return (None, -999) - values = line.strip().split('|') if len(values) < 2: continue state, exitcode = values logger.debug("sacct job state is %s", state) # If Job is in a running state, return None to indicate we don't have an update + if state in ('PENDING', 'RUNNING', 'CONFIGURING', 'COMPLETING', 'RESIZING', 'SUSPENDED'): + return None status, _ = exitcode.split(':') logger.debug("sacct exit code is %s, returning status %s", exitcode, status) - return (state, int(status)) - + return int(status) logger.debug("Did not find exit code for job in sacct output") - return (None, None) - - def _getJobDetailsFromScontrol(self, slurmJobID): - args = ['scontrol', - 'show', - 'job', - str(slurmJobID)] - process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + return None - job = dict() - for line in process.stdout: - values = line.strip().split() - - if len(values)>0 and values[0] == 'slurm_load_jobs': - return None - - for v in values: - bits = v.split('=') - job[bits[0]] = bits[1] - - state = job['JobState'] - try: - exitcode = job['ExitCode'] - if exitcode is not None: - status, _ = exitcode.split(':') - logger.debug("scontrol exit code is %s, returning status %s", exitcode, status) - rc = int(status) - else: - rc = None - except KeyError: - rc = None - - return (state, rc) class SlurmBatchSystem(BatchSystemSupport): """ @@ -419,4 +366,4 @@ def obtainSystemConstants(): max_mem = max(max_mem, MemoryString(mem + 'M')) if max_cpu == 0 or max_mem.byteVal() == 0: RuntimeError('sinfo did not return memory or cpu info') - return max_cpu, max_mem + return max_cpu, max_mem \ No newline at end of file From d5da2fd3e321091f3fbcdae2268d5b88c00e08f0 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 15 Dec 2016 16:18:29 +1100 Subject: [PATCH 12/20] Cleanup prior to submitting PR --- src/toil/batchSystems/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index f58c64e2dc..60295753e3 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -366,4 +366,4 @@ def obtainSystemConstants(): max_mem = max(max_mem, MemoryString(mem + 'M')) if max_cpu == 0 or max_mem.byteVal() == 0: RuntimeError('sinfo did not return memory or cpu info') - return max_cpu, max_mem \ No newline at end of file + return max_cpu, max_mem From 8247f406213226737d5da4040d18a371e9326b00 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Mon, 19 Dec 2016 08:59:50 +1100 Subject: [PATCH 13/20] Fix setOptions method in mesos.batchSystem --- src/toil/batchSystems/mesos/batchSystem.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index 0dc973bc67..bea06abe43 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -573,9 +573,8 @@ def executorLost(self, driver, executorId, slaveId, status): @classmethod - def setOptions(cl, config): - super.setOptions(config) - config.setOptions.setOption("mesosMasterAddress", None, None, 'localhost:5050') + def setOptions(cl, setOption): + setOption("mesosMasterAddress", None, None, 'localhost:5050') def toMiB(n): From 6ae39f1fd6dc14e2c5f5cf3c2d00310129b203a3 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Wed, 21 Dec 2016 17:15:54 +1100 Subject: [PATCH 14/20] Fix incompatibility between mesos batch system constructor and dynamic batch systems; add appropriate default options to the configs when testing --- src/toil/batchSystems/mesos/batchSystem.py | 4 ++-- src/toil/test/batchSystems/batchSystemTest.py | 18 ++++++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index bea06abe43..bc0fa8a357 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -71,7 +71,7 @@ def __init__(self, nodeAddress, slaveId, nodeInfo, lastSeen): self.nodeInfo = nodeInfo self.lastSeen = lastSeen - def __init__(self, config, maxCores, maxMemory, maxDisk, masterAddress): + def __init__(self, config, maxCores, maxMemory, maxDisk): super(MesosBatchSystem, self).__init__(config, maxCores, maxMemory, maxDisk) # The hot-deployed resource representing the user script. Will be passed along in every @@ -86,7 +86,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk, masterAddress): self.jobQueues = defaultdict(list) # Address of the Mesos master in the form host:port where host can be an IP or a hostname - self.masterAddress = masterAddress + self.masterAddress = config.masterAddress # Written to when Mesos kills tasks, as directed by Toil self.killedJobIds = set() diff --git a/src/toil/test/batchSystems/batchSystemTest.py b/src/toil/test/batchSystems/batchSystemTest.py index 5f9c5e8a59..ceabbb7e82 100644 --- a/src/toil/test/batchSystems/batchSystemTest.py +++ b/src/toil/test/batchSystems/batchSystemTest.py @@ -93,6 +93,13 @@ def createConfig(cls): config.workflowID = str(uuid4()) return config + @classmethod + def setBatchSystemOptions(cls): + """ + Set the appropriate default options for this batch system + """ + pass + def _createConfig(self): """ Returns a dummy config for the batch system tests. We need a workflowID to be set up @@ -317,9 +324,9 @@ def supportsWallTime(self): def createBatchSystem(self): from toil.batchSystems.mesos.batchSystem import MesosBatchSystem self._startMesos(numCores) + self.config.masterAddress='127.0.0.1:5050' return MesosBatchSystem(config=self.config, - maxCores=numCores, maxMemory=1e9, maxDisk=1001, - masterAddress='127.0.0.1:5050') + maxCores=numCores, maxMemory=1e9, maxDisk=1001) def tearDown(self): self._stopMesos() @@ -335,10 +342,10 @@ def supportsWallTime(self): return True def createBatchSystem(self): + self.config.scale = 1 return SingleMachineBatchSystem(config=self.config, maxCores=numCores, maxMemory=1e9, maxDisk=2001) - - + class MaxCoresSingleMachineBatchSystemTest(ToilTest): """ This test ensures that single machine batch system doesn't exceed the configured number of @@ -530,6 +537,9 @@ def _createConfig(self): def createBatchSystem(self): memory = int(3e9) self._startParasol(numCores=numCores, memory=memory) + self.config.parasolCommand = 'parasol' + self.config.parasolMaxBatches = 10000 + return ParasolBatchSystem(config=self.config, maxCores=numCores, maxMemory=memory, From e3b81bd80213a60aac8bf8aca517b8b7895f6012 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Sun, 25 Dec 2016 16:22:16 +1100 Subject: [PATCH 15/20] Config now inserts default options for all built in batch systems --- src/toil/batchSystems/options.py | 17 +++++++++++++++++ src/toil/common.py | 4 ++-- src/toil/test/batchSystems/batchSystemTest.py | 11 ----------- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index 3db918c2ba..315909d5d1 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -63,4 +63,21 @@ def addOptions(addOptionFn): for o in _options: o(addOptionFn) +def setDefaultOptions(config): + ''' + Set default options for builtin batch systems. This is required if a Config + object is not constructed from an Options object. + ''' + + config.batchSystem = "singleMachine" + config.environment = {} + # single machine + config.scale = 1 + + # mesos + config.masterAddress = 'localhost:5050' + + # parasol + config.parasolCommand = 'parasol' + config.parasolMaxBatches = 10000 diff --git a/src/toil/common.py b/src/toil/common.py index af8ddf6e96..776917889e 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -32,6 +32,7 @@ from toil.realtimeLogger import RealtimeLogger from toil.batchSystems.options import setOptions as setBatchOptions from toil.batchSystems.options import addOptions as addBatchOptions +from toil.batchSystems.options import setDefaultOptions as setDefaultBatchOptions logger = logging.getLogger(__name__) @@ -67,8 +68,7 @@ def __init__(self): self.restart = False #Batch system common options - self.batchSystem = "singleMachine" - self.environment = {} + setDefaultBatchOptions(self) #Autoscaling options self.provisioner = None diff --git a/src/toil/test/batchSystems/batchSystemTest.py b/src/toil/test/batchSystems/batchSystemTest.py index ceabbb7e82..fbf787b432 100644 --- a/src/toil/test/batchSystems/batchSystemTest.py +++ b/src/toil/test/batchSystems/batchSystemTest.py @@ -93,13 +93,6 @@ def createConfig(cls): config.workflowID = str(uuid4()) return config - @classmethod - def setBatchSystemOptions(cls): - """ - Set the appropriate default options for this batch system - """ - pass - def _createConfig(self): """ Returns a dummy config for the batch system tests. We need a workflowID to be set up @@ -324,7 +317,6 @@ def supportsWallTime(self): def createBatchSystem(self): from toil.batchSystems.mesos.batchSystem import MesosBatchSystem self._startMesos(numCores) - self.config.masterAddress='127.0.0.1:5050' return MesosBatchSystem(config=self.config, maxCores=numCores, maxMemory=1e9, maxDisk=1001) @@ -342,7 +334,6 @@ def supportsWallTime(self): return True def createBatchSystem(self): - self.config.scale = 1 return SingleMachineBatchSystem(config=self.config, maxCores=numCores, maxMemory=1e9, maxDisk=2001) @@ -537,8 +528,6 @@ def _createConfig(self): def createBatchSystem(self): memory = int(3e9) self._startParasol(numCores=numCores, memory=memory) - self.config.parasolCommand = 'parasol' - self.config.parasolMaxBatches = 10000 return ParasolBatchSystem(config=self.config, maxCores=numCores, From 6f85aae3e71e5592d220043e98e09e2b2204b73f Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Fri, 9 Jun 2017 14:14:14 +1000 Subject: [PATCH 16/20] Fix adding options for built in batch systems --- src/toil/batchSystems/options.py | 29 +++++++++++--------- src/toil/common.py | 45 +++++++------------------------- 2 files changed, 27 insertions(+), 47 deletions(-) diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index 315909d5d1..4ebfd5a88f 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -23,7 +23,7 @@ def _parasolOptions(addOptionFn): help="Maximum number of job batches the Parasol batch is allowed to create. One " "batch is created for jobs with a a unique set of resource requirements. " "default=%i" % 1000) - + def _singleMachineOptions(addOptionFn): addOptionFn("--scale", dest="scale", default=None, help=("A scaling factor to change the value of all submitted tasks's submitted cores. " @@ -44,40 +44,45 @@ def _mesosOptions(addOptionFn): def addOptionsDefinition(optionsDefinition): _options.append(optionsDefinition) - - + + def setOptions(config, setOption): batchSystem = config.batchSystem - + factory = batchSystemFactoryFor(batchSystem) batchSystem = factory() - + batchSystem.setOptions(setOption) - + def addOptions(addOptionFn): - + addOptionFn("--batchSystem", dest="batchSystem", default=defaultBatchSystem(), help=("The type of batch system to run the job(s) with, currently can be one " "of %s'. default=%s" % (', '.join(uniqueNames()), defaultBatchSystem()))) - + addOptionFn("--disableHotDeployment", dest="disableHotDeployment", action='store_true', default=None, + help=("Should hot-deployment of the user script be deactivated? If True, the user " + "script/package should be present at the same location on all workers. " + "default=false")) + for o in _options: o(addOptionFn) def setDefaultOptions(config): ''' - Set default options for builtin batch systems. This is required if a Config + Set default options for builtin batch systems. This is required if a Config object is not constructed from an Options object. ''' - + config.batchSystem = "singleMachine" + config.disableHotDeployment = False config.environment = {} # single machine config.scale = 1 - + # mesos config.masterAddress = 'localhost:5050' - + # parasol config.parasolCommand = 'parasol' config.parasolMaxBatches = 10000 diff --git a/src/toil/common.py b/src/toil/common.py index d1b569961b..8f041382c8 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -71,13 +71,7 @@ def __init__(self): self.restart = False #Batch system options - self.batchSystem = "singleMachine" - self.disableHotDeployment = False - self.scale = 1 - self.mesosMasterAddress = 'localhost:5050' - self.parasolCommand = "parasol" - self.parasolMaxBatches = 10000 - self.environment = {} + setDefaultBatchOptions(self) #Autoscaling options self.provisioner = None @@ -93,9 +87,9 @@ def __init__(self): self.betaInertia = 1.2 self.scaleInterval = 30 self.preemptableCompensation = 0.0 - + # Parameters to limit service jobs, so preventing deadlock scheduling scenarios - self.maxPreemptableServiceJobs = sys.maxint + self.maxPreemptableServiceJobs = sys.maxint self.maxServiceJobs = sys.maxint self.deadlockWait = 60 # Wait one minute before declaring a deadlock @@ -140,7 +134,7 @@ def setOption(varName, parsingFn=None, checkFn=None, default=None): x = getattr(options, varName, None) if x is None: x = default - + if x is not None: if parsingFn is not None: x = parsingFn(x) @@ -199,7 +193,7 @@ def parseJobStore(s): setOption("parasolMaxBatches", int, iC(1)) setOption("environment", parseSetEnv) - + #Autoscaling options setOption("provisioner") setOption("nodeType") @@ -218,7 +212,7 @@ def parseJobStore(s): require(0.0 <= self.preemptableCompensation <= 1.0, '--preemptableCompensation (%f) must be >= 0.0 and <= 1.0', self.preemptableCompensation) - + # Parameters to limit service jobs / detect deadlocks setOption("maxServiceJobs", int) setOption("maxPreemptableServiceJobs", int) @@ -290,7 +284,6 @@ def _addOptions(addGroupFn, config): "run should be placed. Temp files and folders will be placed in a directory " "toil- within workDir (The workflowID is generated by Toil and " "will be reported in the workflow logs. Default is determined by the " - "user-defined environmental variable TOIL_TEMPDIR, or the environment " "variables (TMPDIR, TEMP, TMP) via mkdtemp. This directory needs to exist on " "all machines running jobs.") addOptionFn("--stats", dest="stats", action="store_true", default=None, @@ -329,25 +322,7 @@ def _addOptions(addGroupFn, config): addOptionFn = addGroupFn("toil options for specifying the batch system", "Allows the specification of the batch system, and arguments to the batch system/big batch system (see below).") - addOptionFn("--batchSystem", dest="batchSystem", default=None, - help=("The type of batch system to run the job(s) with, currently can be one " - "of singleMachine, parasol, gridEngine, lsf or mesos'. default=%s" % config.batchSystem)) - addOptionFn("--disableHotDeployment", dest="disableHotDeployment", action='store_true', default=None, - help=("Should hot-deployment of the user script be deactivated? If True, the user " - "script/package should be present at the same location on all workers. " - "default=%s" % config.disableHotDeployment)) - addOptionFn("--scale", dest="scale", default=None, - help=("A scaling factor to change the value of all submitted tasks's submitted cores. " - "Used in singleMachine batch system. default=%s" % config.scale)) - addOptionFn("--mesosMaster", dest="mesosMasterAddress", default=None, - help=("The host and port of the Mesos master separated by colon. default=%s" % config.mesosMasterAddress)) - addOptionFn("--parasolCommand", dest="parasolCommand", default=None, - help="The name or path of the parasol program. Will be looked up on PATH " - "unless it starts with a slashdefault=%s" % config.parasolCommand) - addOptionFn("--parasolMaxBatches", dest="parasolMaxBatches", default=None, - help="Maximum number of job batches the Parasol batch is allowed to create. One " - "batch is created for jobs with a a unique set of resource requirements. " - "default=%i" % config.parasolMaxBatches) + addBatchOptions(addOptionFn) # #Auto scaling options @@ -411,8 +386,8 @@ def _addOptionFn(*name, **kwargs): "missing preemptable nodes with a non-preemptable one. A value of 1.0 " "replaces every missing pre-emptable node with a non-preemptable one." % config.preemptableCompensation)) - - # + + # # Parameters to limit service jobs / detect service deadlocks # addOptionFn = addGroupFn("toil options for limiting the number of service jobs and detecting service deadlocks", @@ -781,7 +756,7 @@ def createBatchSystem(config): maxDisk=config.maxDisk) from toil.batchSystems.registry import batchSystemFactoryFor - + try: factory = batchSystemFactoryFor(config.batchSystem) batchSystemClass = factory() From ee3016760b13050f4fd252e7bb47f5251b45e14c Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 29 Jun 2017 09:45:35 +1000 Subject: [PATCH 17/20] Putatitive fix for hanging test --- src/toil/batchSystems/mesos/batchSystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index ec9612d883..767371eecb 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -90,7 +90,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk): self.jobQueues = JobQueue() # Address of the Mesos master in the form host:port where host can be an IP or a hostname - self.masterAddress = config.masterAddress + self.masterAddress = config.mesosMasterAddress # Written to when Mesos kills tasks, as directed by Toil self.killedJobIds = set() From 561843a5331927108a7b9fbfa15c4fa1ea408f51 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Fri, 30 Jun 2017 14:35:23 +1000 Subject: [PATCH 18/20] Fix error in mesos test --- src/toil/batchSystems/mesos/batchSystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index 9af9dad080..fc38911284 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -90,7 +90,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk): self.jobQueues = JobQueue() # Address of the Mesos master in the form host:port where host can be an IP or a hostname - self.masterAddress = config.mesosMasterAddress + self.masterAddress = config.masterAddress # Written to when Mesos kills tasks, as directed by Toil self.killedJobIds = set() From be5a368798e06102571b2fa53beb782d63aecec3 Mon Sep 17 00:00:00 2001 From: Edwin Jacox Date: Wed, 5 Jul 2017 13:18:33 +0100 Subject: [PATCH 19/20] Update options.py --- src/toil/batchSystems/options.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index 4ebfd5a88f..0c960e2793 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -81,7 +81,7 @@ def setDefaultOptions(config): config.scale = 1 # mesos - config.masterAddress = 'localhost:5050' + config.mesosMasterAddress = 'localhost:5050' # parasol config.parasolCommand = 'parasol' From b04c8a5c62cfa2da66a9c7769debdecc309a26b1 Mon Sep 17 00:00:00 2001 From: Edwin Jacox Date: Wed, 5 Jul 2017 13:19:54 +0100 Subject: [PATCH 20/20] Update batchSystem.py --- src/toil/batchSystems/mesos/batchSystem.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index fc38911284..523dfbc0f1 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -90,7 +90,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk): self.jobQueues = JobQueue() # Address of the Mesos master in the form host:port where host can be an IP or a hostname - self.masterAddress = config.masterAddress + self.mesosMasterAddress = config.mesosMasterAddress # Written to when Mesos kills tasks, as directed by Toil self.killedJobIds = set() @@ -272,7 +272,7 @@ def _startDriver(self): framework.principal = framework.name self.driver = mesos.native.MesosSchedulerDriver(self, framework, - self._resolveAddress(self.masterAddress), + self._resolveAddress(self.mesosMasterAddress), True) # enable implicit acknowledgements assert self.driver.start() == mesos_pb2.DRIVER_RUNNING