Skip to content

Commit

Permalink
di: do not wire core karapace
Browse files Browse the repository at this point in the history
  • Loading branch information
nosahama committed Dec 20, 2024
1 parent 092e3c8 commit db3c49d
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 39 deletions.
7 changes: 2 additions & 5 deletions src/karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
from aiokafka.errors import KafkaConnectionError
from aiokafka.helpers import create_ssl_context
from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest
from dependency_injector.wiring import inject, Provide
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import SchemaReaderStoppper
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.tracer import Tracer
from threading import Thread
from typing import Final
Expand All @@ -40,8 +38,7 @@ class MasterCoordinator:
5 milliseconds.
"""

@inject
def __init__(self, config: Config, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> None:
def __init__(self, config: Config) -> None:
super().__init__()
self._config: Final = config
self._kafka_client: AIOKafkaClient | None = None
Expand All @@ -50,7 +47,7 @@ def __init__(self, config: Config, tracer: Tracer = Provide[TelemetryContainer.t
self._thread: Thread = Thread(target=self._start_loop, daemon=True)
self._loop: asyncio.AbstractEventLoop | None = None
self._schema_reader_stopper: SchemaReaderStoppper | None = None
self.tracer = tracer
self.tracer = Tracer()

def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None:
self._schema_reader_stopper = schema_reader_stopper
Expand Down
11 changes: 11 additions & 0 deletions src/karapace/kafka/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,24 @@
from karapace.constants import TOPIC_CREATION_TIMEOUT_S
from karapace.kafka.common import (
_KafkaConfigMixin,
KafkaClientParams,
raise_from_kafkaexception,
single_futmap_result,
UnknownTopicOrPartitionError,
)
from schema_registry.telemetry.tracer import Tracer
from typing_extensions import Unpack


class KafkaAdminClient(_KafkaConfigMixin, AdminClient):
def __init__(
self,
bootstrap_servers: Iterable[str] | str,
**params: Unpack[KafkaClientParams],
) -> None:
self.tracer = Tracer()
super().__init__(bootstrap_servers, **params)

def new_topic(
self,
name: str,
Expand Down
6 changes: 0 additions & 6 deletions src/karapace/kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
from collections.abc import Callable, Iterable
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
from dependency_injector.wiring import inject, Provide
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.tracer import Tracer
from typing import Any, Literal, NoReturn, Protocol, TypedDict, TypeVar
from typing_extensions import Unpack

Expand Down Expand Up @@ -125,17 +122,14 @@ class _KafkaConfigMixin:
extract configuration, initialization and connection verification.
"""

@inject
def __init__(
self,
bootstrap_servers: Iterable[str] | str,
verify_connection: bool = True,
tracer: Tracer = Provide[TelemetryContainer.tracer],
**params: Unpack[KafkaClientParams],
) -> None:
self._errors: set[KafkaError] = set()
self.log = logging.getLogger(f"{self.__module__}.{self.__class__.__qualname__}")
self.tracer = tracer

super().__init__(self._get_config_from_params(bootstrap_servers, **params)) # type: ignore[call-arg]
self._activate_callbacks()
Expand Down
8 changes: 3 additions & 5 deletions src/karapace/offset_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from dependency_injector.wiring import inject, Provide
from schema_registry.telemetry.container import TelemetryContainer

from schema_registry.telemetry.tracer import Tracer
from threading import Condition

