Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SQLTarget connector instance shared with sinks #1864

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4a84b47
added _target_conector and property
BuzzCutNorman Jul 20, 2023
e3d31e9
added add_sqlsink and update get_sink to call it
BuzzCutNorman Jul 20, 2023
c50a933
add default_sink_class type hint SQLSink
BuzzCutNorman Jul 20, 2023
6f7dd00
remove default_sink_class type hint, get_sink and add_sqlsink return …
BuzzCutNorman Jul 20, 2023
cc74008
added final decorator to add_sqlsink
BuzzCutNorman Jul 20, 2023
7aa87f9
Merge branch 'main' into 1772-sqltarget-level-connector-instance-shar…
BuzzCutNorman Jul 21, 2023
e571343
Merge branch 'main' into 1772-sqltarget-level-connector-instance-shar…
BuzzCutNorman Jul 25, 2023
d6f95ed
mypy fixes round 2, added get_sink_class
BuzzCutNorman Jul 25, 2023
e223612
Merge branch '1772-sqltarget-level-connector-instance-shared-with-sin…
BuzzCutNorman Jul 25, 2023
0413e0b
mypy round 3: removed get_sink_class
BuzzCutNorman Jul 25, 2023
c9fce83
mypy attempt 4: add Sink type hints
BuzzCutNorman Jul 25, 2023
b70b965
mypy attempt 5: default_sink_class as Any
BuzzCutNorman Jul 25, 2023
d5bd8f9
mypy attempt 6: added two more Any type hints
BuzzCutNorman Jul 25, 2023
40db24a
mypy revert attempts
BuzzCutNorman Jul 25, 2023
1f68e76
mypy attempt: not initalizing just type hinting
BuzzCutNorman Jul 25, 2023
6f3b9c8
mypy attempt: add get_add_class and use type[]
BuzzCutNorman Jul 26, 2023
6d8e2bb
added test_target_base::test_sql_get_sink
BuzzCutNorman Jul 26, 2023
76775d3
added test for add_sqlsink
BuzzCutNorman Jul 26, 2023
16e96cc
updated test_sql_get_sink
BuzzCutNorman Jul 26, 2023
367e487
Merge branch 'main' into 1772-sqltarget-level-connector-instance-shar…
edgarrmondragon Jul 26, 2023
2a1fca7
Apply suggestions from code review
BuzzCutNorman Jul 26, 2023
8d2d9c3
Merge branch 'main' into 1772-sqltarget-level-connector-instance-shar…
edgarrmondragon Jul 27, 2023
e628032
Merge branch 'main' into 1772-sqltarget-level-connector-instance-shar…
edgarrmondragon Jul 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 131 additions & 2 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
if t.TYPE_CHECKING:
from pathlib import PurePath

from singer_sdk.connectors import SQLConnector
from singer_sdk.mapper import PluginMapper
from singer_sdk.sinks import Sink
from singer_sdk.sinks import Sink, SQLSink

_MAX_PARALLELISM = 8

Expand All @@ -48,7 +49,7 @@ class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta):

# Default class to use for creating new sink objects.
# Required if `Target.get_sink_class()` is not defined.
default_sink_class: type[Sink] | None = None
default_sink_class: type[Sink]

