Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(anta): Fix ResultManager statistics #962

Merged
merged 12 commits into from
Dec 19, 2024
79 changes: 64 additions & 15 deletions anta/result_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import json
import logging
from collections import defaultdict
from functools import cached_property
from itertools import chain
Expand All @@ -15,7 +16,10 @@

from .models import CategoryStats, DeviceStats, TestStats

logger = logging.getLogger(__name__)


# pylint: disable=too-many-instance-attributes
class ResultManager:
"""Helper to manage Test Results and generate reports.

Expand Down Expand Up @@ -69,6 +73,15 @@ class ResultManager:
]
"""

_result_entries: list[TestResult]
status: AntaTestStatus
error_status: bool

_device_stats: defaultdict[str, DeviceStats]
_category_stats: defaultdict[str, CategoryStats]
_test_stats: defaultdict[str, TestStats]
_stats_in_sync: bool

def __init__(self) -> None:
"""Class constructor.

Expand Down Expand Up @@ -98,9 +111,8 @@ def reset(self) -> None:
self.status: AntaTestStatus = AntaTestStatus.UNSET
self.error_status = False

self.device_stats: defaultdict[str, DeviceStats] = defaultdict(DeviceStats)
self.category_stats: defaultdict[str, CategoryStats] = defaultdict(CategoryStats)
self.test_stats: defaultdict[str, TestStats] = defaultdict(TestStats)
# Initialize the statistics attributes
self._reset_stats()

def __len__(self) -> int:
"""Implement __len__ method to count number of results."""
Expand All @@ -115,14 +127,7 @@ def results(self) -> list[TestResult]:
def results(self, value: list[TestResult]) -> None:
"""Set the list of TestResult."""
# When setting the results, we need to reset the state of the current instance
self._result_entries = []
self.status = AntaTestStatus.UNSET
self.error_status = False

# Also reset the stats attributes
self.device_stats = defaultdict(DeviceStats)
self.category_stats = defaultdict(CategoryStats)
self.test_stats = defaultdict(TestStats)
self.reset()

for result in value:
self.add(result)
Expand All @@ -137,9 +142,28 @@ def json(self) -> str:
"""Get a JSON representation of the results."""
return json.dumps(self.dump, indent=4)

@property
def device_stats(self) -> defaultdict[str, DeviceStats]:
"""Get the device statistics."""
self._ensure_stats_in_sync()
return self._device_stats

@property
def category_stats(self) -> defaultdict[str, CategoryStats]:
"""Get the category statistics."""
self._ensure_stats_in_sync()
return self._category_stats

@property
def test_stats(self) -> defaultdict[str, TestStats]:
"""Get the test statistics."""
self._ensure_stats_in_sync()
return self._test_stats

@property
def sorted_category_stats(self) -> dict[str, CategoryStats]:
"""A property that returns the category_stats dictionary sorted by key name."""
self._ensure_stats_in_sync()
return dict(sorted(self.category_stats.items()))

@cached_property
Expand All @@ -163,6 +187,13 @@ def _update_status(self, test_status: AntaTestStatus) -> None:
elif self.status == "success" and test_status == "failure":
self.status = AntaTestStatus.FAILURE

def _reset_stats(self) -> None:
"""Create or reset the statistics attributes."""
self._device_stats = defaultdict(DeviceStats)
self._category_stats = defaultdict(CategoryStats)
self._test_stats = defaultdict(TestStats)
self._stats_in_sync = False

def _update_stats(self, result: TestResult) -> None:
"""Update the statistics based on the test result.

Expand All @@ -174,7 +205,7 @@ def _update_stats(self, result: TestResult) -> None:
count_attr = f"tests_{result.result}_count"

# Update device stats
device_stats: DeviceStats = self.device_stats[result.name]
device_stats: DeviceStats = self._device_stats[result.name]
setattr(device_stats, count_attr, getattr(device_stats, count_attr) + 1)
if result.result in ("failure", "error"):
device_stats.tests_failure.add(result.test)
Expand All @@ -184,16 +215,34 @@ def _update_stats(self, result: TestResult) -> None:

# Update category stats
for category in result.categories:
category_stats: CategoryStats = self.category_stats[category]
category_stats: CategoryStats = self._category_stats[category]
setattr(category_stats, count_attr, getattr(category_stats, count_attr) + 1)

# Update test stats
count_attr = f"devices_{result.result}_count"
test_stats: TestStats = self.test_stats[result.test]
test_stats: TestStats = self._test_stats[result.test]
setattr(test_stats, count_attr, getattr(test_stats, count_attr) + 1)
if result.result in ("failure", "error"):
test_stats.devices_failure.add(result.name)

def _compute_stats(self) -> None:
"""Compute all statistics from the current results."""
logger.info("Computing statistics for all results.")

# Reset all stats
self._reset_stats()

# Recompute stats for all results
for result in self._result_entries:
self._update_stats(result)

self._stats_in_sync = True

def _ensure_stats_in_sync(self) -> None:
"""Ensure statistics are in sync with current results."""
if not self._stats_in_sync:
self._compute_stats()

def add(self, result: TestResult) -> None:
"""Add a result to the ResultManager instance.

Expand All @@ -207,7 +256,7 @@ def add(self, result: TestResult) -> None:
"""
self._result_entries.append(result)
self._update_status(result.result)
self._update_stats(result)
self._stats_in_sync = False

