From 87a2224e3a00b56c3a27e3a8cbe66c23daad969b Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Tue, 1 Oct 2024 19:37:01 +0200 Subject: [PATCH 1/6] WIP: Import results from perflogs --- reframe/frontend/argparse.py | 5 +- reframe/frontend/cli.py | 21 ++++ reframe/frontend/reporting/__init__.py | 130 ++++++++++++++++++++++++- 3 files changed, 153 insertions(+), 3 deletions(-) diff --git a/reframe/frontend/argparse.py b/reframe/frontend/argparse.py index ceec757b1..288735df5 100644 --- a/reframe/frontend/argparse.py +++ b/reframe/frontend/argparse.py @@ -196,7 +196,7 @@ def add_argument(self, *flags, **kwargs): if flags and opt_name is None: # A positional argument - opt_name = flags[-1] + opt_name, flags = flags[-1], flags[:-1] if opt_name is None: raise ValueError('could not infer a dest name: no flags defined') @@ -230,7 +230,8 @@ def add_argument(self, *flags, **kwargs): except KeyError: self._defaults.__dict__[opt_name] = None - if not flags: + positional = kwargs.pop('positional', False) + if not flags and not positional: return None return self._holder.add_argument(*flags, **kwargs) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 92d9a1e01..f6d0f6c22 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -13,6 +13,7 @@ import sys import time import traceback +import yaml import reframe.core.config as config import reframe.core.exceptions as errors @@ -479,6 +480,10 @@ def main(): action_options.add_argument( '-V', '--version', action='version', version=osext.reframe_version() ) + action_options.add_argument( + '--import-results', action='store', metavar='SPECFILE', + help='Import results to the database' + ) # Run options run_options.add_argument( @@ -831,6 +836,7 @@ def main(): action='store_true', help='Use a login shell for job scripts' ) + argparser.add_argument('args', metavar='ARGS', nargs='*', positional=True) def restrict_logging(): '''Restrict logging to errors only. 
@@ -1066,6 +1072,21 @@ def restrict_logging(): ) sys.exit(0) + if options.import_results: + with exit_gracefully_on_error('failed to import results', printer): + with open(options.import_results) as fp: + spec = yaml.load(fp, yaml.Loader) + + if spec['import']['from'] == 'perflog': + kwargs = spec['import'] + del kwargs['from'] + report = reporting.RunReport.create_from_perflog(*options.args, + **kwargs) + # report.save('foo.json', link_to_last=False) + uuid = report.store() + printer.info(f'Results imported successfully as session {uuid}') + sys.exit(0) + # Show configuration after everything is set up if options.show_config: # Restore logging level diff --git a/reframe/frontend/reporting/__init__.py b/reframe/frontend/reporting/__init__.py index c1ff5745c..29fb39a88 100644 --- a/reframe/frontend/reporting/__init__.py +++ b/reframe/frontend/reporting/__init__.py @@ -13,10 +13,12 @@ import os import re import socket +import sys import time import uuid from collections import UserDict from collections.abc import Hashable +from datetime import datetime from filelock import FileLock import reframe as rfm @@ -26,7 +28,7 @@ from reframe.core.logging import getlogger, _format_time_rfc3339, time_function from reframe.core.runtime import runtime from reframe.core.warnings import suppress_deprecations -from reframe.utility import nodelist_abbrev, OrderedSet +from reframe.utility import nodelist_abbrev, nodelist_expand, OrderedSet from .storage import StorageBackend from .utility import Aggregator, parse_cmp_spec, parse_query_spec @@ -270,6 +272,132 @@ def __getitem__(self, key): def __rfm_json_encode__(self): return self.__report + @classmethod + def create_from_perflog(cls, *logfiles, format=None, + merge_records=None, datefmt=None, + ignore_lines=None, ignore_records=None): + def _filter_record(rec): + if ignore_records is None: + return False + else: + return eval(ignore_records, {}, rec) + + def _do_merge(dst, src): + system = src.get('system') + part = src.get('partition') + pvar = src.get('pvar') + pval = src.get('pval') + pref = src.get('pref') + plower = src.get('plower') + pupper = src.get('pupper') + punit = src.get('punit') + if pvar is None: + return dst + + if system is not None and part is not None: + pvar = f'{system}:{part}:{pvar}' + + # Convert to numbers before inserting + def _convert(x): + if x is None: + return x + + if x == 'null': + return None + + return float(x) + + pval = _convert(pval) + pref = _convert(pref) + pupper = _convert(pupper) + plower = _convert(plower) + dst['perfvalues'][pvar] = (pval, pref, plower, pupper, punit) + dst.pop('pvar', None) + dst.pop('pval', None) + dst.pop('pref', None) + dst.pop('plower', None) + dst.pop('pupper', None) + dst.pop('punit', None) + return dst + + patt = re.compile(format) + report = RunReport() + session_uuid = report['session_info']['uuid'] + run_index = 0 + test_index = 0 + t_report_start = sys.maxsize + t_report_end = 0 + num_failures = 0 + testcases = [] + for filename in logfiles: + records = {} + with open(filename) as fp: + for lineno, line in enumerate(fp, start=1): + if lineno in ignore_lines: + continue + + m = patt.match(line) + if not m: + continue + + rec = m.groupdict() + if _filter_record(rec): + continue + + # Add parameters as separate fields + if 'name' in rec: + params = rec['name'].split()[1:] + for spec in params: + p, v = spec.split('=', maxsplit=1) + rec[p[1:]] = v + + # Groom the record + if 'job_completion_time' in rec: + key = 'job_completion_time' + date = datetime.strptime(rec[key], datefmt) + 
rec[key] = date.strftime(_DATETIME_FMT) + ts = date.timestamp() + rec[f'{key}_unix'] = ts + t_report_start = min(t_report_start, ts) + t_report_end = max(t_report_end, ts) + + if 'job_nodelist' in rec: + key = 'job_nodelist' + rec[key] = nodelist_expand(rec[key]) + + rec['uuid'] = f'{session_uuid}:{run_index}:{test_index}' + rec.setdefault('result', 'pass') + if rec['result'] != 'pass': + num_failures += 1 + + if not merge_records: + key = lineno + elif len(merge_records) == 1: + key = rec[merge_records[0]] + else: + key = tuple(rec[k] for k in merge_records) + + if key in records: + records[key] = _do_merge(records[key], rec) + else: + rec['perfvalues'] = {} + records[key] = _do_merge(rec, rec) + test_index += 1 + + testcases += list(records.values()) + + report.update_timestamps(t_report_start, t_report_end) + report._add_run({ + 'num_cases': len(testcases), + 'num_failures': num_failures, + 'run_index': run_index, + 'testcases': testcases + }) + return report + + def _add_run(self, run): + self.__report['runs'].append(run) + def update_session_info(self, session_info): # Remove timestamps for key, val in session_info.items(): From 70194d78a49319844cf4d30dcd5b800573b28668 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 2 Oct 2024 16:49:14 +0200 Subject: [PATCH 2/6] Support importing results from other DB files --- reframe/frontend/cli.py | 22 +++-- reframe/frontend/reporting/__init__.py | 112 +++++++++++++++++++------ reframe/frontend/reporting/storage.py | 24 ++++-- 3 files changed, 121 insertions(+), 37 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index f6d0f6c22..6a17decf0 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -1080,11 +1080,23 @@ def restrict_logging(): if spec['import']['from'] == 'perflog': kwargs = spec['import'] del kwargs['from'] - report = reporting.RunReport.create_from_perflog(*options.args, - **kwargs) - # report.save('foo.json', link_to_last=False) - uuid = report.store() - printer.info(f'Results imported successfully as session {uuid}') + reports = reporting.RunReport.create_from_perflog( + *options.args, **kwargs + ) + elif spec['import']['from'] == 'sqlite': + kwargs = spec['import'] + del kwargs['from'] + reports = reporting.RunReport.create_from_sqlite_db( + *options.args, **kwargs + ) + + for rpt in reports: + uuid = rpt.store() + printer.info(f'Successfully imported session {uuid}') + + if not reports: + printer.info('No sessions have been imported') + sys.exit(0) # Show configuration after everything is set up diff --git a/reframe/frontend/reporting/__init__.py b/reframe/frontend/reporting/__init__.py index 29fb39a88..c6957c013 100644 --- a/reframe/frontend/reporting/__init__.py +++ b/reframe/frontend/reporting/__init__.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: BSD-3-Clause import decimal +import collections import functools import inspect import json @@ -246,8 +247,8 @@ class RunReport: ''' def __init__(self): # Initialize the report with the required fields - self.__filename = None - self.__report = { + self._filename = None + self._report = { 'session_info': { 'data_version': DATA_VERSION, 'hostname': socket.gethostname(), @@ -261,16 +262,16 @@ def __init__(self): @property def filename(self): - return self.__filename + return self._filename def __getattr__(self, name): - return getattr(self.__report, name) + return getattr(self._report, name) def __getitem__(self, key): - return self.__report[key] + return self._report[key] def __rfm_json_encode__(self): - return self.__report + 
return self._report @classmethod def create_from_perflog(cls, *logfiles, format=None, @@ -393,23 +394,60 @@ def _convert(x): 'run_index': run_index, 'testcases': testcases }) - return report + return [report] + + @classmethod + def create_from_sqlite_db(cls, *dbfiles, exclude_sessions=None, + include_sessions=None, time_period=None): + dst_backend = StorageBackend.default() + dst_schema = dst_backend.schema_version() + if not time_period: + time_period = {'start': '19700101T0000+0000', 'end': 'now'} + + start = time_period.get('start', '19700101T0000+0000') + end = time_period.get('end', 'now') + ts_start, ts_end = parse_time_period(f'{start}:{end}') + include_sessions = set(include_sessions) if include_sessions else set() + exclude_sessions = set(exclude_sessions) if exclude_sessions else set() + reports = [] + for filename in dbfiles: + src_backend = StorageBackend.create('sqlite', filename) + src_schema = src_backend.schema_version() + if src_schema != dst_schema: + getlogger().warning( + f'ignoring DB file {filename}: schema version mismatch: ' + f'cannot import from DB v{src_schema} to v{dst_schema}' + ) + continue + + sessions = src_backend.fetch_sessions_time_period(ts_start, ts_end) + for sess in sessions: + uuid = sess['session_info']['uuid'] + if include_sessions and uuid not in include_sessions: + continue + + if exclude_sessions and uuid in exclude_sessions: + continue + + reports.append(_ImportedRunReport(sess)) + + return reports def _add_run(self, run): - self.__report['runs'].append(run) + self._report['runs'].append(run) def update_session_info(self, session_info): # Remove timestamps for key, val in session_info.items(): if not key.startswith('time_'): - self.__report['session_info'][key] = val + self._report['session_info'][key] = val def update_restored_cases(self, restored_cases, restored_session): - self.__report['restored_cases'] = [restored_session.case(c) - for c in restored_cases] + self._report['restored_cases'] = [restored_session.case(c) + for c in restored_cases] def update_timestamps(self, ts_start, ts_end): - self.__report['session_info'].update({ + self._report['session_info'].update({ 'time_start': time.strftime(_DATETIME_FMT, time.localtime(ts_start)), 'time_start_unix': ts_start, @@ -426,10 +464,10 @@ def update_extras(self, extras): raise ValueError('cannot use reserved keys ' f'`{",".join(clashed_keys)}` as session extras') - self.__report['session_info'].update(extras) + self._report['session_info'].update(extras) def update_run_stats(self, stats): - session_uuid = self.__report['session_info']['uuid'] + session_uuid = self._report['session_info']['uuid'] for runidx, tasks in stats.runs(): testcases = [] num_failures = 0 @@ -530,7 +568,7 @@ def update_run_stats(self, stats): testcases.append(entry) - self.__report['runs'].append({ + self._report['runs'].append({ 'num_cases': len(tasks), 'num_failures': num_failures, 'num_aborted': num_aborted, @@ -540,23 +578,23 @@ def update_run_stats(self, stats): }) # Update session info from stats - self.__report['session_info'].update({ - 'num_cases': self.__report['runs'][0]['num_cases'], - 'num_failures': self.__report['runs'][-1]['num_failures'], - 'num_aborted': self.__report['runs'][-1]['num_aborted'], - 'num_skipped': self.__report['runs'][-1]['num_skipped'] + self._report['session_info'].update({ + 'num_cases': self._report['runs'][0]['num_cases'], + 'num_failures': self._report['runs'][-1]['num_failures'], + 'num_aborted': self._report['runs'][-1]['num_aborted'], + 'num_skipped': 
self._report['runs'][-1]['num_skipped'] }) def _save(self, filename, compress, link_to_last): filename = _expand_report_filename(filename, newfile=True) with open(filename, 'w') as fp: if compress: - jsonext.dump(self.__report, fp) + jsonext.dump(self._report, fp) else: - jsonext.dump(self.__report, fp, indent=2) + jsonext.dump(self._report, fp, indent=2) fp.write('\n') - self.__filename = filename + self._filename = filename if not link_to_last: return @@ -576,7 +614,7 @@ def _save(self, filename, compress, link_to_last): def is_empty(self): '''Return :obj:`True` is no test cases where run''' - return self.__report['session_info']['num_cases'] == 0 + return self._report['session_info']['num_cases'] == 0 def save(self, filename, compress=False, link_to_last=True): prefix = os.path.dirname(filename) or '.' @@ -591,7 +629,7 @@ def store(self): def generate_xml_report(self): '''Generate a JUnit report from a standard ReFrame JSON report.''' - report = self.__report + report = self._report xml_testsuites = etree.Element('testsuites') # Create a XSD-friendly timestamp session_ts = time.strftime( @@ -688,6 +726,30 @@ def __missing__(self, key): raise KeyError(key) +class _ImportedRunReport(RunReport): + def __init__(self, report): + self._filename = f'{report["session_info"]["uuid"]}.json' + self._report = report + + def _add_run(self, run): + raise NotImplementedError + + def update_session_info(self, session_info): + raise NotImplementedError + + def update_restored_cases(self, restored_cases, restored_session): + raise NotImplementedError + + def update_timestamps(self, ts_start, ts_end): + raise NotImplementedError + + def update_extras(self, extras): + raise NotImplementedError + + def update_run_stats(self, stats): + raise NotImplementedError + + def _group_key(groups, testcase: _TCProxy): key = [] for grp in groups: diff --git a/reframe/frontend/reporting/storage.py b/reframe/frontend/reporting/storage.py index 42a591a11..c9a955a87 100644 --- a/reframe/frontend/reporting/storage.py +++ b/reframe/frontend/reporting/storage.py @@ -74,11 +74,11 @@ def remove_sessions(self, selector: QuerySelector): class _SqliteStorage(StorageBackend): - SCHEMA_VERSION = '1.0' + _SCHEMA_VERSION = '1.0' - def __init__(self): - self.__db_file = os.path.join( - osext.expandvars(runtime().get_option('storage/0/sqlite_db_file')) + def __init__(self, dbfile=None): + self.__db_file = dbfile or osext.expandvars( + runtime().get_option('storage/0/sqlite_db_file') ) mode = runtime().get_option( 'storage/0/sqlite_db_file_mode' @@ -88,6 +88,16 @@ def __init__(self): else: self.__db_file_mode = mode + def schema_version(self): + with self._db_connect(self._db_file()) as conn: + result = conn.execute( + 'SELECT schema_version FROM metadata LIMIT 1' + ).fetchone() + if not result: + raise ReframeError(f'no DB metadata found in {self.__db_file}') + + return result[0] + def _db_file(self): prefix = os.path.dirname(self.__db_file) if not os.path.exists(self.__db_file): @@ -173,14 +183,14 @@ def _db_schema_check(self): # DB is new, insert the schema version with self._db_connect(self.__db_file) as conn: conn.execute('INSERT INTO metadata VALUES(:schema_version)', - {'schema_version': self.SCHEMA_VERSION}) + {'schema_version': self._SCHEMA_VERSION}) else: found_ver = results[0][0] - if found_ver != self.SCHEMA_VERSION: + if found_ver != self._SCHEMA_VERSION: raise ReframeError( f'results DB in {self.__db_file!r} is ' 'of incompatible version: ' - f'found {found_ver}, required: {self.SCHEMA_VERSION}' + f'found {found_ver}, 
required: {self._SCHEMA_VERSION}' ) def _db_store_report(self, conn, report, report_file_path): From 50650264dc0ccd1719e95bfd07b87c39fe222d07 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Thu, 24 Oct 2024 11:48:38 +0200 Subject: [PATCH 3/6] Fix post-rebase failures --- reframe/frontend/cli.py | 5 +++-- reframe/frontend/reporting/__init__.py | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 6a17decf0..e0b7494d0 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -1092,10 +1092,11 @@ def restrict_logging(): for rpt in reports: uuid = rpt.store() - printer.info(f'Successfully imported session {uuid}') + printer.verbose(f'Successfully imported session {uuid}') + printer.info(f'Successfully imported {len(reports)} sessions') if not reports: - printer.info('No sessions have been imported') + printer.warning('No sessions have been imported') sys.exit(0) diff --git a/reframe/frontend/reporting/__init__.py b/reframe/frontend/reporting/__init__.py index c6957c013..c7d325c62 100644 --- a/reframe/frontend/reporting/__init__.py +++ b/reframe/frontend/reporting/__init__.py @@ -31,7 +31,8 @@ from reframe.core.warnings import suppress_deprecations from reframe.utility import nodelist_abbrev, nodelist_expand, OrderedSet from .storage import StorageBackend -from .utility import Aggregator, parse_cmp_spec, parse_query_spec +from .utility import (Aggregator, parse_cmp_spec, + parse_query_spec, parse_time_period) # The schema data version # Major version bumps are expected to break the validation of previous schemas @@ -406,7 +407,7 @@ def create_from_sqlite_db(cls, *dbfiles, exclude_sessions=None, start = time_period.get('start', '19700101T0000+0000') end = time_period.get('end', 'now') - ts_start, ts_end = parse_time_period(f'{start}:{end}') + qs = parse_query_spec(f'{start}:{end}') include_sessions = set(include_sessions) if include_sessions else set() exclude_sessions = set(exclude_sessions) if exclude_sessions else set() reports = [] @@ -420,8 +421,7 @@ def create_from_sqlite_db(cls, *dbfiles, exclude_sessions=None, ) continue - sessions = src_backend.fetch_sessions_time_period(ts_start, ts_end) - for sess in sessions: + for sess in src_backend.fetch_sessions(qs): uuid = sess['session_info']['uuid'] if include_sessions and uuid not in include_sessions: continue From d49847b5eda4adfee5caffed07949bc2d027a843 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Tue, 29 Oct 2024 09:11:59 -0700 Subject: [PATCH 4/6] WIP: add unit tests --- unittests/test_cli.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/unittests/test_cli.py b/unittests/test_cli.py index 29bbb3457..71b36e875 100644 --- a/unittests/test_cli.py +++ b/unittests/test_cli.py @@ -1374,12 +1374,6 @@ def test_session_annotations(run_reframe): def test_performance_compare(run_reframe, table_format): - def assert_no_crash(returncode, stdout, stderr, exitcode=0): - assert returncode == exitcode - assert 'Traceback' not in stdout - assert 'Traceback' not in stderr - return returncode, stdout, stderr - run_reframe2 = functools.partial( run_reframe, checkpath=['unittests/resources/checks/frontend_checks.py'], @@ -1407,3 +1401,7 @@ def assert_no_crash(returncode, stdout, stderr, exitcode=0): action='--performance-compare=now-1m:now/now-1d:now/mean:+foo/+bar' ), exitcode=1 ) + + +def test_import_from_perflog(run_reframe): + pass From e8b4901d8e03a4281f8bd4c7e16fdb7e57083e10 Mon Sep 17 00:00:00 2001 From: Vasileios 
Karakasis Date: Tue, 5 Nov 2024 20:44:05 +0100 Subject: [PATCH 5/6] Remove unused imports --- reframe/frontend/reporting/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/reframe/frontend/reporting/__init__.py b/reframe/frontend/reporting/__init__.py index c7d325c62..7150d18c5 100644 --- a/reframe/frontend/reporting/__init__.py +++ b/reframe/frontend/reporting/__init__.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD-3-Clause import decimal -import collections import functools import inspect import json @@ -31,8 +30,7 @@ from reframe.core.warnings import suppress_deprecations from reframe.utility import nodelist_abbrev, nodelist_expand, OrderedSet from .storage import StorageBackend -from .utility import (Aggregator, parse_cmp_spec, - parse_query_spec, parse_time_period) +from .utility import Aggregator, parse_cmp_spec, parse_query_spec # The schema data version # Major version bumps are expected to break the validation of previous schemas From 4eb979ea7ccb619d11286ce082fba8c9eb018dcf Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 6 Nov 2024 00:31:45 +0100 Subject: [PATCH 6/6] WIP: Add unit tests --- unittests/test_cli.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/unittests/test_cli.py b/unittests/test_cli.py index 71b36e875..07e0648c9 100644 --- a/unittests/test_cli.py +++ b/unittests/test_cli.py @@ -13,6 +13,7 @@ import re import sys import time +from pathlib import Path import reframe.core.environments as env import reframe.core.logging as logging @@ -1403,5 +1404,11 @@ def test_performance_compare(run_reframe, table_format): ) -def test_import_from_perflog(run_reframe): - pass +def test_import_from_perflog(run_reframe, monkeypatch): + run_reframe(checkpath=['unittests/resources/checks/frontend_checks.py'], + more_options=['--repeat=2', '-n', '^PerformanceFailureCheck'], + action='-r') + assert os.path.exists(Path('perflogs') / 'generic' / 'default' / + 'PerformanceFailureCheck.log') + + monkeypatch.setenv('RFM_SQLITE_DB_FILE', 'local.db') \ No newline at end of file
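
A minimal example SPECFILE for the new --import-results option, inferred from
the handlers in cli.py above (the series itself does not ship one): the
top-level 'import' mapping must define 'from' ('perflog' or 'sqlite'); every
other key is forwarded verbatim as a keyword argument to
RunReport.create_from_perflog() or RunReport.create_from_sqlite_db(), and the
perflog/DB files themselves are passed as the positional ARGS of the command
line. The regex, date format and filter expression below are hypothetical,
chosen only to match the record fields that create_from_perflog() inspects
(name, result, system, partition, job_completion_time, job_nodelist and the
pvar/pval/pref/plower/pupper/punit performance tuple):

    # import-perflog.yaml (hypothetical example)
    import:
      from: perflog
      # Named groups become record fields; a 'name' group may carry
      # '%param=value' specs after the test name, which are split off
      # into separate record fields.
      format: '(?P<job_completion_time>[^|]+)\|(?P<name>[^|]+)\|(?P<job_nodelist>[^|]+)\|(?P<pvar>\w+)=(?P<pval>[^|]+)\|ref=(?P<pref>[^|]+)\|(?P<punit>\S+)'
      datefmt: '%Y-%m-%dT%H:%M:%S%z'
      # Records sharing the key below are merged into a single testcase;
      # each merged record contributes one entry to its 'perfvalues'.
      merge_records: [name, job_completion_time]
      ignore_lines: [1]                           # e.g., skip a header line
      ignore_records: 'name.startswith("Debug")'  # eval()'d against the record

    reframe --import-results=import-perflog.yaml perflogs/mysys/default/MyCheck.log

Two caveats while the series is still WIP: 'format' is effectively mandatory
(re.compile(None) raises TypeError), and so is 'ignore_lines', since the
'lineno in ignore_lines' test does not guard against its None default;
'ignore_records' is evaluated with the raw record as its local namespace, so
it may only reference fields the regex actually captured.

For DB-to-DB imports the same layout applies; the 'time_period' bounds follow
ReFrame's usual time-period syntax and default to the whole epoch, and the
optional UUID lists narrow down which sessions are copied into the target DB:

    # import-db.yaml (hypothetical example)
    import:
      from: sqlite
      time_period:
        start: '20240101T0000+0000'
        end: 'now'
      # include_sessions: [<uuid>, ...]   # optional allow-list
      # exclude_sessions: [<uuid>, ...]   # optional deny-list

    reframe --import-results=import-db.yaml old-results.db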