Skip to content

Commit

Permalink
fix: Match schemas by parsed schema object and references when queried
Browse files Browse the repository at this point in the history
When schema is queried with schema and references the match must happen
on the parsed form and with references matching the version.

Closes #791
  • Loading branch information
jjaakola-aiven committed Jan 4, 2024
1 parent 28e3a73 commit 7d45a08
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 42 deletions.
15 changes: 15 additions & 0 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,21 @@ def __str__(self) -> str:
return str(self.schema)
return super().__str__()

def match(self, other: ParsedTypedSchema) -> bool:
"""Match the schema with given one.
Special case function where schema is matched to other. The parsed schema object and references are matched.
The parent class equality function works based on the normalized schema string. That does not take into account
the canonical forms of any schema type. This function uses the parsed form of the schema to match if schemas
are equal. For example Avro schemas `{"type": "int", "name": schema_name}` and `{"type": "int"}` are equal by
Avro spec.
References are also matched and the refered schemas and the versions of those must match.
:param other: The schema to match against.
:return: True if schema match, False if not.
"""
return self.schema_type is other.schema_type and self.schema == other.schema and self.references == other.references

@property
def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
if self._schema_cached is not None:
Expand Down
4 changes: 2 additions & 2 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ async def subjects_schema_post(
parsed_typed_schema = ParsedTypedSchema.parse(
schema_version.schema.schema_type,
schema_version.schema.schema_str,
references=references,
references=schema_version.references,
dependencies=new_schema_dependencies,
)
except InvalidSchema as e:
Expand All @@ -1132,7 +1132,7 @@ async def subjects_schema_post(
if schema_type is SchemaType.JSONSCHEMA:
schema_valid = parsed_typed_schema.to_dict() == new_schema.to_dict()
else:
schema_valid = parsed_typed_schema.schema == new_schema.schema
schema_valid = new_schema.match(parsed_typed_schema)
if parsed_typed_schema.schema_type == new_schema.schema_type and schema_valid:
ret = {
"subject": subject,
Expand Down
84 changes: 44 additions & 40 deletions tests/integration/test_schema_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,52 +1010,45 @@ async def test_references(testcase: ReferenceTestCase, registry_async_client: Cl
assert fetch_schema_res.status_code == 200


@pytest.mark.parametrize(
"testcase",
[
ReferenceTestCase(
test_name="With updated reference version",
schemas=[
TestCaseSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=SCHEMA_NO_REF,
subject="wr_s1",
references=None,
expected=200,
),
TestCaseSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=SCHEMA_WITH_REF,
subject="wr_s2",
references=[{"name": "NoReference.proto", "subject": "wr_s1", "version": 1}],
expected=200,
),
TestCaseSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=SCHEMA_NO_REF_V2,
subject="wr_s1",
references=None,
expected=200,
),
TestCaseSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=SCHEMA_WITH_REF,
subject="wr_s2",
references=[{"name": "NoReference.proto", "subject": "wr_s1", "version": 2}],
expected=200,
),
],
async def test_reference_update_creates_new_schema_version(registry_async_client: Client):
test_schemas = [
TestCaseSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=SCHEMA_NO_REF,
subject="wr_s1",
references=None,
expected=200,
),
],
ids=str,
)
async def test_reference_update_creates_new_schema_version(testcase: ReferenceTestCase, registry_async_client: Client):
for testdata in testcase.schemas:
TestCaseSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=SCHEMA_WITH_REF,
subject="wr_s2",
references=[{"name": "NoReference.proto", "subject": "wr_s1", "version": 1}],
expected=200,
),
TestCaseSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=SCHEMA_NO_REF_V2,
subject="wr_s1",
references=None,
expected=200,
),
TestCaseSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=SCHEMA_WITH_REF,
subject="wr_s2",
references=[{"name": "NoReference.proto", "subject": "wr_s1", "version": 2}],
expected=200,
),
]
schema_ids: list[int] = []
for testdata in test_schemas:
body = {"schemaType": testdata.schema_type, "schema": testdata.schema_str}
if testdata.references:
body["references"] = testdata.references
res = await registry_async_client.post(f"subjects/{testdata.subject}/versions", json=body)
assert res.status_code == testdata.expected
schema_ids.append(res.json_result.get("id"))
res = await registry_async_client.get("subjects/wr_s2/versions")
assert len(res.json_result) == 2, "Expected two versions of schemas as reference was updated."
res = await registry_async_client.get("subjects/wr_s2/versions/2")
Expand All @@ -1065,6 +1058,17 @@ async def test_reference_update_creates_new_schema_version(testcase: ReferenceTe
assert references[0].get("subject") == "wr_s1"
assert references[0].get("version") == 2

# Assert when querying the schema id with schema version with references correct schema id is returned.
for testdata, expected_schema_id in zip(test_schemas, schema_ids):
body = {
"schemaType": testdata.schema_type,
"schema": testdata.schema_str,
}
if testdata.references:
body["references"] = testdata.references
res = await registry_async_client.post(f"subjects/{testdata.subject}", json=body)
assert res.json_result.get("id") == expected_schema_id


async def test_protobuf_error(registry_async_client: Client) -> None:
testdata = TestCaseSchema(
Expand Down

0 comments on commit 7d45a08

Please sign in to comment.