Skip to content

Commit

Permalink
Merge pull request #953 from Aiven-Open/davide-armand/EC-289-fix-tran…
Browse files Browse the repository at this point in the history
…sitive-validation

fix: align transitive compatibility checks
  • Loading branch information
eliax1996 authored Oct 10, 2024
2 parents aaa7812 + 429e18c commit f9f5fe7
Show file tree
Hide file tree
Showing 12 changed files with 559 additions and 242 deletions.
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke
http://localhost:8081/compatibility/subjects/test-key/versions/latest
{"is_compatible":true}

NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the
compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done
when trying to register the new schema through the `subjects/<subject-key>/versions` endpoint.

Get current global backwards compatibility setting value::

$ curl -X GET http://localhost:8081/config
Expand Down
133 changes: 0 additions & 133 deletions src/karapace/compatibility/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,7 @@
Copyright (c) 2019 Aiven Ltd
See LICENSE for details
"""
from avro.compatibility import (
merge,
ReaderWriterCompatibilityChecker as AvroChecker,
SchemaCompatibilityResult,
SchemaCompatibilityType,
SchemaIncompatibilityType,
)
from avro.schema import Schema as AvroSchema
from enum import Enum, unique
from jsonschema import Draft7Validator
from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema
from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema
from karapace.schema_reader import SchemaType
from karapace.utils import assert_never

import logging

Expand Down Expand Up @@ -54,121 +39,3 @@ def is_transitive(self) -> bool:
"FULL_TRANSITIVE",
}
return self.value in TRANSITIVE_MODES


def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: AvroSchema) -> SchemaCompatibilityResult:
return AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema)


def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult:
return jsonschema_compatibility(reader, writer)


def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult:
return check_protobuf_schema_compatibility(reader, writer)


def check_compatibility(
old_schema: ParsedTypedSchema,
new_schema: ValidatedTypedSchema,
compatibility_mode: CompatibilityModes,
) -> SchemaCompatibilityResult:
"""Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`."""
if compatibility_mode is CompatibilityModes.NONE:
LOG.info("Compatibility level set to NONE, no schema compatibility checks performed")
return SchemaCompatibilityResult(SchemaCompatibilityType.compatible)

if old_schema.schema_type is not new_schema.schema_type:
return incompatible_schema(
incompat_type=SchemaIncompatibilityType.type_mismatch,
message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}",
location=[],
)

if old_schema.schema_type is SchemaType.AVRO:
assert isinstance(old_schema.schema, AvroSchema)
assert isinstance(new_schema.schema, AvroSchema)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)
result = merge(
result,
check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
),
)

elif old_schema.schema_type is SchemaType.JSONSCHEMA:
assert isinstance(old_schema.schema, Draft7Validator)
assert isinstance(new_schema.schema, Draft7Validator)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = merge(
result,
check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
),
)

elif old_schema.schema_type is SchemaType.PROTOBUF:
assert isinstance(old_schema.schema, ProtobufSchema)
assert isinstance(new_schema.schema, ProtobufSchema)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = merge(
result,
check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
),
)

else:
assert_never(f"Unknown schema_type {old_schema.schema_type}")

return result
138 changes: 138 additions & 0 deletions src/karapace/compatibility/schema_compatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from avro.compatibility import (
merge,
ReaderWriterCompatibilityChecker as AvroChecker,
SchemaCompatibilityResult,
SchemaCompatibilityType,
SchemaIncompatibilityType,
)
from avro.schema import Schema as AvroSchema
from jsonschema import Draft7Validator
from karapace.compatibility import CompatibilityModes
from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema
from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema
from karapace.schema_type import SchemaType
from karapace.utils import assert_never

import logging

LOG = logging.getLogger(__name__)


class SchemaCompatibility:
@staticmethod
def check_compatibility(
old_schema: ParsedTypedSchema,
new_schema: ValidatedTypedSchema,
compatibility_mode: CompatibilityModes,
) -> SchemaCompatibilityResult:
"""Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`."""

if compatibility_mode is CompatibilityModes.NONE:
LOG.info("Compatibility level set to NONE, no schema compatibility checks performed")
return SchemaCompatibilityResult(SchemaCompatibilityType.compatible)

if old_schema.schema_type is not new_schema.schema_type:
return incompatible_schema(
incompat_type=SchemaIncompatibilityType.type_mismatch,
message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}",
location=[],
)

if old_schema.schema_type is SchemaType.AVRO:
assert isinstance(old_schema.schema, AvroSchema)
assert isinstance(new_schema.schema, AvroSchema)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = SchemaCompatibility.check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = SchemaCompatibility.check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = SchemaCompatibility.check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)
result = merge(
result,
SchemaCompatibility.check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
),
)
elif old_schema.schema_type is SchemaType.JSONSCHEMA:
assert isinstance(old_schema.schema, Draft7Validator)
assert isinstance(new_schema.schema, Draft7Validator)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = SchemaCompatibility.check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = SchemaCompatibility.check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = SchemaCompatibility.check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = merge(
result,
SchemaCompatibility.check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
),
)
elif old_schema.schema_type is SchemaType.PROTOBUF:
assert isinstance(old_schema.schema, ProtobufSchema)
assert isinstance(new_schema.schema, ProtobufSchema)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = SchemaCompatibility.check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = SchemaCompatibility.check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = SchemaCompatibility.check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = merge(
result,
SchemaCompatibility.check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
),
)
else:
assert_never(f"Unknown schema_type {old_schema.schema_type}")

return result

@staticmethod
def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: AvroSchema) -> SchemaCompatibilityResult:
return AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema)

@staticmethod
def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult:
return jsonschema_compatibility(reader, writer)

@staticmethod
def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult:
return check_protobuf_schema_compatibility(reader, writer)
Loading

0 comments on commit f9f5fe7

Please sign in to comment.