diff --git a/lib/charms/grafana_agent/v0/cos_agent.py b/lib/charms/grafana_agent/v0/cos_agent.py index 76157da..582b70c 100644 --- a/lib/charms/grafana_agent/v0/cos_agent.py +++ b/lib/charms/grafana_agent/v0/cos_agent.py @@ -206,25 +206,36 @@ def __init__(self, *args): ``` """ +import enum import json import logging +import socket from collections import namedtuple from itertools import chain from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, List, Optional, Set, Tuple, Union - +from typing import ( + TYPE_CHECKING, + Any, + Callable, + ClassVar, + Dict, + List, + Literal, + MutableMapping, + Optional, + Set, + Tuple, + Union, +) + +import pydantic from cosl import GrafanaDashboard, JujuTopology from cosl.rules import AlertRules from ops.charm import RelationChangedEvent from ops.framework import EventBase, EventSource, Object, ObjectEvents -from ops.model import Relation +from ops.model import ModelError, Relation from ops.testing import CharmType -try: - import pydantic.v1 as pydantic -except ImportError: - import pydantic - if TYPE_CHECKING: try: from typing import TypedDict @@ -238,7 +249,7 @@ class _MetricsEndpointDict(TypedDict): LIBID = "dc15fa84cef84ce58155fb84f6c6213a" LIBAPI = 0 -LIBPATCH = 9 +LIBPATCH = 10 PYDEPS = ["cosl", "pydantic"] @@ -253,7 +264,207 @@ class _MetricsEndpointDict(TypedDict): SnapEndpoint = namedtuple("SnapEndpoint", "owner, name") -class CosAgentProviderUnitData(pydantic.BaseModel): +# Note: MutableMapping is imported from the typing module and not collections.abc +# because subscripting collections.abc.MutableMapping was added in python 3.9, but +# most of our charms are based on 20.04, which has python 3.8. + +_RawDatabag = MutableMapping[str, str] + + +class TransportProtocolType(str, enum.Enum): + """Receiver Type.""" + + http = "http" + grpc = "grpc" + + +receiver_protocol_to_transport_protocol = { + "zipkin": TransportProtocolType.http, + "kafka": TransportProtocolType.http, + "tempo_http": TransportProtocolType.http, + "tempo_grpc": TransportProtocolType.grpc, + "otlp_grpc": TransportProtocolType.grpc, + "otlp_http": TransportProtocolType.http, + "jaeger_thrift_http": TransportProtocolType.http, +} + +_tracing_receivers_ports = { + # OTLP receiver: see + # https://github.com/open-telemetry/opentelemetry-collector/tree/v0.96.0/receiver/otlpreceiver + "otlp_http": 4318, + "otlp_grpc": 4317, + # Jaeger receiver: see + # https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/receiver/jaegerreceiver + "jaeger_grpc": 14250, + "jaeger_thrift_http": 14268, + # Zipkin receiver: see + # https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/receiver/zipkinreceiver + "zipkin": 9411, +} + +ReceiverProtocol = Literal["otlp_grpc", "otlp_http", "zipkin", "jaeger_thrift_http", "jaeger_grpc"] + + +class TracingError(Exception): + """Base class for custom errors raised by tracing.""" + + +class NotReadyError(TracingError): + """Raised by the provider wrapper if a requirer hasn't published the required data (yet).""" + + +class ProtocolNotRequestedError(TracingError): + """Raised if the user attempts to obtain an endpoint for a protocol it did not request.""" + + +class DataValidationError(TracingError): + """Raised when data validation fails on IPU relation data.""" + + +class AmbiguousRelationUsageError(TracingError): + """Raised when one wrongly assumes that there can only be one relation on an endpoint.""" + + +# TODO we want to eventually use `DatabagModel` from cosl but it likely needs a move to common package first +if int(pydantic.version.VERSION.split(".")[0]) < 2: # type: ignore + + class DatabagModel(pydantic.BaseModel): # type: ignore + """Base databag model.""" + + class Config: + """Pydantic config.""" + + # ignore any extra fields in the databag + extra = "ignore" + """Ignore any extra fields in the databag.""" + allow_population_by_field_name = True + """Allow instantiating this class by field name (instead of forcing alias).""" + + _NEST_UNDER = None + + @classmethod + def load(cls, databag: MutableMapping): + """Load this model from a Juju databag.""" + if cls._NEST_UNDER: + return cls.parse_obj(json.loads(databag[cls._NEST_UNDER])) + + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {f.alias for f in cls.__fields__.values()} + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.parse_raw(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + logger.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. + """ + if clear and databag: + databag.clear() + + if databag is None: + databag = {} + + if self._NEST_UNDER: + databag[self._NEST_UNDER] = self.json(by_alias=True) + return databag + + dct = self.dict() + for key, field in self.__fields__.items(): # type: ignore + value = dct[key] + databag[field.alias or key] = json.dumps(value) + + return databag + +else: + from pydantic import ConfigDict + + class DatabagModel(pydantic.BaseModel): + """Base databag model.""" + + model_config = ConfigDict( + # ignore any extra fields in the databag + extra="ignore", + # Allow instantiating this class by field name (instead of forcing alias). + populate_by_name=True, + # Custom config key: whether to nest the whole datastructure (as json) + # under a field or spread it out at the toplevel. + _NEST_UNDER=None, # type: ignore + arbitrary_types_allowed=True, + ) + """Pydantic config.""" + + @classmethod + def load(cls, databag: MutableMapping): + """Load this model from a Juju databag.""" + nest_under = cls.model_config.get("_NEST_UNDER") # type: ignore + if nest_under: + return cls.model_validate(json.loads(databag[nest_under])) # type: ignore + + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {(f.alias or n) for n, f in cls.__fields__.items()} + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.model_validate_json(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + logger.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. + """ + if clear and databag: + databag.clear() + + if databag is None: + databag = {} + nest_under = self.model_config.get("_NEST_UNDER") + if nest_under: + databag[nest_under] = self.model_dump_json( # type: ignore + by_alias=True, + # skip keys whose values are default + exclude_defaults=True, + ) + return databag + + dct = self.model_dump() # type: ignore + for key, field in self.model_fields.items(): # type: ignore + value = dct[key] + if value == field.default: + continue + databag[field.alias or key] = json.dumps(value) + + return databag + + +class CosAgentProviderUnitData(DatabagModel): """Unit databag model for `cos-agent` relation.""" # The following entries are the same for all units of the same principal. @@ -271,13 +482,16 @@ class CosAgentProviderUnitData(pydantic.BaseModel): metrics_scrape_jobs: List[Dict] log_slots: List[str] + # Requested tracing protocols. + tracing_protocols: Optional[List[str]] = None + # when this whole datastructure is dumped into a databag, it will be nested under this key. # while not strictly necessary (we could have it 'flattened out' into the databag), # this simplifies working with the model. KEY: ClassVar[str] = "config" -class CosAgentPeersUnitData(pydantic.BaseModel): +class CosAgentPeersUnitData(DatabagModel): """Unit databag model for `peers` cos-agent machine charm peer relation.""" # We need the principal unit name and relation metadata to be able to render identifiers @@ -308,6 +522,83 @@ def app_name(self) -> str: return self.unit_name.split("/")[0] +if int(pydantic.version.VERSION.split(".")[0]) < 2: # type: ignore + + class ProtocolType(pydantic.BaseModel): # type: ignore + """Protocol Type.""" + + class Config: + """Pydantic config.""" + + use_enum_values = True + """Allow serializing enum values.""" + + name: str = pydantic.Field( + ..., + description="Receiver protocol name. What protocols are supported (and what they are called) " + "may differ per provider.", + examples=["otlp_grpc", "otlp_http", "tempo_http"], + ) + + type: TransportProtocolType = pydantic.Field( + ..., + description="The transport protocol used by this receiver.", + examples=["http", "grpc"], + ) + +else: + + class ProtocolType(pydantic.BaseModel): + """Protocol Type.""" + + model_config = pydantic.ConfigDict( + # Allow serializing enum values. + use_enum_values=True + ) + """Pydantic config.""" + + name: str = pydantic.Field( + ..., + description="Receiver protocol name. What protocols are supported (and what they are called) " + "may differ per provider.", + examples=["otlp_grpc", "otlp_http", "tempo_http"], + ) + + type: TransportProtocolType = pydantic.Field( + ..., + description="The transport protocol used by this receiver.", + examples=["http", "grpc"], + ) + + +class Receiver(pydantic.BaseModel): + """Specification of an active receiver.""" + + protocol: ProtocolType = pydantic.Field(..., description="Receiver protocol name and type.") + url: str = pydantic.Field( + ..., + description="""URL at which the receiver is reachable. If there's an ingress, it would be the external URL. + Otherwise, it would be the service's fqdn or internal IP. + If the protocol type is grpc, the url will not contain a scheme.""", + examples=[ + "http://traefik_address:2331", + "https://traefik_address:2331", + "http://tempo_public_ip:2331", + "https://tempo_public_ip:2331", + "tempo_public_ip:2331", + ], + ) + + +class CosAgentRequirerUnitData(DatabagModel): # noqa: D101 + """Application databag model for the COS-agent requirer.""" + + receivers: List[Receiver] = pydantic.Field( + ..., + description="List of all receivers enabled on the tracing provider.", + ) + + class COSAgentProvider(Object): """Integration endpoint wrapper for the provider side of the cos_agent interface.""" @@ -322,6 +613,7 @@ def __init__( log_slots: Optional[List[str]] = None, dashboard_dirs: Optional[List[str]] = None, refresh_events: Optional[List] = None, + tracing_protocols: Optional[List[str]] = None, *, scrape_configs: Optional[Union[List[dict], Callable]] = None, ): @@ -340,6 +632,7 @@ def __init__( in the form ["snap-name:slot", ...]. dashboard_dirs: Directory where the dashboards are stored. refresh_events: List of events on which to refresh relation data. + tracing_protocols: List of protocols that the charm will be using for sending traces. scrape_configs: List of standard scrape_configs dicts or a callable that returns the list in case the configs need to be generated dynamically. The contents of this list will be merged with the contents of `metrics_endpoints`. @@ -357,6 +650,8 @@ def __init__( self._log_slots = log_slots or [] self._dashboard_dirs = dashboard_dirs self._refresh_events = refresh_events or [self._charm.on.config_changed] + self._tracing_protocols = tracing_protocols + self._is_single_endpoint = charm.meta.relations[relation_name].limit == 1 events = self._charm.on[relation_name] self.framework.observe(events.relation_joined, self._on_refresh) @@ -381,6 +676,7 @@ def _on_refresh(self, event): dashboards=self._dashboards, metrics_scrape_jobs=self._scrape_jobs, log_slots=self._log_slots, + tracing_protocols=self._tracing_protocols, ) relation.data[self._charm.unit][data.KEY] = data.json() except ( @@ -445,6 +741,103 @@ def _dashboards(self) -> List[GrafanaDashboard]: dashboards.append(dashboard) return dashboards + @property + def relations(self) -> List[Relation]: + """The tracing relations associated with this endpoint.""" + return self._charm.model.relations[self._relation_name] + + @property + def _relation(self) -> Optional[Relation]: + """If this wraps a single endpoint, the relation bound to it, if any.""" + if not self._is_single_endpoint: + objname = type(self).__name__ + raise AmbiguousRelationUsageError( + f"This {objname} wraps a {self._relation_name} endpoint that has " + "limit != 1. We can't determine what relation, of the possibly many, you are " + f"referring to. Please pass a relation instance while calling {objname}, " + "or set limit=1 in the charm metadata." + ) + relations = self.relations + return relations[0] if relations else None + + def is_ready(self, relation: Optional[Relation] = None): + """Is this endpoint ready?""" + relation = relation or self._relation + if not relation: + logger.debug(f"no relation on {self._relation_name !r}: tracing not ready") + return False + if relation.data is None: + logger.error(f"relation data is None for {relation}") + return False + if not relation.app: + logger.error(f"{relation} event received but there is no relation.app") + return False + try: + unit = next(iter(relation.units), None) + if not unit: + return False + databag = dict(relation.data[unit]) + CosAgentRequirerUnitData.load(databag) + + except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): + logger.info(f"failed validating relation data for {relation}") + return False + return True + + def get_all_endpoints( + self, relation: Optional[Relation] = None + ) -> Optional[CosAgentRequirerUnitData]: + """Unmarshalled relation data.""" + relation = relation or self._relation + if not relation or not self.is_ready(relation): + return None + unit = next(iter(relation.units), None) + if not unit: + return None + return CosAgentRequirerUnitData.load(relation.data[unit]) # type: ignore + + def _get_tracing_endpoint( + self, relation: Optional[Relation], protocol: ReceiverProtocol + ) -> Optional[str]: + unit_data = self.get_all_endpoints(relation) + if not unit_data: + return None + receivers: List[Receiver] = [i for i in unit_data.receivers if i.protocol.name == protocol] + if not receivers: + logger.error(f"no receiver found with protocol={protocol!r}") + return None + if len(receivers) > 1: + logger.error( + f"too many receivers with protocol={protocol!r}; using first one. Found: {receivers}" + ) + return None + + receiver = receivers[0] + return receiver.url + + def get_tracing_endpoint( + self, protocol: ReceiverProtocol, relation: Optional[Relation] = None + ) -> Optional[str]: + """Receiver endpoint for the given protocol.""" + endpoint = self._get_tracing_endpoint(relation or self._relation, protocol=protocol) + if not endpoint: + requested_protocols = set() + relations = [relation] if relation else self.relations + for relation in relations: + try: + databag = CosAgentProviderUnitData.load(relation.data[self._charm.unit]) + except DataValidationError: + continue + + if databag.tracing_protocols: + requested_protocols.update(databag.tracing_protocols) + + if protocol not in requested_protocols: + raise ProtocolNotRequestedError(protocol, relation) + + return None + return endpoint + class COSAgentDataChanged(EventBase): """Event emitted by `COSAgentRequirer` when relation data changes.""" @@ -558,6 +951,12 @@ def _on_relation_data_changed(self, event: RelationChangedEvent): if not (provider_data := self._validated_provider_data(raw)): return + # write enabled receivers to cos-agent relation + try: + self.update_tracing_receivers() + except ModelError: + raise + # Copy data from the cos_agent relation to the peer relation, so the leader could # follow up. # Save the originating unit name, so it could be used for topology later on by the leader. @@ -578,6 +977,37 @@ def _on_relation_data_changed(self, event: RelationChangedEvent): # need to emit `on.data_changed`), so we're emitting `on.data_changed` either way. self.on.data_changed.emit() # pyright: ignore + def update_tracing_receivers(self): + """Updates the list of exposed tracing receivers in all relations.""" + try: + for relation in self._charm.model.relations[self._relation_name]: + CosAgentRequirerUnitData( + receivers=[ + Receiver( + url=f"{self._get_tracing_receiver_url(protocol)}", + protocol=ProtocolType( + name=protocol, + type=receiver_protocol_to_transport_protocol[protocol], + ), + ) + for protocol in self.requested_tracing_protocols() + ], + ).dump(relation.data[self._charm.unit]) + + except ModelError as e: + # args are bytes + msg = e.args[0] + if isinstance(msg, bytes): + if msg.startswith( + b"ERROR cannot read relation application settings: permission denied" + ): + logger.error( + f"encountered error {e} while attempting to update_relation_data." + f"The relation must be gone." + ) + return + raise + def _validated_provider_data(self, raw) -> Optional[CosAgentProviderUnitData]: try: return CosAgentProviderUnitData(**json.loads(raw)) @@ -590,6 +1020,55 @@ def trigger_refresh(self, _): # FIXME: Figure out what we should do here self.on.data_changed.emit() # pyright: ignore + def _get_requested_protocols(self, relation: Relation): + # Coherence check + units = relation.units + if len(units) > 1: + # should never happen + raise ValueError( + f"unexpected error: subordinate relation {relation} " + f"should have exactly one unit" + ) + + unit = next(iter(units), None) + + if not unit: + return None + + if not (raw := relation.data[unit].get(CosAgentProviderUnitData.KEY)): + return None + + if not (provider_data := self._validated_provider_data(raw)): + return None + + return provider_data.tracing_protocols + + def requested_tracing_protocols(self): + """All receiver protocols that have been requested by our related apps.""" + requested_protocols = set() + for relation in self._charm.model.relations[self._relation_name]: + try: + protocols = self._get_requested_protocols(relation) + except NotReadyError: + continue + if protocols: + requested_protocols.update(protocols) + return requested_protocols + + def _get_tracing_receiver_url(self, protocol: str): + scheme = "http" + try: + if self._charm.cert.enabled: # type: ignore + scheme = "https" + # not only Grafana Agent can implement cos_agent. If the charm doesn't have the `cert` attribute + # using our cert_handler, it won't have the `enabled` parameter. In this case, we pass and assume http. + except AttributeError: + pass + # the assumption is that a subordinate charm will always be accessible to its principal charm under its fqdn + if receiver_protocol_to_transport_protocol[protocol] == TransportProtocolType.grpc: + return f"{socket.getfqdn()}:{_tracing_receivers_ports[protocol]}" + return f"{scheme}://{socket.getfqdn()}:{_tracing_receivers_ports[protocol]}" + @property def _remote_data(self) -> List[Tuple[CosAgentProviderUnitData, JujuTopology]]: """Return a list of remote data from each of the related units. @@ -819,3 +1298,67 @@ def dashboards(self) -> List[Dict[str, str]]: ) return dashboards + + +def charm_tracing_config( + endpoint_requirer: COSAgentProvider, cert_path: Optional[Union[Path, str]] +) -> Tuple[Optional[str], Optional[str]]: + """Utility function to determine the charm_tracing config you will likely want. + + If no endpoint is provided: + disable charm tracing. + If https endpoint is provided but cert_path is not found on disk: + disable charm tracing. + If https endpoint is provided and cert_path is None: + ERROR + Else: + proceed with charm tracing (with or without tls, as appropriate) + + Usage: + If you are using charm_tracing >= v1.9: + >>> from lib.charms.tempo_k8s.v1.charm_tracing import trace_charm + >>> from lib.charms.tempo_k8s.v0.cos_agent import charm_tracing_config + >>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path") + >>> class MyCharm(...): + >>> _cert_path = "/path/to/cert/on/charm/container.crt" + >>> def __init__(self, ...): + >>> self.cos_agent = COSAgentProvider(...) + >>> self.my_endpoint, self.cert_path = charm_tracing_config( + ... self.cos_agent, self._cert_path) + + If you are using charm_tracing < v1.9: + >>> from lib.charms.tempo_k8s.v1.charm_tracing import trace_charm + >>> from lib.charms.tempo_k8s.v2.tracing import charm_tracing_config + >>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path") + >>> class MyCharm(...): + >>> _cert_path = "/path/to/cert/on/charm/container.crt" + >>> def __init__(self, ...): + >>> self.cos_agent = COSAgentProvider(...) + >>> self.my_endpoint, self.cert_path = charm_tracing_config( + ... self.cos_agent, self._cert_path) + >>> @property + >>> def my_endpoint(self): + >>> return self._my_endpoint + >>> @property + >>> def cert_path(self): + >>> return self._cert_path + + """ + if not endpoint_requirer.is_ready(): + return None, None + + endpoint = endpoint_requirer.get_tracing_endpoint("otlp_http") + if not endpoint: + return None, None + + is_https = endpoint.startswith("https://") + + if is_https: + if cert_path is None: + raise TracingError("Cannot send traces to an https endpoint without a certificate.") + if not Path(cert_path).exists(): + # if endpoint is https BUT we don't have a server_cert yet: + # disable charm tracing until we do to prevent tls errors + return None, None + return endpoint, str(cert_path) + return endpoint, None