Skip to content

Commit

Permalink
Refactor ResultManager stats with sync flab
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-baillargeon committed Dec 12, 2024
1 parent 53f5266 commit 3c3e8ee
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 310 deletions.
133 changes: 54 additions & 79 deletions anta/result_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import json
import logging
from collections import defaultdict
from functools import cached_property
from itertools import chain
Expand All @@ -14,7 +15,10 @@

from .models import CategoryStats, DeviceStats, TestStats

logger = logging.getLogger(__name__)


# pylint: disable=too-many-instance-attributes
class ResultManager:
"""Helper to manage Test Results and generate reports.
Expand Down Expand Up @@ -93,9 +97,11 @@ def __init__(self) -> None:
self.status: AntaTestStatus = AntaTestStatus.UNSET
self.error_status = False

self.device_stats: defaultdict[str, DeviceStats] = defaultdict(DeviceStats)
self.category_stats: defaultdict[str, CategoryStats] = defaultdict(CategoryStats)
self.test_stats: defaultdict[str, TestStats] = defaultdict(TestStats)
# Initialize the stats attributes
self._device_stats: defaultdict[str, DeviceStats] = defaultdict(DeviceStats)
self._category_stats: defaultdict[str, CategoryStats] = defaultdict(CategoryStats)
self._test_stats: defaultdict[str, TestStats] = defaultdict(TestStats)
self._stats_in_sync = False

def __len__(self) -> int:
"""Implement __len__ method to count number of results."""
Expand All @@ -114,10 +120,7 @@ def results(self, value: list[TestResult]) -> None:
self.status = AntaTestStatus.UNSET
self.error_status = False

# Also reset the stats attributes
self.device_stats = defaultdict(DeviceStats)
self.category_stats = defaultdict(CategoryStats)
self.test_stats = defaultdict(TestStats)
self._reset_stats()

for result in value:
self.add(result)
Expand All @@ -127,9 +130,28 @@ def json(self) -> str:
"""Get a JSON representation of the results."""
return json.dumps([result.model_dump() for result in self._result_entries], indent=4)

@property
def device_stats(self) -> defaultdict[str, DeviceStats]:
"""Get the device statistics."""
self._ensure_stats_in_sync()
return self._device_stats

@property
def category_stats(self) -> defaultdict[str, CategoryStats]:
"""Get the category statistics."""
self._ensure_stats_in_sync()
return self._category_stats

@property
def test_stats(self) -> defaultdict[str, TestStats]:
"""Get the test statistics."""
self._ensure_stats_in_sync()
return self._test_stats

@property
def sorted_category_stats(self) -> dict[str, CategoryStats]:
"""A property that returns the category_stats dictionary sorted by key name."""
self._ensure_stats_in_sync()
return dict(sorted(self.category_stats.items()))

@cached_property
Expand All @@ -153,6 +175,13 @@ def _update_status(self, test_status: AntaTestStatus) -> None:
elif self.status == "success" and test_status == "failure":
self.status = AntaTestStatus.FAILURE

def _reset_stats(self) -> None:
"""Reset the statistics."""
self._device_stats = defaultdict(DeviceStats)
self._category_stats = defaultdict(CategoryStats)
self._test_stats = defaultdict(TestStats)
self._stats_in_sync = False

def _update_stats(self, result: TestResult) -> None:
"""Update the statistics based on the test result.
Expand All @@ -164,7 +193,7 @@ def _update_stats(self, result: TestResult) -> None:
count_attr = f"tests_{result.result}_count"

# Update device stats
device_stats: DeviceStats = self.device_stats[result.name]
device_stats: DeviceStats = self._device_stats[result.name]
setattr(device_stats, count_attr, getattr(device_stats, count_attr) + 1)
if result.result in ("failure", "error"):
device_stats.tests_failure.add(result.test)
Expand All @@ -174,84 +203,33 @@ def _update_stats(self, result: TestResult) -> None:

# Update category stats
for category in result.categories:
category_stats: CategoryStats = self.category_stats[category]
category_stats: CategoryStats = self._category_stats[category]
setattr(category_stats, count_attr, getattr(category_stats, count_attr) + 1)

# Update test stats
count_attr = f"devices_{result.result}_count"
test_stats: TestStats = self.test_stats[result.test]
test_stats: TestStats = self._test_stats[result.test]
setattr(test_stats, count_attr, getattr(test_stats, count_attr) + 1)
if result.result in ("failure", "error"):
test_stats.devices_failure.add(result.name)

def _remove_from_stats(self, result: TestResult, old_status: AntaTestStatus) -> None:
"""Remove a TestResult's contribution to statistics.
Parameters
----------
result
TestResult being removed.
old_status
The previous status of the TestResult.
"""
count_attr = f"tests_{old_status}_count"

# Remove device stats
device_stats: DeviceStats = self.device_stats[result.name]
curr_count = getattr(device_stats, count_attr)
setattr(device_stats, count_attr, max(0, curr_count - 1))
if old_status in ("failure", "error"):
device_stats.tests_failure.discard(result.test)
def _compute_stats(self) -> None:
"""Compute all statistics from the current results."""
logger.info("Computing statistics for all results.")

