Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Start working on integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
jpkrajewski committed Jul 29, 2024
1 parent eb1f93c commit baff2b6
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 29 deletions.
2 changes: 1 addition & 1 deletion catalystwan/api/template_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def delete(self, template: Type[FeatureTemplate], name: str) -> bool: # type: i
def delete(self, template: Type[CLITemplate], name: str) -> bool: # type: ignore
...

def delete(self, template, name):
def delete(self, template, name) -> bool:
status = False

if template is FeatureTemplate:
Expand Down
5 changes: 5 additions & 0 deletions catalystwan/api/templates/device_template/device_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ def get(self, name: str, session: ManagerSession) -> DeviceTemplate:
resp = session.get(f"dataservice/template/device/object/{device_template.id}").json()
return DeviceTemplate(**resp)

def associate_feature_template(self, template_type: str, template_uuid: UUID) -> None:
self.general_templates.append(
GeneralTemplate(name="", template_id=str(template_uuid), template_type=template_type)
)

model_config = ConfigDict(populate_by_name=True, use_enum_values=True)


Expand Down
160 changes: 159 additions & 1 deletion catalystwan/integration_tests/test_migration_flow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates


from catalystwan.integration_tests.base import TestCaseBase
from ipaddress import IPv4Address, IPv4Interface
from uuid import UUID

from catalystwan.api.templates.device_template.device_template import DeviceTemplate
from catalystwan.api.templates.feature_template import FeatureTemplate
from catalystwan.api.templates.models.cisco_secure_internet_gateway import (
CiscoSecureInternetGatewayModel,
Interface,
InterfacePair,
Service,
Tracker,
)
from catalystwan.integration_tests.base import TestCaseBase, create_name_with_run_id
from catalystwan.utils.config_migration.runner import ConfigMigrationRunner


Expand Down Expand Up @@ -36,3 +48,149 @@ def test_push_artefact(self):
def tearDownClass(cls) -> None:
cls.runner.clear_ux2()
super().tearDownClass()


class TestPolicyGroupAggregation(TestCaseBase):
runner: ConfigMigrationRunner
sig_name: str
sig_uuid: str
device_template_name: str
device_template_uuid: str

@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
# --------------------------------------
# Create SIG Feature Template
# --------------------------------------
templates = cls.session.api.templates
cls.sig_name = create_name_with_run_id("SIG")
cisco_sig = CiscoSecureInternetGatewayModel(
template_name=cls.sig_name,
template_description="Comprehensive CiscoSecureInternetGateway Configuration",
vpn_id=10,
child_org_id="example_org",
interface=[
Interface(
if_name="GigabitEthernet0/0",
auto=True,
shutdown=False,
description="Main interface for SIG",
unnumbered=False,
address=IPv4Interface("192.168.1.1/24"),
tunnel_source=IPv4Address("192.168.1.1"),
tunnel_source_interface="Loopback0",
tunnel_route_via="192.168.2.1",
tunnel_destination="203.0.113.1",
application="sig",
tunnel_set="secure-internet-gateway-umbrella",
tunnel_dc_preference="primary-dc",
tcp_mss_adjust=1400,
mtu=1400,
dpd_interval=30,
dpd_retries=3,
ike_version=2,
pre_shared_secret="MyPreSharedSecret", # pragma: allowlist secret
ike_rekey_interval=3600,
ike_ciphersuite="aes256-cbc-sha1",
ike_group="14",
pre_shared_key_dynamic=False,
ike_local_id="local-id",
ike_remote_id="remote-id",
ipsec_rekey_interval=3600,
ipsec_replay_window=32,
ipsec_ciphersuite="aes256-gcm",
perfect_forward_secrecy="group-14",
tracker=True,
track_enable=True,
)
],
service=[
Service(
svc_type="sig",
interface_pair=[
InterfacePair(
active_interface="GigabitEthernet0/0",
active_interface_weight=10,
backup_interface="GigabitEthernet0/1",
backup_interface_weight=5,
)
],
auth_required=True,
xff_forward_enabled=True,
ofw_enabled=False,
ips_control=True,
caution_enabled=False,
primary_data_center="Auto",
secondary_data_center="Auto",
ip=True,
idle_time=30,
display_time_unit="MINUTE",
ip_enforced_for_known_browsers=True,
refresh_time=5,
refresh_time_unit="MINUTE",
enabled=True,
block_internet_until_accepted=False,
force_ssl_inspection=True,
timeout=60,
data_center_primary="Auto",
data_center_secondary="Auto",
)
],
tracker_src_ip=IPv4Interface("192.0.2.1/32"),
tracker=[
Tracker(
name="health-check-tracker",
endpoint_api_url="https://api.example.com/health",
threshold=5,
interval=60,
multiplier=2,
tracker_type="SIG",
)
],
)
cls.sig_uuid = templates.create(cisco_sig)
# --------------------------------------
# Create Device Template
# --------------------------------------
dt = DeviceTemplate.get(
"Factory_Default_C8000V_V01",
cls.session,
)
cls.device_template_name = create_name_with_run_id("DeviceTemplate_to_PolicyGroup")
dt.template_name = cls.device_template_name
dt.factory_default = False
dt.associate_feature_template(cisco_sig.type, UUID(cls.sig_uuid))
cls.device_template_uuid = templates.create(dt)
# --------------------------------------
# Run Migration
# --------------------------------------
cls.runner = ConfigMigrationRunner.collect_and_push(cls.session, filter=cls.device_template_name)
cls.runner.set_dump_prefix("aggregation")
cls.runner.run()

