Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent from accepting schema that are not uniquely identifiable from the current parser #717

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions karapace/protobuf/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from karapace.dependency import DependencyVerifierResult
from karapace.protobuf.known_dependency import DependenciesHardcoded, KnownDependency
from karapace.protobuf.one_of_element import OneOfElement
from typing import List
from typing import List, Optional, Set


class FieldNotUniquelyIdentifiableException(Exception):
pass


class ProtobufDependencyVerifier:
Expand All @@ -17,7 +21,12 @@ def __init__(self) -> None:
self.used_types: List[str] = []
self.import_path: List[str] = []

def add_declared_type(self, full_name: str) -> None:
def add_declared_type(self, full_name: str, uniquely: bool = False) -> None:
if uniquely and full_name in self.declared_types:
raise FieldNotUniquelyIdentifiableException(
f"{full_name} is not currently identifiable from the parser, "
f"validating this message lead to break the schema evolution!"
)
self.declared_types.append(full_name)

def add_used_type(self, parent: str, element_type: str) -> None:
Expand All @@ -35,13 +44,31 @@ def add_used_type(self, parent: str, element_type: str) -> None:
def add_import(self, import_name: str) -> None:
self.import_path.append(import_name)

def is_type_declared(
self,
used_type: str,
declared_index: Set[str],
father_child_type: Optional[str],
used_type_with_scope: Optional[str],
) -> bool:
return (
used_type in declared_index
or (used_type_with_scope is not None and used_type_with_scope in declared_index)
or (father_child_type is not None and father_child_type in declared_index)
or "." + used_type in declared_index
)

def verify(self) -> DependencyVerifierResult:
declared_index = set(self.declared_types)
for used_type in self.used_types:
delimiter = used_type.rfind(";")
used_type_with_scope = ""
father_child_type = None
used_type_with_scope = None
if delimiter != -1:
used_type_with_scope = used_type[:delimiter] + "." + used_type[delimiter + 1 :]
father_delimiter = used_type[:delimiter].find(".")
if father_delimiter != -1:
father_child_type = used_type[:father_delimiter] + "." + used_type[delimiter + 1 :]
used_type = used_type[delimiter + 1 :]

if used_type in DependenciesHardcoded.index:
Expand All @@ -51,11 +78,7 @@ def verify(self) -> DependencyVerifierResult:
if known_pkg is not None and known_pkg in self.import_path:
continue

if (
used_type in declared_index
or (delimiter != -1 and used_type_with_scope in declared_index)
or "." + used_type in declared_index
):
if self.is_type_declared(used_type, declared_index, father_child_type, used_type_with_scope):
continue

return DependencyVerifierResult(False, f'type "{used_type}" is not defined')
Expand Down
16 changes: 15 additions & 1 deletion karapace/protobuf/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ def _process_nested_type(
):
verifier.add_declared_type(package_name + "." + parent_name + "." + element_type.name)
verifier.add_declared_type(parent_name + "." + element_type.name)
anchestor_only = parent_name.find(".")
if anchestor_only != -1:
# adding the first father and the type name, this should be unique to identify which is which.
verifier.add_declared_type(parent_name[:anchestor_only] + "." + element_type.name, uniquely=True)

