Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(anta.tests): Added testcase to verify IP security connection #575

Merged
merged 10 commits into from
Apr 12, 2024
163 changes: 163 additions & 0 deletions anta/tests/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# Mypy does not understand AntaTest.Input typing
# mypy: disable-error-code=attr-defined
from datetime import datetime, timezone
from ipaddress import IPv4Address
from typing import ClassVar

from pydantic import BaseModel, Field, model_validator
Expand Down Expand Up @@ -653,3 +654,165 @@ def test(self) -> None:

if failed_log != f"{acl_name}:\n":
self.result.is_failure(f"{failed_log}")


class VerifyIPSecConnHealth(AntaTest):
"""
Verifies all IPv4 security connections.

Expected Results
----------------
* Success: The test will pass if all the IPv4 security connections are established in all vrf.
* Failure: The test will fail if IPv4 security is not configured or any of IPv4 security connections are not established in any vrf.

Examples
--------
```yaml
anta.tests.security:
- VerifyIPSecConnHealth:
```
"""

name = "VerifyIPSecConnHealth"
description = "Verifies all IPv4 security connections."
categories: ClassVar[list[str]] = ["security"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip security connection vrf all")]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyIPSecConnHealth."""
self.result.is_success()
failure_conn = []
command_output = self.instance_commands[0].json_output["connections"]

# Check if IP security connection is configured
if not command_output:
self.result.is_failure("No IPv4 security connection configured.")
return

# Iterate over all ipsec connections
for conn_data in command_output.values():
state = next(iter(conn_data["pathDict"].values()))
if state != "Established":
source = conn_data.get("saddr")
destination = conn_data.get("daddr")
vrf = conn_data.get("tunnelNs")
failure_conn.append(f"source:{source} destination:{destination} vrf:{vrf}")
if failure_conn:
failure_msg = "\n".join(failure_conn)
self.result.is_failure(f"The following IPv4 security connections are not established:\n{failure_msg}.")


class VerifySpecificIPSecConn(AntaTest):
"""
Verifies the state of IPv4 security connections for a specified peer.

It optionally allows for the verification of a specific path for a peer by providing source and destination addresses.
If these addresses are not provided, it will verify all paths for the specified peer.

Expected Results
----------------
* Success: The test passes if the IPv4 security connection for a peer is established in the specified VRF.
* Failure: The test fails if IPv4 security is not configured, a connection is not found for a peer, or the connection is not established in the specified VRF.

Examples
--------
```yaml
anta.tests.security:
- VerifySpecificIPSecConn:
ip_security_connections:
- peer: 10.255.0.1
- peer: 10.255.0.2
vrf: default
connections:
- source_address: 100.64.3.2
destination_address: 100.64.2.2
- source_address: 172.18.3.2
destination_address: 172.18.2.2
```
"""

name = "VerifySpecificIPSecConn"
description = "Verifies IPv4 security connections for a peer."
categories: ClassVar[list[str]] = ["security"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show ip security connection vrf {vrf} path peer {peer}")]

class Input(AntaTest.Input):
"""Input model for the VerifySpecificIPSecConn test."""

ip_security_connections: list[IPSecPeers]
"""List of IP4v security peers."""

class IPSecPeers(BaseModel):
"""Details of IPv4 security peers."""

peer: IPv4Address
"""IPv4 address of the peer."""

vrf: str = "default"
"""Optional VRF for the IP security peer."""

connections: list[IPSecConn] | None = None
"""Optional list of IPv4 security connections of a peer."""

class IPSecConn(BaseModel):
"""Details of IPv4 security connections for a peer."""

source_address: IPv4Address
"""Source IPv4 address of the connection."""
destination_address: IPv4Address
"""Destination IPv4 address of the connection."""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for each input IP Sec connection."""
return [template.render(peer=conn.peer, vrf=conn.vrf) for conn in self.inputs.ip_security_connections]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifySpecificIPSecConn."""
self.result.is_success()
for command_output, input_peer in zip(self.instance_commands, self.inputs.ip_security_connections):
conn_output = command_output.json_output["connections"]
peer = command_output.params.peer
vrf = command_output.params.vrf
conn_input = input_peer.connections

# Check if IPv4 security connection is configured
if not conn_output:
self.result.is_failure(f"No IPv4 security connection configured for peer `{peer}`.")
continue

# If connection details are not provided then check all connections of a peer
if conn_input is None:
for conn_data in conn_output.values():
state = next(iter(conn_data["pathDict"].values()))
if state != "Established":
source = conn_data.get("saddr")
destination = conn_data.get("daddr")
vrf = conn_data.get("tunnelNs")
self.result.is_failure(
f"Expected state of IPv4 security connection `source:{source} destination:{destination} vrf:{vrf}` for peer `{peer}` is `Established` "
f"but found `{state}` instead."
)
continue

# Create a dictionary of existing connections for faster lookup
existing_connections = {
gmuloc marked this conversation as resolved.
Show resolved Hide resolved
(conn_data.get("saddr"), conn_data.get("daddr"), conn_data.get("tunnelNs")): next(iter(conn_data["pathDict"].values()))
for conn_data in conn_output.values()
}
for connection in conn_input:
source_input = str(connection.source_address)
destination_input = str(connection.destination_address)

if (source_input, destination_input, vrf) in existing_connections:
existing_state = existing_connections[(source_input, destination_input, vrf)]
if existing_state != "Established":
self.result.is_failure(
f"Expected state of IPv4 security connection `source:{source_input} destination:{destination_input} vrf:{vrf}` "
f"for peer `{peer}` is `Established` but found `{existing_state}` instead."
)
else:
self.result.is_failure(
f"IPv4 security connection `source:{source_input} destination:{destination_input} vrf:{vrf}` for peer `{peer}` is not found."
)
11 changes: 11 additions & 0 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ anta.tests.security:
action: permit icmp any any
- sequence: 20
action: permit tcp any any range 5900 5910
- VerifyIPSecConnHealth:
- VerifySpecificIPSecConn:
ip_security_connections:
- peer: 10.255.0.1
- peer: 10.255.0.2
vrf: default
connections:
- source_address: 100.64.3.2
destination_address: 100.64.2.2
- source_address: 172.18.3.2
destination_address: 172.18.2.2

anta.tests.services:
- VerifyHostname:
Expand Down
Loading
Loading