diff --git a/README.rst b/README.rst index f6e8a6736..0cc7489e4 100644 --- a/README.rst +++ b/README.rst @@ -149,6 +149,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke http://localhost:8081/compatibility/subjects/test-key/versions/latest {"is_compatible":true} +NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the +compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done +when trying to register the new schema through the `subjects//versions` endpoint. + Get current global backwards compatibility setting value:: $ curl -X GET http://localhost:8081/config diff --git a/src/karapace/compatibility/__init__.py b/src/karapace/compatibility/__init__.py index e5f61e710..3984ed9f5 100644 --- a/src/karapace/compatibility/__init__.py +++ b/src/karapace/compatibility/__init__.py @@ -4,22 +4,7 @@ Copyright (c) 2019 Aiven Ltd See LICENSE for details """ -from avro.compatibility import ( - merge, - ReaderWriterCompatibilityChecker as AvroChecker, - SchemaCompatibilityResult, - SchemaCompatibilityType, - SchemaIncompatibilityType, -) -from avro.schema import Schema as AvroSchema from enum import Enum, unique -from jsonschema import Draft7Validator -from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema -from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility -from karapace.protobuf.schema import ProtobufSchema -from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema -from karapace.schema_reader import SchemaType -from karapace.utils import assert_never import logging @@ -54,121 +39,3 @@ def is_transitive(self) -> bool: "FULL_TRANSITIVE", } return self.value in TRANSITIVE_MODES - - -def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: AvroSchema) -> SchemaCompatibilityResult: - return 
AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema) - - -def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult: - return jsonschema_compatibility(reader, writer) - - -def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult: - return check_protobuf_schema_compatibility(reader, writer) - - -def check_compatibility( - old_schema: ParsedTypedSchema, - new_schema: ValidatedTypedSchema, - compatibility_mode: CompatibilityModes, -) -> SchemaCompatibilityResult: - """Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`.""" - if compatibility_mode is CompatibilityModes.NONE: - LOG.info("Compatibility level set to NONE, no schema compatibility checks performed") - return SchemaCompatibilityResult(SchemaCompatibilityType.compatible) - - if old_schema.schema_type is not new_schema.schema_type: - return incompatible_schema( - incompat_type=SchemaIncompatibilityType.type_mismatch, - message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}", - location=[], - ) - - if old_schema.schema_type is SchemaType.AVRO: - assert isinstance(old_schema.schema, AvroSchema) - assert isinstance(new_schema.schema, AvroSchema) - if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: - result = check_avro_compatibility( - reader_schema=new_schema.schema, - writer_schema=old_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: - result = check_avro_compatibility( - reader_schema=old_schema.schema, - writer_schema=new_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: - result = check_avro_compatibility( - reader_schema=new_schema.schema, - writer_schema=old_schema.schema, - ) - result = merge( - result, - 
check_avro_compatibility( - reader_schema=old_schema.schema, - writer_schema=new_schema.schema, - ), - ) - - elif old_schema.schema_type is SchemaType.JSONSCHEMA: - assert isinstance(old_schema.schema, Draft7Validator) - assert isinstance(new_schema.schema, Draft7Validator) - if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: - result = check_jsonschema_compatibility( - reader=new_schema.schema, - writer=old_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: - result = check_jsonschema_compatibility( - reader=old_schema.schema, - writer=new_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: - result = check_jsonschema_compatibility( - reader=new_schema.schema, - writer=old_schema.schema, - ) - result = merge( - result, - check_jsonschema_compatibility( - reader=old_schema.schema, - writer=new_schema.schema, - ), - ) - - elif old_schema.schema_type is SchemaType.PROTOBUF: - assert isinstance(old_schema.schema, ProtobufSchema) - assert isinstance(new_schema.schema, ProtobufSchema) - if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: - result = check_protobuf_compatibility( - reader=new_schema.schema, - writer=old_schema.schema, - ) - elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: - result = check_protobuf_compatibility( - reader=old_schema.schema, - writer=new_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: - result = check_protobuf_compatibility( - reader=new_schema.schema, - writer=old_schema.schema, - ) - result = merge( - result, - check_protobuf_compatibility( - reader=old_schema.schema, - writer=new_schema.schema, - ), - ) - - else: - assert_never(f"Unknown schema_type {old_schema.schema_type}") - - return result diff --git 
a/src/karapace/compatibility/schema_compatibility.py b/src/karapace/compatibility/schema_compatibility.py new file mode 100644 index 000000000..07e059d50 --- /dev/null +++ b/src/karapace/compatibility/schema_compatibility.py @@ -0,0 +1,138 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from avro.compatibility import ( + merge, + ReaderWriterCompatibilityChecker as AvroChecker, + SchemaCompatibilityResult, + SchemaCompatibilityType, + SchemaIncompatibilityType, +) +from avro.schema import Schema as AvroSchema +from jsonschema import Draft7Validator +from karapace.compatibility import CompatibilityModes +from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema +from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility +from karapace.protobuf.schema import ProtobufSchema +from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema +from karapace.schema_type import SchemaType +from karapace.utils import assert_never + +import logging + +LOG = logging.getLogger(__name__) + + +class SchemaCompatibility: + @staticmethod + def check_compatibility( + old_schema: ParsedTypedSchema, + new_schema: ValidatedTypedSchema, + compatibility_mode: CompatibilityModes, + ) -> SchemaCompatibilityResult: + """Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`.""" + + if compatibility_mode is CompatibilityModes.NONE: + LOG.info("Compatibility level set to NONE, no schema compatibility checks performed") + return SchemaCompatibilityResult(SchemaCompatibilityType.compatible) + + if old_schema.schema_type is not new_schema.schema_type: + return incompatible_schema( + incompat_type=SchemaIncompatibilityType.type_mismatch, + message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}", + location=[], + ) + + if old_schema.schema_type is SchemaType.AVRO: + assert isinstance(old_schema.schema, 
AvroSchema) + assert isinstance(new_schema.schema, AvroSchema) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: + result = SchemaCompatibility.check_avro_compatibility( + reader_schema=new_schema.schema, + writer_schema=old_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: + result = SchemaCompatibility.check_avro_compatibility( + reader_schema=old_schema.schema, + writer_schema=new_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: + result = SchemaCompatibility.check_avro_compatibility( + reader_schema=new_schema.schema, + writer_schema=old_schema.schema, + ) + result = merge( + result, + SchemaCompatibility.check_avro_compatibility( + reader_schema=old_schema.schema, + writer_schema=new_schema.schema, + ), + ) + elif old_schema.schema_type is SchemaType.JSONSCHEMA: + assert isinstance(old_schema.schema, Draft7Validator) + assert isinstance(new_schema.schema, Draft7Validator) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: + result = SchemaCompatibility.check_jsonschema_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: + result = SchemaCompatibility.check_jsonschema_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: + result = SchemaCompatibility.check_jsonschema_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + result = merge( + result, + SchemaCompatibility.check_jsonschema_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ), + ) + elif old_schema.schema_type is SchemaType.PROTOBUF: + assert isinstance(old_schema.schema, ProtobufSchema) + assert 
isinstance(new_schema.schema, ProtobufSchema) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: + result = SchemaCompatibility.check_protobuf_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: + result = SchemaCompatibility.check_protobuf_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ) + + elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: + result = SchemaCompatibility.check_protobuf_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + result = merge( + result, + SchemaCompatibility.check_protobuf_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ), + ) + else: + assert_never(f"Unknown schema_type {old_schema.schema_type}") + + return result + + @staticmethod + def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: AvroSchema) -> SchemaCompatibilityResult: + return AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema) + + @staticmethod + def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult: + return jsonschema_compatibility(reader, writer) + + @staticmethod + def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult: + return check_protobuf_schema_compatibility(reader, writer) diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index 67c0fc899..67f58fddd 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -4,10 +4,12 @@ """ from __future__ import annotations +from avro.compatibility import SchemaCompatibilityResult, SchemaCompatibilityType from collections.abc import Sequence from contextlib import AsyncExitStack, closing -from karapace.compatibility import check_compatibility, 
CompatibilityModes +from karapace.compatibility import CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible +from karapace.compatibility.schema_compatibility import SchemaCompatibility from karapace.config import Config from karapace.coordinator.master_coordinator import MasterCoordinator from karapace.dependency import Dependency @@ -281,7 +283,7 @@ async def subject_version_referencedby_get( return list(referenced_by) return [] - def _resolve_and_parse(self, schema: TypedSchema) -> ParsedTypedSchema: + def resolve_and_parse(self, schema: TypedSchema) -> ParsedTypedSchema: references, dependencies = self.resolve_references(schema.references) if schema.references else (None, None) return ParsedTypedSchema.parse( schema_type=schema.schema_type, @@ -325,12 +327,8 @@ async def write_new_schema_local( ) else: # First check if any of the existing schemas for the subject match - live_schema_versions = { - version_id: schema_version - for version_id, schema_version in all_schema_versions.items() - if schema_version.deleted is False - } - if not live_schema_versions: # Previous ones have been deleted by the user. + live_versions = self.get_live_versions_sorted(all_schema_versions) + if not live_versions: # Previous ones have been deleted by the user. 
version = self.database.get_next_version(subject=subject) schema_id = self.database.get_schema_id(new_schema) LOG.debug( @@ -351,32 +349,17 @@ async def write_new_schema_local( ) return schema_id - compatibility_mode = self.get_compatibility_mode(subject=subject) + result = self.check_schema_compatibility(new_schema, subject) - # Run a compatibility check between on file schema(s) and the one being submitted now - # the check is either towards the latest one or against all previous ones in case of - # transitive mode - schema_versions = sorted(live_schema_versions) - if compatibility_mode.is_transitive(): - check_against = schema_versions - else: - check_against = [schema_versions[-1]] - - for old_version in check_against: - parsed_old_schema = self._resolve_and_parse(all_schema_versions[old_version].schema) - result = check_compatibility( - old_schema=parsed_old_schema, - new_schema=new_schema, - compatibility_mode=compatibility_mode, + if is_incompatible(result): + LOG.warning( + "Incompatible schema: %s, incompatibilities: %s", result.compatibility, result.incompatibilities + ) + compatibility_mode = self.get_compatibility_mode(subject=subject) + raise IncompatibleSchema( + f"Incompatible schema, compatibility_mode={compatibility_mode.value}. 
" + f"Incompatibilities: {', '.join(result.messages)[:300]}" ) - if is_incompatible(result): - message = set(result.messages).pop() if result.messages else "" - LOG.warning( - "Incompatible schema: %s, incompatibilities: %s", result.compatibility, result.incompatibilities - ) - raise IncompatibleSchema( - f"Incompatible schema, compatibility_mode={compatibility_mode.value} {message}" - ) # We didn't find an existing schema and the schema is compatible so go and create one version = self.database.get_next_version(subject=subject) @@ -465,3 +448,48 @@ def send_delete_subject_message(self, subject: Subject, version: Version) -> Non key = {"subject": subject, "magic": 0, "keytype": "DELETE_SUBJECT"} value = {"subject": subject, "version": version.value} self.producer.send_message(key=key, value=value) + + def check_schema_compatibility( + self, + new_schema: ValidatedTypedSchema, + subject: Subject, + ) -> SchemaCompatibilityResult: + result = SchemaCompatibilityResult(SchemaCompatibilityType.compatible) + + compatibility_mode = self.get_compatibility_mode(subject=subject) + all_schema_versions: dict[Version, SchemaVersion] = self.database.find_subject_schemas( + subject=subject, include_deleted=True + ) + live_versions = self.get_live_versions_sorted(all_schema_versions) + + if not live_versions: + old_versions = [] + elif compatibility_mode.is_transitive(): + # Check against all versions + old_versions = live_versions + else: + # Only check against latest version + old_versions = [live_versions[-1]] + + for old_version in old_versions: + old_parsed_schema = self.resolve_and_parse(all_schema_versions[old_version].schema) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_parsed_schema, + new_schema=new_schema, + compatibility_mode=compatibility_mode, + ) + + if is_incompatible(result): + return result + + return result + + @staticmethod + def get_live_versions_sorted(all_schema_versions: dict[Version, SchemaVersion]) -> list[Version]: + 
live_schema_versions = { + version_id: schema_version + for version_id, schema_version in all_schema_versions.items() + if schema_version.deleted is False + } + return sorted(live_schema_versions) diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py index 20aaabc6f..a37a3ff9f 100644 --- a/src/karapace/schema_registry_apis.py +++ b/src/karapace/schema_registry_apis.py @@ -9,8 +9,9 @@ from enum import Enum, unique from http import HTTPStatus from karapace.auth import HTTPAuthorizer, Operation, User -from karapace.compatibility import check_compatibility, CompatibilityModes +from karapace.compatibility import CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible +from karapace.compatibility.schema_compatibility import SchemaCompatibility from karapace.config import Config from karapace.errors import ( IncompatibleSchema, @@ -34,7 +35,7 @@ from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping from karapace.schema_registry import KarapaceSchemaRegistry -from karapace.typing import JsonData, SchemaId, Subject, Version +from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version from karapace.utils import JSONDecodeError from typing import Any @@ -380,63 +381,12 @@ def _invalid_version(self, content_type, version): ) async def compatibility_check( - self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None + self, content_type: str, *, subject: Subject, version: str, request: HTTPRequest, user: User | None = None ) -> None: """Check for schema compatibility""" self._check_authorization(user, Operation.Read, f"Subject:{subject}") - body = request.json - schema_type = self._validate_schema_type(content_type=content_type, data=body) - references = 
self._validate_references(content_type, schema_type, body) - try: - references, new_schema_dependencies = self.schema_registry.resolve_references(references) - new_schema = ValidatedTypedSchema.parse( - schema_type=schema_type, - schema_str=body["schema"], - references=references, - dependencies=new_schema_dependencies, - use_protobuf_formatter=self.config["use_protobuf_formatter"], - ) - except InvalidSchema: - self.r( - body={ - "error_code": SchemaErrorCodes.INVALID_SCHEMA.value, - "message": f"Invalid {schema_type} schema", - }, - content_type=content_type, - status=HTTPStatus.UNPROCESSABLE_ENTITY, - ) - try: - old = self.schema_registry.subject_version_get(subject=subject, version=Versioner.V(version)) - except InvalidVersion: - self._invalid_version(content_type, version) - except (VersionNotFoundException, SchemasNotFoundException, SubjectNotFoundException): - self.r( - body={ - "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version} not found.", - }, - content_type=content_type, - status=HTTPStatus.NOT_FOUND, - ) - old_schema_type = self._validate_schema_type(content_type=content_type, data=old) - try: - old_references = old.get("references", None) - old_dependencies = None - if old_references: - old_references, old_dependencies = self.schema_registry.resolve_references(old_references) - old_schema = ParsedTypedSchema.parse(old_schema_type, old["schema"], old_references, old_dependencies) - except InvalidSchema: - self.r( - body={ - "error_code": SchemaErrorCodes.INVALID_SCHEMA.value, - "message": f"Found an invalid {old_schema_type} schema registered", - }, - content_type=content_type, - status=HTTPStatus.UNPROCESSABLE_ENTITY, - ) - try: compatibility_mode = self.schema_registry.get_compatibility_mode(subject=subject) except ValueError as ex: @@ -451,13 +401,19 @@ async def compatibility_check( status=HTTPStatus.INTERNAL_SERVER_ERROR, ) - result = check_compatibility( - old_schema=old_schema, - new_schema=new_schema, - 
compatibility_mode=compatibility_mode, - ) + new_schema = self.get_new_schema(request.json, content_type) + old_schema = self.get_old_schema(subject, Versioner.V(version), content_type) + if compatibility_mode.is_transitive(): + # Ignore the schema version provided in the rest api call (`version`) + # Instead check against all previous versions (including `version` if existing) + result = self.schema_registry.check_schema_compatibility(new_schema, subject) + else: + # Check against the schema version provided in the rest api call (`version`) + result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode) + if is_incompatible(result): - self.r({"is_compatible": False}, content_type) + maybe_truncated_error = ", ".join(result.messages)[:300] + self.r({"is_compatible": False, "incompatibilities": maybe_truncated_error}, content_type) self.r({"is_compatible": True}, content_type) async def schemas_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None): @@ -1370,3 +1326,57 @@ def no_master_error(self, content_type: str) -> None: content_type=content_type, status=HTTPStatus.INTERNAL_SERVER_ERROR, ) + + def get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedSchema: + schema_type = self._validate_schema_type(content_type=content_type, data=body) + references = self._validate_references(content_type, schema_type, body) + try: + references, new_schema_dependencies = self.schema_registry.resolve_references(references) + return ValidatedTypedSchema.parse( + schema_type=schema_type, + schema_str=body["schema"], + references=references, + dependencies=new_schema_dependencies, + use_protobuf_formatter=self.config["use_protobuf_formatter"], + ) + except InvalidSchema: + self.r( + body={ + "error_code": SchemaErrorCodes.INVALID_SCHEMA.value, + "message": f"Invalid {schema_type} schema", + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + def get_old_schema(self, subject: 
Subject, version: Version, content_type: str) -> ParsedTypedSchema: + try: + old = self.schema_registry.subject_version_get(subject=subject, version=version) + except InvalidVersion: + self._invalid_version(content_type, version) + except (VersionNotFoundException, SchemasNotFoundException, SubjectNotFoundException): + self.r( + body={ + "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, + "message": f"Version {version} not found.", + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + old_schema_type = self._validate_schema_type(content_type=content_type, data=old) + try: + old_references = old.get("references", None) + old_dependencies = None + if old_references: + old_references, old_dependencies = self.schema_registry.resolve_references(old_references) + old_schema = ParsedTypedSchema.parse(old_schema_type, old["schema"], old_references, old_dependencies) + return old_schema + except InvalidSchema: + self.r( + body={ + "error_code": SchemaErrorCodes.INVALID_SCHEMA.value, + "message": f"Found an invalid {old_schema_type} schema registered", + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) diff --git a/tests/integration/test_dependencies_compatibility_protobuf.py b/tests/integration/test_dependencies_compatibility_protobuf.py index 2bacbdf7b..725611b5c 100644 --- a/tests/integration/test_dependencies_compatibility_protobuf.py +++ b/tests/integration/test_dependencies_compatibility_protobuf.py @@ -183,7 +183,7 @@ async def test_protobuf_schema_compatibility_dependencies(registry_async_client: json={"schemaType": "PROTOBUF", "schema": evolved_schema, "references": evolved_references}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False @pytest.mark.parametrize("trail", ["", "/"]) @@ -271,7 +271,7 @@ async def test_protobuf_schema_compatibility_dependencies1(registry_async_client json={"schemaType": "PROTOBUF", "schema": evolved_schema, 
"references": evolved_references}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False # Do compatibility check when message field is altered from referenced type to google type @@ -339,7 +339,7 @@ async def test_protobuf_schema_compatibility_dependencies1g(registry_async_clien json={"schemaType": "PROTOBUF", "schema": evolved_schema}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False # Do compatibility check when message field is altered from google type to referenced type @@ -407,7 +407,7 @@ async def test_protobuf_schema_compatibility_dependencies1g_otherway(registry_as json={"schemaType": "PROTOBUF", "schema": evolved_schema, "references": container_references}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False @pytest.mark.parametrize("trail", ["", "/"]) @@ -491,7 +491,7 @@ async def test_protobuf_schema_compatibility_dependencies2(registry_async_client json={"schemaType": "PROTOBUF", "schema": evolved_schema, "references": evolved_references}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False SIMPLE_SCHEMA = """\ diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 546c19e0b..bb4448d80 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -332,7 +332,8 @@ async def test_compatibility_endpoint(registry_async_client: Client, trail: str) json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False + assert res.json().get("incompatibilities") == "reader type: string not compatible with writer type: int" @pytest.mark.parametrize("trail", ["", "/"]) @@ -536,7 +537,7 @@ def 
_test_cases(): json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": expected} + assert res.json().get("is_compatible") == expected @pytest.mark.parametrize("trail", ["", "/"]) @@ -3243,7 +3244,7 @@ async def test_schema_non_compliant_name_in_existing( json={"schema": json.dumps(evolved_schema)}, ) assert res.status_code == 200 - assert not res.json().get("is_compatible") + assert res.json().get("is_compatible") is False # Post evolved schema, should not be compatible and rejected. res = await registry_async_client.post( @@ -3253,7 +3254,10 @@ async def test_schema_non_compliant_name_in_existing( assert res.status_code == 409 assert res.json() == { "error_code": 409, - "message": "Incompatible schema, compatibility_mode=BACKWARD expected: compliant_name_test.test-schema", + "message": ( + "Incompatible schema, compatibility_mode=BACKWARD. " + "Incompatibilities: expected: compliant_name_test.test-schema" + ), } # Send compatibility configuration for subject that disabled backwards compatibility. 
diff --git a/tests/integration/test_schema_compatibility.py b/tests/integration/test_schema_compatibility.py new file mode 100644 index 000000000..82228ba32 --- /dev/null +++ b/tests/integration/test_schema_compatibility.py @@ -0,0 +1,235 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from collections.abc import Coroutine +from dataclasses import dataclass +from karapace.client import Client +from karapace.typing import JsonObject, Subject +from tests.base_testcase import BaseTestCase +from typing import Any, Callable, Final + +import json +import logging +import pytest + +SchemaRegitrationFunc = Callable[[Client, Subject], Coroutine[Any, Any, None]] + +LOG = logging.getLogger(__name__) + +schema_int: Final[JsonObject] = {"type": "record", "name": "schema_name", "fields": [{"type": "int", "name": "field_name"}]} +schema_long: Final[JsonObject] = { + "type": "record", + "name": "schema_name", + "fields": [{"type": "long", "name": "field_name"}], +} +schema_string: Final[JsonObject] = { + "type": "record", + "name": "schema_name", + "fields": [{"type": "string", "name": "field_name"}], +} +schema_double: Final[JsonObject] = { + "type": "record", + "name": "schema_name", + "fields": [{"type": "double", "name": "field_name"}], +} + + +@dataclass +class SchemaCompatibilityTestCase(BaseTestCase): + new_schema: str + compatibility_mode: str + register_baseline_schemas: SchemaRegitrationFunc + expected_is_compatible: bool | None + expected_status_code: int + expected_incompatibilities: str | None + + +async def _register_baseline_schemas_no_incompatibilities(registry_async_client: Client, subject: Subject) -> None: + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_int)}, + ) + assert res.status_code == 200 + + # Changing type from int to long is compatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", 
+ json={"schemaType": "AVRO", "schema": json.dumps(schema_long)}, + ) + assert res.status_code == 200 + + +async def _register_baseline_schemas_with_incompatibilities(registry_async_client: Client, subject: Subject) -> None: + # Allow registering non backward compatible schemas + await _set_compatibility_mode(registry_async_client, subject, "NONE") + + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_string)}, + ) + assert res.status_code == 200 + + # Changing type from string to double is incompatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_double)}, + ) + assert res.status_code == 200 + + +async def _register_baseline_schemas_with_incompatibilities_and_a_deleted_schema( + registry_async_client: Client, subject: Subject +) -> None: + await _register_baseline_schemas_with_incompatibilities(registry_async_client, subject) + + # Register schema + # Changing type from double to long is incompatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_long)}, + ) + assert res.status_code == 200 + + # And delete it + res = await registry_async_client.delete(f"subjects/{subject}/versions/latest") + assert res.status_code == 200 + assert res.json() == 3 + + +async def _register_no_baseline_schemas( + registry_async_client: Client, subject: Subject # pylint: disable=unused-argument +) -> None: + pass + + +async def _set_compatibility_mode(registry_async_client: Client, subject: Subject, compatibility_mode: str) -> None: + res = await registry_async_client.put(f"config/{subject}", json={"compatibility": compatibility_mode}) + assert res.status_code == 200 + + +@pytest.mark.parametrize( + "test_case", + [ + # Case 0 + # New schema compatible with all baseline ones (int --> long, long --> long) + # Non-transitive mode + # --> 
No incompatibilities are found + SchemaCompatibilityTestCase( + test_name="case0", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_no_incompatibilities, + new_schema=json.dumps(schema_long), + expected_is_compatible=True, + expected_status_code=200, + expected_incompatibilities=None, + ), + # Case 1 + # Same as previous case, but in transitive mode + # --> No incompatibilities are found + SchemaCompatibilityTestCase( + test_name="case1", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_no_incompatibilities, + new_schema=json.dumps(schema_long), + expected_is_compatible=True, + expected_status_code=200, + expected_incompatibilities=None, + ), + # Case 2 + # New schema incompatible with both baseline schemas (string --> int, double --> int) + # Non-transitive mode + # --> Incompatibilities are found only against last baseline schema (double --> int) + SchemaCompatibilityTestCase( + test_name="case2", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: double", + ), + # Case 3 + # Same as previous case, but in transitive mode + # --> Incompatibilities are found in the first baseline schema (string --> int) + SchemaCompatibilityTestCase( + test_name="case3", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: string", + ), + # Case 4 + # Same as case 2, but with a deleted schema among baseline ones + # Non-transitive mode + # --> The deleted schema is ignored + # --> Incompatibilities are found only 
against last baseline schema (double --> int) + SchemaCompatibilityTestCase( + test_name="case4", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: double", + ), + # Case 5 + # Same as case 3, but with a deleted schema among baseline ones + # --> The deleted schema is ignored + # --> Incompatibilities are found in the first baseline schema (string --> int) + SchemaCompatibilityTestCase( + test_name="case5", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: string", + ), + # Case 6 + # A new schema and no baseline schemas + # Non-transitive mode + # --> No incompatibilities are found + # --> Status code is 404 because `latest` version to check against does not exist + SchemaCompatibilityTestCase( + test_name="case6", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_no_baseline_schemas, + new_schema=json.dumps(schema_int), + expected_is_compatible=None, + expected_status_code=404, + expected_incompatibilities=None, + ), + # Case 7 + # Same as previous case, but in transitive mode + # --> No incompatibilities are found + # --> Status code is 404 because `latest` version to check against does not exist + SchemaCompatibilityTestCase( + test_name="case7", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_no_baseline_schemas, + new_schema=json.dumps(schema_int), + expected_is_compatible=None, + expected_status_code=404, + expected_incompatibilities=None, + ), + ], +) +async def
test_schema_compatibility(test_case: SchemaCompatibilityTestCase, registry_async_client: Client) -> None: + subject = Subject(f"subject_{test_case.test_name}") + + await test_case.register_baseline_schemas(registry_async_client, subject) + await _set_compatibility_mode(registry_async_client, subject, test_case.compatibility_mode) + + LOG.info("Validating new schema: %s", test_case.new_schema) + res = await registry_async_client.post( + f"compatibility/subjects/{subject}/versions/latest", json={"schema": test_case.new_schema} + ) + + assert res.status_code == test_case.expected_status_code + assert res.json().get("is_compatible") == test_case.expected_is_compatible + assert res.json().get("incompatibilities", None) == test_case.expected_incompatibilities diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 4b4471cb2..55825fb92 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -1123,8 +1123,8 @@ async def test_protobuf_error(registry_async_client: Client) -> None: expected=409, expected_msg=( # ACTUALLY THERE NO MESSAGE_DROP!!! - "Incompatible schema, compatibility_mode=BACKWARD " - "Incompatible modification Modification.MESSAGE_DROP found" + "Incompatible schema, compatibility_mode=BACKWARD. 
" + "Incompatibilities: Incompatible modification Modification.MESSAGE_DROP found" ), ) print(f"Adding new schema, subject: '{testdata.subject}'\n{testdata.schema_str}") diff --git a/tests/unit/compatibility/test_compatibility.py b/tests/unit/compatibility/test_compatibility.py index 641f7df06..af41aae99 100644 --- a/tests/unit/compatibility/test_compatibility.py +++ b/tests/unit/compatibility/test_compatibility.py @@ -3,17 +3,44 @@ See LICENSE for details """ from avro.compatibility import SchemaCompatibilityType -from karapace.compatibility import check_compatibility, CompatibilityModes +from karapace.compatibility import CompatibilityModes +from karapace.compatibility.schema_compatibility import SchemaCompatibility from karapace.schema_models import SchemaType, ValidatedTypedSchema import json -def test_schema_type_can_change_when_mode_none(): +def test_schema_type_can_change_when_mode_none() -> None: avro_str = json.dumps({"type": "record", "name": "Record1", "fields": [{"name": "field1", "type": "int"}]}) - json_str = '{"type":"array"}' + json_str = '{"type": "array"}' avro_schema = ValidatedTypedSchema.parse(SchemaType.AVRO, avro_str) json_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, json_str) - result = check_compatibility(old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE) + result = SchemaCompatibility.check_compatibility( + old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE + ) assert result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_compatible_in_transitive_mode() -> None: + old_json = '{"type": "array", "name": "name_old"}' + new_json = '{"type": "array", "name": "name_new"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, 
compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_incompatible_in_transitive_mode() -> None: + old_json = '{"type": "array"}' + new_json = '{"type": "integer"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert result.compatibility is SchemaCompatibilityType.incompatible diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 6d850f5fc..7fcecd47e 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -14,7 +14,7 @@ import pytest -async def test_validate_schema_request_body(): +async def test_validate_schema_request_body() -> None: controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS)) controller._validate_schema_request_body( # pylint: disable=W0212 @@ -30,7 +30,7 @@ async def test_validate_schema_request_body(): assert str(exc_info.value) == "HTTPResponse 422" -async def test_forward_when_not_ready(): +async def test_forward_when_not_ready() -> None: with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) ready_property_mock = PropertyMock(return_value=False) diff --git a/website/source/quickstart.rst b/website/source/quickstart.rst index 6e6ecdba6..f640e68d2 100644 --- a/website/source/quickstart.rst +++ b/website/source/quickstart.rst @@ -60,6 +60,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke $KARAPACE_REGISTRY_URI/compatibility/subjects/test-key/versions/latest {"is_compatible":true} +NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, 
FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the +compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done +when trying to register the new schema through the `subjects/<subject>/versions` endpoint. + Get current global backwards compatibility setting value:: $ curl -X GET $KARAPACE_REGISTRY_URI/config