From 9e3286cafd33490a125276eab633fdec897463bd Mon Sep 17 00:00:00 2001 From: Jakub Krajewski <95274389+jpkrajewski@users.noreply.github.com> Date: Wed, 10 Apr 2024 09:54:42 +0200 Subject: [PATCH] Dev/acl (#9) * Tailor route policy model for use. Add unit and integration test. Fix tests setup to run session creation once for the whole class * Fix typo defaulkt to default * Fix merge mistake - add eigrp back to integration tests * Add ACLs - ipv4 and ipv6. add intergation test. add unittests * Fix type annotaion to supported in 3.8 * Add copyright --- .../feature_profile/sdwan/test_service.py | 23 + .../feature_profile/sdwan/service/__init__.py | 5 + .../feature_profile/sdwan/service/acl.py | 533 ++++++++++-------- catalystwan/tests/test_feature_profile_api.py | 3 + 4 files changed, 320 insertions(+), 244 deletions(-) diff --git a/catalystwan/integration_tests/feature_profile/sdwan/test_service.py b/catalystwan/integration_tests/feature_profile/sdwan/test_service.py index 6f5bbd47..6c169c44 100644 --- a/catalystwan/integration_tests/feature_profile/sdwan/test_service.py +++ b/catalystwan/integration_tests/feature_profile/sdwan/test_service.py @@ -4,6 +4,7 @@ from catalystwan.api.configuration_groups.parcel import Global, as_global, as_variable from catalystwan.integration_tests.feature_profile.sdwan.base import TestFeatureProfileModels from catalystwan.models.configuration.feature_profile.common import Prefix +from catalystwan.models.configuration.feature_profile.sdwan.service.acl import Ipv4AclParcel, Ipv6AclParcel from catalystwan.models.configuration.feature_profile.sdwan.service.dhcp_server import ( AddressPool, LanVpnDhcpServerParcel, @@ -145,6 +146,28 @@ def test_when_default_values_route_policy_parcel_expect_successful_post(self): # Assert assert parcel_id + def test_when_default_values_acl_ipv6_expect_successful_post(self): + # Arrange + acl_ipv6_parcel = Ipv6AclParcel( + parcel_name="TestAclIpv6Parcel", + parcel_description="Test Acl Ipv6 Parcel", + ) + # Act + parcel_id = self.api.create_parcel(self.profile_uuid, acl_ipv6_parcel).id + # Assert + assert parcel_id + + def test_when_default_values_acl_ipv4_expect_successful_post(self): + # Arrange + acl_ipv4_parcel = Ipv4AclParcel( + parcel_name="TestAclIpv4Parcel", + parcel_description="Test Acl Ipv4 Parcel", + ) + # Act + parcel_id = self.api.create_parcel(self.profile_uuid, acl_ipv4_parcel).id + # Assert + assert parcel_id + @classmethod def tearDownClass(cls) -> None: cls.api.delete_profile(cls.profile_uuid) diff --git a/catalystwan/models/configuration/feature_profile/sdwan/service/__init__.py b/catalystwan/models/configuration/feature_profile/sdwan/service/__init__.py index 06d3a9e0..4d93389d 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/service/__init__.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/service/__init__.py @@ -3,6 +3,7 @@ from pydantic import Field from typing_extensions import Annotated +from .acl import Ipv4AclParcel, Ipv6AclParcel from .appqoe import AppqoeParcel from .dhcp_server import LanVpnDhcpServerParcel from .eigrp import EigrpParcel @@ -25,6 +26,8 @@ Ospfv3IPv6Parcel, RoutePolicyParcel, EigrpParcel, + Ipv6AclParcel, + Ipv4AclParcel, # TrackerGroupData, # WirelessLanData, # SwitchportData @@ -55,6 +58,8 @@ "RoutePolicyParcel", "Ospfv3IPv4Parcel", "Ospfv3IPv6Parcel", + "Ipv6AclParcel", + "Ipv4AclParcel", "InterfaceSviParcel", "InterfaceGreParcel", "AnyServiceParcel", diff --git a/catalystwan/models/configuration/feature_profile/sdwan/service/acl.py b/catalystwan/models/configuration/feature_profile/sdwan/service/acl.py index 45474a94..bc73718f 100644 --- a/catalystwan/models/configuration/feature_profile/sdwan/service/acl.py +++ b/catalystwan/models/configuration/feature_profile/sdwan/service/acl.py @@ -1,65 +1,16 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates +from __future__ import annotations +from ipaddress import IPv4Address, IPv6Address, IPv6Interface from typing import List, Literal, Optional, Union from uuid import UUID -from pydantic import AliasPath, BaseModel, ConfigDict, Field +from pydantic import AliasPath, BaseModel, ConfigDict, Field, model_validator -from catalystwan.api.configuration_groups.parcel import Default, Global, Variable, _ParcelBase -from catalystwan.models.common import ServiceChainNumber +from catalystwan.api.configuration_groups.parcel import Default, Global, _ParcelBase, as_default -Action = Literal[ - "drop", - "accept", -] - -IcmpMessage = Literal[ - "administratively-prohibited", - "dod-host-prohibited", - "dod-net-prohibited", - "echo", - "echo-reply", - "echo-reply-no-error", - "extended-echo", - "extended-echo-reply", - "general-parameter-problem", - "host-isolated", - "host-precedence-unreachable", - "host-redirect", - "host-tos-redirect", - "host-tos-unreachable", - "host-unknown", - "host-unreachable", - "interface-error", - "malformed-query", - "multiple-interface-match", - "net-redirect", - "net-tos-redirect", - "net-tos-unreachable", - "net-unreachable", - "network-unknown", - "no-room-for-option", - "option-missing", - "packet-too-big", - "parameter-problem", - "photuris", - "port-unreachable", - "precedence-unreachable", - "protocol-unreachable", - "reassembly-timeout", - "redirect", - "router-advertisement", - "router-solicitation", - "source-route-failed", - "table-entry-error", - "time-exceeded", - "timestamp-reply", - "timestamp-request", - "ttl-exceeded", - "unreachable", -] - -Icmp6Message = Literal[ +Action = Literal["drop", "accept"] +Icmp6Msg = Literal[ "beyond-scope", "cp-advertisement", "cp-solicitation", @@ -113,285 +64,379 @@ "time-exceeded", "unreachable", ] +IcmpMsg = Literal[ + "administratively-prohibited", + "dod-host-prohibited", + "dod-net-prohibited", + "echo", + "echo-reply", + "echo-reply-no-error", + "extended-echo", + "extended-echo-reply", + "general-parameter-problem", + "host-isolated", + "host-precedence-unreachable", + "host-redirect", + "host-tos-redirect", + "host-tos-unreachable", + "host-unknown", + "host-unreachable", + "interface-error", + "malformed-query", + "multiple-interface-match", + "net-redirect", + "net-tos-redirect", + "net-tos-unreachable", + "net-unreachable", + "network-unknown", + "no-room-for-option", + "option-missing", + "packet-too-big", + "parameter-problem", + "photuris", + "port-unreachable", + "precedence-unreachable", + "protocol-unreachable", + "reassembly-timeout", + "redirect", + "router-advertisement", + "router-solicitation", + "source-route-failed", + "table-entry-error", + "time-exceeded", + "timestamp-reply", + "timestamp-request", + "ttl-exceeded", + "unreachable", +] +Tcp = Literal["syn"] -class SourceDataIPv4Prefix(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - source_ip_prefix: Union[Global[str], Variable] = Field( - serialization_alias="sourceIpPrefix", validation_alias="sourceIpPrefix" +class ReferenceId(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) + ref_id: Global[UUID] = Field(..., serialization_alias="refId", validation_alias="refId") -class SourceDataIPv6Prefix(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - source_ip_prefix: Union[Global[str], Variable] = Field( - serialization_alias="sourceIpPrefix", validation_alias="sourceIpPrefix" +class SourceDataPrefixListReference(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - - -class SourceDataIPv4PrefixParcel(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - source_data_prefix_list: Global[UUID] = Field( - serialization_alias="sourceDataPrefixList", validation_alias="sourceDataPrefixList" + source_data_prefix_list: ReferenceId = Field( + ..., + serialization_alias="sourceDataPrefixList", + validation_alias="sourceDataPrefixList", + description="Source Data Prefix Parcel", ) -class SourceDataIPv6PrefixParcel(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - source_data_prefix_list: Global[UUID] = Field( - serialization_alias="sourceDataPrefixList", validation_alias="sourceDataPrefixList" +class SourceDataPrefixIp(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + source_ip_prefix: Global[str] = Field( + ..., + serialization_alias="sourceIpPrefix", + validation_alias="sourceIpPrefix", + description="Source Data IP Prefix", ) class SourcePort(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - source_port: Global[int] = Field(serialization_alias="sourcePort", validation_alias="sourcePort") - - -class DestinationDataIPv4Prefix(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - destination_ip_prefix: Union[Global[str], Variable] = Field( - serialization_alias="destinationIpPrefix", validation_alias="destinationIpPrefix" + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - - -class DestinationDataIPv6Prefix(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - destination_ip_prefix: Union[Global[str], Variable] = Field( - serialization_alias="destinationIpPrefix", validation_alias="destinationIpPrefix" + source_port: Union[Global[int], Global[str]] = Field( + ..., + serialization_alias="sourcePort", + validation_alias="sourcePort", + description="source port range or individual port number", ) -class DestinationDataIPv4PrefixParcel(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - destination_data_prefix_list: Global[UUID] = Field( - serialization_alias="destinationDataPrefixList", validation_alias="destinationDataPrefixList" +class DestinationDataPrefixListReference(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + destination_data_prefix_list: ReferenceId = Field( + ..., + serialization_alias="destinationDataPrefixList", + validation_alias="destinationDataPrefixList", + description="Destination Data Prefix Parcel", ) -class DestinationDataIPv6PrefixParcel(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - destination_data_prefix_list: Global[UUID] = Field( - serialization_alias="destinationDataPrefixList", validation_alias="destinationDataPrefixList" +class DestinationDataPrefixIp(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + destination_ip_prefix: Global[IPv6Interface] = Field( + ..., + serialization_alias="destinationIpPrefix", + validation_alias="destinationIpPrefix", + description="Destination Data IP Prefix", ) class DestinationPort(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - destination_port: Global[int] = Field(serialization_alias="destinationPort", validation_alias="destinationPort") - - -TcpState = Literal["syn"] - + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + destination_port: Union[Global[int], Global[str]] = Field( + ..., + serialization_alias="destinationPort", + validation_alias="destinationPort", + description="destination port range or individual port number", + ) -class IPv4Match(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - dscp: Optional[Global[List[int]]] = None - packet_length: Optional[Global[int]] = Field( - serialization_alias="packetLength", validation_alias="packetLength", default=None +class Ipv4MatchEntry(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + dscp: Optional[Global[List[int]]] = Field(default=None, description="DSCP number") + packet_length: Optional[Union[Global[int], Global[str]]] = Field( + default=None, serialization_alias="packetLength", validation_alias="packetLength", description="Packet Length" ) - protocol: Optional[Global[List[int]]] = None - icmp_message: Optional[Global[List[IcmpMessage]]] = Field( - serialization_alias="icmpMsg", validation_alias="icmpMsg", default=None + protocol: Optional[Global[List[int]]] = Field( + default=None, description="protocol number list with at least one item" ) - source_data_prefix: Optional[Union[SourceDataIPv4Prefix, SourceDataIPv4PrefixParcel]] = Field( - serialization_alias="sourceDataPrefix", validation_alias="sourceDataPrefix", default=None + icmp_msg: Optional[IcmpMsg] = Field( + default=None, serialization_alias="icmpMsg", validation_alias="icmpMsg", description="ICMP Message" + ) + source_data_prefix: Optional[Union[SourceDataPrefixListReference, SourceDataPrefixIp]] = Field( + default=None, serialization_alias="sourceDataPrefix", validation_alias="sourceDataPrefix" ) source_ports: Optional[List[SourcePort]] = Field( - serialization_alias="sourcePorts", validation_alias="sourcePorts", default=None + default=None, serialization_alias="sourcePorts", validation_alias="sourcePorts", description="Source Port List" ) - destination_data_prefix: Optional[Union[DestinationDataIPv4Prefix, DestinationDataIPv4PrefixParcel]] = Field( - serialization_alias="destinationDataPrefix", validation_alias="destinationDataPrefix", default=None + destination_data_prefix: Optional[Union[DestinationDataPrefixListReference, DestinationDataPrefixIp]] = Field( + default=None, serialization_alias="destinationDataPrefix", validation_alias="destinationDataPrefix" ) destination_ports: Optional[List[DestinationPort]] = Field( - serialization_alias="destinationPorts", validation_alias="destinationPorts", default=None + default=None, + serialization_alias="destinationPorts", + validation_alias="destinationPorts", + description="Destination Port List", ) - tcp: Optional[Global[TcpState]] = None - + tcp: Optional[Tcp] = Field(default=None, description="TCP States") -class IPv6Match(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") +class Ipv6MatchEntry(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) next_header: Optional[Global[int]] = Field( - serialization_alias="nextHeader", validation_alias="nextHeader", default=None + default=None, serialization_alias="nextHeader", validation_alias="nextHeader", description="next header number" ) - packet_length: Optional[Global[int]] = Field( - serialization_alias="packetLength", validation_alias="packetLength", default=None + packet_length: Optional[Union[Global[int], Global[str]]] = Field( + default=None, serialization_alias="packetLength", validation_alias="packetLength", description="Packet Length" ) - source_data_prefix: Optional[Union[SourceDataIPv6Prefix, SourceDataIPv6PrefixParcel]] = Field( - serialization_alias="sourceDataPrefix", validation_alias="sourceDataPrefix", default=None + source_data_prefix: Optional[Union[SourceDataPrefixListReference, SourceDataPrefixIp]] = Field( + default=None, serialization_alias="sourceDataPrefix", validation_alias="sourceDataPrefix" ) source_ports: Optional[List[SourcePort]] = Field( - serialization_alias="sourcePorts", validation_alias="sourcePorts", default=None + default=None, serialization_alias="sourcePorts", validation_alias="sourcePorts", description="Source Port List" ) - destination_data_prefix: Optional[Union[DestinationDataIPv6Prefix, DestinationDataIPv6PrefixParcel]] = Field( - serialization_alias="destinationDataPrefix", validation_alias="destinationDataPrefix", default=None + destination_data_prefix: Optional[Union[DestinationDataPrefixListReference, DestinationDataPrefixIp]] = Field( + default=None, serialization_alias="destinationDataPrefix", validation_alias="destinationDataPrefix" ) destination_ports: Optional[List[DestinationPort]] = Field( - serialization_alias="destinationPorts", validation_alias="destinationPorts", default=None + default=None, + serialization_alias="destinationPorts", + validation_alias="destinationPorts", + description="Destination Port List", ) - tcp: Optional[Global[TcpState]] = None - traffic_class: Optional[Global[int]] = None - icmp6_message: Optional[Global[List[Icmp6Message]]] = Field( - serialization_alias="icmpMsg", validation_alias="icmpMsg", default=None + tcp: Optional[Global[Tcp]] = Field(default=None, description="TCP States") + traffic_class: Optional[Global[List[int]]] = Field( + default=None, + serialization_alias="trafficClass", + validation_alias="trafficClass", + description="Select Traffic Class", ) - - -class ServiceChain(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - service_chain_number: Union[Global[ServiceChainNumber], Variable] = Field( - serialization_alias="serviceChainNumber", validation_alias="serviceChainNumber" + icmp6_msg: Optional[Global[List[Icmp6Msg]]] = Field( + default=None, serialization_alias="icmp6Msg", validation_alias="icmp6Msg", description="ICMP6 Message" ) - vpn: Optional[Union[Global[int], Variable]] = None - fallback: Optional[Union[Global[bool], Variable, Default[bool]]] = None -class AcceptActionIPv4(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") +class Ipv4AcceptAction(BaseModel): + """ + Accept Action + """ - set_dscp: Optional[Global[int]] = Field(serialization_alias="setDscp", validation_alias="setDscp", default=None) - counter_name: Optional[Global[str]] = Field( - serialization_alias="counterName", validation_alias="counterName", default=None + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - log: Optional[Union[Global[bool], Default[bool]]] = None - set_next_hop: Optional[Global[str]] = Field( - serialization_alias="setNextHop", validation_alias="setNextHop", default=None + set_dscp: Optional[Global[int]] = Field( + default=None, serialization_alias="setDscp", validation_alias="setDscp", description="DSCP number" ) - set_service_chain: Optional[ServiceChain] = Field( - serialization_alias="setServiceChain", validation_alias="setServiceChain", default=None + counter_name: Optional[Global[str]] = Field( + default=None, serialization_alias="counterName", validation_alias="counterName", description="Counter Name" + ) + log: Union[Global[bool], Default[bool]] = Field(default=as_default(False), description="Enable log") + set_next_hop: Optional[Global[IPv4Address]] = Field( + default=None, + serialization_alias="setNextHop", + validation_alias="setNextHop", + description="Set Next Hop (IPV4 address)", ) - mirror: Optional[Global[UUID]] = None - policer: Optional[Global[UUID]] = None + mirror: Optional[ReferenceId] = Field(default=None, description="Select a Mirror Parcel UUID") + policer: Optional[ReferenceId] = Field(default=None, description="Select a Policer Parcel") -class AcceptActionIPv6(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") +class Ipv6AcceptAction(BaseModel): + """ + Accept Action + """ - counter_name: Optional[Global[str]] = Field( - serialization_alias="counterName", validation_alias="counterName", default=None + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - log: Optional[Union[Global[bool], Default[bool]]] = None - set_next_hop: Optional[Global[str]] = Field( - serialization_alias="setNextHop", validation_alias="setNextHop", default=None + counter_name: Optional[Global[str]] = Field( + default=None, serialization_alias="counterName", validation_alias="counterName", description="Counter Name" ) - set_service_chain: Optional[ServiceChain] = Field( - serialization_alias="setServiceChain", validation_alias="setServiceChain", default=None + log: Union[Global[bool], Default[bool]] = Field(default=as_default(False), description="Enable log") + set_next_hop: Optional[Global[IPv6Address]] = Field( + default=None, + serialization_alias="setNextHop", + validation_alias="setNextHop", + description="Set Next Hop (IPV6 address)", ) set_traffic_class: Optional[Global[int]] = Field( - serialization_alias="setTrafficClass", validation_alias="setTrafficClass", default=None + default=None, + serialization_alias="setTrafficClass", + validation_alias="setTrafficClass", + description="set traffic class number", ) - mirror: Optional[Global[UUID]] = None - policer: Optional[Global[UUID]] = None + mirror: Optional[ReferenceId] = Field(default=None, description="Select a Mirror Parcel UUID") + policer: Optional[ReferenceId] = Field(default=None, description="Select a Policer Parcel") -class DropAction(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - counter_name: Optional[Global[str]] = Field( - serialization_alias="counterName", validation_alias="counterName", default=None +class Ipv4AcceptActions(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - log: Optional[Union[Global[bool], Default[bool]]] = None + accept: Ipv4AcceptAction = Field(..., description="Accept Action") -class AcceptActionsIPv4(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - accept: AcceptActionIPv4 +class Ipv6AcceptActions(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + accept: Ipv6AcceptAction = Field(..., description="Accept Action") -class AcceptActionsIPv6(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") +class DropAction(BaseModel): + """ + Drop Action + """ - accept: AcceptActionIPv6 + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + counter_name: Optional[Global[str]] = Field( + default=None, serialization_alias="counterName", validation_alias="counterName", description="Counter Name" + ) + log: Union[Global[bool], Default[bool]] = Field(default=as_default(False), description="Enable log") class DropActions(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - drop: DropAction - + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + drop: DropAction = Field(..., description="Drop Action") -class IPv4SequenceBaseAction(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - sequence_id: Global[int] = Field(serialization_alias="sequenceId", validation_alias="sequenceId") - sequence_name: Global[str] = Field(serialization_alias="sequenceName", validation_alias="sequenceName") - base_action: Union[Global[Action], Default[Action]] = Field( - serialization_alias="baseAction", validation_alias="baseAction", default=Default[Action](value="accept") +class Sequences(BaseModel): + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - match_entries: Optional[List[IPv4Match]] = Field( - serialization_alias="matchEntries", validation_alias="matchEntries", default=None + sequence_id: Global[int] = Field( + ..., serialization_alias="sequenceId", validation_alias="sequenceId", description="Sequence Id" ) - - -class IPv6SequenceBaseAction(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - sequence_id: Global[int] = Field(serialization_alias="sequenceId", validation_alias="sequenceId") - sequence_name: Global[str] = Field(serialization_alias="sequenceName", validation_alias="sequenceName") - base_action: Union[Global[Action], Default[Action]] = Field( - serialization_alias="baseAction", validation_alias="baseAction", default=Default[Action](value="accept") + sequence_name: Global[str] = Field( + ..., serialization_alias="sequenceName", validation_alias="sequenceName", description="Sequence Name" + ) + base_action: Optional[Union[Global[Action], Default[Action]]] = Field( + default=None, serialization_alias="baseAction", validation_alias="baseAction", description="Base Action" ) - match_entries: Optional[List[IPv6Match]] = Field( - serialization_alias="matchEntries", validation_alias="matchEntries", default=None + match_entries: Optional[List[Ipv6MatchEntry]] = Field( + default=None, + serialization_alias="matchEntries", + validation_alias="matchEntries", + description="Define match conditions", + max_length=1, + min_length=1, ) + @model_validator(mode="after") + def check_fields_at_least_one_assigned(self): + """There are two Sequence models in schema, + one with set base_action and empty actions, + and one with set actions and empty base_action, + so we combine two models into one model with check if + at least one field assigned + """ + if self.base_action is None and self.actions is None: + self.base_action = as_default("accept", Action) -class IPv4SequenceActions(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - sequence_id: Global[int] = Field(serialization_alias="sequenceId", validation_alias="sequenceId") - sequence_name: Global[str] = Field(serialization_alias="sequenceName", validation_alias="sequenceName") - actions: List[Union[AcceptActionsIPv4, DropActions]] = Field( - serialization_alias="actions", validation_alias="actions" - ) - match_entries: Optional[List[IPv4Match]] = Field( - serialization_alias="matchEntries", validation_alias="matchEntries", default=None +class Ipv6Sequences(Sequences): + actions: Optional[List[Union[Ipv6AcceptActions, DropActions]]] = Field( + default=None, description="Define list of actions", max_length=1, min_length=1 ) -class IPv6SequenceActions(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - sequence_id: Global[int] = Field(serialization_alias="sequenceId", validation_alias="sequenceId") - sequence_name: Global[str] = Field(serialization_alias="sequenceName", validation_alias="sequenceName") - actions: List[Union[AcceptActionsIPv6, DropActions]] = Field( - serialization_alias="actions", validation_alias="actions" - ) - match_entries: Optional[List[IPv6Match]] = Field( - serialization_alias="matchEntries", validation_alias="matchEntries", default=None +class Ipv4Sequences(Sequences): + actions: Optional[List[Union[Ipv4AcceptActions, DropActions]]] = Field( + default=None, description="Define list of actions", max_length=1, min_length=1 ) -class IPv4AclParcel(_ParcelBase): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - defautl_action: Union[Global[Action], Default[Action]] = Field( - validation_alias=AliasPath("data", "defaultAction"), default=Default[Action](value="drop") +class Ipv4AclParcel(_ParcelBase): + type_: Literal["ipv4-acl"] = Field(default="ipv4-acl", exclude=True) + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, ) - sequences: List[Union[IPv4SequenceBaseAction, IPv4SequenceActions]] = Field( - validation_alias=AliasPath("data", "sequences") + default_action: Union[Global[Action], Global[Action]] = Field( + as_default("drop", Action), + validation_alias=AliasPath("data", "defaultAction"), + description="Default Action", + ) + sequences: List[Union[Ipv4Sequences]] = Field( + default_factory=list, validation_alias=AliasPath("data", "sequences"), description="Access Control List" ) -class IPv6AclParcel(_ParcelBase): - model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True, extra="forbid") - - defautl_action: Union[Global[Action], Default[Action]] = Field( - validation_alias=AliasPath("data", "defaultAction"), default=Default[Action](value="drop") +class Ipv6AclParcel(_ParcelBase): + type_: Literal["ipv6-acl"] = Field(default="ipv6-acl", exclude=True) + model_config = ConfigDict( + extra="forbid", + populate_by_name=True, + ) + default_action: Union[Global[Action], Global[Action]] = Field( + as_default("drop", Action), + validation_alias=AliasPath("data", "defaultAction"), + description="Default Action", ) - sequences: List[Union[IPv6SequenceBaseAction, IPv6SequenceActions]] = Field( - validation_alias=AliasPath("data", "sequences") + sequences: List[Union[Ipv6Sequences]] = Field( + default_factory=list, validation_alias=AliasPath("data", "sequences"), description="Access Control List" ) diff --git a/catalystwan/tests/test_feature_profile_api.py b/catalystwan/tests/test_feature_profile_api.py index 632bd7b3..544e4f83 100644 --- a/catalystwan/tests/test_feature_profile_api.py +++ b/catalystwan/tests/test_feature_profile_api.py @@ -19,6 +19,7 @@ LanVpnParcel, OspfParcel, ) +from catalystwan.models.configuration.feature_profile.sdwan.service.acl import Ipv4AclParcel, Ipv6AclParcel from catalystwan.models.configuration.feature_profile.sdwan.service.eigrp import EigrpParcel from catalystwan.models.configuration.feature_profile.sdwan.service.lan.gre import BasicGre from catalystwan.models.configuration.feature_profile.sdwan.service.lan.ipsec import IpsecAddress, IpsecTunnelMode @@ -112,6 +113,8 @@ def test_update_method_with_valid_arguments(self, parcel, expected_path): Ospfv3IPv6Parcel: "routing/ospfv3/ipv6", RoutePolicyParcel: "route-policy", EigrpParcel: "routing/eigrp", + Ipv6AclParcel: "ipv6-acl", + Ipv4AclParcel: "ipv4-acl", } service_interface_parcels = [