Expand All @@ -17,13 +16,12 @@ class OffsetWatcher:
correct as long as no unclean leader election is performed.
"""

@inject
def __init__(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> None:
def __init__(self) -> None:
# Condition used to protected _greatest_offset, any modifications to that object must
# be performed with this condition acquired
self._condition = Condition()
self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever.
self.tracer = tracer
self.tracer = Tracer()

def greatest_offset(self) -> int:
with self.tracer.get_tracer().start_as_current_span(
Expand Down
8 changes: 0 additions & 8 deletions src/schema_registry/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan
from schema_registry.telemetry.container import TelemetryContainer

import karapace.coordinator.master_coordinator
import karapace.kafka.common
import karapace.offset_watcher
import schema_registry.controller
import schema_registry.factory
import schema_registry.reader
import schema_registry.routers.compatibility
import schema_registry.routers.config
import schema_registry.routers.health
Expand Down Expand Up @@ -42,10 +38,6 @@
modules=[
schema_registry.telemetry.setup,
schema_registry.telemetry.middleware,
schema_registry.reader,
karapace.offset_watcher,
karapace.coordinator.master_coordinator,
karapace.kafka.common,
]
)

Expand Down
File renamed without changes.
6 changes: 1 addition & 5 deletions src/schema_registry/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from collections.abc import Mapping, Sequence
from confluent_kafka import Message, TopicCollection, TopicPartition
from contextlib import closing, ExitStack
from dependency_injector.wiring import inject, Provide
from enum import Enum
from jsonschema.validators import Draft7Validator
from karapace import constants
Expand All @@ -46,7 +45,6 @@
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version
from karapace.utils import json_decode, JSONDecodeError, shutdown
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.tracer import Tracer
from threading import Event, Lock, Thread
from typing import Final
Expand Down Expand Up @@ -133,15 +131,13 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient:


class KafkaSchemaReader(Thread, SchemaReaderStoppper):
@inject
def __init__(
self,
config: Config,
offset_watcher: OffsetWatcher,
key_formatter: KeyFormatter,
database: KarapaceDatabase,
master_coordinator: MasterCoordinator | None = None,
tracer: Tracer = Provide[TelemetryContainer.tracer],
) -> None:
Thread.__init__(self, name="schema-reader")
self.master_coordinator = master_coordinator
Expand All @@ -156,7 +152,7 @@ def __init__(
self._offset_watcher = offset_watcher
self.stats = StatsClient(config=config)
self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config)
self.tracer = tracer
self.tracer = Tracer()

# Thread synchronization objects
# - offset is used by the REST API to wait until this thread has
Expand Down
4 changes: 3 additions & 1 deletion src/schema_registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
)
from karapace.in_memory_database import InMemoryDatabase
from karapace.key_format import KeyFormatter
from karapace.messaging import KarapaceProducer
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner
from karapace.schema_references import LatestVersionReference, Reference
from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version
from schema_registry.messaging import KarapaceProducer
from schema_registry.reader import KafkaSchemaReader
from schema_registry.telemetry.tracer import Tracer

import asyncio
import logging
Expand All @@ -43,6 +44,7 @@ class KarapaceSchemaRegistry:
def __init__(self, config: Config) -> None:
# TODO: compatibility was previously in mutable dict, fix the runtime config to be distinct from static config.
self.config = config
self.tracer = Tracer()
self._key_formatter = KeyFormatter()

offset_watcher = OffsetWatcher()
Expand Down
8 changes: 0 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,10 @@
from tempfile import mkstemp

import json
import karapace.coordinator.master_coordinator
import karapace.kafka.common
import karapace.offset_watcher
import os
import pytest
import re
import schema_registry.controller
import schema_registry.reader
import schema_registry.telemetry.middleware
import schema_registry.telemetry.setup
import schema_registry.telemetry.tracer
Expand Down Expand Up @@ -208,12 +204,8 @@ def fixture_telemetry_container() -> TelemetryContainer:
telemetry_container = TelemetryContainer()
telemetry_container.wire(
modules=[
schema_registry.reader,
schema_registry.telemetry.setup,
schema_registry.telemetry.middleware,
karapace.offset_watcher,
karapace.coordinator.master_coordinator,
karapace.kafka.common,
]
)
return telemetry_container
Expand Down
5 changes: 4 additions & 1 deletion tests/unit/schema_registry/telemetry/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ def test_get_span_processor_with_otel_endpoint(karapace_container: KarapaceConta


def test_get_span_processor_without_otel_endpoint(karapace_container: KarapaceContainer) -> None:
config = karapace_container.config().set_config_defaults(
new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)}
)
with (
patch("schema_registry.telemetry.tracer.ConsoleSpanExporter") as mock_console_exporter,
patch("schema_registry.telemetry.tracer.SimpleSpanProcessor") as mock_simple_span_processor,
):
processor: SpanProcessor = Tracer.get_span_processor(config=karapace_container.config())
processor: SpanProcessor = Tracer.get_span_processor(config=config)
mock_simple_span_processor.assert_called_once_with(mock_console_exporter.return_value)
assert processor is mock_simple_span_processor.return_value

Expand Down
4 changes: 4 additions & 0 deletions tests/unit/schema_registry/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryC
user=None,
authorizer=None,
)
with pytest.raises(HTTPResponse):
# prevent `future exception was never retrieved` warning logs
# future: <Future finished exception=HTTPResponse(status=200 body={'mock': 'response'})>
await mock_forward_func_future

0 comments on commit db3c49d

Please sign in to comment.