protobuf support skeleton
libretto committed Mar 30, 2021
1 parent 61b700e commit ba5c9c1
Showing 6 changed files with 65 additions and 2 deletions.
18 changes: 18 additions & 0 deletions karapace/compatibility/__init__.py
@@ -11,6 +11,7 @@
SchemaIncompatibilityType
)
from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility
from karapace.protobuf_compatibility import check_protobuf_schema_compatibility
from karapace.schema_reader import SchemaType, TypedSchema

import logging
@@ -52,6 +53,11 @@ def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Valida
return jsonschema_compatibility(reader, writer)


def check_protobuf_compatibility(reader_schema: str, writer_schema: str) -> SchemaCompatibilityResult:
return check_protobuf_schema_compatibility(reader_schema, writer_schema)


def check_compatibility(
source: TypedSchema, target: TypedSchema, compatibility_mode: CompatibilityModes
) -> SchemaCompatibilityResult:
@@ -88,6 +94,18 @@ def check_compatibility(
result = check_jsonschema_compatibility(reader=target.schema, writer=source.schema)
result = result.merged_with(check_jsonschema_compatibility(reader=source.schema, writer=target.schema))

elif source.schema_type is SchemaType.PROTOBUF:
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_protobuf_compatibility(reader_schema=target.schema, writer_schema=source.schema)

elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_protobuf_compatibility(reader_schema=source.schema, writer_schema=target.schema)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_protobuf_compatibility(reader_schema=target.schema, writer_schema=source.schema)
result = result.merged_with(
check_protobuf_compatibility(reader_schema=source.schema, writer_schema=target.schema)
)
else:
result = SchemaCompatibilityResult.incompatible(
incompat_type=SchemaIncompatibilityType.type_mismatch,
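The reader/writer orientation in the new branch mirrors the existing Avro and JSON-schema handling: BACKWARD modes treat the target schema as the reader, FORWARD modes treat the source as the reader, and FULL modes check both directions and merge the results. A minimal sketch of exercising the new branch (the schema strings are illustrative; with the stub in place, every pair is reported compatible):

from karapace.compatibility import check_compatibility, CompatibilityModes
from karapace.schema_reader import SchemaType, TypedSchema

old = TypedSchema.parse(SchemaType.PROTOBUF, 'syntax = "proto3"; message Foo { string name = 1; }')
new = TypedSchema.parse(SchemaType.PROTOBUF, 'syntax = "proto3"; message Foo { string name = 1; int32 age = 2; }')

# BACKWARD asks: can readers of `new` consume data written with `old`?
result = check_compatibility(source=old, target=new, compatibility_mode=CompatibilityModes.BACKWARD)
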
4 changes: 3 additions & 1 deletion karapace/kafka_rest_apis/__init__.py
@@ -27,7 +27,9 @@
RECORD_CODES = [42201, 42202]
KNOWN_FORMATS = {"json", "avro", "binary"}
OFFSET_RESET_STRATEGIES = {"latest", "earliest"}
SCHEMA_MAPPINGS = {"avro": SchemaType.AVRO, "jsonschema": SchemaType.JSONSCHEMA}
# TODO: PROTOBUF* check schema mapping
SCHEMA_MAPPINGS = {"avro": SchemaType.AVRO, "jsonschema": SchemaType.JSONSCHEMA, "protobuf": SchemaType.PROTOBUF}

TypedConsumer = namedtuple("TypedConsumer", ["consumer", "serialization_format", "config"])


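With the mapping extended, a payload that declares "protobuf" can be resolved to SchemaType.PROTOBUF; note that KNOWN_FORMATS above still lists only json, avro, and binary, which is presumably what the TODO refers to. A rough sketch of the lookup (embedded_format stands in for whichever field the REST layer reads the format from):

from karapace.kafka_rest_apis import SCHEMA_MAPPINGS
from karapace.schema_reader import SchemaType

embedded_format = "protobuf"  # stand-in for the format declared by the request
schema_type = SCHEMA_MAPPINGS.get(embedded_format)
assert schema_type is SchemaType.PROTOBUF
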
22 changes: 22 additions & 0 deletions karapace/protobuf_compatibility.py
@@ -0,0 +1,22 @@
# TODO: PROTOBUF* this functionality must be implemented
from karapace.avro_compatibility import SchemaCompatibilityResult


def parse_protobuf_schema_definition(schema_definition: str) -> str:
""" Parses and validates `schema_definition`.
Currently a placeholder: the input is returned unchanged and no parsing or validation happens yet.
Raises:
Nothing yet.
"""
return schema_definition


def check_protobuf_schema_compatibility(reader: str, writer: str) -> SchemaCompatibilityResult:
# TODO: PROTOBUF* placeholder for investigation: no real check is performed yet,
# so every pair of schemas is reported compatible, even when they differ.
return SchemaCompatibilityResult.compatible()
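
Once a real parser lands, the check will have to compare the two definitions field by field. Purely as a hypothetical sketch of the shape such a check could take (the FieldSpec structure and the single rule below are invented for illustration, not part of Karapace): a field number present in both schemas must keep its type, while fields present on only one side are tolerated thanks to protobuf's unknown-field semantics.

from dataclasses import dataclass
from typing import Dict

@dataclass
class FieldSpec:  # hypothetical parsed representation of one field
    name: str
    type: str

def fields_compatible(reader_fields: Dict[int, FieldSpec], writer_fields: Dict[int, FieldSpec]) -> bool:
    # Keyed by field number: a number that appears in both schemas must keep its type.
    for number, writer_field in writer_fields.items():
        reader_field = reader_fields.get(number)
        if reader_field is not None and reader_field.type != writer_field.type:
            return False
    return True

# The writer added field 2; field 1 kept its type, so the pair is compatible.
reader = {1: FieldSpec("name", "string")}
writer = {1: FieldSpec("name", "string"), 2: FieldSpec("age", "int32")}
assert fields_compatible(reader, writer)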
13 changes: 13 additions & 0 deletions karapace/schema_reader.py
@@ -14,6 +14,7 @@
from kafka.errors import NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from karapace import constants
from karapace.avro_compatibility import parse_avro_schema_definition
from karapace.protobuf_compatibility import parse_protobuf_schema_definition
from karapace.statsd import StatsClient
from karapace.utils import json_encode, KarapaceKafkaClient
from queue import Queue
@@ -22,6 +23,7 @@

import json
import logging
import sys
import time

log = logging.getLogger(__name__)
@@ -71,12 +73,23 @@ def parse_avro(schema_str: str): # pylint: disable=inconsistent-return-statemen
except SchemaParseException as e:
raise InvalidSchema from e

@staticmethod
def parse_protobuf(schema_str: str):
try:
return TypedSchema(parse_protobuf_schema_definition(schema_str), SchemaType.PROTOBUF, schema_str)
# TypeError is raised when the caller forgets to encode the schema as a string.
except Exception as e:
print("Unexpected error:", sys.exc_info()[0])
raise InvalidSchema from e

@staticmethod
def parse(schema_type: SchemaType, schema_str: str): # pylint: disable=inconsistent-return-statements
if schema_type is SchemaType.AVRO:
return TypedSchema.parse_avro(schema_str)
if schema_type is SchemaType.JSONSCHEMA:
return TypedSchema.parse_json(schema_str)
if schema_type is SchemaType.PROTOBUF:
return TypedSchema.parse_protobuf(schema_str)
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

def to_json(self):
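With this dispatch in place, protobuf definitions flow through the same TypedSchema.parse entry point as Avro and JSON schema. Since parse_protobuf_schema_definition is still a passthrough, any string currently parses successfully; for example:

from karapace.schema_reader import SchemaType, TypedSchema

schema = TypedSchema.parse(SchemaType.PROTOBUF, 'syntax = "proto3"; message Foo { string name = 1; }')

# Until real parsing lands, even malformed input is accepted:
also_accepted = TypedSchema.parse(SchemaType.PROTOBUF, "not a proto file at all")
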
2 changes: 1 addition & 1 deletion karapace/schema_registry_apis.py
@@ -529,7 +529,7 @@ def _validate_schema_request_body(self, content_type, body) -> None:

def _validate_schema_type(self, content_type, body) -> None:
schema_type = SchemaType(body.get("schemaType", SchemaType.AVRO.value))
if schema_type not in {SchemaType.JSONSCHEMA, SchemaType.AVRO}:
if schema_type not in {SchemaType.JSONSCHEMA, SchemaType.AVRO, SchemaType.PROTOBUF}:
self.r(
body={
"error_code": SchemaErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
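This one-line change means a registration request that declares the protobuf type is no longer rejected up front with HTTP 422. A sketch of such a request body, assuming SchemaType.PROTOBUF's value is the string "PROTOBUF" (by analogy with SchemaType.AVRO above):

import json

# Hypothetical payload for POST /subjects/<subject>/versions:
body = {
    "schemaType": "PROTOBUF",
    "schema": 'syntax = "proto3"; message Foo { string name = 1; }',
}
print(json.dumps(body))
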
8 changes: 8 additions & 0 deletions karapace/serialization.py
@@ -135,6 +135,7 @@ def get_subject_name(self, topic_name: str, schema: str, subject_type: str, sche
namespace = schema_typed.schema.namespace
if schema_type is SchemaType.JSONSCHEMA:
namespace = schema_typed.to_json().get("namespace", "dummy")
# TODO: PROTOBUF* protobuf does not appear to use namespaces the way Avro does
return f"{self.subject_name_strategy(topic_name, namespace)}-{subject_type}"

async def get_schema_for_subject(self, subject: str) -> TypedSchema:
@@ -184,6 +185,10 @@ def read_value(schema: TypedSchema, bio: io.BytesIO):
except ValidationError as e:
raise InvalidPayload from e
return value
if schema.schema_type is SchemaType.PROTOBUF:
# TODO: PROTOBUF* a protobuf validator is needed here; for now the raw bytes pass through unvalidated
return bio.read()
raise ValueError("Unknown schema type")


@@ -197,6 +202,9 @@ def write_value(schema: TypedSchema, bio: io.BytesIO, value: dict):
except ValidationError as e:
raise InvalidPayload from e
bio.write(json_encode(value, binary=True))
elif schema.schema_type is SchemaType.PROTOBUF:
# TODO: PROTOBUF* a protobuf validator is needed here; for now the bytes are written as-is
bio.write(value)
else:
raise ValueError("Unknown schema type")

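Until a protobuf validator is wired in, read_value and write_value treat protobuf payloads as opaque bytes; note that the protobuf branch of write_value expects value to already be encoded bytes, unlike the dicts the Avro and JSON-schema branches receive. A minimal sketch of the resulting pass-through round trip:

import io

payload = b"\x0a\x03foo"  # arbitrary bytes standing in for an encoded message

bio = io.BytesIO()
bio.write(payload)            # what write_value's protobuf branch does
bio.seek(0)
assert bio.read() == payload  # what read_value's protobuf branch returns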