def __init__(
self,
Expand Down Expand Up @@ -574,6 +575,23 @@ def get_singer_command(cls: type[Target]) -> click.Command:
class SQLTarget(Target):
"""Target implementation for SQL destinations."""

_target_connector: SQLConnector | None = None

default_sink_class: type[SQLSink]

@property
def target_connector(self) -> SQLConnector:
"""The connector object.

Returns:
The connector object.
"""
if self._target_connector is None:
self._target_connector = self.default_sink_class.connector_class(
dict(self.config),
)
return self._target_connector

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get target capabilities.
Expand Down Expand Up @@ -617,3 +635,114 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
super().append_builtin_config(config_jsonschema)

pass

@final
def add_sqlsink(
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved
self,
stream_name: str,
schema: dict,
key_properties: list[str] | None = None,
) -> Sink:
"""Create a sink and register it.

This method is internal to the SDK and should not need to be overridden.

Args:
stream_name: Name of the stream.
schema: Schema of the stream.
key_properties: Primary key of the stream.

Returns:
A new sink for the stream.
"""
self.logger.info("Initializing '%s' target sink...", self.name)
sink_class = self.get_sink_class(stream_name=stream_name)
sink = sink_class(
target=self,
stream_name=stream_name,
schema=schema,
key_properties=key_properties,
connector=self.target_connector,
)
sink.setup()
self._sinks_active[stream_name] = sink

return sink

def get_sink_class(self, stream_name: str) -> type[SQLSink]:
"""Get sink for a stream.

Developers can override this method to return a custom Sink type depending
on the value of `stream_name`. Optional when `default_sink_class` is set.

Args:
stream_name: Name of the stream.

Raises:
ValueError: If no :class:`singer_sdk.sinks.Sink` class is defined.

Returns:
The sink class to be used with the stream.
"""
if self.default_sink_class:
return self.default_sink_class

msg = (
f"No sink class defined for '{stream_name}' and no default sink class "
"available."
)
raise ValueError(msg)

def get_sink(
self,
stream_name: str,
*,
record: dict | None = None,
schema: dict | None = None,
key_properties: list[str] | None = None,
) -> Sink:
"""Return a sink for the given stream name.

A new sink will be created if `schema` is provided and if either `schema` or
`key_properties` has changed. If so, the old sink becomes archived and held
until the next drain_all() operation.

Developers only need to override this method if they want to provide a different
sink depending on the values within the `record` object. Otherwise, please see
`default_sink_class` property and/or the `get_sink_class()` method.

Raises :class:`singer_sdk.exceptions.RecordsWithoutSchemaException` if sink does
not exist and schema is not sent.

Args:
stream_name: Name of the stream.
record: Record being processed.
schema: Stream schema.
key_properties: Primary key of the stream.

Returns:
The sink used for this target.
"""
_ = record # Custom implementations may use record in sink selection.
if schema is None:
self._assert_sink_exists(stream_name)
return self._sinks_active[stream_name]

existing_sink = self._sinks_active.get(stream_name, None)
if not existing_sink:
return self.add_sqlsink(stream_name, schema, key_properties)

if (
existing_sink.schema != schema
or existing_sink.key_properties != key_properties
):
self.logger.info(
"Schema or key properties for '%s' stream have changed. "
"Initializing a new '%s' sink...",
stream_name,
stream_name,
)
self._sinks_to_clear.append(self._sinks_active.pop(stream_name))
return self.add_sqlsink(stream_name, schema, key_properties)

return existing_sink
65 changes: 63 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import pytest
from sqlalchemy import __version__ as sqlalchemy_version

from singer_sdk import SQLConnector
from singer_sdk import typing as th
from singer_sdk.sinks import BatchSink
from singer_sdk.target_base import Target
from singer_sdk.sinks import BatchSink, SQLSink
from singer_sdk.target_base import SQLTarget, Target

if t.TYPE_CHECKING:
from _pytest.config import Config
Expand Down Expand Up @@ -116,3 +117,63 @@ def _write_state_message(self, state: dict):
"""Emit the stream's latest state."""
super()._write_state_message(state)
self.state_messages_written.append(state)


class SQLConnectorMock(SQLConnector):
"""A Mock SQLConnector class."""


class SQLSinkMock(SQLSink):
"""A mock Sink class."""

name = "sql-sink-mock"
connector_class = SQLConnectorMock

def __init__(
self,
target: SQLTargetMock,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
connector: SQLConnector | None = None,
):
"""Create the Mock batch-based sink."""
self._connector: SQLConnector
self._connector = connector or self.connector_class(dict(target.config))
super().__init__(target, stream_name, schema, key_properties, connector)
self.target = target

def process_record(self, record: dict, context: dict) -> None:
"""Tracks the count of processed records."""
self.target.num_records_processed += 1
super().process_record(record, context)

def process_batch(self, context: dict) -> None:
"""Write to mock trackers."""
self.target.records_written.extend(context["records"])
self.target.num_batches_processed += 1

@property
def key_properties(self) -> list[str]:
return [key.upper() for key in super().key_properties]


class SQLTargetMock(SQLTarget):
"""A mock Target class."""

name = "sql-target-mock"
config_jsonschema = th.PropertiesList().to_dict()
default_sink_class = SQLSinkMock

def __init__(self, *args, **kwargs):
"""Create the Mock target sync."""
super().__init__(*args, **kwargs)
self.state_messages_written: list[dict] = []
self.records_written: list[dict] = []
self.num_records_processed: int = 0
self.num_batches_processed: int = 0

def _write_state_message(self, state: dict):
"""Emit the stream's latest state."""
super()._write_state_message(state)
self.state_messages_written.append(state)
72 changes: 70 additions & 2 deletions tests/core/test_target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

import pytest

from singer_sdk.exceptions import MissingKeyPropertiesError
from tests.conftest import BatchSinkMock, TargetMock
from singer_sdk.exceptions import (
MissingKeyPropertiesError,
RecordsWithoutSchemaException,
)
from tests.conftest import BatchSinkMock, SQLSinkMock, SQLTargetMock, TargetMock


def test_get_sink():
Expand Down Expand Up @@ -53,3 +56,68 @@ def test_validate_record():
# Test invalid record
with pytest.raises(MissingKeyPropertiesError):
sink._singer_validate_message({"name": "test"})


def test_sql_get_sink():
input_schema_1 = {
"properties": {
"id": {
"type": ["string", "null"],
},
"col_ts": {
"format": "date-time",
"type": ["string", "null"],
},
},
}
input_schema_2 = copy.deepcopy(input_schema_1)
key_properties = []
target = SQLTargetMock(config={"sqlalchemy_url": "sqlite:///"})
sink = SQLSinkMock(
target=target,
stream_name="foo",
schema=input_schema_1,
key_properties=key_properties,
connector=target.target_connector,
)
target._sinks_active["foo"] = sink
sink_returned = target.get_sink(
"foo",
schema=input_schema_2,
key_properties=key_properties,
)
assert sink_returned is sink


def test_add_sqlsink_and_get_sink():
input_schema_1 = {
"properties": {
"id": {
"type": ["string", "null"],
},
"col_ts": {
"format": "date-time",
"type": ["string", "null"],
},
},
}
input_schema_2 = copy.deepcopy(input_schema_1)
key_properties = []
target = SQLTargetMock(config={"sqlalchemy_url": "sqlite:///"})
sink = target.add_sqlsink(
"foo",
schema=input_schema_2,
key_properties=key_properties,
)

sink_returned = target.get_sink(
"foo",
)

assert sink_returned is sink

# Test invalid call
with pytest.raises(RecordsWithoutSchemaException):
target.get_sink(
"bar",
)