Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Built-in handling of default-target-schema for SQL Targets #1157

Merged
8 changes: 8 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ObjectType,
PropertiesList,
Property,
StringType,
)

_EnumMemberT = TypeVar("_EnumMemberT")
Expand Down Expand Up @@ -47,6 +48,13 @@
description="The max depth to flatten schemas.",
),
).to_dict()
# Built-in config fragment declaring the `default_target_schema` string
# setting, stored as a plain dict via `.to_dict()`. Its "properties" entries
# are merged into a SQL target's config schema by
# `SQLTarget.append_builtin_config` when they are not already declared there.
TARGET_SCHEMA_CONFIG = PropertiesList(
Property(
"default_target_schema",
StringType(),
description="The default target database schema name to use for all streams.",
),
).to_dict()


class DeprecatedEnum(Enum):
Expand Down
22 changes: 21 additions & 1 deletion singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,31 @@ def schema_name(self) -> Optional[str]:
Returns:
The target schema name.
"""
# Get the current SQL Dialect being used
target_sqla_dialect = self.connection.engine.dialect.name
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved
# Look for a default_target_schema in the configuration file
default_target_schema = self.config.get("default_target_schema", None)
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
parts = self.stream_name.split("-")

# 1) When default_target_schema is in the configuration, use it.
# 2) If the streams are in <schema>-<table> format, use the
# stream <schema>.
# 3) Return None if you don't find anything.
if default_target_schema:
return default_target_schema

if len(parts) in {2, 3}:
# Stream name is a two-part or three-part identifier.
# Use the second-to-last part as the schema name.
return self.conform_name(parts[-2], "schema")
stream_schema = self.conform_name(parts[-2], "schema")

# MS SQL Server has a public database role, so the name is reserved
# and it cannot be created as a schema. To avoid this common error
# we convert "public" to "dbo" if the target dialect is mssql.
if target_sqla_dialect == "mssql" and stream_schema == "public":
return "dbo"
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved
else:
return stream_schema

# Schema name not detected.
return None
Expand Down
49 changes: 48 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from singer_sdk.helpers._batch import BaseBatchFileEncoding
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._compat import final
from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities
from singer_sdk.helpers.capabilities import (
TARGET_SCHEMA_CONFIG,
CapabilitiesEnum,
PluginCapabilities,
TargetCapabilities,
)
from singer_sdk.io_base import SingerMessageType, SingerReader
from singer_sdk.mapper import PluginMapper
from singer_sdk.plugin_base import PluginBase
Expand Down Expand Up @@ -569,4 +574,46 @@ def cli(
class SQLTarget(Target):
    """Target implementation for SQL destinations."""

    @classproperty
    def capabilities(self) -> List[CapabilitiesEnum]:
        """Get target capabilities.

        Returns:
            A list of capabilities supported by this target: everything the
            parent class reports, plus `TargetCapabilities.TARGET_SCHEMA`.
        """
        # Build a new list instead of extending the parent's return value in
        # place, so a parent that hands back a shared list is never mutated.
        return [*super().capabilities, TargetCapabilities.TARGET_SCHEMA]

    @classmethod
    def append_builtin_config(cls, config_jsonschema: dict) -> None:  # noqa: ANN102
        """Append built-in config to `config_jsonschema` if not already set.

        To customize or disable this behavior, developers may either override this
        class method or override the `capabilities` property to disable any
        unwanted built-in capabilities.

        For all except very advanced use cases, we recommend leaving these
        implementations "as-is", since this provides the most choice to users and is
        the most "future proof" in terms of taking advantage of built-in capabilities
        which may be added in the future.

        Args:
            config_jsonschema: The config JSON schema to update in place. It must
                contain a "properties" mapping; entries already present there
                are left untouched.
        """
        if TargetCapabilities.TARGET_SCHEMA in cls.capabilities:
            declared_properties = config_jsonschema["properties"]
            for name, definition in TARGET_SCHEMA_CONFIG["properties"].items():
                # Only fill in settings the developer has not declared already.
                declared_properties.setdefault(name, definition)

        # Let the parent class append its own built-in settings afterwards.
        super().append_builtin_config(config_jsonschema)