Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Built-in handling of default-target-schema for SQL Targets #1157

Merged
16 changes: 0 additions & 16 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ObjectType,
PropertiesList,
Property,
StringType,
)

_EnumMemberT = TypeVar("_EnumMemberT")
Expand Down Expand Up @@ -47,6 +48,13 @@
description="The max depth to flatten schemas.",
),
).to_dict()
TARGET_SCHEMA_CONFIG = PropertiesList(
Property(
"default_target_schema",
StringType(),
description="The default target database schema name to use for all streams.",
),
).to_dict()


class DeprecatedEnum(Enum):
Expand Down
13 changes: 12 additions & 1 deletion singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,22 @@ def schema_name(self) -> Optional[str]:
Returns:
The target schema name.
"""
# Look for a default_target_scheme in the configuraion fle
default_target_schema: str = self.config.get("default_target_schema", None)
parts = self.stream_name.split("-")

# 1) When default_target_scheme is in the configuration use it
# 2) if the streams are in <schema>-<table> format use the
# stream <schema>
# 3) Return None if you don't find anything
if default_target_schema:
return default_target_schema

if len(parts) in {2, 3}:
# Stream name is a two-part or three-part identifier.
# Use the second-to-last part as the schema name.
return self.conform_name(parts[-2], "schema")
stream_schema = self.conform_name(parts[-2], "schema")
return stream_schema

# Schema name not detected.
return None
Expand Down
49 changes: 48 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
from singer_sdk.helpers._batch import BaseBatchFileEncoding
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._compat import final
from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities
from singer_sdk.helpers.capabilities import (
TARGET_SCHEMA_CONFIG,
CapabilitiesEnum,
PluginCapabilities,
TargetCapabilities,
)
from singer_sdk.io_base import SingerMessageType, SingerReader
from singer_sdk.mapper import PluginMapper
from singer_sdk.plugin_base import PluginBase
Expand Down Expand Up @@ -571,4 +576,46 @@ def cli(
class SQLTarget(Target):
"""Target implementation for SQL destinations."""

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get target capabilities.

Returns:
A list of capabilities supported by this target.
"""
sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities
sql_target_capabilities.extend([TargetCapabilities.TARGET_SCHEMA])

return sql_target_capabilities

@classmethod
def append_builtin_config(cls: type[SQLTarget], config_jsonschema: dict) -> None:
"""Appends built-in config to `config_jsonschema` if not already set.

To customize or disable this behavior, developers may either override this class
method or override the `capabilities` property to disabled any unwanted
built-in capabilities.

For all except very advanced use cases, we recommend leaving these
implementations "as-is", since this provides the most choice to users and is
the most "future proof" in terms of taking advantage of built-in capabilities
which may be added in the future.

Args:
config_jsonschema: [description]
"""

def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
# Append any missing properties in the target with those from source.
for k, v in source_jsonschema["properties"].items():
if k not in target_jsonschema["properties"]:
target_jsonschema["properties"][k] = v

capabilities = cls.capabilities

if TargetCapabilities.TARGET_SCHEMA in capabilities:
_merge_missing(TARGET_SCHEMA_CONFIG, config_jsonschema)

super().append_builtin_config(config_jsonschema)

pass