Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Url filtering v1 model #638

Merged
merged 2 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions catalystwan/api/policy_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
ConfigurationPolicySecurityGroupDefinition,
)
from catalystwan.endpoints.configuration.policy.definition.traffic_data import ConfigurationPolicyDataDefinition
from catalystwan.endpoints.configuration.policy.definition.url_filtering import (
ConfigurationPolicyUrlFilteringDefinition,
)
from catalystwan.endpoints.configuration.policy.definition.vpn_membership import (
ConfigurationPolicyVPNMembershipGroupDefinition,
)
Expand Down Expand Up @@ -144,6 +147,7 @@
from catalystwan.models.policy.definition.rule_set import RuleSet, RuleSetGetResponse
from catalystwan.models.policy.definition.security_group import SecurityGroup, SecurityGroupGetResponse
from catalystwan.models.policy.definition.traffic_data import TrafficDataPolicy, TrafficDataPolicyGetResponse
from catalystwan.models.policy.definition.url_filtering import UrlFilteringPolicy, UrlFilteringPolicyGetResponse
from catalystwan.models.policy.definition.vpn_membership import VPNMembershipPolicy, VPNMembershipPolicyGetResponse
from catalystwan.models.policy.definition.zone_based_firewall import ZoneBasedFWPolicy, ZoneBasedFWPolicyGetResponse
from catalystwan.models.policy.list.app_probe import AppProbeClassListInfo
Expand Down Expand Up @@ -233,6 +237,7 @@
DeviceAccessIPv6Policy: ConfigurationPolicyDeviceAccessIPv6Definition,
AdvancedMalwareProtectionPolicy: ConfigurationPolicyAMPDefinition,
IntrusionPreventionPolicy: ConfigurationPolicyIntrusionPreventionDefinition,
UrlFilteringPolicy: ConfigurationPolicyUrlFilteringDefinition,
}


