From 3c3e8ee3af4ff3ea57210e75835ee19d27f7105b Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 12 Dec 2024 12:15:59 -0500 Subject: [PATCH] Refactor ResultManager stats with sync flab --- anta/result_manager/__init__.py | 133 +++++-------- anta/result_manager/models.py | 31 +-- tests/units/reporter/test__init__.py | 2 +- tests/units/result_manager/test__init__.py | 219 +++++++++------------ tests/units/result_manager/test_models.py | 70 ------- 5 files changed, 145 insertions(+), 310 deletions(-) diff --git a/anta/result_manager/__init__.py b/anta/result_manager/__init__.py index a347d10de..3ede25c70 100644 --- a/anta/result_manager/__init__.py +++ b/anta/result_manager/__init__.py @@ -6,6 +6,7 @@ from __future__ import annotations import json +import logging from collections import defaultdict from functools import cached_property from itertools import chain @@ -14,7 +15,10 @@ from .models import CategoryStats, DeviceStats, TestStats +logger = logging.getLogger(__name__) + +# pylint: disable=too-many-instance-attributes class ResultManager: """Helper to manage Test Results and generate reports. @@ -93,9 +97,11 @@ def __init__(self) -> None: self.status: AntaTestStatus = AntaTestStatus.UNSET self.error_status = False - self.device_stats: defaultdict[str, DeviceStats] = defaultdict(DeviceStats) - self.category_stats: defaultdict[str, CategoryStats] = defaultdict(CategoryStats) - self.test_stats: defaultdict[str, TestStats] = defaultdict(TestStats) + # Initialize the stats attributes + self._device_stats: defaultdict[str, DeviceStats] = defaultdict(DeviceStats) + self._category_stats: defaultdict[str, CategoryStats] = defaultdict(CategoryStats) + self._test_stats: defaultdict[str, TestStats] = defaultdict(TestStats) + self._stats_in_sync = False def __len__(self) -> int: """Implement __len__ method to count number of results.""" @@ -114,10 +120,7 @@ def results(self, value: list[TestResult]) -> None: self.status = AntaTestStatus.UNSET self.error_status = False - # Also reset the stats attributes - self.device_stats = defaultdict(DeviceStats) - self.category_stats = defaultdict(CategoryStats) - self.test_stats = defaultdict(TestStats) + self._reset_stats() for result in value: self.add(result) @@ -127,9 +130,28 @@ def json(self) -> str: """Get a JSON representation of the results.""" return json.dumps([result.model_dump() for result in self._result_entries], indent=4) + @property + def device_stats(self) -> defaultdict[str, DeviceStats]: + """Get the device statistics.""" + self._ensure_stats_in_sync() + return self._device_stats + + @property + def category_stats(self) -> defaultdict[str, CategoryStats]: + """Get the category statistics.""" + self._ensure_stats_in_sync() + return self._category_stats + + @property + def test_stats(self) -> defaultdict[str, TestStats]: + """Get the test statistics.""" + self._ensure_stats_in_sync() + return self._test_stats + @property def sorted_category_stats(self) -> dict[str, CategoryStats]: """A property that returns the category_stats dictionary sorted by key name.""" + self._ensure_stats_in_sync() return dict(sorted(self.category_stats.items())) @cached_property @@ -153,6 +175,13 @@ def _update_status(self, test_status: AntaTestStatus) -> None: elif self.status == "success" and test_status == "failure": self.status = AntaTestStatus.FAILURE + def _reset_stats(self) -> None: + """Reset the statistics.""" + self._device_stats = defaultdict(DeviceStats) + self._category_stats = defaultdict(CategoryStats) + self._test_stats = defaultdict(TestStats) + self._stats_in_sync = False + def _update_stats(self, result: TestResult) -> None: """Update the statistics based on the test result. @@ -164,7 +193,7 @@ def _update_stats(self, result: TestResult) -> None: count_attr = f"tests_{result.result}_count" # Update device stats - device_stats: DeviceStats = self.device_stats[result.name] + device_stats: DeviceStats = self._device_stats[result.name] setattr(device_stats, count_attr, getattr(device_stats, count_attr) + 1) if result.result in ("failure", "error"): device_stats.tests_failure.add(result.test) @@ -174,84 +203,33 @@ def _update_stats(self, result: TestResult) -> None: # Update category stats for category in result.categories: - category_stats: CategoryStats = self.category_stats[category] + category_stats: CategoryStats = self._category_stats[category] setattr(category_stats, count_attr, getattr(category_stats, count_attr) + 1) # Update test stats count_attr = f"devices_{result.result}_count" - test_stats: TestStats = self.test_stats[result.test] + test_stats: TestStats = self._test_stats[result.test] setattr(test_stats, count_attr, getattr(test_stats, count_attr) + 1) if result.result in ("failure", "error"): test_stats.devices_failure.add(result.name) - def _remove_from_stats(self, result: TestResult, old_status: AntaTestStatus) -> None: - """Remove a TestResult's contribution to statistics. - - Parameters - ---------- - result - TestResult being removed. - old_status - The previous status of the TestResult. - """ - count_attr = f"tests_{old_status}_count" - - # Remove device stats - device_stats: DeviceStats = self.device_stats[result.name] - curr_count = getattr(device_stats, count_attr) - setattr(device_stats, count_attr, max(0, curr_count - 1)) - if old_status in ("failure", "error"): - device_stats.tests_failure.discard(result.test) + def _compute_stats(self) -> None: + """Compute all statistics from the current results.""" + logger.info("Computing statistics for all results.") - # Remove category stats - for category in result.categories: - category_stats: CategoryStats = self.category_stats[category] - curr_count = getattr(category_stats, count_attr) - setattr(category_stats, count_attr, max(0, curr_count - 1)) - - # After decrementing the count, we need to cleanup the sets if needed - if category_stats.tests_failure_count + category_stats.tests_error_count == 0: - device_stats.categories_failed.discard(category) - if category_stats.tests_skipped_count == 0: - device_stats.categories_skipped.discard(category) - - # Remove test stats - count_attr = f"devices_{old_status}_count" - test_stats: TestStats = self.test_stats[result.test] - curr_count = getattr(test_stats, count_attr) - setattr(test_stats, count_attr, max(0, curr_count - 1)) - - # We remove the device from the failure set if it has no more failures - if device_stats.tests_failure_count + device_stats.tests_error_count == 0: - test_stats.devices_failure.discard(result.name) - - def clear_cache(self) -> None: - """Clear the cached properties.""" - self.__dict__.pop("results_by_status", None) + # Reset all stats + self._reset_stats() - def update_test_result(self, managed_result: TestResult, new_status: AntaTestStatus, message: str | None = None) -> None: - """Update the status and message of the provided TestResult instance. + # Recompute stats for all results + for result in self._result_entries: + self._update_stats(result) - The provided TestResult instance must already exist and be registered to this ResultManager instance (i.e. added using the `add` method). - """ - if managed_result not in self._result_entries: - msg = f"Cannot update status of a TestResult not managed by this ResultManager: {managed_result}" - raise ValueError(msg) + self._stats_in_sync = True - # Remove the old status from the stats - old_status = managed_result.result - self._remove_from_stats(managed_result, old_status) - - # Update with the new status and message - managed_result.result = new_status - managed_result.add_message(message) - - # Update the status and stats with the new status and the updated TestResult - self._update_status(new_status) - self._update_stats(managed_result) - - # Clear cached properties every time a TestResult is updated - self.clear_cache() + def _ensure_stats_in_sync(self) -> None: + """Ensure statistics are in sync with current results.""" + if not self._stats_in_sync: + self._compute_stats() def add(self, result: TestResult) -> None: """Add a result to the ResultManager instance. @@ -264,15 +242,12 @@ def add(self, result: TestResult) -> None: result TestResult to add to the ResultManager instance. """ - # Register the manager to the result to allow status updates - result.register_manager(self) - self._result_entries.append(result) self._update_status(result.result) - self._update_stats(result) + self._stats_in_sync = False # Every time a new result is added, we need to clear the cached property - self.clear_cache() + self.__dict__.pop("results_by_status", None) def get_results(self, status: set[AntaTestStatus] | None = None, sort_by: list[str] | None = None) -> list[TestResult]: """Get the results, optionally filtered by status and sorted by TestResult fields. diff --git a/anta/result_manager/models.py b/anta/result_manager/models.py index 4e9f35c5d..32975816c 100644 --- a/anta/result_manager/models.py +++ b/anta/result_manager/models.py @@ -5,18 +5,11 @@ from __future__ import annotations -import logging from dataclasses import dataclass, field from enum import Enum -from typing import TYPE_CHECKING from pydantic import BaseModel -if TYPE_CHECKING: - from anta.result_manager import ResultManager - -logger = logging.getLogger(__name__) - class AntaTestStatus(str, Enum): """Test status Enum for the TestResult. @@ -65,9 +58,6 @@ class TestResult(BaseModel): messages: list[str] = [] custom_field: str | None = None - # Using forward refs since Pydantic private fields don't evaluate type hints at runtime but Ruff seems to not know about this - _manager: "ResultManager | None" = None - def is_success(self, message: str | None = None) -> None: """Set status to success. @@ -123,29 +113,10 @@ def _set_status(self, status: AntaTestStatus, message: str | None = None) -> Non Optional message. """ - # Set the status via its ResultManager if available - if self._manager is not None: - self._manager.update_test_result(self, status, message) - else: - self.result = status - self.add_message(message) - - def add_message(self, message: str | None = None) -> None: - """Add a message to the TestResult.""" + self.result = status if message is not None: self.messages.append(message) - def register_manager(self, manager: ResultManager) -> None: - """Register the ResultManager instance to this TestResult to allow status updates.""" - if self._manager is not None and self._manager is not manager: - msg = ( - f"TestResult {self.test} on {self.name} is being re-registered to a different ResultManager. " - f"Status and statistics updates will be done by the new ResultManager." - ) - logger.warning(msg) - - self._manager = manager - def __str__(self) -> str: """Return a human readable string of this TestResult.""" return f"Test '{self.test}' (on '{self.name}'): Result '{self.result}'\nMessages: {self.messages}" diff --git a/tests/units/reporter/test__init__.py b/tests/units/reporter/test__init__.py index 6d575600d..af26b54cb 100644 --- a/tests/units/reporter/test__init__.py +++ b/tests/units/reporter/test__init__.py @@ -165,7 +165,7 @@ def test_report_summary_devices( # TODO: refactor this later... this is injecting double test results by modyfing the device name # should be a fixture manager = result_manager_factory(number_of_tests) - new_results = [result.model_copy(update={"_manager": None}) for result in manager.results] + new_results = [result.model_copy() for result in manager.results] for result in new_results: result.name = dev or "test_device" result.result = AntaTestStatus.FAILURE diff --git a/tests/units/result_manager/test__init__.py b/tests/units/result_manager/test__init__.py index 9f5d1dd2a..e41a436f6 100644 --- a/tests/units/result_manager/test__init__.py +++ b/tests/units/result_manager/test__init__.py @@ -6,6 +6,7 @@ from __future__ import annotations import json +import logging import re from contextlib import AbstractContextManager, nullcontext from typing import TYPE_CHECKING, Callable @@ -380,144 +381,102 @@ def test_get_devices(self, test_result_factory: Callable[[], TestResult], list_r assert len(result_manager.get_devices()) == 2 assert all(t in result_manager.get_devices() for t in ["Device1", "Device2"]) - def test_update_test_result_invalid_result(self, result_manager: ResultManager, test_result_factory: Callable[[], TestResult]) -> None: - """Test ResultManager.update_test_result with an unmanaged TestResult.""" - unmanaged_result = test_result_factory() - - with pytest.raises(ValueError, match="Cannot update status of a TestResult not managed by this ResultManager"): - result_manager.update_test_result(unmanaged_result, AntaTestStatus.SUCCESS) - - def test_update_test_result_updates(self, result_manager: ResultManager) -> None: - """Test ResultManager.update_test_result comprehensive update behavior.""" - # Get an existing success result and record initial state - success_result = result_manager.get_results(status={AntaTestStatus.SUCCESS})[0] - device_name = success_result.name - categories = success_result.categories - - # Record initial stats - initial_device_stats = result_manager.device_stats[device_name] - initial_success_count = initial_device_stats.tests_success_count - initial_failure_count = initial_device_stats.tests_failure_count - initial_failed_tests = set(initial_device_stats.tests_failure) - - initial_category_stats = {category: result_manager.category_stats[category].tests_success_count for category in categories} - - # Update the result with new status and message - result_manager.update_test_result(success_result, AntaTestStatus.FAILURE, "Test failed after update") - - # Verify result was updated - assert success_result.result == "failure" - assert success_result.messages == ["Test failed after update"] - - # Verify device stats were properly decremented and incremented - updated_device_stats = result_manager.device_stats[device_name] - assert updated_device_stats.tests_success_count == initial_success_count - 1 - assert updated_device_stats.tests_failure_count == initial_failure_count + 1 - assert success_result.test in updated_device_stats.tests_failure - assert len(updated_device_stats.tests_failure) == len(initial_failed_tests) + 1 - - # Verify category stats were properly updated - for category in categories: - cat_stats = result_manager.category_stats[category] - assert cat_stats.tests_success_count == initial_category_stats[category] - 1 - assert cat_stats.tests_failure_count >= 1 - - def test_update_test_result_device_failure_cleanup( - self, result_manager_factory: Callable[[int], ResultManager], test_result_factory: Callable[[], TestResult] - ) -> None: - """Test ResultManager.update_test_result removes device from test's failure set when its failures are fixed.""" - result_manager = result_manager_factory(0) # Start with empty manager - - # Create two failing tests for device1 - device1_test1 = test_result_factory() - device1_test1.name = "device1" - device1_test1.result = AntaTestStatus.FAILURE - result_manager.add(device1_test1) - - device1_test2 = test_result_factory() - device1_test2.name = "device1" - device1_test2.result = AntaTestStatus.FAILURE - result_manager.add(device1_test2) - - # Create one failing test for device2 with same test as device1_test1 - device2_test = test_result_factory() - device2_test.name = "device2" - device2_test.test = device1_test1.test # Same test as device1_test1 - device2_test.result = AntaTestStatus.FAILURE - result_manager.add(device2_test) - - # Verify initial state - test1_stats = result_manager.test_stats[device1_test1.test] - assert "device1" in test1_stats.devices_failure - assert "device2" in test1_stats.devices_failure - - # Fix one failing test for device1 - result_manager.update_test_result(device1_test1, AntaTestStatus.SUCCESS) - - # device1 should still be in the failure set (has another failing test) - assert "device1" in test1_stats.devices_failure - - # Fix the second failing test for device1 - result_manager.update_test_result(device1_test2, AntaTestStatus.SUCCESS) - - # Now device1 should be removed from failure set (all its failures fixed) - # while device2 remains (still failing) - assert "device1" not in test1_stats.devices_failure - assert "device2" in test1_stats.devices_failure - - # Fix device2's failing test - result_manager.update_test_result(device2_test, AntaTestStatus.SUCCESS) - - # Now device2 should also be removed - assert "device2" not in test1_stats.devices_failure - - def test_update_test_result_category_cleanup( - self, result_manager_factory: Callable[[int], ResultManager], test_result_factory: Callable[[], TestResult] - ) -> None: - """Test ResultManager.update_test_result removes categories from failed/skipped sets appropriately.""" - result_manager = result_manager_factory(0) # Start with empty manager + def test_stats_computation_methods(self, test_result_factory: Callable[[], TestResult], caplog: pytest.LogCaptureFixture) -> None: + """Test ResultManager internal stats computation methods.""" + result_manager = ResultManager() + + # Initially stats should be unsynced + assert result_manager._stats_in_sync is False + + # Test _reset_stats + result_manager._reset_stats() + assert result_manager._stats_in_sync is False + assert len(result_manager._device_stats) == 0 + assert len(result_manager._category_stats) == 0 + assert len(result_manager._test_stats) == 0 - # Create two failing tests with same category for one device + # Add some test results test1 = test_result_factory() test1.name = "device1" - test1.categories = ["category1"] - test1.result = AntaTestStatus.FAILURE - result_manager.add(test1) + test1.result = AntaTestStatus.SUCCESS + test1.categories = ["system"] + test1.test = "test1" test2 = test_result_factory() - test2.name = "device1" - test2.categories = ["category1"] + test2.name = "device2" test2.result = AntaTestStatus.FAILURE - result_manager.add(test2) - - # Verify initial state - device_stats = result_manager.device_stats["device1"] - assert "category1" in device_stats.categories_failed - - # Fix one failing test - result_manager.update_test_result(test1, AntaTestStatus.SUCCESS) - - # Category should still be in failed set - assert "category1" in device_stats.categories_failed + test2.categories = ["interfaces"] + test2.test = "test2" - # Fix second failing test - result_manager.update_test_result(test2, AntaTestStatus.SUCCESS) - - # Category should be removed from failed set - assert "category1" not in device_stats.categories_failed + result_manager.add(test1) + result_manager.add(test2) - # Test skipped category cleanup - skipped_test = test_result_factory() - skipped_test.name = "device1" - skipped_test.categories = ["category2"] - skipped_test.result = AntaTestStatus.SKIPPED - result_manager.add(skipped_test) + # Stats should still be unsynced after adding results + assert result_manager._stats_in_sync is False + + # Test _compute_stats directly + with caplog.at_level(logging.INFO): + result_manager._compute_stats() + assert "Computing statistics for all results" in caplog.text + assert result_manager._stats_in_sync is True + + # Verify stats content + assert len(result_manager._device_stats) == 2 + assert len(result_manager._category_stats) == 2 + assert len(result_manager._test_stats) == 2 + assert result_manager._device_stats["device1"].tests_success_count == 1 + assert result_manager._device_stats["device2"].tests_failure_count == 1 + assert result_manager._category_stats["system"].tests_success_count == 1 + assert result_manager._category_stats["interfaces"].tests_failure_count == 1 + assert result_manager._test_stats["test1"].devices_success_count == 1 + assert result_manager._test_stats["test2"].devices_failure_count == 1 + + def test_stats_property_computation(self, test_result_factory: Callable[[], TestResult], caplog: pytest.LogCaptureFixture) -> None: + """Test that stats are computed only once when accessed via properties.""" + result_manager = ResultManager() - # Verify initial skipped state - assert "category2" in device_stats.categories_skipped + # Add some test results + test1 = test_result_factory() + test1.name = "device1" + test1.result = AntaTestStatus.SUCCESS + test1.categories = ["system"] + result_manager.add(test1) - # Update skipped test to success - result_manager.update_test_result(skipped_test, AntaTestStatus.SUCCESS) + test2 = test_result_factory() + test2.name = "device2" + test2.result = AntaTestStatus.FAILURE + test2.categories = ["interfaces"] + result_manager.add(test2) - # Category should be removed from skipped set - assert "category2" not in device_stats.categories_skipped + # Stats should be unsynced after adding results + assert result_manager._stats_in_sync is False + assert "Computing statistics" not in caplog.text + + # Access device_stats property - should trigger computation + with caplog.at_level(logging.INFO): + _ = result_manager.device_stats + assert "Computing statistics for all results" in caplog.text + assert result_manager._stats_in_sync is True + + # Clear the log + caplog.clear() + + # Access other stats properties - should not trigger computation again + with caplog.at_level(logging.INFO): + _ = result_manager.category_stats + _ = result_manager.test_stats + _ = result_manager.sorted_category_stats + assert "Computing statistics" not in caplog.text + + # Add another result - should mark stats as unsynced + test3 = test_result_factory() + test3.name = "device3" + test3.result = "error" + result_manager.add(test3) + assert result_manager._stats_in_sync is False + + # Access stats again - should trigger recomputation + with caplog.at_level(logging.INFO): + _ = result_manager.device_stats + assert "Computing statistics for all results" in caplog.text + assert result_manager._stats_in_sync is True diff --git a/tests/units/result_manager/test_models.py b/tests/units/result_manager/test_models.py index 0d643b4bb..0561dffd2 100644 --- a/tests/units/result_manager/test_models.py +++ b/tests/units/result_manager/test_models.py @@ -5,12 +5,10 @@ from __future__ import annotations -import logging from typing import TYPE_CHECKING, Callable import pytest -from anta.result_manager import ResultManager from anta.result_manager.models import AntaTestStatus from tests.units.conftest import DEVICE_NAME @@ -69,71 +67,3 @@ def test____str__(self, test_result_factory: Callable[[int], Result], target: An testresult._set_status(target, message) assert testresult.result == target assert str(testresult) == f"Test 'VerifyTest1' (on '{DEVICE_NAME}'): Result '{target}'\nMessages: {[message]}" - - def test_register_manager(self, test_result_factory: Callable[[], Result], caplog: pytest.LogCaptureFixture) -> None: - """Test TestResult.register_manager.""" - test_result = test_result_factory() - manager1 = ResultManager() - manager2 = ResultManager() - - # Initial registration - assert test_result._manager is None - test_result.register_manager(manager1) - assert test_result._manager is manager1 - - # Re-register to same manager (no warning) - with caplog.at_level(logging.WARNING): - test_result.register_manager(manager1) - assert len(caplog.records) == 0 - assert test_result._manager is manager1 - - # Register to different manager (should log warning) - with caplog.at_level(logging.WARNING): - test_result.register_manager(manager2) - - # Verify warning was logged - assert len(caplog.records) == 1 - assert "is being re-registered to a different ResultManager" in caplog.records[0].message - assert test_result._manager is manager2 - - def test_add_message(self, test_result_factory: Callable[[], Result]) -> None: - """Test TestResult.add_message.""" - test_result = test_result_factory() - assert len(test_result.messages) == 0 - - # Test adding None message (shouldn't add) - test_result.add_message(None) - assert len(test_result.messages) == 0 - - # Test adding first message - test_result.add_message("First message") - assert len(test_result.messages) == 1 - assert test_result.messages == ["First message"] - - # Test adding second message - test_result.add_message("Second message") - assert len(test_result.messages) == 2 - assert test_result.messages == ["First message", "Second message"] - - def test_status_updates_with_manager(self, test_result_factory: Callable[[], Result]) -> None: - """Test status updates with and without manager.""" - test_result = test_result_factory() - manager = ResultManager() - - # Before adding to manager, status updates are direct - test_result.is_success("Direct update") - assert test_result.result == AntaTestStatus.SUCCESS - assert test_result not in manager.results - - # Add to manager (this also registers the manager) - manager.add(test_result) - assert test_result._manager is manager - assert test_result in manager.results - - # Now status updates should go through manager - test_result.is_failure("Through manager") - assert test_result.result == AntaTestStatus.FAILURE - assert len(manager.get_results(status={AntaTestStatus.FAILURE})) == 1 - - # Verify message was added - assert "Through manager" in test_result.messages