Skip to content

Commit

Permalink
test: add a test that detect duplicate ids in a backup of thei `_sche…
Browse files Browse the repository at this point in the history
…mas` topic
  • Loading branch information
eliax1996 committed Sep 25, 2024
1 parent 6be6b46 commit a5b4634
Show file tree
Hide file tree
Showing 4 changed files with 371 additions and 5 deletions.
118 changes: 116 additions & 2 deletions karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema, Versioner
from karapace.schema_references import Reference, Referents
Expand All @@ -24,7 +25,120 @@ class SubjectData:
compatibility: str | None = None


class InMemoryDatabase:
class KarapaceDatabase(ABC):
@abstractmethod
def get_schema_id(self, new_schema: TypedSchema) -> SchemaId:
pass

@abstractmethod
def get_schema_id_if_exists(
self,
*,
subject: Subject,
schema: TypedSchema,
include_deleted: bool,
) -> SchemaId | None:
pass

@abstractmethod
def get_next_version(self, *, subject: Subject) -> Version:
pass

@abstractmethod
def insert_schema_version(
self,
*,
subject: Subject,
schema_id: SchemaId,
version: Version,
deleted: bool,
schema: TypedSchema,
references: Sequence[Reference] | None,
) -> None:
pass

@abstractmethod
def insert_subject(self, *, subject: Subject) -> None:
pass

@abstractmethod
def get_subject_compatibility(self, *, subject: Subject) -> str | None:
pass

@abstractmethod
def delete_subject_compatibility(self, *, subject: Subject) -> None:
pass

@abstractmethod
def set_subject_compatibility(self, *, subject: Subject, compatibility: str) -> None:
pass

@abstractmethod
def find_schema(self, *, schema_id: SchemaId) -> TypedSchema | None:
pass

@abstractmethod
def find_schemas(self, *, include_deleted: bool, latest_only: bool) -> dict[Subject, list[SchemaVersion]]:
pass

@abstractmethod
def subjects_for_schema(self, schema_id: SchemaId) -> list[Subject]:
pass

@abstractmethod
def find_schema_versions_by_schema_id(self, *, schema_id: SchemaId, include_deleted: bool) -> list[SchemaVersion]:
pass

@abstractmethod
def find_subject(self, *, subject: Subject) -> Subject | None:
pass

@abstractmethod
def find_subjects(self, *, include_deleted: bool) -> list[Subject]:
pass

@abstractmethod
def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[Version, SchemaVersion]:
pass

@abstractmethod
def delete_subject(self, *, subject: Subject, version: Version) -> None:
pass

@abstractmethod
def delete_subject_hard(self, *, subject: Subject) -> None:
pass

@abstractmethod
def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
pass

@abstractmethod
def num_schemas(self) -> int:
pass

@abstractmethod
def num_subjects(self) -> int:
pass

@abstractmethod
def num_schema_versions(self) -> tuple[int, int]:
pass

@abstractmethod
def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
pass

@abstractmethod
def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
pass

@abstractmethod
def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
pass


class InMemoryDatabase(KarapaceDatabase):
def __init__(self) -> None:
self.global_schema_id = SchemaId(0)
self.id_lock_thread = Lock()
Expand Down Expand Up @@ -76,7 +190,7 @@ def get_schema_id_if_exists(
*,
subject: Subject,
schema: TypedSchema,
include_deleted: bool, # pylint: disable=unused-argument
include_deleted: bool,
) -> SchemaId | None:
subject_fingerprints = self._hash_to_schema_id_on_subject.get(subject)
if subject_fingerprints:
Expand Down
7 changes: 5 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from karapace.coordinator.master_coordinator import MasterCoordinator
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences, InvalidSchema, InvalidVersion, ShutdownException
from karapace.in_memory_database import InMemoryDatabase
from karapace.in_memory_database import KarapaceDatabase
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import KafkaConsumer
Expand Down Expand Up @@ -127,7 +127,7 @@ def __init__(
config: Config,
offset_watcher: OffsetWatcher,
key_formatter: KeyFormatter,
database: InMemoryDatabase,
database: KarapaceDatabase,
master_coordinator: MasterCoordinator | None = None,
) -> None:
Thread.__init__(self, name="schema-reader")
Expand Down Expand Up @@ -349,6 +349,9 @@ def handle_messages(self) -> None:
if are_we_master is True:
watch_offsets = True

self.consume_messages(msgs, watch_offsets)

def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None:
schema_records_processed_keymode_canonical = 0
schema_records_processed_keymode_deprecated_karapace = 0
for msg in msgs:
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/test_data/schemas.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"keytype":"CONFIG","subject":null,"magic":0} {"compatibilityLevel":"NONE"} 1671533077776
{"keytype":"SCHEMA","subject":"wow.first.subject","version":1,"magic":1} {"deleted": false, "id": 1, "schema": "{\"type\":\"record\",\"name\":\"foo\",\"namespace\":\"awesomity\",\"fields\":[{\"name\":\"coffee\",\"type\":\"string\"}], "subject": "wow.first.subject", "version": 1} 1671534130996
{"keytype":"SCHEMA","subject":"omg.even.better.a.second.subject","version":1,"magic":1} {"deleted": false, "id": 1, "schema": "{\"type\":\"record\",\"name\":\"foo\",\"namespace\":\"awesomity\",\"fields\":[{\"name\":\"pasta\",\"type\":\"string\"}], "subject": "omg.even.better.a.second.subject", "version": 1} 1671534131365
Loading

0 comments on commit a5b4634

Please sign in to comment.