diff --git a/noxfile.py b/noxfile.py index 875e6670d..8a0d56a2c 100644 --- a/noxfile.py +++ b/noxfile.py @@ -42,6 +42,7 @@ "coverage[toml]", "duckdb", "duckdb-engine", + "fastjsonschema", "pyarrow", "pytest", "pytest-benchmark", diff --git a/poetry.lock b/poetry.lock index 0cecf51f1..2b258b5fc 100644 --- a/poetry.lock +++ b/poetry.lock @@ -634,6 +634,20 @@ files = [ python-dateutil = ">=2.4" typing-extensions = {version = ">=3.10.0.1", markers = "python_version <= \"3.8\""} +[[package]] +name = "fastjsonschema" +version = "2.19.1" +description = "Fastest Python implementation of JSON schema" +optional = false +python-versions = "*" +files = [ + {file = "fastjsonschema-2.19.1-py3-none-any.whl", hash = "sha256:3672b47bc94178c9f23dbb654bf47440155d4db9df5f7bc47643315f9c405cd0"}, + {file = "fastjsonschema-2.19.1.tar.gz", hash = "sha256:e3126a94bdc4623d3de4485f8d468a12f02a67921315ddc87836d6e456dc789d"}, +] + +[package.extras] +devel = ["colorama", "json-spec", "jsonschema", "pylint", "pytest", "pytest-benchmark", "pytest-cache", "validictory"] + [[package]] name = "filelock" version = "3.12.4" @@ -2635,4 +2649,4 @@ testing = ["pytest", "pytest-durations"] [metadata] lock-version = "2.0" python-versions = ">=3.8" -content-hash = "e9747e01321a2fd07fb58447bced9656dca2b3602f9b124a9fbd68792c47c30b" +content-hash = "37a0fb5f11f53814739deaf7ba08ded83d345087110b1dae7c5b7956f27a3a51" diff --git a/pyproject.toml b/pyproject.toml index 51931a608..bd4c2ba17 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -120,6 +120,7 @@ coverage = {extras = ["toml"], version = ">=7.4"} duckdb = { version = ">=0.8.0", python = "<3.12" } duckdb-engine = { version = ">=0.9.4", python = "<3.12" } +fastjsonschema = ">=2.19.1" mypy = ">=1.0" pytest-benchmark = ">=4.0.0" pytest-snapshot = ">=0.9.0" diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 7cff95672..e35353789 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -201,6 +201,31 @@ def get_validator(self) -> BaseJSONSchemaValidator | None: Returns: An instance of a subclass of ``BaseJSONSchemaValidator``. + + Example implementation using the `fastjsonschema`_ library: + + .. code-block:: python + + import fastjsonschema + + + class FastJSONSchemaValidator(BaseJSONSchemaValidator): + def __init__(self, schema: dict[str, t.Any]) -> None: + super().__init__(schema) + try: + self.validator = fastjsonschema.compile(self.schema) + except fastjsonschema.JsonSchemaDefinitionException as e: + error_message = "Schema Validation Error" + raise InvalidJSONSchema(error_message) from e + + def validate(self, record: dict): + try: + self.validator(record) + except fastjsonschema.JsonSchemaValueException as e: + error_message = f"Record Message Validation Error: {e.message}" + raise InvalidRecord(error_message, record) from e + + .. _fastjsonschema: https://pypi.org/project/fastjsonschema/ """ if self.validate_schema: return JSONSchemaValidator( diff --git a/tests/core/sinks/test_validation.py b/tests/core/sinks/test_validation.py index e583fddad..c6a05ced1 100644 --- a/tests/core/sinks/test_validation.py +++ b/tests/core/sinks/test_validation.py @@ -2,13 +2,38 @@ import datetime import itertools +import typing as t +import fastjsonschema import pytest from singer_sdk.exceptions import InvalidRecord +from singer_sdk.sinks.core import BaseJSONSchemaValidator, InvalidJSONSchema from tests.conftest import BatchSinkMock, TargetMock +class FastJSONSchemaValidator(BaseJSONSchemaValidator): + def __init__(self, schema: dict[str, t.Any]) -> None: + super().__init__(schema) + try: + self.validator = fastjsonschema.compile(self.schema) + except fastjsonschema.JsonSchemaDefinitionException as e: + error_message = "Schema Validation Error" + raise InvalidJSONSchema(error_message) from e + + def validate(self, record: dict): + try: + self.validator(record) + except fastjsonschema.JsonSchemaValueException as e: + error_message = f"Record Message Validation Error: {e.message}" + raise InvalidRecord(error_message, record) from e + + +class FastJSONSchemaSink(BatchSinkMock): + def get_validator(self) -> BaseJSONSchemaValidator | None: + return FastJSONSchemaValidator(self.schema) + + def test_validate_record(): target = TargetMock() sink = BatchSinkMock( @@ -59,6 +84,42 @@ def test_validate_record(): assert updated_record["invalid_datetime"] == "9999-12-31 23:59:59.999999" +def test_validate_fastjsonschema(): + target = TargetMock() + sink = FastJSONSchemaSink( + target, + "users", + { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + "created_at_date": {"type": "string", "format": "date"}, + "created_at_time": {"type": "string", "format": "time"}, + "invalid_datetime": {"type": "string", "format": "date-time"}, + }, + }, + ["id"], + ) + + record = { + "id": 1, + "created_at": "2021-01-01T00:00:00+00:00", + "created_at_date": "2021-01-01", + "created_at_time": "00:01:00+00:00", + "missing_datetime": "2021-01-01T00:00:00+00:00", + "invalid_datetime": "not a datetime", + } + + with pytest.raises( + InvalidRecord, + match=r"Record Message Validation Error", + ) as exc_info: + sink._validator.validate(record) + + assert isinstance(exc_info.value.__cause__, fastjsonschema.JsonSchemaValueException) + + @pytest.fixture def draft7_sink_stop(): """Return a sink object with Draft7 checks enabled."""