diff --git a/reframe.py b/reframe.py index d7556f0403..f7afde8b96 100755 --- a/reframe.py +++ b/reframe.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import sys import reframe.frontend.cli as cli if __name__ == '__main__': diff --git a/reframe/core/environments.py b/reframe/core/environments.py index 77fa29dd30..2f5598ad5a 100644 --- a/reframe/core/environments.py +++ b/reframe/core/environments.py @@ -3,8 +3,7 @@ import subprocess import reframe.utility.os as os_ext -from reframe.core.exceptions import ReframeError, CommandError, \ - CompilationError +from reframe.core.exceptions import ReframeError, CommandError, CompilationError from reframe.core.fields import * from reframe.core.modules import * @@ -40,18 +39,20 @@ def set_variable(self, name, value): def load(self): """Load environment.""" - for k, v in self.variables.items(): - if k in os.environ: - self._saved_variables[k] = os.environ[k] - os.environ[k] = v - # conlicted module list must be filled at the time of load + # conflicted module list must be filled at the time of load for m in self.modules: if module_present(m): self._preloaded.add(m) self._conflicted += module_force_load(m) + for k, v in self.variables.items(): + if k in os.environ: + self._saved_variables[k] = os.environ[k] + + os.environ[k] = os.path.expandvars(v) + self.loaded = True @@ -94,15 +95,15 @@ def emit_load_instructions(self, builder): # FIXME: Does not correspond to the actual process in unload() def emit_unload_instructions(self, builder): """Emit shell instructions for loading this environment.""" + for k, v in self.variables.items(): + builder.unset_variable(k) + for m in self.modules: builder.verbatim('module unload %s' % m) for m in self._conflicted: builder.verbatim('module load %s' % m) - for k, v in self.variables.items(): - builder.unset_variable(k) - def __eq__(self, other): return \ diff --git a/reframe/core/exceptions.py b/reframe/core/exceptions.py index 07db923e81..1b8bd7ad80 100644 --- a/reframe/core/exceptions.py +++ b/reframe/core/exceptions.py @@ -13,7 +13,7 @@ def __str__(self): return self.message -class RegressionFatalError(ReframeError): +class ReframeFatalError(ReframeError): pass @@ -31,18 +31,19 @@ class ConfigurationError(ReframeError): class CommandError(ReframeError): def __init__(self, command, stdout, stderr, exitcode, timeout=0): + if not isinstance(command, str): + self.command = ' '.join(command) + else: + self.command = command + if timeout: super().__init__( - "Command '%s' timed out after %d s" % (command, timeout)) + "Command `%s' timed out after %d s" % (self.command, timeout)) else: super().__init__( - "Command '%s' failed with exit code: %d" % (command, exitcode)) - - if not isinstance(command, str): - self.command = ' '.join(command) - else: - self.command = command + "Command `%s' failed with exit code: %d" % \ + (self.command, exitcode)) self.stdout = stdout self.stderr = stderr @@ -51,10 +52,12 @@ def __init__(self, command, stdout, stderr, exitcode, timeout=0): def __str__(self): - return '{command : "%s", stdout : "%s", stderr : "%s", ' \ - 'exitcode : %s, timeout : %d }' % \ - (self.command, self.stdout, self.stderr, - self.exitcode, self.timeout) + ret = '\n' + super().__str__() + \ + "\n=== STDOUT ===\n" + \ + self.stdout + \ + "\n=== STDERR ===\n" + \ + self.stderr + return ret class CompilationError(CommandError): diff --git a/reframe/core/logging.py b/reframe/core/logging.py new file mode 100644 index 0000000000..d3a0ab1272 --- /dev/null +++ b/reframe/core/logging.py @@ -0,0 +1,256 @@ +import logging +import 
os +import logging.handlers +import sys +import shutil + +from datetime import datetime + +from reframe.settings import settings +from reframe.core.exceptions import ConfigurationError, ReframeError + +# Reframe's log levels +CRITICAL = 50 +ERROR = 40 +WARNING = 30 +INFO = 20 +VERBOSE = 19 +DEBUG = 10 +NOTSET = 0 + + +_log_level_names = { + CRITICAL : 'critical', + ERROR : 'error', + WARNING : 'warning', + INFO : 'info', + VERBOSE : 'verbose', + DEBUG : 'debug', + NOTSET : 'undefined' +} + +_log_level_values = { + 'critical' : CRITICAL, + 'error' : ERROR, + 'warning' : WARNING, + 'info' : INFO, + 'verbose' : VERBOSE, + 'debug' : DEBUG, + 'undefined' : NOTSET, + 'notset' : NOTSET +} + +def _check_level(level): + if isinstance(level, int): + ret = level + elif isinstance(level, str): + norm_level = level.lower() + if norm_level not in _log_level_values: + raise ReframeError('logger level %s not available' % level) + else: + ret = _log_level_values[norm_level] + else: + raise TypeError('logger level %s not an int or a valid string' % level) + + return ret + + +# Redefine handlers so as to use our levels + +class Handler(logging.Handler): + def setLevel(self, level): + self.level = _check_level(level) + + +class StreamHandler(Handler, logging.StreamHandler): + pass + + +class RotatingFileHandler(Handler, logging.handlers.RotatingFileHandler): + pass + + +class FileHandler(Handler, logging.FileHandler): + pass + + +class NullHandler(Handler, logging.NullHandler): + pass + + +def load_from_dict(logging_config): + if not isinstance(logging_config, dict): + raise ConfigurationError('logging configuration is not a dict') + + level = logging_config.get('level', 'info').lower() + handlers_dict = logging_config.get('handlers', None) + + # if not handlers_dict: + # raise ConfigurationError('no entry for handlers was found') + + logger = Logger('reframe') + logger.setLevel(_log_level_values[level]) + + for handler in _extract_handlers(handlers_dict): + logger.addHandler(handler) + + return logger + + +def _extract_handlers(handlers_dict): + handlers = [] + if not handlers_dict: + raise ConfigurationError('no handlers are defined for logger') + + for filename, handler_config in handlers_dict.items(): + if not isinstance(handler_config, dict): + raise ConfigurationError( + 'handler %s is not a dictionary' % filename + ) + + level = handler_config.get('level', 'debug').lower() + fmt = handler_config.get('format', '%(message)s') + datefmt = handler_config.get('datefmt', '%FT%T') + append = handler_config.get('append', False) + timestamp = handler_config.get('timestamp', None) + + if filename == '&1': + hdlr = StreamHandler(stream=sys.stdout) + elif filename == '&2': + hdlr = StreamHandler(stream=sys.stderr) + else: + if timestamp: + basename, ext = os.path.splitext(filename) + filename = '%s_%s%s' % ( + basename, datetime.now().strftime(timestamp), ext + ) + + hdlr = RotatingFileHandler( + filename, mode='a+' if append else 'w+' + ) + + hdlr.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt)) + hdlr.setLevel(level) + handlers.append(hdlr) + + return handlers + + +class Logger(logging.Logger): + def __init__(self, name, level=logging.NOTSET): + # We will set the logger level ourselves so as to bypass the base class' + # check + super().__init__(name, logging.NOTSET) + self.level = _check_level(level) + self.check = None + + + def setLevel(self, level): + self.level = _check_level(level) + + + def makeRecord(self, name, level, fn, lno, msg, args, exc_info, + func=None, extra=None, sinfo=None): + # Setup 
dynamic fields of the check + if self.check and self.check.job: + extra['check_jobid'] = self.check.job.jobid + + record = super().makeRecord(name, level, fn, lno, msg, args, exc_info, + func, extra, sinfo) + try: + # Fill in our name for the record + record.levelname = _log_level_names[level] + except KeyError: + # Go with the default level name of Python logging + pass + + return record + + + # Override all the convenience logging functions, because we want to make + # sure that they map to our level definitions + + def critical(self, msg, *args, **kwargs): + return self.log(CRITICAL, msg, *args, **kwargs) + + + def error(self, msg, *args, **kwargs): + return self.log(ERROR, msg, *args, **kwargs) + + + def warning(self, msg, *args, **kwargs): + return self.log(WARNING, msg, *args, **kwargs) + + + def info(self, msg, *args, **kwargs): + return self.log(INFO, msg, *args, **kwargs) + + + def verbose(self, message, *args, **kwargs): + self.log(VERBOSE, message, *args, **kwargs) + + + def debug(self, message, *args, **kwargs): + self.log(DEBUG, message, *args, **kwargs) + + +class LoggerAdapter(logging.LoggerAdapter): + def __init__(self, logger = None, check = None): + super().__init__( + logger, + { + 'check_name' : check.name if check else 'reframe', + 'check_jobid' : '-1' + } + ) + if self.logger: + self.logger.check = check + + + def setLevel(self, level): + if self.logger: + super().setLevel(level) + + + # Override log() function to treat `None` loggers + def log(self, level, msg, *args, **kwargs): + if self.logger: + super().log(level, msg, *args, **kwargs) + + + def verbose(self, message, *args, **kwargs): + self.log(VERBOSE, message, *args, **kwargs) + + +# A logger that doesn't log anything +null_logger = LoggerAdapter() + +_logger = None +_frontend_logger = null_logger + +def configure_logging(config): + global _logger + global _frontend_logger + + if config == None: + _logger = None + _frontend_logger = null_logger + return + + _logger = load_from_dict(config) + _frontend_logger = LoggerAdapter(_logger) + + +def save_log_files(dest): + os.makedirs(dest, exist_ok=True) + for hdlr in _logger.handlers: + if isinstance(hdlr, logging.FileHandler): + shutil.copy(hdlr.baseFilename, dest, follow_symlinks=True) + +def getlogger(logger_kind, *args, **kwargs): + if logger_kind == 'frontend': + return _frontend_logger + elif logger_kind == 'check': + return LoggerAdapter(_logger, *args, **kwargs) + else: + raise ReframeError('unknown kind of logger: %s' % logger_kind) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index f7809d4e95..c718a430e5 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -5,26 +5,27 @@ import copy import glob import os -import logging.config import shutil import reframe +import reframe.core.logging as logging import reframe.settings as settings import reframe.utility.os as os_ext from reframe.core.environments import Environment -from reframe.core.exceptions import ReframeError +from reframe.core.exceptions import ReframeFatalError from reframe.core.fields import * from reframe.core.launchers import * +from reframe.core.logging import getlogger, LoggerAdapter, null_logger from reframe.core.schedulers import * from reframe.core.shell import BashScriptBuilder from reframe.core.systems import System, SystemPartition from reframe.frontend.resources import ResourcesManager -class RegressionTest: +class RegressionTest(object): """Base class for regression checks providing the implementation of the - different phases the regression goes 
through""" + different phases the regression goes through.""" name = AlphanumericField('name') valid_prog_environs = TypedListField('valid_prog_environs', str) @@ -52,14 +53,15 @@ class RegressionTest: num_cpus_per_task = IntegerField('num_cpus_per_task', allow_none=True) num_tasks_per_core = IntegerField('num_tasks_per_core', allow_none=True) num_tasks_per_socket = IntegerField('num_tasks_per_socket', allow_none=True) - use_multithreading = BooleanField('use_multithreading') + use_multithreading = BooleanField('use_multithreading', allow_none=True) local = BooleanField('local') prefix = StringField('prefix') sourcesdir = StringField('sourcesdir') stagedir = StringField('stagedir', allow_none=True) stdout = StringField('stdout', allow_none=True) stderr = StringField('stderr', allow_none=True) - _logfile = StringField('_logfile', allow_none=True) + logger = TypedField('logger', LoggerAdapter) + _perf_logfile = StringField('_perf_logfile', allow_none=True) reference = ScopedDictField('reference', object) sanity_patterns = SanityPatternField('sanity_patterns', allow_none=True) perf_patterns = SanityPatternField('perf_patterns', allow_none=True) @@ -100,7 +102,7 @@ def __init__(self, name, prefix, system, resources): self.num_cpus_per_task = None self.num_tasks_per_core = None self.num_tasks_per_socket = None - self.use_multithreading = False + self.use_multithreading = None # True only if check is to be run locally self.local = False @@ -113,7 +115,7 @@ def __init__(self, name, prefix, system, resources): self.stagedir = None self.stdout = None self.stderr = None - self._logfile = None + self._perf_logfile = None # Output patterns self.sanity_patterns = None @@ -136,7 +138,8 @@ def __init__(self, name, prefix, system, resources): self._compile_task = None # Check-specific logging - self._logger = None + self._perf_logger = null_logger + self.logger = null_logger # Type of launcher to use for launching jobs self._launcher_type = None @@ -171,6 +174,7 @@ def is_local(self): def _setup_environ(self, environ): """Setup the current environment and load it.""" + self.logger.debug('setting up the environment') self.current_environ = environ # Add user modules and variables to the environment @@ -181,12 +185,17 @@ def _setup_environ(self, environ): self.current_environ.set_variable(k, v) # First load the local environment of the partition + self.logger.debug('loading environment for partition %s' % + self.current_partition.fullname) self.current_partition.local_env.load() + + self.logger.debug('loading environment %s' % self.current_environ.name) self.current_environ.load() def _setup_paths(self): """Setup the check's dynamic paths.""" + self.logger.debug('setting up paths') self.stagedir = self._resources.stagedir( self.current_partition.name, self.name, self.current_environ.name) self.outputdir = self._resources.outputdir( @@ -198,6 +207,11 @@ def _setup_paths(self): def _setup_job(self, **job_opts): """Setup the job related to this check.""" + self.logger.debug('setting up the job descriptor') + self.logger.debug( + 'job scheduler backend: %s' % + ('local' if self.is_local() else self.current_partition.scheduler)) + # num_gpus_per_node is a managed resource if self.num_gpus_per_node > 0: self.job_resources.setdefault('num_gpus_per_node', @@ -213,8 +227,8 @@ def _setup_job(self, **job_opts): self._launcher_type = AlpsLauncher else: # Oops - raise RegressionFatalError('Oops: unsupported launcher: %s' % - self.current_partition.scheduler) + raise ReframeFatalError('Oops: unsupported launcher: %s' % + 
                                    self.current_partition.scheduler)
 
         job_name = '%s_%s_%s_%s' % (self.name,
                                     self.current_system.name,
@@ -233,10 +247,6 @@ def _setup_job(self, **job_opts):
                 time_limit=self.time_limit,
                 **job_opts)
         else:
-            # We need to deep copy job_opts since we may be called repeatedly
-            # from the front-end
-            job_opts = copy.deepcopy(job_opts)
-
             self.job = SlurmJob(
                 job_name=job_name,
                 job_environ_list=[
@@ -272,55 +282,65 @@
 
     # FIXME: This is a temporary solution to address issue #157
-    def _setup_logging(self):
-        self._logfile = os.path.join(
+    def _setup_perf_logging(self):
+        self.logger.debug('setting up performance logging')
+        self._perf_logfile = os.path.join(
             self._resources.logdir(self.current_partition.name),
             self.name + '.log'
         )
-        self._logger = logging.getLogger('reframe.checks.%s' % self.name)
-        formatter = logging.Formatter(
-            fmt='[%(asctime)s] %(name)s: %(levelname)s: %(message)s',
-            datefmt='%FT%T'
-        )
-
-        handler = logging.handlers.RotatingFileHandler(
-            filename=self._logfile,
-            maxBytes=10*1024*1024,
+        perf_logging_config = {
+            'level': 'INFO',
+            'handlers': {
+                self._perf_logfile : {
+                    'level'  : 'DEBUG',
+                    'format' : '[%(asctime)s] %(check_name)s '
+                               '(jobid=%(check_jobid)s): %(message)s',
+                    'append' : True,
+                }
+            }
+        }
+
+        self._perf_logger = LoggerAdapter(
+            logger=logging.load_from_dict(perf_logging_config),
+            check=self
        )
-        handler.setLevel(logging.INFO)
-        handler.setFormatter(formatter)
-
-        self._logger.addHandler(handler)
-        self._logger.setLevel(logging.INFO)
-
     def setup(self, system, environ, **job_opts):
+        # Logging prevents deep copy, so we initialize the check's logger
+        # late, during the check's setup phase
+        self.logger = getlogger('check', check=self)
+
         self.current_partition = system
         self._setup_environ(environ)
         self._setup_paths()
         self._setup_job(**job_opts)
         if self.perf_patterns != None:
-            self._setup_logging()
+            self._setup_perf_logging()
 
     def _copy_to_stagedir(self, path):
+        self.logger.debug('copying %s to stage directory (%s)' %
+                          (path, self.stagedir))
+        self.logger.debug('symlinking files: %s' % self.readonly_files)
         os_ext.copytree_virtual(path, self.stagedir, self.readonly_files)
 
     def prebuild(self):
         for cmd in self.prebuild_cmd:
+            self.logger.debug('executing prebuild command: %s' % cmd)
             os_ext.run_command(cmd, check=True)
 
     def postbuild(self):
         for cmd in self.postbuild_cmd:
+            self.logger.debug('executing postbuild command: %s' % cmd)
             os_ext.run_command(cmd, check=True)
 
     def compile(self, **compile_opts):
         if not self.current_environ:
-            raise ReframeError('No programming environment set')
+            raise ReframeError('no programming environment set')
 
         # if self.sourcepath refers to a directory, stage it first
         target_sourcepath = os.path.join(self.sourcesdir, self.sourcepath)
@@ -332,6 +352,7 @@ def compile(self, **compile_opts):
         else:
             includedir = os.path.abspath(self.sourcesdir)
 
+        # Add the correct source directory to the include path
         self.current_environ.include_search_path.append(includedir)
 
@@ -347,48 +368,61 @@
         os.chdir(self.stagedir)
         try:
             self.prebuild()
+            self.logger.debug('compilation started')
            self._compile_task = self.current_environ.compile(
                 sourcepath=target_sourcepath,
                 executable=os.path.join(self.stagedir, self.executable),
                 **compile_opts)
+            self.logger.debug('compilation stdout:\n%s' %
+                              self._compile_task.stdout)
+            self.logger.debug('compilation stderr:\n%s' %
+                              self._compile_task.stderr)
             self.postbuild()
         finally:
             # Always restore working directory
             os.chdir(wd_save)
+            
self.logger.debug('compilation finished') def run(self): if not self.current_system or not self.current_partition: - raise ReframeError('No system or system partition is set') + raise ReframeError('no system or system partition is set') self.job.submit(cmd='%s %s' % (self.executable, ' '.join(self.executable_opts)), workdir=self.stagedir) - if self._logger: - msg = 'submitted job' if not self.is_local() else 'launched process' - self._logger.info('%s (id=%s)' % (msg, self.job.jobid)) + + msg = 'spawned job (%s=%s)' % \ + ('pid' if self.is_local() else 'jobid', self.job.jobid) + self.logger.debug(msg) + + + def poll(self): + """Poll the test's status. + + Returns `True` if the associated job has finished, `False` otherwise.""" + if not self.job: + return True + + return self.job.finished() def wait(self): self.job.wait() + self.logger.debug('spawned job finished') def check_sanity(self): return self._match_patterns(self.sanity_patterns, None) - def check_performance_relaxed(self): - """Implements the relaxed performance check logic.""" - ret = self.check_performance() - return ret if self.strict_check else True - - def check_performance(self): return self._match_patterns(self.perf_patterns, self.reference) def cleanup(self, remove_files=False, unload_env=True): # Copy stdout/stderr and job script + self.logger.debug('copying interesting files to output directory') shutil.copy(self.stdout, self.outputdir) shutil.copy(self.stderr, self.outputdir) if self.job: @@ -401,9 +435,11 @@ def cleanup(self, remove_files=False, unload_env=True): shutil.copy(f, self.outputdir) if remove_files: + self.logger.debug('removing stage directory') shutil.rmtree(self.stagedir) if unload_env: + self.logger.debug("unloading test's environment") self.current_environ.unload() self.current_partition.local_env.unload() @@ -439,7 +475,7 @@ def _resolve_tag(tag): if reference != None else None if thres(value=conv(match.group(tag)), reference=ref, - logger=self._logger): + logger=self._perf_logger): found_tags.add(tag) except (OSError, ValueError) as e: raise ReframeError('Caught %s: %s' % (type(e).__name__, str(e))) @@ -490,7 +526,7 @@ def _match_patterns(self, multi_patterns, reference): # We need eof_handler to be called anyway that's why we do not # combine this check with the above and we delay the breaking # out of the loop here - if eof_handler and not eof_handler(logger=self._logger): + if eof_handler and not eof_handler(logger=self._perf_logger): ret = False break diff --git a/reframe/core/schedulers.py b/reframe/core/schedulers.py index 0baa20b2d5..768380e1e0 100644 --- a/reframe/core/schedulers.py +++ b/reframe/core/schedulers.py @@ -47,7 +47,7 @@ def __init__(self, self.post_run = [] # Live job information; to be filled during job's lifetime - self.jobid = None + self.jobid = -1 self.state = None self.exitcode = None @@ -71,6 +71,11 @@ def wait(self): raise NotImplementedError('Attempt to call an abstract method') + def finished(self): + """Status of the job.""" + raise NotImplementedError('Attempt to call an abstract method') + + def submit(self, cmd, workdir = '.'): # Build the submission script and submit it self.emit_preamble(self.script_builder) @@ -99,6 +104,7 @@ def __eq__(self, other): def __ne__(self, other): return not self.__eq__(other) + def __str__(self): return self.state @@ -195,6 +201,15 @@ def wait(self): self._stderr.close() + def finished(self): + # poll spawned process + self.proc.poll() + if self.proc.returncode == None: + return False + + return True + + class SlurmJobState(JobState): 
def __init__(self, state): super().__init__(state) @@ -210,8 +225,8 @@ def __init__(self, state): SLURM_JOB_NODE_FAILED = SlurmJobState('NODE_FAILED') SLURM_JOB_PENDING = SlurmJobState('PENDING') SLURM_JOB_PREEMPTED = SlurmJobState('PREEMPTED') -SLURM_JOB_RUNNING = SlurmJobState('RUNNING') SLURM_JOB_RESIZING = SlurmJobState('RESIZING') +SLURM_JOB_RUNNING = SlurmJobState('RUNNING') SLURM_JOB_SUSPENDED = SlurmJobState('SUSPENDED') SLURM_JOB_TIMEOUT = SlurmJobState('TIMEOUT') @@ -219,7 +234,7 @@ def __init__(self, state): class SlurmJob(Job): def __init__(self, time_limit = (0, 10, 0), - use_smt = False, + use_smt = None, exclusive = True, nodelist = None, exclude = None, @@ -248,6 +263,13 @@ def __init__(self, self.num_cpus_per_task = num_cpus_per_task self.num_tasks_per_core = num_tasks_per_core self.num_tasks_per_socket = num_tasks_per_socket + self.completion_states = [ SLURM_JOB_BOOT_FAIL, + SLURM_JOB_CANCELLED, + SLURM_JOB_COMPLETED, + SLURM_JOB_FAILED, + SLURM_JOB_NODE_FAILED, + SLURM_JOB_PREEMPTED, + SLURM_JOB_TIMEOUT ] def emit_preamble(self, builder): builder.verbatim('%s --job-name="%s"' % (self.prefix, self.name)) @@ -284,10 +306,9 @@ def emit_preamble(self, builder): builder.verbatim( '%s --exclude=%s' % (self.prefix, self.exclude)) - if self.use_smt: - builder.verbatim('%s --hint=multithread' % self.prefix) - else: - builder.verbatim('%s --hint=nomultithread' % self.prefix) + if self.use_smt != None: + hint = 'multithread' if self.use_smt else 'nomultithread' + builder.verbatim('%s --hint=%s'%(self.prefix, hint)) if self.reservation: builder.verbatim('%s --reservation=%s' % (self.prefix, @@ -355,7 +376,7 @@ def _update_state(self): if state_match.group('jobid') != self.jobid: # this shouldn't happen - raise RegressionFatalError( + raise ReframeFatalError( 'Oops: job ids do not match. 
Expected %s, got %s' % \ (self.jobid, state_match.group('jobid'))) @@ -363,19 +384,18 @@ def _update_state(self): self.exitcode = int(state_match.group('exitcode')) self.signal = int(state_match.group('signal')) - def wait(self, states = [ SLURM_JOB_BOOT_FAIL, - SLURM_JOB_CANCELLED, - SLURM_JOB_COMPLETED, - SLURM_JOB_FAILED, - SLURM_JOB_NODE_FAILED, - SLURM_JOB_PREEMPTED, - SLURM_JOB_TIMEOUT ]): - if not states: - raise RuntimeError('No state was specified to wait for.') - + def wait(self): intervals = itertools.cycle(settings.job_state_poll_intervals) + # Quickly return in case we have finished already + if self.state in self.completion_states: + return + self._update_state() - while not self.state or not self.state in states: + while not self.state in self.completion_states: time.sleep(next(intervals)) self._update_state() + + def finished(self): + self._update_state() + return self.state in self.completion_states diff --git a/reframe/core/systems.py b/reframe/core/systems.py index 1634ec01e4..e2007c59ff 100644 --- a/reframe/core/systems.py +++ b/reframe/core/systems.py @@ -5,19 +5,22 @@ from reframe.core.exceptions import ReframeError class SystemPartition: - name = NonWhitespaceField('name') - descr = StringField('descr') - scheduler = NonWhitespaceField('scheduler', allow_none=True) - access = TypedListField('access', str) - environs = TypedListField('environs', Environment) - resources = TypedDictField('resources', str, (list, str)) - local_env = TypedField('local_env', Environment, allow_none=True) - prefix = StringField('prefix') - stagedir = StringField('stagedir') - outputdir = StringField('outputdir') - logdir = StringField('logdir') - - def __init__(self, name): + name = NonWhitespaceField('name') + descr = StringField('descr') + scheduler = NonWhitespaceField('scheduler', allow_none=True) + access = TypedListField('access', str) + environs = TypedListField('environs', Environment) + resources = TypedDictField('resources', str, (list, str)) + local_env = TypedField('local_env', Environment, allow_none=True) + prefix = StringField('prefix') + stagedir = StringField('stagedir') + outputdir = StringField('outputdir') + logdir = StringField('logdir') + + # maximum concurrent jobs + max_jobs = IntegerField('max_jobs') + + def __init__(self, name, system): self.name = name self.descr = name self.scheduler = None @@ -25,6 +28,14 @@ def __init__(self, name): self.environs = [] self.resources = {} self.local_env = None + self.system = system + self.max_jobs = 1 + + + @property + def fullname(self): + """Return fully-qualified name for this partition.""" + return '%s:%s' % (self.system.name, self.name) def get_resource(self, name, value): diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 4601de5a94..cc4c017969 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -1,182 +1,36 @@ import argparse -import datetime import os +import socket import sys import traceback -import reframe.utility.os as os_ext -from reframe.core.environments import EnvironmentSnapshot -from reframe.core.exceptions import \ - ModuleError, RegressionFatalError, ReframeError +import reframe.core.logging as logging + +from reframe.core.exceptions import ModuleError from reframe.core.modules import module_force_load, module_unload -from reframe.frontend.loader import \ - RegressionCheckLoader, SiteConfiguration, autodetect_system -from reframe.frontend.printer import Printer +from reframe.frontend.executors import Runner +from reframe.frontend.executors.policies import 
SerialExecutionPolicy, \ + AsynchronousExecutionPolicy +from reframe.frontend.loader import RegressionCheckLoader, \ + SiteConfiguration, autodetect_system +from reframe.frontend.printer import PrettyPrinter from reframe.frontend.resources import ResourcesManager -from reframe.frontend.statistics import RegressionStats from reframe.settings import settings -from reframe.utility.sandbox import Sandbox - - -def print_error(msg): - print('%s: %s' % (sys.argv[0], msg), file=sys.stderr) -def list_supported_systems(systems): - print('List of supported systems:', file=sys.stderr) +def list_supported_systems(systems, printer): + printer.info('List of supported systems:') for s in systems: - print(' ', s) + printer.info(' ', s) -def list_checks(checks): - print('List of matched checks') - print('======================') +def list_checks(checks, printer): + printer.info('List of matched checks') + printer.info('======================') for c in checks: - print(' * %s' % c) - - print('Found %d check(s).' % len(checks)) - - -def run_checks_partition(checks, options, partition, printer, stats): - """Run checks on partition.""" - - # Sandbox variables passed to setup - sandbox = Sandbox() - - # Prepare for running the tests - environ_save = EnvironmentSnapshot() - for check in checks: - if not options.skip_system_check and \ - not check.supports_system(partition.name): - printer.print_unformatted( - 'Skipping unsupported test %s...' % check.name) - continue - - stats.num_checks += 1 - if options.force_local: - check.local = True - - if not options.relax_performance_check: - check.strict_check = True - - for env in partition.environs: - # Add current partition and environment to the sandbox - sandbox.system = partition - sandbox.environ = env - try: - if not options.skip_prgenv_check and \ - not check.supports_progenv(sandbox.environ.name): - printer.print_unformatted( - 'Skipping unsupported environment %s...' 
% - sandbox.environ.name) - continue - - stats.num_cases += 1 - printer.print_check_title(check, sandbox.environ) - printer.print_check_progress( - 'Setting up', check.setup, - system=sandbox.system, - environ=sandbox.environ, - account=options.account, - partition=options.partition, - reservation=options.reservation, - nodelist=options.nodelist, - exclude=options.exclude_nodes, - options=options.job_options - ) - printer.print_check_progress('Compiling', check.compile) - printer.print_check_progress('Submitting job', check.run) - printer.print_check_progress( - 'Waiting %s (id=%s)' % \ - ('process' if check.is_local() else 'job', - check.job.jobid if check.job else '-1'), check.wait) - - remove_stage_files = not options.keep_stage_files - success = True - if not options.skip_sanity_check and \ - not printer.print_check_progress('Checking sanity', - check.check_sanity, - expected_ret=True): - remove_stage_files = False - success = False - - if not options.skip_performance_check and \ - not printer.print_check_progress( - 'Verifying performance', - check.check_performance_relaxed, - expected_ret=True): - if check._logfile: - printer.print_unformatted( - 'Check log file: %s' % check._logfile - ) - - remove_stage_files = False - success = False - - printer.print_check_progress('Cleaning up', check.cleanup, - remove_files=remove_stage_files, - unload_env=False) - if success: - printer.print_check_success(check) - else: - stats.add_failure(check, env) - printer.print_check_failure(check) - - except (NotImplementedError, RegressionFatalError) as e: - # These are fatal; mark the failure and reraise them - stats.add_failure(check, env) - printer.print_check_failure(check) - raise - except ReframeError as e: - stats.add_failure(check, env) - printer.print_check_failure(check, str(e)) - print(check.current_partition.local_env) - print(check.current_environ) - except Exception as e: - stats.add_failure(check, env) - printer.print_check_failure(check, str(e)) - traceback.print_exc() - finally: - environ_save.load() - - -def run_checks(checks, system, options): - printer = Printer(colorize=options.colorize) - printer.print_sys_info(system) - printer.print_timestamp('start date') - stats = [] - - for part in system.partitions: - # Keep stats per partition - part_stats = RegressionStats() - if options.prgenv: - # Groom the environments of this partition - part.environs = [ - e for e in filter( - lambda e: e if e.name in options.prgenv else None, - part.environs - ) - ] - - printer.print_unformatted( - '>>>> Running regression on partition: %s' % part.name - ) - run_checks_partition(checks, options, part, printer, part_stats) - printer.print_separator() - stats.append((part.name, part_stats)) - - # Print summary of failed checks - success = True - for st in stats: - partname, part_stats = st - printer.print_unformatted('Stats for partition: %s' % partname) - printer.print_unformatted(part_stats) - if part_stats.num_fails != 0: - printer.print_unformatted(part_stats.details()) - success = False + printer.info(' * %s' % c) - printer.print_timestamp('end date') - return success + printer.info('Found %d check(s).' 
% len(checks)) def main(): @@ -193,9 +47,6 @@ def main(): 'Options controlling execution of checks') misc_options = argparser.add_argument_group('Miscellaneous options') - argparser.add_argument('--version', action='version', - version=settings.version) - # Output directory options output_options.add_argument( '--prefix', action='store', metavar='DIR', @@ -287,7 +138,11 @@ def main(): run_options.add_argument( '--skip-prgenv-check', action='store_true', help='Skip prog. environment check') - + run_options.add_argument( + '--exec-policy', metavar='POLICY', action='store', + choices=[ 'serial', 'async' ], default='serial', + help='Specify the execution policy for running the regression tests. ' + 'Available policies: "serial" (default), "async"') misc_options.add_argument( '-m', '--module', action='append', default=[], @@ -305,7 +160,13 @@ def main(): misc_options.add_argument( '--system', action='store', help='Load SYSTEM configuration explicitly') - + misc_options.add_argument( + '--save-log-files', action='store_true', dest='save_log_files', + default=False, + help='Copy the log file from the work dir to the output dir at the ' + 'end of the program') + misc_options.add_argument('-V', '--version', action='version', + version=settings.version) if len(sys.argv) == 1: argparser.print_help() @@ -314,6 +175,13 @@ def main(): # Parse command line options = argparser.parse_args() + # Configure logging + logging.configure_logging(settings.logging_config) + + # Setup printer + printer = PrettyPrinter() + printer.colorize = options.colorize + # Load site configuration site_config = SiteConfiguration() site_config.load_from_dict(settings.site_configuration) @@ -323,8 +191,8 @@ def main(): load_path = [] for d in options.checkpath: if not os.path.exists(d): - print("%s: path `%s' does not exist. Skipping...\n" % - (argparser.prog, d)) + printer.info("%s: path `%s' does not exist. Skipping...\n" % + (argparser.prog, d)) continue load_path.append(d) @@ -356,16 +224,16 @@ def main(): raise KeyError(options.system) except KeyError: - print_error("unknown system specified: `%s'" % options.system) - list_supported_systems(site_config.systems.values()) + printer.error("unknown system specified: `%s'" % options.system) + list_supported_systems(site_config.systems.values(), printer) sys.exit(1) else: # Try to autodetect system system = autodetect_system(site_config) if not system: - print_error("could not auto-detect system. Please specify " - "it manually using the `--system' option.") - list_supported_systems(site_config.systems.values()) + printer.error("could not auto-detect system. 
Please specify " + "it manually using the `--system' option.") + list_supported_systems(site_config.systems.values(), printer) sys.exit(1) # Adjust system directories @@ -393,18 +261,21 @@ def main(): timefmt=options.timefmt) # Print command line - print('Command line:', ' '.join(sys.argv)) - print('Reframe version: ' + settings.version) + printer.info('Command line: %s' % ' '.join(sys.argv)) + printer.info('Reframe version: ' + settings.version) + printer.info('Launched by user: ' + os.environ['USER']) + printer.info('Launched on host: ' + socket.gethostname()) # Print important paths - print('Reframe paths') - print('=============') - print(' Check prefix :', loader.prefix) - print('%03s Check search path :' % ('(R)' if loader.recurse else ''), - "'%s'" % ':'.join(loader.load_path)) - print(' Stage dir prefix :', resources.stage_prefix) - print(' Output dir prefix :', resources.output_prefix) - print(' Logging dir :', resources.log_prefix) + printer.info('Reframe paths') + printer.info('=============') + printer.info(' Check prefix : %s' % loader.prefix) + printer.info('%03s Check search path : %s' % \ + ('(R)' if loader.recurse else '', + "'%s'" % ':'.join(loader.load_path))) + printer.info(' Stage dir prefix : %s' % resources.stage_prefix) + printer.info(' Output dir prefix : %s' % resources.output_prefix) + printer.info(' Logging dir : %s' % resources.log_prefix) try: # Locate and load checks checks_found = loader.load_all(system=system, resources=resources) @@ -440,8 +311,8 @@ def main(): # Filter checks further if options.gpu_only and options.cpu_only: - print_error("options `--gpu-only' and `--cpu-only' " - "are mutually exclusive") + printer.error("options `--gpu-only' and `--cpu-only' " + "are mutually exclusive") sys.exit(1) if options.gpu_only: @@ -466,25 +337,70 @@ def main(): try: module_force_load(m) except ModuleError: - print("Could not load module `%s': Skipping..." % m) + printer.info("Could not load module `%s': Skipping..." 
% m) success = True if options.list: # List matched checks - list_checks(list(checks_matched)) + list_checks(list(checks_matched), printer) + elif options.run: - success = run_checks(checks_matched, system, options) + # Setup the execution policy + if options.exec_policy == 'serial': + exec_policy = SerialExecutionPolicy() + elif options.exec_policy == 'async': + exec_policy = AsynchronousExecutionPolicy() + else: + # This should not happen, since choices are handled by argparser + printer.error("unknown execution policy `%s': Exiting...") + sys.exit(1) + + exec_policy.skip_system_check = options.skip_system_check + exec_policy.force_local = options.force_local + exec_policy.relax_performance_check = options.relax_performance_check + exec_policy.skip_environ_check = options.skip_prgenv_check + exec_policy.skip_sanity_check = options.skip_sanity_check + exec_policy.skip_performance_check = options.skip_performance_check + exec_policy.only_environs = options.prgenv + exec_policy.keep_stage_files = options.keep_stage_files + exec_policy.sched_account = options.account + exec_policy.sched_partition = options.partition + exec_policy.sched_reservation = options.reservation + exec_policy.sched_nodelist = options.nodelist + exec_policy.sched_exclude_nodelist = options.exclude_nodes + exec_policy.sched_options = options.job_options + + runner = Runner(exec_policy) + try: + runner.runall(checks_matched, system) + finally: + # always print a report + if runner.stats.num_failures(): + printer.info(runner.stats.failure_report()) + success = False + else: - print('No action specified. Exiting...') - print("Try `%s -h' for a list of available actions." % - argparser.prog) + printer.info('No action specified. Exiting...') + printer.info("Try `%s -h' for a list of available actions." 
% + argparser.prog) if not success: sys.exit(1) sys.exit(0) + except KeyboardInterrupt: + sys.exit(1) + except OSError as e: + printer.error("`%s': %s" % (e.filename, e.strerror)) + sys.exit(1) except Exception as e: - print_error('fatal error: %s\n' % str(e)) + printer.error('fatal error: %s\n' % str(e)) traceback.print_exc() sys.exit(1) + finally: + try: + if options.save_log_files: + logging.save_log_files(resources.output_prefix) + except OSError as e: + printer.error("`%s': %s" % (e.filename, e.strerror)) diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py new file mode 100644 index 0000000000..f825a29249 --- /dev/null +++ b/reframe/frontend/executors/__init__.py @@ -0,0 +1,241 @@ +import sys + +from reframe.core.environments import EnvironmentSnapshot +from reframe.core.exceptions import ReframeFatalError, ReframeError +from reframe.core.fields import StringField, TypedField +from reframe.core.pipeline import RegressionTest +from reframe.frontend.printer import PrettyPrinter +from reframe.frontend.statistics import TestStats +from reframe.utility.sandbox import Sandbox + +class TestCase(object): + """Test case result placeholder class.""" + STATE_SUCCESS = 0 + STATE_FAILURE = 1 + + def __init__(self, executor): + self.executor = executor + self.result = None + self.failed_stage = None + self.exc_info = None + + def valid(self): + return self.result != None + + def success(self): + self.result = TestCase.STATE_SUCCESS + + def fail(self, exc_info = None): + self.result = TestCase.STATE_FAILURE + self.failed_stage = self.executor.current_stage + self.exc_info = exc_info + + def failed(self): + return self.result == TestCase.STATE_FAILURE + + +class RegressionTestExecutor(object): + """Responsible for the execution of `RegressionTest`'s pipeline stages. 
+
+    Keeps track of the current stage and implements the relaxed performance
+    checking logic."""
+    check = TypedField('check', RegressionTest)
+    current_stage = StringField('current_stage')
+
+    def __init__(self, check):
+        self.current_stage = 'init'
+        self.check = check
+        self.relax_performance_check = False
+
+    def setup(self, system, environ, **job_opts):
+        self.current_stage = 'setup'
+        self.check.setup(system, environ, **job_opts)
+
+    def compile(self):
+        self.current_stage = 'compile'
+        self.check.compile()
+
+    def run(self):
+        self.current_stage = 'run'
+        self.check.run()
+
+    def wait(self):
+        self.current_stage = 'wait'
+        self.check.wait()
+
+    def check_sanity(self):
+        # check_sanity() may be overridden by the user tests, so we log this
+        # phase here
+        self.current_stage = 'sanity'
+        ret = self.check.check_sanity()
+        self.check.logger.debug('sanity check result: %s' % ret)
+        return ret
+
+    def check_performance(self):
+        # check_performance() may be overridden by the user tests, so we log
+        # this phase here
+        self.current_stage = 'performance'
+        ret = self.check.check_performance()
+        self.check.logger.debug('performance check result: %s' % ret)
+        if self.check.strict_check:
+            return ret
+
+        return True if self.relax_performance_check else ret
+
+    def cleanup(self, remove_files=False, unload_env=True):
+        self.current_stage = 'cleanup'
+        self.check.cleanup(remove_files, unload_env)
+        self.current_stage = 'completed'
+
+
+class Runner(object):
+    """Responsible for executing a set of regression tests based on an execution
+    policy."""
+    def __init__(self, policy, printer = None):
+        self.printer = PrettyPrinter() if not printer else printer
+        self.policy = policy
+        self.policy.printer = self.printer
+        self.policy.runner = self
+        self.sandbox = Sandbox()
+        self.stats = None
+
+
+    def runall(self, checks, system):
+        try:
+            self.printer.separator('short double line',
+                                   'Running %d check(s)' % len(checks))
+            self.printer.timestamp('Started on', 'short double line')
+            self.printer.info()
+            self._runall(checks, system)
+        finally:
+            # Always update statistics and print the summary line
+            self.stats = self.policy.getstats()
+            num_failures = self.stats.num_failures()
+            self.printer.status(
+                'FAILED' if num_failures else 'PASSED',
+                'Ran %d test case(s) from %d check(s) (%d failure(s))' % \
+                (self.stats.num_cases(), len(checks), num_failures),
+                just='center'
+            )
+            self.printer.timestamp('Finished on', 'short double line')
+
+
+    def _partition_supported(self, check, partition):
+        if self.policy.skip_system_check:
+            return True
+
+        return check.supports_system(partition.name)
+
+
+    def _environ_supported(self, check, environ):
+        precond = True
+        if self.policy.only_environs:
+            precond = environ.name in self.policy.only_environs
+
+        if self.policy.skip_environ_check:
+            return precond
+        else:
+            return precond and check.supports_progenv(environ.name)
+
+
+    def _runall(self, checks, system):
+        self.policy.enter()
+        for c in checks:
+            self.policy.enter_check(c)
+            for p in system.partitions:
+                if not self._partition_supported(c, p):
+                    self.printer.status('SKIP',
+                                        'skipping %s' % p.fullname,
+                                        just='center')
+                    continue
+
+                self.policy.enter_partition(c, p)
+                for e in p.environs:
+                    if not self._environ_supported(c, e):
+                        self.printer.status('SKIP',
+                                            'skipping %s for %s' % \
+                                            (e.name, p.fullname),
+                                            just='center')
+                        continue
+
+                    self.sandbox.system = p
+                    self.sandbox.environ = e
+                    self.sandbox.check = c
+                    self.policy.enter_environ(self.sandbox.check,
+                                              self.sandbox.system,
+                                              self.sandbox.environ)
+                    
self.policy.run_check(self.sandbox.check, + self.sandbox.system, + self.sandbox.environ) + self.policy.exit_environ(self.sandbox.check, + self.sandbox.system, + self.sandbox.environ) + + self.policy.exit_partition(c, p) + + self.policy.exit_check(c) + + self.policy.exit() + + +class ExecutionPolicy(object): + """Base abstract class for execution policies. + + An execution policy implements the regression check pipeline.""" + def __init__(self): + # Options controlling the check execution + self.skip_system_check = False + self.force_local = False + self.relax_performance_check = False + self.skip_environ_check = False + self.skip_sanity_check = False + self.skip_performance_check = False + self.keep_stage_files = False + self.only_environs = None + self.printer = None + self.environ_snapshot = EnvironmentSnapshot() + + # Scheduler options + self.sched_account = None + self.sched_partition = None + self.sched_reservation = None + self.sched_nodelist = None + self.sched_exclude_nodelist = None + self.sched_options = [] + + def enter(self): + pass + + def exit(self): + pass + + def enter_check(self, check): + self.printer.separator( + 'short single line', + 'started processing %s (%s)' % (check.name, check.descr) + ) + + def exit_check(self, check): + self.printer.separator( + 'short single line', + 'finished processing %s (%s)\n' % (check.name, check.descr) + ) + + def enter_partition(self, c, p): + pass + + def exit_partition(self, c, p): + pass + + def enter_environ(self, c, p, e): + pass + + def exit_environ(self, c, p, e): + pass + + def run_check(self, c, p, e): + raise NotImplementedError + + def getstats(self): + """Return test case statistics of the run.""" + raise NotImplementedError diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py new file mode 100644 index 0000000000..fdf315a491 --- /dev/null +++ b/reframe/frontend/executors/policies.py @@ -0,0 +1,374 @@ +import itertools +import time +import sys + +from reframe.core.exceptions import ReframeFatalError +from reframe.core.logging import getlogger +from reframe.frontend.executors import ExecutionPolicy, \ + RegressionTestExecutor, \ + TestCase +from reframe.frontend.statistics import TestStats +from reframe.settings import settings +from reframe.core.environments import EnvironmentSnapshot + + +class SerialExecutionPolicy(ExecutionPolicy): + def __init__(self): + super().__init__() + self.test_cases = [] + + + def getstats(self): + return TestStats(self.test_cases) + + + def run_check(self, check, partition, environ): + self.printer.status( + 'RUN', "%s on %s using %s" % \ + (check.name, partition.fullname, environ.name) + ) + try: + executor = RegressionTestExecutor(check) + executor.relax_performance_check = self.relax_performance_check + testcase = TestCase(executor) + + executor.setup( + system=partition, + environ=environ, + account=self.sched_account, + partition=self.sched_partition, + reservation=self.sched_reservation, + nodelist=self.sched_nodelist, + exclude=self.sched_exclude_nodelist, + options=self.sched_options + ) + + executor.compile() + executor.run() + executor.wait() + if not self.skip_sanity_check: + if not executor.check_sanity(): + testcase.fail() + + if not self.skip_performance_check: + if not executor.check_performance(): + testcase.fail() + + if testcase.failed(): + remove_stage_files = False + else: + remove_stage_files = not self.keep_stage_files + + executor.cleanup(remove_files=remove_stage_files, + unload_env=False) + if not testcase.failed(): + 
testcase.success() + + except (NotImplementedError, KeyboardInterrupt, + ReframeFatalError, AssertionError): + testcase.fail(sys.exc_info()) + raise + except: + testcase.fail(sys.exc_info()) + finally: + self.test_cases.append(testcase) + self.printer.result(check, partition, environ, + not testcase.failed()) + self.environ_snapshot.load() + + +class RunningTestCase(object): + def __init__(self, testcase, environ): + self.testcase = testcase + self.environ = environ + + # Test case has finished, but has not been waited for yet + self.zombie = False + + +class WaitError(BaseException): + """Mark wait errors during the asynchronous execution of test cases. + + It stores the `RunningTestCase` that has failed during waiting and the + associated exception info.""" + def __init__(self, running_testcase, exc_info): + self.running_case = running_testcase + self.exc_info = exc_info + + +class AsynchronousExecutionPolicy(ExecutionPolicy): + def __init__(self): + super().__init__() + # all currently running cases + self.running_cases = [] + + # counts of running cases per partition + self.running_cases_counts = {} + + # ready cases to be executed per partition + self.ready_cases = {} + + # Test case results + self.test_cases = [] + + # Job limit per partition + self.max_jobs = {} + + self.logger = getlogger('frontend') + + + def _compile_run_testcase(self, testcase): + try: + executor = testcase.executor + executor.compile() + executor.run() + except (NotImplementedError, KeyboardInterrupt, + ReframeFatalError, AssertionError): + testcase.fail(sys.exc_info()) + raise + except: + testcase.fail(sys.exc_info()) + finally: + if testcase.valid(): + self.printer.result(executor.check, + executor.check.current_partition, + executor.check.current_environ, + not testcase.failed()) + + + def _finalize_testcase(self, ready_testcase): + try: + ready_testcase.environ.load() + testcase = ready_testcase.testcase + executor = testcase.executor + if not self.skip_sanity_check: + if not executor.check_sanity(): + testcase.fail() + + if not self.skip_performance_check: + if not executor.check_performance(): + testcase.fail() + + if testcase.failed(): + remove_stage_files = False + else: + remove_stage_files = not self.keep_stage_files + + executor.cleanup(remove_files=remove_stage_files, unload_env=False) + if not testcase.failed(): + testcase.success() + + except (NotImplementedError, KeyboardInterrupt, + ReframeFatalError, AssertionError): + testcase.fail(sys.exc_info()) + raise + except: + testcase.fail(sys.exc_info()) + finally: + partname = executor.check.current_partition.fullname + self.printer.result(executor.check, + executor.check.current_partition, + executor.check.current_environ, + not testcase.failed()) + + + def _failall(self): + """Mark all tests as failures""" + for rc in self.running_cases: + rc.testcase.fail(sys.exc_info()) + + for ready_list in self.ready_cases.values(): + for rc in ready_list: + rc.testcase.fail(sys.exc_info()) + + + def enter_partition(self, c, p): + self.running_cases_counts.setdefault(p.fullname, 0) + self.ready_cases.setdefault(p.fullname, []) + self.max_jobs.setdefault(p.fullname, p.max_jobs) + + + def getstats(self): + return TestStats(self.test_cases) + + + def _print_executor_status(self, status, executor): + checkname = executor.check.name + partname = executor.check.current_partition.fullname + envname = executor.check.current_environ.name + msg = '%s on %s using %s' % (checkname, partname, envname) + self.logger.debug('%s %s' % (status.lower(), msg)) + 
self.printer.status(status, msg)
+
+
+    def run_check(self, check, partition, environ):
+        try:
+            executor = RegressionTestExecutor(check)
+            executor.relax_performance_check = self.relax_performance_check
+            testcase = TestCase(executor)
+
+            executor.setup(
+                system=partition,
+                environ=environ,
+                account=self.sched_account,
+                partition=self.sched_partition,
+                reservation=self.sched_reservation,
+                nodelist=self.sched_nodelist,
+                exclude=self.sched_exclude_nodelist,
+                options=self.sched_options
+            )
+
+            ready_testcase = RunningTestCase(testcase, EnvironmentSnapshot())
+            partname = partition.fullname
+            if self.running_cases_counts[partname] >= partition.max_jobs:
+                # Make sure that we still exceed the job limit
+                self.logger.debug('reached job limit (%s) for partition %s' %
+                                  (partition.max_jobs, partname))
+                self._update_running_counts()
+
+            if self.running_cases_counts[partname] < partition.max_jobs:
+                # Test's environment is already loaded; no need to reload it
+                self._reschedule(ready_testcase, load_env=False)
+            else:
+                self._print_executor_status('HOLD', executor)
+                self.ready_cases[partname].append(ready_testcase)
+
+        except (NotImplementedError, KeyboardInterrupt,
+                ReframeFatalError, AssertionError):
+            if not testcase.failed():
+                # The test case failed during setup
+                testcase.fail(sys.exc_info())
+
+            self._failall()
+            raise
+        except:
+            # Here we are sure that the test case has failed during setup,
+            # since _compile_run_testcase() already handles non-fatal
+            # exceptions. We check the test case again, though, just in case.
+            if not testcase.failed():
+                testcase.fail(sys.exc_info())
+        finally:
+            if testcase.valid() and testcase.failed_stage == 'setup':
+                # We need to print the result here only if the setup stage has
+                # finished, since otherwise _compile_run_testcase() prints it
+                self.printer.result(executor.check, partition, environ,
+                                    not testcase.failed())
+
+            self.test_cases.append(testcase)
+            self.environ_snapshot.load()
+
+
+    def _update_running_counts(self):
+        """Update the counts of running checks per partition."""
+        freed_slots = {}
+        for rc in self.running_cases:
+            check = rc.testcase.executor.check
+            if not rc.zombie and check.poll():
+                # Tests without a job descriptor are considered finished
+                rc.zombie = True
+                partname = check.current_partition.fullname
+                self.running_cases_counts[partname] -= 1
+                freed_slots.setdefault(partname, 0)
+                freed_slots[partname] += 1
+
+        for p, ns in freed_slots.items():
+            self.logger.debug('freed %s slot(s) on partition %s' % (ns, p))
+
+
+    def _reschedule(self, ready_testcase, load_env=True):
+        testcase = ready_testcase.testcase
+        executor = testcase.executor
+        partname = executor.check.current_partition.fullname
+
+        # Restore the test case's environment and run it
+        if load_env:
+            ready_testcase.environ.load()
+
+        self._print_executor_status('RUN', executor)
+        self._compile_run_testcase(testcase)
+        if not testcase.failed():
+            self.running_cases_counts[partname] += 1
+            self.running_cases.append(ready_testcase)
+
+
+    def _reschedule_all(self):
+        self._update_running_counts()
+        for partname, num_jobs in self.running_cases_counts.items():
+            assert(num_jobs >= 0)
+            num_empty_slots = self.max_jobs[partname] - num_jobs
+            num_schedule_jobs = min([
+                num_empty_slots, len(self.ready_cases[partname])
+            ])
+
+            if num_schedule_jobs:
+                self.logger.debug('rescheduling %s job(s) on %s' %
+                                  (num_schedule_jobs, partname))
+
+            for i in range(0, num_schedule_jobs):
+                ready_case = self.ready_cases[partname].pop()
+                ready_case.environ.load()
+                self._reschedule(ready_case)
+
+
+    def _waitany(self):
+        intervals = itertools.cycle(settings.job_state_poll_intervals)
+        while True:
+            for i in range(len(self.running_cases)):
+                running = self.running_cases[i]
+                testcase = running.testcase
+                executor = testcase.executor
+                running_check = executor.check
+                if running_check.poll():
+                    try:
+                        running_check.wait()
+                        return running
+                    except (NotImplementedError, KeyboardInterrupt,
+                            ReframeFatalError, AssertionError):
+                        # These errors should be propagated as-is
+                        testcase.fail(sys.exc_info())
+                        raise
+                    except:
+                        testcase.fail(sys.exc_info())
+                        raise WaitError(running, sys.exc_info())
+                    finally:
+                        # The case is no longer running; update our bookkeeping
+                        del self.running_cases[i]
+                        if not running.zombie:
+                            partname = running_check.current_partition.fullname
+                            self.running_cases_counts[partname] -= 1
+
+                        # This is just for completeness; the case is no longer
+                        # a zombie, since it has been waited for
+                        running.zombie = False
+
+                        if testcase.valid():
+                            self.printer.result(
+                                executor.check,
+                                executor.check.current_partition,
+                                executor.check.current_environ,
+                                not testcase.failed())
+
+            time.sleep(next(intervals))
+
+
+    def exit(self):
+        self.printer.separator(
+            'short single line', 'waiting for spawned checks'
+        )
+
+        while len(self.running_cases):
+            try:
+                ready_testcase = self._waitany()
+                self._finalize_testcase(ready_testcase)
+                self._reschedule_all()
+            except (NotImplementedError, KeyboardInterrupt,
+                    ReframeFatalError, AssertionError):
+                self._failall()
+                raise
+            except WaitError:
+                pass
+            finally:
+                self.environ_snapshot.load()
+
+        self.printer.separator(
+            'short single line', 'all spawned checks finished'
+        )
diff --git a/reframe/frontend/loader.py b/reframe/frontend/loader.py
index b5ba867e8e..d777d16174 100644
--- a/reframe/frontend/loader.py
+++ b/reframe/frontend/loader.py
@@ -2,6 +2,7 @@
 # Regression test loader
 #
+import ast
 import os
 import logging
 import sys
@@ -15,6 +16,21 @@
 from reframe.settings import settings
 
+class RegressionCheckValidator(ast.NodeVisitor):
+    def __init__(self):
+        self._validated = False
+
+    @property
+    def valid(self):
+        return self._validated
+
+    def visit_FunctionDef(self, node):
+        if node.name == '_get_checks' and \
+           node.col_offset == 0 and \
+           node.args.kwarg:
+            self._validated = True
+
+
 class RegressionCheckLoader:
     def __init__(self, load_path, prefix = '', recurse = False):
         self.load_path = load_path
@@ -34,6 +50,22 @@ def _module_name(self, filename):
         return (os.path.splitext(filename)[0]).replace('/', '.')
 
+    def _validate_source(self, filename):
+        """Check if `filename` is a valid Reframe source file.
+
+        This is not a full validation test, but rather a first step that
+        verifies that the file defines the `_get_checks()` function correctly.
+        A second step follows, which actually loads the test file, performing
+        further tests and finalizing the validation."""
+
+        with open(filename, 'r') as f:
+            source_tree = ast.parse(f.read())
+
+        validator = RegressionCheckValidator()
+        validator.visit(source_tree)
+        return validator.valid
+
+
     def load_from_module(self, module, **check_args):
         """Load user checks from module.
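A note on what this validation accepts: a check file qualifies as soon as it defines a module-level `_get_checks()` function at column 0 with a `**kwargs` catch-all. A minimal sketch of a file that would pass `_validate_source()` follows; the `MyCheck` name and its arguments are illustrative, assuming the `RegressionTest.__init__(name, prefix, system, resources)` signature shown in pipeline.py:

import os

from reframe.core.pipeline import RegressionTest

class MyCheck(RegressionTest):
    def __init__(self, **kwargs):
        # The loader passes `system` and `resources` through **kwargs
        super().__init__('my_check', os.path.dirname(__file__), **kwargs)

def _get_checks(**kwargs):
    # Module-level, at column 0, with a **kwargs catch-all: exactly what
    # RegressionCheckValidator looks for before the module is imported
    return [ MyCheck(**kwargs) ]

Since `load_from_module()` requires a list and filters it with `isinstance(c, RegressionTest)`, returning anything else simply yields no checks.
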
@@ -41,22 +73,21 @@ def load_from_module(self, module, **check_args): and validates its return value.""" from reframe.core.pipeline import RegressionTest - if hasattr(module, '_get_checks') and callable(module._get_checks): - try: - checks = [ c for c in module._get_checks(**check_args) - if isinstance(c, RegressionTest) ] - except TypeError: - # Guard against _get_checks() returning a non-iterable - checks = [] + # We can safely call `_get_checks()` here, since the source file is + # already validated + candidates = module._get_checks(**check_args) + if isinstance(candidates, list): + return [ c for c in candidates if isinstance(c, RegressionTest) ] else: - checks = [] - - return checks + return [] def load_from_file(self, filename, **check_args): module_name = self._module_name(filename) try: + if not self._validate_source(filename): + return [] + loader = SourceFileLoader(module_name, filename) return self.load_from_module(loader.load_module(), **check_args) except OSError as e: @@ -189,11 +220,11 @@ def create_env(system, partition, name): "as a dictionary" % partname ) - partition = SystemPartition(partname) + partition = SystemPartition(partname, system) partition.descr = partconfig.get('descr', partname) partition.scheduler = partconfig.get('scheduler', 'local') partition.local_env = Environment( - name='__env_%s' % partname, + name='__rfm_env_%s' % partname, modules=partconfig.get('modules', []), variables=partconfig.get('variables', {}) ) @@ -203,6 +234,7 @@ def create_env(system, partition, name): ] partition.access = partconfig.get('access', []) partition.resources = partconfig.get('resources', {}) + partition.max_jobs = partconfig.get('max_jobs', 1) system.partitions.append(partition) self.systems[sysname] = system diff --git a/reframe/frontend/printer.py b/reframe/frontend/printer.py index 1342d38b88..d3ce88869b 100644 --- a/reframe/frontend/printer.py +++ b/reframe/frontend/printer.py @@ -1,9 +1,8 @@ import datetime -import os -import re import sys -from reframe.core.exceptions import ReframeError +from reframe.core.logging import LoggerAdapter, load_from_dict, getlogger + class Colorizer: def colorize(string, foreground, background): @@ -31,100 +30,75 @@ class AnsiColorizer(Colorizer): def colorize(string, foreground, background = None): return AnsiColorizer.escape_seq + \ - AnsiColorizer.fgcolor + foreground + string + \ - AnsiColorizer.escape_seq + AnsiColorizer.reset_term - - -class Printer: - def __init__(self, colorize = True): - self.ostream = sys.stdout - self.linefill = 77 - self.status_msg_fill = 10 - self.colorize = colorize - - def print_sys_info(self, system): - from socket import gethostname - self._print('Regression suite started by %s on %s' % - (os.environ['USER'], gethostname())) - self._print('Using configuration for system: %s' % system.name) + AnsiColorizer.fgcolor + foreground + string + \ + AnsiColorizer.escape_seq + AnsiColorizer.reset_term - def print_separator(self): - self._print('=' * self.linefill) +class PrettyPrinter(object): + """Pretty printing facility for the framework. 
+    Final printing is delegated to an internal logger, which is responsible
+    for printing both to standard output and to a special output file."""
 
-    def print_timestamp(self, prefix=''):
-        self._print('===> %s %s' %
-                    (prefix, datetime.datetime.today().strftime('%c %Z')))
+    def __init__(self):
+        self.colorize = True
+        self.line_width = 78
+        self.status_width = 10
+        self._logger = getlogger('frontend')
 
-    def print_check_title(self, check, environ):
-        self._print('Test: %s for %s' % (check.descr, environ.name))
-        self.print_separator()
+    def separator(self, linestyle, msg = ''):
+        if linestyle == 'short double line':
+            line = self.status_width * '='
+        elif linestyle == 'short single line':
+            line = self.status_width * '-'
+        else:
+            raise ValueError('unknown line style')
+        self.info('[%s] %s' % (line, msg))
 
-    def print_check_progress(self, msg, op, expected_ret = None, **op_args):
-        try:
-            msg = ' | %s ...' % msg
-            self._print(msg, end='', flush=True)
-            success = False
-            ret = op(**op_args)
-            if ret == expected_ret:
-                success = True
+    def status(self, status, message = '', just=None):
+        if just == 'center':
+            status = status.center(self.status_width - 2)
+        elif just == 'right':
+            status = status.rjust(self.status_width - 2)
+        else:
+            status = status.ljust(self.status_width - 2)
 
-        except Exception:
-            ret = None
-            raise
-        finally:
-            if success:
-                color = AnsiColorizer.green
-                status_msg = 'OK'
+        if self.colorize:
+            status_stripped = status.strip().lower()
+            if status_stripped == 'skip':
+                status = AnsiColorizer.colorize(status, AnsiColorizer.yellow)
+            elif status_stripped in [ 'fail', 'failed' ]:
+                status = AnsiColorizer.colorize(status, AnsiColorizer.red)
-            else:
-                color = AnsiColorizer.red
-                status_msg = 'FAILED'
-
-            self._print_result_line(None, status_msg, color, len(msg))
-
-        return ret
-
-
-    def print_unformatted(self, msg):
-        self._print(msg)
+            else:
+                status = AnsiColorizer.colorize(status, AnsiColorizer.green)
+        self.info('[ %s ] %s' % (status, message))
 
-    def print_check_success(self, check):
-        self._print_result_line(check.descr, 'PASSED', AnsiColorizer.green)
+    def result(self, check, partition, environ, success):
+        if success:
+            result_str = 'OK'
+        else:
+            result_str = 'FAIL'
 
-    def print_check_failure(self, check, msg = None):
-        self._print_result_line(check.descr, 'FAILED', AnsiColorizer.red)
-        # print out also the maintainers of the test
-        self._print('| Please contact: %s' %
-                    (check.maintainers if check.maintainers else
-                     'No maintainers specified!'))
-        self._print("Check's files are left in `%s'" % check.stagedir)
-        if msg:
-            self._print('More information: %s' % msg)
+        self.status(result_str, '%s on %s using %s' % \
+                    (check.name, partition.fullname, environ.name),
+                    just='right')
 
-    def _print_result_line(self, status_msg, result_msg, result_color,
-                           status_len=0):
-        if status_msg:
-            msg = '| Result: %s' % status_msg
-            status_len = len(msg)
-            self._print(msg, end='')
+    def timestamp(self, msg='', separator=None):
+        msg = '%s %s' % (msg, datetime.datetime.today().strftime('%c %Z'))
+        if separator:
+            self.separator(separator, msg)
+        else:
+            self.info(msg)
 
-        rem_fill = self.linefill - status_len
-        msg = ('[ %s ]' % result_msg).center(self.status_msg_fill)
-        if result_color and self.colorize:
-            colored_msg = msg.replace(
-                result_msg, AnsiColorizer.colorize(result_msg, result_color))
-            rem_fill = rem_fill + len(colored_msg) - len(msg)
-            msg = colored_msg
-        self._print(msg.rjust(rem_fill))
+    def error(self, msg):
+        self._logger.error('%s: %s' % (sys.argv[0], msg))
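For clarity, a standalone sketch of the fixed-width status field that `PrettyPrinter.status()` implements above, assuming the same default `status_width` of 10 as in the patch (`format_status` is a hypothetical helper, not part of the framework):

    def format_status(status, message='', just=None, status_width=10):
        # Justify the status label inside a fixed-width bracketed field,
        # mirroring PrettyPrinter.status()
        if just == 'center':
            field = status.center(status_width - 2)
        elif just == 'right':
            field = status.rjust(status_width - 2)
        else:
            field = status.ljust(status_width - 2)
        return '[ %s ] %s' % (field, message)

    print(format_status('OK', 'Test on testsys:login using builtin-gcc',
                        just='right'))
    # prints: [       OK ] Test on testsys:login using builtin-gcc

Keeping the field width constant means pass/fail/hold lines stay visually aligned regardless of the status label's length.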
-    def _print(self, msg, **print_opts):
-        print(msg, file=self.ostream, **print_opts)
+    def info(self, msg = ''):
+        self._logger.info(msg)
diff --git a/reframe/frontend/resources.py b/reframe/frontend/resources.py
index b41b1fcd8b..e53f60afa3 100644
--- a/reframe/frontend/resources.py
+++ b/reframe/frontend/resources.py
@@ -29,7 +29,7 @@
         else:
             self.stage_prefix = os.path.join(self.prefix, 'stage', time)
 
-        # regression logs
+        # regression performance logs
         if not log_prefix:
             self.log_prefix = os.path.join(self.prefix, 'logs')
         else:
diff --git a/reframe/frontend/statistics.py b/reframe/frontend/statistics.py
index b3420dbfc8..47a4365fd2 100644
--- a/reframe/frontend/statistics.py
+++ b/reframe/frontend/statistics.py
@@ -1,40 +1,95 @@
-import copy
+import traceback
 
-class CheckFailureInfo:
-    def __init__(self, check, env):
-        self.check_name = check.name
-        self.check_stagedir = check.stagedir
-        self.prgenv = env.name
+from reframe.core.exceptions import ReframeError
 
-class RegressionStats:
-    def __init__(self):
-        self.num_checks = 0
-        self.num_fails = 0
-        self.num_cases = 0
-        self.failed_info = []
+class TestStats:
+    """Stores test case statistics."""
+    def __init__(self, test_cases = []):
+        if not isinstance(test_cases, list):
+            raise TypeError('TestStats is expecting a list of TestCase')
 
+        # Store test cases per partition internally
+        self.test_cases_bypart = dict()
+        for t in test_cases:
+            partition = t.executor.check.current_partition
+            partname = partition.fullname if partition else 'None'
 
-    def add_failure(self, check, env):
-        self.num_fails += 1
-        self.failed_info.append(CheckFailureInfo(check, env))
+            tclist = self.test_cases_bypart.setdefault(partname, [])
+            tclist.append(t)
 
-    def details(self):
-        lines = [ ' | Summary of failed tests' ]
-        for fail in self.failed_info:
-            lines += [
-                " * %s failed with `%s'" % (fail.check_name, fail.prgenv),
-                "   Staged in `%s'" % fail.check_stagedir
-            ]
+    def num_failures(self, partition = None):
+        num_fails = 0
+        if partition:
+            num_fails += len([
+                t for t in self.test_cases_bypart[partition] if t.failed()
+            ])
+        else:
+            # count all failures
+            for tclist in self.test_cases_bypart.values():
+                num_fails += len([ t for t in tclist if t.failed() ])
 
-        return '\n'.join(lines)
+        return num_fails
 
-    def summary(self):
-        return ' | Ran %d case(s) of %d supported check(s) (%d failure(s))' % \
-               (self.num_cases, self.num_checks, self.num_fails)
+    def num_failures_stage(self, stage):
+        num_fails = 0
+        for tclist in self.test_cases_bypart.values():
+            num_fails += len([ t for t in tclist if t.failed_stage == stage ])
 
-    def __str__(self):
-        return self.summary()
+        return num_fails
+
+
+    def num_cases(self, partition = None):
+        num_cases = 0
+        if partition:
+            num_cases += len(self.test_cases_bypart[partition])
+        else:
+            # count all cases
+            for tclist in self.test_cases_bypart.values():
+                num_cases += len(tclist)
+
+        return num_cases
+
+
+    def failure_report(self):
+        line_width = 78
+        report = line_width*'=' + '\n'
+        report += 'SUMMARY OF FAILURES\n'
+        for partname, tclist in self.test_cases_bypart.items():
+            for tf in [ t for t in tclist if t.failed() ]:
+                check = tf.executor.check
+                environ_name = check.current_environ.name \
+                               if check.current_environ else 'None'
+                report += line_width*'-' + '\n'
+                report += 'FAILURE INFO for %s\n' % check.name
+                report += '  * System partition: %s\n' % partname
+                report += '  * Environment: %s\n' % environ_name
+                report += '  * Stage directory: %s\n' % check.stagedir
+
+                job_type = 
'local' if check.is_local() else 'batch job' + jobid = check.job.jobid if check.job else -1 + report += ' * Job type: %s (id=%s)\n' % (job_type, jobid) + report += ' * Maintainers: %s\n' % check.maintainers + report += ' * Failing phase: %s\n' % tf.failed_stage + report += ' * Reason: ' + if tf.exc_info: + etype, value, stacktrace = tf.exc_info + if isinstance(value, ReframeError): + report += 'caught framework exception: %s\n' % value + elif isinstance(value, KeyboardInterrupt): + report += 'cancelled by user\n' + else: + report += 'caught unexpected exception: %s (%s)\n' % \ + (etype.__name__, value) + report += ''.join( + traceback.format_exception(*tf.exc_info)) + else: + report += "sanity/performance check failure " \ + "(performance log kept in `%s')\n" % \ + check._perf_logfile + + + report += line_width*'-' + '\n' + return report diff --git a/reframe/settings.py b/reframe/settings.py index 1ed680b8a7..6e2f2f6c14 100644 --- a/reframe/settings.py +++ b/reframe/settings.py @@ -8,8 +8,8 @@ from reframe.core.fields import ReadOnlyField class RegressionSettings: - version = ReadOnlyField('2.3') - module_name = ReadOnlyField('PyRegression') + version = ReadOnlyField('2.4') + module_name = ReadOnlyField('reframe') job_state_poll_intervals = ReadOnlyField([ 1, 2, 3 ]) job_init_poll_intervals = ReadOnlyField([ 1 ]) job_init_poll_max_tries = ReadOnlyField(30) @@ -75,5 +75,28 @@ class RegressionSettings: } }) + logging_config = { + 'level': 'DEBUG', + 'handlers': { + 'reframe.log' : { + 'level' : 'DEBUG', + 'format' : '[%(asctime)s] %(levelname)s: ' + '%(check_name)s: %(message)s', + 'append' : False, + }, + + # Output handling + '&1': { + 'level' : 'INFO', + 'format' : '%(message)s' + }, + 'reframe.out' : { + 'level' : 'INFO', + 'format' : '%(message)s', + 'append' : False, + } + } + } + settings = RegressionSettings() diff --git a/reframe/tools/__init__.py b/reframe/tools/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/reframe/utility/sandbox.py b/reframe/utility/sandbox.py index 179139bcfc..df305dff50 100644 --- a/reframe/utility/sandbox.py +++ b/reframe/utility/sandbox.py @@ -1,8 +1,7 @@ from reframe.core.fields import CopyOnWriteField class Sandbox(object): - """ - Sandbox class for manipulating shared resources - """ - environ = CopyOnWriteField('environ') - system = CopyOnWriteField('system') + """Sandbox class for manipulating shared resources.""" + environ = CopyOnWriteField('environ') + system = CopyOnWriteField('system') + check = CopyOnWriteField('check') diff --git a/test_reframe.py b/test_reframe.py index 3fa45b4d11..9422536756 100755 --- a/test_reframe.py +++ b/test_reframe.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 +# import unittest import nose if __name__ == '__main__': + # unittest.main() nose.main() diff --git a/unittests/fixtures.py b/unittests/fixtures.py index 00eb805d74..7adfa0fcb7 100644 --- a/unittests/fixtures.py +++ b/unittests/fixtures.py @@ -74,7 +74,6 @@ } - def force_remove_file(filename): try: os.remove(filename) diff --git a/unittests/resources/badchecks/badargs.py b/unittests/resources/badchecks/badargs.py new file mode 100644 index 0000000000..76e0e835f9 --- /dev/null +++ b/unittests/resources/badchecks/badargs.py @@ -0,0 +1,11 @@ +import os + +from reframe.core.pipeline import RegressionTest + +class EmptyTest(RegressionTest): + def __init__(self, **kwargs): + super().__init__('emptycheck', os.path.dirname(__file__), **kwargs) + + +def _get_checks(): + return [ EmptyTest() ] diff --git 
a/unittests/resources/badchecks/badentry.py b/unittests/resources/badchecks/badentry.py new file mode 100644 index 0000000000..da9b44622e --- /dev/null +++ b/unittests/resources/badchecks/badentry.py @@ -0,0 +1,10 @@ +import os + +from reframe.core.pipeline import RegressionTest + +class EmptyTest(RegressionTest): + def __init__(self, **kwargs): + super().__init__('emptycheck', os.path.dirname(__file__), **kwargs) + + def _get_checks(**kwargs): + return [ self ] diff --git a/unittests/resources/badchecks/invalid_iterable.py b/unittests/resources/badchecks/invalid_iterable.py index b4ec22b734..4e9e020b9c 100644 --- a/unittests/resources/badchecks/invalid_iterable.py +++ b/unittests/resources/badchecks/invalid_iterable.py @@ -1,2 +1,2 @@ def _get_checks(**kwargs): - return 'foo' + return 123 diff --git a/unittests/resources/badchecks/notacheck.py b/unittests/resources/badchecks/notacheck.py new file mode 100644 index 0000000000..b295837df8 --- /dev/null +++ b/unittests/resources/badchecks/notacheck.py @@ -0,0 +1 @@ +import _foo diff --git a/unittests/resources/frontend_checks.py b/unittests/resources/frontend_checks.py index 294c6b0b21..30712ed10f 100644 --- a/unittests/resources/frontend_checks.py +++ b/unittests/resources/frontend_checks.py @@ -3,10 +3,11 @@ # import re +import sys from reframe.core.pipeline import RunOnlyRegressionTest from reframe.core.environments import * -from reframe.core.exceptions import ReframeError, RegressionFatalError +from reframe.core.exceptions import ReframeError class BaseFrontendCheck(RunOnlyRegressionTest): @@ -34,6 +35,18 @@ def setup(self, system, environ, **job_opts): raise ReframeError('Setup failure') +class BadSetupCheckEarly(BaseFrontendCheck): + def __init__(self, **kwargs): + super().__init__(type(self).__name__, **kwargs) + + self.valid_systems = [ '*' ] + self.valid_prog_environs = [ '*' ] + + + def setup(self, system, environ, **job_opts): + raise ReframeError('Setup failure') + + class NoSystemCheck(BaseFrontendCheck): def __init__(self, **kwargs): super().__init__(type(self).__name__, **kwargs) @@ -69,12 +82,12 @@ def __init__(self, **kwargs): ] } } - self.reference = { '*' : { 'match' : 'foo' } } + self.strict_check = False class CustomPerformanceFailureCheck(BaseFrontendCheck): @@ -83,14 +96,64 @@ def __init__(self, **kwargs): super().__init__(type(self).__name__, **kwargs) self.valid_systems = [ '*' ] self.valid_prog_environs = [ '*' ] + self.strict_check = False def check_performance(self): return False +class KeyboardInterruptCheck(BaseFrontendCheck): + """Simulate keyboard interrupt during test's execution.""" + def __init__(self, phase='wait', **kwargs): + super().__init__(type(self).__name__, **kwargs) + + self.valid_systems = [ '*' ] + self.valid_prog_environs = [ '*' ] + self.phase = phase + + + def setup(self, system, environ, **job_opts): + super().setup(system, environ, **job_opts) + if self.phase == 'setup': + raise KeyboardInterrupt + + + def wait(self): + # We do our nasty stuff in wait() to make things more complicated + if self.phase == 'wait': + raise KeyboardInterrupt + else: + super().wait() + + +class SystemExitCheck(BaseFrontendCheck): + """Simulate system exit from within a check.""" + def __init__(self, **kwargs): + super().__init__(type(self).__name__, **kwargs) + + self.valid_systems = [ '*' ] + self.valid_prog_environs = [ '*' ] + + def wait(self): + # We do our nasty stuff in wait() to make things more complicated + sys.exit(1) + + +class SleepCheck(BaseFrontendCheck): + def __init__(self, sleep_time, **kwargs): 
+ super().__init__(type(self).__name__, **kwargs) + self.name += str(id(self)) + self.sleep_time = sleep_time + self.executable = 'sleep %s' % self.sleep_time + self.sanity_patterns = None + self.valid_systems = [ '*' ] + self.valid_prog_environs = [ '*' ] + + def _get_checks(**kwargs): return [ BadSetupCheck(**kwargs), + BadSetupCheckEarly(**kwargs), NoSystemCheck(**kwargs), NoPrgEnvCheck(**kwargs), SanityFailureCheck(**kwargs), diff --git a/unittests/test_cli.py b/unittests/test_cli.py index 4b476132e3..25a3556f0c 100644 --- a/unittests/test_cli.py +++ b/unittests/test_cli.py @@ -2,29 +2,47 @@ import re import shutil import stat - +import sys import unittest import tempfile import reframe.utility.os as os_ext +import reframe.core.logging as logging +from contextlib import redirect_stdout, redirect_stderr +from io import StringIO from reframe.frontend.loader import SiteConfiguration, autodetect_system +from reframe.settings import settings from unittests.fixtures import guess_system, system_with_scheduler + +def run_command_inline(argv, funct, *args, **kwargs): + argv_save = sys.argv + captured_stdout = StringIO() + captured_stderr = StringIO() + sys.argv = argv + try: + with redirect_stdout(captured_stdout): + with redirect_stderr(captured_stderr): + exitcode = funct(*args, **kwargs) + except SystemExit as e: + exitcode = e.code + finally: + sys.argv = argv_save + return (exitcode, + captured_stdout.getvalue(), + captured_stderr.getvalue()) + + class TestFrontend(unittest.TestCase): def setUp(self): self.prefix = tempfile.mkdtemp(dir='unittests') - self.stagedir = os.path.join(self.prefix, 'stage') - self.outputdir = os.path.join(self.prefix, 'output') - self.logdir = os.path.join(self.prefix, 'logs') - self.python = 'python3' - self.executable = 'reframe.py' + self.executable = './reframe.py' self.sysopt = 'generic:login' self.checkfile = 'unittests/resources/hellocheck.py' self.prgenv = 'builtin-gcc' - self.cmdstr = '{python} {executable} {checkopt} ' \ - '-o {outputdir} -s {stagedir} ' \ - '--logdir {logdir} {prgenvopt} ' \ + self.cmdstr = '{executable} {checkopt} ' \ + '--prefix {prefix} {prgenvopt} ' \ '--notimestamp --nocolor {action} ' \ '{sysopt} {local} {options}' self.options = '' @@ -33,55 +51,78 @@ def setUp(self): # needed for enabling/disabling tests based on the current system self.system = autodetect_system(SiteConfiguration()) - - - def _invocation_cmd(self): - return self.cmdstr.format( - python = self.python, + self.logfile = os.path.join(self.prefix, 'reframe.log') + + settings.logging_config = { + 'level': 'INFO', + 'handlers': { + self.logfile : { + 'level' : 'DEBUG', + 'format' : '[%(asctime)s] %(levelname)s: ' + '%(check_name)s: %(message)s', + 'datefmt' : '%FT%T', + 'append' : False, + }, + '&1': { + 'level' : 'INFO', + 'format' : '%(message)s' + }, + } + } + + + def _run_reframe(self): + import reframe.frontend.cli as cli + + argv = self.cmdstr.format( executable = self.executable, checkopt = ('-c %s' % self.checkfile) if self.checkfile else '', - outputdir = self.outputdir, - stagedir = self.stagedir, - logdir = self.logdir, + prefix = self.prefix, prgenvopt = ('-p %s' % self.prgenv) if self.prgenv else '', action = self.action, local = '--force-local' if self.local else '', options = ' '.join(self.options), sysopt = ('--system %s' % self.sysopt) if self.sysopt else '' - ) + ).split() + + return run_command_inline(argv, cli.main) def _stage_exists(self, check_name, partitions, prgenv_name): + stagedir = os.path.join(self.prefix, 'stage') + for p in partitions: 
if not os.path.exists(os.path.join( - self.stagedir, p, check_name, prgenv_name)): + stagedir, p, check_name, prgenv_name)): return False return True def _perflog_exists(self, check_name, partitions): + logdir = os.path.join(self.prefix, 'logs') for p in partitions: - logfile = os.path.join(self.logdir, p, check_name + '.log') + logfile = os.path.join(logdir, p, check_name + '.log') if not os.path.exists(logfile): return False return True - def test_unsupported_python(self): - # The framework must make sure that an informative message is printed in - # such case. If a SyntaxError happens, that's a problem - self.python = 'python2' - self.action = '-l' - command = os_ext.run_command(self._invocation_cmd()) - self.assertIn('Unsupported Python version', command.stderr) + def assert_log_file_is_saved(self): + outputdir = os.path.join(self.prefix, 'output') + self.assertTrue(os.path.exists(self.logfile)) + self.assertTrue(os.path.exists( + os.path.join(outputdir,os.path.basename(self.logfile)))) def test_check_success(self): - command = os_ext.run_command(self._invocation_cmd(), check=True) - self.assertNotIn('FAILED', command.stdout) - self.assertIn('PASSED', command.stdout) + self.options = [ '--save-log-files' ] + returncode, stdout, stderr = self._run_reframe() + self.assertNotIn('FAILED', stdout) + self.assertIn('PASSED', stdout) + self.assertEqual(0, returncode) + self.assert_log_file_is_saved() @unittest.skipIf(not system_with_scheduler(None), @@ -92,87 +133,85 @@ def test_check_submit_success(self): partition = system_with_scheduler(None) self.local = False - self.sysopt = '%s:%s' % (system.name, partition.name) + self.sysopt = partition.fullname # pick up the programming environment of the partition self.prgenv = partition.environs[0].name - command = os_ext.run_command(self._invocation_cmd(), check=True) - self.assertNotIn('FAILED', command.stdout) - self.assertIn('PASSED', command.stdout) + returncode, stdout, stderr = self._run_reframe() + self.assertNotIn('FAILED', stdout) + self.assertIn('PASSED', stdout) + self.assertEqual(0, returncode) def test_check_failure(self): self.checkfile = 'unittests/resources/frontend_checks.py' self.options = [ '--tag BadSetupCheck' ] - command = os_ext.run_command(self._invocation_cmd()) - self.assertIn('FAILED', command.stdout) - self.assertNotEqual(command.returncode, 0) + + returncode, stdout, stderr = self._run_reframe() + self.assertIn('FAILED', stdout) + self.assertNotEqual(returncode, 0) def test_check_sanity_failure(self): self.checkfile = 'unittests/resources/frontend_checks.py' self.options = [ '--tag SanityFailureCheck' ] - command = os_ext.run_command(self._invocation_cmd()) - self.assertIn('FAILED', command.stdout) - # This is a normal failure, it should not raise any exception - self.assertNotIn('Traceback', command.stderr) - self.assertNotEqual(command.returncode, 0) + returncode, stdout, stderr = self._run_reframe() + self.assertIn('FAILED', stdout) - partitions = re.findall('>>>> Running regression on partition: (\S+)', - command.stdout) + # This is a normal failure, it should not raise any exception + self.assertNotIn('Traceback', stderr) + self.assertNotEqual(returncode, 0) self.assertTrue(self._stage_exists('SanityFailureCheck', - partitions, self.prgenv)) + [ 'login' ], self.prgenv)) def test_performance_check_failure(self): self.checkfile = 'unittests/resources/frontend_checks.py' self.options = [ '--tag PerformanceFailureCheck' ] - command = os_ext.run_command(self._invocation_cmd()) - self.assertIn('FAILED', command.stdout) + 
returncode, stdout, stderr = self._run_reframe() - # This is a normal failure, it should not raise any exception - self.assertNotIn('Traceback', command.stderr) - self.assertNotEqual(command.returncode, 0) + self.assertIn('FAILED', stdout) - partitions = re.findall('>>>> Running regression on partition: (\S+)', - command.stdout) + # This is a normal failure, it should not raise any exception + self.assertNotIn('Traceback', stderr) + self.assertNotEqual(0, returncode) self.assertTrue(self._stage_exists('PerformanceFailureCheck', - partitions, self.prgenv)) + [ 'login' ], self.prgenv)) self.assertTrue(self._perflog_exists('PerformanceFailureCheck', - partitions)) + [ 'login' ])) def test_custom_performance_check_failure(self): self.checkfile = 'unittests/resources/frontend_checks.py' self.options = [ '--tag CustomPerformanceFailureCheck' ] - command = os_ext.run_command(self._invocation_cmd()) - self.assertIn('FAILED', command.stdout) + + returncode, stdout, stderr = self._run_reframe() + self.assertIn('FAILED', stdout) # This is a normal failure, it should not raise any exception - self.assertNotIn('Traceback', command.stderr) - self.assertNotEqual(command.returncode, 0) + self.assertNotIn('Traceback', stderr) + self.assertNotEqual(0, returncode) - partitions = re.findall('>>>> Running regression on partition: (\S+)', - command.stdout) self.assertTrue(self._stage_exists('CustomPerformanceFailureCheck', - partitions, self.prgenv)) - self.assertNotIn('Check log file:', command.stdout) + [ 'login' ], self.prgenv)) + self.assertNotIn('Check log file:', stdout) def test_skip_system_check_option(self): self.checkfile = 'unittests/resources/frontend_checks.py' self.options = [ '--skip-system-check', '--tag NoSystemCheck' ] - command = os_ext.run_command(self._invocation_cmd(), check=True) - self.assertIn('PASSED', command.stdout) + returncode, stdout, stderr = self._run_reframe() + self.assertIn('PASSED', stdout) def test_skip_prgenv_check_option(self): self.checkfile = 'unittests/resources/frontend_checks.py' self.options = [ '--skip-prgenv-check', '--tag NoPrgEnvCheck' ] - command = os_ext.run_command(self._invocation_cmd(), check=True) - self.assertIn('PASSED', command.stdout) + returncode, stdout, stderr = self._run_reframe() + self.assertIn('PASSED', stdout) + self.assertEqual(0, returncode) def test_sanity_of_checks(self): @@ -180,35 +219,39 @@ def test_sanity_of_checks(self): # will force a syntactic and runtime check at least for the constructor # of the checks self.action = '-l' + self.options = [ '--save-log-files' ] self.checkfile = None - command = os_ext.run_command(self._invocation_cmd(), check=True) + returncode, stdout, stderr = self._run_reframe() + + self.assertEqual(0, returncode) + self.assert_log_file_is_saved() def test_sanity_of_optconfig(self): # Test the sanity of the command line options configuration self.action = '-h' self.checkfile = None - command = os_ext.run_command(self._invocation_cmd(), check=True) + returncode, stdout, stderr = self._run_reframe() def test_checkpath_recursion(self): self.action = '-l' self.checkfile = None - command = os_ext.run_command(self._invocation_cmd(), check=True) + returncode, stdout, stderr = self._run_reframe() num_checks_default = re.search( - 'Found (\d+) check', command.stdout, re.MULTILINE).group(1) + 'Found (\d+) check', stdout, re.MULTILINE).group(1) self.checkfile = 'checks/' self.options = [ '-R' ] - command = os_ext.run_command(self._invocation_cmd(), check=True) + returncode, stdout, stderr = self._run_reframe() 
num_checks_in_checkdir = re.search( - 'Found (\d+) check', command.stdout, re.MULTILINE).group(1) + 'Found (\d+) check', stdout, re.MULTILINE).group(1) self.assertEqual(num_checks_in_checkdir, num_checks_default) self.options = [] - command = os_ext.run_command(self._invocation_cmd(), check=True) + returncode, stdout, stderr = self._run_reframe() num_checks_in_checkdir = re.search( - 'Found (\d+) check', command.stdout, re.MULTILINE).group(1) + 'Found (\d+) check', stdout, re.MULTILINE).group(1) self.assertEqual('0', num_checks_in_checkdir) diff --git a/unittests/test_core.py b/unittests/test_core.py index 4227f0f4e6..496fff052b 100644 --- a/unittests/test_core.py +++ b/unittests/test_core.py @@ -41,11 +41,15 @@ def setUp(self): module_load('testmod_base') os.environ['_fookey1'] = 'origfoo' + os.environ['_fookey1b'] = 'foovalue1' + os.environ['_fookey2b'] = 'foovalue2' self.environ_save = EnvironmentSnapshot() self.environ = Environment(name='TestEnv1', modules=['testmod_foo']) self.environ.set_variable(name='_fookey1', value='value1') self.environ.set_variable(name='_fookey2', value='value2') self.environ.set_variable(name='_fookey1', value='value3') + self.environ.set_variable(name='_fookey3b', value='$_fookey1b') + self.environ.set_variable(name='_fookey4b', value='${_fookey2b}') self.environ_other = Environment(name='TestEnv2', modules=['testmod_boo']) self.environ_other.set_variable(name='_fookey11', value='value11') @@ -58,7 +62,7 @@ def tearDown(self): def test_setup(self): self.assertEqual(len(self.environ.modules), 1) - self.assertEqual(len(self.environ.variables.keys()), 2) + self.assertEqual(len(self.environ.variables.keys()), 4) self.assertEqual(self.environ.variables['_fookey1'], 'value3') self.assertEqual(self.environ.variables['_fookey2'], 'value2') self.assertIn('testmod_foo', self.environ.modules) @@ -80,6 +84,8 @@ def test_load_restore(self): self.environ.load() self.assertEnvironmentVariable(name='_fookey1', value='value3') self.assertEnvironmentVariable(name='_fookey2', value='value2') + self.assertEnvironmentVariable(name='_fookey3b', value='foovalue1') + self.assertEnvironmentVariable(name='_fookey4b', value='foovalue2') self.assertModulesLoaded(self.environ.modules) self.assertTrue(self.environ.loaded) @@ -213,9 +219,6 @@ def test_compile(self): self.compile_with_env(env, skip_fortran=True) self.compile_dir_with_env(env, skip_fortran=True) except CompilationError as e: - print('Environment: %s' % e.environ.name) - print('Compiler command: %s' % e.command) - print('Compiler error:\n%s' % e.stderr) self.fail("Compilation failed\n") diff --git a/unittests/test_loader.py b/unittests/test_loader.py index 08f83c7434..d2f54b4018 100644 --- a/unittests/test_loader.py +++ b/unittests/test_loader.py @@ -22,6 +22,8 @@ def test_load_success(self): self.assertEqual(2, len(system.partitions)) self.assertNotEqual(None, system.partition('login')) self.assertNotEqual(None, system.partition('gpu')) + self.assertEqual('testsys:login', system.partition('login').fullname) + self.assertEqual('testsys:gpu', system.partition('gpu').fullname) self.assertEqual(3, len(system.partition('login').environs)) self.assertEqual(2, len(system.partition('gpu').environs)) @@ -169,13 +171,13 @@ def test_load_recursive(self): 'unittests/resources', recurse=True, system=self.system, resources=self.resources ) - self.assertEqual(10, len(checks)) + self.assertEqual(11, len(checks)) def test_load_all(self): checks = self.loader_with_path.load_all(system=self.system, resources=self.resources) - 
self.assertEqual(9, len(checks)) + self.assertEqual(10, len(checks)) def test_load_all_with_prefix(self): diff --git a/unittests/test_logging.py b/unittests/test_logging.py new file mode 100644 index 0000000000..a48c2a8901 --- /dev/null +++ b/unittests/test_logging.py @@ -0,0 +1,287 @@ +import os +import logging +import tempfile +import unittest +import sys + +from contextlib import contextmanager +from datetime import datetime +from io import StringIO +from unittest.mock import patch + +from reframe.core.exceptions import ReframeError, ConfigurationError +from reframe.core.logging import * +from reframe.core.pipeline import RegressionTest +from reframe.core.systems import System +from reframe.frontend.resources import ResourcesManager + + +class TestLogger(unittest.TestCase): + def setUp(self): + tmpfd, self.logfile = tempfile.mkstemp() + os.close(tmpfd) + + self.logger = Logger('reframe') + self.handler = RotatingFileHandler(self.logfile) + self.formatter = logging.Formatter( + fmt='[%(asctime)s] %(levelname)s: %(check_name)s: %(message)s', + datefmt='%FT%T') + + self.handler.setFormatter(self.formatter) + self.logger.addHandler(self.handler) + + # Use the logger adapter that defines check_name + self.logger_without_check = LoggerAdapter(self.logger) + + # Logger adapter with an associated check + self.logger_with_check = LoggerAdapter( + self.logger, RegressionTest( + 'random_check', '.', System('foosys'), ResourcesManager() + ) + ) + + + def tearDown(self): + os.remove(self.logfile) + + + def found_in_logfile(self, string): + found = False + with open(self.logfile, 'rt') as f: + found = string in f.read() + + return found + + + def test_invalid_loglevel(self): + self.assertRaises(ReframeError, self.logger.setLevel, 'level') + self.assertRaises(ReframeError, Logger, 'logger', 'level') + + + def test_custom_loglevels(self): + self.logger_without_check.info('foo') + self.logger_without_check.verbose('bar') + + self.assertTrue(os.path.exists(self.logfile)) + self.assertTrue(self.found_in_logfile('info')) + self.assertTrue(self.found_in_logfile('verbose')) + self.assertTrue(self.found_in_logfile('reframe')) + + + def test_check_logger(self): + self.logger_with_check.info('foo') + self.logger_with_check.verbose('bar') + + self.assertTrue(os.path.exists(self.logfile)) + self.assertTrue(self.found_in_logfile('info')) + self.assertTrue(self.found_in_logfile('verbose')) + self.assertTrue(self.found_in_logfile('random_check')) + + + def test_custom_handler_levels(self): + self.handler.setLevel('verbose') + self.handler.setLevel(VERBOSE) + + self.logger_with_check.debug('foo') + self.logger_with_check.verbose('bar') + + self.assertFalse(self.found_in_logfile('foo')) + self.assertTrue(self.found_in_logfile('bar')) + + + def test_logger_levels(self): + self.logger_with_check.setLevel('verbose') + self.logger_with_check.setLevel(VERBOSE) + + self.logger_with_check.debug('bar') + self.logger_with_check.verbose('foo') + + self.assertFalse(self.found_in_logfile('bar')) + self.assertTrue(self.found_in_logfile('foo')) + + +class TestLoggerConfiguration(unittest.TestCase): + def setUp(self): + + self.logfile = 'reframe.log' + self.logging_config = { + 'level': 'INFO', + 'handlers': { + self.logfile: { + 'level' : 'WARNING', + 'format' : '[%(asctime)s] %(levelname)s: %(message)s', + 'datefmt' : '%F', + 'append' : True, + } + } + } + self.logger = None + self.check = RegressionTest( + 'random_check', '.', System('foosys'), ResourcesManager() + ) + + + def tearDown(self): + if os.path.exists(self.logfile): + 
os.remove(self.logfile) + + + def found_in_logfile(self, string): + for handler in self.logger.handlers: + handler.flush() + handler.close() + + found = False + with open(self.logfile, 'rt') as f: + found = string in f.read() + + return found + + + def set_logger(self): + from reframe.core.logging import load_from_dict + self.logger = load_from_dict(self.logging_config) + + + def close_handlers(self): + for h in self.logger.handlers: + h.close() + + + def flush_handlers(self): + for h in self.logger.handlers: + h.flush() + + + def test_valid_level(self): + self.set_logger() + self.assertEqual(INFO, self.logger.getEffectiveLevel()) + + + def test_no_handlers(self): + del self.logging_config['handlers'] + self.assertRaises(ConfigurationError, self.set_logger) + + + def test_empty_handlers(self): + self.logging_config['handlers'] = {} + self.assertRaises(ConfigurationError, self.set_logger) + + + def test_handler_level(self): + self.set_logger() + self.logger.info('foo') + self.logger.warning('bar') + + self.assertFalse(self.found_in_logfile('foo')) + self.assertTrue(self.found_in_logfile('bar')) + + + def test_handler_append(self): + self.set_logger() + self.logger.warning('foo') + self.close_handlers() + + # Reload logger + self.set_logger() + self.logger.warning('bar') + + self.assertTrue(self.found_in_logfile('foo')) + self.assertTrue(self.found_in_logfile('bar')) + + + def test_handler_noappend(self): + self.logging_config = { + 'level' : 'INFO', + 'handlers': { + self.logfile: { + 'level' : 'WARNING', + 'format' : '[%(asctime)s] %(levelname)s: %(message)s', + 'datefmt': '%F', + 'append' : False, + } + } + } + + self.set_logger() + self.logger.warning('foo') + self.close_handlers() + + # Reload logger + self.set_logger() + self.logger.warning('bar') + + self.assertFalse(self.found_in_logfile('foo')) + self.assertTrue(self.found_in_logfile('bar')) + + + # FIXME: this test is not robust + def test_date_format(self): + self.set_logger() + self.logger.warning('foo') + self.flush_handlers() + self.assertTrue(self.found_in_logfile(datetime.now().strftime('%F'))) + + + def test_stream_handler_stdout(self): + self.logging_config = { + 'level' : 'INFO', + 'handlers': { + '&1': {}, + } + } + self.set_logger() + + self.assertEqual(len(self.logger.handlers), 1) + handler = self.logger.handlers[0] + + self.assertTrue(isinstance(handler, StreamHandler)) + self.assertEqual(handler.stream, sys.stdout) + + + def test_stream_handler_stderr(self): + self.logging_config = { + 'level' : 'INFO', + 'handlers': { + '&2': {}, + } + } + self.set_logger() + + self.assertEqual(len(self.logger.handlers), 1) + handler = self.logger.handlers[0] + + self.assertTrue(isinstance(handler, StreamHandler)) + self.assertEqual(handler.stream, sys.stderr) + + + def test_multiple_handlers(self): + self.logging_config = { + 'level' : 'INFO', + 'handlers': { + '&1': {}, + self.logfile: {}, + } + } + self.set_logger() + self.assertEqual(len(self.logger.handlers), 2) + + + def test_global_noconfig(self): + # This is to test the case when no configuration is set, but since the + # order the unit tests are invoked is arbitrary, we emulate the + # 'no-config' state by passing `None` to `configure_logging()` + + configure_logging(None) + frontend_logger = getlogger('frontend') + check_logger = getlogger('check', self.check) + self.assertEqual(None, frontend_logger.logger) + self.assertEqual(None, check_logger.logger) + + + def test_global_config(self): + configure_logging(self.logging_config) + frontend_logger = getlogger('frontend') + 
check_logger = getlogger('check', self.check) + self.assertNotEqual(None, frontend_logger.logger) + self.assertNotEqual(None, check_logger.logger) diff --git a/unittests/test_parsers.py b/unittests/test_parsers.py index 4f2ee1c67d..701cbbc48c 100644 --- a/unittests/test_parsers.py +++ b/unittests/test_parsers.py @@ -15,7 +15,7 @@ class StatefulParserTest(unittest.TestCase): def setUp(self): self.system = System('daint') - self.system.partitions.append(SystemPartition('gpu')) + self.system.partitions.append(SystemPartition('gpu', self.system)) self.resourcesdir = tempfile.mkdtemp(dir='unittests') self.resources = ResourcesManager(prefix=self.resourcesdir) diff --git a/unittests/test_pipeline.py b/unittests/test_pipeline.py index 17f91de895..81716a5a5b 100644 --- a/unittests/test_pipeline.py +++ b/unittests/test_pipeline.py @@ -230,7 +230,7 @@ def test_supports_system(self): class TestRegressionOutputScan(unittest.TestCase): def setUp(self): self.system = System('testsys') - self.system.partitions.append(SystemPartition('gpu')) + self.system.partitions.append(SystemPartition('gpu', self.system)) self.resourcesdir = tempfile.mkdtemp(dir='unittests') self.resources = ResourcesManager(prefix=self.resourcesdir) @@ -353,7 +353,6 @@ def test_sanity_multiple_patterns(self): def test_multiple_files(self): - # Create multiple files following the same pattern files = [ tempfile.NamedTemporaryFile(mode='wt', prefix='regtmp', dir=self.test.prefix, @@ -428,14 +427,6 @@ def test_strict_performance_check(self): self.assertFalse(self.test.check_performance()) - def test_nostrict_performance_check(self): - self.write_performance_output(performance1=1.4, - performance2=2.7, - performance3=3.2) - self.test.strict_check = False - self.assertTrue(self.test.check_performance_relaxed()) - - def test_invalid_threshold(self): self.write_performance_output(performance1=1.3, performance2=1.8, diff --git a/unittests/test_policies.py b/unittests/test_policies.py new file mode 100644 index 0000000000..22531034f3 --- /dev/null +++ b/unittests/test_policies.py @@ -0,0 +1,253 @@ +import shutil +import tempfile +import unittest + +from datetime import datetime +from reframe.frontend.executors import * +from reframe.frontend.executors.policies import * +from reframe.frontend.loader import * +from reframe.frontend.resources import ResourcesManager +from reframe.settings import settings + +from unittests.fixtures import TEST_SITE_CONFIG + +class TestSerialExecutionPolicy(unittest.TestCase): + def setUp(self): + # Load a system configuration + self.site_config = SiteConfiguration() + self.site_config.load_from_dict(settings.site_configuration) + self.system = self.site_config.systems['generic'] + self.resourcesdir = tempfile.mkdtemp(dir='unittests') + self.resources = ResourcesManager(prefix=self.resourcesdir) + self.loader = RegressionCheckLoader(['unittests/resources']) + + # Setup the runner + self.runner = Runner(SerialExecutionPolicy()) + self.checks = self.loader.load_all(system=self.system, + resources=self.resources) + + def tearDown(self): + shutil.rmtree(self.resourcesdir, ignore_errors=True) + + + def test_runall(self): + self.runner.runall(self.checks, self.system) + + stats = self.runner.stats + self.assertEqual(7, stats.num_cases()) + self.assertEqual(5, stats.num_failures()) + self.assertEqual(2, stats.num_failures_stage('setup')) + self.assertEqual(1, stats.num_failures_stage('sanity')) + self.assertEqual(2, stats.num_failures_stage('performance')) + + + def test_runall_skip_system_check(self): + 
self.runner.policy.skip_system_check = True + self.runner.runall(self.checks, self.system) + + stats = self.runner.stats + self.assertEqual(8, stats.num_cases()) + self.assertEqual(5, stats.num_failures()) + self.assertEqual(2, stats.num_failures_stage('setup')) + self.assertEqual(1, stats.num_failures_stage('sanity')) + self.assertEqual(2, stats.num_failures_stage('performance')) + + + def test_runall_skip_prgenv_check(self): + self.runner.policy.skip_environ_check = True + self.runner.runall(self.checks, self.system) + + stats = self.runner.stats + self.assertEqual(8, stats.num_cases()) + self.assertEqual(5, stats.num_failures()) + self.assertEqual(2, stats.num_failures_stage('setup')) + self.assertEqual(1, stats.num_failures_stage('sanity')) + self.assertEqual(2, stats.num_failures_stage('performance')) + + + def test_runall_skip_sanity_check(self): + self.runner.policy.skip_sanity_check = True + self.runner.runall(self.checks, self.system) + + stats = self.runner.stats + self.assertEqual(7, stats.num_cases()) + self.assertEqual(4, stats.num_failures()) + self.assertEqual(2, stats.num_failures_stage('setup')) + self.assertEqual(0, stats.num_failures_stage('sanity')) + self.assertEqual(2, stats.num_failures_stage('performance')) + + + def test_runall_skip_performance_check(self): + self.runner.policy.skip_performance_check = True + self.runner.runall(self.checks, self.system) + + stats = self.runner.stats + self.assertEqual(7, stats.num_cases()) + self.assertEqual(3, stats.num_failures()) + self.assertEqual(2, stats.num_failures_stage('setup')) + self.assertEqual(1, stats.num_failures_stage('sanity')) + self.assertEqual(0, stats.num_failures_stage('performance')) + + + def test_run_relaxed_performance_check(self): + self.runner.policy.relax_performance_check = True + self.runner.runall(self.checks, self.system) + + stats = self.runner.stats + self.assertEqual(7, stats.num_cases()) + self.assertEqual(3, stats.num_failures()) + self.assertEqual(2, stats.num_failures_stage('setup')) + self.assertEqual(1, stats.num_failures_stage('sanity')) + self.assertEqual(0, stats.num_failures_stage('performance')) + + + def test_kbd_interrupt_within_test(self): + from unittests.resources.frontend_checks import KeyboardInterruptCheck + + check = KeyboardInterruptCheck(system=self.system, + resources=self.resources) + self.assertRaises(KeyboardInterrupt, self.runner.runall, + [ check ], self.system) + stats = self.runner.stats + self.assertEqual(1, stats.num_failures()) + + + def test_system_exit_within_test(self): + from unittests.resources.frontend_checks import SystemExitCheck + + check = SystemExitCheck(system=self.system, resources=self.resources) + + # This should not raise and should not exit + self.runner.runall([ check ], self.system) + stats = self.runner.stats + self.assertEqual(1, stats.num_failures()) + + +class TestAsynchronousExecutionPolicy(TestSerialExecutionPolicy): + def setUp(self): + super().setUp() + self.runner = Runner(AsynchronousExecutionPolicy()) + + + def set_max_jobs(self, value): + for p in self.system.partitions: + p.max_jobs = value + + + def test_concurrency_unlimited(self): + from unittests.resources.frontend_checks import SleepCheck + + checks = [ SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources) ] + self.set_max_jobs(3) + + t_run = datetime.now() + self.runner.runall(checks, self.system) + t_run = datetime.now() - t_run + 
self.assertLess(t_run.seconds, 2) + + self.assertEqual(3, self.runner.stats.num_cases()) + self.assertEqual(0, self.runner.stats.num_failures()) + + + def test_concurrency_limited(self): + from unittests.resources.frontend_checks import SleepCheck + + checks = [ SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources) ] + self.set_max_jobs(2) + + t_run = datetime.now() + self.runner.runall(checks, self.system) + t_run = datetime.now() - t_run + self.assertGreaterEqual(t_run.seconds, 2) + self.assertLess(t_run.seconds, 3) + + self.assertEqual(3, self.runner.stats.num_cases()) + self.assertEqual(0, self.runner.stats.num_failures()) + + + def test_concurrency_none(self): + from unittests.resources.frontend_checks import SleepCheck + + checks = [ SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources) ] + self.set_max_jobs(1) + + t_run = datetime.now() + self.runner.runall(checks, self.system) + t_run = datetime.now() - t_run + self.assertGreaterEqual(t_run.seconds, 3) + + self.assertEqual(3, self.runner.stats.num_cases()) + self.assertEqual(0, self.runner.stats.num_failures()) + + + def _run_checks(self, checks, max_jobs): + self.set_max_jobs(max_jobs) + self.assertRaises(KeyboardInterrupt, self.runner.runall, + checks, self.system) + + self.assertEqual(4, self.runner.stats.num_cases()) + self.assertEqual(4, self.runner.stats.num_failures()) + + + def test_kbd_interrupt_in_wait_with_concurrency(self): + from unittests.resources.frontend_checks import SleepCheck, \ + KeyboardInterruptCheck + + checks = [ + KeyboardInterruptCheck(system=self.system, + resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources) + ] + self._run_checks(checks, 4) + + + def test_kbd_interrupt_in_wait_with_limited_concurrency(self): + from unittests.resources.frontend_checks import SleepCheck, \ + KeyboardInterruptCheck + + checks = [ + KeyboardInterruptCheck(system=self.system, + resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources) + ] + self._run_checks(checks, 2) + + + def test_kbd_interrupt_in_setup_with_concurrency(self): + from unittests.resources.frontend_checks import SleepCheck, \ + KeyboardInterruptCheck + + checks = [ + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + KeyboardInterruptCheck(phase='setup', + system=self.system, + resources=self.resources), + ] + self._run_checks(checks, 4) + + + def test_kbd_interrupt_in_setup_with_limited_concurrency(self): + from unittests.resources.frontend_checks import SleepCheck, \ + KeyboardInterruptCheck + + checks = [ + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + SleepCheck(1, system=self.system, resources=self.resources), + KeyboardInterruptCheck(phase='setup', + system=self.system, + resources=self.resources), + ] + self._run_checks(checks, 2)
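The timing assertions in the three `test_concurrency_*` tests above follow directly from the scheduling model: with N identical d-second checks and at most k concurrent jobs, the runs proceed in waves, so the expected wall time is ceil(N/k)*d. A quick sketch of that arithmetic (`expected_walltime` is a hypothetical helper, not part of the test suite):

    import math

    def expected_walltime(num_checks, duration, max_jobs):
        # With at most max_jobs concurrent checks, the runs proceed in
        # ceil(num_checks / max_jobs) waves of `duration` seconds each
        return math.ceil(num_checks / max_jobs) * duration

    assert expected_walltime(3, 1, 3) == 1   # unlimited: t_run < 2
    assert expected_walltime(3, 1, 2) == 2   # limited:   2 <= t_run < 3
    assert expected_walltime(3, 1, 1) == 3   # serial:    t_run >= 3

This is why `test_concurrency_limited` asserts both a lower bound of 2 seconds and an upper bound of 3: the third sleep check can only start after one of the first two finishes.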