diff --git a/.flake8 b/.flake8 index 9311ae8d0..07b7824bd 100644 --- a/.flake8 +++ b/.flake8 @@ -1,7 +1,7 @@ [flake8] max-line-length = 88 exclude = cookiecutter -ignore = E, W +ignore = E, F, W per-file-ignores = # Don't require docstrings conventions in private modules singer_sdk/helpers/_*.py:DAR diff --git a/singer_sdk/_singerlib/__init__.py b/singer_sdk/_singerlib/__init__.py index bc0b4523b..8386399d6 100644 --- a/singer_sdk/_singerlib/__init__.py +++ b/singer_sdk/_singerlib/__init__.py @@ -1,5 +1,6 @@ from __future__ import annotations +from singer_sdk._singerlib import exceptions from singer_sdk._singerlib.catalog import ( Catalog, CatalogEntry, @@ -16,6 +17,7 @@ SingerMessageType, StateMessage, exclude_null_dict, + format_message, write_message, ) from singer_sdk._singerlib.schema import Schema, resolve_schema_references @@ -35,7 +37,9 @@ "SingerMessageType", "StateMessage", "StreamMetadata", + "exceptions", "exclude_null_dict", + "format_message", "resolve_schema_references", "strftime", "strptime_to_utc", diff --git a/singer_sdk/_singerlib/_encoding/__init__.py b/singer_sdk/_singerlib/_encoding/__init__.py new file mode 100644 index 000000000..7819a1452 --- /dev/null +++ b/singer_sdk/_singerlib/_encoding/__init__.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from .base import GenericSingerReader, GenericSingerWriter, SingerMessageType +from .simple import SingerReader, SingerWriter + +__all__ = [ + "GenericSingerReader", + "GenericSingerWriter", + "SingerMessageType", + "SingerReader", + "SingerWriter", +] diff --git a/singer_sdk/_singerlib/_encoding/base.py b/singer_sdk/_singerlib/_encoding/base.py new file mode 100644 index 000000000..798a4f6dd --- /dev/null +++ b/singer_sdk/_singerlib/_encoding/base.py @@ -0,0 +1,341 @@ +"""Abstract base classes for all Singer messages IO operations.""" + +from __future__ import annotations + +import abc +import enum +import logging +import sys +import typing as t +from collections import Counter, 
defaultdict +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone + +from singer_sdk._singerlib import exceptions + +if sys.version_info < (3, 11): + from backports.datetime_fromisoformat import MonkeyPatch + + MonkeyPatch.patch_fromisoformat() + +logger = logging.getLogger(__name__) + + +# TODO: Default this TypeVar to 'str' once type parameter defaults are available +# https://peps.python.org/pep-0696/ +T = t.TypeVar("T", str, bytes) + + +class SingerMessageType(str, enum.Enum): + """Singer specification message types.""" + + RECORD = "RECORD" + SCHEMA = "SCHEMA" + STATE = "STATE" + ACTIVATE_VERSION = "ACTIVATE_VERSION" + BATCH = "BATCH" + + +def exclude_null_dict(pairs: list[tuple[str, t.Any]]) -> dict[str, t.Any]: + """Exclude null values from a dictionary. + + Args: + pairs: The dictionary key-value pairs. + + Returns: + The filtered key-value pairs. + """ + return {key: value for key, value in pairs if value is not None} + + +@dataclass +class Message: + """Singer base message.""" + + type: SingerMessageType = field(init=False) + """The message type.""" + + def to_dict(self) -> dict[str, t.Any]: + """Return a dictionary representation of the message. + + Returns: + A dictionary with the defined message fields. + """ + return asdict(self, dict_factory=exclude_null_dict) + + @classmethod + def from_dict( + cls: t.Type[Message], # noqa: UP006 + data: dict[str, t.Any], + ) -> Message: + """Create a message from a dictionary. + + Args: + data: The dictionary to create the message from. + + Returns: + The created message. 
+ """ + data.pop("type") + return cls(**data) + + +@dataclass +class RecordMessage(Message): + """Singer record message.""" + + stream: str + """The stream name.""" + + record: dict[str, t.Any] + """The record data.""" + + version: int | None = None + """The record version.""" + + time_extracted: datetime | None = None + """The time the record was extracted.""" + + @classmethod + def from_dict(cls: type[RecordMessage], data: dict[str, t.Any]) -> RecordMessage: + """Create a record message from a dictionary. + + This overrides the default conversion logic, since it uses unnecessary + deep copying and is very slow. + + Args: + data: The dictionary to create the message from. + + Returns: + The created message. + """ + time_extracted = data.get("time_extracted") + return cls( + stream=data["stream"], + record=data["record"], + version=data.get("version"), + time_extracted=datetime.fromisoformat(time_extracted) + if time_extracted + else None, + ) + + def to_dict(self) -> dict[str, t.Any]: + """Return a dictionary representation of the message. + + This overrides the default conversion logic, since it uses unnecessary + deep copying and is very slow. + + Returns: + A dictionary with the defined message fields. + """ + result: dict[str, t.Any] = { + "type": "RECORD", + "stream": self.stream, + "record": self.record, + } + if self.version is not None: + result["version"] = self.version + if self.time_extracted is not None: + result["time_extracted"] = self.time_extracted + return result + + def __post_init__(self) -> None: + """Post-init processing. + + Raises: + ValueError: If the time_extracted is not timezone-aware. 
+ """ + self.type = SingerMessageType.RECORD + if self.time_extracted and not self.time_extracted.tzinfo: + msg = ( + "'time_extracted' must be either None or an aware datetime (with a " + "time zone)" + ) + raise ValueError(msg) + + if self.time_extracted: + self.time_extracted = self.time_extracted.astimezone(timezone.utc) + + +@dataclass +class SchemaMessage(Message): + """Singer schema message.""" + + stream: str + """The stream name.""" + + schema: dict[str, t.Any] + """The schema definition.""" + + key_properties: t.Sequence[str] | None = None + """The key properties.""" + + bookmark_properties: list[str] | None = None + """The bookmark properties.""" + + def __post_init__(self) -> None: + """Post-init processing. + + Raises: + ValueError: If bookmark_properties is not a string or list of strings. + """ + self.type = SingerMessageType.SCHEMA + + if isinstance(self.bookmark_properties, (str, bytes)): + self.bookmark_properties = [self.bookmark_properties] + if self.bookmark_properties and not isinstance(self.bookmark_properties, list): + msg = "bookmark_properties must be a string or list of strings" + raise ValueError(msg) + + +@dataclass +class StateMessage(Message): + """Singer state message.""" + + value: dict[str, t.Any] + """The state value.""" + + def __post_init__(self) -> None: + """Post-init processing.""" + self.type = SingerMessageType.STATE + + +@dataclass +class ActivateVersionMessage(Message): + """Singer activate version message.""" + + stream: str + """The stream name.""" + + version: int + """The version to activate.""" + + def __post_init__(self) -> None: + """Post-init processing.""" + self.type = SingerMessageType.ACTIVATE_VERSION + + +class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins reading Singer messages as strings or bytes.""" + + @t.final + def listen(self, file_input: t.IO[T] | None = None) -> None: + """Read from input until all messages are processed. 
+ + Args: + file_input: Readable stream of messages. Defaults to ``self.default_input``. + """ + self._process_lines(file_input or self.default_input) + self._process_endofpipe() + + def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: + """Internal method to process jsonl lines from a Singer tap. + + Args: + file_input: Readable stream of messages, each on a separate line. + + Returns: + A counter object for the processed lines. + """ + stats: dict[str, int] = defaultdict(int) + for line in file_input: + line_dict = self.deserialize_json(line) + self._assert_line_requires(line_dict, requires={"type"}) + + record_type: SingerMessageType = line_dict["type"] + if record_type == SingerMessageType.SCHEMA: + self._process_schema_message(line_dict) + + elif record_type == SingerMessageType.RECORD: + self._process_record_message(line_dict) + + elif record_type == SingerMessageType.ACTIVATE_VERSION: + self._process_activate_version_message(line_dict) + + elif record_type == SingerMessageType.STATE: + self._process_state_message(line_dict) + + elif record_type == SingerMessageType.BATCH: + self._process_batch_message(line_dict) + + else: + self._process_unknown_message(line_dict) + + stats[record_type] += 1 + + return Counter(**stats) + + @property + @abc.abstractmethod + def default_input(self) -> t.IO[T]: ... + + @staticmethod + def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: + """Check that a dictionary contains all required keys. + + Args: + line_dict: The deserialized message dictionary to check. + requires: The set of keys that must be present in ``line_dict``. + + Raises: + InvalidInputLine: raised if any required keys are missing + """ + if not requires.issubset(line_dict): + missing = requires - set(line_dict) + msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" + raise exceptions.InvalidInputLine(msg) + + @abc.abstractmethod + def deserialize_json(self, line: T) -> dict: ... + + @abc.abstractmethod + def _process_schema_message(self, message_dict: dict) -> None: ... 
+ + @abc.abstractmethod + def _process_record_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_state_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_activate_version_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_batch_message(self, message_dict: dict) -> None: ... + + def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 + """Internal method to process unknown message types from a Singer tap. + + Args: + message_dict: Dictionary representation of the Singer message. + + Raises: + ValueError: raised if a message type is not recognized + """ + record_type = message_dict["type"] + msg = f"Unknown message type '{record_type}' in message." + raise ValueError(msg) + + def _process_endofpipe(self) -> None: # noqa: PLR6301 + logger.debug("End of pipe reached") + + +class GenericSingerWriter(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins writing Singer messages as strings or bytes.""" + + def format_message(self, message: Message) -> T: + """Format a message as a JSON string. + + Args: + message: The message to format. + + Returns: + The formatted message. + """ + return self.serialize_json(message.to_dict()) + + @abc.abstractmethod + def serialize_json(self, obj: object) -> T: ... + + @abc.abstractmethod + def write_message(self, message: Message) -> None: ... 
diff --git a/singer_sdk/_singerlib/_encoding/simple.py b/singer_sdk/_singerlib/_encoding/simple.py new file mode 100644 index 000000000..636a8f981 --- /dev/null +++ b/singer_sdk/_singerlib/_encoding/simple.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import json +import logging +import sys +import typing as t + +from singer_sdk._singerlib.exceptions import InvalidInputLine +from singer_sdk._singerlib.json import deserialize_json, serialize_json + +from .base import GenericSingerReader, GenericSingerWriter + +if t.TYPE_CHECKING: + from singer_sdk._singerlib.messages import Message + +logger = logging.getLogger(__name__) + + +class SingerReader(GenericSingerReader[str]): + """Base class for all plugins reading Singer messages as strings from stdin.""" + + default_input = sys.stdin + + def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 + """Deserialize a line of json. + + Args: + line: A single line of json. + + Returns: + A dictionary of the deserialized json. + + Raises: + InvalidInputLine: If the line is not valid JSON. + """ + try: + return deserialize_json(line) + except json.decoder.JSONDecodeError as exc: + logger.exception("Unable to parse:\n%s", line) + msg = f"Unable to parse line as JSON: {line}" + raise InvalidInputLine(msg) from exc + + +class SingerWriter(GenericSingerWriter[str]): + """Interface for all plugins writing Singer messages to stdout.""" + + def serialize_json(self, obj: object) -> str: # noqa: PLR6301 + """Serialize a dictionary into a line of json. + + Args: + obj: A Python object usually a dict. + + Returns: + A string of serialized json. + """ + return serialize_json(obj) + + def write_message(self, message: Message) -> None: + """Write a message to stdout. + + Args: + message: The message to write. 
+ """ + sys.stdout.write(self.format_message(message) + "\n") + sys.stdout.flush() diff --git a/singer_sdk/_singerlib/exceptions.py b/singer_sdk/_singerlib/exceptions.py new file mode 100644 index 000000000..e3726bb6a --- /dev/null +++ b/singer_sdk/_singerlib/exceptions.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +__all__ = [ + "InvalidInputLine", +] + + +class InvalidInputLine(Exception): + """Raised when an input line is not a valid Singer message.""" diff --git a/singer_sdk/_singerlib/json.py b/singer_sdk/_singerlib/json.py new file mode 100644 index 000000000..acb94ad9c --- /dev/null +++ b/singer_sdk/_singerlib/json.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +import datetime +import decimal +import json +import typing as t + +import simplejson + +__all__ = [ + "deserialize_json", + "serialize_json", +] + + +def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 + """Default JSON encoder. + + Args: + obj: The object to encode. + + Returns: + The encoded object. + """ + return obj.isoformat(sep="T") if isinstance(obj, datetime.datetime) else str(obj) + + +def deserialize_json(json_str: str | bytes, **kwargs: t.Any) -> dict: + """Deserialize a line of json. + + Args: + json_str: A single line of json. + **kwargs: Optional key word arguments. + + Returns: + A dictionary of the deserialized json. + """ + return json.loads( # type: ignore[no-any-return] + json_str, + parse_float=decimal.Decimal, + **kwargs, + ) + + +def serialize_json(obj: object, **kwargs: t.Any) -> str: + """Serialize a dictionary into a line of json. + + Args: + obj: A Python object usually a dict. + **kwargs: Optional key word arguments. + + Returns: + A string of serialized json. 
+ """ + return simplejson.dumps( + obj, + use_decimal=True, + default=_default_encoding, + separators=(",", ":"), + **kwargs, + ) diff --git a/singer_sdk/_singerlib/messages.py b/singer_sdk/_singerlib/messages.py index f9a93b76b..b08739126 100644 --- a/singer_sdk/_singerlib/messages.py +++ b/singer_sdk/_singerlib/messages.py @@ -2,243 +2,29 @@ from __future__ import annotations -import enum -import sys -import typing as t -from dataclasses import asdict, dataclass, field -from datetime import datetime, timezone - -import simplejson as json - -if sys.version_info < (3, 11): - from backports.datetime_fromisoformat import MonkeyPatch - - MonkeyPatch.patch_fromisoformat() - - -class SingerMessageType(str, enum.Enum): - """Singer specification message types.""" - - RECORD = "RECORD" - SCHEMA = "SCHEMA" - STATE = "STATE" - ACTIVATE_VERSION = "ACTIVATE_VERSION" - BATCH = "BATCH" - - -def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 - """Default JSON encoder. - - Args: - obj: The object to encode. - - Returns: - The encoded object. - """ - return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj) - - -def exclude_null_dict(pairs: list[tuple[str, t.Any]]) -> dict[str, t.Any]: - """Exclude null values from a dictionary. - - Args: - pairs: The dictionary key-value pairs. - - Returns: - The filtered key-value pairs. - """ - return {key: value for key, value in pairs if value is not None} - - -@dataclass -class Message: - """Singer base message.""" - - type: SingerMessageType = field(init=False) - """The message type.""" - - def to_dict(self) -> dict[str, t.Any]: - """Return a dictionary representation of the message. - - Returns: - A dictionary with the defined message fields. - """ - return asdict(self, dict_factory=exclude_null_dict) - - @classmethod - def from_dict( - cls: t.Type[Message], # noqa: UP006 - data: dict[str, t.Any], - ) -> Message: - """Create an encoding from a dictionary. - - Args: - data: The dictionary to create the message from. 
- - Returns: - The created message. - """ - data.pop("type") - return cls(**data) - - -@dataclass -class RecordMessage(Message): - """Singer record message.""" - - stream: str - """The stream name.""" - - record: dict[str, t.Any] - """The record data.""" - - version: int | None = None - """The record version.""" - - time_extracted: datetime | None = None - """The time the record was extracted.""" - - @classmethod - def from_dict(cls: type[RecordMessage], data: dict[str, t.Any]) -> RecordMessage: - """Create a record message from a dictionary. - - This overrides the default conversion logic, since it uses unnecessary - deep copying and is very slow. - - Args: - data: The dictionary to create the message from. - - Returns: - The created message. - """ - time_extracted = data.get("time_extracted") - return cls( - stream=data["stream"], - record=data["record"], - version=data.get("version"), - time_extracted=datetime.fromisoformat(time_extracted) - if time_extracted - else None, - ) - - def to_dict(self) -> dict[str, t.Any]: - """Return a dictionary representation of the message. - - This overrides the default conversion logic, since it uses unnecessary - deep copying and is very slow. - - Returns: - A dictionary with the defined message fields. - """ - result: dict[str, t.Any] = { - "type": "RECORD", - "stream": self.stream, - "record": self.record, - } - if self.version is not None: - result["version"] = self.version - if self.time_extracted is not None: - result["time_extracted"] = self.time_extracted - return result - - def __post_init__(self) -> None: - """Post-init processing. - - Raises: - ValueError: If the time_extracted is not timezone-aware. 
- """ - self.type = SingerMessageType.RECORD - if self.time_extracted and not self.time_extracted.tzinfo: - msg = ( - "'time_extracted' must be either None or an aware datetime (with a " - "time zone)" - ) - raise ValueError(msg) - - if self.time_extracted: - self.time_extracted = self.time_extracted.astimezone(timezone.utc) - - -@dataclass -class SchemaMessage(Message): - """Singer schema message.""" - - stream: str - """The stream name.""" - - schema: dict[str, t.Any] - """The schema definition.""" - - key_properties: t.Sequence[str] | None = None - """The key properties.""" - - bookmark_properties: list[str] | None = None - """The bookmark properties.""" - - def __post_init__(self) -> None: - """Post-init processing. - - Raises: - ValueError: If bookmark_properties is not a string or list of strings. - """ - self.type = SingerMessageType.SCHEMA - - if isinstance(self.bookmark_properties, (str, bytes)): - self.bookmark_properties = [self.bookmark_properties] - if self.bookmark_properties and not isinstance(self.bookmark_properties, list): - msg = "bookmark_properties must be a string or list of strings" - raise ValueError(msg) - - -@dataclass -class StateMessage(Message): - """Singer state message.""" - - value: dict[str, t.Any] - """The state value.""" - - def __post_init__(self) -> None: - """Post-init processing.""" - self.type = SingerMessageType.STATE - - -@dataclass -class ActivateVersionMessage(Message): - """Singer activate version message.""" - - stream: str - """The stream name.""" - - version: int - """The version to activate.""" - - def __post_init__(self) -> None: - """Post-init processing.""" - self.type = SingerMessageType.ACTIVATE_VERSION - - -def format_message(message: Message) -> str: - """Format a message as a JSON string. - - Args: - message: The message to format. - - Returns: - The formatted message. 
- """ - return json.dumps( - message.to_dict(), - use_decimal=True, - default=_default_encoding, - separators=(",", ":"), - ) - - -def write_message(message: Message) -> None: - """Write a message to stdout. - - Args: - message: The message to write. - """ - sys.stdout.write(format_message(message) + "\n") - sys.stdout.flush() +from singer_sdk._singerlib._encoding import SingerWriter +from singer_sdk._singerlib._encoding.base import ( + ActivateVersionMessage, + Message, + RecordMessage, + SchemaMessage, + SingerMessageType, + StateMessage, + exclude_null_dict, +) + +__all__ = [ + "ActivateVersionMessage", + "Message", + "RecordMessage", + "SchemaMessage", + "SingerMessageType", + "StateMessage", + "exclude_null_dict", + "format_message", + "write_message", +] + +WRITER = SingerWriter() +format_message = WRITER.format_message +write_message = WRITER.write_message diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 132c37518..4d2dd4842 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -2,8 +2,6 @@ from __future__ import annotations -import decimal -import json import logging import typing as t import warnings @@ -11,12 +9,12 @@ from datetime import datetime from functools import lru_cache -import simplejson import sqlalchemy as sa from singer_sdk import typing as th from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema from singer_sdk.exceptions import ConfigValidationError +from singer_sdk.helpers._util import dump_json, load_json from singer_sdk.helpers.capabilities import TargetLoadMethods if t.TYPE_CHECKING: @@ -1167,7 +1165,7 @@ def serialize_json(self, obj: object) -> str: # noqa: PLR6301 .. versionadded:: 0.31.0 """ - return simplejson.dumps(obj, use_decimal=True) + return dump_json(obj) def deserialize_json(self, json_str: str) -> object: # noqa: PLR6301 """Deserialize a JSON string to an object. 
@@ -1183,7 +1181,7 @@ def deserialize_json(self, json_str: str) -> object: # noqa: PLR6301 .. versionadded:: 0.31.0 """ - return json.loads(json_str, parse_float=decimal.Decimal) + return load_json(json_str) def delete_old_versions( self, diff --git a/singer_sdk/contrib/batch_encoder_jsonl.py b/singer_sdk/contrib/batch_encoder_jsonl.py index 6ce4c8793..6f121f8d4 100644 --- a/singer_sdk/contrib/batch_encoder_jsonl.py +++ b/singer_sdk/contrib/batch_encoder_jsonl.py @@ -3,10 +3,10 @@ from __future__ import annotations import gzip -import json import typing as t from uuid import uuid4 +from singer_sdk._singerlib.json import serialize_json from singer_sdk.batch import BaseBatcher, lazy_chunked_generator __all__ = ["JSONLinesBatcher"] @@ -45,8 +45,7 @@ def get_batches( mode="wb", ) as gz: gz.writelines( - (json.dumps(record, default=str) + "\n").encode() - for record in chunk + (serialize_json(record) + "\n").encode() for record in chunk ) file_url = fs.geturl(filename) yield [file_url] diff --git a/singer_sdk/exceptions.py b/singer_sdk/exceptions.py index 20ec7ae65..a766952f9 100644 --- a/singer_sdk/exceptions.py +++ b/singer_sdk/exceptions.py @@ -5,6 +5,8 @@ import abc import typing as t +from singer_sdk._singerlib.exceptions import InvalidInputLine # noqa: F401 + if t.TYPE_CHECKING: import requests @@ -137,11 +139,7 @@ class ConformedNameClashException(Exception): class MissingKeyPropertiesError(Exception): - """Raised when a recieved (and/or transformed) record is missing key properties.""" - - -class InvalidInputLine(Exception): - """Raised when an input line is not a valid Singer message.""" + """Raised when a received (and/or transformed) record is missing key properties.""" class InvalidJSONSchema(Exception): diff --git a/singer_sdk/helpers/_flattening.py b/singer_sdk/helpers/_flattening.py index 145b76bb2..79ca50fdc 100644 --- a/singer_sdk/helpers/_flattening.py +++ b/singer_sdk/helpers/_flattening.py @@ -9,7 +9,8 @@ from copy import deepcopy import inflection 
-import simplejson as json + +from singer_sdk._singerlib.json import serialize_json DEFAULT_FLATTENING_SEPARATOR = "__" @@ -435,7 +436,7 @@ def _flatten_record( items.append( ( new_key, - json.dumps(v, use_decimal=True, default=str) + serialize_json(v) if _should_jsondump_value(k, v, flattened_schema) else v, ), diff --git a/singer_sdk/helpers/_util.py b/singer_sdk/helpers/_util.py index 0e8250c2a..308fd7a30 100644 --- a/singer_sdk/helpers/_util.py +++ b/singer_sdk/helpers/_util.py @@ -3,10 +3,53 @@ from __future__ import annotations import datetime -import json +import decimal import typing as t from pathlib import Path, PurePath +import simplejson + + +def dump_json(obj: t.Any, **kwargs: t.Any) -> str: # noqa: ANN401 + """Serialize an object to a JSON string. + + Args: + obj: A Python object, usually a dict. + **kwargs: Optional keyword arguments. + + Returns: + A string of serialized json. + + .. warning:: Do not use this function to serialize Singer messages or bulk data. + Use the functions in ``singer_sdk._singerlib.json`` instead. + """ + return simplejson.dumps( + obj, + use_decimal=True, + separators=(",", ":"), + **kwargs, + ) + + +def load_json(json_str: str, **kwargs: t.Any) -> dict: + """Deserialize a JSON string into a Python object. + + Args: + json_str: A valid JSON string. + **kwargs: Optional keyword arguments. + + Returns: + A Python object, usually a dict. + + .. warning:: Do not use this function to parse Singer messages or bulk data. + Use the functions in ``singer_sdk._singerlib.json`` instead. 
+ """ + return simplejson.loads( # type: ignore[no-any-return] + json_str, + parse_float=decimal.Decimal, + **kwargs, + ) + def read_json_file(path: PurePath | str) -> dict[str, t.Any]: """Read json file, throwing an error if missing.""" @@ -21,7 +64,7 @@ def read_json_file(path: PurePath | str) -> dict[str, t.Any]: msg += f"\nFor more info, please see the sample template at: {template}" raise FileExistsError(msg) - return t.cast(dict, json.loads(Path(path).read_text(encoding="utf-8"))) + return load_json(Path(path).read_text(encoding="utf-8")) def utc_now() -> datetime.datetime: diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index 2c5698e29..ddc933eb8 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -2,164 +2,4 @@ from __future__ import annotations -import abc -import decimal -import json -import logging -import sys -import typing as t -from collections import Counter, defaultdict - -from singer_sdk._singerlib.messages import Message, SingerMessageType -from singer_sdk._singerlib.messages import format_message as singer_format_message -from singer_sdk._singerlib.messages import write_message as singer_write_message -from singer_sdk.exceptions import InvalidInputLine - -logger = logging.getLogger(__name__) - - -class SingerReader(metaclass=abc.ABCMeta): - """Interface for all plugins reading Singer messages from stdin.""" - - @t.final - def listen(self, file_input: t.IO[str] | None = None) -> None: - """Read from input until all messages are processed. - - Args: - file_input: Readable stream of messages. Defaults to standard in. - - This method is internal to the SDK and should not need to be overridden. - """ - if not file_input: - file_input = sys.stdin - - self._process_lines(file_input) - self._process_endofpipe() - - @staticmethod - def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: - """Check if dictionary . 
- - Args: - line_dict: TODO - requires: TODO - - Raises: - InvalidInputLine: raised if any required keys are missing - """ - if not requires.issubset(line_dict): - missing = requires - set(line_dict) - msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" - raise InvalidInputLine(msg) - - def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 - """Deserialize a line of json. - - Args: - line: A single line of json. - - Returns: - A dictionary of the deserialized json. - - Raises: - json.decoder.JSONDecodeError: raised if any lines are not valid json - """ - try: - return json.loads( # type: ignore[no-any-return] - line, - parse_float=decimal.Decimal, - ) - except json.decoder.JSONDecodeError as exc: - logger.exception("Unable to parse:\n%s", line, exc_info=exc) - raise - - def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: - """Internal method to process jsonl lines from a Singer tap. - - Args: - file_input: Readable stream of messages, each on a separate line. - - Returns: - A counter object for the processed lines. - """ - stats: dict[str, int] = defaultdict(int) - for line in file_input: - line_dict = self.deserialize_json(line) - self._assert_line_requires(line_dict, requires={"type"}) - - record_type: SingerMessageType = line_dict["type"] - if record_type == SingerMessageType.SCHEMA: - self._process_schema_message(line_dict) - - elif record_type == SingerMessageType.RECORD: - self._process_record_message(line_dict) - - elif record_type == SingerMessageType.ACTIVATE_VERSION: - self._process_activate_version_message(line_dict) - - elif record_type == SingerMessageType.STATE: - self._process_state_message(line_dict) - - elif record_type == SingerMessageType.BATCH: - self._process_batch_message(line_dict) - - else: - self._process_unknown_message(line_dict) - - stats[record_type] += 1 - - return Counter(**stats) - - @abc.abstractmethod - def _process_schema_message(self, message_dict: dict) -> None: ... 
- - @abc.abstractmethod - def _process_record_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_state_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_activate_version_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_batch_message(self, message_dict: dict) -> None: ... - - def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 - """Internal method to process unknown message types from a Singer tap. - - Args: - message_dict: Dictionary representation of the Singer message. - - Raises: - ValueError: raised if a message type is not recognized - """ - record_type = message_dict["type"] - msg = f"Unknown message type '{record_type}' in message." - raise ValueError(msg) - - def _process_endofpipe(self) -> None: # noqa: PLR6301 - logger.debug("End of pipe reached") - - -class SingerWriter: - """Interface for all plugins writting Singer messages to stdout.""" - - def format_message(self, message: Message) -> str: # noqa: PLR6301 - """Format a message as a JSON string. - - Args: - message: The message to format. - - Returns: - The formatted message. - """ - return singer_format_message(message) - - def write_message(self, message: Message) -> None: # noqa: PLR6301 - """Write a message to stdout. - - Args: - message: The message to write. 
- """ - singer_write_message(message) +from singer_sdk._singerlib._encoding import * # noqa: F403 diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index a2f54c8ca..53533d58b 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -6,7 +6,6 @@ import copy import datetime import importlib.util -import json import time import typing as t from functools import cached_property @@ -17,6 +16,7 @@ import jsonschema from typing_extensions import override +from singer_sdk._singerlib.json import deserialize_json from singer_sdk.exceptions import ( InvalidJSONSchema, InvalidRecord, @@ -714,7 +714,9 @@ def process_batch_files( context_file = ( gzip_open(file) if encoding.compression == "gzip" else file ) - context = {"records": [json.loads(line) for line in context_file]} # type: ignore[attr-defined] + context = { + "records": [deserialize_json(line) for line in context_file] # type: ignore[attr-defined] + } self.process_batch(context) elif ( importlib.util.find_spec("pyarrow") diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index d8fb75a8f..d69fa5f38 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -4,7 +4,6 @@ import abc import contextlib -import json import typing as t from enum import Enum @@ -20,7 +19,7 @@ from singer_sdk.helpers import _state from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._state import write_stream_state -from singer_sdk.helpers._util import read_json_file +from singer_sdk.helpers._util import dump_json, read_json_file from singer_sdk.helpers.capabilities import ( BATCH_CONFIG, CapabilitiesEnum, @@ -312,7 +311,7 @@ def catalog_json_text(self) -> str: Returns: The tap's catalog as formatted JSON text. 
""" - return json.dumps(self.catalog_dict, indent=2) + return dump_json(self.catalog_dict, indent=2) @property def _singer_catalog(self) -> Catalog: diff --git a/tests/_singerlib/test_messages.py b/tests/_singerlib/test_messages.py index 491573545..f9cd73bbe 100644 --- a/tests/_singerlib/test_messages.py +++ b/tests/_singerlib/test_messages.py @@ -8,7 +8,6 @@ from pytz import timezone import singer_sdk._singerlib as singer -from singer_sdk.io_base import SingerWriter UTC = datetime.timezone.utc @@ -19,24 +18,22 @@ def test_exclude_null_dict(): def test_format_message(): - singerwriter = SingerWriter() message = singer.RecordMessage( stream="test", record={"id": 1, "name": "test"}, ) - assert singerwriter.format_message(message) == ( + assert singer.format_message(message) == ( '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}' ) def test_write_message(): - singerwriter = SingerWriter() message = singer.RecordMessage( stream="test", record={"id": 1, "name": "test"}, ) with redirect_stdout(io.StringIO()) as out: - singerwriter.write_message(message) + singer.write_message(message) assert out.getvalue() == ( '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' @@ -96,6 +93,21 @@ def test_record_message_time_extracted_to_utc(): assert record.time_extracted == datetime.datetime(2021, 1, 1, 9, tzinfo=UTC) +def test_record_message_with_version(): + record = singer.RecordMessage( + stream="test", + record={"id": 1, "name": "test"}, + version=1614556800, + ) + assert record.version == 1614556800 + assert record.to_dict() == { + "type": "RECORD", + "stream": "test", + "record": {"id": 1, "name": "test"}, + "version": 1614556800, + } + + def test_schema_message(): schema = singer.SchemaMessage( stream="test", diff --git a/tests/core/test_flattening.py b/tests/core/test_flattening.py index 73169eab3..1e0466986 100644 --- a/tests/core/test_flattening.py +++ b/tests/core/test_flattening.py @@ -20,7 +20,7 @@ { "key_1": 1, "key_2__key_3": "value", 
- "key_2__key_4": '{"key_5": 1, "key_6": ["a", "b"]}', + "key_2__key_4": '{"key_5":1,"key_6":["a","b"]}', }, id="flattened schema limiting the max level", ), @@ -38,7 +38,7 @@ "key_1": 1, "key_2__key_3": "value", "key_2__key_4__key_5": 1, - "key_2__key_4__key_6": '["a", "b"]', + "key_2__key_4__key_6": '["a","b"]', }, id="flattened schema not limiting the max level", ), @@ -55,7 +55,7 @@ { "key_1": 1, "key_2__key_3": "value", - "key_2__key_4": '{"key_5": 1, "key_6": ["a", "b"]}', + "key_2__key_4": '{"key_5":1,"key_6":["a","b"]}', }, id="max level limiting flattened schema", ), diff --git a/tests/core/test_io.py b/tests/core/test_io.py index 0fcce614b..a48a785df 100644 --- a/tests/core/test_io.py +++ b/tests/core/test_io.py @@ -3,13 +3,16 @@ from __future__ import annotations import decimal +import io import itertools import json -from contextlib import nullcontext +from contextlib import nullcontext, redirect_stdout +from textwrap import dedent import pytest from singer_sdk._singerlib import RecordMessage +from singer_sdk._singerlib.exceptions import InvalidInputLine from singer_sdk.io_base import SingerReader, SingerWriter @@ -36,7 +39,7 @@ def _process_state_message(self, message_dict: dict) -> None: pytest.param( "not-valid-json", None, - pytest.raises(json.decoder.JSONDecodeError), + pytest.raises(InvalidInputLine), id="unparsable", ), pytest.param( @@ -57,6 +60,43 @@ def test_deserialize(line, expected, exception): assert reader.deserialize_json(line) == expected +def test_listen(): + reader = DummyReader() + input_lines = io.StringIO( + dedent("""\ + {"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} + {"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}} + {"type": "RECORD", "stream": "users", "record": {"id": 2, "value": 2.34}} + {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}} + {"type": "SCHEMA", "stream": "batches", "schema": 
{"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} + {"type": "BATCH", "stream": "batches", "encoding": {"format": "jsonl", "compression": "gzip"}, "manifest": ["file1.jsonl.gz", "file2.jsonl.gz"]} + {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}, "batches": {"id": 1000000}}}} + """) # noqa: E501 + ) + reader.listen(input_lines) + + +def test_listen_unknown_message(): + reader = DummyReader() + input_lines = io.StringIO('{"type": "UNKNOWN"}\n') + with pytest.raises(ValueError, match="Unknown message type"): + reader.listen(input_lines) + + +def test_write_message(): + writer = SingerWriter() + message = RecordMessage( + stream="test", + record={"id": 1, "name": "test"}, + ) + with redirect_stdout(io.StringIO()) as out: + writer.write_message(message) + + assert out.getvalue() == ( + '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' + ) + + # Benchmark Tests diff --git a/tests/snapshots/mapped_stream/flatten_all.jsonl b/tests/snapshots/mapped_stream/flatten_all.jsonl index 21504a38f..79f981ac5 100644 --- a/tests/snapshots/mapped_stream/flatten_all.jsonl +++ b/tests/snapshots/mapped_stream/flatten_all.jsonl @@ -1,6 +1,6 @@ {"type":"STATE","value":{}} {"type":"SCHEMA","stream":"mystream","schema":{"properties":{"email":{"type":["string","null"]},"count":{"type":["integer","null"]},"user__id":{"type":["integer","null"]},"user__sub__num":{"type":["integer","null"]},"user__sub__custom_obj":{"type":["string","null"]},"user__some_numbers":{"type":["string","null"]}},"type":"object"},"key_properties":[]} -{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14, 2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"} 
-{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32, 1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414, 1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14,2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32,1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414,1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"} {"type":"STATE","value":{"bookmarks":{"mystream":{}}}} diff --git a/tests/snapshots/mapped_stream/flatten_depth_1.jsonl b/tests/snapshots/mapped_stream/flatten_depth_1.jsonl index 317008dd8..4dd18f86d 100644 --- a/tests/snapshots/mapped_stream/flatten_depth_1.jsonl +++ b/tests/snapshots/mapped_stream/flatten_depth_1.jsonl @@ -1,6 +1,6 @@ {"type":"STATE","value":{}} {"type":"SCHEMA","stream":"mystream","schema":{"properties":{"email":{"type":["string","null"]},"count":{"type":["integer","null"]},"user__id":{"type":["integer","null"]},"user__sub":{"type":["string","null"]},"user__some_numbers":{"type":["string","null"]}},"type":"object"},"key_properties":[]} -{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub":"{\"num\": 1, 
\"custom_obj\": \"obj-hello\"}","user__some_numbers":"[3.14, 2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub":"{\"num\": 2, \"custom_obj\": \"obj-world\"}","user__some_numbers":"[10.32, 1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub":"{\"num\": 3, \"custom_obj\": \"obj-hello\"}","user__some_numbers":"[1.414, 1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub":"{\"num\":1,\"custom_obj\":\"obj-hello\"}","user__some_numbers":"[3.14,2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub":"{\"num\":2,\"custom_obj\":\"obj-world\"}","user__some_numbers":"[10.32,1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub":"{\"num\":3,\"custom_obj\":\"obj-hello\"}","user__some_numbers":"[1.414,1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"} {"type":"STATE","value":{"bookmarks":{"mystream":{}}}} diff --git a/tests/snapshots/mapped_stream/map_and_flatten.jsonl b/tests/snapshots/mapped_stream/map_and_flatten.jsonl index 89397a046..5bc3b7f42 100644 --- a/tests/snapshots/mapped_stream/map_and_flatten.jsonl +++ b/tests/snapshots/mapped_stream/map_and_flatten.jsonl @@ -1,6 +1,6 @@ {"type":"STATE","value":{}} 
{"type":"SCHEMA","stream":"mystream","schema":{"properties":{"email":{"type":["string","null"]},"count":{"type":["integer","null"]},"user__id":{"type":["integer","null"]},"user__sub__num":{"type":["integer","null"]},"user__sub__custom_obj":{"type":["string","null"]},"user__some_numbers":{"type":["string","null"]},"email_hash":{"type":["string","null"]}},"type":"object"},"key_properties":["email_hash"]} -{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14, 2.718]","email_hash":"c160f8cc69a4f0bf2b0362752353d060"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32, 1.618]","email_hash":"4b9bb80620f03eb3719e0a061c14283d"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414, 1.732]","email_hash":"426b189df1e2f359efe6ee90f2d2030f"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14,2.718]","email_hash":"c160f8cc69a4f0bf2b0362752353d060"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32,1.618]","email_hash":"4b9bb80620f03eb3719e0a061c14283d"},"time_extracted":"2022-01-01T00:00:00+00:00"} 
+{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414,1.732]","email_hash":"426b189df1e2f359efe6ee90f2d2030f"},"time_extracted":"2022-01-01T00:00:00+00:00"} {"type":"STATE","value":{"bookmarks":{"mystream":{}}}}