Skip to content

Commit

Permalink
Workload tracing (#473)
Browse files Browse the repository at this point in the history
* Add workload traces from Loki

* adjust comments
  • Loading branch information
mmkay authored Nov 26, 2024
1 parent cf6a25b commit 4f98bcc
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 3 deletions.
9 changes: 8 additions & 1 deletion metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,16 @@ requires:
Certificate and key files for the loki server.
catalogue:
interface: catalogue
tracing:
charm-tracing:
description: |
Enables sending charm traces to a distributed tracing backend such as Tempo.
limit: 1
interface: tracing
workload-tracing:
description: |
Enables sending workload traces to a distributed tracing backend such as Tempo.
limit: 1
interface: tracing

peers:
replicas:
Expand Down
47 changes: 45 additions & 2 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from charms.tempo_coordinator_k8s.v0.charm_tracing import trace_charm
from charms.tempo_coordinator_k8s.v0.tracing import TracingEndpointRequirer, charm_tracing_config
from charms.traefik_k8s.v1.ingress_per_unit import IngressPerUnitRequirer
from cosl import JujuTopology
from ops import CollectStatusEvent, StoredState
from ops.charm import CharmBase
from ops.main import main
Expand Down Expand Up @@ -141,6 +142,8 @@ def __init__(self, *args):
self._node_exporter_container = self.unit.get_container("node-exporter")
self.unit.set_ports(self._port)

self._juju_topology = JujuTopology.from_charm(self)

# If Loki is run in single-tenant mode, all the chunks are put in a folder named "fake"
# https://grafana.com/docs/loki/latest/operations/storage/filesystem/
# https://grafana.com/docs/loki/latest/rules/#ruler-storage
Expand Down Expand Up @@ -218,9 +221,23 @@ def __init__(self, *args):
self.dashboard_provider = GrafanaDashboardProvider(self)

self.catalogue = CatalogueConsumer(charm=self, item=self._catalogue_item)
self.tracing = TracingEndpointRequirer(self, protocols=["otlp_http"])
self.charm_tracing = TracingEndpointRequirer(
self, relation_name="charm-tracing", protocols=["otlp_http"]
)
self.workload_tracing = TracingEndpointRequirer(
self, relation_name="workload-tracing", protocols=["jaeger_thrift_http"]
)
self._charm_tracing_endpoint, self._charm_tracing_ca_cert = charm_tracing_config(
self.tracing, self._ca_cert_path
self.charm_tracing, self._ca_cert_path
)

self.framework.observe(
self.workload_tracing.on.endpoint_changed, # type: ignore
self._on_workload_tracing_endpoint_changed,
)
self.framework.observe(
self.workload_tracing.on.endpoint_removed, # type: ignore
self._on_workload_tracing_endpoint_removed,
)

self.framework.observe(self.on.config_changed, self._on_config_changed)
Expand Down Expand Up @@ -294,6 +311,14 @@ def _on_logging_relation_changed(self, event):
# when it is related with ingress. If not, endpoints will end up outdated in consumer side.
self.loki_provider.update_endpoint(url=self._external_url, relation=event.relation)

def _on_workload_tracing_endpoint_changed(self, _) -> None:
"""Adds workload tracing information to loki's config."""
self._configure()

def _on_workload_tracing_endpoint_removed(self, _) -> None:
"""Removes workload tracing information from loki's config."""
self._configure()

##############################################
# PROPERTIES #
##############################################
Expand Down Expand Up @@ -329,6 +354,20 @@ def _loki_pebble_layer(self) -> Layer:
Returns:
a Pebble layer specification for the Loki workload container.
"""
env = {}
if self.workload_tracing.is_ready():
tempo_endpoint = self.workload_tracing.get_endpoint("jaeger_thrift_http")
topology = self._juju_topology
env.update(
{
"JAEGER_ENDPOINT": (f"{tempo_endpoint}/api/traces?format=jaeger.thrift"),
"JAEGER_SAMPLER_PARAM": "1",
"JAEGER_SAMPLER_TYPE": "const",
"JAEGER_TAGS": f"juju_application={topology.application},juju_model={topology.model}"
+ f",juju_model_uuid={topology.model_uuid},juju_unit={topology.unit},juju_charm={topology.charm_name}",
},
)

pebble_layer = Layer(
{
"summary": "Loki layer",
Expand All @@ -339,6 +378,7 @@ def _loki_pebble_layer(self) -> Layer:
"summary": "loki",
"command": self._loki_command,
"startup": "disabled",
"environment": env,
},
},
}
Expand Down Expand Up @@ -520,6 +560,9 @@ def _configure(self): # noqa: C901
self._loki_container.restart(self._name)
logger.info("Loki restarted. The service was not in the active state.")

# trigger replan to notice if it was the pebble layer itself that changed
self._loki_container.replan()

if isinstance(to_status(self._stored.status["rules"]), BlockedStatus):
# Wait briefly for Loki to come back up and re-check the alert rules
# in case an upgrade/refresh caused the check to occur when it wasn't
Expand Down
103 changes: 103 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

import requests
import yaml
from juju.application import Application
from juju.unit import Unit
from minio import Minio
from pytest_operator.plugin import OpsTest
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -468,3 +472,102 @@ async def delete_pod(model_name: str, app_name: str, unit_num: int) -> bool:
except subprocess.CalledProcessError as e:
logger.error(e.stdout.decode())
raise e


async def deploy_and_configure_minio(ops_test: OpsTest) -> None:
"""Deploy and set up minio and s3-integrator needed for s3-like storage backend in the HA charms."""
config = {
"access-key": "accesskey",
"secret-key": "secretkey",
}
await ops_test.model.deploy("minio", channel="edge", trust=True, config=config)
await ops_test.model.wait_for_idle(apps=["minio"], status="active", timeout=2000)
minio_addr = await get_unit_address(ops_test, "minio", 0)

mc_client = Minio(
f"{minio_addr}:9000",
access_key="accesskey",
secret_key="secretkey",
secure=False,
)

# create tempo bucket
found = mc_client.bucket_exists("tempo")
if not found:
mc_client.make_bucket("tempo")

# configure s3-integrator
s3_integrator_app: Application = ops_test.model.applications["s3-integrator"]
s3_integrator_leader: Unit = s3_integrator_app.units[0]

await s3_integrator_app.set_config(
{
"endpoint": f"minio-0.minio-endpoints.{ops_test.model.name}.svc.cluster.local:9000",
"bucket": "tempo",
}
)

action = await s3_integrator_leader.run_action("sync-s3-credentials", **config)
action_result = await action.wait()
assert action_result.status == "completed"


async def deploy_tempo_cluster(ops_test: OpsTest):
"""Deploys tempo in its HA version together with minio and s3-integrator."""
tempo_app = "tempo"
worker_app = "tempo-worker"
tempo_worker_charm_url, worker_channel = "tempo-worker-k8s", "edge"
tempo_coordinator_charm_url, coordinator_channel = "tempo-coordinator-k8s", "edge"
await ops_test.model.deploy(
tempo_worker_charm_url, application_name=worker_app, channel=worker_channel, trust=True
)
await ops_test.model.deploy(
tempo_coordinator_charm_url,
application_name=tempo_app,
channel=coordinator_channel,
trust=True,
)
await ops_test.model.deploy("s3-integrator", channel="edge")

await ops_test.model.integrate(tempo_app + ":s3", "s3-integrator" + ":s3-credentials")
await ops_test.model.integrate(tempo_app + ":tempo-cluster", worker_app + ":tempo-cluster")

await deploy_and_configure_minio(ops_test)
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(
apps=[tempo_app, worker_app, "s3-integrator"],
status="active",
timeout=2000,
idle_period=30,
)


def get_traces(tempo_host: str, service_name="tracegen-otlp_http", tls=True):
"""Get traces directly from Tempo REST API."""
url = f"{'https' if tls else 'http'}://{tempo_host}:3200/api/search?tags=service.name={service_name}"
req = requests.get(
url,
verify=False,
)
assert req.status_code == 200
traces = json.loads(req.text)["traces"]
return traces


@retry(stop=stop_after_attempt(15), wait=wait_exponential(multiplier=1, min=4, max=10))
async def get_traces_patiently(tempo_host, service_name="tracegen-otlp_http", tls=True):
"""Get traces directly from Tempo REST API, but also try multiple times.
Useful for cases when Tempo might not return the traces immediately (its API is known for returning data in
random order).
"""
traces = get_traces(tempo_host, service_name=service_name, tls=tls)
assert len(traces) > 0
return traces


async def get_application_ip(ops_test: OpsTest, app_name: str) -> str:
"""Get the application IP address."""
status = await ops_test.model.get_status()
app = status["applications"][app_name]
return app.public_address
56 changes: 56 additions & 0 deletions tests/integration/test_workload_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

import logging
from pathlib import Path

import pytest
import yaml
from helpers import deploy_tempo_cluster, get_application_ip, get_traces_patiently, is_loki_up

logger = logging.getLogger(__name__)

METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
app_name = "loki"
TEMPO_APP_NAME = "tempo"
loki_resources = {
"loki-image": METADATA["resources"]["loki-image"]["upstream-source"],
"node-exporter-image": METADATA["resources"]["node-exporter-image"]["upstream-source"],
}


async def test_setup_env(ops_test):
await ops_test.model.set_config({"logging-config": "<root>=WARNING; unit=DEBUG"})


@pytest.mark.abort_on_fail
async def test_workload_tracing_is_present(ops_test, loki_charm):
logger.info("deploying tempo cluster")
await deploy_tempo_cluster(ops_test)

logger.info("deploying local charm")
await ops_test.model.deploy(
loki_charm, resources=loki_resources, application_name=app_name, trust=True
)
await ops_test.model.wait_for_idle(
apps=[app_name], status="active", timeout=300, wait_for_exact_units=1
)

# we relate _only_ workload tracing not to confuse with charm traces
await ops_test.model.add_relation(
"{}:workload-tracing".format(app_name), "{}:tracing".format(TEMPO_APP_NAME)
)
# but we also need anything to come in to loki so that loki generates traces
await ops_test.model.add_relation(
"{}:logging".format(TEMPO_APP_NAME), "{}:logging".format(app_name)
)
await ops_test.model.wait_for_idle(apps=[app_name], status="active")
assert await is_loki_up(ops_test, app_name, num_units=1)

# Verify workload traces are ingested into Tempo
assert await get_traces_patiently(
await get_application_ip(ops_test, TEMPO_APP_NAME),
service_name=f"{app_name}",
tls=False,
)
2 changes: 2 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,7 @@ deps =
juju
pytest
pytest-operator
cosl
minio
commands =
pytest -v --tb native --log-cli-level=INFO --color=yes -s {posargs} {toxinidir}/tests/integration

0 comments on commit 4f98bcc

Please sign in to comment.