Skip to content

Commit

Permalink
otel: use dedicated container
Browse files Browse the repository at this point in the history
- we also build a custom resource
  • Loading branch information
nosahama committed Dec 17, 2024
1 parent abe7bc7 commit f2c52ed
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 36 deletions.
12 changes: 5 additions & 7 deletions container/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,12 @@ services:
KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true
KARAPACE_TAGS__APP: karapace-schema-registry

KARAPACE_TELEMETRY__OTEL_ENDPOINT_URL: http://opentelemetry-collector:4317
OTEL_RESOURCE_ATTRIBUTES: |
service.instance.id=karapace-schema-registry,
service.name=karapace-schema-registry,
telemetry.sdk.name=opentelemetry,
telemetry.sdk.language=python,
telemetry.sdk.version=1.28.2
KARAPACE_TELEMETRY__RESOURCE_SERVICE_NAME: karapace-schema-registry
KARAPACE_TELEMETRY__RESOURCE_SERVICE_INSTANCE_ID: sr1
KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_NAME: opentelemetry
KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_LANGUAGE: python
KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_VERSION: 1.27.0

karapace-rest-proxy:
image: ghcr.io/aiven-open/karapace:develop
Expand Down
5 changes: 5 additions & 0 deletions src/karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class KarapaceTags(BaseModel):

class KarapaceTelemetry(BaseModel):
    # Telemetry settings. The `resource_*` fields are read by
    # `create_tracing_resource` (schema_registry/telemetry/container.py) and
    # become the attributes of the OpenTelemetry `Resource`.

    # NOTE(review): presumably the OTLP collector endpoint (the compose file
    # sets it to http://opentelemetry-collector:4317); defaults to None —
    # confirm how the tracer behaves when it is unset.
    otel_endpoint_url: str | None = None
    # Reported as the "service.name" resource attribute.
    resource_service_name: str = "karapace"
    # Reported as the "service.instance.id" resource attribute.
    resource_service_instance_id: str = "karapace"
    # Reported under the semconv TELEMETRY_SDK_NAME attribute key.
    resource_telemetry_sdk_name: str = "opentelemetry"
    # Reported under the semconv TELEMETRY_SDK_LANGUAGE attribute key.
    resource_telemetry_sdk_language: str = "python"
    # Reported under the semconv TELEMETRY_SDK_VERSION attribute key. This is
    # a fixed string default, not a value looked up from the installed SDK.
    resource_telemetry_sdk_version: str = "1.27.0"


class Config(BaseSettings):
Expand Down
2 changes: 1 addition & 1 deletion src/karapace/offset_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self) -> None:

@inject
def greatest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int:
with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.greatest_offset)):
with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.greatest_offset)):
return self._greatest_offset

def offset_seen(self, new_offset: int) -> None:
Expand Down
15 changes: 7 additions & 8 deletions src/schema_registry/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
import uvicorn

if __name__ == "__main__":
container = KarapaceContainer()
container.wire(
karapace_container = KarapaceContainer()
karapace_container.wire(
modules=[
__name__,
schema_registry.controller,
schema_registry.telemetry.tracer,
]
)

telemetry_container = TelemetryContainer()
telemetry_container = TelemetryContainer(karapace_container=karapace_container)
telemetry_container.wire(
modules=[
schema_registry.telemetry.setup,
Expand All @@ -48,7 +48,7 @@
)

schema_registry_container = SchemaRegistryContainer(
karapace_container=container, telemetry_container=telemetry_container
karapace_container=karapace_container, telemetry_container=telemetry_container
)
schema_registry_container.wire(
modules=[
Expand All @@ -66,7 +66,6 @@
]
)

app = create_karapace_application(config=container.config(), lifespan=karapace_schema_registry_lifespan)
uvicorn.run(
app, host=container.config().host, port=container.config().port, log_level=container.config().log_level.lower()
)
config = karapace_container.config()
app = create_karapace_application(config=config, lifespan=karapace_schema_registry_lifespan)
uvicorn.run(app, host=config.host, port=config.port, log_level=config.log_level.lower())
4 changes: 2 additions & 2 deletions src/schema_registry/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def run(self) -> None:

@inject
async def is_healthy(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool:
with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.is_healthy)):
with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.is_healthy)):
if (
self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS
and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS
Expand Down Expand Up @@ -375,7 +375,7 @@ def _is_ready(self) -> bool:

@inject
def highest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int:
with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.highest_offset)):
with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.highest_offset)):
return max(self._highest_offset, self._offset_watcher.greatest_offset())

