Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(targets): Create interface for schema validation in sinks, and implement it for python-jsonschema #2136

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
9342f14
added invalid json schema and record exceptions
BuzzCutNorman Dec 13, 2023
97ebdc2
added jsonschema type tests for sinks
BuzzCutNorman Dec 13, 2023
ec61180
added jsonschema format tests for sinks
BuzzCutNorman Dec 13, 2023
b984573
add test of json schmea checking controls
BuzzCutNorman Dec 13, 2023
10fcb05
test_type_checker updated for jsonschema validation errors
BuzzCutNorman Dec 13, 2023
53c68a7
mostly updated format tests
BuzzCutNorman Jan 3, 2024
a1c24f2
first draft - abstraction of the JSON Schema library
BuzzCutNorman Jan 3, 2024
b7664d6
Merge branch 'main' into 1457-abstract-jsonschmea-library-allowing-cu…
BuzzCutNorman Jan 3, 2024
3c25550
chore: Feedback for PR 2136
edgarrmondragon Jan 4, 2024
c59823d
Apply suggestions from code review on 1/3/2023
BuzzCutNorman Jan 5, 2024
12049ca
Merge branch 'main' into 1457-abstract-jsonschmea-library-allowing-cu…
BuzzCutNorman Jan 9, 2024
91c2aaf
Merge branch 'main' into 1457-abstract-jsonschmea-library-allowing-cu…
BuzzCutNorman Jan 10, 2024
ddd0085
removed the file test_format_checker.py
BuzzCutNorman Jan 10, 2024
4b53078
wokring test_validate_record_jsonschema_format_checking_enabled_conti…
BuzzCutNorman Jan 10, 2024
213fd2b
Merge branch 'meltano:main' into 1457-abstract-jsonschmea-library-all…
BuzzCutNorman Jan 11, 2024
c91ab6e
Merge branch '2136-pr-feedback' of https://github.com/meltano/sdk int…
BuzzCutNorman Jan 11, 2024
d281407
Merge branch 'meltano-2136-pr-feedback' into 1457-abstract-jsonschmea…
BuzzCutNorman Jan 11, 2024
7a158b5
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1457…
BuzzCutNorman Jan 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"pytest-snapshot",
"pyarrow",
"requests-mock",
"rfc3339-validator",
"time-machine",
]

