Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(anta.tests): Added testcase to verify IP security connection #575

Merged
merged 10 commits into from
Apr 12, 2024
149 changes: 149 additions & 0 deletions anta/tests/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# Mypy does not understand AntaTest.Input typing
# mypy: disable-error-code=attr-defined
from datetime import datetime, timezone
from ipaddress import IPv4Address
from typing import ClassVar

from pydantic import BaseModel, Field, model_validator
Expand Down Expand Up @@ -654,3 +655,151 @@ def test(self) -> None:

if failed_log != f"{acl_name}:\n":
self.result.is_failure(f"{failed_log}")


class VerifyIPSecConnHealth(AntaTest):
"""
Verifies all IPv4 security connections.

Expected Results
----------------
* Success: The test will pass if all the IPv4 security connections are established in all vrf.
* Failure: The test will fail if IPv4 security is not configured or any of IPv4 security connections are not established in any vrf.

Examples
--------
```yaml
anta.tests.security:
- VerifyIPSecConnHealth:
```
"""

name = "VerifyIPSecConnHealth"
description = "Verifies all IPv4 security connections."
categories: ClassVar[list[str]] = ["security"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip security connection vrf all")]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyIPSecConnHealth."""
self.result.is_success()
failure_conn = []
command_output = self.instance_commands[0].json_output["connections"]

# Check if IP security connection is configured
if not command_output:
self.result.is_failure("IPv4 security connection are not configured.")
gmuloc marked this conversation as resolved.
Show resolved Hide resolved
return

# Iterate over all ip sec connection
gmuloc marked this conversation as resolved.
Show resolved Hide resolved
for connection, conn_data in command_output.items():
state = next(iter(conn_data["pathDict"].values()))
if state != "Established":
failure_conn.append(connection)
gmuloc marked this conversation as resolved.
Show resolved Hide resolved
if failure_conn:
failure_msg = "\n".join(failure_conn)
self.result.is_failure(f"Following IPv4 security connections are not establised:\n{failure_msg}.")
gmuloc marked this conversation as resolved.
Show resolved Hide resolved


class VerifySpecificIPSecConn(AntaTest):
"""
Verifies IPv4 security connections state for a peer.
gmuloc marked this conversation as resolved.
Show resolved Hide resolved

Expected Results
----------------
* Success: The test passes if the IPv4 security connection for a peer is established in the specified VRF.
* Failure: The test fails if IPv4 security is not configured, a connection is not found for a peer, or the connection is not established in the specified VRF.

Examples
--------
```yaml
anta.tests.security:
- VerifySpecificIPSecConn:
ip_security_connections:
- peer: 10.255.0.1
- peer: 10.255.0.2
vrf: default
connections:
- source_address: 100.64.3.2
destination_address: 100.64.2.2
- source_address: 172.18.3.2
destination_address: 172.18.2.2
```
"""

name = "VerifySpecificIPSecConn"
description = "Verifies IPv4 security connections for a peer."
categories: ClassVar[list[str]] = ["security"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show ip security connection vrf {vrf} path peer {peer}")]

class Input(AntaTest.Input):
"""Input model for the VerifySpecificIPSecConn test."""

ip_security_connections: list[IPSecPeers]
"""List of IP4v security peers."""

class IPSecPeers(BaseModel):
"""Details of IPv4 security peers."""

peer: IPv4Address
"""IPv4 address of the peer."""

vrf: str = "default"
"""This is the optional VRF for the IP security peer. It defaults to `default` if not provided."""
gmuloc marked this conversation as resolved.
Show resolved Hide resolved

connections: list[IPSecConn] | None = None
"""Optional list of IPv4 security connections of a peer."""

class IPSecConn(BaseModel):
"""Details of IPv4 security connections for a peer."""

source_address: IPv4Address
"""Source address of the connection."""
destination_address: IPv4Address
"""Destination address of the connection."""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for each input IP Sec connection."""
return [template.render(peer=conn.peer, vrf=conn.vrf) for conn in self.inputs.ip_security_connections]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifySpecificIPSecConn."""
self.result.is_success()
for command_output, input_peer in zip(self.instance_commands, self.inputs.ip_security_connections):
conn_output = command_output.json_output["connections"]
peer = command_output.params["peer"]
connections = input_peer.connections
gmuloc marked this conversation as resolved.
Show resolved Hide resolved

# Check if IPv4 security connection is configured
if not conn_output:
self.result.is_failure(f"IPv4 security connections are not configured for peer `{peer}`.")
gmuloc marked this conversation as resolved.
Show resolved Hide resolved
return

# If connection details are not provided then check all connections of a peer
if connections is None:
for connection, conn_data in conn_output.items():
state = next(iter(conn_data["pathDict"].values()))
if state != "Established":
self.result.is_failure(
f"Expected state of IPv4 security connection `{connection}` for peer `{peer}` is `Established` " f"but found `{state}` instead."
)
continue

# Create a dictionary of existing connections for faster lookup
existing_connections = {
gmuloc marked this conversation as resolved.
Show resolved Hide resolved
(conn_data.get("saddr"), conn_data.get("daddr")): next(iter(conn_data["pathDict"].values())) for conn_data in conn_output.values()
}
for connection in connections:
source = str(connection.source_address)
destination = str(connection.destination_address)

if (source, destination) in existing_connections:
existing_state = existing_connections[(source, destination)]
if existing_state != "Established":
self.result.is_failure(
f"Expected state of IPv4 security connection `{source}-{destination}` for peer `{peer}` is `Established` "
f"but found `{existing_state}` instead."
)
else:
self.result.is_failure(f"IPv4 security connection `{source}-{destination}` for peer `{peer}` is not found.")
11 changes: 11 additions & 0 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ anta.tests.security:
action: permit icmp any any
- sequence: 20
action: permit tcp any any range 5900 5910
- VerifyIPSecConnHealth:
- VerifySpecificIPSecConn:
ip_security_connections:
- peer: 10.255.0.1
- peer: 10.255.0.2
vrf: default
connections:
- source_address: 100.64.3.2
destination_address: 100.64.2.2
- source_address: 172.18.3.2
destination_address: 172.18.2.2

anta.tests.services:
- VerifyHostname:
Expand Down
Loading
Loading