Skip to content

Commit

Permalink
Add workload_tracing_protocols as an arg (#95)
Browse files Browse the repository at this point in the history
* add jaeger_thrift_http

* add internal _tracing_receivers_urls

* fix tests

* refactor

* lint

* sep charm and workload tracing

* add test

* remove jaeger

* push to charm container

* fix push

* split tracing receivers
  • Loading branch information
michaeldmitry authored Nov 14, 2024
1 parent 563234d commit 6ab947b
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "cosl"
version = "0.0.42"
version = "0.0.43"
authors = [
{ name="sed-i", email="[email protected]" },
]
Expand Down
39 changes: 29 additions & 10 deletions src/cosl/coordinated_workers/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
)
from charms.observability_libs.v1.cert_handler import VAULT_SECRET_LABEL, CertHandler
from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider
from charms.tempo_coordinator_k8s.v0.tracing import TracingEndpointRequirer
from charms.tempo_coordinator_k8s.v0.tracing import ReceiverProtocol, TracingEndpointRequirer
from lightkube.models.core_v1 import ResourceRequirements

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -159,7 +159,8 @@ def _validate_container_name(
"grafana-dashboards": str,
"logging": str,
"metrics": str,
"tracing": str,
"charm-tracing": str,
"workload-tracing": str,
"s3": str,
},
total=True,
Expand Down Expand Up @@ -195,11 +196,11 @@ def __init__(
nginx_options: Optional[NginxMappingOverrides] = None,
is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
tracing_receivers: Optional[Callable[[], Optional[Dict[str, str]]]] = None,
resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None,
resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]] = None,
container_name: Optional[str] = None,
remote_write_endpoints: Optional[Callable[[], List[RemoteWriteEndpoint]]] = None,
workload_tracing_protocols: Optional[List[ReceiverProtocol]] = None,
):
"""Constructor for a Coordinator object.
Expand All @@ -215,7 +216,6 @@ def __init__(
nginx_options: Non-default config options for Nginx.
is_coherent: Custom coherency checker for a minimal deployment.
is_recommended: Custom coherency checker for a recommended deployment.
tracing_receivers: Endpoints to which the workload (and the worker charm) can push traces to.
resources_limit_options: A dictionary containing resources limit option names. The dictionary should include
"cpu_limit" and "memory_limit" keys with values as option names, as defined in the config.yaml.
If no dictionary is provided, the default option names "cpu_limit" and "memory_limit" would be used.
Expand All @@ -226,6 +226,8 @@ def __init__(
Required if `resources_requests` is provided.
remote_write_endpoints: A function generating endpoints to which the workload
and the worker charm can push metrics to.
workload_tracing_protocols: A list of protocols that the worker intends to send
workload traces with.
Raises:
ValueError:
Expand All @@ -250,7 +252,6 @@ def __init__(

self._is_coherent = is_coherent
self._is_recommended = is_recommended
self._tracing_receivers_getter = tracing_receivers
self._resources_requests_getter = (
partial(resources_requests, self) if resources_requests is not None else None
)
Expand Down Expand Up @@ -299,11 +300,16 @@ def __init__(
refresh_event=refresh_events,
)

self.tracing = TracingEndpointRequirer(
self.charm_tracing = TracingEndpointRequirer(
self._charm,
relation_name=self._endpoints["tracing"],
relation_name=self._endpoints["charm-tracing"],
protocols=["otlp_http"],
)
self.workload_tracing = TracingEndpointRequirer(
self._charm,
relation_name=self._endpoints["workload-tracing"],
protocols=workload_tracing_protocols,
)

# Resources patch
self.resources_patch = (
Expand Down Expand Up @@ -350,6 +356,20 @@ def __init__(
# UTILITY PROPERTIES #
######################

@property
def _charm_tracing_receivers_urls(self) -> Dict[str, str]:
    """Map each enabled charm-tracing receiver's protocol name to its endpoint URL.

    The mapping is built from whatever `self.charm_tracing.get_all_endpoints()`
    reports; when that call yields nothing, an empty dict is returned.
    """
    all_endpoints = self.charm_tracing.get_all_endpoints()
    if not all_endpoints:
        # Nothing reported by the charm-tracing requirer: no receivers to expose.
        return {}
    return {rcv.protocol.name: rcv.url for rcv in all_endpoints.receivers}

@property
def _workload_tracing_receivers_urls(self) -> Dict[str, str]:
    """Map each enabled workload-tracing receiver's protocol name to its endpoint URL.

    The mapping is built from whatever `self.workload_tracing.get_all_endpoints()`
    reports; when that call yields nothing, an empty dict is returned.
    """
    all_endpoints = self.workload_tracing.get_all_endpoints()
    if not all_endpoints:
        # Nothing reported by the workload-tracing requirer: no receivers to expose.
        return {}
    return {rcv.protocol.name: rcv.url for rcv in all_endpoints.receivers}

@property
def is_coherent(self) -> bool:
"""Check whether this coordinator is coherent."""
Expand Down Expand Up @@ -661,9 +681,8 @@ def update_cluster(self):
privkey_secret_id=(
self.cluster.grant_privkey(VAULT_SECRET_LABEL) if self.tls_available else None
),
tracing_receivers=(
self._tracing_receivers_getter() if self._tracing_receivers_getter else None
),
charm_tracing_receivers=self._charm_tracing_receivers_urls,
workload_tracing_receivers=self._workload_tracing_receivers_urls,
remote_write_endpoints=(
self.remote_write_endpoints_getter()
if self.remote_write_endpoints_getter
Expand Down
25 changes: 18 additions & 7 deletions src/cosl/coordinated_workers/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ class ClusterProviderAppData(DatabagModel):
### self-monitoring stuff
loki_endpoints: Optional[Dict[str, str]] = None
"""Endpoints to which the workload (and the worker charm) can push logs to."""
tracing_receivers: Optional[Dict[str, str]] = None
"""Endpoints to which the workload (and the worker charm) can push traces to."""
charm_tracing_receivers: Optional[Dict[str, str]] = None
"""Endpoints to which the worker charm can push charm traces to."""
workload_tracing_receivers: Optional[Dict[str, str]] = None
"""Endpoints to which the worker can push workload traces to."""
remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None
"""Endpoints to which the workload (and the worker charm) can push metrics to."""

Expand Down Expand Up @@ -282,7 +284,8 @@ def publish_data(
s3_tls_ca_chain: Optional[str] = None,
privkey_secret_id: Optional[str] = None,
loki_endpoints: Optional[Dict[str, str]] = None,
tracing_receivers: Optional[Dict[str, str]] = None,
charm_tracing_receivers: Optional[Dict[str, str]] = None,
workload_tracing_receivers: Optional[Dict[str, str]] = None,
remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None,
) -> None:
"""Publish the config to all related worker clusters."""
Expand All @@ -294,7 +297,8 @@ def publish_data(
ca_cert=ca_cert,
server_cert=server_cert,
privkey_secret_id=privkey_secret_id,
tracing_receivers=tracing_receivers,
charm_tracing_receivers=charm_tracing_receivers,
workload_tracing_receivers=workload_tracing_receivers,
remote_write_endpoints=remote_write_endpoints,
s3_tls_ca_chain=s3_tls_ca_chain,
)
Expand Down Expand Up @@ -573,11 +577,18 @@ def get_tls_data(self, allow_none: bool = False) -> Optional[TLSData]:
s3_tls_ca_chain=data.s3_tls_ca_chain,
)

def get_tracing_receivers(self) -> Optional[Dict[str, str]]:
"""Fetch the tracing receivers from the coordinator databag."""
def get_charm_tracing_receivers(self) -> Dict[str, str]:
"""Fetch the charm tracing receivers from the coordinator databag."""
data = self._get_data_from_coordinator()
if data:
return data.tracing_receivers or {}
return data.charm_tracing_receivers or {}
return {}

def get_workload_tracing_receivers(self) -> Dict[str, str]:
    """Return the workload tracing receivers read from the coordinator databag.

    An empty dict is returned both when no coordinator data is available and
    when the `workload_tracing_receivers` field is unset or empty.
    """
    coordinator_data = self._get_data_from_coordinator()
    if not coordinator_data:
        return {}
    return coordinator_data.workload_tracing_receivers or {}

def get_remote_write_endpoints(self) -> List[RemoteWriteEndpoint]:
Expand Down
8 changes: 8 additions & 0 deletions src/cosl/coordinated_workers/nginx.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""Workload manager for Nginx. Used by the coordinator to load-balance and group the workers."""

