Skip to content

Commit

Permalink
feat: Added access policy condition operands validation
Browse files Browse the repository at this point in the history
  • Loading branch information
willguibr committed Sep 27, 2023
1 parent 011ebe5 commit bcfaa78
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 165 deletions.
154 changes: 46 additions & 108 deletions plugins/module_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
__metaclass__ = type

import pycountry
from ansible_collections.zscaler.zpacloud.plugins.module_utils.zpa_client import (
ZPAClientHelper,
)


def map_conditions(conditions_obj):
result = []
Expand Down Expand Up @@ -68,7 +66,6 @@ def normalize_policy(policy):

return normalized


def validate_operand(operand, module):
def lhsWarn(object_type, expected, got, error=None):
error_msg = f"Invalid LHS for '{object_type}'. Expected {expected}, but got '{got}'"
Expand All @@ -82,72 +79,51 @@ def rhsWarn(object_type, expected, got, error=None):
error_msg += f". Error details: {error}"
return error_msg

object_type = operand.get("objectType", "").upper()
object_type = operand.get("object_type", "").upper()
lhs = operand.get("lhs")
rhs = operand.get("rhs")

# Check lhs and rhs for emptiness
if lhs is None or not lhs.strip():
return lhsWarn(object_type, "a non-empty value", "empty or None")
if rhs is None or not rhs.strip():
return rhsWarn(object_type, "a non-empty value", "empty or None")

lhs = lhs.strip()
rhs = rhs.strip()

client = ZPAClientHelper(module)

