Skip to content

Commit

Permalink
move static functions outside main class
Browse files Browse the repository at this point in the history
  • Loading branch information
IbraAoad committed Dec 3, 2024
1 parent 431bdf1 commit e3c7613
Showing 1 changed file with 62 additions and 62 deletions.
124 changes: 62 additions & 62 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
# - Example invalid: "value@", "value#", "value space"
ANNOTATION_VALUE_PATTERN = re.compile(r"^[\w.\-_]+$")


# Based on https://github.com/kubernetes/apimachinery/blob/master/pkg/util/validation/validation.go#L206
# Regex for DNS1123 subdomains:
# - Starts with a lowercase letter or number ([a-z0-9])
# - May contain dashes (-), but not consecutively, and must not start or end with them
Expand All @@ -113,6 +113,7 @@
r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
)

# Based on https://github.com/kubernetes/apimachinery/blob/master/pkg/util/validation/validation.go#L32
# Regex for Kubernetes qualified names:
# - Starts with an alphanumeric character ([A-Za-z0-9])
# - Can include dashes (-), underscores (_), dots (.), or alphanumeric characters in the middle
Expand Down Expand Up @@ -388,66 +389,6 @@ def _basic_auth_user(self) -> Optional[str]:
"""
return cast(Optional[str], self.config.get("basic_auth_user", None))

def _validate_annotation_key(self, key: str) -> bool:
"""Validate the annotation key."""
if len(key) > 253:
logger.error(f"Invalid annotation key: '{key}'. Key length exceeds 253 characters.")
return False

if not is_qualified_name(key.lower()):
logger.error(
f"Invalid annotation key: '{key}'. Must follow Kubernetes annotation syntax."
)
return False

if key.startswith(("kubernetes.io/", "k8s.io/")):
logger.error(f"Invalid annotation: Key '{key}' uses a reserved prefix.")
return False

return True

def _validate_annotation_value(self, value: str) -> bool:
"""Validate the annotation value."""
if not ANNOTATION_VALUE_PATTERN.match(value):
logger.error(
f"Invalid annotation value: '{value}'. Must follow Kubernetes annotation syntax."
)
return False

return True

def _parse_annotations(self, annotations: Optional[str]) -> Optional[Dict[str, str]]:
"""Parse and validate annotations from a string.
logic is based on Kubernetes annotation validation as described here:
https://github.com/kubernetes/apimachinery/blob/v0.31.3/pkg/api/validation/objectmeta.go#L44
"""
if not annotations:
return {}

annotations = annotations.strip().rstrip(",") # Trim spaces and trailing commas

try:
parsed_annotations = {
key.strip(): value.strip()
for key, value in (pair.split("=", 1) for pair in annotations.split(",") if pair)
}
except ValueError:
logger.error(
"Invalid format for 'loadbalancer_annotations'. "
"Expected format: key1=value1,key2=value2."
)
return None

# Validate each key-value pair
for key, value in parsed_annotations.items():
if not self._validate_annotation_key(key) or not self._validate_annotation_value(
value
):
return None

return parsed_annotations

@property
def _loadbalancer_annotations(self) -> Optional[dict]:
"""Parse the loadbalancer annotations from the config.
Expand All @@ -459,7 +400,7 @@ def _loadbalancer_annotations(self) -> Optional[dict]:
A dictionary of valid annotations if the config is valid, otherwise None.
"""
annotations = cast(Optional[str], self.config.get("loadbalancer_annotations", None))
return self._parse_annotations(annotations)
return parse_annotations(annotations)

def _on_forward_auth_config_changed(self, event: AuthConfigChangedEvent):
if self._is_forward_auth_enabled:
Expand Down Expand Up @@ -1318,6 +1259,65 @@ def _validate_config(self):
logger.warning(invalid_hostname_and_routing_mode_message)


def validate_annotation_key(key: str) -> bool:
"""Validate the annotation key."""
if len(key) > 253:
logger.error(f"Invalid annotation key: '{key}'. Key length exceeds 253 characters.")
return False

if not is_qualified_name(key.lower()):
logger.error(f"Invalid annotation key: '{key}'. Must follow Kubernetes annotation syntax.")
return False

if key.startswith(("kubernetes.io/", "k8s.io/")):
logger.error(f"Invalid annotation: Key '{key}' uses a reserved prefix.")
return False

return True


def validate_annotation_value(value: str) -> bool:
"""Validate the annotation value."""
if not ANNOTATION_VALUE_PATTERN.match(value):
logger.error(
f"Invalid annotation value: '{value}'. Must follow Kubernetes annotation syntax."
)
return False

return True


def parse_annotations(annotations: Optional[str]) -> Optional[Dict[str, str]]:
"""Parse and validate annotations from a string.
logic is based on Kubernetes annotation validation as described here:
https://github.com/kubernetes/apimachinery/blob/v0.31.3/pkg/api/validation/objectmeta.go#L44
"""
if not annotations:
return {}

annotations = annotations.strip().rstrip(",") # Trim spaces and trailing commas

try:
parsed_annotations = {
key.strip(): value.strip()
for key, value in (pair.split("=", 1) for pair in annotations.split(",") if pair)
}
except ValueError:
logger.error(
"Invalid format for 'loadbalancer_annotations'. "
"Expected format: key1=value1,key2=value2."
)
return None

# Validate each key-value pair
for key, value in parsed_annotations.items():
if not validate_annotation_key(key) or not validate_annotation_value(value):
return None

return parsed_annotations


def is_qualified_name(value: str) -> bool:
"""Check if a value is a valid Kubernetes qualified name."""
parts = value.split("/")
Expand Down

0 comments on commit e3c7613

Please sign in to comment.