Skip to content

Commit

Permalink
WIP: Import results from perflogs
Browse files Browse the repository at this point in the history
  • Loading branch information
vkarak committed Oct 1, 2024
1 parent e2d6a6c commit 33f0062
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 3 deletions.
5 changes: 3 additions & 2 deletions reframe/frontend/argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def add_argument(self, *flags, **kwargs):

if flags and opt_name is None:
# A positional argument
opt_name = flags[-1]
opt_name, flags = flags[-1], flags[:-1]

if opt_name is None:
raise ValueError('could not infer a dest name: no flags defined')
Expand Down Expand Up @@ -230,7 +230,8 @@ def add_argument(self, *flags, **kwargs):
except KeyError:
self._defaults.__dict__[opt_name] = None

if not flags:
positional = kwargs.pop('positional', False)
if not flags and not positional:
return None

return self._holder.add_argument(*flags, **kwargs)
Expand Down
21 changes: 21 additions & 0 deletions reframe/frontend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import sys
import time
import traceback
import yaml

import reframe.core.config as config
import reframe.core.exceptions as errors
Expand Down Expand Up @@ -465,6 +466,10 @@ def main():
action_options.add_argument(
'-V', '--version', action='version', version=osext.reframe_version()
)
action_options.add_argument(
'--import-results', action='store', metavar='SPECFILE',
help='Import results to the database'
)

# Run options
run_options.add_argument(
Expand Down Expand Up @@ -815,6 +820,7 @@ def main():
action='store_true',
help='Use a login shell for job scripts'
)
argparser.add_argument('args', metavar='ARGS', nargs='*', positional=True)

def restrict_logging():
'''Restrict logging to errors only.
Expand Down Expand Up @@ -1037,6 +1043,21 @@ def restrict_logging():
)
sys.exit(0)

if options.import_results:
with exit_gracefully_on_error('failed to import results', printer):
with open(options.import_results) as fp:
spec = yaml.load(fp, yaml.Loader)

if spec['import']['from'] == 'perflog':
kwargs = spec['import']
del kwargs['from']
report = reporting.RunReport.create_from_perflog(*options.args,
**kwargs)
# report.save('foo.json', link_to_last=False)
uuid = report.store()
printer.info(f'Results imported successfully as session {uuid}')
sys.exit(0)

# Show configuration after everything is set up
if options.show_config:
# Restore logging level
Expand Down
130 changes: 129 additions & 1 deletion reframe/frontend/reporting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import os
import re
import socket
import sys
import time
import uuid
from collections.abc import Hashable
from datetime import datetime
from filelock import FileLock

import reframe as rfm
Expand All @@ -25,7 +27,7 @@
from reframe.core.logging import getlogger, _format_time_rfc3339, time_function
from reframe.core.runtime import runtime
from reframe.core.warnings import suppress_deprecations
from reframe.utility import nodelist_abbrev, OrderedSet
from reframe.utility import nodelist_abbrev, nodelist_expand, OrderedSet
from .storage import StorageBackend
from .utility import Aggregator, parse_cmp_spec, parse_time_period, is_uuid

Expand Down Expand Up @@ -249,6 +251,132 @@ def __getitem__(self, key):
def __rfm_json_encode__(self):
return self.__report

@classmethod
def create_from_perflog(cls, *logfiles, format=None,
merge_records=None, datefmt=None,
ignore_lines=None, ignore_records=None):
def _filter_record(rec):
if ignore_records is None:
return False
else:
return eval(ignore_records, {}, rec)

def _do_merge(dst, src):
system = src.get('system')
part = src.get('partition')
pvar = src.get('pvar')
pval = src.get('pval')
pref = src.get('pref')
plower = src.get('plower')
pupper = src.get('pupper')
punit = src.get('punit')
if pvar is None:
return dst

if system is not None and part is not None:
pvar = f'{system}:{part}:{pvar}'

# Convert to numbers before inserting
def _convert(x):
if x is None:
return x

if x == 'null':
return None

return float(x)

pval = _convert(pval)
pref = _convert(pref)
pupper = _convert(pupper)
plower = _convert(plower)
dst['perfvalues'][pvar] = (pval, pref, plower, pupper, punit)
dst.pop('pvar', None)
dst.pop('pval', None)
dst.pop('pref', None)
dst.pop('plower', None)
dst.pop('pupper', None)
dst.pop('punit', None)
return dst

patt = re.compile(format)
report = RunReport()
session_uuid = report['session_info']['uuid']
run_index = 0
test_index = 0
t_report_start = sys.maxsize
t_report_end = 0
num_failures = 0
testcases = []
for filename in logfiles:
records = {}
with open(filename) as fp:
for lineno, line in enumerate(fp, start=1):
if lineno in ignore_lines:
continue

m = patt.match(line)
if not m:
continue

rec = m.groupdict()
if _filter_record(rec):
continue

# Add parameters as separate fields
if 'name' in rec:
params = rec['name'].split()[1:]
for spec in params:
p, v = spec.split('=', maxsplit=1)
rec[p[1:]] = v

# Groom the record
if 'job_completion_time' in rec:
key = 'job_completion_time'
date = datetime.strptime(rec[key], datefmt)
rec[key] = date.strftime(_DATETIME_FMT)
ts = date.timestamp()
rec[f'{key}_unix'] = ts
t_report_start = min(t_report_start, ts)
t_report_end = max(t_report_end, ts)

if 'job_nodelist' in rec:
key = 'job_nodelist'
rec[key] = nodelist_expand(rec[key])

rec['uuid'] = f'{session_uuid}:{run_index}:{test_index}'
rec.setdefault('result', 'pass')
if rec['result'] != 'pass':
num_failures += 1

if not merge_records:
key = lineno
elif len(merge_records) == 1:
key = rec[merge_records[0]]
else:
key = tuple(rec[k] for k in merge_records)

if key in records:
records[key] = _do_merge(records[key], rec)
else:
rec['perfvalues'] = {}
records[key] = _do_merge(rec, rec)
test_index += 1

testcases += list(records.values())

report.update_timestamps(t_report_start, t_report_end)
report._add_run({
'num_cases': len(testcases),
'num_failures': num_failures,
'run_index': run_index,
'testcases': testcases
})
return report

def _add_run(self, run):
self.__report['runs'].append(run)

def update_session_info(self, session_info):
# Remove timestamps
for key, val in session_info.items():
Expand Down

0 comments on commit 33f0062

Please sign in to comment.