object_validations = {
"APP": {
"lhs": ["id"],
"fetch_method": client.app_segments.get_segment,
"kwargs": {"id": rhs},
"rhs_msg": "valid application segment ID",
},
"APP_GROUP": {
"lhs": ["id"],
"fetch_method": client.segment_groups.get_group,
"kwargs": {"id": rhs},
"rhs_msg": "valid segment group ID",
},
"MACHINE_GRP": {
"lhs": ["id"],
"fetch_method": client.machine_groups.get_group,
"kwargs": {"id": rhs},
"rhs_msg": "valid machine group ID",
},
"EDGE_CONNECTOR_GROUP": {
"lhs": ["id"],
"fetch_method": client.cloud_connector_groups.get_group,
"kwargs": {"id": rhs},
"rhs_msg": "valid cloud connector ID",
},
"POSTURE": {
"rhs": ["true", "false"],
"fetch_method": client.posture_profiles.get_profile_by_posture_udid,
"kwargs": {"posture_udid": lhs},
"lhs_msg": "valid posture profile ID",
},
"TRUSTED_NETWORK": {
"rhs": ["true", "false"],
"fetch_method": client.trusted_networks.get_by_network_id,
"kwargs": {"network_id": lhs},
"lhs_msg": "valid trusted network ID",
},
"PLATFORM": {
"rhs": ["true"],
"lhs": ['linux', 'android', 'windows', 'ios', 'mac'],
"lhs_msg": "one of ['linux', 'android', 'windows', 'ios', 'mac']",
},
"COUNTRY_CODE": {
"rhs": ["true"],
"lhs": validate_iso3166_alpha2, # Using the function directly here
"lhs_msg": "valid ISO-3166 Alpha-2 country code. Please visit the following site for reference: https://en.wikipedia.org/wiki/List_of_ISO_3166_country_codes",
},
"CLIENT_TYPE": {
"lhs": ["id"],
"lhs_msg": "the string 'id'",
"rhs": [
# Validate non-emptiness
if not object_type or not lhs or not rhs:
return "Object type, LHS, and RHS cannot be empty or None"

# Ensure lhs and rhs are strings
if not isinstance(lhs, str):
lhs = str(lhs)
if not isinstance(rhs, str):
rhs = str(rhs)

valid_object_types = ["APP", "APP_GROUP", "MACHINE_GRP", "EDGE_CONNECTOR_GROUP", "POSTURE", "TRUSTED_NETWORK", "PLATFORM", "COUNTRY_CODE", "CLIENT_TYPE"]

if object_type not in valid_object_types:
return f"Invalid object type: {object_type}. Supported types are: {', '.join(valid_object_types)}"

if object_type in ["APP", "APP_GROUP", "MACHINE_GRP", "EDGE_CONNECTOR_GROUP"]:
if lhs != 'id':
return lhsWarn(object_type, 'id', lhs)
if not rhs:
return rhsWarn(object_type, "non-empty string", rhs)

elif object_type in ["POSTURE", "TRUSTED_NETWORK"]:
if rhs not in ['true', 'false']:
return rhsWarn(object_type, "one of ['true', 'false']", rhs)

elif object_type == "PLATFORM":
if rhs != 'true':
return rhsWarn(object_type, 'true', rhs)
if lhs not in ['linux', 'android', 'windows', 'ios', 'mac']:
return lhsWarn(object_type, "one of ['linux', 'android', 'windows', 'ios', 'mac']", lhs)

elif object_type == "COUNTRY_CODE":
if rhs != 'true':
return rhsWarn(object_type, 'true', rhs)
if not validate_iso3166_alpha2(lhs):
return lhsWarn(object_type, "a valid ISO-3166 Alpha-2 country code", lhs, "Please visit the following site for reference: https://en.wikipedia.org/wiki/List_of_ISO_3166_country_codes")

elif object_type == "CLIENT_TYPE":
if lhs != 'id':
return lhsWarn(object_type, 'id', lhs)
valid_client_types = [
'zpn_client_type_exporter',
'zpn_client_type_exporter_noauth',
'zpn_client_type_browser_isolation',
Expand All @@ -158,47 +134,9 @@ def rhsWarn(object_type, expected, got, error=None):
'zpn_client_type_slogger',
'zpn_client_type_zapp_partner',
'zpn_client_type_branch_connector'
],
"rhs_msg": "one of ['zpn_client_type_exporter', zpn_client_type_exporter_noauth, zpn_client_type_browser_isolation, zpn_client_type_machine_tunnel, zpn_client_type_ip_anchoring, zpn_client_type_edge_connector, zpn_client_type_zapp, zpn_client_type_s]"
},
}

validation = object_validations.get(object_type)

if validation:
# Validate LHS
if "lhs" in validation and lhs not in validation["lhs"]:
return lhsWarn(object_type, validation["lhs_msg"], lhs)

# Validate RHS for APP, APP_GROUP, MACHINE_GRP, and EDGE_CONNECTOR_GROUP
if object_type in ["APP", "APP_GROUP", "MACHINE_GRP", "EDGE_CONNECTOR_GROUP"]:
try:
result = validation["fetch_method"](**validation["kwargs"])
if not result or result.get('id') != rhs:
return rhsWarn(object_type, validation["rhs_msg"], rhs)
except Exception as e:
fetch_msg = f"Error retrieving {object_type} with ID '{rhs}': {str(e)}"
return fetch_msg

# Validate RHS for other types (POSTURE, TRUSTED_NETWORK, PLATFORM, CLIENT_TYPE)
elif rhs not in validation["rhs"]:
return rhsWarn(object_type, validation["rhs_msg"], rhs)

# Validate LHS for POSTURE and TRUSTED_NETWORK
if object_type in ["POSTURE", "TRUSTED_NETWORK"]:
try:
result = validation["fetch_method"](**validation["kwargs"])
if not result:
return lhsWarn(object_type, validation["lhs_msg"], lhs)
except Exception as e:
fetch_msg = f"Error retrieving {object_type} with ID '{lhs}': {str(e)}"
return fetch_msg

# Specific LHS Validation for PLATFORM and COUNTRY_CODE
if object_type == "PLATFORM" and lhs not in ['linux', 'android', 'windows', 'ios', 'mac']:
return lhsWarn(object_type, "one of ['linux', 'android', 'windows', 'ios', 'mac']", lhs)
if object_type == "COUNTRY_CODE" and not validate_iso3166_alpha2(lhs):
return lhsWarn(object_type, "a valid ISO-3166 Alpha-2 country code", lhs)
]
if rhs not in valid_client_types:
return rhsWarn(object_type, f"one of {valid_client_types}", rhs)

return None

Expand Down
Loading

0 comments on commit bcfaa78

Please sign in to comment.