Expand Down Expand Up @@ -664,6 +669,10 @@ def get(self, type: Type[IntrusionPreventionPolicy]) -> DataSequence[PolicyDefin
def get(self, type: Type[TrafficDataPolicy]) -> DataSequence[PolicyDefinitionInfo]:
...

@overload
def get(self, type: Type[UrlFilteringPolicy]) -> DataSequence[PolicyDefinitionInfo]:
...

@overload
def get(self, type: Type[AdvancedMalwareProtectionPolicy]) -> DataSequence[PolicyDefinitionInfo]:
...
Expand Down Expand Up @@ -729,6 +738,10 @@ def get(self, type: Type[IntrusionPreventionPolicy], id: UUID) -> IntrusionPreve
def get(self, type: Type[TrafficDataPolicy], id: UUID) -> TrafficDataPolicyGetResponse:
...

@overload
def get(self, type: Type[UrlFilteringPolicy], id: UUID) -> UrlFilteringPolicyGetResponse:
...

@overload
def get(self, type: Type[AdvancedMalwareProtectionPolicy], id: UUID) -> AdvancedMalwareProtectionPolicyGetResponse:
...
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

# mypy: disable-error-code="empty-body"

from uuid import UUID

from catalystwan.endpoints import APIEndpoints, delete, get, post, put
from catalystwan.endpoints.configuration.policy.abstractions import PolicyDefinitionEndpoints
from catalystwan.models.policy.definition.url_filtering import (
UrlFilteringPolicy,
UrlFilteringPolicyEditPayload,
UrlFilteringPolicyGetResponse,
)
from catalystwan.models.policy.policy_definition import (
PolicyDefinitionEditResponse,
PolicyDefinitionId,
PolicyDefinitionInfo,
PolicyDefinitionPreview,
)
from catalystwan.typed_list import DataSequence


class ConfigurationPolicyUrlFilteringDefinition(APIEndpoints, PolicyDefinitionEndpoints):
@post("/template/policy/definition/urlfiltering")
def create_policy_definition(self, payload: UrlFilteringPolicy) -> PolicyDefinitionId:
...

@delete("/template/policy/definition/urlfiltering/{id}")
def delete_policy_definition(self, id: UUID) -> None:
...

@put("/template/policy/definition/urlfiltering/{id}")
def edit_policy_definition(self, id: UUID, payload: UrlFilteringPolicyEditPayload) -> PolicyDefinitionEditResponse:
...

@get("/template/policy/definition/urlfiltering", "data")
def get_definitions(self) -> DataSequence[PolicyDefinitionInfo]:
...

@get("/template/policy/definition/urlfiltering/{id}")
def get_policy_definition(self, id: UUID) -> UrlFilteringPolicyGetResponse:
...

@post("/template/policy/definition/urlfiltering/preview")
def preview_policy_definition(self, payload: UrlFilteringPolicy) -> PolicyDefinitionPreview:
...

@get("/template/policy/definition/urlfiltering/preview/{id}")
def preview_policy_definition_by_id(self, id: UUID) -> PolicyDefinitionPreview:
...
4 changes: 4 additions & 0 deletions catalystwan/models/policy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from .definition.rule_set import RuleSet, RuleSetGetResponse
from .definition.security_group import SecurityGroup, SecurityGroupGetResponse
from .definition.traffic_data import TrafficDataPolicy, TrafficDataPolicyGetResponse
from .definition.url_filtering import UrlFilteringPolicy, UrlFilteringPolicyGetResponse
from .definition.vpn_membership import VPNMembershipPolicy, VPNMembershipPolicyGetResponse
from .definition.zone_based_firewall import ZoneBasedFWPolicy, ZoneBasedFWPolicyGetResponse
from .localized import LocalizedPolicy
Expand Down Expand Up @@ -88,6 +89,7 @@
RuleSet,
SecurityGroup,
TrafficDataPolicy,
UrlFilteringPolicy,
VPNMembershipPolicy,
ZoneBasedFWPolicy,
],
Expand Down Expand Up @@ -184,6 +186,7 @@
RuleSetGetResponse,
SecurityGroupGetResponse,
TrafficDataPolicyGetResponse,
UrlFilteringPolicyGetResponse,
VPNMembershipPolicyGetResponse,
ZoneBasedFWPolicyGetResponse,
],
Expand Down Expand Up @@ -252,6 +255,7 @@
"UnifiedSecurityPolicy",
"URLBlockList",
"URLAllowList",
"UrlFilteringPolicy",
"VPNList",
"VPNMembershipPolicy",
"ZoneBasedFWPolicy",
Expand Down
8 changes: 2 additions & 6 deletions catalystwan/models/policy/definition/intrusion_prevention.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

from typing import List, Literal, Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field

Expand All @@ -10,22 +9,19 @@
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
Reference,
)

SignatureSetType = Literal["balanced", "connectivity", "security"]
InspectionModeType = Literal["protection", "detection"]
LogLevel = Literal["emergency", "alert", "critical", "error", "warning", "notice", "info", "debug"]


class SignatureWhiteList(BaseModel):
ref: Optional[UUID] = Field(default=None)


