Skip to content

Commit

Permalink
Merge pull request #885 from Aiven-Open/nosahama/resolve-minus-1-as-l…
Browse files Browse the repository at this point in the history
…atest-version

refactor: consolidate schema version resolution and validation logic
  • Loading branch information
eliax1996 authored Jun 5, 2024
2 parents 15698cf + c36b130 commit a189931
Show file tree
Hide file tree
Showing 14 changed files with 334 additions and 115 deletions.
14 changes: 10 additions & 4 deletions karapace/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.schema_references import Referents
from karapace.typing import ResolvedVersion

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from karapace.schema_references import Referents
from karapace.typing import Version


class VersionNotFoundException(Exception):
Expand Down Expand Up @@ -55,10 +61,10 @@ class SubjectNotSoftDeletedException(Exception):


class ReferenceExistsException(Exception):
def __init__(self, referenced_by: Referents, version: ResolvedVersion) -> None:
def __init__(self, referenced_by: Referents, version: Version) -> None:
super().__init__()
self.version = version
self.referenced_by = referenced_by
self.version = version


class SubjectSoftDeletedException(Exception):
Expand Down
26 changes: 13 additions & 13 deletions karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from __future__ import annotations

from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_models import SchemaVersion, TypedSchema, Versioner
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.typing import SchemaId, Subject, Version
from threading import Lock, RLock
from typing import Iterable, Sequence

Expand All @@ -20,7 +20,7 @@

@dataclass
class SubjectData:
schemas: dict[ResolvedVersion, SchemaVersion] = field(default_factory=dict)
schemas: dict[Version, SchemaVersion] = field(default_factory=dict)
compatibility: str | None = None


Expand All @@ -31,7 +31,7 @@ def __init__(self) -> None:
self.subjects: dict[Subject, SubjectData] = {}
self.schemas: dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
self.referenced_by: dict[tuple[Subject, Version], Referents] = {}

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produced multiple times to the same or
Expand Down Expand Up @@ -100,15 +100,15 @@ def _delete_subject_from_schema_id_on_subject(self, *, subject: Subject) -> None
def _get_from_hash_cache(self, *, typed_schema: TypedSchema) -> TypedSchema:
return self._hash_to_schema.setdefault(typed_schema.fingerprint(), typed_schema)

def get_next_version(self, *, subject: Subject) -> ResolvedVersion:
return ResolvedVersion(max(self.subjects[subject].schemas) + 1)
def get_next_version(self, *, subject: Subject) -> Version:
return Versioner.V(max(self.subjects[subject].schemas) + 1)

def insert_schema_version(
self,
*,
subject: Subject,
schema_id: SchemaId,
version: ResolvedVersion,
version: Version,
deleted: bool,
schema: TypedSchema,
references: Sequence[Reference] | None,
Expand Down Expand Up @@ -217,19 +217,19 @@ def find_subjects(self, *, include_deleted: bool) -> list[Subject]:
subject for subject in self.subjects if self.find_subject_schemas(subject=subject, include_deleted=False)
]

def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[ResolvedVersion, SchemaVersion]:
def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[Version, SchemaVersion]:
if subject not in self.subjects:
return {}
if include_deleted:
return self.subjects[subject].schemas
with self.schema_lock_thread:
return {
version_id: schema_version
Versioner.V(version_id): schema_version
for version_id, schema_version in self.subjects[subject].schemas.items()
if schema_version.deleted is False
}

def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
def delete_subject(self, *, subject: Subject, version: Version) -> None:
with self.schema_lock_thread:
for schema_version in self.subjects[subject].schemas.values():
if schema_version.version <= version:
Expand All @@ -241,7 +241,7 @@ def delete_subject_hard(self, *, subject: Subject) -> None:
del self.subjects[subject]
self._delete_subject_from_schema_id_on_subject(subject=subject)

def delete_subject_schema(self, *, subject: Subject, version: ResolvedVersion) -> None:
def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
with self.schema_lock_thread:
self.subjects[subject].schemas.pop(version, None)

Expand All @@ -263,15 +263,15 @@ def num_schema_versions(self) -> tuple[int, int]:
soft_deleted_versions += 1
return (live_versions, soft_deleted_versions)

def insert_referenced_by(self, *, subject: Subject, version: ResolvedVersion, schema_id: SchemaId) -> None:
def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
with self.schema_lock_thread:
referents = self.referenced_by.get((subject, version), None)
if referents:
referents.append(schema_id)
else:
self.referenced_by[(subject, version)] = Referents([schema_id])

def get_referenced_by(self, subject: Subject, version: ResolvedVersion) -> Referents | None:
def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
with self.schema_lock_thread:
return self.referenced_by.get((subject, version), None)

