Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Centralize JSON SerDe into helper functions #2259

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7b50f09
moved deserialize_json and added serialize_json to _util
BuzzCutNorman Feb 20, 2024
e68e994
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Feb 20, 2024
e739083
add serialize_json to _flatten_record
BuzzCutNorman Feb 20, 2024
52f8f05
updated test_flattening_record
BuzzCutNorman Feb 20, 2024
209fc7d
update flattening related snapshots
BuzzCutNorman Feb 20, 2024
c8d594b
updated Tap.catalog_json_text to use serialize_json
BuzzCutNorman Feb 20, 2024
417000c
updated SQLConnector serialize_json and desrialize_json to use _util …
BuzzCutNorman Feb 20, 2024
20260c6
updated JSONLinesBatcher.get_batches to use serialize_json
BuzzCutNorman Feb 20, 2024
29b9903
added kwargs to serde functions, updated read_json_file to use deseri…
BuzzCutNorman Feb 20, 2024
9e4b698
updated Sink.process_batch_files to use deserialize_json
BuzzCutNorman Feb 20, 2024
8dc53ea
changed attributes to match SQLConnector versions.
BuzzCutNorman Feb 21, 2024
6ece47d
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Feb 21, 2024
efcc71e
moved type: ignore to correct line
BuzzCutNorman Feb 21, 2024
f11de62
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Feb 23, 2024
b04b362
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Feb 26, 2024
606387b
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Apr 5, 2024
a1f5b94
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into refa…
BuzzCutNorman Jun 24, 2024
04d9370
applied a mypy suggestion by removing redundant dictionary cast
BuzzCutNorman Jun 24, 2024
1182f38
Merge branch 'main' into refactor-move-json-serde-to-helpers
edgarrmondragon Jul 12, 2024
5cfa21e
Move base SerDe to _singerlib
edgarrmondragon Jul 12, 2024
b9faa7f
Re-add `SingerReader.deserialize_json`
edgarrmondragon Jul 12, 2024
dc5c7a1
Implement naive central JSON loading and dumping functions
edgarrmondragon Jul 12, 2024
3f29a1f
Use util methods to SerDe to and from SQL
edgarrmondragon Jul 12, 2024
96525f1
Make the Singer writer and reader classes generic
edgarrmondragon Jul 13, 2024
af9b5d5
Move Singer IO to _singerlib
edgarrmondragon Jul 13, 2024
f65e2e0
Handle uncovered code
edgarrmondragon Jul 13, 2024
05d1ac1
Update docstrings
edgarrmondragon Jul 13, 2024
ac7be38
Move Singer exception catching to reader implementation
edgarrmondragon Jul 13, 2024
168485c
Increase encoder test coverage
edgarrmondragon Jul 13, 2024
8f390f6
Re-use message writing and formatting logic
edgarrmondragon Jul 13, 2024
a5f67a5
Test records with version
edgarrmondragon Jul 13, 2024
362ad22
Merge branch 'main' into refactor-move-json-serde-to-helpers
edgarrmondragon Jul 16, 2024
f848370
Move encodings to a private submodule
edgarrmondragon Jul 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 2 additions & 19 deletions singer_sdk/_singerlib/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone

import simplejson as json
from singer_sdk.helpers._util import serialize_json
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

if sys.version_info < (3, 11):
from backports.datetime_fromisoformat import MonkeyPatch
Expand All @@ -26,18 +26,6 @@ class SingerMessageType(str, enum.Enum):
BATCH = "BATCH"


def _default_encoding(obj: t.Any) -> str: # noqa: ANN401
"""Default JSON encoder.

Args:
obj: The object to encode.

Returns:
The encoded object.
"""
return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj)


def exclude_null_dict(pairs: list[tuple[str, t.Any]]) -> dict[str, t.Any]:
"""Exclude null values from a dictionary.

Expand Down Expand Up @@ -226,12 +214,7 @@ def format_message(message: Message) -> str:
Returns:
The formatted message.
"""
return json.dumps(
message.to_dict(),
use_decimal=True,
default=_default_encoding,
separators=(",", ":"),
)
return serialize_json(message.to_dict())


def write_message(message: Message) -> None:
Expand Down
13 changes: 8 additions & 5 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@

from __future__ import annotations

import decimal
import json
import logging
import typing as t
import warnings
from contextlib import contextmanager
from datetime import datetime
from functools import lru_cache

import simplejson
import sqlalchemy as sa

from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers._util import (
deserialize_json as util_deserialize_json,
)
from singer_sdk.helpers._util import (
serialize_json as util_serialize_json,
)
from singer_sdk.helpers.capabilities import TargetLoadMethods

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -1175,7 +1178,7 @@ def serialize_json(self, obj: object) -> str:

.. versionadded:: 0.31.0
"""
return simplejson.dumps(obj, use_decimal=True)
return util_serialize_json(obj)

def deserialize_json(self, json_str: str) -> object:
"""Deserialize a JSON string to an object.
Expand All @@ -1191,7 +1194,7 @@ def deserialize_json(self, json_str: str) -> object:

.. versionadded:: 0.31.0
"""
return json.loads(json_str, parse_float=decimal.Decimal)
return util_deserialize_json(json_str)

def delete_old_versions(
self,
Expand Down
5 changes: 2 additions & 3 deletions singer_sdk/contrib/batch_encoder_jsonl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
from __future__ import annotations

import gzip
import json
import typing as t
from uuid import uuid4

from singer_sdk.batch import BaseBatcher, lazy_chunked_generator
from singer_sdk.helpers._util import serialize_json

__all__ = ["JSONLinesBatcher"]

Expand Down Expand Up @@ -45,8 +45,7 @@ def get_batches(
mode="wb",
) as gz:
gz.writelines(
(json.dumps(record, default=str) + "\n").encode()
for record in chunk
(serialize_json(record) + "\n").encode() for record in chunk
)
file_url = fs.geturl(filename)
yield [file_url]
5 changes: 3 additions & 2 deletions singer_sdk/helpers/_flattening.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from copy import deepcopy

import inflection
import simplejson as json

from singer_sdk.helpers._util import serialize_json

DEFAULT_FLATTENING_SEPARATOR = "__"

Expand Down Expand Up @@ -437,7 +438,7 @@ def _flatten_record(
items.append(
(
new_key,
json.dumps(v, use_decimal=True, default=str)
serialize_json(v)
if _should_jsondump_value(k, v, flattened_schema)
else v,
),
Expand Down
70 changes: 69 additions & 1 deletion singer_sdk/helpers/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,79 @@

from __future__ import annotations

import decimal
import json
import logging
import sys
import typing as t
from datetime import datetime
from pathlib import Path, PurePath

import pendulum
import simplejson

if sys.version_info < (3, 11):
from backports.datetime_fromisoformat import MonkeyPatch

MonkeyPatch.patch_fromisoformat()


logger = logging.getLogger(__name__)


def _default_encoding(obj: t.Any) -> str: # noqa: ANN401
"""Default JSON encoder.

Args:
obj: The object to encode.

Returns:
The encoded object.
"""
return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj)


def deserialize_json(json_str: str, **kwargs: t.Any) -> dict:
"""Deserialize a line of json.

Args:
json_str: A single line of json.
**kwargs: Optional key word arguments.

Returns:
A dictionary of the deserialized json.

Raises:
json.decoder.JSONDecodeError: raised if any lines are not valid json
"""
try:
return json.loads( # type: ignore[no-any-return]
json_str,
parse_float=decimal.Decimal,
**kwargs,
)
except json.decoder.JSONDecodeError as exc:
logger.error("Unable to parse:\n%s", json_str, exc_info=exc)
raise


def serialize_json(obj: object, **kwargs: t.Any) -> str:
"""Serialize a dictionary into a line of json.

Args:
obj: A Python object usually a dict.
**kwargs: Optional key word arguments.

Returns:
A string of serialized json.
"""
return simplejson.dumps(
obj,
use_decimal=True,
default=_default_encoding,
separators=(",", ":"),
**kwargs,
)


def read_json_file(path: PurePath | str) -> dict[str, t.Any]:
Expand All @@ -22,7 +90,7 @@ def read_json_file(path: PurePath | str) -> dict[str, t.Any]:
msg += f"\nFor more info, please see the sample template at: {template}"
raise FileExistsError(msg)

return t.cast(dict, json.loads(Path(path).read_text()))
return deserialize_json(Path(path).read_text())


def utc_now() -> pendulum.DateTime:
Expand Down
26 changes: 2 additions & 24 deletions singer_sdk/io_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from __future__ import annotations

import abc
import decimal
import json
import logging
import sys
import typing as t
Expand All @@ -13,6 +11,7 @@
from singer_sdk._singerlib.messages import Message, SingerMessageType
from singer_sdk._singerlib.messages import format_message as singer_format_message
from singer_sdk._singerlib.messages import write_message as singer_write_message
from singer_sdk.helpers._util import deserialize_json

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -51,27 +50,6 @@ def _assert_line_requires(line_dict: dict, requires: set[str]) -> None:
msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}"
raise Exception(msg) # TODO: Raise a more specific exception

def deserialize_json(self, line: str) -> dict:
"""Deserialize a line of json.

Args:
line: A single line of json.

Returns:
A dictionary of the deserialized json.

Raises:
json.decoder.JSONDecodeError: raised if any lines are not valid json
"""
try:
return json.loads( # type: ignore[no-any-return]
line,
parse_float=decimal.Decimal,
)
except json.decoder.JSONDecodeError as exc:
logger.error("Unable to parse:\n%s", line, exc_info=exc)
raise

def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]:
"""Internal method to process jsonl lines from a Singer tap.