def test_sss(self):
"""Test Policy Group Aggregation
When:
UX1.0: Device Template has associated a SIG Feature Template, Security Policy and Localized Policy
Expect:
UX2.0: Policy Group with associated SIG, Application Priority & SLA, Security, DNS Security Feature Profiles
"""
# Act
policy_group = (
self.session.endpoints.configuration.policy_group.get_all()
.filter(name=self.device_template_name)
.single_or_default()
)
# Assert
assert policy_group is not None
assert policy_group.get_profile_by_name(self.sig_name) is not None

@classmethod
def tearDownClass(cls) -> None:
cls.session.api.templates.delete(DeviceTemplate, cls.device_template_name)
cls.session.api.templates.delete(FeatureTemplate, cls.sig_name) # type: ignore
cls.runner.clear_ux2()
return super().tearDownClass()
1 change: 1 addition & 0 deletions catalystwan/models/configuration/config_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ class PushContext:
id_lookup: Dict[UUID, UUID] = field(
default_factory=dict
) # universal lookup for finding pushed item id by origin id
policy_group_feature_profiles_id_lookup: Dict[UUID, UUID] = field(default_factory=dict)


@dataclass
Expand Down
11 changes: 11 additions & 0 deletions catalystwan/models/configuration/policy_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,23 @@ class ProfileInfo(BaseModel):
profile_type: Optional[Literal["global"]] = Field(
default=None, validation_alias="profileType", serialization_alias="profileType"
)
name: str


class PolicyGroupId(BaseModel):
model_config = ConfigDict(populate_by_name=True)
id: UUID
name: str
profiles: Optional[List[ProfileInfo]] = Field(
default=None,
description="(Optional - only applicable for AON) List of profile ids that belongs to the policy group",
)

def get_profile_by_name(self, name: str) -> Optional[ProfileInfo]:
if self.profiles is None:
return None

for profile in self.profiles:
if profile.name == name:
return profile
return None
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@


