Merge pull request Aiven-Open#175 from aiven/hacka-rest-api-integrate-jsonschema-compatibility

rest api integrate jsonschema compatibility
HelenMel authored Mar 17, 2021
2 parents b516638 + 44f0c0b commit 61b700e
Showing 11 changed files with 295 additions and 255 deletions.
10 changes: 9 additions & 1 deletion karapace/avro_compatibility.py
@@ -10,7 +10,7 @@
E = TypeVar("E", bound=Enum)


def parse_json_ignore_trailing(s: str) -> Any:
def parse_avro_schema_definition(s: str) -> Schema:
""" Compatibility function with Avro which ignores trailing data in JSON
strings.
@@ -30,6 +30,14 @@ def parse_json_ignore_trailing(s: str) -> Any:
return SchemaFromJSONData(json_data, names)


def is_incompatible(result: "SchemaCompatibilityResult") -> bool:
return result.compatibility is SchemaCompatibilityType.incompatible


def is_compatible(result: "SchemaCompatibilityResult") -> bool:
return result.compatibility is SchemaCompatibilityType.compatible


# TODO: remove SchemaCompatibilityType.incompatible, it can be determined from
# SchemaCompatibilityResult.incompatibilities
@unique
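The hunks above rename `parse_json_ignore_trailing` to `parse_avro_schema_definition` and add two module-level helpers, `is_compatible` and `is_incompatible`, for inspecting a `SchemaCompatibilityResult`. A minimal sketch (not part of the diff) of how those helpers read at a call site, using the `SchemaCompatibilityResult.compatible()` constructor referenced later in this commit:

```python
from karapace.avro_compatibility import (
    SchemaCompatibilityResult,
    is_compatible,
    is_incompatible,
)

# A result built with the .compatible() constructor reports as compatible
# and not incompatible via the new helpers.
result = SchemaCompatibilityResult.compatible()
assert is_compatible(result)
assert not is_incompatible(result)
```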
59 changes: 44 additions & 15 deletions karapace/compatibility/__init__.py
@@ -5,9 +5,12 @@
See LICENSE for details
"""
from enum import Enum, unique
from jsonschema import Draft7Validator
from karapace.avro_compatibility import (
ReaderWriterCompatibilityChecker as AvroChecker, SchemaCompatibilityType, SchemaIncompatibilityType
ReaderWriterCompatibilityChecker as AvroChecker, SchemaCompatibilityResult, SchemaCompatibilityType,
SchemaIncompatibilityType
)
from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility
from karapace.schema_reader import SchemaType, TypedSchema

import logging
@@ -34,36 +34,62 @@ def is_transitive(self) -> bool:
return self.value in TRANSITIVE_MODES


class IncompatibleSchema(Exception):
pass


def check_avro_compatibility(reader_schema, writer_schema) -> None:
def check_avro_compatibility(reader_schema, writer_schema) -> SchemaCompatibilityResult:
result = AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema)
if (
result.compatibility is SchemaCompatibilityType.incompatible
and [SchemaIncompatibilityType.missing_enum_symbols] != result.incompatibilities
):
raise IncompatibleSchema(str(result.compatibility))
return result

return SchemaCompatibilityResult.compatible()


def check_compatibility(source: TypedSchema, target: TypedSchema, compatibility_mode: CompatibilityModes) -> None:
def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult:
return jsonschema_compatibility(reader, writer)


def check_compatibility(
source: TypedSchema, target: TypedSchema, compatibility_mode: CompatibilityModes
) -> SchemaCompatibilityResult:
if source.schema_type is not target.schema_type:
raise IncompatibleSchema(f"Comparing different schema types: {source.schema_type} with {target.schema_type}")
return SchemaCompatibilityResult.incompatible(
incompat_type=SchemaIncompatibilityType.type_mismatch,
message=f"Comparing different schema types: {source.schema_type} with {target.schema_type}",
location=[],
)

if compatibility_mode is CompatibilityModes.NONE:
LOG.info("Compatibility level set to NONE, no schema compatibility checks performed")
return
return SchemaCompatibilityResult.compatible()

if source.schema_type is SchemaType.AVRO:
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
check_avro_compatibility(writer_schema=source.schema, reader_schema=target.schema)
result = check_avro_compatibility(reader_schema=target.schema, writer_schema=source.schema)

elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_avro_compatibility(reader_schema=source.schema, writer_schema=target.schema)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_avro_compatibility(reader_schema=target.schema, writer_schema=source.schema)
result = result.merged_with(check_avro_compatibility(reader_schema=source.schema, writer_schema=target.schema))

elif source.schema_type is SchemaType.JSONSCHEMA:
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_jsonschema_compatibility(reader=target.schema, writer=source.schema)

elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
check_avro_compatibility(writer_schema=target.schema, reader_schema=source.schema)
result = check_jsonschema_compatibility(reader=source.schema, writer=target.schema)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
check_avro_compatibility(writer_schema=source.schema, reader_schema=target.schema)
check_avro_compatibility(writer_schema=target.schema, reader_schema=source.schema)
result = check_jsonschema_compatibility(reader=target.schema, writer=source.schema)
result = result.merged_with(check_jsonschema_compatibility(reader=source.schema, writer=target.schema))

else:
result = SchemaCompatibilityResult.incompatible(
incompat_type=SchemaIncompatibilityType.type_mismatch,
message=f"Unknow schema_type {source.schema_type}",
location=[],
)

LOG.info("Unknow schema_type %r", source.schema_type)
return result
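With these changes `check_compatibility` no longer raises `IncompatibleSchema`; it returns a `SchemaCompatibilityResult` and routes JSON Schema comparisons through `jsonschema_compatibility`. A hedged sketch of a caller under the new contract (the schema strings are illustrative only):

```python
from karapace.avro_compatibility import is_incompatible
from karapace.compatibility import CompatibilityModes, check_compatibility
from karapace.schema_reader import TypedSchema

# Two JSON Schema versions of the same subject; parse_json wraps them in
# Draft7Validator instances (see karapace/schema_reader.py below).
old = TypedSchema.parse_json('{"type": "object"}')
new = TypedSchema.parse_json('{"type": "object", "additionalProperties": false}')

result = check_compatibility(source=old, target=new, compatibility_mode=CompatibilityModes.BACKWARD)
if is_incompatible(result):
    # Callers now branch on the returned result instead of catching IncompatibleSchema.
    ...
```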
8 changes: 4 additions & 4 deletions karapace/compatibility/jsonschema/checks.py
@@ -1,14 +1,14 @@
from dataclasses import dataclass
from itertools import product
from jsonschema import Draft7Validator
from karapace.avro_compatibility import SchemaCompatibilityResult
from karapace.avro_compatibility import is_compatible, is_incompatible, SchemaCompatibilityResult
from karapace.compatibility.jsonschema.types import (
AssertionCheck, BooleanSchema, Incompatibility, Instance, Keyword, Subschema
)
from karapace.compatibility.jsonschema.utils import (
get_type_of, gt, introduced_constraint, is_compatible, is_false_schema, is_incompatible, is_object_content_model_open,
is_simple_subschema, is_true_schema, is_tuple, is_tuple_without_additional_items, lt, maybe_get_subschemas_and_type, ne,
normalize_schema, schema_from_partially_open_content_model
get_type_of, gt, introduced_constraint, is_false_schema, is_object_content_model_open, is_simple_subschema,
is_true_schema, is_tuple, is_tuple_without_additional_items, lt, maybe_get_subschemas_and_type, ne, normalize_schema,
schema_from_partially_open_content_model
)
from typing import Any, List, Optional

14 changes: 0 additions & 14 deletions karapace/compatibility/jsonschema/parse.py

This file was deleted.

36 changes: 18 additions & 18 deletions karapace/compatibility/jsonschema/utils.py
@@ -1,8 +1,7 @@
from copy import copy
from jsonschema import Draft7Validator
from karapace.avro_compatibility import SchemaCompatibilityResult, SchemaCompatibilityType
from karapace.compatibility.jsonschema.types import BooleanSchema, Instance, Keyword, Subschema
from typing import Any, List, Optional, Tuple, TypeVar, Union
from typing import Any, List, Optional, Tuple, Type, TypeVar, Union

import re

@@ -18,6 +17,7 @@ def normalize_schema_rec(validator, original_schema) -> Any:
if isinstance(original_schema, (bool, str, float, int)) or original_schema is None:
return original_schema

normalized: Any
if isinstance(original_schema, dict):
scope = validator.ID_OF(original_schema)
resolver = validator.resolver
@@ -42,19 +42,11 @@
elif isinstance(original_schema, list):
normalized = [normalize_schema_rec(validator, item) for item in original_schema]
else:
raise ValueError("Cannot handle object of type {type(original_schema)}")
raise ValueError(f"Cannot handle object of type {type(original_schema)}")

return normalized


def is_incompatible(result: SchemaCompatibilityResult) -> bool:
return result.compatibility is SchemaCompatibilityType.incompatible


def is_compatible(result: SchemaCompatibilityResult) -> bool:
return result.compatibility is SchemaCompatibilityType.compatible


def maybe_get_subschemas_and_type(schema: Any) -> Optional[Tuple[List[Any], Subschema]]:
"""If schema contains `anyOf`, `allOf`, or `oneOf`, return it.
@@ -77,6 +69,7 @@ def maybe_get_subschemas_and_type(schema: Any) -> Optional[Tuple[List[Any], Subs

type_value = schema.get(Keyword.TYPE.value)

subschema: Any
if isinstance(type_value, list):
normalized_schemas = []
for subtype in type_value:
@@ -86,11 +79,18 @@ def maybe_get_subschemas_and_type(schema: Any) -> Optional[Tuple[List[Any], Subs

return (normalized_schemas, Subschema.ANY_OF)

for type_ in (Subschema.ALL_OF, Subschema.ANY_OF, Subschema.ONE_OF, Subschema.NOT):
for type_ in (Subschema.ALL_OF, Subschema.ANY_OF, Subschema.ONE_OF):
subschema = schema.get(type_.value)
if subschema is not None:
# https://json-schema.org/draft/2020-12/json-schema-core.html#rfc.section.10.2.1
assert isinstance(subschema, list), "allOf/anyOf/oneOf must be an array"
return (subschema, type_)

subschema = schema.get(Subschema.NOT.value)
if subschema is not None:
# https://json-schema.org/draft/2020-12/json-schema-core.html#rfc.section.10.2.1
return ([subschema], Subschema.NOT)

return None


@@ -174,19 +174,19 @@ def is_false_schema(schema: Any) -> bool:
The `false` schema forbids a given value. For writers this means the value
is never produced, for readers it means the value is always rejected.
>>> is_false_schema(parse_schema_definition("false"))
>>> is_false_schema(parse_jsonschema_definition("false"))
True
>>> is_false_schema(parse_schema_definition('{"not":{}}'))
>>> is_false_schema(parse_jsonschema_definition('{"not":{}}'))
True
>>> is_false_schema(parse_schema_definition("{}"))
>>> is_false_schema(parse_jsonschema_definition("{}"))
False
>>> is_false_schema(parse_schema_definition("true"))
>>> is_false_schema(parse_jsonschema_definition("true"))
False
Note:
Negated schemas are not the same as the false schema:
>>> is_false_schema(parse_schema_definition('{"not":{"type":"number"}}'))
>>> is_false_schema(parse_jsonschema_definition('{"not":{"type":"number"}}'))
False
"""
# https://json-schema.org/draft/2020-12/json-schema-core.html#rfc.section.4.3.2
@@ -326,7 +326,7 @@ def schema_from_partially_open_content_model(schema: dict, target_property_name:
return schema.get(Keyword.ADDITIONAL_PROPERTIES.value)


def get_type_of(schema: Any) -> Union[Instance, Subschema, Keyword, BooleanSchema]:
def get_type_of(schema: Any) -> Union[Instance, Subschema, Keyword, Type[BooleanSchema]]:
# https://json-schema.org/draft/2020-12/json-schema-core.html#rfc.section.4.2.1

# The difference is due to the convertion of the JSON value null to the Python value None
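One behavioural detail from the hunks above: `maybe_get_subschemas_and_type` now treats `not` separately from `allOf`/`anyOf`/`oneOf`, wrapping its single subschema in a one-element list so every branch returns a `(List[Any], Subschema)` pair. A small sketch of the expected shapes (an illustration only, assuming `Subschema` member values match the JSON Schema keyword spellings):

```python
from karapace.compatibility.jsonschema.types import Subschema
from karapace.compatibility.jsonschema.utils import maybe_get_subschemas_and_type

# allOf/anyOf/oneOf already carry a list of subschemas.
assert maybe_get_subschemas_and_type({"anyOf": [{"type": "string"}]}) == (
    [{"type": "string"}],
    Subschema.ANY_OF,
)

# `not` carries a single schema, which the new branch wraps in a list.
assert maybe_get_subschemas_and_type({"not": {"type": "string"}}) == (
    [{"type": "string"}],
    Subschema.NOT,
)
```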
27 changes: 18 additions & 9 deletions karapace/schema_reader.py
@@ -6,14 +6,14 @@
"""
from avro.schema import Schema as AvroSchema, SchemaParseException
from enum import Enum, unique
from json import JSONDecodeError, loads
from json import JSONDecodeError
from jsonschema import Draft7Validator
from jsonschema.exceptions import SchemaError
from jsonschema.validators import Draft7Validator
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from karapace import constants
from karapace.avro_compatibility import parse_json_ignore_trailing
from karapace.avro_compatibility import parse_avro_schema_definition
from karapace.statsd import StatsClient
from karapace.utils import json_encode, KarapaceKafkaClient
from queue import Queue
@@ -27,6 +27,17 @@
log = logging.getLogger(__name__)


def parse_jsonschema_definition(schema_definition: str) -> Draft7Validator:
""" Parses and validates `schema_definition`.
Raises:
SchemaError: If `schema_definition` is not a valid Draft7 schema.
"""
schema = json.loads(schema_definition)
Draft7Validator.check_schema(schema)
return Draft7Validator(schema)


class InvalidSchema(Exception):
pass

@@ -47,17 +58,15 @@ def __init__(self, schema, schema_type: SchemaType, schema_str: str):
@staticmethod
def parse_json(schema_str: str):
try:
js = loads(schema_str)
Draft7Validator.check_schema(js)
assert "type" in js
return TypedSchema(Draft7Validator(js), SchemaType.JSONSCHEMA, schema_str)
except (JSONDecodeError, SchemaError, AssertionError) as e:
return TypedSchema(parse_jsonschema_definition(schema_str), SchemaType.JSONSCHEMA, schema_str)
# TypeError - Raised when the user forgets to encode the schema as a string.
except (TypeError, JSONDecodeError, SchemaError, AssertionError) as e:
raise InvalidSchema from e

@staticmethod
def parse_avro(schema_str: str): # pylint: disable=inconsistent-return-statements
try:
ts = TypedSchema(parse_json_ignore_trailing(schema_str), SchemaType.AVRO, schema_str)
ts = TypedSchema(parse_avro_schema_definition(schema_str), SchemaType.AVRO, schema_str)
return ts
except SchemaParseException as e:
raise InvalidSchema from e
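`TypedSchema.parse_json` now delegates to the new `parse_jsonschema_definition`, which runs `Draft7Validator.check_schema` on the decoded document and returns a `Draft7Validator`; decode, schema, and type errors all surface as `InvalidSchema`. A brief sketch of that behaviour (assuming `TypedSchema` keeps `schema_type` as an attribute, as its constructor signature suggests):

```python
from karapace.schema_reader import InvalidSchema, SchemaType, TypedSchema

# A valid Draft 7 document is parsed and wrapped in a Draft7Validator.
parsed = TypedSchema.parse_json('{"type": "string"}')
assert parsed.schema_type is SchemaType.JSONSCHEMA

# An invalid document fails Draft7Validator.check_schema and is reported
# uniformly as InvalidSchema.
try:
    TypedSchema.parse_json('{"type": "not-a-json-schema-type"}')
except InvalidSchema:
    pass
```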