class IntrusionPreventionDefinition(BaseModel):
model_config = ConfigDict(populate_by_name=True)
signature_set: SignatureSetType = Field(validation_alias="signatureSet", serialization_alias="signatureSet")
inspection_mode: InspectionModeType = Field(validation_alias="inspectionMode", serialization_alias="inspectionMode")
signature_white_list: Optional[SignatureWhiteList] = Field(
signature_white_list: Optional[Reference] = Field(
default=None, validation_alias="signatureWhiteList", serialization_alias="signatureWhiteList"
)
log_level: Optional[LogLevel] = Field(default="error", validation_alias="logLevel", serialization_alias="logLevel")
Expand Down
152 changes: 152 additions & 0 deletions catalystwan/models/policy/definition/url_filtering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

from typing import List, Literal, Optional, Set

from pydantic import BaseModel, ConfigDict, Field, field_validator

from catalystwan.models.common import PolicyModeType, VpnId
from catalystwan.models.policy.policy_definition import (
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
Reference,
)

BLOCK_PAGE_CONTENT_HEADER = "Access to the requested page has been denied."

BlockPageAction = Literal["text", "redirectUrl"]
UrlFilteringAlerts = Literal["blacklist", "whitelist", "categories-reputation"]
WebReputation = Literal["low-risk", "moderate-risk", "high-risk", "suspicious", "trustworthy"]
WebCategoriesAction = Literal["allow", "block"]
WebCategories = Literal[
"abused-drugs",
"abortion",
"adult-and-pornography",
"alcohol-and-tobacco",
"auctions",
"bot-nets",
"business-and-economy",
"cdns",
"cheating",
"computer-and-internet-info",
"computer-and-internet-security",
"games",
"gambling",
"financial-services",
"fashion-and-beauty",
"entertainment-and-arts",
"educational-institutions",
"dynamic-content",
"dead-sites",
"confirmed-spam-sources",
"cult-and-occult",
"dating",
"government",
"gross",
"hacking",
"hate-and-racism",
"health-and-medicine",
"home",
"hunting-and-fishing",
"illegal",
"image-and-video-search",
"individual-stock-advice-and-tools",
"internet-communications",
"internet-portals",
"job-search",
"keyloggers-and-monitoring",
"kids",
"legal",
"local-information",
"malware-sites",
"marijuana",
"p2p",
"parked-sites",
"pay-to-surf",
"personal-sites-and-blogs",
"philosophy-and-political-advocacy",
"phishing-and-other-frauds",
"private-ip-addresses",
"proxy-avoid-and-anonymizers",
"questionable",
"real-estate",
"recreation-and-hobbies",
"reference-and-research",
"religion",
"search-engines",
"sex-education",
"shareware-and-freeware",
"shopping",
"social-network",
"society",
"sports",
"spam-urls",
"spyware-and-adware",
"streaming-media",
"swimsuits-and-intimate-apparel",
"training-and-tools",
"translation",
"travel",
"uncategorized",
"unconfirmed-spam-sources",
"violence",
"weapons",
"web-advertisements",
"web-based-email",
"web-hosting",
"open-http-proxies",
"online-personal-storage",
"online-greeting-cards",
"nudity",
"news-and-media",
"music",
"motor-vehicles",
"military",
]


class UrlFilteringDefinition(BaseModel):
model_config = ConfigDict(populate_by_name=True)
web_categories_action: WebCategoriesAction = Field(
validation_alias="webCategoriesAction", serialization_alias="webCategoriesAction"
)
web_categories: List[WebCategories] = Field(validation_alias="webCategories", serialization_alias="webCategories")
web_reputation: WebReputation = Field(validation_alias="webReputation", serialization_alias="webReputation")
url_white_list: Optional[Reference] = Field(
default=None, validation_alias="urlWhiteList", serialization_alias="urlWhiteList"
)
url_black_list: Optional[Reference] = Field(
default=None, validation_alias="urlBlackList", serialization_alias="urlBlackList"
)
block_page_action: BlockPageAction = Field(
validation_alias="blockPageAction", serialization_alias="blockPageAction"
)
block_page_contents: str = Field(
default=BLOCK_PAGE_CONTENT_HEADER, validation_alias="blockPageContents", serialization_alias="blockPageContents"
)
logging: List[str] = Field(default=[])
enable_alerts: bool = Field(validation_alias="enableAlerts", serialization_alias="enableAlerts")
alerts: Set[UrlFilteringAlerts] = Field(default=[], validation_alias="alerts", serialization_alias="alerts")
target_vpns: List[VpnId] = Field(default=[], validation_alias="targetVpns", serialization_alias="targetVpns")

@field_validator("url_black_list", "url_white_list", mode="before")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, if it's the best solution.
in some cases the url_black_list / white_list is not None, but empty dict, which cannot be deserialised do the Reference object.

The validator checks if value is an empty dict and the returns None,.

Everything works as expected. Tested a few cases

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me 👍

@classmethod
def convert_empty_dict_to_none(cls, value):
if not value:
return None

return value


class UrlFilteringPolicy(PolicyDefinitionBase):
type: Literal["urlFiltering"] = "urlFiltering"
mode: PolicyModeType = "security"
definition: UrlFilteringDefinition


class UrlFilteringPolicyEditPayload(UrlFilteringPolicy, PolicyDefinitionId):
pass


class UrlFilteringPolicyGetResponse(UrlFilteringPolicy, PolicyDefinitionGetResponse):
pass