diff --git a/README.rst b/README.rst index d1bcbd28f..f513c26f2 100644 --- a/README.rst +++ b/README.rst @@ -149,6 +149,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke http://localhost:8081/compatibility/subjects/test-key/versions/latest {"is_compatible":true} +NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the +compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done +when trying to register the new schema through the `subjects/<subject-key>/versions` endpoint. + Get current global backwards compatibility setting value:: $ curl -X GET http://localhost:8081/config diff --git a/src/karapace/compatibility/schema_compatibility.py b/src/karapace/compatibility/schema_compatibility.py new file mode 100644 index 000000000..d26d62913 --- /dev/null +++ b/src/karapace/compatibility/schema_compatibility.py @@ -0,0 +1,145 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from avro.compatibility import ( + merge, + ReaderWriterCompatibilityChecker as AvroChecker, + SchemaCompatibilityResult, + SchemaCompatibilityType, + SchemaIncompatibilityType, +) +from avro.schema import Schema as AvroSchema +from jsonschema import Draft7Validator +from karapace.compatibility import CompatibilityModes +from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema +from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility +from karapace.protobuf.schema import ProtobufSchema +from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema +from karapace.schema_type import SchemaType +from karapace.utils import assert_never + +import logging + +LOG = logging.getLogger(__name__) + + +class SchemaCompatibility: + @staticmethod + def check_compatibility( + old_schema: ParsedTypedSchema, + new_schema: ValidatedTypedSchema, + 
compatibility_mode: CompatibilityModes, + ) -> SchemaCompatibilityResult: + """Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`.""" + + if compatibility_mode is CompatibilityModes.NONE: + LOG.info("Compatibility level set to NONE, no schema compatibility checks performed") + return SchemaCompatibilityResult(SchemaCompatibilityType.compatible) + + if old_schema.schema_type is not new_schema.schema_type: + return incompatible_schema( + incompat_type=SchemaIncompatibilityType.type_mismatch, + message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}", + location=[], + ) + + if old_schema.schema_type is SchemaType.AVRO: + assert isinstance(old_schema.schema, AvroSchema) + assert isinstance(new_schema.schema, AvroSchema) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: + result = SchemaCompatibility.check_avro_compatibility( + reader_schema=new_schema.schema, + writer_schema=old_schema.schema, + ) + + elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: + result = SchemaCompatibility.check_avro_compatibility( + reader_schema=old_schema.schema, + writer_schema=new_schema.schema, + ) + + elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: + result = SchemaCompatibility.check_avro_compatibility( + reader_schema=new_schema.schema, + writer_schema=old_schema.schema, + ) + result = merge( + result, + SchemaCompatibility.check_avro_compatibility( + reader_schema=old_schema.schema, + writer_schema=new_schema.schema, + ), + ) + + elif old_schema.schema_type is SchemaType.JSONSCHEMA: + assert isinstance(old_schema.schema, Draft7Validator) + assert isinstance(new_schema.schema, Draft7Validator) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: + result = SchemaCompatibility.check_jsonschema_compatibility( + reader=new_schema.schema, 
+ writer=old_schema.schema, + ) + + elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: + result = SchemaCompatibility.check_jsonschema_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ) + + elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: + result = SchemaCompatibility.check_jsonschema_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + result = merge( + result, + SchemaCompatibility.check_jsonschema_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ), + ) + + elif old_schema.schema_type is SchemaType.PROTOBUF: + assert isinstance(old_schema.schema, ProtobufSchema) + assert isinstance(new_schema.schema, ProtobufSchema) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: + result = SchemaCompatibility.check_protobuf_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: + result = SchemaCompatibility.check_protobuf_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ) + + elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: + result = SchemaCompatibility.check_protobuf_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + result = merge( + result, + SchemaCompatibility.check_protobuf_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ), + ) + + else: + assert_never(f"Unknown schema_type {old_schema.schema_type}") + + return result + + @staticmethod + def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: AvroSchema) -> SchemaCompatibilityResult: + return AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema) + + @staticmethod + def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) 
-> SchemaCompatibilityResult: + return jsonschema_compatibility(reader, writer) + + @staticmethod + def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult: + return check_protobuf_schema_compatibility(reader, writer) diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index 06a0d91ae..d61cc058a 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -463,7 +463,7 @@ def check_schema_compatibility( if not live_versions: old_versions = [] elif compatibility_mode.is_transitive(): - # Only check against all versions + # Check against all versions old_versions = live_versions else: # Only check against latest version @@ -479,7 +479,7 @@ def check_schema_compatibility( ) if is_incompatible(result): - break + return result return result diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py index 6b7453d03..8800d6734 100644 --- a/src/karapace/schema_registry_apis.py +++ b/src/karapace/schema_registry_apis.py @@ -397,9 +397,14 @@ async def compatibility_check( ) new_schema = self.get_new_schema(request.json, content_type) - old_schema = self._get_old_schema(subject, Versioner.V(version), content_type) - - result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode) + old_schema = self.get_old_schema(subject, Versioner.V(version), content_type) + if compatibility_mode.is_transitive(): + # Ignore the schema version provided in the rest api call (`version`) + # Instead check against all previous versions (including `version` if existing) + result = self.schema_registry.check_schema_compatibility(new_schema, subject) + else: + # Check against the schema version provided in the rest api call (`version`) + result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode) if is_incompatible(result): self.r({"is_compatible": False}, content_type) @@ -1338,7 +1343,7 @@ def 
get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedS status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - def _get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema: + def get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema: try: old = self.schema_registry.subject_version_get(subject=subject, version=version) except InvalidVersion: diff --git a/tests/integration/test_schema_compatibility.py b/tests/integration/test_schema_compatibility.py new file mode 100644 index 000000000..7d804e989 --- /dev/null +++ b/tests/integration/test_schema_compatibility.py @@ -0,0 +1,218 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from dataclasses import dataclass +from karapace.client import Client +from karapace.typing import Subject +from tests.base_testcase import BaseTestCase +from typing import Any, Callable, Coroutine, Union + +import json +import logging +import pytest + +LOG = logging.getLogger(__name__) + +schema_int = {"type": "record", "name": "schema_name", "fields": [{"type": "int", "name": "field_name"}]} +schema_long = {"type": "record", "name": "schema_name", "fields": [{"type": "long", "name": "field_name"}]} +schema_string = {"type": "record", "name": "schema_name", "fields": [{"type": "string", "name": "field_name"}]} +schema_double = {"type": "record", "name": "schema_name", "fields": [{"type": "double", "name": "field_name"}]} + + +@dataclass +class SchemaCompatibilityTestCase(BaseTestCase): + new_schema: str + compatibility_mode: str + register_baseline_schemas: Callable[[Client, Subject], Coroutine[Any, Any, None]] + expected_is_compatible: Union[bool, None] + expected_status_code: int + expected_incompatibilities: Union[str, None] + + +async def _register_baseline_schemas_no_incompatibilities(registry_async_client: Client, subject: Subject) -> None: + res = await registry_async_client.post( + f"subjects/{subject}/versions", + 
json={"schemaType": "AVRO", "schema": json.dumps(schema_int)}, + ) + assert res.status_code == 200 + + # Changing type from int to long is compatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_long)}, + ) + assert res.status_code == 200 + + +async def _register_baseline_schemas_with_incompatibilities(registry_async_client: Client, subject: Subject) -> None: + # Allow registering non backward compatible schemas + await _set_compatibility_mode(registry_async_client, subject, "NONE") + + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_string)}, + ) + assert res.status_code == 200 + + # Changing type from string to double is incompatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_double)}, + ) + assert res.status_code == 200 + + +async def _register_baseline_schemas_with_incompatibilities_and_a_deleted_schema( + registry_async_client: Client, subject: Subject +) -> None: + await _register_baseline_schemas_with_incompatibilities(registry_async_client, subject) + + # Register schema + # Changing type from double to long is incompatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_long)}, + ) + assert res.status_code == 200 + + # And delete it + res = await registry_async_client.delete(f"subjects/{subject}/versions/latest") + assert res.status_code == 200 + assert res.json() == 3 + + +async def _register_no_baseline_schemas( + registry_async_client: Client, subject: Subject # pylint: disable=unused-argument +) -> None: + pass + + +async def _set_compatibility_mode(registry_async_client: Client, subject: Subject, compatibility_mode: str) -> None: + res = await registry_async_client.put(f"config/{subject}", 
json={"compatibility": compatibility_mode}) + assert res.status_code == 200 + + +@pytest.mark.parametrize( + "test_case", + [ + # Case 0 + # New schema compatible with all baseline ones (int --> long, long --> long) + # Non-transitive mode + # --> No incompatibilities are found + SchemaCompatibilityTestCase( + test_name="case0", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_no_incompatibilities, + new_schema=json.dumps(schema_long), + expected_is_compatible=True, + expected_status_code=200, + expected_incompatibilities=None, + ), + # Case 1 + # Same as previous case, but in transitive mode + # --> No incompatibilities are found + SchemaCompatibilityTestCase( + test_name="case1", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_no_incompatibilities, + new_schema=json.dumps(schema_long), + expected_is_compatible=True, + expected_status_code=200, + expected_incompatibilities=None, + ), + # Case 2 + # New schema incompatible with both baseline schemas (string --> int, double --> int) + # Non-transitive mode + # --> Incompatibilities are found only against last baseline schema (double --> int) + SchemaCompatibilityTestCase( + test_name="case2", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: double", + ), + # Case 3 + # Same as previous case, but in transitive mode + # --> Incompatibilities are found in the first baseline schema (string --> int) + SchemaCompatibilityTestCase( + test_name="case3", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + 
expected_incompatibilities="reader type: int not compatible with writer type: string", + ), + # Case 4 + # Same as case 2, but with a deleted schema among baseline ones + # Non-transitive mode + # --> The deleted schema is ignored + # --> Incompatibilities are found only against last baseline schema (double --> int) + SchemaCompatibilityTestCase( + test_name="case4", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: double", + ), + # Case 5 + # Same as case 3, but with a deleted schema among baseline ones + # --> The deleted schema is ignored + # --> Incompatibilities are found in the first baseline schema (string --> int) + SchemaCompatibilityTestCase( + test_name="case5", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: string", + ), + # Case 6 + # A new schema and no baseline schemas + # Non-transitive mode + # --> No incompatibilities are found + # --> Status code is 404 because `latest` version to check against does not exist + SchemaCompatibilityTestCase( + test_name="case6", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_no_baseline_schemas, + new_schema=json.dumps(schema_int), + expected_is_compatible=None, + expected_status_code=404, + expected_incompatibilities=None, + ), + # Case 7 + # Same as previous case, but in transitive mode + # --> No incompatibilities are found + # --> Status code is 404 because `latest` version to check against does not exist + SchemaCompatibilityTestCase( + test_name="case7", + 
compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_no_baseline_schemas, + new_schema=json.dumps(schema_int), + expected_is_compatible=None, + expected_status_code=404, + expected_incompatibilities=None, + ), + ], +) +async def test_schema_compatibility(test_case: SchemaCompatibilityTestCase, registry_async_client: Client) -> None: + subject = Subject(f"subject_{test_case.test_name}") + + await test_case.register_baseline_schemas(registry_async_client, subject) + await _set_compatibility_mode(registry_async_client, subject, test_case.compatibility_mode) + + LOG.info("Validating new schema: %s", test_case.new_schema) + res = await registry_async_client.post( + f"compatibility/subjects/{subject}/versions/latest", json={"schema": test_case.new_schema} + ) + + assert res.status_code == test_case.expected_status_code + assert res.json().get("is_compatible") == test_case.expected_is_compatible + assert res.json().get("incompatibilities", None) == test_case.expected_incompatibilities diff --git a/tests/unit/compatibility/test_compatibility.py b/tests/unit/compatibility/test_compatibility.py index 76f0e22b9..af41aae99 100644 --- a/tests/unit/compatibility/test_compatibility.py +++ b/tests/unit/compatibility/test_compatibility.py @@ -20,3 +20,27 @@ def test_schema_type_can_change_when_mode_none() -> None: old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE ) assert result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_compatible_in_transitive_mode() -> None: + old_json = '{"type": "array", "name": "name_old"}' + new_json = '{"type": "array", "name": "name_new"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert 
result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_incompatible_in_transitive_mode() -> None: + old_json = '{"type": "array"}' + new_json = '{"type": "integer"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert result.compatibility is SchemaCompatibilityType.incompatible diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 6d850f5fc..7fcecd47e 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -14,7 +14,7 @@ import pytest -async def test_validate_schema_request_body(): +async def test_validate_schema_request_body() -> None: controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS)) controller._validate_schema_request_body( # pylint: disable=W0212 @@ -30,7 +30,7 @@ async def test_validate_schema_request_body(): assert str(exc_info.value) == "HTTPResponse 422" -async def test_forward_when_not_ready(): +async def test_forward_when_not_ready() -> None: with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) ready_property_mock = PropertyMock(return_value=False) diff --git a/website/source/quickstart.rst b/website/source/quickstart.rst index 6e6ecdba6..f640e68d2 100644 --- a/website/source/quickstart.rst +++ b/website/source/quickstart.rst @@ -60,6 +60,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke $KARAPACE_REGISTRY_URI/compatibility/subjects/test-key/versions/latest {"is_compatible":true} +NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the +compatibility is 
checked not only against the latest schema, but also against all previous schemas, as it would be done +when trying to register the new schema through the `subjects/<subject-key>/versions` endpoint. + Get current global backwards compatibility setting value:: $ curl -X GET $KARAPACE_REGISTRY_URI/config