diff --git a/anta/tests/cvx.py b/anta/tests/cvx.py index cfb2dcdeb..fc369a893 100644 --- a/anta/tests/cvx.py +++ b/anta/tests/cvx.py @@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, ClassVar, Literal +from anta.custom_types import PositiveInteger from anta.models import AntaCommand, AntaTest from anta.tools import get_value @@ -90,6 +91,49 @@ def test(self) -> None: self.result.is_failure(f"Management CVX status is not valid: {cluster_state}") +class VerifyActiveCVXConnections(AntaTest): + """Verifies the number of active CVX Connections. + + Expected Results + ---------------- + * Success: The test will pass if number of connections is equal to the expected number of connections. + * Failure: The test will fail otherwise. + + Examples + -------- + ```yaml + anta.tests.cvx: + - VerifyActiveCVXConnections: + connections_count: 100 + ``` + """ + + categories: ClassVar[list[str]] = ["cvx"] + # TODO: @gmuloc - cover "% Unavailable command (controller not ready)" + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show cvx connections brief", revision=1)] + + class Input(AntaTest.Input): + """Input model for the VerifyActiveCVXConnections test.""" + + connections_count: PositiveInteger + """The expected number of active CVX Connections""" + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyActiveCVXConnections.""" + command_output = self.instance_commands[0].json_output + self.result.is_success() + + if not (connections := command_output.get("connections")): + self.result.is_failure("CVX connections are not available.") + return + + active_count = len([connection for connection in connections if connection.get("oobConnectionActive")]) + + if self.inputs.connections_count != active_count: + self.result.is_failure(f"CVX active connections count. Expected: {self.inputs.connections_count} , Actual : {active_count}") + + class VerifyCVXClusterStatus(AntaTest): """Verifies the CVX Server Cluster status. diff --git a/examples/tests.yaml b/examples/tests.yaml index 16e616ae4..d8b95f30f 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -135,6 +135,9 @@ anta.tests.connectivity: df_bit: True size: 100 anta.tests.cvx: + - VerifyActiveCVXConnections: + # Verifies the number of active CVX Connections. + connections_count: 100 - VerifyCVXClusterStatus: # Verifies the CVX Server Cluster status. role: Master diff --git a/tests/units/anta_tests/test_cvx.py b/tests/units/anta_tests/test_cvx.py index 2a8c6664b..0b1105cb0 100644 --- a/tests/units/anta_tests/test_cvx.py +++ b/tests/units/anta_tests/test_cvx.py @@ -7,7 +7,7 @@ from typing import Any -from anta.tests.cvx import VerifyCVXClusterStatus, VerifyManagementCVX, VerifyMcsClientMounts +from anta.tests.cvx import VerifyActiveCVXConnections, VerifyCVXClusterStatus, VerifyManagementCVX, VerifyMcsClientMounts from tests.units.anta_tests import test DATA: list[dict[str, Any]] = [ @@ -146,6 +146,57 @@ "inputs": {"enabled": False}, "expected": {"result": "failure", "messages": ["Management CVX status is not valid: None"]}, }, + { + "name": "success", + "test": VerifyActiveCVXConnections, + "eos_data": [ + { + "connections": [ + { + "switchId": "fc:bd:67:c3:16:55", + "hostname": "lyv563", + "oobConnectionActive": True, + }, + { + "switchId": "00:1c:73:3c:e3:9e", + "hostname": "tg264", + "oobConnectionActive": True, + }, + ] + } + ], + "inputs": {"connections_count": 2}, + "expected": {"result": "success"}, + }, + { + "name": "failure", + "test": VerifyActiveCVXConnections, + "eos_data": [ + { + "connections": [ + { + "switchId": "fc:bd:67:c3:16:55", + "hostname": "lyv563", + "oobConnectionActive": False, + }, + { + "switchId": "00:1c:73:3c:e3:9e", + "hostname": "tg264", + "oobConnectionActive": True, + }, + ] + } + ], + "inputs": {"connections_count": 2}, + "expected": {"result": "failure", "messages": ["CVX active connections count. Expected: 2 , Actual : 1"]}, + }, + { + "name": "failure-no-connections", + "test": VerifyActiveCVXConnections, + "eos_data": [{}], + "inputs": {"connections_count": 2}, + "expected": {"result": "failure", "messages": ["CVX connections are not available"]}, + }, { "name": "failure - no clusterStatus", "test": VerifyManagementCVX,