diff --git a/karapace/compatibility/__init__.py b/karapace/compatibility/__init__.py index e5f61e710..2e9e4a0ab 100644 --- a/karapace/compatibility/__init__.py +++ b/karapace/compatibility/__init__.py @@ -88,6 +88,7 @@ def check_compatibility( if old_schema.schema_type is SchemaType.AVRO: assert isinstance(old_schema.schema, AvroSchema) assert isinstance(new_schema.schema, AvroSchema) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: result = check_avro_compatibility( reader_schema=new_schema.schema, diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 655f9506f..f1e876f7f 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -27,8 +27,10 @@ from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError from typing import Any, cast, Dict, Final, final, Mapping, Sequence +import avro.schema import hashlib import logging +import re LOG = logging.getLogger(__name__) @@ -181,15 +183,42 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema: return parsed_typed_schema.schema -def avro_schema_merge(schema_str: str, dependencies: Mapping[str, Dependency]) -> str: - """To support references in AVRO we recursively merge all referenced schemas with current schema""" - if dependencies: - merged_schema = "" - for dependency in dependencies.values(): - merged_schema += avro_schema_merge(dependency.schema.schema_str, dependency.schema.dependencies) + ",\n" - merged_schema += schema_str - return "[\n" + merged_schema + "\n]" - return schema_str +class AvroMerge: + def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None): + self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True) + self.dependencies = dependencies + self.unique_id = 0 + + def union_safe_schema_str(self, schema_str: str) -> str: + # in case we meet union - we use it as is + regex = re.compile(r"^\s*\[") + base_schema = ( + f'{{ "name": "___RESERVED_KARAPACE_WRAPPER_NAME_{self.unique_id}___",' + f'"type": "record", "fields": [{{"name": "name", "type":' + ) + if regex.match(schema_str): + return f"{base_schema} {schema_str}}}]}}" + return f"{base_schema} [{schema_str}]}}]}}" + + def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> str: + """To support references in AVRO we iteratively merge all referenced schemas with current schema""" + stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)] + merged_schemas = [] + + while stack: + current_schema_str, current_dependencies = stack.pop() + if current_dependencies: + stack.append((current_schema_str, None)) + for dependency in reversed(current_dependencies.values()): + stack.append((dependency.schema.schema_str, dependency.schema.dependencies)) + else: + self.unique_id += 1 + merged_schemas.append(self.union_safe_schema_str(current_schema_str)) + + return ",\n".join(merged_schemas) + + def wrap(self) -> str: + return "[\n" + self.builder(self.schema_str, self.dependencies) + "\n]" def parse( @@ -200,21 +229,41 @@ def parse( references: Sequence[Reference] | None = None, dependencies: Mapping[str, Dependency] | None = None, normalize: bool = False, + dependencies_compat: bool = False, ) -> ParsedTypedSchema: if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") - + parsed_schema_result: Draft7Validator | AvroSchema | ProtobufSchema parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema if schema_type is SchemaType.AVRO: try: + if dependencies or dependencies_compat: + wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap() + else: + wrapped_schema_str = schema_str parsed_schema = parse_avro_schema_definition( - avro_schema_merge(schema_str, dependencies), + wrapped_schema_str, validate_enum_symbols=validate_avro_enum_symbols, validate_names=validate_avro_names, ) + if dependencies or dependencies_compat: + if isinstance(parsed_schema, avro.schema.UnionSchema): + parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1] + + else: + raise InvalidSchema + else: + parsed_schema_result = parsed_schema + return ParsedTypedSchema( + schema_type=schema_type, + schema_str=schema_str, + schema=parsed_schema_result, + references=references, + dependencies=dependencies, + schema_wrapped=parsed_schema, + ) except (SchemaParseException, JSONDecodeError, TypeError) as e: raise InvalidSchema from e - elif schema_type is SchemaType.JSONSCHEMA: try: parsed_schema = parse_jsonschema_definition(schema_str) @@ -276,9 +325,10 @@ def __init__( schema: Draft7Validator | AvroSchema | ProtobufSchema, references: Sequence[Reference] | None = None, dependencies: Mapping[str, Dependency] | None = None, + schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None, ) -> None: self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema - + self.schema_wrapped = schema_wrapped super().__init__( schema_type=schema_type, schema_str=schema_str, diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 485ce090b..c03c51082 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -512,9 +512,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: try: schema_type_parsed = SchemaType(schema_type) - except ValueError: + except ValueError as e: LOG.warning("Invalid schema type: %s", schema_type) - return + raise e # This does two jobs: # - Validates the schema's JSON @@ -531,18 +531,18 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references] resolved_references, resolved_dependencies = self.resolve_references(candidate_references) schema_str = json.dumps(json.loads(schema_str), sort_keys=True) - except json.JSONDecodeError: + except json.JSONDecodeError as e: LOG.warning("Schema is not valid JSON") - return - except InvalidReferences: + raise e + except InvalidReferences as e: LOG.exception("Invalid AVRO references") - return + raise e elif schema_type_parsed == SchemaType.JSONSCHEMA: try: schema_str = json.dumps(json.loads(schema_str), sort_keys=True) - except json.JSONDecodeError: + except json.JSONDecodeError as e: LOG.warning("Schema is not valid JSON") - return + raise e elif schema_type_parsed == SchemaType.PROTOBUF: try: if schema_references: @@ -556,12 +556,12 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: normalize=False, ) schema_str = str(parsed_schema) - except InvalidSchema: + except InvalidSchema as e: LOG.exception("Schema is not valid ProtoBuf definition") - return - except InvalidReferences: + raise e + except InvalidReferences as e: LOG.exception("Invalid Protobuf references") - return + raise e try: typed_schema = TypedSchema( @@ -571,8 +571,8 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: dependencies=resolved_dependencies, schema=parsed_schema, ) - except (InvalidSchema, JSONDecodeError): - return + except (InvalidSchema, JSONDecodeError) as e: + raise e self.database.insert_schema_version( subject=schema_subject, diff --git a/tests/integration/test_schema_avro_references.py b/tests/integration/test_schema_avro_references.py index 5c68c14f5..3c4cfc8ba 100644 --- a/tests/integration/test_schema_avro_references.py +++ b/tests/integration/test_schema_avro_references.py @@ -4,47 +4,187 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -import json -from karapace.client import Client +from karapace.client import Client, Result +from tests.utils import create_subject_name_factory + +import json baseurl = "http://localhost:8081" +# country.avsc +SCHEMA_COUNTRY = { + "type": "record", + "name": "Country", + "namespace": "com.netapp", + "fields": [{"name": "name", "type": "string"}, {"name": "code", "type": "string"}], +} + +# address.avsc +SCHEMA_ADDRESS = { + "type": "record", + "name": "Address", + "namespace": "com.netapp", + "fields": [ + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "postalCode", "type": "string"}, + {"name": "country", "type": "Country"}, + ], +} + +# job.avsc +SCHEMA_JOB = { + "type": "record", + "name": "Job", + "namespace": "com.netapp", + "fields": [{"name": "title", "type": "string"}, {"name": "salary", "type": "double"}], +} + +# person.avsc +SCHEMA_PERSON = { + "type": "record", + "name": "Person", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], +} + +SCHEMA_PERSON_INT_LONG = { + "type": "record", + "name": "Person", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "long"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], +} -async def test_avro_references(registry_async_client: Client) -> None: - schema_country = { +SCHEMA_PERSON_INT_STRING = { + "type": "record", + "name": "Person", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "string"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], +} + +SCHEMA_UNION_REFERENCES = { + "type": "record", + "namespace": "com.netapp", + "name": "Person2", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + { + "name": "children", + "type": [ + "null", + { + "type": "record", + "name": "child", + "fields": [{"name": "name", "type": "string"}, {"name": "age", "type": "int"}], + }, + ], + }, + ], +} + +SCHEMA_UNION_REFERENCES2 = [ + { "type": "record", - "name": "Country", + "name": "Person", "namespace": "com.netapp", "fields": [ {"name": "name", "type": "string"}, - {"name": "code", "type": "string"} - ] - } - - schema_address = { + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], + }, + { "type": "record", - "name": "Address", + "name": "UnemployedPerson", "namespace": "com.netapp", "fields": [ - {"name": "street", "type": "string"}, - {"name": "city", "type": "string"}, - {"name": "postalCode", "type": "string"}, - {"name": "country", "type": "Country"} - ] + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + ], + }, +] + +SCHEMA_ADDRESS_INCOMPATIBLE = { + "type": "record", + "name": "ChangedAddress", + "namespace": "com.netapp", + "fields": [ + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "postalCode", "type": "string"}, + {"name": "country", "type": "Country"}, + ], +} + +def address_references(subject_prefix: str) -> list: + return [{"name": "country.avsc", "subject": f"{subject_prefix}country", "version": 1}] + + +def person_references(subject_prefix: str) -> list: + return [ + {"name": "address.avsc", "subject": f"{subject_prefix}address", "version": 1}, + {"name": "job.avsc", "subject": f"{subject_prefix}job", "version": 1}, + ] + + +def stored_person_subject(subject_prefix: str, subject_id: int) -> dict: + return { + "id": subject_id, + "references": [ + {"name": "address.avsc", "subject": f"{subject_prefix}address", "version": 1}, + {"name": "job.avsc", "subject": f"{subject_prefix}job", "version": 1}, + ], + "schema": json.dumps( + { + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + {"name": "address", "type": "Address"}, + {"name": "job", "type": "Job"}, + ], + "name": "Person", + "namespace": "com.netapp", + "type": "record", + }, + separators=(",", ":"), + ), + "subject": f"{subject_prefix}person", + "version": 1, } + +async def basic_avro_references_fill_test(registry_async_client: Client, subject_prefix: str) -> Result: res = await registry_async_client.post( - f"subjects/country/versions", json={"schema": json.dumps(schema_country)} + f"subjects/{subject_prefix}country/versions", json={"schema": json.dumps(SCHEMA_COUNTRY)} ) assert res.status_code == 200 assert "id" in res.json() - country_references = [{"name": "country.proto", "subject": "country", "version": 1}] res = await registry_async_client.post( - "subjects/address/versions", - json={"schemaType": "AVRO", "schema": json.dumps(schema_address), "references": country_references}, + f"subjects/{subject_prefix}address/versions", + json={"schemaType": "AVRO", "schema": json.dumps(SCHEMA_ADDRESS), "references": address_references(subject_prefix)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -53,8 +193,8 @@ async def test_avro_references(registry_async_client: Client) -> None: # Check if the schema has now been registered under the subject res = await registry_async_client.post( - "subjects/address", - json={"schemaType": "AVRO", "schema": json.dumps(schema_address), "references": country_references}, + f"subjects/{subject_prefix}address", + json={"schemaType": "AVRO", "schema": json.dumps(SCHEMA_ADDRESS), "references": address_references(subject_prefix)}, ) assert res.status_code == 200 assert "subject" in res.json() @@ -62,3 +202,94 @@ async def test_avro_references(registry_async_client: Client) -> None: assert address_id == res.json()["id"] assert "version" in res.json() assert "schema" in res.json() + + res = await registry_async_client.post(f"subjects/{subject_prefix}job/versions", json={"schema": json.dumps(SCHEMA_JOB)}) + assert res.status_code == 200 + assert "id" in res.json() + res = await registry_async_client.post( + f"subjects/{subject_prefix}person/versions", + json={"schemaType": "AVRO", "schema": json.dumps(SCHEMA_PERSON), "references": person_references(subject_prefix)}, + ) + assert res.status_code == 200 + assert "id" in res.json() + return res + + +async def test_basic_avro_references(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("basic-avro-references-")() + res = await basic_avro_references_fill_test(registry_async_client, subject_prefix) + person_id = res.json()["id"] + res = await registry_async_client.get(f"subjects/{subject_prefix}person/versions/latest") + assert res.status_code == 200 + assert res.json() == stored_person_subject(subject_prefix, person_id) + + +async def test_avro_references_compatibility(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("avro-references-compatibility-")() + await basic_avro_references_fill_test(registry_async_client, subject_prefix) + + res = await registry_async_client.post( + f"compatibility/subjects/{subject_prefix}person/versions/latest", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_PERSON_INT_LONG), + "references": person_references(subject_prefix), + }, + ) + assert res.status_code == 200 + assert res.json() == {"is_compatible": True} + res = await registry_async_client.post( + f"compatibility/subjects/{subject_prefix}person/versions/latest", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_PERSON_INT_STRING), + "references": person_references(subject_prefix), + }, + ) + assert res.status_code == 200 + assert res.json() == {"is_compatible": False} + + +async def test_avro_union_references(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("avro-references-union-one-")() + await basic_avro_references_fill_test(registry_async_client, subject_prefix) + res = await registry_async_client.post( + f"subjects/{subject_prefix}person2/versions", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_UNION_REFERENCES), + "references": person_references(subject_prefix), + }, + ) + assert res.status_code == 200 + assert "id" in res.json() + + +async def test_avro_union_references2(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("avro-references-union-two-")() + await basic_avro_references_fill_test(registry_async_client, subject_prefix) + res = await registry_async_client.post( + f"subjects/{subject_prefix}person2/versions", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_UNION_REFERENCES2), + "references": person_references(subject_prefix), + }, + ) + assert res.status_code == 200 and "id" in res.json() + + +async def test_avro_incompatible_name_references(registry_async_client: Client) -> None: + subject_prefix = create_subject_name_factory("avro-references-incompatible-name-")() + await basic_avro_references_fill_test(registry_async_client, subject_prefix) + res = await registry_async_client.post( + f"subjects/{subject_prefix}address/versions", + json={ + "schemaType": "AVRO", + "schema": json.dumps(SCHEMA_ADDRESS_INCOMPATIBLE), + "references": address_references(subject_prefix), + }, + ) + assert res.status_code == 409 + msg = "Incompatible schema, compatibility_mode=BACKWARD expected: com.netapp.Address" + assert res.json()["message"] == msg