Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

dev: converter fixes #785

Merged
merged 4 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions catalystwan/models/common.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

import re
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, Iterator, List, Literal, Mapping, Optional, Sequence, Set, Tuple, Union
from typing import Any, Dict, Iterator, List, Literal, Mapping, Optional, Sequence, Set, Tuple, Union, get_args
from uuid import UUID

from annotated_types import Ge, Le
from packaging.specifiers import SpecifierSet # type: ignore
from packaging.version import Version # type: ignore
from pydantic import NonNegativeInt, PlainSerializer, PositiveInt, SerializationInfo, ValidationInfo
from pydantic import Field, NonNegativeInt, PlainSerializer, PositiveInt, SerializationInfo, ValidationInfo
from pydantic.fields import FieldInfo
from pydantic.functional_validators import BeforeValidator
from typing_extensions import Annotated
Expand Down Expand Up @@ -257,6 +258,12 @@ def int_range_serializer(value: IntRange) -> str:
"VirtualPortGroup",
"Vlan",
]
InterfaceTypePattern = re.compile(r"^(?:" + "|".join(map(re.escape, get_args(InterfaceType))) + r")[\x00-\x7F]*$")

InterfaceStr = Annotated[
str,
Field(pattern=InterfaceTypePattern),
]

StaticNatDirection = Literal["inside", "outside"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pydantic import AliasPath, BaseModel, ConfigDict, Field, ValidationError, model_validator
from typing_extensions import Self

from catalystwan.api.configuration_groups.parcel import Global, Variable, _ParcelBase, as_global
from catalystwan.api.configuration_groups.parcel import Global, Variable, _ParcelBase, as_global, as_variable
from catalystwan.models.configuration.feature_profile.common import RefIdItem, RefIdList

DefaultAction = Literal["pass", "drop"]
Expand Down Expand Up @@ -456,7 +456,7 @@ def create_with_ip_networks(cls, ip_networks: List[IPv4Network]) -> Self:

@classmethod
def create_with_variable(cls, variable_name: str) -> Self:
return cls(ipv4_value=Variable(value=variable_name))
return cls(ipv4_value=as_variable(variable_name))


class FqdnMatch(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from pydantic import AliasPath, BaseModel, ConfigDict, Field, field_validator, model_validator

from catalystwan.api.configuration_groups.parcel import Global, _ParcelBase, as_global
from catalystwan.models.common import InterfaceType, check_fields_exclusive
from catalystwan.models.common import InterfaceStr, check_fields_exclusive


class SecurityZoneListEntry(BaseModel):
vpn: Optional[Global[str]] = Field(default=None, description="0-65530 single number")
interface: Optional[Global[InterfaceType]] = None
interface: Optional[Global[InterfaceStr]] = None

@field_validator("vpn")
@classmethod
Expand All @@ -29,10 +29,10 @@ class SecurityZoneListParcel(_ParcelBase):
type_: Literal["security-zone"] = Field(default="security-zone", exclude=True)
entries: List[SecurityZoneListEntry] = Field(default=[], validation_alias=AliasPath("data", "entries"))

def add_interface(self, interface: InterfaceType):
def add_interface(self, interface: InterfaceStr):
self.entries.append(
SecurityZoneListEntry(
interface=as_global(interface, InterfaceType),
interface=Global[InterfaceStr](value=interface),
)
)

Expand Down
6 changes: 3 additions & 3 deletions catalystwan/models/policy/list/zone.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

from pydantic import BaseModel, Field, field_validator, model_validator

from catalystwan.models.common import InterfaceType, IntRangeStr, check_fields_exclusive
from catalystwan.models.common import InterfaceStr, IntRangeStr, check_fields_exclusive
from catalystwan.models.policy.policy_list import PolicyListBase, PolicyListId, PolicyListInfo


class ZoneListEntry(BaseModel):
vpn: Optional[IntRangeStr] = Field(default=None, description="0-65530 single number")
interface: Optional[InterfaceType] = None
interface: Optional[InterfaceStr] = None

@field_validator("vpn")
@classmethod
Expand All @@ -33,7 +33,7 @@ class ZoneList(PolicyListBase):
def assign_vpns(self, vpns: Set[int]) -> None:
self.entries = [ZoneListEntry(vpn=(vpn, None)) for vpn in vpns]

def assign_interfaces(self, ifs: Set[InterfaceType]) -> None:
def assign_interfaces(self, ifs: Set[InterfaceStr]) -> None:
self.entries = [ZoneListEntry(interface=interface) for interface in ifs]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def test_convert_source_ip_with_ip_networks(self) -> None:
assert not self._convert_result.info

def test_convert_source_ip_with_variable_name(self) -> None:
in_ = SourceIPEntry(vip_variable_name="{{some_var}}")
in_ = SourceIPEntry(vip_variable_name="some_var")
out_ = convert_sequence_match_entry(in_, self._convert_result)
assert type(out_) is SourceIp
assert out_.source_ip.ipv4_value.value == "{{some_var}}"
Expand All @@ -198,7 +198,7 @@ def test_convert_dst_ip_with_ip_networks(self) -> None:
assert not self._convert_result.info

def test_convert_dst_ip_with_variable_name(self) -> None:
in_ = DestinationIPEntry(vip_variable_name="{{some_var}}")
in_ = DestinationIPEntry(vip_variable_name="some_var")
out_ = convert_sequence_match_entry(in_, self._convert_result)
assert type(out_) is DestinationIp
assert out_.destination_ip.ipv4_value.value == "{{some_var}}"
Expand Down
3 changes: 1 addition & 2 deletions catalystwan/tests/config_migration/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,7 @@ def test_when_localized_policy_with_qos_expect_application_priority_feature_prof
(
p
for p in ux2_config.feature_profiles
if p.feature_profile.name == f"FROM_{localized_policy.policy_name}"
and p.header.type == "application-priority"
if p.feature_profile.name == localized_policy.policy_name and p.header.type == "application-priority"
),
None,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from catalystwan.models.policy.definition.zone_based_firewall import LogAction as LogActionV1
from catalystwan.models.policy.definition.zone_based_firewall import ZoneBasedFWPolicy
from catalystwan.models.policy.policy_definition import ActionEntry, MatchEntry
from catalystwan.utils.config_migration.converters.utils import convert_varname


def split_value(match_entry) -> List[str]:
Expand Down Expand Up @@ -85,14 +86,14 @@ def convert_sequence_match_entry(
if match_entry.value is not None:
return SourceIp.from_ip_networks(list(map(IPv4Network, split_value(match_entry))))
elif match_entry.vip_variable_name is not None:
return SourceIp.from_variable(match_entry.vip_variable_name)
return SourceIp.from_variable(convert_varname(match_entry.vip_variable_name))
convert_result.update_status("partial", "SrcIP match entry does not contain value/vipVariableName")
return None
elif match_entry.field == "destinationIp":
if match_entry.value is not None:
return DestinationIp.from_ip_networks(list(map(IPv4Network, split_value(match_entry))))
elif match_entry.vip_variable_name is not None:
return DestinationIp.from_variable(match_entry.vip_variable_name)
return DestinationIp.from_variable(convert_varname(match_entry.vip_variable_name))
convert_result.update_status("partial", "DstIP match entry does not contain value/vipVariableName")
return None
elif match_entry.field == "destinationFqdn":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def _create_sigs(self):
for sig in sigs:
self._ux2_config.profile_parcels.remove(sig)
try:
profile_name = f"FROM_{sig.header.origname}"
profile_name = sig.header.origname
profile_uuid = api.create_profile(profile_name, "Feature Profile created from SIG Feature Template").id
parcel_uuid = api.create_parcel(profile_uuid, sig.parcel).id
created_parcel = (sig.parcel.parcel_name, parcel_uuid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
SslDecryptionProfileParcel,
UrlFilteringParcel,
)
from catalystwan.models.configuration.feature_profile.sdwan.policy_object.policy.app_probe import AppProbeParcel
from catalystwan.models.configuration.feature_profile.sdwan.policy_object.policy.sla_class import SLAClassParcel
from catalystwan.session import ManagerSession
from catalystwan.typed_list import DataSequence

Expand All @@ -25,7 +27,9 @@
UrlFilteringParcel: 1,
SslDecryptionProfileParcel: 1,
IntrusionPreventionParcel: 1,
AppProbeParcel: 1,
AdvancedInspectionProfileParcel: 2,
SLAClassParcel: 2,
}


Expand Down Expand Up @@ -96,17 +100,17 @@ def push(self) -> None:
continue

try:
self._progress(
f"Creating Policy Object Parcel: {parcel.parcel_name}",
i + 1,
len(transformed_parcels),
)
parcel = update_parcel_references(parcel, self.push_context.id_lookup)
parcel_id = self._policy_object_api.create_parcel(profile_id=profile_id, payload=parcel).id
profile_rollback.add_parcel(parcel.type_, parcel_id)
self._push_result.report.groups_of_interest.add_created(parcel.parcel_name, parcel_id)
self.push_context.id_lookup[transformed_parcel.header.origin] = parcel_id

self._progress(
f"Creating Policy Object Parcel: {parcel.parcel_name}",
i + 1,
len(transformed_parcels),
)
except ManagerHTTPError as e:
logger.error(f"Error occured during config group creation: {e.info}")
self._push_result.report.groups_of_interest.add_failed(parcel, e)
Expand Down
2 changes: 1 addition & 1 deletion catalystwan/workflows/config_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def transform(ux1: UX1Config, add_suffix: bool = False) -> ConfigTransformResult
origname=localized_policy.policy_name,
),
feature_profile=FeatureProfileCreationPayload(
name=f"FROM_{localized_policy.policy_name}",
name=localized_policy.policy_name,
description=localized_policy.policy_description,
),
)
Expand Down