Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Advanced malware protection v1 (#199)
Browse files Browse the repository at this point in the history
  • Loading branch information
radkrawczyk authored Apr 25, 2024
1 parent 9ee248b commit 0a38068
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 0 deletions.
14 changes: 14 additions & 0 deletions catalystwan/api/policy_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from catalystwan.endpoints.configuration.policy.definition.access_control_list_ipv6 import (
ConfigurationPolicyAclIPv6Definition,
)
from catalystwan.endpoints.configuration.policy.definition.amp import ConfigurationPolicyAMPDefinition
from catalystwan.endpoints.configuration.policy.definition.control import ConfigurationPolicyControlDefinition
from catalystwan.endpoints.configuration.policy.definition.device_access import (
ConfigurationPolicyDeviceAccessDefinition,
Expand Down Expand Up @@ -119,6 +120,10 @@
from catalystwan.models.policy.centralized import CentralizedPolicy, CentralizedPolicyEditPayload, CentralizedPolicyInfo
from catalystwan.models.policy.definition.access_control_list import AclPolicy, AclPolicyGetResponse
from catalystwan.models.policy.definition.access_control_list_ipv6 import AclIPv6Policy, AclIPv6PolicyGetResponse
from catalystwan.models.policy.definition.amp import (
AdvancedMalwareProtectionPolicy,
AdvancedMalwareProtectionPolicyGetResponse,
)
from catalystwan.models.policy.definition.control import ControlPolicy, ControlPolicyGetResponse
from catalystwan.models.policy.definition.device_access import DeviceAccessPolicy, DeviceAccessPolicyGetResponse
from catalystwan.models.policy.definition.device_access_ipv6 import (
Expand Down Expand Up @@ -219,6 +224,7 @@
AclIPv6Policy: ConfigurationPolicyAclIPv6Definition,
DeviceAccessPolicy: ConfigurationPolicyDeviceAccessDefinition,
DeviceAccessIPv6Policy: ConfigurationPolicyDeviceAccessIPv6Definition,
AdvancedMalwareProtectionPolicy: ConfigurationPolicyAMPDefinition,
}


Expand Down Expand Up @@ -646,6 +652,10 @@ def delete(self, type: Type[AnyPolicyDefinition], id: UUID) -> None:
def get(self, type: Type[TrafficDataPolicy]) -> DataSequence[PolicyDefinitionInfo]:
...

@overload
def get(self, type: Type[AdvancedMalwareProtectionPolicy]) -> DataSequence[PolicyDefinitionInfo]:
...

@overload
def get(self, type: Type[RuleSet]) -> DataSequence[PolicyDefinitionInfo]:
...
Expand Down Expand Up @@ -704,6 +714,10 @@ def get(self, type: Type[DeviceAccessIPv6Policy]) -> DataSequence[PolicyDefiniti
def get(self, type: Type[TrafficDataPolicy], id: UUID) -> TrafficDataPolicyGetResponse:
...

@overload
def get(self, type: Type[AdvancedMalwareProtectionPolicy], id: UUID) -> AdvancedMalwareProtectionPolicyGetResponse:
...

@overload
def get(self, type: Type[RuleSet], id: UUID) -> RuleSetGetResponse:
...
Expand Down
52 changes: 52 additions & 0 deletions catalystwan/endpoints/configuration/policy/definition/amp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

# mypy: disable-error-code="empty-body"

from uuid import UUID

from catalystwan.endpoints import APIEndpoints, delete, get, post, put
from catalystwan.endpoints.configuration.policy.abstractions import PolicyDefinitionEndpoints
from catalystwan.models.policy.definition.amp import (
AdvancedMalwareProtectionPolicy,
AdvancedMalwareProtectionPolicyEditPayload,
AdvancedMalwareProtectionPolicyGetResponse,
)
from catalystwan.models.policy.policy_definition import (
PolicyDefinitionEditResponse,
PolicyDefinitionId,
PolicyDefinitionInfo,
PolicyDefinitionPreview,
)
from catalystwan.typed_list import DataSequence


class ConfigurationPolicyAMPDefinition(APIEndpoints, PolicyDefinitionEndpoints):
@post("/template/policy/definition/advancedMalwareProtection")
def create_policy_definition(self, payload: AdvancedMalwareProtectionPolicy) -> PolicyDefinitionId:
...

@delete("/template/policy/definition/advancedMalwareProtection/{id}")
def delete_policy_definition(self, id: UUID) -> None:
...

@put("/template/policy/definition/advancedMalwareProtection/{id}")
def edit_policy_definition(
self, id: UUID, payload: AdvancedMalwareProtectionPolicyEditPayload
) -> PolicyDefinitionEditResponse:
...

@get("/template/policy/definition/advancedMalwareProtection", "data")
def get_definitions(self) -> DataSequence[PolicyDefinitionInfo]:
...

@get("/template/policy/definition/advancedMalwareProtection/{id}")
def get_policy_definition(self, id: UUID) -> AdvancedMalwareProtectionPolicyGetResponse:
...

@post("/template/policy/definition/advancedMalwareProtection/preview")
def preview_policy_definition(self, payload: AdvancedMalwareProtectionPolicy) -> PolicyDefinitionPreview:
...

@get("/template/policy/definition/advancedMalwareProtection/preview/{id}")
def preview_policy_definition_by_id(self, id: UUID) -> PolicyDefinitionPreview:
...
4 changes: 4 additions & 0 deletions catalystwan/models/policy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from .centralized import CentralizedPolicy, TrafficDataDirection
from .definition.access_control_list import AclPolicy, AclPolicyGetResponse
from .definition.access_control_list_ipv6 import AclIPv6Policy, AclIPv6PolicyGetResponse
from .definition.amp import AdvancedMalwareProtectionPolicy, AdvancedMalwareProtectionPolicyGetResponse
from .definition.control import ControlPolicy, ControlPolicyGetResponse
from .definition.device_access import DeviceAccessPolicy, DeviceAccessPolicyGetResponse
from .definition.device_access_ipv6 import DeviceAccessIPv6Policy, DeviceAccessIPv6PolicyGetResponse
Expand Down Expand Up @@ -74,6 +75,7 @@
Union[
AclIPv6Policy,
AclPolicy,
AdvancedMalwareProtectionPolicy,
ControlPolicy,
DeviceAccessIPv6Policy,
DeviceAccessPolicy,
Expand Down Expand Up @@ -166,6 +168,7 @@

AnyPolicyDefinitionInfo = Annotated[
Union[
AdvancedMalwareProtectionPolicyGetResponse,
AclIPv6PolicyGetResponse,
AclPolicyGetResponse,
ControlPolicyGetResponse,
Expand All @@ -188,6 +191,7 @@
__all__ = (
"AclIPv6Policy",
"AclPolicy",
"AdvancedMalwareProtectionPolicy",
"AnyPolicyList",
"AnyPolicyDefinitionInfo",
"AppList",
Expand Down
69 changes: 69 additions & 0 deletions catalystwan/models/policy/definition/amp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

from typing import List, Literal, Optional

from annotated_types import Ge, Le
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import Annotated

from catalystwan.models.common import IntStr
from catalystwan.models.policy.policy_definition import (
PolicyDefinitionBase,
PolicyDefinitionGetResponse,
PolicyDefinitionId,
)

FileReputationServer = Literal["nam", "eur", "apjc"]
FileAnalysisServer = Literal["", "nam", "eur"]
AlertsLogLevel = Literal["", "info", "warning", "critical"] # hack
AMPPolicyType = Literal["security", "unified"]
FileAnalysisFileTypes = Literal[
"pdf", "ms-exe", "new-office", "rtf", "mdb", "mscab", "msole2", "wri", "xlw", "flv", "swf"
]

VpnId = Annotated[
IntStr,
Ge(0),
Le(65530),
]


class AdvancedMalwareProtectionDefinition(BaseModel):
model_config = ConfigDict(populate_by_name=True)
match_all_vpn: bool = Field(validation_alias="matchAllVpn", serialization_alias="matchAllVpn")
file_reputation_cloud_server: FileReputationServer = Field(
validation_alias="fileReputationCloudServer", serialization_alias="fileReputationCloudServer"
)
file_reputation_est_server: FileReputationServer = Field(
validation_alias="fileReputationEstServer", serialization_alias="fileReputationEstServer"
)
file_reputation_alert: AlertsLogLevel = Field(
validation_alias="fileReputationAlert", serialization_alias="fileReputationAlert"
)
file_analysis_enabled: Optional[bool] = Field(
default=False, validation_alias="fileAnalysisEnabled", serialization_alias="fileAnalysisEnabled"
)
file_analysis_file_types: List[FileAnalysisFileTypes] = Field(
default=[], validation_alias="fileAnalysisFileTypes", serialization_alias="fileAnalysisFileTypes"
)
file_analysis_alert: AlertsLogLevel = Field(
default="", validation_alias="fileAnalysisAlert", serialization_alias="fileAnalysisAlert"
)
file_analysis_cloud_server: FileAnalysisServer = Field(
default="", validation_alias="fileAnalysisCloudServer", serialization_alias="fileAnalysisCloudServer"
)
target_vpns: List[VpnId] = Field(default=[], validation_alias="targetVpns", serialization_alias="targetVpns")


class AdvancedMalwareProtectionPolicy(PolicyDefinitionBase):
type: Literal["advancedMalwareProtection"] = "advancedMalwareProtection"
mode: AMPPolicyType
definition: AdvancedMalwareProtectionDefinition


class AdvancedMalwareProtectionPolicyEditPayload(AdvancedMalwareProtectionPolicy, PolicyDefinitionId):
pass


class AdvancedMalwareProtectionPolicyGetResponse(AdvancedMalwareProtectionPolicy, PolicyDefinitionGetResponse):
pass

0 comments on commit 0a38068

Please sign in to comment.