From 4a84b476bdf45dcdbdde6f13e3e0caed38a1d74b Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 20 Jul 2023 12:30:50 -0700 Subject: [PATCH 01/17] added _target_conector and property --- singer_sdk/target_base.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index d62bbbfd8..b15305413 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -28,6 +28,7 @@ if t.TYPE_CHECKING: from pathlib import PurePath + from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper from singer_sdk.sinks import Sink @@ -574,6 +575,21 @@ def get_singer_command(cls: type[Target]) -> click.Command: class SQLTarget(Target): """Target implementation for SQL destinations.""" + _target_connector: SQLConnector | None = None + + @property + def target_connector(self) -> SQLConnector: + """The connector object. + + Returns: + The connector object. + """ + if self._target_connector is None: + self._target_connector = self.default_sink_class.connector_class( + dict(self.config), + ) + return self._target_connector + @classproperty def capabilities(self) -> list[CapabilitiesEnum]: """Get target capabilities. From e3d31e9b756f385358c2e283142785f6ef9c253f Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 20 Jul 2023 12:45:00 -0700 Subject: [PATCH 02/17] added add_sqlsink and update get_sink to call it --- singer_sdk/target_base.py | 87 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index b15305413..635861256 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -30,7 +30,7 @@ from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper - from singer_sdk.sinks import Sink + from singer_sdk.sinks import Sink, SQLSink _MAX_PARALLELISM = 8 @@ -633,3 +633,88 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: super().append_builtin_config(config_jsonschema) pass + + def add_sqlsink( + self, + stream_name: str, + schema: dict, + key_properties: list[str] | None = None, + ) -> SQLSink: + """Create a sink and register it. + + This method is internal to the SDK and should not need to be overridden. + + Args: + stream_name: Name of the stream. + schema: Schema of the stream. + key_properties: Primary key of the stream. + + Returns: + A new sink for the stream. + """ + self.logger.info("Initializing '%s' target sink...", self.name) + sink_class = self.default_sink_class + sink = sink_class( + target=self, + stream_name=stream_name, + schema=schema, + key_properties=key_properties, + connector=self.target_connector, + ) + sink.setup() + self._sinks_active[stream_name] = sink + return sink + + def get_sink( + self, + stream_name: str, + *, + record: dict | None = None, + schema: dict | None = None, + key_properties: list[str] | None = None, + ) -> SQLSink: + """Return a sink for the given stream name. + + A new sink will be created if `schema` is provided and if either `schema` or + `key_properties` has changed. If so, the old sink becomes archived and held + until the next drain_all() operation. + + Developers only need to override this method if they want to provide a different + sink depending on the values within the `record` object. Otherwise, please see + `default_sink_class` property and/or the `get_sink_class()` method. + + Raises :class:`singer_sdk.exceptions.RecordsWithoutSchemaException` if sink does + not exist and schema is not sent. + + Args: + stream_name: Name of the stream. + record: Record being processed. + schema: Stream schema. + key_properties: Primary key of the stream. + + Returns: + The sink used for this target. + """ + _ = record # Custom implementations may use record in sink selection. + if schema is None: + self._assert_sink_exists(stream_name) + return self._sinks_active[stream_name] + + existing_sink = self._sinks_active.get(stream_name, None) + if not existing_sink: + return self.add_sqlsink(stream_name, schema, key_properties) + + if ( + existing_sink.schema != schema + or existing_sink.key_properties != key_properties + ): + self.logger.info( + "Schema or key properties for '%s' stream have changed. " + "Initializing a new '%s' sink...", + stream_name, + stream_name, + ) + self._sinks_to_clear.append(self._sinks_active.pop(stream_name)) + return self.add_sqlsink(stream_name, schema, key_properties) + + return existing_sink From c50a9336f6e5b15b71d9e9c011fd675c97801b63 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 20 Jul 2023 13:50:05 -0700 Subject: [PATCH 03/17] add default_sink_class type hint SQLSink --- singer_sdk/target_base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 635861256..c8c3646e4 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -575,6 +575,9 @@ def get_singer_command(cls: type[Target]) -> click.Command: class SQLTarget(Target): """Target implementation for SQL destinations.""" + # Sink class used to initialize new SQL sink from their stream schema. + default_sink_class: SQLSink + _target_connector: SQLConnector | None = None @property @@ -663,6 +666,7 @@ def add_sqlsink( ) sink.setup() self._sinks_active[stream_name] = sink + return sink def get_sink( @@ -672,7 +676,7 @@ def get_sink( record: dict | None = None, schema: dict | None = None, key_properties: list[str] | None = None, - ) -> SQLSink: + ) -> Sink | SQLSink: """Return a sink for the given stream name. A new sink will be created if `schema` is provided and if either `schema` or From 6f7dd0026d1af923b77348b41d72dbfa83a18a2c Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 20 Jul 2023 14:28:33 -0700 Subject: [PATCH 04/17] remove default_sink_class type hint, get_sink and add_sqlsink return type Sink. --- singer_sdk/target_base.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index c8c3646e4..3ccb7ed20 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -30,7 +30,7 @@ from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper - from singer_sdk.sinks import Sink, SQLSink + from singer_sdk.sinks import Sink _MAX_PARALLELISM = 8 @@ -575,9 +575,6 @@ def get_singer_command(cls: type[Target]) -> click.Command: class SQLTarget(Target): """Target implementation for SQL destinations.""" - # Sink class used to initialize new SQL sink from their stream schema. - default_sink_class: SQLSink - _target_connector: SQLConnector | None = None @property @@ -642,7 +639,7 @@ def add_sqlsink( stream_name: str, schema: dict, key_properties: list[str] | None = None, - ) -> SQLSink: + ) -> Sink: """Create a sink and register it. This method is internal to the SDK and should not need to be overridden. @@ -676,7 +673,7 @@ def get_sink( record: dict | None = None, schema: dict | None = None, key_properties: list[str] | None = None, - ) -> Sink | SQLSink: + ) -> Sink: """Return a sink for the given stream name. A new sink will be created if `schema` is provided and if either `schema` or From cc74008e6ffbe302a0078af5c19c10b212ce4849 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 20 Jul 2023 14:35:21 -0700 Subject: [PATCH 05/17] added final decorator to add_sqlsink --- singer_sdk/target_base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 3ccb7ed20..41f287ce2 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -634,6 +634,7 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: pass + @final def add_sqlsink( self, stream_name: str, From d6f95ed41d457d7f595b689e034ceb81be63299d Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 25 Jul 2023 12:15:36 -0700 Subject: [PATCH 06/17] mypy fixes round 2, added get_sink_class --- singer_sdk/target_base.py | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 41f287ce2..ef643f402 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -30,7 +30,7 @@ from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper - from singer_sdk.sinks import Sink + from singer_sdk.sinks import Sink, SQLSink _MAX_PARALLELISM = 8 @@ -577,6 +577,10 @@ class SQLTarget(Target): _target_connector: SQLConnector | None = None + # Default class to use for creating new sink objects. + # Required if `SQLTarget.get_sink_class()` is not defined. + default_sink_class: SQLSink | None = None + @property def target_connector(self) -> SQLConnector: """The connector object. @@ -634,13 +638,37 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: pass + def get_sink_class(self, stream_name: str) -> SQLSink: + """Get sink for a stream. + + Developers can override this method to return a custom Sink type depending + on the value of `stream_name`. Optional when `default_sink_class` is set. + + Args: + stream_name: Name of the stream. + + Raises: + ValueError: If no :class:`singer_sdk.sinks.Sink` class is defined. + + Returns: + The sink class to be used with the stream. + """ + if self.default_sink_class: + return self.default_sink_class + + msg = ( + f"No sink class defined for '{stream_name}' and no default sink class " + "available." + ) + raise ValueError(msg) + @final def add_sqlsink( self, stream_name: str, schema: dict, key_properties: list[str] | None = None, - ) -> Sink: + ) -> SQLSink: """Create a sink and register it. This method is internal to the SDK and should not need to be overridden. @@ -674,7 +702,7 @@ def get_sink( record: dict | None = None, schema: dict | None = None, key_properties: list[str] | None = None, - ) -> Sink: + ) -> SQLSink: """Return a sink for the given stream name. A new sink will be created if `schema` is provided and if either `schema` or From 0413e0b65966ca75656924573c453e271b55da2d Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 25 Jul 2023 12:29:31 -0700 Subject: [PATCH 07/17] mypy round 3: removed get_sink_class --- singer_sdk/target_base.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index ef643f402..f09c0fc7d 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -638,30 +638,6 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: pass - def get_sink_class(self, stream_name: str) -> SQLSink: - """Get sink for a stream. - - Developers can override this method to return a custom Sink type depending - on the value of `stream_name`. Optional when `default_sink_class` is set. - - Args: - stream_name: Name of the stream. - - Raises: - ValueError: If no :class:`singer_sdk.sinks.Sink` class is defined. - - Returns: - The sink class to be used with the stream. - """ - if self.default_sink_class: - return self.default_sink_class - - msg = ( - f"No sink class defined for '{stream_name}' and no default sink class " - "available." - ) - raise ValueError(msg) - @final def add_sqlsink( self, From c9fce83c6d759579887ae85900d5509bf78a9336 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 25 Jul 2023 12:40:03 -0700 Subject: [PATCH 08/17] mypy attempt 4: add Sink type hints --- singer_sdk/target_base.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index f09c0fc7d..d2b8ce4ff 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -577,9 +577,7 @@ class SQLTarget(Target): _target_connector: SQLConnector | None = None - # Default class to use for creating new sink objects. - # Required if `SQLTarget.get_sink_class()` is not defined. - default_sink_class: SQLSink | None = None + default_sink_class: SQLSink | Sink | None = None @property def target_connector(self) -> SQLConnector: @@ -644,7 +642,7 @@ def add_sqlsink( stream_name: str, schema: dict, key_properties: list[str] | None = None, - ) -> SQLSink: + ) -> SQLSink | Sink: """Create a sink and register it. This method is internal to the SDK and should not need to be overridden. @@ -678,7 +676,7 @@ def get_sink( record: dict | None = None, schema: dict | None = None, key_properties: list[str] | None = None, - ) -> SQLSink: + ) -> SQLSink | Sink: """Return a sink for the given stream name. A new sink will be created if `schema` is provided and if either `schema` or From b70b965232941a82090fe681674a0b1d1b8b60a6 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 25 Jul 2023 14:06:09 -0700 Subject: [PATCH 09/17] mypy attempt 5: default_sink_class as Any --- singer_sdk/target_base.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index d2b8ce4ff..244a800e2 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -30,7 +30,7 @@ from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper - from singer_sdk.sinks import Sink, SQLSink + from singer_sdk.sinks import Sink _MAX_PARALLELISM = 8 @@ -49,7 +49,7 @@ class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta): # Default class to use for creating new sink objects. # Required if `Target.get_sink_class()` is not defined. - default_sink_class: type[Sink] | None = None + default_sink_class: t.Any = None def __init__( self, @@ -577,8 +577,6 @@ class SQLTarget(Target): _target_connector: SQLConnector | None = None - default_sink_class: SQLSink | Sink | None = None - @property def target_connector(self) -> SQLConnector: """The connector object. @@ -642,7 +640,7 @@ def add_sqlsink( stream_name: str, schema: dict, key_properties: list[str] | None = None, - ) -> SQLSink | Sink: + ) -> Sink: """Create a sink and register it. This method is internal to the SDK and should not need to be overridden. @@ -676,7 +674,7 @@ def get_sink( record: dict | None = None, schema: dict | None = None, key_properties: list[str] | None = None, - ) -> SQLSink | Sink: + ) -> Sink: """Return a sink for the given stream name. A new sink will be created if `schema` is provided and if either `schema` or From d5bd8f9756af6677b629a8486fd8a00351980471 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 25 Jul 2023 14:36:43 -0700 Subject: [PATCH 10/17] mypy attempt 6: added two more Any type hints --- singer_sdk/target_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 244a800e2..4afe811d5 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -181,7 +181,7 @@ def get_sink( return existing_sink - def get_sink_class(self, stream_name: str) -> type[Sink]: + def get_sink_class(self, stream_name: str) -> t.Any: # noqa: ANN401 """Get sink for a stream. Developers can override this method to return a custom Sink type depending @@ -640,7 +640,7 @@ def add_sqlsink( stream_name: str, schema: dict, key_properties: list[str] | None = None, - ) -> Sink: + ) -> t.Any: # noqa: ANN401 """Create a sink and register it. This method is internal to the SDK and should not need to be overridden. From 40db24a5ac15ffb99243a501dcc408790301c7d3 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 25 Jul 2023 14:58:27 -0700 Subject: [PATCH 11/17] mypy revert attempts --- singer_sdk/target_base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 4afe811d5..41f287ce2 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -49,7 +49,7 @@ class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta): # Default class to use for creating new sink objects. # Required if `Target.get_sink_class()` is not defined. - default_sink_class: t.Any = None + default_sink_class: type[Sink] | None = None def __init__( self, @@ -181,7 +181,7 @@ def get_sink( return existing_sink - def get_sink_class(self, stream_name: str) -> t.Any: # noqa: ANN401 + def get_sink_class(self, stream_name: str) -> type[Sink]: """Get sink for a stream. Developers can override this method to return a custom Sink type depending @@ -640,7 +640,7 @@ def add_sqlsink( stream_name: str, schema: dict, key_properties: list[str] | None = None, - ) -> t.Any: # noqa: ANN401 + ) -> Sink: """Create a sink and register it. This method is internal to the SDK and should not need to be overridden. From 1f68e76145eb6235454e24b01433c538c5230a4f Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 25 Jul 2023 15:56:59 -0700 Subject: [PATCH 12/17] mypy attempt: not initalizing just type hinting --- singer_sdk/target_base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 41f287ce2..04cb75777 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -30,7 +30,7 @@ from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper - from singer_sdk.sinks import Sink + from singer_sdk.sinks import Sink, SQLSink _MAX_PARALLELISM = 8 @@ -49,7 +49,7 @@ class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta): # Default class to use for creating new sink objects. # Required if `Target.get_sink_class()` is not defined. - default_sink_class: type[Sink] | None = None + default_sink_class: Sink def __init__( self, @@ -577,6 +577,8 @@ class SQLTarget(Target): _target_connector: SQLConnector | None = None + default_sink_class: SQLSink + @property def target_connector(self) -> SQLConnector: """The connector object. From 6f3b9c8c254ed77210db1db28b83f3843ab4d283 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Wed, 26 Jul 2023 08:20:45 -0700 Subject: [PATCH 13/17] mypy attempt: add get_add_class and use type[] --- singer_sdk/target_base.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 04cb75777..cf1038831 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -49,7 +49,7 @@ class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta): # Default class to use for creating new sink objects. # Required if `Target.get_sink_class()` is not defined. - default_sink_class: Sink + default_sink_class: type[Sink] def __init__( self, @@ -577,7 +577,7 @@ class SQLTarget(Target): _target_connector: SQLConnector | None = None - default_sink_class: SQLSink + default_sink_class: type[SQLSink] @property def target_connector(self) -> SQLConnector: @@ -656,7 +656,7 @@ def add_sqlsink( A new sink for the stream. """ self.logger.info("Initializing '%s' target sink...", self.name) - sink_class = self.default_sink_class + sink_class = self.get_sink_class(stream_name=stream_name) sink = sink_class( target=self, stream_name=stream_name, @@ -669,6 +669,30 @@ def add_sqlsink( return sink + def get_sink_class(self, stream_name: str) -> type[SQLSink]: + """Get sink for a stream. + + Developers can override this method to return a custom Sink type depending + on the value of `stream_name`. Optional when `default_sink_class` is set. + + Args: + stream_name: Name of the stream. + + Raises: + ValueError: If no :class:`singer_sdk.sinks.Sink` class is defined. + + Returns: + The sink class to be used with the stream. + """ + if self.default_sink_class: + return self.default_sink_class + + msg = ( + f"No sink class defined for '{stream_name}' and no default sink class " + "available." + ) + raise ValueError(msg) + def get_sink( self, stream_name: str, From 6d8e2bb1d4ee23d2656b201b2d5a2a7c0d2f1fc6 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Wed, 26 Jul 2023 09:08:00 -0700 Subject: [PATCH 14/17] added test_target_base::test_sql_get_sink --- tests/conftest.py | 56 +++++++++++++++++++++++++++++++++- tests/core/test_target_base.py | 27 +++++++++++++++- 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 25319c015..259a20e78 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,8 +10,9 @@ import pytest from sqlalchemy import __version__ as sqlalchemy_version +from singer_sdk import SQLConnector from singer_sdk import typing as th -from singer_sdk.sinks import BatchSink +from singer_sdk.sinks import BatchSink, SQLSink from singer_sdk.target_base import Target if t.TYPE_CHECKING: @@ -97,6 +98,38 @@ def key_properties(self) -> list[str]: return [key.upper() for key in super().key_properties] +class SQLSinkMock(SQLSink): + """A mock Sink class.""" + + name = "sql-sink-mock" + connector_class = SQLConnector + + def __init__( + self, + target: TargetMock, + stream_name: str, + schema: dict, + key_properties: list[str] | None, + ): + """Create the Mock batch-based sink.""" + super().__init__(target, stream_name, schema, key_properties) + self.target = target + + def process_record(self, record: dict, context: dict) -> None: + """Tracks the count of processed records.""" + self.target.num_records_processed += 1 + super().process_record(record, context) + + def process_batch(self, context: dict) -> None: + """Write to mock trackers.""" + self.target.records_written.extend(context["records"]) + self.target.num_batches_processed += 1 + + @property + def key_properties(self) -> list[str]: + return [key.upper() for key in super().key_properties] + + class TargetMock(Target): """A mock Target class.""" @@ -116,3 +149,24 @@ def _write_state_message(self, state: dict): """Emit the stream's latest state.""" super()._write_state_message(state) self.state_messages_written.append(state) + + +class SQLTargetMock(Target): + """A mock Target class.""" + + name = "target-mock" + config_jsonschema = th.PropertiesList().to_dict() + default_sink_class = SQLSinkMock + + def __init__(self, *args, **kwargs): + """Create the Mock target sync.""" + super().__init__(*args, **kwargs) + self.state_messages_written: list[dict] = [] + self.records_written: list[dict] = [] + self.num_records_processed: int = 0 + self.num_batches_processed: int = 0 + + def _write_state_message(self, state: dict): + """Emit the stream's latest state.""" + super()._write_state_message(state) + self.state_messages_written.append(state) diff --git a/tests/core/test_target_base.py b/tests/core/test_target_base.py index 1fd6b9a93..f3ab40713 100644 --- a/tests/core/test_target_base.py +++ b/tests/core/test_target_base.py @@ -5,7 +5,7 @@ import pytest from singer_sdk.exceptions import MissingKeyPropertiesError -from tests.conftest import BatchSinkMock, TargetMock +from tests.conftest import BatchSinkMock, SQLSinkMock, TargetMock def test_get_sink(): @@ -33,6 +33,31 @@ def test_get_sink(): assert sink_returned == sink +def test_sql_get_sink(): + input_schema_1 = { + "properties": { + "id": { + "type": ["string", "null"], + }, + "col_ts": { + "format": "date-time", + "type": ["string", "null"], + }, + }, + } + input_schema_2 = copy.deepcopy(input_schema_1) + key_properties = [] + target = TargetMock(config={"add_record_metadata": True}) + sink = SQLSinkMock(target, "foo", input_schema_1, key_properties) + target._sinks_active["foo"] = sink + sink_returned = target.get_sink( + "foo", + schema=input_schema_2, + key_properties=key_properties, + ) + assert sink_returned == sink + + def test_validate_record(): target = TargetMock() sink = BatchSinkMock( From 76775d34d9e16ea3839c05eb92c865f7ce530040 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Wed, 26 Jul 2023 11:36:00 -0700 Subject: [PATCH 15/17] added test for add_sqlsink --- tests/conftest.py | 61 ++++++++++++++++++-------------- tests/core/test_target_base.py | 64 ++++++++++++++++++++-------------- 2 files changed, 72 insertions(+), 53 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 259a20e78..cf64e28dd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,7 +13,7 @@ from singer_sdk import SQLConnector from singer_sdk import typing as th from singer_sdk.sinks import BatchSink, SQLSink -from singer_sdk.target_base import Target +from singer_sdk.target_base import SQLTarget, Target if t.TYPE_CHECKING: from _pytest.config import Config @@ -98,21 +98,49 @@ def key_properties(self) -> list[str]: return [key.upper() for key in super().key_properties] +class TargetMock(Target): + """A mock Target class.""" + + name = "target-mock" + config_jsonschema = th.PropertiesList().to_dict() + default_sink_class = BatchSinkMock + + def __init__(self, *args, **kwargs): + """Create the Mock target sync.""" + super().__init__(*args, **kwargs) + self.state_messages_written: list[dict] = [] + self.records_written: list[dict] = [] + self.num_records_processed: int = 0 + self.num_batches_processed: int = 0 + + def _write_state_message(self, state: dict): + """Emit the stream's latest state.""" + super()._write_state_message(state) + self.state_messages_written.append(state) + + +class SQLConnectorMock(SQLConnector): + """A Mock SQLConnector class.""" + + class SQLSinkMock(SQLSink): """A mock Sink class.""" name = "sql-sink-mock" - connector_class = SQLConnector + connector_class = SQLConnectorMock def __init__( self, - target: TargetMock, + target: SQLTargetMock, stream_name: str, schema: dict, key_properties: list[str] | None, + connector: SQLConnector | None = None, ): """Create the Mock batch-based sink.""" - super().__init__(target, stream_name, schema, key_properties) + self._connector: SQLConnector + self._connector = connector or self.connector_class(dict(target.config)) + super().__init__(target, stream_name, schema, key_properties, connector) self.target = target def process_record(self, record: dict, context: dict) -> None: @@ -130,31 +158,10 @@ def key_properties(self) -> list[str]: return [key.upper() for key in super().key_properties] -class TargetMock(Target): +class SQLTargetMock(SQLTarget): """A mock Target class.""" - name = "target-mock" - config_jsonschema = th.PropertiesList().to_dict() - default_sink_class = BatchSinkMock - - def __init__(self, *args, **kwargs): - """Create the Mock target sync.""" - super().__init__(*args, **kwargs) - self.state_messages_written: list[dict] = [] - self.records_written: list[dict] = [] - self.num_records_processed: int = 0 - self.num_batches_processed: int = 0 - - def _write_state_message(self, state: dict): - """Emit the stream's latest state.""" - super()._write_state_message(state) - self.state_messages_written.append(state) - - -class SQLTargetMock(Target): - """A mock Target class.""" - - name = "target-mock" + name = "sql-target-mock" config_jsonschema = th.PropertiesList().to_dict() default_sink_class = SQLSinkMock diff --git a/tests/core/test_target_base.py b/tests/core/test_target_base.py index f3ab40713..fedd011c8 100644 --- a/tests/core/test_target_base.py +++ b/tests/core/test_target_base.py @@ -4,8 +4,11 @@ import pytest -from singer_sdk.exceptions import MissingKeyPropertiesError -from tests.conftest import BatchSinkMock, SQLSinkMock, TargetMock +from singer_sdk.exceptions import ( + MissingKeyPropertiesError, + RecordsWithoutSchemaException, +) +from tests.conftest import BatchSinkMock, SQLTargetMock, TargetMock def test_get_sink(): @@ -33,7 +36,29 @@ def test_get_sink(): assert sink_returned == sink -def test_sql_get_sink(): +def test_validate_record(): + target = TargetMock() + sink = BatchSinkMock( + target=target, + stream_name="test", + schema={ + "properties": { + "id": {"type": ["integer"]}, + "name": {"type": ["string"]}, + }, + }, + key_properties=["id"], + ) + + # Test valid record + sink._singer_validate_message({"id": 1, "name": "test"}) + + # Test invalid record + with pytest.raises(MissingKeyPropertiesError): + sink._singer_validate_message({"name": "test"}) + + +def test_add_sqlsink_and_get_sink(): input_schema_1 = { "properties": { "id": { @@ -47,34 +72,21 @@ def test_sql_get_sink(): } input_schema_2 = copy.deepcopy(input_schema_1) key_properties = [] - target = TargetMock(config={"add_record_metadata": True}) - sink = SQLSinkMock(target, "foo", input_schema_1, key_properties) - target._sinks_active["foo"] = sink - sink_returned = target.get_sink( + target = SQLTargetMock(config={"sqlalchemy_url": "sqlite:///"}) + sink = target.add_sqlsink( "foo", schema=input_schema_2, key_properties=key_properties, ) - assert sink_returned == sink - -def test_validate_record(): - target = TargetMock() - sink = BatchSinkMock( - target=target, - stream_name="test", - schema={ - "properties": { - "id": {"type": ["integer"]}, - "name": {"type": ["string"]}, - }, - }, - key_properties=["id"], + sink_returned = target.get_sink( + "foo", ) - # Test valid record - sink._singer_validate_message({"id": 1, "name": "test"}) + assert sink_returned == sink - # Test invalid record - with pytest.raises(MissingKeyPropertiesError): - sink._singer_validate_message({"name": "test"}) + # Test invalid call + with pytest.raises(RecordsWithoutSchemaException): + target.get_sink( + "bar", + ) From 16e96cc9d13962f96d7a2d29af12e968f9758d06 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Wed, 26 Jul 2023 12:41:33 -0700 Subject: [PATCH 16/17] updated test_sql_get_sink --- tests/core/test_target_base.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/tests/core/test_target_base.py b/tests/core/test_target_base.py index fedd011c8..fd0ea1dd1 100644 --- a/tests/core/test_target_base.py +++ b/tests/core/test_target_base.py @@ -8,7 +8,7 @@ MissingKeyPropertiesError, RecordsWithoutSchemaException, ) -from tests.conftest import BatchSinkMock, SQLTargetMock, TargetMock +from tests.conftest import BatchSinkMock, SQLSinkMock, SQLTargetMock, TargetMock def test_get_sink(): @@ -58,6 +58,37 @@ def test_validate_record(): sink._singer_validate_message({"name": "test"}) +def test_sql_get_sink(): + input_schema_1 = { + "properties": { + "id": { + "type": ["string", "null"], + }, + "col_ts": { + "format": "date-time", + "type": ["string", "null"], + }, + }, + } + input_schema_2 = copy.deepcopy(input_schema_1) + key_properties = [] + target = SQLTargetMock(config={"sqlalchemy_url": "sqlite:///"}) + sink = SQLSinkMock( + target=target, + stream_name="foo", + schema=input_schema_1, + key_properties=key_properties, + connector=target.target_connector, + ) + target._sinks_active["foo"] = sink + sink_returned = target.get_sink( + "foo", + schema=input_schema_2, + key_properties=key_properties, + ) + assert sink_returned == sink + + def test_add_sqlsink_and_get_sink(): input_schema_1 = { "properties": { From 2a1fca7ec67d4d7624a80093dbe957537ce38bea Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Wed, 26 Jul 2023 16:53:13 -0600 Subject: [PATCH 17/17] Apply suggestions from code review Co-authored-by: Edgar R. M. --- tests/core/test_target_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/core/test_target_base.py b/tests/core/test_target_base.py index fd0ea1dd1..ee00d35eb 100644 --- a/tests/core/test_target_base.py +++ b/tests/core/test_target_base.py @@ -86,7 +86,7 @@ def test_sql_get_sink(): schema=input_schema_2, key_properties=key_properties, ) - assert sink_returned == sink + assert sink_returned is sink def test_add_sqlsink_and_get_sink(): @@ -114,7 +114,7 @@ def test_add_sqlsink_and_get_sink(): "foo", ) - assert sink_returned == sink + assert sink_returned is sink # Test invalid call with pytest.raises(RecordsWithoutSchemaException):