Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests, protobuf: add test on invalid reference schema #959

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from karapace.schema_type import SchemaType
from karapace.typing import SchemaId, Version
from tests.base_testcase import BaseTestCase
from tests.utils import schema_protobuf_invalid
from tests.utils import schema_protobuf_invalid_because_corrupted, schema_protobuf_with_invalid_ref
from typing import Callable, List, Tuple
from unittest.mock import Mock

Expand Down Expand Up @@ -485,7 +485,7 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message:
key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}',
value=(
b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false, "schema":'
+ json.dumps(schema_protobuf_invalid).encode()
+ json.dumps(schema_protobuf_invalid_because_corrupted).encode()
+ b"}"
),
schema_type=SchemaType.PROTOBUF,
Expand Down Expand Up @@ -515,3 +515,62 @@ def test_message_error_handling(
assert log.name == "karapace.schema_reader"
assert log.levelname == "WARNING"
assert log.message == test_case.expected_log_message


def test_message_error_handling_with_invalid_reference_schema_protobuf(
caplog: LogCaptureFixture,
schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader],
message_factory: Callable[[bytes, bytes, int], Message],
) -> None:
# Given an invalid schema (corrupted)
key_ref = b'{"keytype":"SCHEMA","subject":"testref","version":1,"magic":1}'
value_ref = (
b'{"schemaType": "PROTOBUF", "subject": "testref", "version": 1, "id": 1, "deleted": false'
+ b', "schema": '
+ json.dumps(schema_protobuf_invalid_because_corrupted).encode()
+ b"}"
)
message_ref = message_factory(key=key_ref, value=value_ref)

# And given a schema referencing that corrupted schema (valid otherwise)
key_using_ref = b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}'
value_using_ref = (
b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false'
+ b', "schema": '
+ json.dumps(schema_protobuf_with_invalid_ref).encode()
+ b', "references": [{"name": "testref.proto", "subject": "testref", "version": 1}]'
+ b"}"
)
message_using_ref = message_factory(key=key_using_ref, value=value_using_ref)

with caplog.at_level(logging.WARN, logger="karapace.schema_reader"):
# When handling the corrupted schema
schema_reader = schema_reader_with_consumer_messages_factory(([message_ref],))

# Then the schema is recognised as invalid
with pytest.raises(CorruptKafkaRecordException):
schema_reader.handle_messages()

assert schema_reader.offset == 1
assert not schema_reader.ready

# When handling the schema
schema_reader.consumer.consume.side_effect = ([message_using_ref],)

# Then the schema is recognised as invalid because of the corrupted referenced schema
with pytest.raises(CorruptKafkaRecordException):
schema_reader.handle_messages()

assert schema_reader.offset == 1
assert not schema_reader.ready

warn_records = [r for r in caplog.records if r.levelname == "WARNING"]

assert len(warn_records) == 2

# Check that different warnings are logged for each schema
assert warn_records[0].name == "karapace.schema_reader"
assert warn_records[0].message == "Schema is not valid ProtoBuf definition"

assert warn_records[1].name == "karapace.schema_reader"
assert warn_records[1].message == "Invalid Protobuf references"
16 changes: 14 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@
{"q": 3, "sensor_type": "L1", "nums": [3, 4], "order": {"item": "ABC01223"}},
]

schema_protobuf_invalid = """
schema_protobuf_invalid_because_corrupted = """
|o3"
|
|opti -- om.codingharbour.protobuf";
Expand All @@ -162,7 +162,19 @@
| HIGH = 0
| MIDDLE = ;
"""
schema_protobuf_invalid = trim_margin(schema_protobuf_invalid)
schema_protobuf_invalid_because_corrupted = trim_margin(schema_protobuf_invalid_because_corrupted)

schema_protobuf_with_invalid_ref = """
|syntax = "proto3";
|
|import "Message.proto";
|
|message MessageWithInvalidRef {
| string name = 1;
| Message ref = 2;
|}
|"""
schema_protobuf_with_invalid_ref = trim_margin(schema_protobuf_with_invalid_ref)

schema_data_second = {"protobuf": (schema_protobuf_second, test_objects_protobuf_second)}

Expand Down
Loading