From e990750e1ef453ea429f5412262b60028446eebf Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 16 Dec 2024 21:58:18 +0100 Subject: [PATCH] otel: use dedicated container - we also build a custom resource --- container/compose.yml | 12 ++-- src/karapace/config.py | 5 ++ .../coordinator/master_coordinator.py | 4 +- src/schema_registry/__main__.py | 15 +++-- src/schema_registry/reader.py | 2 +- src/schema_registry/routers/health.py | 2 +- src/schema_registry/telemetry/container.py | 20 +++++- src/schema_registry/telemetry/tracer.py | 36 ++++++----- .../schema_registry/telemetry/test_tracer.py | 63 ++++++++++++++++++- 9 files changed, 122 insertions(+), 37 deletions(-) diff --git a/container/compose.yml b/container/compose.yml index 5b433f6ba..c42d4b428 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -85,14 +85,12 @@ services: KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true KARAPACE_TAGS__APP: karapace-schema-registry - KARAPACE_TELEMETRY__OTEL_ENDPOINT_URL: http://opentelemetry-collector:4317 - OTEL_RESOURCE_ATTRIBUTES: | - service.instance.id=karapace-schema-registry, - service.name=karapace-schema-registry, - telemetry.sdk.name=opentelemetry, - telemetry.sdk.language=python, - telemetry.sdk.version=1.28.2 + KARAPACE_TELEMETRY__RESOURCE_SERVICE_NAME: karapace-schema-registry + KARAPACE_TELEMETRY__RESOURCE_SERVICE_INSTANCE_ID: sr1 + KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_NAME: opentelemetry + KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_LANGUAGE: python + KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_VERSION: 1.27.0 karapace-rest-proxy: image: ghcr.io/aiven-open/karapace:develop diff --git a/src/karapace/config.py b/src/karapace/config.py index 152249eb2..0dd811d92 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -29,6 +29,11 @@ class KarapaceTags(BaseModel): class KarapaceTelemetry(BaseModel): otel_endpoint_url: str | None = None + resource_service_name: str = "karapace" + resource_service_instance_id: str = "karapace" + resource_telemetry_sdk_name: str = "opentelemetry" + resource_telemetry_sdk_language: str = "python" + resource_telemetry_sdk_version: str = "1.27.0" class Config(BaseSettings): diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index ada53c664..790f3aec6 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -151,9 +151,7 @@ def init_schema_coordinator(self) -> SchemaCoordinator: @inject def get_coordinator_status(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> SchemaCoordinatorStatus: - with tracer.get_tracer().start_as_current_span( - tracer.get_name_from_caller_with_class(self, self.get_coordinator_status) - ): + with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.get_coordinator_status)): assert self._sc is not None generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID return SchemaCoordinatorStatus( diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 3e1fcae25..20dfc109e 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -27,8 +27,8 @@ import uvicorn if __name__ == "__main__": - container = KarapaceContainer() - container.wire( + karapace_container = KarapaceContainer() + karapace_container.wire( modules=[ __name__, schema_registry.controller, @@ -36,7 +36,7 @@ ] ) - telemetry_container = TelemetryContainer() + telemetry_container = TelemetryContainer(karapace_container=karapace_container) telemetry_container.wire( modules=[ schema_registry.telemetry.setup, @@ -48,7 +48,7 @@ ) schema_registry_container = SchemaRegistryContainer( - karapace_container=container, telemetry_container=telemetry_container + karapace_container=karapace_container, telemetry_container=telemetry_container ) schema_registry_container.wire( modules=[ @@ -66,7 +66,6 @@ ] ) - app = create_karapace_application(config=container.config(), lifespan=karapace_schema_registry_lifespan) - uvicorn.run( - app, host=container.config().host, port=container.config().port, log_level=container.config().log_level.lower() - ) + config = karapace_container.config() + app = create_karapace_application(config=config, lifespan=karapace_schema_registry_lifespan) + uvicorn.run(app, host=config.host, port=config.port, log_level=config.log_level.lower()) diff --git a/src/schema_registry/reader.py b/src/schema_registry/reader.py index 4919a0271..86de663d1 100644 --- a/src/schema_registry/reader.py +++ b/src/schema_registry/reader.py @@ -380,7 +380,7 @@ def highest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> @inject def ready(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool: - with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.ready)) as span: + with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.ready)) as span: span.add_event("Acquiring ready lock") with self._ready_lock: return self._ready diff --git a/src/schema_registry/routers/health.py b/src/schema_registry/routers/health.py index c7365a555..da9f4c3fb 100644 --- a/src/schema_registry/routers/health.py +++ b/src/schema_registry/routers/health.py @@ -56,7 +56,7 @@ async def health( schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), tracer: Tracer = Depends(Provide[SchemaRegistryContainer.telemetry_container.tracer]), ) -> HealthCheck: - with tracer.get_tracer().start_span("health_check_api_handler_GET") as health_check_span: + with tracer.get_tracer().start_span("APIRouter: health_check") as health_check_span: starttime = 0.0 health_check_span.add_event("Checking schema-reader is ready") diff --git a/src/schema_registry/telemetry/container.py b/src/schema_registry/telemetry/container.py index 3aad7ac11..d9d53ea2f 100644 --- a/src/schema_registry/telemetry/container.py +++ b/src/schema_registry/telemetry/container.py @@ -4,10 +4,28 @@ """ from dependency_injector import containers, providers +from karapace.config import Config +from karapace.container import KarapaceContainer +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.semconv.attributes import telemetry_attributes as T from schema_registry.telemetry.tracer import Tracer +def create_tracing_resource(config: Config) -> Resource: + return Resource.create( + { + "service.name": config.telemetry.resource_service_name, + "service.instance.id": config.telemetry.resource_service_instance_id, + T.TELEMETRY_SDK_NAME: config.telemetry.resource_telemetry_sdk_name, + T.TELEMETRY_SDK_LANGUAGE: config.telemetry.resource_telemetry_sdk_language, + T.TELEMETRY_SDK_VERSION: config.telemetry.resource_telemetry_sdk_version, + } + ) + + class TelemetryContainer(containers.DeclarativeContainer): - tracer_provider = providers.Singleton(TracerProvider) + karapace_container = providers.Container(KarapaceContainer) + tracing_resource = providers.Factory(create_tracing_resource, config=karapace_container.config) + tracer_provider = providers.Singleton(TracerProvider, resource=tracing_resource) tracer = providers.Singleton(Tracer) diff --git a/src/schema_registry/telemetry/tracer.py b/src/schema_registry/telemetry/tracer.py index e25a463da..d23fa0ec4 100644 --- a/src/schema_registry/telemetry/tracer.py +++ b/src/schema_registry/telemetry/tracer.py @@ -11,6 +11,12 @@ from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor, SpanProcessor +from opentelemetry.semconv.attributes import ( + client_attributes as C, + http_attributes as H, + server_attributes as S, + url_attributes as U, +) from opentelemetry.trace.span import Span from typing import Any @@ -47,22 +53,22 @@ def add_span_attribute(span: Span, key: str, value: str | int) -> None: @staticmethod def update_span_with_request(request: Request, span: Span) -> None: if span.is_recording(): - span.set_attribute("server.scheme", request.url.scheme) - span.set_attribute("server.hostname", request.url.hostname or "") - span.set_attribute("server.port", request.url.port or "") - span.set_attribute("server.is_secure", request.url.is_secure) - span.set_attribute("request.http.method", request.method) - span.set_attribute("request.http.path", request.url.path) - span.set_attribute("request.http.client.host", request.client.host or "" if request.client else "") - span.set_attribute("request.http.client.port", request.client.port or "" if request.client else "") - span.set_attribute("request.http.headers.connection", request.headers.get("connection", "")) - span.set_attribute("request.http.headers.user_agent", request.headers.get("user-agent", "")) - span.set_attribute("request.http.headers.content_type", request.headers.get("content-type", "")) + span.set_attribute(C.CLIENT_ADDRESS, request.client.host or "" if request.client else "") + span.set_attribute(C.CLIENT_PORT, request.client.port or "" if request.client else "") + span.set_attribute(S.SERVER_ADDRESS, request.url.hostname or "") + span.set_attribute(S.SERVER_PORT, request.url.port or "") + span.set_attribute(U.URL_SCHEME, request.url.scheme) + span.set_attribute(U.URL_PATH, request.url.path) + span.set_attribute(H.HTTP_REQUEST_METHOD, request.method) + span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.connection", request.headers.get("connection", "")) + span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.user_agent", request.headers.get("user-agent", "")) + span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.content_type", request.headers.get("content-type", "")) @staticmethod def update_span_with_response(response: Response, span: Span) -> None: if span.is_recording(): - span.set_attribute("response.http.status_code", response.status_code) - span.set_attribute("response.http.media_type", response.media_type or "") - span.set_attribute("response.http.headers.content_type", response.headers.get("content-type", "")) - span.set_attribute("response.http.headers.content_length", response.headers.get("content-length", "")) + span.set_attribute(H.HTTP_RESPONSE_STATUS_CODE, response.status_code) + span.set_attribute(f"{H.HTTP_RESPONSE_HEADER_TEMPLATE}.content_type", response.headers.get("content-type", "")) + span.set_attribute( + f"{H.HTTP_RESPONSE_HEADER_TEMPLATE}.content_length", response.headers.get("content-length", "") + ) diff --git a/tests/unit/schema_registry/telemetry/test_tracer.py b/tests/unit/schema_registry/telemetry/test_tracer.py index ff11c31fc..b5860657e 100644 --- a/tests/unit/schema_registry/telemetry/test_tracer.py +++ b/tests/unit/schema_registry/telemetry/test_tracer.py @@ -5,11 +5,13 @@ See LICENSE for details """ +from fastapi import Request, Response from karapace.config import KarapaceTelemetry from karapace.container import KarapaceContainer from opentelemetry.sdk.trace.export import SpanProcessor +from opentelemetry.trace.span import Span from schema_registry.telemetry.tracer import Tracer -from unittest.mock import patch +from unittest.mock import call, MagicMock, patch def test_tracer(karapace_container: KarapaceContainer): @@ -55,3 +57,62 @@ def test_get_span_processor_without_otel_endpoint(karapace_container: KarapaceCo processor: SpanProcessor = Tracer.get_span_processor(config=karapace_container.config()) mock_simple_span_processor.assert_called_once_with(mock_console_exporter.return_value) assert processor is mock_simple_span_processor.return_value + + +def test_add_span_attribute(): + span = MagicMock(spec=Span) + + # Test when span is not recording + span.is_recording.return_value = False + Tracer.add_span_attribute(span=span, key="key", value="value") + assert not span.set_attribute.called + + # Test when span is recording + span.is_recording.return_value = True + Tracer.add_span_attribute(span=span, key="key", value="value") + span.set_attribute.assert_called_once_with("key", "value") + + +def test_update_span_with_request(): + span = MagicMock(spec=Span) + span.is_recording.return_value = True + + request = MagicMock(spec=Request) + request.headers = {"content-type": "application/json", "connection": "keep-alive", "user-agent": "pytest"} + request.method = "GET" + request.url = MagicMock(port=8081, scheme="http", path="/test", hostname="server") + request.client = MagicMock(host="client", port=8080) + + Tracer.update_span_with_request(request=request, span=span) + span.set_attribute.assert_has_calls( + [ + call("client.address", "client"), + call("client.port", 8080), + call("server.address", "server"), + call("server.port", 8081), + call("url.scheme", "http"), + call("url.path", "/test"), + call("http.request.method", "GET"), + call("http.request.header.connection", "keep-alive"), + call("http.request.header.user_agent", "pytest"), + call("http.request.header.content_type", "application/json"), + ] + ) + + +def test_update_span_with_response(): + span = MagicMock(spec=Span) + + response = MagicMock(spec=Response) + response.status_code = 200 + response.headers = {"content-type": "application/json", "content-length": 8} + + span.is_recording.return_value = True + Tracer.update_span_with_response(response=response, span=span) + span.set_attribute.assert_has_calls( + [ + call("http.response.status_code", 200), + call("http.response.header.content_type", "application/json"), + call("http.response.header.content_length", 8), + ] + )