Skip to content

Commit

Permalink
feat: use tracing in healthcheck endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
nosahama committed Dec 16, 2024
1 parent 8d11f75 commit 57b07bc
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 69 deletions.
27 changes: 17 additions & 10 deletions src/karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
from aiokafka.errors import KafkaConnectionError
from aiokafka.helpers import create_ssl_context
from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest
from dependency_injector.wiring import inject, Provide
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import SchemaReaderStoppper
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.tracer import Tracer
from threading import Thread
from typing import Final

Expand Down Expand Up @@ -146,16 +149,20 @@ def init_schema_coordinator(self) -> SchemaCoordinator:
schema_coordinator.start()
return schema_coordinator

def get_coordinator_status(self) -> SchemaCoordinatorStatus:
    """Assemble a ``SchemaCoordinatorStatus`` from the schema coordinator and the config.

    Returns:
        A status object carrying primary/eligibility information, the primary URL,
        a hard-coded ``is_running=True`` and the group generation id (``-1`` when
        the coordinator reports no generation).

    Raises:
        AssertionError: if the schema coordinator (``self._sc``) is not set.
    """
    assert self._sc is not None
    # NOTE(review): with assertions enabled, the ``else`` fallbacks below are unreachable;
    # they only take effect when Python runs with assertions disabled (-O).
    if self._sc is not None:
        generation = self._sc.generation
    else:
        generation = OffsetCommitRequest.DEFAULT_GENERATION_ID

    is_primary = self._sc.are_we_master() if self._sc is not None else None
    is_primary_eligible = self._config.master_eligibility
    primary_url = self._sc.master_url if self._sc is not None else None

    if generation is None:
        generation = -1

    return SchemaCoordinatorStatus(
        is_primary=is_primary,
        is_primary_eligible=is_primary_eligible,
        primary_url=primary_url,
        is_running=True,
        group_generation_id=generation,
    )
@inject
def get_coordinator_status(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> SchemaCoordinatorStatus:
    """Assemble a ``SchemaCoordinatorStatus`` inside a tracing span.

    The span is named after this class and method and is made the current span
    for the duration of the call.

    Args:
        tracer: Telemetry tracer, injected from ``TelemetryContainer`` by default.

    Returns:
        A status object carrying primary/eligibility information, the primary URL,
        a hard-coded ``is_running=True`` and the group generation id (``-1`` when
        the coordinator reports no generation).

    Raises:
        AssertionError: if the schema coordinator (``self._sc``) is not set.
    """
    otel_tracer = tracer.get_tracer()
    span_name = tracer.get_name_from_caller_with_class(self, self.get_coordinator_status)
    with otel_tracer.start_as_current_span(span_name):
        assert self._sc is not None
        # NOTE(review): with assertions enabled, the ``else`` fallbacks below are unreachable;
        # they only take effect when Python runs with assertions disabled (-O).
        if self._sc is not None:
            generation = self._sc.generation
        else:
            generation = OffsetCommitRequest.DEFAULT_GENERATION_ID

        is_primary = self._sc.are_we_master() if self._sc is not None else None
        is_primary_eligible = self._config.master_eligibility
        primary_url = self._sc.master_url if self._sc is not None else None

        if generation is None:
            generation = -1

        return SchemaCoordinatorStatus(
            is_primary=is_primary,
            is_primary_eligible=is_primary_eligible,
            primary_url=primary_url,
            is_running=True,
            group_generation_id=generation,
        )

def get_master_info(self) -> tuple[bool | None, str | None]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
Expand Down
9 changes: 7 additions & 2 deletions src/karapace/offset_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from dependency_injector.wiring import inject, Provide
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.tracer import Tracer
from threading import Condition


Expand All @@ -20,8 +23,10 @@ def __init__(self) -> None:
self._condition = Condition()
self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever.

def greatest_offset(self) -> int:
    """Return the current value of ``_greatest_offset``."""
    current_greatest = self._greatest_offset
    return current_greatest
@inject
def greatest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int:
    """Return the current value of ``_greatest_offset``, recorded under a tracing span.

    Args:
        tracer: Telemetry tracer, injected from ``TelemetryContainer`` by default.
    """
    otel_tracer = tracer.get_tracer()
    span_name = tracer.get_name_from_caller_with_class(self, self.greatest_offset)
    with otel_tracer.start_span(span_name):
        return self._greatest_offset

def offset_seen(self, new_offset: int) -> None:
with self._condition:
Expand Down
4 changes: 4 additions & 0 deletions src/schema_registry/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan
from schema_registry.telemetry.container import TelemetryContainer

import karapace.coordinator.master_coordinator
import karapace.offset_watcher
import schema_registry.controller
import schema_registry.factory
import schema_registry.reader
Expand Down Expand Up @@ -40,6 +42,8 @@
schema_registry.telemetry.setup,
schema_registry.telemetry.middleware,
schema_registry.reader,
karapace.offset_watcher,
karapace.coordinator.master_coordinator,
]
)

