diff --git a/catalystwan/tests/config_migration/policy_converters/test_custom_control.py b/catalystwan/tests/config_migration/policy_converters/test_custom_control.py index 06b1e4da..b36654a0 100644 --- a/catalystwan/tests/config_migration/policy_converters/test_custom_control.py +++ b/catalystwan/tests/config_migration/policy_converters/test_custom_control.py @@ -10,9 +10,6 @@ class TestCustomControlConverter(unittest.TestCase): - def setUp(self) -> None: - self.context = PolicyConvertContext() - def test_custom_control_with_route_sequence_conversion(self): # Arrange default_action = "accept" @@ -37,11 +34,11 @@ def test_custom_control_with_route_sequence_conversion(self): region_id = 200 region_role = "border-router" site_id = 1 - site_list = uuid4() tloc_ip = IPv4Address("30.1.2.3") tloc_color = "private5" tloc_encap = "ipsec" - vpn_list = uuid4() + vpn_list_id = uuid4() + vpn_list_entries = map(str, [30, 31, 32]) seq_name = "route_sequence" base_action = "accept" ip_type = "ipv4" @@ -62,12 +59,12 @@ def test_custom_control_with_route_sequence_conversion(self): route.match_prefix_list(prefix_list) route.match_region(region_id, region_role) route.match_site(site_id) - # route.match_site_list(site_list) siteList is mutually exclusive with {'siteId'} route.match_tloc(ip=tloc_ip, color=tloc_color, encap=tloc_encap) - route.match_vpn_list(vpn_list) + route.match_vpn_list(vpn_list_id) + # Arrange context + context = PolicyConvertContext(lan_vpns_by_list_id=dict.fromkeys([vpn_list_id], vpn_list_entries)) # Act - uuid = uuid4() - parcel = convert(policy, uuid, context=self.context).output + parcel = convert(policy, uuid4(), context).output # Assert parcel assert isinstance(parcel, CustomControlParcel) assert parcel.parcel_name == name @@ -116,12 +113,12 @@ def test_custom_control_with_tloc_sequence_conversion(self): omp_tag = 2 originator = IPv4Address("20.30.3.1") preference = 100 - region_id = 200 - region_role = "border-router" - site_id = 1 - tloc_ip = IPv4Address("30.1.2.3") - tloc_color = "private5" - tloc_encap = "ipsec" + region_list_id = uuid4() + region_list_entries = ["Region-9", "Region-10"] + region_role = "edge-router" + site_list_id = uuid4() + site_list_entries = ["SITE_100", "SITE_200"] + tloc_list_id = uuid4() seq_name = "tloc_sequence" base_action = "accept" ip_type = "ipv4" @@ -137,14 +134,16 @@ def test_custom_control_with_tloc_sequence_conversion(self): tloc.match_omp_tag(omp_tag) tloc.match_originator(originator) tloc.match_preference(preference) - tloc.match_region(region_id, region_role) - tloc.match_site(site_id) - # tloc.match_site_list(site_list) siteList is mutually exclusive with {'siteId'} - tloc.match_tloc(ip=tloc_ip, color=tloc_color, encap=tloc_encap) - # tloc.match_tloc_list(tloc_list) tlocList is mutually exclusive with {'tloc'} + tloc.match_region_list(region_list_id, region_role) + tloc.match_site_list(site_list_id) + tloc.match_tloc_list(tloc_list_id) + # Arrange context + context = PolicyConvertContext( + sites_by_list_id=dict.fromkeys([site_list_id], site_list_entries), + regions_by_list_id=dict.fromkeys([region_list_id], region_list_entries), + ) # Act - uuid = uuid4() - parcel = convert(policy, uuid, context=self.context).output + parcel = convert(policy, uuid4(), context).output # Assert parcel assert isinstance(parcel, CustomControlParcel) assert parcel.parcel_name == name @@ -164,10 +163,8 @@ def test_custom_control_with_tloc_sequence_conversion(self): assert seq.match.entries[4].omp_tag.value == omp_tag assert seq.match.entries[5].originator.value == str(originator) assert seq.match.entries[6].preference.value == preference - # assert seq.match.entries[9]..value == region_id - # assert seq.match.entries[10].region_role.value == region_role - # assert seq.match.entries[11].site.value == site_id - assert seq.match.entries[10].tloc.ip.value == str(tloc_ip) - assert seq.match.entries[10].tloc.color.value == tloc_color - assert seq.match.entries[10].tloc.encap.value == tloc_encap - # assert seq.match.entries[13].vpn_list.ref_id.value == str(vpn_list) + assert seq.match.entries[7].regions[0].region.value == region_list_entries[0] + assert seq.match.entries[7].regions[1].region.value == region_list_entries[1] + assert seq.match.entries[8].site.value[0] == site_list_entries[0] + assert seq.match.entries[8].site.value[1] == site_list_entries[1] + assert seq.match.entries[9].tloc_list.ref_id == str(tloc_list_id) diff --git a/catalystwan/utils/config_migration/converters/feature_template/base.py b/catalystwan/utils/config_migration/converters/feature_template/base.py index 98092e30..46825113 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/base.py +++ b/catalystwan/utils/config_migration/converters/feature_template/base.py @@ -1,10 +1,12 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates from abc import ABC, abstractmethod -from typing import Optional, Tuple +from typing import Dict, Optional, Tuple, Union +from catalystwan.api.configuration_groups.parcel import Global, Variable, as_global from catalystwan.models.configuration.config_migration import ConvertResult from catalystwan.models.configuration.feature_profile.parcel import AnyParcel from catalystwan.utils.config_migration.converters.exceptions import CatalystwanConverterCantConvertException +from catalystwan.utils.config_migration.converters.utils import convert_interface_name class FTConverter(ABC): @@ -33,3 +35,16 @@ def convert(self, name: str, description: str, template_values: dict) -> Convert except (CatalystwanConverterCantConvertException, AttributeError, LookupError, TypeError, ValueError) as e: self._convert_result.update_status("failed", str(e.__class__.__name__ + ": " + str(e))) return self._convert_result + + def parse_interface_name(self, data: Dict) -> Union[Global[str], Variable]: + if_name = data.get("if_name") + if isinstance(if_name, Variable): + return if_name + elif isinstance(if_name, Global): + converted_if_name = convert_interface_name(if_name.value) + if converted_if_name != if_name.value: + self._convert_result.update_status( + "partial", f"Converted interface name: {if_name.value} -> {converted_if_name}" + ) + return as_global(converted_if_name) + raise CatalystwanConverterCantConvertException("Interface name is required") diff --git a/catalystwan/utils/config_migration/converters/feature_template/ethernet.py b/catalystwan/utils/config_migration/converters/feature_template/ethernet.py index 6ccf19d2..64fd6dab 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/ethernet.py +++ b/catalystwan/utils/config_migration/converters/feature_template/ethernet.py @@ -23,7 +23,6 @@ StaticIPv6AddressConfig, ) from catalystwan.utils.config_migration.converters.feature_template.helpers import create_dict_without_none -from catalystwan.utils.config_migration.converters.utils import parse_interface_name from catalystwan.utils.config_migration.steps.constants import MANAGEMENT_VPN_ETHERNET from .base import FTConverter @@ -39,7 +38,7 @@ def create_parcel(self, name: str, description: str, template_values: dict) -> I parcel_name=name, parcel_description=description, advanced=self.parse_advanced(data), - interface_name=parse_interface_name(self, data), + interface_name=self.parse_interface_name(data), interface_description=data.get("description", Default[None](value=None)), intf_ip_address=self.parse_ipv4_address(data), shutdown=data.get("shutdown"), diff --git a/catalystwan/utils/config_migration/converters/feature_template/lan/ethernet.py b/catalystwan/utils/config_migration/converters/feature_template/lan/ethernet.py index 20a5a4ad..bdbd90df 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/lan/ethernet.py +++ b/catalystwan/utils/config_migration/converters/feature_template/lan/ethernet.py @@ -30,7 +30,6 @@ VrrpIPv6, ) from catalystwan.utils.config_migration.converters.feature_template.base import FTConverter -from catalystwan.utils.config_migration.converters.utils import parse_interface_name from catalystwan.utils.config_migration.steps.constants import LAN_VPN_ETHERNET @@ -92,7 +91,7 @@ def create_parcel(self, name: str, description: str, template_values: dict) -> I parcel_name=name, parcel_description=description, shutdown=data.get("shutdown", as_default(True)), - interface_name=parse_interface_name(self, data), + interface_name=self.parse_interface_name(data), ethernet_description=data.get("description"), interface_ip_address=self.configure_ipv4_address(data), dhcp_helper=data.get("dhcp_helper"), diff --git a/catalystwan/utils/config_migration/converters/feature_template/wan/ethernet.py b/catalystwan/utils/config_migration/converters/feature_template/wan/ethernet.py index b8ffec02..1b01b487 100644 --- a/catalystwan/utils/config_migration/converters/feature_template/wan/ethernet.py +++ b/catalystwan/utils/config_migration/converters/feature_template/wan/ethernet.py @@ -32,7 +32,6 @@ ) from catalystwan.utils.config_migration.converters.feature_template.base import FTConverter from catalystwan.utils.config_migration.converters.feature_template.helpers import create_dict_without_none -from catalystwan.utils.config_migration.converters.utils import parse_interface_name from catalystwan.utils.config_migration.steps.constants import WAN_VPN_ETHERNET logger = logging.getLogger(__name__) @@ -45,7 +44,7 @@ def create_parcel(self, name: str, description: str, template_values: dict) -> I data = deepcopy(template_values) encapsulation = self.parse_encapsulations(data.get("tunnel_interface", {}).get("encapsulation", [])) - interface_name = parse_interface_name(self, data) + interface_name = self.parse_interface_name(data) interface_description = data.get( "description", as_global(description) ) # Edge case where model doesn't have description but its required diff --git a/catalystwan/utils/config_migration/converters/utils.py b/catalystwan/utils/config_migration/converters/utils.py index 56ecb67a..95553f61 100644 --- a/catalystwan/utils/config_migration/converters/utils.py +++ b/catalystwan/utils/config_migration/converters/utils.py @@ -1,10 +1,5 @@ # Copyright 2024 Cisco Systems, Inc. and its affiliates import re -from typing import Dict, Union - -from catalystwan.api.configuration_groups.parcel import Global, Variable, as_global -from catalystwan.utils.config_migration.converters.exceptions import CatalystwanConverterCantConvertException -from catalystwan.utils.config_migration.converters.feature_template.base import FTConverter NEGATIVE_VARIABLE_REGEX = re.compile(r"[^.\/\[\]a-zA-Z0-9_-]") @@ -24,17 +19,3 @@ def convert_interface_name(if_name: str) -> str: new_if_name = value + if_name[len(key) :] return new_if_name return if_name - - -def parse_interface_name(converter: FTConverter, data: Dict) -> Union[Global[str], Variable]: - if_name = data.get("if_name") - if isinstance(if_name, Variable): - return if_name - elif isinstance(if_name, Global): - converted_if_name = convert_interface_name(if_name.value) - if converted_if_name != if_name.value: - converter._convert_result.update_status( - "partial", f"Converted interface name: {if_name.value} -> {converted_if_name}" - ) - return as_global(converted_if_name) - raise CatalystwanConverterCantConvertException("Interface name is required")