Skip to content

Commit

Permalink
feat(anta.tests): Added testcase to verify IP security connection (#575)
Browse files Browse the repository at this point in the history
Co-authored-by: gmulocher <[email protected]>
  • Loading branch information
MaheshGSLAB and gmuloc authored Apr 12, 2024
1 parent f35e64d commit 00fba99
Show file tree
Hide file tree
Showing 3 changed files with 450 additions and 0 deletions.
163 changes: 163 additions & 0 deletions anta/tests/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# Mypy does not understand AntaTest.Input typing
# mypy: disable-error-code=attr-defined
from datetime import datetime, timezone
from ipaddress import IPv4Address
from typing import ClassVar

from pydantic import BaseModel, Field, model_validator
Expand Down Expand Up @@ -653,3 +654,165 @@ def test(self) -> None:

if failed_log != f"{acl_name}:\n":
self.result.is_failure(f"{failed_log}")


class VerifyIPSecConnHealth(AntaTest):
"""
Verifies all IPv4 security connections.
Expected Results
----------------
* Success: The test will pass if all the IPv4 security connections are established in all vrf.
* Failure: The test will fail if IPv4 security is not configured or any of IPv4 security connections are not established in any vrf.
Examples
--------
```yaml
anta.tests.security:
- VerifyIPSecConnHealth:
```
"""

name = "VerifyIPSecConnHealth"
description = "Verifies all IPv4 security connections."
categories: ClassVar[list[str]] = ["security"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip security connection vrf all")]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyIPSecConnHealth."""
self.result.is_success()
failure_conn = []
command_output = self.instance_commands[0].json_output["connections"]

# Check if IP security connection is configured
if not command_output:
self.result.is_failure("No IPv4 security connection configured.")
return

# Iterate over all ipsec connections
for conn_data in command_output.values():
state = next(iter(conn_data["pathDict"].values()))
if state != "Established":
source = conn_data.get("saddr")
destination = conn_data.get("daddr")
vrf = conn_data.get("tunnelNs")
failure_conn.append(f"source:{source} destination:{destination} vrf:{vrf}")
if failure_conn:
failure_msg = "\n".join(failure_conn)
self.result.is_failure(f"The following IPv4 security connections are not established:\n{failure_msg}.")


class VerifySpecificIPSecConn(AntaTest):
"""
Verifies the state of IPv4 security connections for a specified peer.
It optionally allows for the verification of a specific path for a peer by providing source and destination addresses.
If these addresses are not provided, it will verify all paths for the specified peer.
Expected Results
----------------
* Success: The test passes if the IPv4 security connection for a peer is established in the specified VRF.
* Failure: The test fails if IPv4 security is not configured, a connection is not found for a peer, or the connection is not established in the specified VRF.
Examples
--------
```yaml
anta.tests.security:
- VerifySpecificIPSecConn:
ip_security_connections:
- peer: 10.255.0.1
- peer: 10.255.0.2
vrf: default
connections:
- source_address: 100.64.3.2
destination_address: 100.64.2.2
- source_address: 172.18.3.2
destination_address: 172.18.2.2
```
"""

name = "VerifySpecificIPSecConn"
description = "Verifies IPv4 security connections for a peer."
categories: ClassVar[list[str]] = ["security"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show ip security connection vrf {vrf} path peer {peer}")]

class Input(AntaTest.Input):
"""Input model for the VerifySpecificIPSecConn test."""

ip_security_connections: list[IPSecPeers]
"""List of IP4v security peers."""

class IPSecPeers(BaseModel):
"""Details of IPv4 security peers."""

peer: IPv4Address
"""IPv4 address of the peer."""

vrf: str = "default"
"""Optional VRF for the IP security peer."""

connections: list[IPSecConn] | None = None
"""Optional list of IPv4 security connections of a peer."""

class IPSecConn(BaseModel):
"""Details of IPv4 security connections for a peer."""

source_address: IPv4Address
"""Source IPv4 address of the connection."""
destination_address: IPv4Address
"""Destination IPv4 address of the connection."""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for each input IP Sec connection."""
return [template.render(peer=conn.peer, vrf=conn.vrf) for conn in self.inputs.ip_security_connections]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifySpecificIPSecConn."""
self.result.is_success()
for command_output, input_peer in zip(self.instance_commands, self.inputs.ip_security_connections):
conn_output = command_output.json_output["connections"]
peer = command_output.params.peer
vrf = command_output.params.vrf
conn_input = input_peer.connections

# Check if IPv4 security connection is configured
if not conn_output:
self.result.is_failure(f"No IPv4 security connection configured for peer `{peer}`.")
continue

# If connection details are not provided then check all connections of a peer
if conn_input is None:
for conn_data in conn_output.values():
state = next(iter(conn_data["pathDict"].values()))
if state != "Established":
source = conn_data.get("saddr")
destination = conn_data.get("daddr")
vrf = conn_data.get("tunnelNs")
self.result.is_failure(
f"Expected state of IPv4 security connection `source:{source} destination:{destination} vrf:{vrf}` for peer `{peer}` is `Established` "
f"but found `{state}` instead."
)
continue

# Create a dictionary of existing connections for faster lookup
existing_connections = {
(conn_data.get("saddr"), conn_data.get("daddr"), conn_data.get("tunnelNs")): next(iter(conn_data["pathDict"].values()))
for conn_data in conn_output.values()
}
for connection in conn_input:
source_input = str(connection.source_address)
destination_input = str(connection.destination_address)

if (source_input, destination_input, vrf) in existing_connections:
existing_state = existing_connections[(source_input, destination_input, vrf)]
if existing_state != "Established":
self.result.is_failure(
f"Expected state of IPv4 security connection `source:{source_input} destination:{destination_input} vrf:{vrf}` "
f"for peer `{peer}` is `Established` but found `{existing_state}` instead."
)
else:
self.result.is_failure(
f"IPv4 security connection `source:{source_input} destination:{destination_input} vrf:{vrf}` for peer `{peer}` is not found."
)
11 changes: 11 additions & 0 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ anta.tests.security:
action: permit icmp any any
- sequence: 20
action: permit tcp any any range 5900 5910
- VerifyIPSecConnHealth:
- VerifySpecificIPSecConn:
ip_security_connections:
- peer: 10.255.0.1
- peer: 10.255.0.2
vrf: default
connections:
- source_address: 100.64.3.2
destination_address: 100.64.2.2
- source_address: 172.18.3.2
destination_address: 172.18.2.2

anta.tests.services:
- VerifyHostname:
Expand Down
Loading

0 comments on commit 00fba99

Please sign in to comment.