diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py
index 8aaf4e396..6b27c274c 100644
--- a/karapace/kafka_rest_apis/consumer_manager.py
+++ b/karapace/kafka_rest_apis/consumer_manager.py
@@ -467,7 +467,6 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats
         )
         # we get to be more in line with the confluent proxy by doing a bunch of fetches each time and
         # respecting the max fetch request size
-        # pylint: disable=protected-access
         max_bytes = (
             int(query_params["max_bytes"])
             if "max_bytes" in query_params
diff --git a/karapace/protobuf/proto_normalizations.py b/karapace/protobuf/proto_normalizations.py
new file mode 100644
index 000000000..973a67c58
--- /dev/null
+++ b/karapace/protobuf/proto_normalizations.py
@@ -0,0 +1,36 @@
+"""
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+from karapace.protobuf.proto_file_element import ProtoFileElement
+from karapace.typing import StrEnum
+
+
+class ProtobufNormalisationOptions(StrEnum):
+    sort_options = "sort_options"
+
+
+def normalize_options_ordered(proto_file_element: ProtoFileElement) -> ProtoFileElement:
+    sorted_options = (
+        None if proto_file_element.options is None else list(sorted(proto_file_element.options, key=lambda x: x.name))
+    )
+    return ProtoFileElement(
+        location=proto_file_element.location,
+        package_name=proto_file_element.package_name,
+        syntax=proto_file_element.syntax,
+        imports=proto_file_element.imports,
+        public_imports=proto_file_element.public_imports,
+        types=proto_file_element.types,
+        services=proto_file_element.services,
+        extend_declarations=proto_file_element.extend_declarations,
+        options=sorted_options,
+    )
+
+
+# if other normalizations are added we will switch to a more generic approach:
+# def normalize_parsed_file(proto_file_element: ProtoFileElement,
+#                           normalization: ProtobufNormalisationOptions) -> ProtoFileElement:
+#     if normalization == ProtobufNormalisationOptions.sort_options:
+#         return normalize_options_ordered(proto_file_element)
+#     else:
+#         assert_never(normalization)
diff --git a/karapace/schema_models.py b/karapace/schema_models.py
index 155dc7853..0215ebc69 100644
--- a/karapace/schema_models.py
+++ b/karapace/schema_models.py
@@ -19,6 +19,7 @@
     ProtobufUnresolvedDependencyException,
     SchemaParseException as ProtobufSchemaParseException,
 )
+from karapace.protobuf.proto_normalizations import normalize_options_ordered
 from karapace.protobuf.schema import ProtobufSchema
 from karapace.schema_references import Reference
 from karapace.schema_type import SchemaType
@@ -62,6 +63,7 @@ def parse_protobuf_schema_definition(
     references: Sequence[Reference] | None = None,
     dependencies: Mapping[str, Dependency] | None = None,
     validate_references: bool = True,
+    normalize: bool = False,
 ) -> ProtobufSchema:
     """Parses and validates `schema_definition`.

@@ -74,6 +76,10 @@ def parse_protobuf_schema_definition(
         result = protobuf_schema.verify_schema_dependencies()
         if not result.result:
             raise ProtobufUnresolvedDependencyException(f"{result.message}")
+
+    if protobuf_schema.proto_file_element is not None and normalize:
+        protobuf_schema.proto_file_element = normalize_options_ordered(protobuf_schema.proto_file_element)
+
     return protobuf_schema


@@ -179,6 +185,7 @@ def parse(
     validate_avro_names: bool,
     references: Sequence[Reference] | None = None,
     dependencies: Mapping[str, Dependency] | None = None,
+    normalize: bool = False,
 ) -> ParsedTypedSchema:
     if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]:
         raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")
@@ -203,7 +210,7 @@ def parse(

     elif schema_type is SchemaType.PROTOBUF:
         try:
-            parsed_schema = parse_protobuf_schema_definition(schema_str, references, dependencies)
+            parsed_schema = parse_protobuf_schema_definition(schema_str, references, dependencies, normalize=normalize)
         except (
             TypeError,
             SchemaError,
@@ -270,6 +277,7 @@ def parse(
         schema_str: str,
         references: Sequence[Reference] | None = None,
         dependencies: Mapping[str, Dependency] | None = None,
+        normalize: bool = False,
     ) -> ParsedTypedSchema:
         return parse(
             schema_type=schema_type,
@@ -278,6 +286,7 @@ def parse(
             validate_avro_names=False,
             references=references,
             dependencies=dependencies,
+            normalize=normalize,
         )

     def __str__(self) -> str:
@@ -352,6 +361,7 @@ def parse(
         schema_str: str,
         references: Sequence[Reference] | None = None,
         dependencies: Mapping[str, Dependency] | None = None,
+        normalize: bool = False,
     ) -> ValidatedTypedSchema:
         parsed_schema = parse(
             schema_type=schema_type,
@@ -360,6 +370,7 @@ def parse(
             validate_avro_names=True,
             references=references,
             dependencies=dependencies,
+            normalize=normalize,
         )
         return cast(ValidatedTypedSchema, parsed_schema)

diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py
index 9c04cbf0b..3605bebec 100644
--- a/karapace/schema_registry_apis.py
+++ b/karapace/schema_registry_apis.py
@@ -1191,6 +1191,7 @@ async def subject_post(
         self._validate_schema_request_body(content_type, body)
         schema_type = self._validate_schema_type(content_type, body)
         self._validate_schema_key(content_type, body)
+        normalize = request.query.get("normalize", "false").lower() == "true"
         references = self._validate_references(content_type, schema_type, body)

         try:
@@ -1200,6 +1201,7 @@ async def subject_post(
                 schema_str=body["schema"],
                 references=references,
                 dependencies=resolved_dependencies,
+                normalize=normalize,
             )
         except (InvalidReferences, InvalidSchema, InvalidSchemaType) as e:
             self.log.warning("Invalid schema: %r", body["schema"], exc_info=True)
diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py
index 243d9b1fb..49c84f0c7 100644
--- a/tests/integration/test_schema_protobuf.py
+++ b/tests/integration/test_schema_protobuf.py
@@ -1270,3 +1270,53 @@ async def test_protobuf_update_ordering(registry_async_client: Client) -> None:
     assert res.status_code == 200
     assert "id" in res.json()
     assert schema_id != res.json()["id"]
+
+
+async def test_protobuf_normalization_of_options(registry_async_client: Client) -> None:
+    subject = create_subject_name_factory("test_protobuf_normalization")()
+
+    schema_with_option_unordered_1 = """\
+syntax = "proto3";
+package tc4;
+
+option java_package = "com.example";
+option java_outer_classname = "FredProto";
+option java_multiple_files = true;
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+
+message Foo {
+  string code = 1;
+}
+"""
+
+    body = {"schemaType": "PROTOBUF", "schema": schema_with_option_unordered_1}
+    res = await registry_async_client.post(f"subjects/{subject}/versions?normalize=true", json=body)
+
+    assert res.status_code == 200
+    assert "id" in res.json()
+    original_schema_id = res.json()["id"]
+
+    schema_with_option_unordered_2 = """\
+syntax = "proto3";
+package tc4;
+
+option java_package = "com.example";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_multiple_files = true;
+option java_outer_classname = "FredProto";
+option java_generic_services = true;
+
+message Foo {
+  string code = 1;
+}
+"""
+
+    body = {"schemaType": "PROTOBUF", "schema": schema_with_option_unordered_2}
+    res = await registry_async_client.post(f"subjects/{subject}/versions?normalize=true", json=body)
+
+    assert res.status_code == 200
+    assert "id" in res.json()
+    assert original_schema_id == res.json()["id"]
diff --git a/tests/unit/protobuf/test_protobuf_normalization.py b/tests/unit/protobuf/test_protobuf_normalization.py
new file mode 100644
index 000000000..c65abef6a
--- /dev/null
+++ b/tests/unit/protobuf/test_protobuf_normalization.py
@@ -0,0 +1,71 @@
+"""
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+from karapace.protobuf.compare_result import CompareResult
+from karapace.protobuf.location import Location
+from karapace.protobuf.proto_normalizations import normalize_options_ordered
+from karapace.protobuf.proto_parser import ProtoParser
+
+location: Location = Location("some/folder", "file.proto")
+
+
+def test_different_options_order_its_correctly_normalized() -> None:
+    ordered_schema = """\
+syntax = "proto3";
+
+package pkg;
+
+option cc_generic_services = true;
+option java_generate_equals_and_hash = true;
+option java_generic_services = true;
+option java_multiple_files = true;
+option java_outer_classname = "FooProto";
+option java_package = "com.example.foo";
+option java_string_check_utf8 = true;
+option optimize_for = SPEED;
+
+message Foo {
+  string fieldA = 1;
+
+  string fieldB = 2;
+
+  string fieldC = 3;
+
+  string fieldX = 4;
+}
+"""
+
+    unordered_schema = """\
+syntax = "proto3";
+
+package pkg;
+
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option java_package = "com.example.foo";
+option java_outer_classname = "FooProto";
+option optimize_for = SPEED;
+option cc_generic_services = true;
+option java_multiple_files = true;
+option java_string_check_utf8 = true;
+
+message Foo {
+  string fieldA = 1;
+
+  string fieldB = 2;
+
+  string fieldC = 3;
+
+  string fieldX = 4;
+}
+"""
+
+    ordered_proto = ProtoParser.parse(location, ordered_schema)
+    unordered_proto = ProtoParser.parse(location, unordered_schema)
+
+    result = CompareResult()
+    assert result.is_compatible()
+    normalize_options_ordered(ordered_proto).compare(normalize_options_ordered(unordered_proto), result)
+    assert result.is_compatible()
+    assert normalize_options_ordered(ordered_proto).to_schema() == normalize_options_ordered(unordered_proto).to_schema()
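
Note (not part of the diff): a minimal client-side sketch of what the new `normalize` query parameter enables. It assumes a Karapace registry reachable at http://localhost:8081 and uses the `requests` library; the registry URL, subject name, and schema body are illustrative only.

# A minimal sketch, assuming a local Karapace registry; only the endpoint path,
# the ?normalize=true parameter, and the request body shape come from the diff.
import requests

REGISTRY_URL = "http://localhost:8081"  # assumed deployment address
SUBJECT = "normalization-demo"  # hypothetical subject name

SCHEMA = """\
syntax = "proto3";
package tc4;

option java_outer_classname = "FredProto";
option java_package = "com.example";

message Foo {
  string code = 1;
}
"""

# With ?normalize=true the registry sorts the file-level options by name before
# parsing, so re-registering the same schema with its options reordered yields
# the existing schema id instead of creating a new version.
res = requests.post(
    f"{REGISTRY_URL}/subjects/{SUBJECT}/versions",
    params={"normalize": "true"},
    json={"schemaType": "PROTOBUF", "schema": SCHEMA},
)
res.raise_for_status()
print(res.json()["id"])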