From 96525f1ce27273237416140e1a4114293ab26c57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 18:19:25 -0600 Subject: [PATCH] Make the Singer writer and reader classes generic --- singer_sdk/_singerlib/serde.py | 2 +- singer_sdk/io_base.py | 124 +++++++++++++++++++++------------ 2 files changed, 79 insertions(+), 47 deletions(-) diff --git a/singer_sdk/_singerlib/serde.py b/singer_sdk/_singerlib/serde.py index cd0cfce2e..9971bc301 100644 --- a/singer_sdk/_singerlib/serde.py +++ b/singer_sdk/_singerlib/serde.py @@ -23,7 +23,7 @@ def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 return obj.isoformat(sep="T") if isinstance(obj, datetime.datetime) else str(obj) -def deserialize_json(json_str: str, **kwargs: t.Any) -> dict: +def deserialize_json(json_str: str | bytes, **kwargs: t.Any) -> dict: """Deserialize a line of json. Args: diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index 1834b6fad..ce7d09ee1 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -9,60 +9,30 @@ from collections import Counter, defaultdict from singer_sdk._singerlib.messages import Message, SingerMessageType -from singer_sdk._singerlib.messages import format_message as singer_format_message -from singer_sdk._singerlib.messages import write_message as singer_write_message -from singer_sdk._singerlib.serde import deserialize_json +from singer_sdk._singerlib.serde import deserialize_json, serialize_json from singer_sdk.exceptions import InvalidInputLine logger = logging.getLogger(__name__) +# TODO: Use a TypeVar default (PEP 696) to default to 'str' here +# https://peps.python.org/pep-0696/ +T = t.TypeVar("T", str, bytes) -class SingerReader(metaclass=abc.ABCMeta): - """Interface for all plugins reading Singer messages from stdin.""" + +class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins reading Singer messages as strings or bytes.""" @t.final - def listen(self, file_input: t.IO[str] | None = None) 
-> None: + def listen(self, file_input: t.IO[T] | None = None) -> None: """Read from input until all messages are processed. Args: file_input: Readable stream of messages. Defaults to standard in. - - This method is internal to the SDK and should not need to be overridden. """ - if not file_input: - file_input = sys.stdin - - self._process_lines(file_input) + self._process_lines(file_input or self.default_input) self._process_endofpipe() - @staticmethod - def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: - """Check if dictionary . - - Args: - line_dict: TODO - requires: TODO - - Raises: - InvalidInputLine: raised if any required keys are missing - """ - if not requires.issubset(line_dict): - missing = requires - set(line_dict) - msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" - raise InvalidInputLine(msg) - - def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 - """Deserialize a line of json. - - Args: - line: A single line of json. - - Returns: - A dictionary of the deserialized json. - """ - return deserialize_json(line) - - def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: + def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: """Internal method to process jsonl lines from a Singer tap. Args: @@ -99,6 +69,29 @@ def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: return Counter(**stats) + @property + @abc.abstractmethod + def default_input(self) -> t.IO[T]: ... # noqa: D102 + + @staticmethod + def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: + """Check that the dictionary contains all required keys. 
+ + Args: + line_dict: The deserialized message to check. + requires: The keys that must be present in the message. + + Raises: + InvalidInputLine: raised if any required keys are missing + """ + if not requires.issubset(line_dict): + missing = requires - set(line_dict) + msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" + raise InvalidInputLine(msg) + + @abc.abstractmethod + def deserialize_json(self, line: T) -> dict: ... # noqa: D102 + @abc.abstractmethod def _process_schema_message(self, message_dict: dict) -> None: ... @@ -131,10 +124,27 @@ def _process_endofpipe(self) -> None: # noqa: PLR6301 logger.debug("End of pipe reached") -class SingerWriter: - """Interface for all plugins writting Singer messages to stdout.""" +class SingerReader(GenericSingerReader[str]): + """Base class for all plugins reading Singer messages as strings from stdin.""" + + default_input = sys.stdin + + def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 + """Deserialize a line of json. + + Args: + line: A single line of json. + + Returns: + A dictionary of the deserialized json. + """ + return deserialize_json(line) + + +class GenericSingerWriter(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins writing Singer messages as strings or bytes.""" - def format_message(self, message: Message) -> str: # noqa: PLR6301 + def format_message(self, message: Message) -> T: """Format a message as a JSON string. Args: @@ -143,12 +153,34 @@ def format_message(self, message: Message) -> str: # noqa: PLR6301 Returns: The formatted message. """ - return singer_format_message(message) + return self.serialize_json(message.to_dict()) + + @abc.abstractmethod + def serialize_json(self, obj: object) -> T: ... # noqa: D102 + + @abc.abstractmethod + def write_message(self, message: Message) -> None: ... 
# noqa: D102 + + +class SingerWriter(GenericSingerWriter[str]): + """Interface for all plugins writing Singer messages to stdout.""" + + def serialize_json(self, obj: object) -> str: # noqa: PLR6301 + """Serialize an object into a line of json. + + Args: + obj: A Python object, usually a dict. + + Returns: + A string of serialized json. + """ + return serialize_json(obj) - def write_message(self, message: Message) -> None: # noqa: PLR6301 + def write_message(self, message: Message) -> None: """Write a message to stdout. Args: message: The message to write. """ - singer_write_message(message) + sys.stdout.write(self.format_message(message) + "\n") + sys.stdout.flush()