import logging
from pathlib import Path
from typing import Callable, Optional, TypedDict

from ops import CharmBase, pebble
Expand Down Expand Up @@ -83,6 +84,11 @@ def configure_tls(self, private_key: str, server_cert: str, ca_cert: str) -> Non
self._container.push(KEY_PATH, private_key, make_dirs=True)
self._container.push(CERT_PATH, server_cert, make_dirs=True)
self._container.push(CA_CERT_PATH, ca_cert, make_dirs=True)

# push CA cert to charm container
Path(CA_CERT_PATH).parent.mkdir(parents=True, exist_ok=True)
Path(CA_CERT_PATH).write_text(ca_cert)

# FIXME: uncomment as soon as the nginx image contains the ca-certificates package
# self._container.exec(["update-ca-certificates", "--fresh"])

Expand All @@ -95,6 +101,8 @@ def delete_certificates(self) -> None:
self._container.remove_path(KEY_PATH, recursive=True)
if self._container.exists(CA_CERT_PATH):
self._container.remove_path(CA_CERT_PATH, recursive=True)
if Path(CA_CERT_PATH).exists():
Path(CA_CERT_PATH).unlink(missing_ok=True)
# FIXME: uncomment as soon as the nginx image contains the ca-certificates package
# self._container.exec(["update-ca-certificates", "--fresh"])