@inject
Expand Down
2 changes: 1 addition & 1 deletion src/schema_registry/routers/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async def health(
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
tracer: Tracer = Depends(Provide[SchemaRegistryContainer.telemetry_container.tracer]),
) -> HealthCheck:
with tracer.get_tracer().start_span("health_check_api_handler_GET") as health_check_span:
with tracer.get_tracer().start_as_current_span("APIRouter: health_check") as health_check_span:
starttime = 0.0

health_check_span.add_event("Checking schema-reader is ready")
Expand Down
20 changes: 19 additions & 1 deletion src/schema_registry/telemetry/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,28 @@
"""

from dependency_injector import containers, providers
from karapace.config import Config
from karapace.container import KarapaceContainer
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.semconv.attributes import telemetry_attributes as T
from schema_registry.telemetry.tracer import Tracer


def create_tracing_resource(config: Config) -> Resource:
    """Build the OpenTelemetry resource that describes this service.

    Args:
        config: Application configuration; only ``config.telemetry`` is read.

    Returns:
        The result of ``Resource.create`` for the service name, service
        instance id and telemetry-SDK name/language/version taken from the
        telemetry settings.
    """
    telemetry = config.telemetry
    attributes = {
        "service.name": telemetry.resource_service_name,
        "service.instance.id": telemetry.resource_service_instance_id,
        T.TELEMETRY_SDK_NAME: telemetry.resource_telemetry_sdk_name,
        T.TELEMETRY_SDK_LANGUAGE: telemetry.resource_telemetry_sdk_language,
        T.TELEMETRY_SDK_VERSION: telemetry.resource_telemetry_sdk_version,
    }
    return Resource.create(attributes)


class TelemetryContainer(containers.DeclarativeContainer):
    # Dependency-injection container for the tracing objects.

    # Sub-container that supplies the Karapace configuration; the entry point
    # passes it in as `TelemetryContainer(karapace_container=...)`.
    karapace_container = providers.Container(KarapaceContainer)

    # Resource built by `create_tracing_resource` from the injected config.
    tracing_resource = providers.Factory(create_tracing_resource, config=karapace_container.config)

    # Single binding of `tracer_provider`: the earlier resource-less
    # `providers.Singleton(TracerProvider)` binding was overwritten by this
    # one in the same class body, so it has been dropped as dead code.
    tracer_provider = providers.Singleton(TracerProvider, resource=tracing_resource)

    tracer = providers.Singleton(Tracer)
36 changes: 21 additions & 15 deletions src/schema_registry/telemetry/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor, SpanProcessor
from opentelemetry.semconv.attributes import (
client_attributes as C,
http_attributes as H,
server_attributes as S,
url_attributes as U,
)
from opentelemetry.trace.span import Span
from typing import Any

Expand Down Expand Up @@ -47,22 +53,22 @@ def add_span_attribute(span: Span, key: str, value: str | int) -> None:
@staticmethod
def update_span_with_request(request: Request, span: Span) -> None:
if span.is_recording():
span.set_attribute("server.scheme", request.url.scheme)
span.set_attribute("server.hostname", request.url.hostname or "")
span.set_attribute("server.port", request.url.port or "")
span.set_attribute("server.is_secure", request.url.is_secure)
span.set_attribute("request.http.method", request.method)
span.set_attribute("request.http.path", request.url.path)
span.set_attribute("request.http.client.host", request.client.host or "" if request.client else "")
span.set_attribute("request.http.client.port", request.client.port or "" if request.client else "")
span.set_attribute("request.http.headers.connection", request.headers.get("connection", ""))
span.set_attribute("request.http.headers.user_agent", request.headers.get("user-agent", ""))
span.set_attribute("request.http.headers.content_type", request.headers.get("content-type", ""))
span.set_attribute(C.CLIENT_ADDRESS, request.client.host or "" if request.client else "")
span.set_attribute(C.CLIENT_PORT, request.client.port or "" if request.client else "")
span.set_attribute(S.SERVER_ADDRESS, request.url.hostname or "")
span.set_attribute(S.SERVER_PORT, request.url.port or "")
span.set_attribute(U.URL_SCHEME, request.url.scheme)
span.set_attribute(U.URL_PATH, request.url.path)
span.set_attribute(H.HTTP_REQUEST_METHOD, request.method)
span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.connection", request.headers.get("connection", ""))
span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.user_agent", request.headers.get("user-agent", ""))
span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.content_type", request.headers.get("content-type", ""))

@staticmethod
def update_span_with_response(response: Response, span: Span) -> None:
if span.is_recording():
span.set_attribute("response.http.status_code", response.status_code)
span.set_attribute("response.http.media_type", response.media_type or "")
span.set_attribute("response.http.headers.content_type", response.headers.get("content-type", ""))
span.set_attribute("response.http.headers.content_length", response.headers.get("content-length", ""))
span.set_attribute(H.HTTP_RESPONSE_STATUS_CODE, response.status_code)
span.set_attribute(f"{H.HTTP_RESPONSE_HEADER_TEMPLATE}.content_type", response.headers.get("content-type", ""))
span.set_attribute(
f"{H.HTTP_RESPONSE_HEADER_TEMPLATE}.content_length", response.headers.get("content-length", "")
)
63 changes: 62 additions & 1 deletion tests/unit/schema_registry/telemetry/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
See LICENSE for details
"""

