diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index 9b4f8181c..ada53c664 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -10,10 +10,13 @@ from aiokafka.errors import KafkaConnectionError from aiokafka.helpers import create_ssl_context from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest +from dependency_injector.wiring import inject, Provide from karapace.config import Config from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS from karapace.typing import SchemaReaderStoppper +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer from threading import Thread from typing import Final @@ -146,16 +149,20 @@ def init_schema_coordinator(self) -> SchemaCoordinator: schema_coordinator.start() return schema_coordinator - def get_coordinator_status(self) -> SchemaCoordinatorStatus: - assert self._sc is not None - generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID - return SchemaCoordinatorStatus( - is_primary=self._sc.are_we_master() if self._sc is not None else None, - is_primary_eligible=self._config.master_eligibility, - primary_url=self._sc.master_url if self._sc is not None else None, - is_running=True, - group_generation_id=generation if generation is not None else -1, - ) + @inject + def get_coordinator_status(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> SchemaCoordinatorStatus: + with tracer.get_tracer().start_as_current_span( + tracer.get_name_from_caller_with_class(self, self.get_coordinator_status) + ): + assert self._sc is not None + generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID + return SchemaCoordinatorStatus( + is_primary=self._sc.are_we_master() if self._sc is not None else None, + is_primary_eligible=self._config.master_eligibility, + primary_url=self._sc.master_url if self._sc is not None else None, + is_running=True, + group_generation_id=generation if generation is not None else -1, + ) def get_master_info(self) -> tuple[bool | None, str | None]: """Return whether we're the master, and the actual master url that can be used if we're not""" diff --git a/src/karapace/offset_watcher.py b/src/karapace/offset_watcher.py index 6056d5f37..092691e9f 100644 --- a/src/karapace/offset_watcher.py +++ b/src/karapace/offset_watcher.py @@ -4,6 +4,9 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from dependency_injector.wiring import inject, Provide +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer from threading import Condition @@ -20,8 +23,10 @@ def __init__(self) -> None: self._condition = Condition() self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever. - def greatest_offset(self) -> int: - return self._greatest_offset + @inject + def greatest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int: + with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.greatest_offset)): + return self._greatest_offset def offset_seen(self, new_offset: int) -> None: with self._condition: diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 9bd86cb8f..3e1fcae25 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -7,6 +7,8 @@ from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan from schema_registry.telemetry.container import TelemetryContainer +import karapace.coordinator.master_coordinator +import karapace.offset_watcher import schema_registry.controller import schema_registry.factory import schema_registry.reader @@ -40,6 +42,8 @@ schema_registry.telemetry.setup, schema_registry.telemetry.middleware, schema_registry.reader, + karapace.offset_watcher, + karapace.coordinator.master_coordinator, ] ) diff --git a/src/schema_registry/reader.py b/src/schema_registry/reader.py index a0e4f68b3..4919a0271 100644 --- a/src/schema_registry/reader.py +++ b/src/schema_registry/reader.py @@ -278,29 +278,33 @@ def run(self) -> None: self.consecutive_unexpected_errors_start = time.monotonic() LOG.warning("Unexpected exception in schema reader loop - %s", e) - async def is_healthy(self) -> bool: - if ( - self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS - and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS - ): - LOG.warning( - "Health check failed with %s consecutive errors in %s seconds", self.consecutive_unexpected_errors, duration - ) - return False - - try: - # Explicitly check if topic exists. - # This needs to be done because in case of missing topic the consumer will not repeat the error - # on conscutive consume calls and instead will return empty list. - assert self.admin_client is not None - topic = self.config.topic_name - res = self.admin_client.describe_topics(TopicCollection([topic])) - await asyncio.wrap_future(res[topic]) - except Exception as e: # pylint: disable=broad-except - LOG.warning("Health check failed with %r", e) - return False + @inject + async def is_healthy(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool: + with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.is_healthy)): + if ( + self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS + and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS + ): + LOG.warning( + "Health check failed with %s consecutive errors in %s seconds", + self.consecutive_unexpected_errors, + duration, + ) + return False - return True + try: + # Explicitly check if topic exists. + # This needs to be done because in case of missing topic the consumer will not repeat the error + # on conscutive consume calls and instead will return empty list. + assert self.admin_client is not None + topic = self.config.topic_name + res = self.admin_client.describe_topics(TopicCollection([topic])) + await asyncio.wrap_future(res[topic]) + except Exception as e: # pylint: disable=broad-except + LOG.warning("Health check failed with %r", e) + return False + + return True def _get_beginning_offset(self) -> int: assert self.consumer is not None, "Thread must be started" @@ -369,12 +373,15 @@ def _is_ready(self) -> bool: LOG.info("Ready in %s seconds", time.monotonic() - self.start_time) return ready - def highest_offset(self) -> int: - return max(self._highest_offset, self._offset_watcher.greatest_offset()) + @inject + def highest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int: + with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.highest_offset)): + return max(self._highest_offset, self._offset_watcher.greatest_offset()) @inject def ready(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool: - with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.ready)): + with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.ready)) as span: + span.add_event("Acquiring ready lock") with self._ready_lock: return self._ready diff --git a/src/schema_registry/routers/health.py b/src/schema_registry/routers/health.py index b02d2f760..c7365a555 100644 --- a/src/schema_registry/routers/health.py +++ b/src/schema_registry/routers/health.py @@ -5,9 +5,12 @@ from dependency_injector.wiring import inject, Provide from fastapi import APIRouter, Depends, HTTPException, status -from karapace.schema_registry import KarapaceSchemaRegistry +from opentelemetry.trace import Span +from opentelemetry.trace.status import StatusCode from pydantic import BaseModel from schema_registry.container import SchemaRegistryContainer +from schema_registry.registry import KarapaceSchemaRegistry +from schema_registry.telemetry.tracer import Tracer class HealthStatus(BaseModel): @@ -34,34 +37,57 @@ class HealthCheck(BaseModel): ) +def set_health_status_tracing_attributes(health_check_span: Span, health_status: HealthStatus) -> None: + health_check_span.add_event("Setting health status tracing attributes") + health_check_span.set_attribute("schema_registry_ready", health_status.schema_registry_ready) + health_check_span.set_attribute("schema_registry_startup_time_sec", health_status.schema_registry_startup_time_sec) + health_check_span.set_attribute( + "schema_registry_reader_current_offset", health_status.schema_registry_reader_current_offset + ) + health_check_span.set_attribute( + "schema_registry_reader_highest_offset", health_status.schema_registry_reader_highest_offset + ) + health_check_span.set_attribute("schema_registry_is_primary", getattr(health_status, "schema_registry_is_primary", "")) + + @health_router.get("") @inject async def health( - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), + tracer: Tracer = Depends(Provide[SchemaRegistryContainer.telemetry_container.tracer]), ) -> HealthCheck: - starttime = 0.0 - if schema_registry.schema_reader.ready(): - starttime = schema_registry.schema_reader.last_check - schema_registry.schema_reader.start_time - - cs = schema_registry.mc.get_coordinator_status() - - health_status = HealthStatus( - schema_registry_ready=schema_registry.schema_reader.ready(), - schema_registry_startup_time_sec=starttime, - schema_registry_reader_current_offset=schema_registry.schema_reader.offset, - schema_registry_reader_highest_offset=schema_registry.schema_reader.highest_offset(), - schema_registry_is_primary=cs.is_primary, - schema_registry_is_primary_eligible=cs.is_primary_eligible, - schema_registry_primary_url=cs.primary_url, - schema_registry_coordinator_running=cs.is_running, - schema_registry_coordinator_generation_id=cs.group_generation_id, - ) - # if self._auth is not None: - # resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified + with tracer.get_tracer().start_span("health_check_api_handler_GET") as health_check_span: + starttime = 0.0 - if not await schema_registry.schema_reader.is_healthy(): - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + health_check_span.add_event("Checking schema-reader is ready") + schema_reader_is_ready = schema_registry.schema_reader.ready() + if schema_reader_is_ready: + starttime = schema_registry.schema_reader.last_check - schema_registry.schema_reader.start_time + + health_check_span.add_event("Getting schema-registry master-coordinator status") + cs = schema_registry.mc.get_coordinator_status() + + health_check_span.add_event("Building health status response model") + health_status = HealthStatus( + schema_registry_ready=schema_reader_is_ready, + schema_registry_startup_time_sec=starttime, + schema_registry_reader_current_offset=schema_registry.schema_reader.offset, + schema_registry_reader_highest_offset=schema_registry.schema_reader.highest_offset(), + schema_registry_is_primary=cs.is_primary, + schema_registry_is_primary_eligible=cs.is_primary_eligible, + schema_registry_primary_url=cs.primary_url, + schema_registry_coordinator_running=cs.is_running, + schema_registry_coordinator_generation_id=cs.group_generation_id, ) + set_health_status_tracing_attributes(health_check_span=health_check_span, health_status=health_status) + + # if self._auth is not None: + # resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified + + if not await schema_registry.schema_reader.is_healthy(): + health_check_span.add_event("Erroring because schema-reader is not healthy") + health_check_span.set_status(status=StatusCode.ERROR, description="Schema reader is not healthy") + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) - return HealthCheck(status=health_status, healthy=True) + health_check_span.add_event("Returning health check response") + return HealthCheck(status=health_status, healthy=True) diff --git a/src/schema_registry/telemetry/tracer.py b/src/schema_registry/telemetry/tracer.py index 5b6ed86ea..e25a463da 100644 --- a/src/schema_registry/telemetry/tracer.py +++ b/src/schema_registry/telemetry/tracer.py @@ -3,6 +3,7 @@ See LICENSE for details """ +from collections.abc import Callable from dependency_injector.wiring import inject, Provide from fastapi import Request, Response from karapace.config import Config @@ -11,6 +12,7 @@ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor, SpanProcessor from opentelemetry.trace.span import Span +from typing import Any import inspect @@ -34,8 +36,8 @@ def get_name_from_caller() -> str: return inspect.stack()[1].function @staticmethod - def get_name_from_caller_with_class(function_class, function) -> str: - return f"{type(function_class).__name__}.{function.__name__}" + def get_name_from_caller_with_class(function_class: object, function: Callable[[Any], Any]) -> str: + return f"{type(function_class).__name__}.{function.__name__}()" @staticmethod def add_span_attribute(span: Span, key: str, value: str | int) -> None: @@ -46,13 +48,13 @@ def add_span_attribute(span: Span, key: str, value: str | int) -> None: def update_span_with_request(request: Request, span: Span) -> None: if span.is_recording(): span.set_attribute("server.scheme", request.url.scheme) - span.set_attribute("server.hostname", request.url.hostname) - span.set_attribute("server.port", request.url.port) + span.set_attribute("server.hostname", request.url.hostname or "") + span.set_attribute("server.port", request.url.port or "") span.set_attribute("server.is_secure", request.url.is_secure) span.set_attribute("request.http.method", request.method) span.set_attribute("request.http.path", request.url.path) - span.set_attribute("request.http.client.host", request.client.host) - span.set_attribute("request.http.client.port", request.client.port) + span.set_attribute("request.http.client.host", request.client.host or "" if request.client else "") + span.set_attribute("request.http.client.port", request.client.port or "" if request.client else "") span.set_attribute("request.http.headers.connection", request.headers.get("connection", "")) span.set_attribute("request.http.headers.user_agent", request.headers.get("user-agent", "")) span.set_attribute("request.http.headers.content_type", request.headers.get("content-type", "")) diff --git a/tests/unit/schema_registry/telemetry/test_tracer.py b/tests/unit/schema_registry/telemetry/test_tracer.py index 25f349e41..ff11c31fc 100644 --- a/tests/unit/schema_registry/telemetry/test_tracer.py +++ b/tests/unit/schema_registry/telemetry/test_tracer.py @@ -30,7 +30,7 @@ class Test: def test_function(self): return Tracer.get_name_from_caller_with_class(self, self.test_function) - assert Test().test_function() == "Test.test_function" + assert Test().test_function() == "Test.test_function()" def test_get_span_processor_with_otel_endpoint(karapace_container: KarapaceContainer) -> None: