Skip to content

Commit

Permalink
Enable Generic Alert Rules (#115)
Browse files Browse the repository at this point in the history
Refactor `_from_file` to use the new `_from_dict` method which can accept alert rules on the fly

---------

Co-authored-by: sed-i <[email protected]>
  • Loading branch information
MichaelThamm and sed-i authored Dec 20, 2024
1 parent f758e22 commit 2af8735
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 79 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "cosl"
version = "0.0.48"
version = "0.0.49"
authors = [
{ name = "sed-i", email = "[email protected]" },
]
Expand Down
186 changes: 108 additions & 78 deletions src/cosl/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
- `juju_application`
""" # noqa: W505

import hashlib
import logging
import os
import re
from abc import ABC, abstractmethod
from pathlib import Path
Expand All @@ -86,11 +86,9 @@

from . import CosTool, JujuTopology
from .types import (
OfficialRuleFileFormat,
OfficialRuleFileItem,
QueryType,
RuleType,
SingleRuleFormat,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -149,7 +147,7 @@ def __init__(self, query_type: QueryType, topology: Optional[JujuTopology] = Non
self.query_type = query_type
self.topology = topology
self.tool = CosTool(default_query_type=query_type)
self.groups = [] # type: List[Dict[str, Any]]
self.groups: List[OfficialRuleFileItem] = []

@property
@abstractmethod
Expand All @@ -160,7 +158,7 @@ def rule_type(self) -> RuleType:
# --- HELPER METHODS FOR READING FILES, SHOULD BE STATIC --- #

@staticmethod
def _is_official_rule_format(rules_dict: OfficialRuleFileFormat) -> bool:
def _is_official_rule_format(rules_dict: Dict[str, Any]) -> bool:
"""Are rules in the upstream format as supported by Prometheus or Loki.
Rules in dictionary format are in "official" form if they
Expand All @@ -176,7 +174,7 @@ def _is_official_rule_format(rules_dict: OfficialRuleFileFormat) -> bool:
return "groups" in rules_dict

@staticmethod
def _is_single_rule_format(rules_dict: SingleRuleFormat, rule_type: RuleType) -> bool:
def _is_single_rule_format(rules_dict: Dict[str, Any], rule_type: RuleType) -> bool:
"""Are alert rules in single rule format.
This library supports reading of rules in a custom format that
Expand Down Expand Up @@ -212,7 +210,7 @@ def _multi_suffix_glob(
all_files_in_dir = dir_path.glob("**/*" if recursive else "*")
return list(filter(lambda f: f.is_file() and f.suffix in suffixes, all_files_in_dir))

def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]:
def _from_dir(self, dir_path: Path, recursive: bool) -> List[OfficialRuleFileItem]:
"""Read all rule files in a directory.
All rules from files for the same directory are loaded into a single
Expand All @@ -227,7 +225,7 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]:
a list of dictionaries representing prometheus rule groups, each dictionary
representing a group (structure determined by `yaml.safe_load`).
"""
groups = [] # type: List[Dict[str, Any]]
groups: List[OfficialRuleFileItem] = []

# Gather all records into a list of groups
for file_path in Rules._multi_suffix_glob(
Expand All @@ -243,7 +241,7 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]:
def _from_file( # noqa: C901
self, root_path: Path, file_path: Path
) -> List[OfficialRuleFileItem]:
"""Read a rules file from path, injecting juju topology.
"""Read a rules file from path.
Args:
root_path: full path to the root rules folder (used only for generating group name)
Expand All @@ -262,82 +260,96 @@ def _from_file( # noqa: C901
logger.error("Failed to read rules from %s: %s", file_path.name, e)
return []

if not rule_file:
logger.warning("Empty rules file: %s", file_path.name)
return []
if not isinstance(rule_file, dict):
logger.error("Invalid rules file (must be a dict): %s", file_path.name)
return []
# Generate group name prefix
# - name, from juju topology
# - suffix, from the relative path of the rule file;
rel_path = file_path.parent.relative_to(root_path)
rel_path = "" if rel_path == Path(".") else str(rel_path)
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.append(rel_path)
group_name_prefix = "_".join(filter(None, group_name_parts))

if self._is_official_rule_format(cast(OfficialRuleFileFormat, rule_file)):
rule_file = cast(OfficialRuleFileFormat, rule_file)
groups = rule_file["groups"]
elif self._is_single_rule_format(cast(SingleRuleFormat, rule_file), self.rule_type):
# convert to list of groups
# group name is made up from the file name
rule_file = cast(SingleRuleFormat, rule_file)
groups = [{"name": file_path.stem, "rules": [rule_file]}]
else:
# invalid/unsupported
logger.error("Invalid rules file: %s", file_path.name)
try:
groups = self._from_dict(
rule_file, group_name=file_path.stem, group_name_prefix=group_name_prefix
)
except ValueError as e:
logger.error("Invalid rules file: %s (%s)", file_path.name, e)
return []

# update rules with additional metadata
groups = cast(List[OfficialRuleFileItem], groups)
for group in groups:
if not self._is_already_modified(group["name"]):
# update group name with topology and sub-path
group["name"] = self._group_name(str(root_path), str(file_path), group["name"])

# add "juju_" topology labels
for rule in group["rules"]:
if "labels" not in rule:
rule["labels"] = {}

if self.topology:
# only insert labels that do not already exist
for label, val in self.topology.label_matcher_dict.items():
if label not in rule["labels"]:
rule["labels"][label] = val

# insert juju topology filters into a prometheus rule
repl = r'job=~".+"' if self.query_type == "logql" else ""
rule["expr"] = self.tool.inject_label_matchers( # type: ignore
expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]),
topology={
k: rule["labels"][k]
for k in ("juju_model", "juju_model_uuid", "juju_application")
if rule["labels"].get(k) is not None
},
query_type=self.query_type,
)

return groups

def _group_name(self, root_path: str, file_path: str, group_name: str) -> str:
"""Generate group name from path and topology.
The group name is made up of the relative path between the root dir_path, the file path,
and topology identifier.
def _from_dict(
self,
rule_dict: Dict[str, Any],
*,
group_name: Optional[str] = None,
group_name_prefix: Optional[str] = None,
) -> List[OfficialRuleFileItem]:
"""Process rules from dict, injecting juju topology. If a single-rule format is provided, a hash of the yaml file is injected into the group name to ensure uniqueness.
Args:
root_path: path to the root rules dir.
file_path: path to rule file.
group_name: original group name to keep as part of the new augmented group name
rule_dict: rules content in single-rule or official-rule format as a YAML dict
group_name: a custom identifier for the rule name to include in the group name
group_name_prefix: a custom group identifier to prefix the resulting group name, likely Juju topology and relative path context
Returns:
New group name, augmented by juju topology and relative path.
Raises:
ValueError, when invalid rule format given.
"""
rel_path = os.path.relpath(os.path.dirname(file_path), root_path)
rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_")
if not rule_dict:
raise ValueError("Empty")