if isinstance(element_type, MessageElement):
for one_of in element_type.one_ofs:
Expand All @@ -169,7 +173,17 @@ def _process_nested_type(
one_of_parent_name = parent_name + "." + element_type.name
process_one_of(verifier, package_name, one_of_parent_name, one_of)
for field in element_type.fields:
verifier.add_used_type(parent_name, field.element_type)
# since we declare the subtype in the same level of the scope, it's legit
# use the same scoping when declare the dependent type.
if field.element_type in [defined_in_same_scope.name for defined_in_same_scope in element_type.nested_types]:
verifier.add_used_type(parent_name + "." + element_type.name, field.element_type)
else:
ancestor_only = parent_name.find(".")
if ancestor_only != -1:
verifier.add_used_type(parent_name[:ancestor_only], field.element_type)
else:
verifier.add_used_type(parent_name, field.element_type)

for nested_type in element_type.nested_types:
self._process_nested_type(verifier, package_name, parent_name + "." + element_type.name, nested_type)

Expand Down
52 changes: 52 additions & 0 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from __future__ import annotations

from kafka.errors import UnknownTopicOrPartitionError
from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRestAdminClient
from pytest import raises
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
from tests.utils import (
Expand Down Expand Up @@ -186,6 +188,56 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_clie
# assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}"


async def test_another_avro_publish(
rest_async_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
):
topic = new_topic(admin_client)
other_tn = new_topic(admin_client)

await wait_for_topics(rest_async_client, topic_names=[topic, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
header = REST_HEADERS["avro"]

tested_avro_schema = {
"type": "record",
"name": "example",
"namespace": "example",
"doc": "example",
"fields": [{"type": "int", "name": "test", "doc": "my test number", "namespace": "test", "default": "5"}],
}

schema_str = json.dumps(tested_avro_schema)

# check succeeds with 1 record and brand new schema]
res = await registry_async_client.post(
f"subjects/{topic}-key/versions", json={"schema": schema_str, "schemaType": "AVRO"}
)
assert res.ok

key_schema_id = res.json()["id"]

res = await registry_async_client.post(
f"subjects/{topic}-value/versions", json={"schema": schema_str, "schemaType": "AVRO"}
)
assert res.ok

value_schema_id = res.json()["id"]

key_body = {"test": 5}

value_body = {"test": 5}

body = {
"key_schema_id": key_schema_id,
"value_schema_id": value_schema_id,
"records": [{"key": key_body, "value": value_body}],
}

url = f"/topics/{topic}"
res = await rest_async_client.post(url, json=body, headers=header)
assert res.ok

async def test_admin_client(admin_client, producer):
topic_names = [new_topic(admin_client) for i in range(10, 13)]
topic_info = admin_client.cluster_metadata()
Expand Down
179 changes: 175 additions & 4 deletions tests/integration/test_rest_consumer_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRestAdminClient
from karapace.protobuf.kotlin_wrapper import trim_margin
from tests.integration.test_rest import NEW_TOPIC_TIMEOUT
from tests.utils import (
create_subject_name_factory,
new_consumer,
new_random_name,
new_topic,
Expand All @@ -17,6 +17,7 @@
schema_data_second,
wait_for_topics,
)
from typing import Generator

import pytest

Expand Down Expand Up @@ -139,7 +140,7 @@ async def test_publish_protobuf_with_references(
res = await rest_async_client.post(
f"/topics/{topic_name}",
json=example_message,
headers=REST_HEADERS["avro"],
headers=REST_HEADERS["protobuf"],
)
assert res.status_code == 200

Expand Down Expand Up @@ -230,7 +231,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
res = await rest_async_client.post(
f"/topics/{topic_name}",
json=example_message,
headers=REST_HEADERS["avro"],
headers=REST_HEADERS["protobuf"],
)
assert res.status_code == 200

Expand All @@ -241,7 +242,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(

consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"

res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["binary"])
res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
assert res.ok

resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
Expand All @@ -262,3 +263,173 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
assert msg["offset"] == 0 and msg["partition"] == 0, "first message of the only partition available"
assert msg["topic"] == topic_name
assert msg["value"] == produced_message


@pytest.mark.parametrize("google_library_included", [True, False])
async def test_produce_and_retrieve_protobuf(
registry_async_client: Client,
rest_async_client: Client,
admin_client: KafkaRestAdminClient,
google_library_included: bool,
) -> None:
topic_name = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
subject = create_subject_name_factory("test_produce_and_retrieve_protobuf")()
subject_topic = f"{topic_name}-value"

base_schema_subject = f"{subject}_base_schema_subject"
google_postal_address_schema_subject = f"{subject}_google_address_schema_subject"

CUSTOMER_PLACE_PROTO = """
syntax = "proto3";
package a1;
message Place {
string city = 1;
int32 zone = 2;
}
"""

body = {"schemaType": "PROTOBUF", "schema": CUSTOMER_PLACE_PROTO}
res = await registry_async_client.post(f"subjects/{base_schema_subject}/versions", json=body)
assert res.status_code == 200

if not google_library_included:
GOOGLE_POSTAL_ADDRESS_PROTO = """
syntax = "proto3";

package google.type;

option cc_enable_arenas = true;
option go_package = "google.golang.org/genproto/googleapis/type/postaladdress;postaladdress";
option java_multiple_files = true;
option java_outer_classname = "PostalAddressProto";
option java_package = "com.google.type";
option objc_class_prefix = "GTP";
message PostalAddress {
int32 revision = 1;
string region_code = 2;
string language_code = 3;
string postal_code = 4;
string sorting_code = 5;
string administrative_area = 6;
string locality = 7;
string sublocality = 8;
repeated string address_lines = 9;
repeated string recipients = 10;
string organization = 11;
}
"""

body = {"schemaType": "PROTOBUF", "schema": GOOGLE_POSTAL_ADDRESS_PROTO}
res = await registry_async_client.post(f"subjects/{google_postal_address_schema_subject}/versions", json=body)
assert res.status_code == 200

postal_address_import = (
'import "google/type/postal_address.proto";' if google_library_included else 'import "postal_address.proto";'
)

CUSTOMER_PROTO = f"""
syntax = "proto3";
package a1;
import "Place.proto";

{postal_address_import}

// @producer: another comment
message Customer {{
string name = 1;
int32 code = 2;
Place place = 3;
google.type.PostalAddress address = 4;
}}
"""

def references() -> Generator[str, None, None]:
yield {"name": "Place.proto", "subject": base_schema_subject, "version": 1}

if not google_library_included:
yield {"name": "postal_address.proto", "subject": google_postal_address_schema_subject, "version": 1}

body = {
"schemaType": "PROTOBUF",
"schema": CUSTOMER_PROTO,
"references": list(references()),
}
res = await registry_async_client.post(f"subjects/{subject_topic}/versions", json=body)

assert res.status_code == 200
topic_schema_id = res.json()["id"]

message_to_produce = [
{
"name": "John Doe",
"code": 123456,
"place": {"city": "New York", "zone": 5},
"address": {
"revision": 1,
"region_code": "US",
"postal_code": "10001",
"address_lines": ["123 Main St", "Apt 4"],
},
},
{
"name": "Sophie Smith",
"code": 987654,
"place": {"city": "London", "zone": 3},
"address": {
"revision": 2,
"region_code": "UK",
"postal_code": "SW1A 1AA",
"address_lines": ["10 Downing Street"],
},
},
{
"name": "Pierre Dupont",
"code": 246813,
"place": {"city": "Paris", "zone": 1},
"address": {"revision": 1, "region_code": "FR", "postal_code": "75001", "address_lines": ["1 Rue de Rivoli"]},
},
]

res = await rest_async_client.post(
f"/topics/{topic_name}",
json={"value_schema_id": topic_schema_id, "records": [{"value": m} for m in message_to_produce]},
headers=REST_HEADERS["protobuf"],
)
assert res.status_code == 200

group = new_random_name("protobuf_recursive_reference_message")
instance_id = await new_consumer(rest_async_client, group)

subscribe_path = f"/consumers/{group}/instances/{instance_id}/subscription"

consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"

res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
assert res.ok

resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
data = resp.json()

assert isinstance(data, list)
assert len(data) == 3

for i in range(0, 3):
msg = data[i]
expected_message = message_to_produce[i]

assert "key" in msg
assert "offset" in msg
assert "topic" in msg
assert "value" in msg
assert "timestamp" in msg

assert msg["key"] is None, "no key defined in production"
assert msg["topic"] == topic_name

for key in expected_message.keys():
if key == "address":
for address_key in expected_message["address"].keys():
assert expected_message["address"][address_key] == msg["value"]["address"][address_key]
else:
assert msg["value"][key] == expected_message[key]
Loading