# Remove category stats
for category in result.categories:
category_stats: CategoryStats = self.category_stats[category]
curr_count = getattr(category_stats, count_attr)
setattr(category_stats, count_attr, max(0, curr_count - 1))

# After decrementing the count, we need to cleanup the sets if needed
if category_stats.tests_failure_count + category_stats.tests_error_count == 0:
device_stats.categories_failed.discard(category)
if category_stats.tests_skipped_count == 0:
device_stats.categories_skipped.discard(category)

# Remove test stats
count_attr = f"devices_{old_status}_count"
test_stats: TestStats = self.test_stats[result.test]
curr_count = getattr(test_stats, count_attr)
setattr(test_stats, count_attr, max(0, curr_count - 1))

# We remove the device from the failure set if it has no more failures
if device_stats.tests_failure_count + device_stats.tests_error_count == 0:
test_stats.devices_failure.discard(result.name)

def clear_cache(self) -> None:
"""Clear the cached properties."""
self.__dict__.pop("results_by_status", None)
# Reset all stats
self._reset_stats()

def update_test_result(self, managed_result: TestResult, new_status: AntaTestStatus, message: str | None = None) -> None:
"""Update the status and message of the provided TestResult instance.
# Recompute stats for all results
for result in self._result_entries:
self._update_stats(result)

The provided TestResult instance must already exist and be registered to this ResultManager instance (i.e. added using the `add` method).
"""
if managed_result not in self._result_entries:
msg = f"Cannot update status of a TestResult not managed by this ResultManager: {managed_result}"
raise ValueError(msg)
self._stats_in_sync = True

# Remove the old status from the stats
old_status = managed_result.result
self._remove_from_stats(managed_result, old_status)

# Update with the new status and message
managed_result.result = new_status
managed_result.add_message(message)

# Update the status and stats with the new status and the updated TestResult
self._update_status(new_status)
self._update_stats(managed_result)

# Clear cached properties every time a TestResult is updated
self.clear_cache()
def _ensure_stats_in_sync(self) -> None:
"""Ensure statistics are in sync with current results."""
if not self._stats_in_sync:
self._compute_stats()

def add(self, result: TestResult) -> None:
"""Add a result to the ResultManager instance.
Expand All @@ -264,15 +242,12 @@ def add(self, result: TestResult) -> None:
result
TestResult to add to the ResultManager instance.
"""
# Register the manager to the result to allow status updates
result.register_manager(self)

self._result_entries.append(result)
self._update_status(result.result)
self._update_stats(result)
self._stats_in_sync = False

# Every time a new result is added, we need to clear the cached property
self.clear_cache()
self.__dict__.pop("results_by_status", None)

def get_results(self, status: set[AntaTestStatus] | None = None, sort_by: list[str] | None = None) -> list[TestResult]:
"""Get the results, optionally filtered by status and sorted by TestResult fields.
Expand Down
31 changes: 1 addition & 30 deletions anta/result_manager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,11 @@

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING

from pydantic import BaseModel

if TYPE_CHECKING:
from anta.result_manager import ResultManager

logger = logging.getLogger(__name__)


class AntaTestStatus(str, Enum):
"""Test status Enum for the TestResult.
Expand Down Expand Up @@ -65,9 +58,6 @@ class TestResult(BaseModel):
messages: list[str] = []
custom_field: str | None = None

# Using forward refs since Pydantic private fields don't evaluate type hints at runtime but Ruff seems to not know about this
_manager: "ResultManager | None" = None

def is_success(self, message: str | None = None) -> None:
"""Set status to success.
Expand Down Expand Up @@ -123,29 +113,10 @@ def _set_status(self, status: AntaTestStatus, message: str | None = None) -> Non
Optional message.
"""
# Set the status via its ResultManager if available
if self._manager is not None:
self._manager.update_test_result(self, status, message)
else:
self.result = status
self.add_message(message)

def add_message(self, message: str | None = None) -> None:
"""Add a message to the TestResult."""
self.result = status
if message is not None:
self.messages.append(message)

def register_manager(self, manager: ResultManager) -> None:
"""Register the ResultManager instance to this TestResult to allow status updates."""
if self._manager is not None and self._manager is not manager:
msg = (
f"TestResult {self.test} on {self.name} is being re-registered to a different ResultManager. "
f"Status and statistics updates will be done by the new ResultManager."
)
logger.warning(msg)

self._manager = manager

def __str__(self) -> str:
"""Return a human readable string of this TestResult."""
return f"Test '{self.test}' (on '{self.name}'): Result '{self.result}'\nMessages: {self.messages}"
Expand Down
2 changes: 1 addition & 1 deletion tests/units/reporter/test__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def test_report_summary_devices(
# TODO: refactor this later... this is injecting double test results by modyfing the device name
# should be a fixture
manager = result_manager_factory(number_of_tests)
new_results = [result.model_copy(update={"_manager": None}) for result in manager.results]
new_results = [result.model_copy() for result in manager.results]
for result in new_results:
result.name = dev or "test_device"
result.result = AntaTestStatus.FAILURE
Expand Down
Loading

0 comments on commit 3c3e8ee

Please sign in to comment.