if self._is_official_rule_format(rule_dict):
groups = rule_dict["groups"]
elif self._is_single_rule_format(rule_dict, self.rule_type):
if not group_name:
# Note: the caller of this function should ensure this never happens:
# Either we use the standard format, or we'd pass a group_name.
# If/when we drop support for the single-rule-per-file format, this won't
# be needed anymore.
group_name = hashlib.shake_256(str(rule_dict).encode("utf-8")).hexdigest(10)

# convert to list of groups to match official rule format
groups = [{"name": group_name, "rules": [rule_dict]}]
else:
# invalid/unsupported
raise ValueError("Invalid rule format")

# update rules with additional metadata
groups = cast(List[OfficialRuleFileItem], groups)
for group in groups:
if not self._is_already_modified(group["name"]):
# update group name with topology and sub-path
group["name"] = "_".join(
filter(None, [group_name_prefix, group["name"], f"{self.rule_type}s"])
)
# after sanitizing we should not modify group["name"] anymore
group["name"] = self._sanitize_metric_name(group["name"])

# add "juju_" topology labels
for rule in group["rules"]:
if "labels" not in rule:
rule["labels"] = {}

if self.topology:
# only insert labels that do not already exist
for label, val in self.topology.label_matcher_dict.items():
if label not in rule["labels"]:
rule["labels"][label] = val

# insert juju topology filters into a prometheus rule
repl = r'job=~".+"' if self.query_type == "logql" else ""
rule["expr"] = self.tool.inject_label_matchers( # type: ignore
expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]),
topology={
k: rule["labels"][k]
for k in ("juju_model", "juju_model_uuid", "juju_application")
if rule["labels"].get(k) is not None
},
query_type=self.query_type,
)

# Generate group name:
# - name, from juju topology
# - suffix, from the relative path of the rule file;
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.extend([rel_path, group_name, f"{self.rule_type}s"])
# filter to remove empty strings
return "_".join(filter(None, group_name_parts))
return groups

def _is_already_modified(self, name: str) -> bool:
"""Detect whether a group name has already been modified with juju topology."""
Expand All @@ -346,8 +358,29 @@ def _is_already_modified(self, name: str) -> bool:
return False
return True

def _sanitize_metric_name(self, metric_name: str) -> str:
"""Sanitize a metric name according to https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels."""
return "".join(char if re.match(r"[a-zA-Z0-9_:]", char) else "_" for char in metric_name)

# ---- END STATIC HELPER METHODS --- #

def add(
self,
rule_dict: Dict[str, Any],
group_name: Optional[str] = None,
group_name_prefix: Optional[str] = None,
) -> None:
"""Add rules from dict to the existing ruleset.
Args:
rule_dict: a single-rule or official-rule YAML dict
group_name: a custom group name, used only if the new rule is of single-rule format
group_name_prefix: a custom group name prefix, used only if the new rule is of single-rule format
"""
self.groups.extend(
self._from_dict(rule_dict, group_name=group_name, group_name_prefix=group_name_prefix)
)

def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> None:
"""Add rules from a dir path.
Expand All @@ -357,9 +390,6 @@ def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> No
Args:
dir_path: either a rules file or a dir of rules files.
recursive: whether to read files recursively or not (no impact if `path` is a file).
Returns:
True if path was added else False.
"""
path = Path(dir_path) if isinstance(dir_path, str) else dir_path
if path.is_dir():
Expand Down
Loading

0 comments on commit 2af8735

Please sign in to comment.