from fastapi import Request, Response
from karapace.config import KarapaceTelemetry
from karapace.container import KarapaceContainer
from opentelemetry.sdk.trace.export import SpanProcessor
from opentelemetry.trace.span import Span
from schema_registry.telemetry.tracer import Tracer
from unittest.mock import patch
from unittest.mock import call, MagicMock, patch


def test_tracer(karapace_container: KarapaceContainer):
Expand Down Expand Up @@ -55,3 +57,62 @@ def test_get_span_processor_without_otel_endpoint(karapace_container: KarapaceCo
processor: SpanProcessor = Tracer.get_span_processor(config=karapace_container.config())
mock_simple_span_processor.assert_called_once_with(mock_console_exporter.return_value)
assert processor is mock_simple_span_processor.return_value


def test_add_span_attribute():
    """Check that ``Tracer.add_span_attribute`` writes only to recording spans."""
    mock_span = MagicMock(spec=Span)

    # A span that is not recording must be left untouched.
    mock_span.is_recording.return_value = False
    Tracer.add_span_attribute(span=mock_span, key="key", value="value")
    mock_span.set_attribute.assert_not_called()

    # Once the span records, the attribute is set exactly once.
    mock_span.is_recording.return_value = True
    Tracer.add_span_attribute(span=mock_span, key="key", value="value")
    mock_span.set_attribute.assert_called_once_with("key", "value")


def test_update_span_with_request():
    """Check the attributes ``Tracer.update_span_with_request`` sets, in order."""
    mock_span = MagicMock(spec=Span)
    mock_span.is_recording.return_value = True

    mock_request = MagicMock(spec=Request)
    mock_request.method = "GET"
    mock_request.headers = {"content-type": "application/json", "connection": "keep-alive", "user-agent": "pytest"}
    mock_request.client = MagicMock(host="client", port=8080)
    mock_request.url = MagicMock(port=8081, scheme="http", path="/test", hostname="server")

    # Attribute name/value pairs in the order they are expected to be set.
    expected_attributes = [
        ("client.address", "client"),
        ("client.port", 8080),
        ("server.address", "server"),
        ("server.port", 8081),
        ("url.scheme", "http"),
        ("url.path", "/test"),
        ("http.request.method", "GET"),
        ("http.request.header.connection", "keep-alive"),
        ("http.request.header.user_agent", "pytest"),
        ("http.request.header.content_type", "application/json"),
    ]

    Tracer.update_span_with_request(request=mock_request, span=mock_span)

    mock_span.set_attribute.assert_has_calls([call(name, value) for name, value in expected_attributes])


def test_update_span_with_response():
    """Check the attributes ``Tracer.update_span_with_response`` sets, in order."""
    mock_span = MagicMock(spec=Span)
    mock_span.is_recording.return_value = True

    mock_response = MagicMock(spec=Response)
    mock_response.headers = {"content-type": "application/json", "content-length": 8}
    mock_response.status_code = 200

    # Attribute name/value pairs in the order they are expected to be set.
    expected_attributes = [
        ("http.response.status_code", 200),
        ("http.response.header.content_type", "application/json"),
        ("http.response.header.content_length", 8),
    ]

    Tracer.update_span_with_response(response=mock_response, span=mock_span)

    mock_span.set_attribute.assert_has_calls([call(name, value) for name, value in expected_attributes])

0 comments on commit f2c52ed

Please sign in to comment.