Skip to content

Commit

Permalink
tracer: trace also KafkaAdminClient.describe_topics()
Browse files Browse the repository at this point in the history
  • Loading branch information
nosahama committed Dec 17, 2024
1 parent f2c52ed commit 87131a0
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/karapace/kafka/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from collections.abc import Container, Iterable
from concurrent.futures import Future
from confluent_kafka import TopicPartition
from confluent_kafka import TopicCollection, TopicPartition
from confluent_kafka.admin import (
AdminClient,
BrokerMetadata,
Expand All @@ -20,13 +20,16 @@
TopicMetadata,
)
from confluent_kafka.error import KafkaException
from dependency_injector.wiring import inject, Provide
from karapace.constants import TOPIC_CREATION_TIMEOUT_S
from karapace.kafka.common import (
_KafkaConfigMixin,
raise_from_kafkaexception,
single_futmap_result,
UnknownTopicOrPartitionError,
)
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.tracer import Tracer


class KafkaAdminClient(_KafkaConfigMixin, AdminClient):
Expand Down Expand Up @@ -175,3 +178,10 @@ def get_offsets(self, topic: str, partition_id: int) -> dict[str, int]:
except KafkaException as exc:
raise_from_kafkaexception(exc)
return {"beginning_offset": startoffset.offset, "end_offset": endoffset.offset}

@inject
def describe_topics(
self, topics: TopicCollection, tracer: Tracer = Provide[TelemetryContainer.tracer]
) -> dict[str, Future]:
with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.describe_topics)):
return super().describe_topics(topics)
2 changes: 2 additions & 0 deletions src/schema_registry/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from schema_registry.telemetry.container import TelemetryContainer

import karapace.coordinator.master_coordinator
import karapace.kafka.admin
import karapace.offset_watcher
import schema_registry.controller
import schema_registry.factory
Expand Down Expand Up @@ -44,6 +45,7 @@
schema_registry.reader,
karapace.offset_watcher,
karapace.coordinator.master_coordinator,
karapace.kafka.admin,
]
)

Expand Down

0 comments on commit 87131a0

Please sign in to comment.