Skip to content

Commit

Permalink
Merge pull request #610 from aiven/references-improvements
Browse files Browse the repository at this point in the history
Improve references support
  • Loading branch information
jjaakola-aiven authored May 10, 2023
2 parents 0b6685c + ca31803 commit bfbaca6
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 29 deletions.
12 changes: 2 additions & 10 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,14 @@ def __init__(
schema_type (SchemaType): The type of the schema
schema_str (str): The original schema string
schema (Optional[Union[Draft7Validator, AvroSchema, ProtobufSchema]]): The parsed and validated schema
references (Optional[List[Dependency]]): The references of schema
references (Optional[List[Dependency]]): The references of schema
"""
self.schema_type = schema_type
self.references = references
self.dependencies = dependencies
self.schema_str = TypedSchema.normalize_schema_str(schema_str, schema_type, schema)
self.max_id: Optional[SchemaId] = None
self._fingerprint_cached: Optional[str] = None
self._str_cached: Optional[str] = None

def to_dict(self) -> Dict[str, Any]:
if self.schema_type is SchemaType.PROTOBUF:
Expand All @@ -118,8 +117,6 @@ def normalize_schema_str(
schema_str: str,
schema_type: SchemaType,
schema: Optional[Union[Draft7Validator, AvroSchema, ProtobufSchema]] = None,
# references: Optional[List[Reference]] = None,
# dependencies: Optional[Dict[str, Dependency]] = None,
) -> str:
if schema_type is SchemaType.AVRO or schema_type is SchemaType.JSONSCHEMA:
try:
Expand All @@ -142,12 +139,7 @@ def normalize_schema_str(
return schema_str

def __str__(self) -> str:
if self.schema_type == SchemaType.PROTOBUF:
return self.schema_str

if self._str_cached is None:
self._str_cached = json_encode(self.to_dict())
return self._str_cached
return self.schema_str

def __repr__(self) -> str:
return f"TypedSchema(type={self.schema_type}, schema={str(self)})"
Expand Down
9 changes: 8 additions & 1 deletion karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,16 @@ def get_referenced_by(self, subject: Subject, version: ResolvedVersion) -> Optio
return self.database.get_referenced_by(subject, version)

def _resolve_reference(self, reference: Reference) -> Dependency:
subject_data = self.database.find_subject_schemas(subject=reference.subject, include_deleted=False)
subject_data: Dict[ResolvedVersion, SchemaVersion] = self.database.find_subject_schemas(
subject=reference.subject, include_deleted=False
)
if not subject_data:
raise InvalidReferences(f"Subject not found {reference.subject}.")

# -1 is alias to latest version
if reference.version == -1:
reference.version = max(subject_data)

schema_version: SchemaVersion = subject_data.get(reference.version, None)
if schema_version is None:
raise InvalidReferences(f"Subject {reference.subject} has no such schema version")
Expand Down
21 changes: 12 additions & 9 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,16 @@ async def schemas_list(self, content_type: str, *, request: HTTPRequest, user: U
if self._auth and not self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}"):
continue
for schema_version in schema_versions:
response_schemas.append(
{
"subject": schema_version.subject,
"version": schema_version.version,
"id": schema_version.schema_id,
"schemaType": schema_version.schema.schema_type,
"schema": schema_version.schema.schema_str,
}
)
response_schema = {
"subject": schema_version.subject,
"version": schema_version.version,
"id": schema_version.schema_id,
"schemaType": schema_version.schema.schema_type,
}
if schema_version.references:
response_schema["references"] = [r.to_dict() for r in schema_version.references]
response_schema["schema"] = schema_version.schema.schema_str
response_schemas.append(response_schema)

self.r(
body=response_schemas,
Expand Down Expand Up @@ -512,6 +513,8 @@ def _has_subject_with_id() -> bool:
response_body = {"schema": schema.schema_str}
if schema.schema_type is not SchemaType.AVRO:
response_body["schemaType"] = schema.schema_type
if schema.references:
response_body["references"] = [r.to_dict() for r in schema.references]
if fetch_max_id:
response_body["maxId"] = schema.max_id
self.r(response_body, content_type)
Expand Down
9 changes: 0 additions & 9 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,11 @@ strict_equality = True
[mypy-karapace.schema_registry_apis]
ignore_errors = True

[mypy-karapace.version]
ignore_errors = True

[mypy-karapace.compatibility.jsonschema.checks]
disallow_untyped_defs = False
disallow_incomplete_defs = False
warn_unused_ignores = False

[mypy-karapace.constants]
ignore_errors = True

[mypy-karapace.karapace_all]
ignore_errors = True

Expand Down Expand Up @@ -132,6 +126,3 @@ ignore_errors = True

[mypy-karapace.kafka_rest_apis.admin]
ignore_errors = True

[mypy-karapace.kafka_rest_apis.error_codes]
ignore_errors = True
48 changes: 48 additions & 0 deletions tests/integration/test_dependencies_compatibility_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,51 @@ async def test_protobuf_schema_references_valid_values(registry_async_client: Cl
f"subjects/{subject}/versions", json={"schemaType": "PROTOBUF", "schema": SIMPLE_SCHEMA, "references": []}
)
assert res.status_code == 200


async def test_protobuf_references_latest(registry_async_client: Client) -> None:
subject = create_subject_name_factory("test_protobuf_references_latest")()
res = await registry_async_client.put(f"config/{subject}", json={"compatibility": "BACKWARD"})
assert res.status_code == 200

original_dependencies = trim_margin(
"""
|syntax = "proto3";
|package a1;
|message container {
| message Hint {
| string hint_str = 1;
| }
|}
|"""
)

res = await registry_async_client.post(
f"subjects/{subject}_base/versions", json={"schemaType": "PROTOBUF", "schema": original_dependencies}
)
assert res.status_code == 200
assert "id" in res.json()

original_schema = trim_margin(
"""
|syntax = "proto3";
|package a1;
|import "container1.proto";
|message TestMessage {
| message Value {
| .a1.container.Hint hint = 1;
| int32 x = 2;
| }
| string test = 1;
| .a1.TestMessage.Value val = 2;
|}
|"""
)

original_references = [{"name": "container1.proto", "subject": f"{subject}_base", "version": -1}]
res = await registry_async_client.post(
f"subjects/{subject}/versions",
json={"schemaType": "PROTOBUF", "schema": original_schema, "references": original_references},
)
assert res.status_code == 200
assert "id" in res.json()
4 changes: 4 additions & 0 deletions tests/integration/test_schema_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,10 @@ async def test_references(testcase: ReferenceTestCase, registry_async_client: Cl
schema_id = res.json().get("id")
fetch_schema_res = await registry_async_client.get(f"/schemas/ids/{schema_id}")
assert fetch_schema_res.status_code == 200
if testdata.references:
assert "references" in fetch_schema_res.json()
else:
assert "references" not in fetch_schema_res.json()
if isinstance(testdata, TestCaseDeleteSchema):
if testdata.expected == 200:
fetch_res = await registry_async_client.get(f"/subjects/{testdata.subject}/versions/{testdata.version}")
Expand Down

0 comments on commit bfbaca6

Please sign in to comment.