From 7d45a083201e65d86487144abc3dcef3df56b714 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Thu, 4 Jan 2024 16:00:16 +0200 Subject: [PATCH] fix: Match schemas by parsed schema object and references when queried When a schema is queried with schema and references, the match must happen on the parsed form and with references matching the version. Closes #791 --- karapace/schema_models.py | 15 ++++ karapace/schema_registry_apis.py | 4 +- tests/integration/test_schema_protobuf.py | 84 ++++++++++++----------- 3 files changed, 61 insertions(+), 42 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index d4a80946a..498de4b8e 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -285,6 +285,21 @@ def __str__(self) -> str: return str(self.schema) return super().__str__() + def match(self, other: ParsedTypedSchema) -> bool: + """Match the schema with the given one. + + Special case function where schema is matched to other. The parsed schema object and references are matched. + The parent class equality function works based on the normalized schema string. That does not take into account + the canonical forms of any schema type. This function uses the parsed form of the schema to match if schemas + are equal. For example Avro schemas `{"type": "int", "name": schema_name}` and `{"type": "int"}` are equal by + Avro spec. + References are also matched and the referred schemas and the versions of those must match. + + :param other: The schema to match against. + :return: True if the schemas match, False if not. 
+ """ + return self.schema_type is other.schema_type and self.schema == other.schema and self.references == other.references + @property def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema: if self._schema_cached is not None: diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index e8bbaf969..f47236d28 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -1111,7 +1111,7 @@ async def subjects_schema_post( parsed_typed_schema = ParsedTypedSchema.parse( schema_version.schema.schema_type, schema_version.schema.schema_str, - references=references, + references=schema_version.references, dependencies=new_schema_dependencies, ) except InvalidSchema as e: @@ -1132,7 +1132,7 @@ async def subjects_schema_post( if schema_type is SchemaType.JSONSCHEMA: schema_valid = parsed_typed_schema.to_dict() == new_schema.to_dict() else: - schema_valid = parsed_typed_schema.schema == new_schema.schema + schema_valid = new_schema.match(parsed_typed_schema) if parsed_typed_schema.schema_type == new_schema.schema_type and schema_valid: ret = { "subject": subject, diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 7b38ab086..1900a8f87 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -1010,52 +1010,45 @@ async def test_references(testcase: ReferenceTestCase, registry_async_client: Cl assert fetch_schema_res.status_code == 200 -@pytest.mark.parametrize( - "testcase", - [ - ReferenceTestCase( - test_name="With updated reference version", - schemas=[ - TestCaseSchema( - schema_type=SchemaType.PROTOBUF, - schema_str=SCHEMA_NO_REF, - subject="wr_s1", - references=None, - expected=200, - ), - TestCaseSchema( - schema_type=SchemaType.PROTOBUF, - schema_str=SCHEMA_WITH_REF, - subject="wr_s2", - references=[{"name": "NoReference.proto", "subject": "wr_s1", "version": 1}], - expected=200, - ), - TestCaseSchema( - 
schema_type=SchemaType.PROTOBUF, - schema_str=SCHEMA_NO_REF_V2, - subject="wr_s1", - references=None, - expected=200, - ), - TestCaseSchema( - schema_type=SchemaType.PROTOBUF, - schema_str=SCHEMA_WITH_REF, - subject="wr_s2", - references=[{"name": "NoReference.proto", "subject": "wr_s1", "version": 2}], - expected=200, - ), - ], +async def test_reference_update_creates_new_schema_version(registry_async_client: Client): + test_schemas = [ + TestCaseSchema( + schema_type=SchemaType.PROTOBUF, + schema_str=SCHEMA_NO_REF, + subject="wr_s1", + references=None, + expected=200, ), - ], - ids=str, -) -async def test_reference_update_creates_new_schema_version(testcase: ReferenceTestCase, registry_async_client: Client): - for testdata in testcase.schemas: + TestCaseSchema( + schema_type=SchemaType.PROTOBUF, + schema_str=SCHEMA_WITH_REF, + subject="wr_s2", + references=[{"name": "NoReference.proto", "subject": "wr_s1", "version": 1}], + expected=200, + ), + TestCaseSchema( + schema_type=SchemaType.PROTOBUF, + schema_str=SCHEMA_NO_REF_V2, + subject="wr_s1", + references=None, + expected=200, + ), + TestCaseSchema( + schema_type=SchemaType.PROTOBUF, + schema_str=SCHEMA_WITH_REF, + subject="wr_s2", + references=[{"name": "NoReference.proto", "subject": "wr_s1", "version": 2}], + expected=200, + ), + ] + schema_ids: list[int] = [] + for testdata in test_schemas: body = {"schemaType": testdata.schema_type, "schema": testdata.schema_str} if testdata.references: body["references"] = testdata.references res = await registry_async_client.post(f"subjects/{testdata.subject}/versions", json=body) assert res.status_code == testdata.expected + schema_ids.append(res.json_result.get("id")) res = await registry_async_client.get("subjects/wr_s2/versions") assert len(res.json_result) == 2, "Expected two versions of schemas as reference was updated." 
res = await registry_async_client.get("subjects/wr_s2/versions/2") @@ -1065,6 +1058,17 @@ async def test_reference_update_creates_new_schema_version(testcase: ReferenceTe assert references[0].get("subject") == "wr_s1" assert references[0].get("version") == 2 + # Assert that querying the schema id with a schema and its references returns the correct schema id. + for testdata, expected_schema_id in zip(test_schemas, schema_ids): + body = { + "schemaType": testdata.schema_type, + "schema": testdata.schema_str, + } + if testdata.references: + body["references"] = testdata.references + res = await registry_async_client.post(f"subjects/{testdata.subject}", json=body) + assert res.json_result.get("id") == expected_schema_id + async def test_protobuf_error(registry_async_client: Client) -> None: testdata = TestCaseSchema(