Expand Down
57 changes: 32 additions & 25 deletions src/schema_registry/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,29 +278,33 @@ def run(self) -> None:
self.consecutive_unexpected_errors_start = time.monotonic()
LOG.warning("Unexpected exception in schema reader loop - %s", e)

@inject
async def is_healthy(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool:
    """Report whether the schema reader is considered healthy, recorded under a tracing span.

    The reader is unhealthy when at least ``UNHEALTHY_CONSECUTIVE_ERRORS`` consecutive
    unexpected errors have been counted and at least ``UNHEALTHY_TIMEOUT_SECONDS`` have
    passed since ``consecutive_unexpected_errors_start``, or when describing the
    configured topic through the admin client fails for any reason.

    Args:
        tracer: Telemetry tracer, injected from ``TelemetryContainer`` by default.

    Returns:
        ``True`` when both checks pass, ``False`` otherwise.
    """
    otel_tracer = tracer.get_tracer()
    span_name = tracer.get_name_from_caller_with_class(self, self.is_healthy)
    with otel_tracer.start_span(span_name):
        if self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS:
            duration = time.monotonic() - self.consecutive_unexpected_errors_start
            if duration >= UNHEALTHY_TIMEOUT_SECONDS:
                LOG.warning(
                    "Health check failed with %s consecutive errors in %s seconds",
                    self.consecutive_unexpected_errors,
                    duration,
                )
                return False

        try:
            # Explicitly check if topic exists.
            # This needs to be done because in case of missing topic the consumer will not repeat the error
            # on consecutive consume calls and instead will return empty list.
            assert self.admin_client is not None
            topic = self.config.topic_name
            describe_futures = self.admin_client.describe_topics(TopicCollection([topic]))
            await asyncio.wrap_future(describe_futures[topic])
        except Exception as e:  # pylint: disable=broad-except
            # Any failure (including the assertion above) is reported as "not healthy".
            LOG.warning("Health check failed with %r", e)
            return False
        else:
            return True

def _get_beginning_offset(self) -> int:
assert self.consumer is not None, "Thread must be started"
Expand Down Expand Up @@ -369,12 +373,15 @@ def _is_ready(self) -> bool:
LOG.info("Ready in %s seconds", time.monotonic() - self.start_time)
return ready

def highest_offset(self) -> int:
    """Return the larger of ``_highest_offset`` and the offset watcher's greatest offset."""
    own_offset = self._highest_offset
    watcher_offset = self._offset_watcher.greatest_offset()
    return max(own_offset, watcher_offset)
@inject
def highest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int:
    """Return the larger of ``_highest_offset`` and the offset watcher's greatest offset.

    The work runs inside a span that is made *current*, so the span opened by the
    nested ``greatest_offset()`` call is parented under this one instead of
    becoming its sibling.

    Args:
        tracer: Telemetry tracer, injected from ``TelemetryContainer`` by default.
    """
    span_name = tracer.get_name_from_caller_with_class(self, self.highest_offset)
    # start_as_current_span (not start_span): start_span does not activate the span,
    # so spans created by callees would not nest under it.
    with tracer.get_tracer().start_as_current_span(span_name):
        return max(self._highest_offset, self._offset_watcher.greatest_offset())

@inject
def ready(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool:
    """Return the ``_ready`` flag, read while holding ``_ready_lock``.

    The call runs inside a current tracing span; an event is added to the span
    before the lock is acquired.

    Args:
        tracer: Telemetry tracer, injected from ``TelemetryContainer`` by default.
    """
    otel_tracer = tracer.get_tracer()
    span_name = tracer.get_name_from_caller_with_class(self, self.ready)
    with otel_tracer.start_as_current_span(span_name) as span:
        span.add_event("Acquiring ready lock")
        with self._ready_lock:
            is_ready = self._ready
        return is_ready

Expand Down
76 changes: 51 additions & 25 deletions src/schema_registry/routers/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

from dependency_injector.wiring import inject, Provide
from fastapi import APIRouter, Depends, HTTPException, status
from karapace.schema_registry import KarapaceSchemaRegistry
from opentelemetry.trace import Span
from opentelemetry.trace.status import StatusCode
from pydantic import BaseModel
from schema_registry.container import SchemaRegistryContainer
from schema_registry.registry import KarapaceSchemaRegistry
from schema_registry.telemetry.tracer import Tracer


class HealthStatus(BaseModel):
Expand All @@ -34,34 +37,57 @@ class HealthCheck(BaseModel):
)


def set_health_status_tracing_attributes(health_check_span: Span, health_status: HealthStatus) -> None:
    """Copy selected health status fields onto the health-check span as attributes.

    Args:
        health_check_span: Span that receives one event and the attributes.
        health_status: Health status model whose fields are recorded.
    """
    health_check_span.add_event("Setting health status tracing attributes")
    health_check_span.set_attribute("schema_registry_ready", health_status.schema_registry_ready)
    health_check_span.set_attribute("schema_registry_startup_time_sec", health_status.schema_registry_startup_time_sec)
    health_check_span.set_attribute(
        "schema_registry_reader_current_offset", health_status.schema_registry_reader_current_offset
    )
    health_check_span.set_attribute(
        "schema_registry_reader_highest_offset", health_status.schema_registry_reader_highest_offset
    )

    # getattr's default only covers a *missing* attribute. A present-but-None value must be
    # coalesced explicitly, because None is not a valid span attribute value. ``False`` is kept as-is.
    is_primary = getattr(health_status, "schema_registry_is_primary", None)
    health_check_span.set_attribute("schema_registry_is_primary", "" if is_primary is None else is_primary)


@health_router.get("")
@inject
async def health(
    schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
    tracer: Tracer = Depends(Provide[SchemaRegistryContainer.telemetry_container.tracer]),
) -> HealthCheck:
    """Handle ``GET`` on the health router: build the health status and verify the schema reader.

    The handler span is made the *current* span so that the spans opened by
    ``ready()``, ``get_coordinator_status()``, ``highest_offset()`` and
    ``is_healthy()`` are nested under it.

    Args:
        schema_registry: Schema registry, injected from ``SchemaRegistryContainer``.
        tracer: Telemetry tracer, injected from the telemetry container.

    Returns:
        A ``HealthCheck`` with the collected status and ``healthy=True``.

    Raises:
        HTTPException: with status 503 when the schema reader reports it is not healthy.
    """
    # start_as_current_span (not start_span): start_span does not activate the span,
    # so spans created by the calls below would not be parented under this handler span.
    with tracer.get_tracer().start_as_current_span("health_check_api_handler_GET") as health_check_span:
        schema_reader = schema_registry.schema_reader
        starttime = 0.0

        health_check_span.add_event("Checking schema-reader is ready")
        schema_reader_is_ready = schema_reader.ready()
        if schema_reader_is_ready:
            starttime = schema_reader.last_check - schema_reader.start_time

        health_check_span.add_event("Getting schema-registry master-coordinator status")
        cs = schema_registry.mc.get_coordinator_status()

        health_check_span.add_event("Building health status response model")
        health_status = HealthStatus(
            schema_registry_ready=schema_reader_is_ready,
            schema_registry_startup_time_sec=starttime,
            schema_registry_reader_current_offset=schema_reader.offset,
            schema_registry_reader_highest_offset=schema_reader.highest_offset(),
            schema_registry_is_primary=cs.is_primary,
            schema_registry_is_primary_eligible=cs.is_primary_eligible,
            schema_registry_primary_url=cs.primary_url,
            schema_registry_coordinator_running=cs.is_running,
            schema_registry_coordinator_generation_id=cs.group_generation_id,
        )
        set_health_status_tracing_attributes(health_check_span=health_check_span, health_status=health_status)

        # if self._auth is not None:
        #    resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified

        if not await schema_reader.is_healthy():
            health_check_span.add_event("Erroring because schema-reader is not healthy")
            health_check_span.set_status(status=StatusCode.ERROR, description="Schema reader is not healthy")
            raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE)

        health_check_span.add_event("Returning health check response")
        return HealthCheck(status=health_status, healthy=True)