# Every time a new result is added, we need to clear the cached property
self.__dict__.pop("results_by_status", None)
Expand Down
13 changes: 8 additions & 5 deletions anta/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ def prepare_tests(
return device_to_tests


def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager) -> list[Coroutine[Any, Any, TestResult]]:
def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager | None = None) -> list[Coroutine[Any, Any, TestResult]]:
"""Get the coroutines for the ANTA run.

Parameters
----------
selected_tests
A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function.
manager
A ResultManager
An optional ResultManager object to pre-populate with the test results. Used in dry-run mode.

Returns
-------
Expand All @@ -199,7 +199,8 @@ def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinitio
for test in test_definitions:
try:
test_instance = test.test(device=device, inputs=test.inputs)
manager.add(test_instance.result)
if manager is not None:
manager.add(test_instance.result)
coros.append(test_instance.test())
except Exception as e: # noqa: PERF203, BLE001
# An AntaTest instance is potentially user-defined code.
Expand Down Expand Up @@ -292,7 +293,7 @@ async def main(
"Please consult the ANTA FAQ."
)

coroutines = get_coroutines(selected_tests, manager)
coroutines = get_coroutines(selected_tests, manager if dry_run else None)
gmuloc marked this conversation as resolved.
Show resolved Hide resolved

if dry_run:
logger.info("Dry-run mode, exiting before running the tests.")
Expand All @@ -304,6 +305,8 @@ async def main(
AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=len(coroutines))

with Catchtime(logger=logger, message="Running ANTA tests"):
await asyncio.gather(*coroutines)
results = await asyncio.gather(*coroutines)
for result in results:
manager.add(result)

log_cache_statistics(selected_inventory.devices)
2 changes: 1 addition & 1 deletion tests/units/cli/nrfu/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_anta_nrfu_text(click_runner: CliRunner) -> None:
def test_anta_nrfu_text_multiple_failures(click_runner: CliRunner) -> None:
"""Test anta nrfu text with multiple failures, catalog is given via env."""
result = click_runner.invoke(anta, ["nrfu", "text"], env={"ANTA_CATALOG": str(DATA_DIR / "test_catalog_double_failure.yml")})
assert result.exit_code == ExitCode.OK
assert result.exit_code == ExitCode.TESTS_FAILED
gmuloc marked this conversation as resolved.
Show resolved Hide resolved
assert (
"""spine1 :: VerifyInterfacesSpeed :: FAILURE
Interface `Ethernet2` is not found.
Expand Down
101 changes: 101 additions & 0 deletions tests/units/result_manager/test__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import json
import logging
import re
from contextlib import AbstractContextManager, nullcontext
from typing import TYPE_CHECKING, Callable
Expand Down Expand Up @@ -379,3 +380,103 @@ def test_get_devices(self, test_result_factory: Callable[[], TestResult], list_r

assert len(result_manager.get_devices()) == 2
assert all(t in result_manager.get_devices() for t in ["Device1", "Device2"])

def test_stats_computation_methods(self, test_result_factory: Callable[[], TestResult], caplog: pytest.LogCaptureFixture) -> None:
"""Test ResultManager internal stats computation methods."""
result_manager = ResultManager()

# Initially stats should be unsynced
assert result_manager._stats_in_sync is False

# Test _reset_stats
result_manager._reset_stats()
assert result_manager._stats_in_sync is False
assert len(result_manager._device_stats) == 0
assert len(result_manager._category_stats) == 0
assert len(result_manager._test_stats) == 0

# Add some test results
test1 = test_result_factory()
test1.name = "device1"
test1.result = AntaTestStatus.SUCCESS
test1.categories = ["system"]
test1.test = "test1"

test2 = test_result_factory()
test2.name = "device2"
test2.result = AntaTestStatus.FAILURE
test2.categories = ["interfaces"]
test2.test = "test2"

result_manager.add(test1)
result_manager.add(test2)

# Stats should still be unsynced after adding results
assert result_manager._stats_in_sync is False

# Test _compute_stats directly
with caplog.at_level(logging.INFO):
result_manager._compute_stats()
assert "Computing statistics for all results" in caplog.text
assert result_manager._stats_in_sync is True

# Verify stats content
assert len(result_manager._device_stats) == 2
assert len(result_manager._category_stats) == 2
assert len(result_manager._test_stats) == 2
assert result_manager._device_stats["device1"].tests_success_count == 1
assert result_manager._device_stats["device2"].tests_failure_count == 1
assert result_manager._category_stats["system"].tests_success_count == 1
assert result_manager._category_stats["interfaces"].tests_failure_count == 1
assert result_manager._test_stats["test1"].devices_success_count == 1
assert result_manager._test_stats["test2"].devices_failure_count == 1

def test_stats_property_computation(self, test_result_factory: Callable[[], TestResult], caplog: pytest.LogCaptureFixture) -> None:
"""Test that stats are computed only once when accessed via properties."""
result_manager = ResultManager()

# Add some test results
test1 = test_result_factory()
test1.name = "device1"
test1.result = AntaTestStatus.SUCCESS
test1.categories = ["system"]
result_manager.add(test1)

test2 = test_result_factory()
test2.name = "device2"
test2.result = AntaTestStatus.FAILURE
test2.categories = ["interfaces"]
result_manager.add(test2)

# Stats should be unsynced after adding results
assert result_manager._stats_in_sync is False
assert "Computing statistics" not in caplog.text

# Access device_stats property - should trigger computation
with caplog.at_level(logging.INFO):
_ = result_manager.device_stats
assert "Computing statistics for all results" in caplog.text
assert result_manager._stats_in_sync is True

# Clear the log
caplog.clear()

# Access other stats properties - should not trigger computation again
with caplog.at_level(logging.INFO):
_ = result_manager.category_stats
_ = result_manager.test_stats
_ = result_manager.sorted_category_stats
assert "Computing statistics" not in caplog.text

# Add another result - should mark stats as unsynced
test3 = test_result_factory()
test3.name = "device3"
test3.result = "error"
result_manager.add(test3)
assert result_manager._stats_in_sync is False

# Access stats again - should trigger recomputation
with caplog.at_level(logging.INFO):
_ = result_manager.device_stats
assert "Computing statistics for all results" in caplog.text
assert result_manager._stats_in_sync is True
Loading