def create_device_template(
name, *, sig_uuid: Optional[UUID] = None, security_policy_uuid: Optional[UUID] = None
name,
*,
sig_uuid: Optional[UUID] = None,
security_policy_uuid: Optional[UUID] = None,
localized_policy_uuid: Optional[UUID] = None
) -> DeviceTemplateWithInfo:
dt = DeviceTemplateWithInfo(
template_id=str(uuid4()),
Expand All @@ -31,5 +35,7 @@ def create_device_template(
]
if security_policy_uuid:
dt.security_policy_id = str(security_policy_uuid)
if localized_policy_uuid:
dt.policy_id = str(localized_policy_uuid)

return dt
59 changes: 36 additions & 23 deletions catalystwan/tests/config_migration/test_transform.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates
# Copyright 2023 Cisco Systems, Inc. and its affiliates
from typing import List, Optional, TypeVar
from typing import List, Optional, Set, TypeVar
from uuid import UUID, uuid4

from catalystwan.api.configuration_groups.parcel import _ParcelBase
from catalystwan.api.templates.device_template.device_template import GeneralTemplate
from catalystwan.models.configuration.config_migration import (
DeviceTemplateWithInfo,
TransformedParcel,
TransformedPolicyGroup,
UX1Config,
UX1Policies,
UX1Templates,
Expand Down Expand Up @@ -57,6 +58,12 @@ def find_subelement_parcel(
return subelement_parcel


def _find_policy_group_by_subelements(
array: List[TransformedPolicyGroup], subelements: Set[UUID]
) -> Optional[TransformedPolicyGroup]:
return next((pg for pg in array if pg.header.subelements == subelements), None)


def test_when_many_cisco_vpn_feature_templates_expect_assign_to_correct_feature_profile():
"""Cisco VPN Feature Templates can represent Service, Transport, or Security VPNs,
but there is only one template_type for all of them in UX1.
Expand Down Expand Up @@ -572,11 +579,11 @@ def test_when_localized_policy_with_qos_expect_application_priority_feature_prof
def test_policy_profile_merge():
"""Assumptions:
- dt_a uses sp_1 and sig_1
- dt_b uses sp_1 and sig_1
- dt_c uses sp_2 and sig_1
- dt_d uses sp_2 and sig_1
- dt_e uses sp_3
- dt_a uses sp_1, lp_1, sig_1,
- dt_b uses sp_1, lp_1, sig_1
- dt_c uses sp_2, lp_2, sig_1
- dt_d uses sp_2, lp_2, sig_1
- dt_e uses sp_3, lp_2
Expected result:
- dt_a, dt_b are merged to one policy group
Expand All @@ -592,30 +599,36 @@ def test_policy_profile_merge():
sp_2 = create_security_policy("SP-2")
sp_3 = create_security_policy("SP-3")

dt_A = create_device_template("DT-A", sig_uuid=sig_1_uuid, security_policy_uuid=sp_1.policy_id)
dt_B = create_device_template("DT-B", sig_uuid=sig_1_uuid, security_policy_uuid=sp_1.policy_id)
dt_C = create_device_template("DT-C", sig_uuid=sig_1_uuid, security_policy_uuid=sp_2.policy_id)
dt_D = create_device_template("DT-D", sig_uuid=sig_1_uuid, security_policy_uuid=sp_2.policy_id)
dt_E = create_device_template("DT-E", security_policy_uuid=sp_3.policy_id)
lp_1 = create_localized_policy_info("LP-1")
lp_2 = create_localized_policy_info("LP-2")

dt_A = create_device_template(
"DT-A", sig_uuid=sig_1_uuid, security_policy_uuid=sp_1.policy_id, localized_policy_uuid=lp_1.policy_id
)
dt_B = create_device_template(
"DT-B", sig_uuid=sig_1_uuid, security_policy_uuid=sp_1.policy_id, localized_policy_uuid=lp_1.policy_id
)
dt_C = create_device_template(
"DT-C", sig_uuid=sig_1_uuid, security_policy_uuid=sp_2.policy_id, localized_policy_uuid=lp_2.policy_id
)
dt_D = create_device_template(
"DT-D", sig_uuid=sig_1_uuid, security_policy_uuid=sp_2.policy_id, localized_policy_uuid=lp_2.policy_id
)
dt_E = create_device_template("DT-E", security_policy_uuid=sp_3.policy_id, localized_policy_uuid=lp_2.policy_id)

ux1.templates.device_templates = [dt_A, dt_B, dt_C, dt_D, dt_E]
ux1.policies.security_policies = [sp_1, sp_2, sp_3]
ux1.policies.localized_policies = [lp_1, lp_2]

merged_A_B_subelements = set().union(sp_1.get_assemby_item_uuids(), {sig_1_uuid}, {lp_1.policy_id})
merged_C_D_subelements = set().union(sp_2.get_assemby_item_uuids(), {sig_1_uuid}, {lp_2.policy_id})
separate_E_subelements = set().union(sp_3.get_assemby_item_uuids(), {lp_2.policy_id})
# Act
config = transform(ux1=ux1).ux2_config
merged_A_B = _find_policy_group_by_subelements(config.policy_groups, merged_A_B_subelements)
merged_C_D = _find_policy_group_by_subelements(config.policy_groups, merged_C_D_subelements)
separate_E = _find_policy_group_by_subelements(config.policy_groups, separate_E_subelements)
# Assert
merged_A_B = None
merged_C_D = None
separate_E = None
for pg in config.policy_groups:
se = pg.header.subelements
if se == {sig_1_uuid}.union(sp_1.get_assemby_item_uuids()):
merged_A_B = pg
elif se == {sig_1_uuid}.union(sp_2.get_assemby_item_uuids()):
merged_C_D = pg
elif se == sp_3.get_assemby_item_uuids():
separate_E = pg

assert len(config.policy_groups) == 3
assert merged_A_B is not None
assert merged_C_D is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def _create_sigs(self):
)
)
self._push_result.rollback.add_feature_profile(profile_uuid, "sig-security")
self._push_context.policy_group_feature_profiles_id_lookup[sig.header.origin] = profile_uuid
except ManagerHTTPError as e:
logger.error(f"Error occured during sig creation: {e}")
self._push_result.report.add_standalone_feature_profiles(profiles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ def push(self):
report = app_prio_builder.build()
self._push_result.rollback.add_feature_profile(report.profile_uuid, app_prio_profile.header.type)
app_prio_reports.append(report)
self._push_context.policy_group_feature_profiles_id_lookup[
app_prio_profile.header.origin
] = report.profile_uuid
except ManagerHTTPError as e:
logger.error(f"Error occured during Application Priority profile creation: {e.info}")
self._push_result.report.add_standalone_feature_profiles(app_prio_reports)
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def _push_policy_parcel(
security_policy_parcel_id = self._embedded_security_api.create_parcel(profile_id, parcel).id
report.add_created_parcel(parcel.parcel_name, security_policy_parcel_id)
self.push_context.id_lookup[policy.header.origin] = security_policy_parcel_id
self.push_context.policy_group_feature_profiles_id_lookup[policy.header.origin] = profile_id
except ManagerHTTPError as e:
logger.error(f"Error occured during creating PolicyParcel in embedded security profile: {e.info}")
report.add_failed_parcel(parcel_name=parcel.parcel_name, parcel_type=parcel.type_, error_info=e.info)
Expand Down Expand Up @@ -145,6 +146,9 @@ def push_dns_security_policies(self) -> None:
try:
parcel_id = self._dns_security_api.create_parcel(profile_id, parcel).id
self.push_context.id_lookup[dns_security_policy.header.origin] = parcel_id
self.push_context.policy_group_feature_profiles_id_lookup[
dns_security_policy.header.origin
] = profile_id
feature_profile_report.add_created_parcel(dns_security_policy.parcel.parcel_name, parcel_id)
except ManagerHTTPError as e:
logger.error(f"Error occured during DNS Security policy creation: {e.info}")
Expand Down
Loading

0 comments on commit baff2b6

Please sign in to comment.