Expand All @@ -83,7 +61,7 @@ def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]:
"""
stats: dict[str, int] = defaultdict(int)
for line in file_input:
line_dict = self.deserialize_json(line)
line_dict = deserialize_json(line)
self._assert_line_requires(line_dict, requires={"type"})

record_type: SingerMessageType = line_dict["type"]
Expand Down
6 changes: 4 additions & 2 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import copy
import datetime
import importlib.util
import json
import time
import typing as t
from functools import cached_property
Expand Down Expand Up @@ -38,6 +37,7 @@
get_datelike_property_type,
handle_invalid_timestamp_in_record,
)
from singer_sdk.helpers._util import deserialize_json

if t.TYPE_CHECKING:
from logging import Logger
Expand Down Expand Up @@ -713,7 +713,9 @@ def process_batch_files(
context_file = (
gzip_open(file) if encoding.compression == "gzip" else file
)
context = {"records": [json.loads(line) for line in context_file]} # type: ignore[attr-defined]
context = {
"records": [deserialize_json(line) for line in context_file] # type: ignore[attr-defined]
}
self.process_batch(context)
elif (
importlib.util.find_spec("pyarrow")
Expand Down
5 changes: 2 additions & 3 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import abc
import contextlib
import json
import typing as t
from enum import Enum

Expand All @@ -21,7 +20,7 @@
from singer_sdk.helpers import _state
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._state import write_stream_state
from singer_sdk.helpers._util import read_json_file
from singer_sdk.helpers._util import read_json_file, serialize_json
from singer_sdk.helpers.capabilities import (
BATCH_CONFIG,
CapabilitiesEnum,
Expand Down Expand Up @@ -313,7 +312,7 @@ def catalog_json_text(self) -> str:
Returns:
The tap's catalog as formatted JSON text.
"""
return json.dumps(self.catalog_dict, indent=2)
return serialize_json(self.catalog_dict, indent=2)

@property
def _singer_catalog(self) -> Catalog:
Expand Down
6 changes: 3 additions & 3 deletions tests/core/test_flattening.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
{
"key_1": 1,
"key_2__key_3": "value",
"key_2__key_4": '{"key_5": 1, "key_6": ["a", "b"]}',
"key_2__key_4": '{"key_5":1,"key_6":["a","b"]}',
},
id="flattened schema limiting the max level",
),
Expand All @@ -38,7 +38,7 @@
"key_1": 1,
"key_2__key_3": "value",
"key_2__key_4__key_5": 1,
"key_2__key_4__key_6": '["a", "b"]',
"key_2__key_4__key_6": '["a","b"]',
},
id="flattened schema not limiting the max level",
),
Expand All @@ -55,7 +55,7 @@
{
"key_1": 1,
"key_2__key_3": "value",
"key_2__key_4": '{"key_5": 1, "key_6": ["a", "b"]}',
"key_2__key_4": '{"key_5":1,"key_6":["a","b"]}',
},
id="max level limiting flattened schema",
),
Expand Down
8 changes: 3 additions & 5 deletions tests/core/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest

from singer_sdk._singerlib import RecordMessage
from singer_sdk.helpers._util import deserialize_json
from singer_sdk.io_base import SingerReader, SingerWriter


Expand Down Expand Up @@ -52,9 +53,8 @@ def _process_state_message(self, message_dict: dict) -> None:
],
)
def test_deserialize(line, expected, exception):
reader = DummyReader()
with exception:
assert reader.deserialize_json(line) == expected
assert deserialize_json(line) == expected


# Benchmark Tests
Expand Down Expand Up @@ -104,10 +104,8 @@ def test_bench_deserialize_json(benchmark, bench_encoded_record):
"""Run benchmark for Sink._validator method validate."""
number_of_runs = 1000

reader = DummyReader()

def run_deserialize_json():
for record in itertools.repeat(bench_encoded_record, number_of_runs):
reader.deserialize_json(record)
deserialize_json(record)

benchmark(run_deserialize_json)
6 changes: 3 additions & 3 deletions tests/snapshots/mapped_stream/flatten_all.jsonl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"type":"STATE","value":{}}
{"type":"SCHEMA","stream":"mystream","schema":{"properties":{"email":{"type":["string","null"]},"count":{"type":["integer","null"]},"user__id":{"type":["integer","null"]},"user__sub__num":{"type":["integer","null"]},"user__sub__custom_obj":{"type":["string","null"]},"user__some_numbers":{"type":["string","null"]}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"mystream","record":{"email":"[email protected]","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14, 2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"mystream","record":{"email":"[email protected]","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32, 1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"mystream","record":{"email":"[email protected]","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414, 1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"mystream","record":{"email":"[email protected]","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14,2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"mystream","record":{"email":"[email protected]","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32,1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"mystream","record":{"email":"[email protected]","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414,1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"mystream":{}}}}
Loading