Expand Down
36 changes: 33 additions & 3 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from jsonschema import Draft7Validator
from jsonschema.exceptions import SchemaError
from karapace.dependency import Dependency
from karapace.errors import InvalidSchema
from karapace.errors import InvalidSchema, InvalidVersion, VersionNotFoundException
from karapace.protobuf.exception import (
Error as ProtobufError,
IllegalArgumentException,
Expand All @@ -23,7 +23,7 @@
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_references import Reference
from karapace.schema_type import SchemaType
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonObject, SchemaId, Subject, Version, VersionTag
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
from typing import Any, cast, Dict, Final, final, Mapping, Sequence

Expand Down Expand Up @@ -383,8 +383,38 @@ def parse(
@dataclass
class SchemaVersion:
subject: Subject
version: ResolvedVersion
version: Version
deleted: bool
schema_id: SchemaId
schema: TypedSchema
references: Sequence[Reference] | None


class Versioner:
    """Helpers for validating version tags and resolving them to ``Version`` objects."""

    @classmethod
    def V(cls, tag: VersionTag) -> Version:
        """Validate ``tag`` and wrap its resolved integer value in a ``Version``.

        Raises:
            InvalidVersion: if ``tag`` is rejected by ``validate_tag``.
        """
        cls.validate_tag(tag=tag)
        return Version(version=cls.resolve_tag(tag))

    @classmethod
    def validate_tag(cls, tag: VersionTag) -> None:
        """Check that ``tag`` is the latest-version tag or an acceptable integer.

        A tag is rejected when it cannot be converted to an integer, when the
        integer is ``0``, or when it is smaller than
        ``Version.MINUS_1_VERSION_TAG``.

        Raises:
            InvalidVersion: if ``tag`` is not acceptable.
        """
        # Only the conversion can fail, so keep the ``try`` body to that call.
        # ``resolve_tag`` never calls ``int()`` for the latest-version tag, so a
        # conversion error always means the tag itself is malformed.
        try:
            version = cls.resolve_tag(tag=tag)
        except (TypeError, ValueError) as exc:
            raise InvalidVersion(f"Invalid version {tag}") from exc
        if version < Version.MINUS_1_VERSION_TAG or version == 0:
            raise InvalidVersion(f"Invalid version {tag}")

    @staticmethod
    def resolve_tag(tag: VersionTag) -> int:
        """Return the integer for ``tag``.

        The latest-version tag maps to ``Version.MINUS_1_VERSION_TAG``; any
        other tag is passed through ``int()``, which raises on malformed input.
        """
        if tag == Version.LATEST_VERSION_TAG:
            return Version.MINUS_1_VERSION_TAG
        return int(tag)

    @staticmethod
    def from_schema_versions(schema_versions: Mapping[Version, SchemaVersion], version: Version) -> Version:
        """Resolve ``version`` against the versions present in ``schema_versions``.

        Returns the greatest key of ``schema_versions`` when ``version.is_latest``
        is true, otherwise ``version`` itself when it is one of the keys.

        Raises:
            VersionNotFoundException: if ``schema_versions`` is empty or does
                not contain ``version``.
        """
        # ``max()`` raises a bare ValueError on an empty mapping; with no
        # versions at all there is nothing to resolve to, so report it the same
        # way as any other missing version.
        if not schema_versions:
            raise VersionNotFoundException()
        max_version = max(schema_versions)
        if version.is_latest:
            return max_version
        if version in schema_versions and version <= max_version:
            return version
        raise VersionNotFoundException()
4 changes: 2 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonObject, SchemaId, Subject, Version
from karapace.utils import json_decode, JSONDecodeError
from threading import Event, Thread
from typing import Final, Mapping, Sequence
Expand Down Expand Up @@ -602,7 +602,7 @@ def remove_referenced_by(
def get_referenced_by(
self,
subject: Subject,
version: ResolvedVersion,
version: Version,
) -> Referents | None:
return self.database.get_referenced_by(subject, version)

Expand Down
12 changes: 6 additions & 6 deletions karapace/schema_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations

from karapace.dataclasses import default_dataclass
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version
from typing import cast, List, Mapping, NewType, TypeVar

Referents = NewType("Referents", List[SchemaId])
Expand Down Expand Up @@ -36,7 +36,7 @@ class LatestVersionReference:
name: str
subject: Subject

def resolve(self, version: ResolvedVersion) -> Reference:
def resolve(self, version: Version) -> Reference:
return Reference(
name=self.name,
subject=self.subject,
Expand All @@ -48,10 +48,10 @@ def resolve(self, version: ResolvedVersion) -> Reference:
class Reference:
name: str
subject: Subject
version: ResolvedVersion
version: Version

def __post_init__(self) -> None:
assert self.version != -1
assert self.version != Version.MINUS_1_VERSION_TAG

def __repr__(self) -> str:
return f"{{name='{self.name}', subject='{self.subject}', version={self.version}}}"
Expand All @@ -68,7 +68,7 @@ def from_dict(data: JsonObject) -> Reference:
return Reference(
name=str(data["name"]),
subject=Subject(str(data["subject"])),
version=ResolvedVersion(cast(int, data["version"])),
version=Version(cast(int, data["version"])),
)


Expand All @@ -88,6 +88,6 @@ def reference_from_mapping(
else Reference(
name=name,
subject=subject,
version=ResolvedVersion(version),
version=Version(version),
)
)
Loading

0 comments on commit a189931

Please sign in to comment.