14 changes: 8 additions & 6 deletions src/schema_registry/telemetry/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
See LICENSE for details
"""

from collections.abc import Callable
from dependency_injector.wiring import inject, Provide
from fastapi import Request, Response
from karapace.config import Config
Expand All @@ -11,6 +12,7 @@
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor, SpanProcessor
from opentelemetry.trace.span import Span
from typing import Any

import inspect

Expand All @@ -34,8 +36,8 @@ def get_name_from_caller() -> str:
return inspect.stack()[1].function

@staticmethod
def get_name_from_caller_with_class(function_class: object, function: Callable[[Any], Any]) -> str:
    """Build a span name of the form ``ClassName.function_name()``.

    Args:
        function_class: Object whose type name becomes the class part.
        function: Callable whose ``__name__`` becomes the function part.

    Returns:
        The type name and the function name joined by a dot, followed by ``()``.
    """
    class_name = type(function_class).__name__
    function_name = function.__name__
    return f"{class_name}.{function_name}()"

@staticmethod
def add_span_attribute(span: Span, key: str, value: str | int) -> None:
Expand All @@ -46,13 +48,13 @@ def add_span_attribute(span: Span, key: str, value: str | int) -> None:
def update_span_with_request(request: Request, span: Span) -> None:
if span.is_recording():
span.set_attribute("server.scheme", request.url.scheme)
span.set_attribute("server.hostname", request.url.hostname)
span.set_attribute("server.port", request.url.port)
span.set_attribute("server.hostname", request.url.hostname or "")
span.set_attribute("server.port", request.url.port or "")
span.set_attribute("server.is_secure", request.url.is_secure)
span.set_attribute("request.http.method", request.method)
span.set_attribute("request.http.path", request.url.path)
span.set_attribute("request.http.client.host", request.client.host)
span.set_attribute("request.http.client.port", request.client.port)
span.set_attribute("request.http.client.host", request.client.host or "" if request.client else "")
span.set_attribute("request.http.client.port", request.client.port or "" if request.client else "")
span.set_attribute("request.http.headers.connection", request.headers.get("connection", ""))
span.set_attribute("request.http.headers.user_agent", request.headers.get("user-agent", ""))
span.set_attribute("request.http.headers.content_type", request.headers.get("content-type", ""))
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/schema_registry/telemetry/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Test:
def test_function(self):
return Tracer.get_name_from_caller_with_class(self, self.test_function)

assert Test().test_function() == "Test.test_function"
assert Test().test_function() == "Test.test_function()"


def test_get_span_processor_with_otel_endpoint(karapace_container: KarapaceContainer) -> None:
Expand Down

0 comments on commit 57b07bc

Please sign in to comment.