Expand Down
36 changes: 15 additions & 21 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ mypy = [
pytest-benchmark = ">=4.0.0"
pytest-snapshot = ">=0.9.0"
requests-mock = ">=1.10.0"
rfc3339-validator = ">=0.1.4"
time-machine = [
{ version = ">=2.10.0,<2.11", python = "<3.8" },
{ version = ">=2.10.0", python = ">=3.8" },
Expand Down
19 changes: 19 additions & 0 deletions singer_sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,22 @@ class ConformedNameClashException(Exception):

class MissingKeyPropertiesError(Exception):
"""Raised when a recieved (and/or transformed) record is missing key properties."""


class InvalidJSONSchema(Exception):
"""Raised when a JSON schema is invalid."""


class InvalidRecord(Exception):
"""Raised when a stream record is invalid according to its declared schema."""

def __init__(self, error_message: str, record: dict) -> None:
"""Initialize an InvalidRecord exception.

Args:
error_message: A message describing the error.
record: The invalid record.
"""
super().__init__(f"Record Message Validation Error: {error_message}")
self.error_message = error_message
self.record = record
131 changes: 123 additions & 8 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@
from gzip import open as gzip_open
from types import MappingProxyType

from jsonschema import Draft7Validator
import jsonschema
from typing_extensions import override

from singer_sdk.exceptions import MissingKeyPropertiesError
from singer_sdk.exceptions import (
InvalidJSONSchema,
InvalidRecord,
MissingKeyPropertiesError,
)
from singer_sdk.helpers._batch import (
BaseBatchFileEncoding,
BatchConfig,
Expand All @@ -39,7 +44,83 @@

from singer_sdk.target_base import Target

JSONSchemaValidator = Draft7Validator

class BaseJSONSchemaValidator(abc.ABC):
"""Abstract base class for JSONSchema validator."""

def __init__(self, schema: dict[str, t.Any]) -> None:
"""Initialize the record validator.

Args:
schema: Schema of the stream to sink.
"""
self.schema = schema

@abc.abstractmethod
def validate(self, record: dict[str, t.Any]) -> None:
"""Validate a record message.

This method MUST raise an ``InvalidRecord`` exception if the record is invalid.

Args:
record: Record message to validate.
"""


class JSONSchemaValidator(BaseJSONSchemaValidator):
"""Validate records using the ``fastjsonschema`` library."""

def __init__(
self,
schema: dict,
*,
validate_formats: bool = False,
format_checker: jsonschema.FormatChecker | None = None,
):
"""Initialize the validator.

Args:
schema: Schema of the stream to sink.
validate_formats: Whether JSON string formats (e.g. ``date-time``) should
be validated.
format_checker: User-defined format checker.

Raises:
InvalidJSONSchema: If the schema provided from tap or mapper is invalid.
"""
jsonschema_validator = jsonschema.Draft7Validator

super().__init__(schema)
if validate_formats:
format_checker = format_checker or jsonschema_validator.FORMAT_CHECKER
else:
format_checker = jsonschema.FormatChecker(formats=())

try:
jsonschema_validator.check_schema(schema)
except jsonschema.SchemaError as e:
error_message = f"Schema Validation Error: {e}"
raise InvalidJSONSchema(error_message) from e

self.validator = jsonschema_validator(
schema=schema,
format_checker=format_checker,
)

@override
def validate(self, record: dict): # noqa: ANN201
"""Validate a record message.

Args:
record: Record message to validate.

Raises:
InvalidRecord: If the record is invalid.
"""
try:
self.validator.validate(record)
except jsonschema.ValidationError as e:
raise InvalidRecord(e.message, record) from e


class Sink(metaclass=abc.ABCMeta):
Expand All @@ -51,6 +132,15 @@ class Sink(metaclass=abc.ABCMeta):

MAX_SIZE_DEFAULT = 10000

validate_schema = True
"""Enable JSON schema record validation."""

validate_field_string_format = False
"""Enable JSON schema format validation, for example `date-time` string fields."""

fail_on_record_validation_exception: bool = True
"""Interrupt the target execution when a record fails schema validation."""

def __init__(
self,
target: Target,
Expand Down Expand Up @@ -95,10 +185,23 @@ def __init__(
self._batch_records_read: int = 0
self._batch_dupe_records_merged: int = 0

self._validator = Draft7Validator(
schema,
format_checker=Draft7Validator.FORMAT_CHECKER,
)
self._validator: BaseJSONSchemaValidator | None = self.get_validator()

def get_validator(self) -> BaseJSONSchemaValidator | None:
"""Get a record validator for this sink.

Override this method to use a custom format validator, or disable record
validation by returning `None`.

Returns:
An instance of a subclass of ``BaseJSONSchemaValidator``.
"""
if self.validate_schema:
return JSONSchemaValidator(
self.schema,
validate_formats=self.validate_field_string_format,
)
return None

def _get_context(self, record: dict) -> dict: # noqa: ARG002
"""Return an empty dictionary by default.
Expand Down Expand Up @@ -328,8 +431,20 @@ def _validate_and_parse(self, record: dict) -> dict:

Returns:
TODO

Raises:
InvalidRecord: If the record is invalid.
"""
self._validator.validate(record)
if self._validator is not None:
# TODO: Check the performance impact of this try/except block. It runs
# on every record, so it's probably bad and should be moved up the stack.
try:
self._validator.validate(record)
except InvalidRecord as e:
if self.fail_on_record_validation_exception:
raise
self.logger.exception("Record validation failed %s", e)

self._parse_timestamps_in_record(
record=record,
schema=self.schema,
Expand Down
100 changes: 100 additions & 0 deletions tests/core/sinks/test_type_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Test the custom type validator."""

from __future__ import annotations

import pytest
from typing_extensions import override

from singer_sdk.sinks.core import BaseJSONSchemaValidator, InvalidJSONSchema, Sink
from singer_sdk.target_base import Target


@pytest.fixture
def test_schema_invalid():
"""Return a test schema with an invalid type."""

return {
"type": "object",
"properties": {
"datetime_col": {"type": "ssttrriinngg", "format": "date-time"},
},
}


@pytest.fixture
def target():
"""Return a target object."""

class CustomTarget(Target):
name = "test_target"

return CustomTarget()


def test_default_schema_type_checks(target, test_schema_invalid):
"""Test type checks on _validator initialization."""

class CustomSink(Sink):
"""Custom sink class."""

@override
def process_batch(self, context: dict) -> None:
pass

@override
def process_record(self, record: dict, context: dict) -> None:
pass

with pytest.raises(
InvalidJSONSchema,
match=r"Schema Validation Error: 'ssttrriinngg' is not valid under any",
):
CustomSink(target, "test_stream", test_schema_invalid, None)


def test_disable_schema_type_checks_returning_none(target, test_schema_invalid):
"""Test type checks on _validator initialization."""

class CustomSink(Sink):
"""Custom sink class."""

@override
def get_validator(self) -> BaseJSONSchemaValidator | None:
"""Get a record validator for this sink.

Override this method to use a custom format validator
or disable jsonschema validator, by returning `None`.

Returns:
An instance of a subclass of ``BaseJSONSchemaValidator``.
"""
return None

@override
def process_batch(self, context: dict) -> None:
pass

@override
def process_record(self, record: dict, context: dict) -> None:
pass

CustomSink(target, "test_stream", test_schema_invalid, None)


def test_disable_schema_type_checks_setting_false(target, test_schema_invalid):
"""Test type checks on _validator initialization."""

class CustomSink(Sink):
"""Custom sink class."""

validate_schema = False

@override
def process_batch(self, context: dict) -> None:
pass

@override
def process_record(self, record: dict, context: dict) -> None:
pass

CustomSink(target, "test_stream", test_schema_invalid, None)
Loading