Expand Down
2 changes: 1 addition & 1 deletion src/cosl/coordinated_workers/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ def charm_tracing_config(self) -> Tuple[Optional[str], Optional[str]]:
>>> self.worker = Worker(...)
>>> self.my_endpoint, self.cert_path = self.worker.charm_tracing_config()
"""
receivers = self.cluster.get_tracing_receivers()
receivers = self.cluster.get_charm_tracing_receivers()

if not receivers:
return None, None
Expand Down
48 changes: 44 additions & 4 deletions tests/test_coordinated_workers/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def coordinator_state():
for endpoint, interface in {
"my-certificates": {"interface": "certificates"},
"my-logging": {"interface": "loki_push_api"},
"my-tracing": {"interface": "tracing"},
"my-charm-tracing": {"interface": "tracing"},
"my-workload-tracing": {"interface": "tracing"},
}.items()
}
requires_relations["my-s3"] = Relation(
Expand Down Expand Up @@ -75,7 +76,8 @@ class MyCoordinator(ops.CharmBase):
"my-certificates": {"interface": "certificates"},
"my-cluster": {"interface": "cluster"},
"my-logging": {"interface": "loki_push_api"},
"my-tracing": {"interface": "tracing"},
"my-charm-tracing": {"interface": "tracing", "limit": 1},
"my-workload-tracing": {"interface": "tracing", "limit": 1},
"my-s3": {"interface": "s3"},
},
"provides": {
Expand Down Expand Up @@ -116,15 +118,15 @@ def __init__(self, framework: Framework):
"grafana-dashboards": "my-dashboards",
"logging": "my-logging",
"metrics": "my-metrics",
"tracing": "my-tracing",
"charm-tracing": "my-charm-tracing",
"workload-tracing": "my-workload-tracing",
"s3": "my-s3",
},
nginx_config=lambda coordinator: f"nginx configuration for {coordinator._charm.meta.name}",
workers_config=lambda coordinator: f"workers configuration for {coordinator._charm.meta.name}",
# nginx_options: Optional[NginxMappingOverrides] = None,
# is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
# is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
# tracing_receivers: Optional[Callable[[], Optional[Dict[str, str]]]] = None,
)

return MyCoordinator
Expand Down Expand Up @@ -239,3 +241,41 @@ def test_s3_integration(
assert coordinator.s3_connection_info.tls_ca_chain == tls_ca_chain
assert coordinator._s3_config["endpoint"] == endpoint_stripped
assert coordinator._s3_config["insecure"] is (not tls_ca_chain)


def test_tracing_receivers_urls(coordinator_state: State, coordinator_charm: ops.CharmBase):
    """Charm and workload tracing relations are exposed as separate protocol-name -> URL maps."""
    # Receivers advertised by the remote app on the charm-tracing endpoint.
    charm_receivers = [
        {"protocol": {"name": "otlp_http", "type": "http"}, "url": "1.2.3.4:4318"},
    ]
    # Receivers advertised by the remote app on the workload-tracing endpoint.
    workload_receivers = [
        {"protocol": {"name": "otlp_http", "type": "http"}, "url": "5.6.7.8:4318"},
        {"protocol": {"name": "otlp_grpc", "type": "grpc"}, "url": "5.6.7.8:4317"},
    ]
    tracing_relations = [
        Relation(
            endpoint="my-charm-tracing",
            remote_app_data={"receivers": json.dumps(charm_receivers)},
        ),
        Relation(
            endpoint="my-workload-tracing",
            remote_app_data={"receivers": json.dumps(workload_receivers)},
        ),
    ]

    ctx = Context(coordinator_charm, meta=coordinator_charm.META)
    state_with_tracing = coordinator_state.replace(relations=tracing_relations)

    with ctx.manager("update-status", state=state_with_tracing) as mgr:
        coordinator: Coordinator = mgr.charm.coordinator

        # Each requirer only reports the receivers from its own relation.
        expected_charm_urls = {"otlp_http": "1.2.3.4:4318"}
        expected_workload_urls = {
            "otlp_http": "5.6.7.8:4318",
            "otlp_grpc": "5.6.7.8:4317",
        }
        assert coordinator._charm_tracing_receivers_urls == expected_charm_urls
        assert coordinator._workload_tracing_receivers_urls == expected_workload_urls
6 changes: 4 additions & 2 deletions tests/test_coordinated_workers/test_coordinator_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ def __init__(self, framework: Framework):
"grafana-dashboards": "grafana-dashboard",
"logging": "logging",
"metrics": "metrics-endpoint",
"tracing": "self-tracing",
"charm-tracing": "self-charm-tracing",
"workload-tracing": "self-workload-tracing",
},
nginx_config=lambda _: "nginx config",
workers_config=lambda _: "worker config",
Expand All @@ -61,7 +62,8 @@ def ctx(coord_charm):
"s3": {"interface": "s3"},
"logging": {"interface": "loki_push_api"},
"certificates": {"interface": "tls-certificates"},
"self-tracing": {"interface": "tracing"},
"self-charm-tracing": {"interface": "tracing", "limit": 1},
"self-workload-tracing": {"interface": "tracing", "limit": 1},
},
"provides": {
"cluster": {"interface": "cluster"},
Expand Down

0 comments on commit 6ab947b

Please sign in to comment.