From c0917b01aba601449d5234ec510936a2c346ed70 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 4 Oct 2022 15:49:04 +0100 Subject: [PATCH 01/13] start on schema and table creation on --- singer_sdk/__init__.py | 9 +- singer_sdk/sinks/core.py | 8 + singer_sdk/sinks/sql.py | 78 +-- singer_sdk/sql/__init__.py | 3 + singer_sdk/sql/connector.py | 911 +++++++++++++++++++++++++++++++++ singer_sdk/streams/__init__.py | 3 +- singer_sdk/streams/sql.py | 880 +------------------------------ singer_sdk/target_base.py | 7 +- 8 files changed, 977 insertions(+), 922 deletions(-) create mode 100644 singer_sdk/sql/__init__.py create mode 100644 singer_sdk/sql/connector.py diff --git a/singer_sdk/__init__.py b/singer_sdk/__init__.py index 71a3d017d..0e440820b 100644 --- a/singer_sdk/__init__.py +++ b/singer_sdk/__init__.py @@ -4,13 +4,8 @@ from singer_sdk.mapper_base import InlineMapper from singer_sdk.plugin_base import PluginBase from singer_sdk.sinks import BatchSink, RecordSink, Sink, SQLSink -from singer_sdk.streams import ( - GraphQLStream, - RESTStream, - SQLConnector, - SQLStream, - Stream, -) +from singer_sdk.sql import SQLConnector +from singer_sdk.streams import GraphQLStream, RESTStream, SQLStream, Stream from singer_sdk.tap_base import SQLTap, Tap from singer_sdk.target_base import SQLTarget, Target diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index d3f8badad..18487546c 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -423,6 +423,14 @@ def activate_version(self, new_version: int) -> None: "Ignoring." ) + def setup(self) -> None: + """Perform any setup actions at the beginning of a Stream. + + Setup is executed once per Sink instance, after instantiation. If a Schema + change is detected, a new Sink is instantiated and this method is called again. + """ + pass + def clean_up(self) -> None: """Perform any clean up actions required at end of a stream. diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 5f37a0236..e7f97fe93 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -10,7 +10,7 @@ from singer_sdk.plugin_base import PluginBase from singer_sdk.sinks.batch import BatchSink -from singer_sdk.streams.sql import SQLConnector +from singer_sdk.sql import SQLConnector class SQLSink(BatchSink): @@ -38,11 +38,7 @@ def __init__( connector: Optional connector to reuse. """ self._connector: SQLConnector - if connector: - self._connector = connector - else: - self._connector = self.connector_class(dict(target.config)) - + self._connector = connector or self.connector_class(dict(target.config)) super().__init__(target, stream_name, schema, key_properties) @property @@ -65,21 +61,17 @@ def connection(self) -> sqlalchemy.engine.Connection: @property def table_name(self) -> str: - """Returns the table name, with no schema or database part. + """Return the table name, with no schema or database part. Returns: The target table name. """ parts = self.stream_name.split("-") - - if len(parts) == 1: - return self.stream_name - else: - return parts[-1] + return self.stream_name if len(parts) == 1 else parts[-1] @property def schema_name(self) -> Optional[str]: - """Returns the schema name or `None` if using names with no schema part. + """Return the schema name or `None` if using names with no schema part. Returns: The target schema name. @@ -88,13 +80,51 @@ def schema_name(self) -> Optional[str]: @property def database_name(self) -> Optional[str]: - """Returns the DB name or `None` if using names with no database part. + """Return the DB name or `None` if using names with no database part. Returns: The target database name. """ return None # Assumes single-DB target context. + @property + def full_table_name(self) -> str: + """Return the fully qualified table name. + + Returns: + The fully qualified table name. + """ + return self.connector.get_fully_qualified_name( + table_name=self.table_name, + schema_name=self.schema_name, + db_name=self.database_name, + ) + + @property + def full_schema_name(self) -> str: + """Return the fully qualified schema name. + + Returns: + The fully qualified schema name. + """ + return self.connector.get_fully_qualified_name( + schema_name=self.schema_name, db_name=self.database_name + ) + + def setup(self) -> None: + """Set up Sink. + + This method is called on Sink creation, and creates the required Schema and + Table entities in the target database. + """ + self.connector.prepare_schema(self.full_schema_name) + self.connector.prepare_table( + full_table_name=self.full_table_name, + schema=self.schema, + primary_keys=self.key_properties, + as_temp_table=False, + ) + def process_batch(self, context: dict) -> None: """Process a batch with the given batch context. @@ -106,31 +136,12 @@ def process_batch(self, context: dict) -> None: """ # If duplicates are merged, these can be tracked via # :meth:`~singer_sdk.Sink.tally_duplicate_merged()`. - self.connector.prepare_table( - full_table_name=self.full_table_name, - schema=self.schema, - primary_keys=self.key_properties, - as_temp_table=False, - ) self.bulk_insert_records( full_table_name=self.full_table_name, schema=self.schema, records=context["records"], ) - @property - def full_table_name(self) -> str: - """Gives the fully qualified table name. - - Returns: - The fully qualified table name. - """ - return self.connector.get_fully_qualified_name( - self.table_name, - self.schema_name, - self.database_name, - ) - def create_table_with_records( self, full_table_name: Optional[str], @@ -154,6 +165,7 @@ def create_table_with_records( if primary_keys is None: primary_keys = self.key_properties partition_keys = partition_keys or None + # TODO: determine if this call to `prepare_table` is necessary (in addition to in `setup` above) self.connector.prepare_table( full_table_name=full_table_name, primary_keys=primary_keys, diff --git a/singer_sdk/sql/__init__.py b/singer_sdk/sql/__init__.py new file mode 100644 index 000000000..49890d841 --- /dev/null +++ b/singer_sdk/sql/__init__.py @@ -0,0 +1,3 @@ +from .connector import SQLConnector + +__all__ = ["SQLConnector"] diff --git a/singer_sdk/sql/connector.py b/singer_sdk/sql/connector.py new file mode 100644 index 000000000..a3bc20898 --- /dev/null +++ b/singer_sdk/sql/connector.py @@ -0,0 +1,911 @@ +from __future__ import annotations + +import logging +from datetime import datetime +from functools import lru_cache +from typing import Any, Iterable, cast + +import sqlalchemy +from sqlalchemy.engine import Engine +from sqlalchemy.engine.reflection import Inspector + +from singer_sdk import typing as th +from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema +from singer_sdk._singerlib.messages import SchemaMessage +from singer_sdk.exceptions import ConfigValidationError + + +class SQLConnector: + """Base class for SQLAlchemy-based connectors. + + The connector class serves as a wrapper around the SQL connection. + + The functions of the connector are: + + - connecting to the source + - generating SQLAlchemy connection and engine objects + - discovering schema catalog entries + - performing type conversions to/from JSONSchema types + - dialect-specific functions, such as escaping and fully qualified names + """ + + allow_column_add: bool = True # Whether ADD COLUMN is supported. + allow_column_rename: bool = True # Whether RENAME COLUMN is supported. + allow_column_alter: bool = False # Whether altering column types is supported. + allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported. + allow_temp_tables: bool = True # Whether temp tables are supported. + + def __init__( + self, config: dict | None = None, sqlalchemy_url: str | None = None + ) -> None: + """Initialize the SQL connector. + + Args: + config: The parent tap or target object's config. + sqlalchemy_url: Optional URL for the connection. + """ + self._config: dict[str, Any] = config or {} + self._sqlalchemy_url: str | None = sqlalchemy_url or None + self._connection: sqlalchemy.engine.Connection | None = None + + @property + def config(self) -> dict: + """If set, provides access to the tap or target config. + + Returns: + The settings as a dict. + """ + return self._config + + @property + def logger(self) -> logging.Logger: + """Get logger. + + Returns: + Plugin logger. + """ + return logging.getLogger("sqlconnector") + + def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection: + """Return a new SQLAlchemy connection using the provided config. + + By default this will create using the sqlalchemy `stream_results=True` option + described here: + https://docs.sqlalchemy.org/en/14/core/connections.html#using-server-side-cursors-a-k-a-stream-results + + Developers may override this method if their provider does not support + server side cursors (`stream_results`) or in order to use different + configurations options when creating the connection object. + + Returns: + A newly created SQLAlchemy engine object. + """ + return ( + self.create_sqlalchemy_engine() + .connect() + .execution_options(stream_results=True) + ) + + def create_sqlalchemy_engine(self) -> sqlalchemy.engine.Engine: + """Return a new SQLAlchemy engine using the provided config. + + Developers can generally override just one of the following: + `sqlalchemy_engine`, sqlalchemy_url`. + + Returns: + A newly created SQLAlchemy engine object. + """ + return sqlalchemy.create_engine(self.sqlalchemy_url, echo=False) + + @property + def connection(self) -> sqlalchemy.engine.Connection: + """Return or set the SQLAlchemy connection object. + + Returns: + The active SQLAlchemy connection object. + """ + if not self._connection: + self._connection = self.create_sqlalchemy_connection() + + return self._connection + + @property + def sqlalchemy_url(self) -> str: + """Return the SQLAlchemy URL string. + + Returns: + The URL as a string. + """ + if not self._sqlalchemy_url: + self._sqlalchemy_url = self.get_sqlalchemy_url(self.config) + + return self._sqlalchemy_url + + def get_sqlalchemy_url(self, config: dict[str, Any]) -> str: + """Return the SQLAlchemy URL string. + + Developers can generally override just one of the following: + `sqlalchemy_engine`, `get_sqlalchemy_url`. + + Args: + config: A dictionary of settings from the tap or target config. + + Returns: + The URL as a string. + + Raises: + ConfigValidationError: If no valid sqlalchemy_url can be found. + """ + if "sqlalchemy_url" not in config: + raise ConfigValidationError( + "Could not find or create 'sqlalchemy_url' for connection." + ) + + return cast(str, config["sqlalchemy_url"]) + + @staticmethod + def to_jsonschema_type( + sql_type: ( + str | sqlalchemy.types.TypeEngine | type[sqlalchemy.types.TypeEngine] | Any + ), + ) -> dict: + """Return a JSON Schema representation of the provided type. + + By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy + types. + + Developers may override this method to accept additional input argument types, + to support non-standard types, or to provide custom typing logic. + + Args: + sql_type: The string representation of the SQL type, a SQLAlchemy + TypeEngine class or object, or a custom-specified object. + + Raises: + ValueError: If the type received could not be translated to jsonschema. + + Returns: + The JSON Schema representation of the provided type. + """ + if isinstance(sql_type, (str, sqlalchemy.types.TypeEngine)): + return th.to_jsonschema_type(sql_type) + + if isinstance(sql_type, type): + if issubclass(sql_type, sqlalchemy.types.TypeEngine): + return th.to_jsonschema_type(sql_type) + + raise ValueError(f"Unexpected type received: '{sql_type.__name__}'") + + raise ValueError(f"Unexpected type received: '{type(sql_type).__name__}'") + + @staticmethod + def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: + """Return a JSON Schema representation of the provided type. + + By default will call `typing.to_sql_type()`. + + Developers may override this method to accept additional input argument types, + to support non-standard types, or to provide custom typing logic. + + If overriding this method, developers should call the default implementation + from the base class for all unhandled cases. + + Args: + jsonschema_type: The JSON Schema representation of the source type. + + Returns: + The SQLAlchemy type representation of the data type. + """ + return th.to_sql_type(jsonschema_type) + + @staticmethod + def get_fully_qualified_name( + table_name: str | None = None, + schema_name: str | None = None, + db_name: str | None = None, + delimiter: str = ".", + ) -> str: + """Concatenates a fully qualified name from the parts. + + Args: + table_name: The name of the table. + schema_name: The name of the schema. Defaults to None. + db_name: The name of the database. Defaults to None. + delimiter: Generally: '.' for SQL names and '-' for Singer names. + + Raises: + ValueError: If all 3 name parts not supplied. + + Returns: + The fully qualified name as a string. + """ + parts = [] + + if db_name: + parts.append(db_name) + if schema_name: + parts.append(schema_name) + if table_name: + parts.append(table_name) + + if not parts: + raise ValueError( + "Could not generate fully qualified name: " + + ":".join( + [ + db_name or "(unknown-db)", + schema_name or "(unknown-schema)", + table_name or "(unknown-table-name)", + ] + ) + ) + + return delimiter.join(parts) + + @property + def _dialect(self) -> sqlalchemy.engine.Dialect: + """Return the dialect object. + + Returns: + The dialect object. + """ + return cast(sqlalchemy.engine.Dialect, self.connection.engine.dialect) + + @property + def _engine(self) -> sqlalchemy.engine.Engine: + """Return the dialect object. + + Returns: + The dialect object. + """ + return cast(sqlalchemy.engine.Engine, self.connection.engine) + + def quote(self, name: str) -> str: + """Quote a name if it needs quoting, using '.' as a name-part delimiter. + + Examples: + "my_table" => "`my_table`" + "my_schema.my_table" => "`my_schema`.`my_table`" + + Args: + name: The unquoted name. + + Returns: + str: The quoted name. + """ + return ".".join( + [ + self._dialect.identifier_preparer.quote(name_part) + for name_part in name.split(".") + ] + ) + + @lru_cache() + def _warn_no_view_detection(self) -> None: + """Print a warning, but only the first time.""" + self.logger.warning( + "Provider does not support get_view_names(). " + "Streams list may be incomplete or `is_view` may be unpopulated." + ) + + def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]: + """Return a list of schema names in DB. + + Args: + engine: SQLAlchemy engine + inspected: SQLAlchemy inspector instance for engine + + Returns: + List of schema names + """ + return inspected.get_schema_names() + + def get_object_names( + self, engine: Engine, inspected: Inspector, schema_name: str + ) -> list[tuple[str, bool]]: + """Return a list of syncable objects. + + Args: + engine: SQLAlchemy engine + inspected: SQLAlchemy inspector instance for engine + schema_name: Schema name to inspect + + Returns: + List of tuples (, ) + """ + # Get list of tables and views + table_names = inspected.get_table_names(schema=schema_name) + try: + view_names = inspected.get_view_names(schema=schema_name) + except NotImplementedError: + # Some DB providers do not understand 'views' + self._warn_no_view_detection() + view_names = [] + return [(t, False) for t in table_names] + [(v, True) for v in view_names] + + # TODO maybe should be splitted into smaller parts? + def discover_catalog_entry( + self, + engine: Engine, + inspected: Inspector, + schema_name: str, + table_name: str, + is_view: bool, + ) -> CatalogEntry: + """Create `CatalogEntry` object for the given table or a view. + + Args: + engine: SQLAlchemy engine + inspected: SQLAlchemy inspector instance for engine + schema_name: Schema name to inspect + table_name: Name of the table or a view + is_view: Flag whether this object is a view, returned by `get_object_names` + + Returns: + `CatalogEntry` object for the given table or a view + """ + # Initialize unique stream name + unique_stream_id = self.get_fully_qualified_name( + db_name=None, + schema_name=schema_name, + table_name=table_name, + delimiter="-", + ) + + # Detect key properties + possible_primary_keys: list[list[str]] = [] + pk_def = inspected.get_pk_constraint(table_name, schema=schema_name) + if pk_def and "constrained_columns" in pk_def: + possible_primary_keys.append(pk_def["constrained_columns"]) + + possible_primary_keys.extend( + index_def["column_names"] + for index_def in inspected.get_indexes(table_name, schema=schema_name) + if index_def.get("unique", False) + ) + + key_properties = next(iter(possible_primary_keys), None) + + # Initialize columns list + table_schema = th.PropertiesList() + for column_def in inspected.get_columns(table_name, schema=schema_name): + column_name = column_def["name"] + is_nullable = column_def.get("nullable", False) + jsonschema_type: dict = self.to_jsonschema_type( + cast(sqlalchemy.types.TypeEngine, column_def["type"]) + ) + table_schema.append( + th.Property( + name=column_name, + wrapped=th.CustomType(jsonschema_type), + required=not is_nullable, + ) + ) + schema = table_schema.to_dict() + + # Initialize available replication methods + addl_replication_methods: list[str] = [""] # By default an empty list. + # Notes regarding replication methods: + # - 'INCREMENTAL' replication must be enabled by the user by specifying + # a replication_key value. + # - 'LOG_BASED' replication must be enabled by the developer, according + # to source-specific implementation capabilities. + replication_method = next(reversed(["FULL_TABLE"] + addl_replication_methods)) + + # Create the catalog entry object + return CatalogEntry( + tap_stream_id=unique_stream_id, + stream=unique_stream_id, + table=table_name, + key_properties=key_properties, + schema=Schema.from_dict(schema), + is_view=is_view, + replication_method=replication_method, + metadata=MetadataMapping.get_standard_metadata( + schema_name=schema_name, + schema=schema, + replication_method=replication_method, + key_properties=key_properties, + valid_replication_keys=None, # Must be defined by user + ), + database=None, # Expects single-database context + row_count=None, + stream_alias=None, + replication_key=None, # Must be defined by user + ) + + def discover_catalog_entries(self) -> list[dict]: + """Return a list of catalog entries from discovery. + + Returns: + The discovered catalog entries as a list. + """ + result: list[dict] = [] + engine = self.create_sqlalchemy_engine() + inspected = sqlalchemy.inspect(engine) + for schema_name in self.get_schema_names(engine, inspected): + # Iterate through each table and view + for table_name, is_view in self.get_object_names( + engine, inspected, schema_name + ): + catalog_entry = self.discover_catalog_entry( + engine, inspected, schema_name, table_name, is_view + ) + result.append(catalog_entry.to_dict()) + + return result + + def parse_full_table_name( + self, full_table_name: str + ) -> tuple[str | None, str | None, str]: + """Parse a fully qualified table name into its parts. + + Developers may override this method if their platform does not support the + traditional 3-part convention: `db_name.schema_name.table_name` + + Args: + full_table_name: A table name or a fully qualified table name. Depending on + SQL the platform, this could take the following forms: + - `..` (three part names) + - `.
` (platforms which do not use schema groupings) + - `.` (if DB name is already in context) + - `
` (if DB name and schema name are already in context) + + Returns: + A three part tuple (db_name, schema_name, table_name) with any unspecified + or unused parts returned as None. + """ + db_name: str | None = None + schema_name: str | None = None + + parts = full_table_name.split(".") + if len(parts) == 1: + table_name = full_table_name + if len(parts) == 2: + schema_name, table_name = parts + if len(parts) == 3: + db_name, schema_name, table_name = parts + + return db_name, schema_name, table_name + + def table_exists(self, full_table_name: str) -> bool: + """Determine if the target table already exists. + + Args: + full_table_name: the target table name. + + Returns: + True if table exists, False if not, None if unsure or undetectable. + """ + return cast( + bool, + sqlalchemy.inspect(self._engine).has_table(full_table_name), + ) + + def schema_exists(self, full_schema_name: str) -> bool: + """Determine if the target database schema already exists. + + Args: + schema_name: The target database schema name. + + Returns: + True if the database schema exists, False if not. + """ + schema_names = sqlalchemy.inspect(self._engine).get_schema_names() + return full_schema_name in schema_names + + def get_table_columns(self, full_table_name: str) -> dict[str, sqlalchemy.Column]: + """Return a list of table columns. + + Args: + full_table_name: Fully qualified table name. + + Returns: + An ordered list of column objects. + """ + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + inspector = sqlalchemy.inspect(self._engine) + columns = inspector.get_columns(table_name, schema_name) + + return { + col_meta["name"]: sqlalchemy.Column( + col_meta["name"], + col_meta["type"], + nullable=col_meta.get("nullable", False), + ) + for col_meta in columns + } + + def get_table(self, full_table_name: str) -> sqlalchemy.Table: + """Return a table object. + + Args: + full_table_name: Fully qualified table name. + + Returns: + A table object with column list. + """ + columns = self.get_table_columns(full_table_name).values() + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + meta = sqlalchemy.MetaData() + return sqlalchemy.schema.Table( + table_name, meta, *list(columns), schema=schema_name + ) + + def column_exists(self, full_table_name: str, column_name: str) -> bool: + """Determine if the target table already exists. + + Args: + full_table_name: the target table name. + column_name: the target column name. + + Returns: + True if table exists, False if not. + """ + return column_name in self.get_table_columns(full_table_name) + + def create_schema(self, full_schema_name: str) -> None: + """Create target database schema. + + Args: + full_schema_name: The target database schema to create. + """ + self._engine.execute(sqlalchemy.schema.CreateSchema(full_schema_name)) + + def create_empty_table( + self, + full_table_name: str, + schema: dict, + primary_keys: list[str] | None = None, + partition_keys: list[str] | None = None, + as_temp_table: bool = False, + ) -> None: + """Create an empty target table. + + Args: + full_table_name: the target table name. + schema: the JSON schema for the new table. + primary_keys: list of key properties. + partition_keys: list of partition keys. + as_temp_table: True to create a temp table. + + Raises: + NotImplementedError: if temp tables are unsupported and as_temp_table=True. + RuntimeError: if a variant schema is passed with no properties defined. + """ + if as_temp_table: + raise NotImplementedError("Temporary tables are not supported.") + + _ = partition_keys # Not supported in generic implementation. + + meta = sqlalchemy.MetaData() + columns: list[sqlalchemy.Column] = [] + primary_keys = primary_keys or [] + try: + properties: dict = schema["properties"] + except KeyError: + raise RuntimeError( + f"Schema for '{full_table_name}' does not define properties: {schema}" + ) + for property_name, property_jsonschema in properties.items(): + is_primary_key = property_name in primary_keys + columns.append( + sqlalchemy.Column( + property_name, + self.to_sql_type(property_jsonschema), + primary_key=is_primary_key, + ) + ) + + _ = sqlalchemy.Table(full_table_name, meta, *columns) + meta.create_all(self._engine) + + def _create_empty_column( + self, + full_table_name: str, + column_name: str, + sql_type: sqlalchemy.types.TypeEngine, + ) -> None: + """Create a new column. + + Args: + full_table_name: The target table name. + column_name: The name of the new column. + sql_type: SQLAlchemy type engine to be used in creating the new column. + + Raises: + NotImplementedError: if adding columns is not supported. + """ + if not self.allow_column_add: + raise NotImplementedError("Adding columns is not supported.") + + create_column_clause = sqlalchemy.schema.CreateColumn( + sqlalchemy.Column( + column_name, + sql_type, + ) + ) + self.connection.execute( + sqlalchemy.DDL( + "ALTER TABLE %(table)s ADD COLUMN %(create_column)s", + { + "table": full_table_name, + "create_column": create_column_clause, + }, + ) + ) + + def prepare_schema(self, full_schema_name: str) -> None: + """Create the target database schema. + + Args: + full_schema_name: The target schema name. + """ + if not self.schema_exists(full_schema_name): + self.create_schema(full_schema_name) + + def prepare_table( + self, + full_table_name: str, + schema: dict, + primary_keys: list[str], + partition_keys: list[str] | None = None, + as_temp_table: bool = False, + ) -> None: + """Adapt target table to provided schema if possible. + + Args: + full_table_name: the target table name. + schema: the JSON Schema for the table. + primary_keys: list of key properties. + partition_keys: list of partition keys. + as_temp_table: True to create a temp table. + """ + if not self.table_exists(full_table_name=full_table_name): + self.create_empty_table( + full_table_name=full_table_name, + schema=schema, + primary_keys=primary_keys, + partition_keys=partition_keys, + as_temp_table=as_temp_table, + ) + return + + for property_name, property_def in schema["properties"].items(): + self.prepare_column( + full_table_name, property_name, self.to_sql_type(property_def) + ) + + def prepare_column( + self, + full_table_name: str, + column_name: str, + sql_type: sqlalchemy.types.TypeEngine, + ) -> None: + """Adapt target table to provided schema if possible. + + Args: + full_table_name: the target table name. + column_name: the target column name. + sql_type: the SQLAlchemy type. + """ + if not self.column_exists(full_table_name, column_name): + self._create_empty_column( + full_table_name=full_table_name, + column_name=column_name, + sql_type=sql_type, + ) + return + + self._adapt_column_type( + full_table_name, + column_name=column_name, + sql_type=sql_type, + ) + + def rename_column(self, full_table_name: str, old_name: str, new_name: str) -> None: + """Rename the provided columns. + + Args: + full_table_name: The fully qualified table name. + old_name: The old column to be renamed. + new_name: The new name for the column. + + Raises: + NotImplementedError: If `self.allow_column_rename` is false. + """ + if not self.allow_column_rename: + raise NotImplementedError("Renaming columns is not supported.") + + self.connection.execute( + f"ALTER TABLE {full_table_name} " + f'RENAME COLUMN "{old_name}" to "{new_name}"' + ) + + def merge_sql_types( + self, sql_types: list[sqlalchemy.types.TypeEngine] + ) -> sqlalchemy.types.TypeEngine: + """Return a compatible SQL type for the selected type list. + + Args: + sql_types: List of SQL types. + + Returns: + A SQL type that is compatible with the input types. + + Raises: + ValueError: If sql_types argument has zero members. + """ + if not sql_types: + raise ValueError("Expected at least one member in `sql_types` argument.") + + if len(sql_types) == 1: + return sql_types[0] + + # Gathering Type to match variables + # sent in _adapt_column_type + current_type = sql_types[0] + # sql_type = sql_types[1] + + # Getting the length of each type + # current_type_len: int = getattr(sql_types[0], "length", 0) + sql_type_len: int = getattr(sql_types[1], "length", 0) + if sql_type_len is None: + sql_type_len = 0 + + # Convert the two types given into a sorted list + # containing the best conversion classes + sql_types = self._sort_types(sql_types) + + # If greater than two evaluate the first pair then on down the line + if len(sql_types) > 2: + return self.merge_sql_types( + [self.merge_sql_types([sql_types[0], sql_types[1]])] + sql_types[2:] + ) + + assert len(sql_types) == 2 + # Get the generic type class + for opt in sql_types: + # Get the length + opt_len: int = getattr(opt, "length", 0) + generic_type = type(opt.as_generic()) + + if isinstance(generic_type, type): + if issubclass( + generic_type, + (sqlalchemy.types.String, sqlalchemy.types.Unicode), + ): + # If length None or 0 then is varchar max ? + if (opt_len is None) or (opt_len == 0): + return opt + elif isinstance( + generic_type, + (sqlalchemy.types.String, sqlalchemy.types.Unicode), + ): + # If length None or 0 then is varchar max ? + if (opt_len is None) or (opt_len == 0): + return opt + # If best conversion class is equal to current type + # return the best conversion class + elif str(opt) == str(current_type): + return opt + + raise ValueError( + f"Unable to merge sql types: {', '.join([str(t) for t in sql_types])}" + ) + + def _sort_types( + self, + sql_types: Iterable[sqlalchemy.types.TypeEngine], + ) -> list[sqlalchemy.types.TypeEngine]: + """Return the input types sorted from most to least compatible. + + For example, [Smallint, Integer, Datetime, String, Double] would become + [Unicode, String, Double, Integer, Smallint, Datetime]. + String types will be listed first, then decimal types, then integer types, + then bool types, and finally datetime and date. Higher precision, scale, and + length will be sorted earlier. + + Args: + sql_types (List[sqlalchemy.types.TypeEngine]): [description] + + Returns: + The sorted list. + """ + + def _get_type_sort_key( + sql_type: sqlalchemy.types.TypeEngine, + ) -> tuple[int, int]: + # return rank, with higher numbers ranking first + + _len = int(getattr(sql_type, "length", 0) or 0) + + _pytype = cast(type, sql_type.python_type) + if issubclass(_pytype, (str, bytes)): + return 900, _len + elif issubclass(_pytype, datetime): + return 600, _len + elif issubclass(_pytype, float): + return 400, _len + elif issubclass(_pytype, int): + return 300, _len + + return 0, _len + + return sorted(sql_types, key=_get_type_sort_key, reverse=True) + + def _get_column_type( + self, full_table_name: str, column_name: str + ) -> sqlalchemy.types.TypeEngine: + """Get the SQL type of the declared column. + + Args: + full_table_name: The name of the table. + column_name: The name of the column. + + Returns: + The type of the column. + + Raises: + KeyError: If the provided column name does not exist. + """ + try: + column = self.get_table_columns(full_table_name)[column_name] + except KeyError as ex: + raise KeyError( + f"Column `{column_name}` does not exist in table `{full_table_name}`." + ) from ex + + return cast(sqlalchemy.types.TypeEngine, column.type) + + def _adapt_column_type( + self, + full_table_name: str, + column_name: str, + sql_type: sqlalchemy.types.TypeEngine, + ) -> None: + """Adapt table column type to support the new JSON schema type. + + Args: + full_table_name: The target table name. + column_name: The target column name. + sql_type: The new SQLAlchemy type. + + Raises: + NotImplementedError: if altering columns is not supported. + """ + current_type: sqlalchemy.types.TypeEngine = self._get_column_type( + full_table_name, column_name + ) + + # Check if the existing column type and the sql type are the same + if str(sql_type) == str(current_type): + # The current column and sql type are the same + # Nothing to do + return + + # Not the same type, generic type or compatible types + # calling merge_sql_types for assistnace + compatible_sql_type = self.merge_sql_types([current_type, sql_type]) + + if str(compatible_sql_type) == str(current_type): + # Nothing to do + return + + if not self.allow_column_alter: + raise NotImplementedError( + "Altering columns is not supported. " + f"Could not convert column '{full_table_name}.{column_name}' " + f"from '{current_type}' to '{compatible_sql_type}'." + ) + + self.connection.execute( + sqlalchemy.DDL( + "ALTER TABLE %(table)s ALTER COLUMN %(col_name)s (%(col_type)s)", + { + "table": full_table_name, + "col_name": column_name, + "col_type": compatible_sql_type, + }, + ) + ) diff --git a/singer_sdk/streams/__init__.py b/singer_sdk/streams/__init__.py index d34bb67c7..9aea603d0 100644 --- a/singer_sdk/streams/__init__.py +++ b/singer_sdk/streams/__init__.py @@ -1,9 +1,10 @@ """SDK for building singer-compliant taps.""" +from singer_sdk.sql import SQLConnector from singer_sdk.streams.core import Stream from singer_sdk.streams.graphql import GraphQLStream from singer_sdk.streams.rest import RESTStream -from singer_sdk.streams.sql import SQLConnector, SQLStream +from singer_sdk.streams.sql import SQLStream __all__ = [ "Stream", diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index 114c01674..78a2c9186 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -3,892 +3,16 @@ from __future__ import annotations import abc -import logging -from datetime import datetime -from functools import lru_cache from typing import Any, Iterable, cast import sqlalchemy -from sqlalchemy.engine import Engine -from sqlalchemy.engine.reflection import Inspector -from singer_sdk import typing as th -from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema -from singer_sdk.exceptions import ConfigValidationError +from singer_sdk._singerlib import CatalogEntry, MetadataMapping from singer_sdk.plugin_base import PluginBase as TapBaseClass +from singer_sdk.sql import SQLConnector from singer_sdk.streams.core import Stream -class SQLConnector: - """Base class for SQLAlchemy-based connectors. - - The connector class serves as a wrapper around the SQL connection. - - The functions of the connector are: - - - connecting to the source - - generating SQLAlchemy connection and engine objects - - discovering schema catalog entries - - performing type conversions to/from JSONSchema types - - dialect-specific functions, such as escaping and fully qualified names - """ - - allow_column_add: bool = True # Whether ADD COLUMN is supported. - allow_column_rename: bool = True # Whether RENAME COLUMN is supported. - allow_column_alter: bool = False # Whether altering column types is supported. - allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported. - allow_temp_tables: bool = True # Whether temp tables are supported. - - def __init__( - self, config: dict | None = None, sqlalchemy_url: str | None = None - ) -> None: - """Initialize the SQL connector. - - Args: - config: The parent tap or target object's config. - sqlalchemy_url: Optional URL for the connection. - """ - self._config: dict[str, Any] = config or {} - self._sqlalchemy_url: str | None = sqlalchemy_url or None - self._connection: sqlalchemy.engine.Connection | None = None - - @property - def config(self) -> dict: - """If set, provides access to the tap or target config. - - Returns: - The settings as a dict. - """ - return self._config - - @property - def logger(self) -> logging.Logger: - """Get logger. - - Returns: - Plugin logger. - """ - return logging.getLogger("sqlconnector") - - def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection: - """Return a new SQLAlchemy connection using the provided config. - - By default this will create using the sqlalchemy `stream_results=True` option - described here: - https://docs.sqlalchemy.org/en/14/core/connections.html#using-server-side-cursors-a-k-a-stream-results - - Developers may override this method if their provider does not support - server side cursors (`stream_results`) or in order to use different - configurations options when creating the connection object. - - Returns: - A newly created SQLAlchemy engine object. - """ - return ( - self.create_sqlalchemy_engine() - .connect() - .execution_options(stream_results=True) - ) - - def create_sqlalchemy_engine(self) -> sqlalchemy.engine.Engine: - """Return a new SQLAlchemy engine using the provided config. - - Developers can generally override just one of the following: - `sqlalchemy_engine`, sqlalchemy_url`. - - Returns: - A newly created SQLAlchemy engine object. - """ - return sqlalchemy.create_engine(self.sqlalchemy_url, echo=False) - - @property - def connection(self) -> sqlalchemy.engine.Connection: - """Return or set the SQLAlchemy connection object. - - Returns: - The active SQLAlchemy connection object. - """ - if not self._connection: - self._connection = self.create_sqlalchemy_connection() - - return self._connection - - @property - def sqlalchemy_url(self) -> str: - """Return the SQLAlchemy URL string. - - Returns: - The URL as a string. - """ - if not self._sqlalchemy_url: - self._sqlalchemy_url = self.get_sqlalchemy_url(self.config) - - return self._sqlalchemy_url - - def get_sqlalchemy_url(self, config: dict[str, Any]) -> str: - """Return the SQLAlchemy URL string. - - Developers can generally override just one of the following: - `sqlalchemy_engine`, `get_sqlalchemy_url`. - - Args: - config: A dictionary of settings from the tap or target config. - - Returns: - The URL as a string. - - Raises: - ConfigValidationError: If no valid sqlalchemy_url can be found. - """ - if "sqlalchemy_url" not in config: - raise ConfigValidationError( - "Could not find or create 'sqlalchemy_url' for connection." - ) - - return cast(str, config["sqlalchemy_url"]) - - @staticmethod - def to_jsonschema_type( - sql_type: ( - str | sqlalchemy.types.TypeEngine | type[sqlalchemy.types.TypeEngine] | Any - ), - ) -> dict: - """Return a JSON Schema representation of the provided type. - - By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy - types. - - Developers may override this method to accept additional input argument types, - to support non-standard types, or to provide custom typing logic. - - Args: - sql_type: The string representation of the SQL type, a SQLAlchemy - TypeEngine class or object, or a custom-specified object. - - Raises: - ValueError: If the type received could not be translated to jsonschema. - - Returns: - The JSON Schema representation of the provided type. - """ - if isinstance(sql_type, (str, sqlalchemy.types.TypeEngine)): - return th.to_jsonschema_type(sql_type) - - if isinstance(sql_type, type): - if issubclass(sql_type, sqlalchemy.types.TypeEngine): - return th.to_jsonschema_type(sql_type) - - raise ValueError(f"Unexpected type received: '{sql_type.__name__}'") - - raise ValueError(f"Unexpected type received: '{type(sql_type).__name__}'") - - @staticmethod - def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: - """Return a JSON Schema representation of the provided type. - - By default will call `typing.to_sql_type()`. - - Developers may override this method to accept additional input argument types, - to support non-standard types, or to provide custom typing logic. - - If overriding this method, developers should call the default implementation - from the base class for all unhandled cases. - - Args: - jsonschema_type: The JSON Schema representation of the source type. - - Returns: - The SQLAlchemy type representation of the data type. - """ - return th.to_sql_type(jsonschema_type) - - @staticmethod - def get_fully_qualified_name( - table_name: str, - schema_name: str | None = None, - db_name: str | None = None, - delimiter: str = ".", - ) -> str: - """Concatenates a fully qualified name from the parts. - - Args: - table_name: The name of the table. - schema_name: The name of the schema. Defaults to None. - db_name: The name of the database. Defaults to None. - delimiter: Generally: '.' for SQL names and '-' for Singer names. - - Raises: - ValueError: If table_name is not provided or if neither schema_name or - db_name are provided. - - Returns: - The fully qualified name as a string. - """ - if db_name and schema_name: - result = delimiter.join([db_name, schema_name, table_name]) - elif db_name: - result = delimiter.join([db_name, table_name]) - elif schema_name: - result = delimiter.join([schema_name, table_name]) - elif table_name: - result = table_name - else: - raise ValueError( - "Could not generate fully qualified name for stream: " - + ":".join( - [ - db_name or "(unknown-db)", - schema_name or "(unknown-schema)", - table_name or "(unknown-table-name)", - ] - ) - ) - - return result - - @property - def _dialect(self) -> sqlalchemy.engine.Dialect: - """Return the dialect object. - - Returns: - The dialect object. - """ - return cast(sqlalchemy.engine.Dialect, self.connection.engine.dialect) - - @property - def _engine(self) -> sqlalchemy.engine.Engine: - """Return the dialect object. - - Returns: - The dialect object. - """ - return cast(sqlalchemy.engine.Engine, self.connection.engine) - - def quote(self, name: str) -> str: - """Quote a name if it needs quoting, using '.' as a name-part delimiter. - - Examples: - "my_table" => "`my_table`" - "my_schema.my_table" => "`my_schema`.`my_table`" - - Args: - name: The unquoted name. - - Returns: - str: The quoted name. - """ - return ".".join( - [ - self._dialect.identifier_preparer.quote(name_part) - for name_part in name.split(".") - ] - ) - - @lru_cache() - def _warn_no_view_detection(self) -> None: - """Print a warning, but only the first time.""" - self.logger.warning( - "Provider does not support get_view_names(). " - "Streams list may be incomplete or `is_view` may be unpopulated." - ) - - def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]: - """Return a list of schema names in DB. - - Args: - engine: SQLAlchemy engine - inspected: SQLAlchemy inspector instance for engine - - Returns: - List of schema names - """ - return inspected.get_schema_names() - - def get_object_names( - self, engine: Engine, inspected: Inspector, schema_name: str - ) -> list[tuple[str, bool]]: - """Return a list of syncable objects. - - Args: - engine: SQLAlchemy engine - inspected: SQLAlchemy inspector instance for engine - schema_name: Schema name to inspect - - Returns: - List of tuples (, ) - """ - # Get list of tables and views - table_names = inspected.get_table_names(schema=schema_name) - try: - view_names = inspected.get_view_names(schema=schema_name) - except NotImplementedError: - # Some DB providers do not understand 'views' - self._warn_no_view_detection() - view_names = [] - object_names = [(t, False) for t in table_names] + [ - (v, True) for v in view_names - ] - - return object_names - - # TODO maybe should be splitted into smaller parts? - def discover_catalog_entry( - self, - engine: Engine, - inspected: Inspector, - schema_name: str, - table_name: str, - is_view: bool, - ) -> CatalogEntry: - """Create `CatalogEntry` object for the given table or a view. - - Args: - engine: SQLAlchemy engine - inspected: SQLAlchemy inspector instance for engine - schema_name: Schema name to inspect - table_name: Name of the table or a view - is_view: Flag whether this object is a view, returned by `get_object_names` - - Returns: - `CatalogEntry` object for the given table or a view - """ - # Initialize unique stream name - unique_stream_id = self.get_fully_qualified_name( - db_name=None, - schema_name=schema_name, - table_name=table_name, - delimiter="-", - ) - - # Detect key properties - possible_primary_keys: list[list[str]] = [] - pk_def = inspected.get_pk_constraint(table_name, schema=schema_name) - if pk_def and "constrained_columns" in pk_def: - possible_primary_keys.append(pk_def["constrained_columns"]) - for index_def in inspected.get_indexes(table_name, schema=schema_name): - if index_def.get("unique", False): - possible_primary_keys.append(index_def["column_names"]) - key_properties = next(iter(possible_primary_keys), None) - - # Initialize columns list - table_schema = th.PropertiesList() - for column_def in inspected.get_columns(table_name, schema=schema_name): - column_name = column_def["name"] - is_nullable = column_def.get("nullable", False) - jsonschema_type: dict = self.to_jsonschema_type( - cast(sqlalchemy.types.TypeEngine, column_def["type"]) - ) - table_schema.append( - th.Property( - name=column_name, - wrapped=th.CustomType(jsonschema_type), - required=not is_nullable, - ) - ) - schema = table_schema.to_dict() - - # Initialize available replication methods - addl_replication_methods: list[str] = [""] # By default an empty list. - # Notes regarding replication methods: - # - 'INCREMENTAL' replication must be enabled by the user by specifying - # a replication_key value. - # - 'LOG_BASED' replication must be enabled by the developer, according - # to source-specific implementation capabilities. - replication_method = next(reversed(["FULL_TABLE"] + addl_replication_methods)) - - # Create the catalog entry object - catalog_entry = CatalogEntry( - tap_stream_id=unique_stream_id, - stream=unique_stream_id, - table=table_name, - key_properties=key_properties, - schema=Schema.from_dict(schema), - is_view=is_view, - replication_method=replication_method, - metadata=MetadataMapping.get_standard_metadata( - schema_name=schema_name, - schema=schema, - replication_method=replication_method, - key_properties=key_properties, - valid_replication_keys=None, # Must be defined by user - ), - database=None, # Expects single-database context - row_count=None, - stream_alias=None, - replication_key=None, # Must be defined by user - ) - - return catalog_entry - - def discover_catalog_entries(self) -> list[dict]: - """Return a list of catalog entries from discovery. - - Returns: - The discovered catalog entries as a list. - """ - result: list[dict] = [] - engine = self.create_sqlalchemy_engine() - inspected = sqlalchemy.inspect(engine) - for schema_name in self.get_schema_names(engine, inspected): - # Iterate through each table and view - for table_name, is_view in self.get_object_names( - engine, inspected, schema_name - ): - catalog_entry = self.discover_catalog_entry( - engine, inspected, schema_name, table_name, is_view - ) - result.append(catalog_entry.to_dict()) - - return result - - def parse_full_table_name( - self, full_table_name: str - ) -> tuple[str | None, str | None, str]: - """Parse a fully qualified table name into its parts. - - Developers may override this method if their platform does not support the - traditional 3-part convention: `db_name.schema_name.table_name` - - Args: - full_table_name: A table name or a fully qualified table name. Depending on - SQL the platform, this could take the following forms: - - `..
` (three part names) - - `.
` (platforms which do not use schema groupings) - - `.` (if DB name is already in context) - - `
` (if DB name and schema name are already in context) - - Returns: - A three part tuple (db_name, schema_name, table_name) with any unspecified - or unused parts returned as None. - """ - db_name: str | None = None - schema_name: str | None = None - - parts = full_table_name.split(".") - if len(parts) == 1: - table_name = full_table_name - if len(parts) == 2: - schema_name, table_name = parts - if len(parts) == 3: - db_name, schema_name, table_name = parts - - return db_name, schema_name, table_name - - def table_exists(self, full_table_name: str) -> bool: - """Determine if the target table already exists. - - Args: - full_table_name: the target table name. - - Returns: - True if table exists, False if not, None if unsure or undetectable. - """ - return cast( - bool, - sqlalchemy.inspect(self._engine).has_table(full_table_name), - ) - - def get_table_columns(self, full_table_name: str) -> dict[str, sqlalchemy.Column]: - """Return a list of table columns. - - Args: - full_table_name: Fully qualified table name. - - Returns: - An ordered list of column objects. - """ - _, schema_name, table_name = self.parse_full_table_name(full_table_name) - inspector = sqlalchemy.inspect(self._engine) - columns = inspector.get_columns(table_name, schema_name) - - result: dict[str, sqlalchemy.Column] = {} - for col_meta in columns: - result[col_meta["name"]] = sqlalchemy.Column( - col_meta["name"], - col_meta["type"], - nullable=col_meta.get("nullable", False), - ) - - return result - - def get_table(self, full_table_name: str) -> sqlalchemy.Table: - """Return a table object. - - Args: - full_table_name: Fully qualified table name. - - Returns: - A table object with column list. - """ - columns = self.get_table_columns(full_table_name).values() - _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData() - return sqlalchemy.schema.Table( - table_name, meta, *list(columns), schema=schema_name - ) - - def column_exists(self, full_table_name: str, column_name: str) -> bool: - """Determine if the target table already exists. - - Args: - full_table_name: the target table name. - column_name: the target column name. - - Returns: - True if table exists, False if not. - """ - return column_name in self.get_table_columns(full_table_name) - - def create_empty_table( - self, - full_table_name: str, - schema: dict, - primary_keys: list[str] | None = None, - partition_keys: list[str] | None = None, - as_temp_table: bool = False, - ) -> None: - """Create an empty target table. - - Args: - full_table_name: the target table name. - schema: the JSON schema for the new table. - primary_keys: list of key properties. - partition_keys: list of partition keys. - as_temp_table: True to create a temp table. - - Raises: - NotImplementedError: if temp tables are unsupported and as_temp_table=True. - RuntimeError: if a variant schema is passed with no properties defined. - """ - if as_temp_table: - raise NotImplementedError("Temporary tables are not supported.") - - _ = partition_keys # Not supported in generic implementation. - - meta = sqlalchemy.MetaData() - columns: list[sqlalchemy.Column] = [] - primary_keys = primary_keys or [] - try: - properties: dict = schema["properties"] - except KeyError: - raise RuntimeError( - f"Schema for '{full_table_name}' does not define properties: {schema}" - ) - for property_name, property_jsonschema in properties.items(): - is_primary_key = property_name in primary_keys - columns.append( - sqlalchemy.Column( - property_name, - self.to_sql_type(property_jsonschema), - primary_key=is_primary_key, - ) - ) - - _ = sqlalchemy.Table(full_table_name, meta, *columns) - meta.create_all(self._engine) - - def _create_empty_column( - self, - full_table_name: str, - column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - ) -> None: - """Create a new column. - - Args: - full_table_name: The target table name. - column_name: The name of the new column. - sql_type: SQLAlchemy type engine to be used in creating the new column. - - Raises: - NotImplementedError: if adding columns is not supported. - """ - if not self.allow_column_add: - raise NotImplementedError("Adding columns is not supported.") - - create_column_clause = sqlalchemy.schema.CreateColumn( - sqlalchemy.Column( - column_name, - sql_type, - ) - ) - self.connection.execute( - sqlalchemy.DDL( - "ALTER TABLE %(table)s ADD COLUMN %(create_column)s", - { - "table": full_table_name, - "create_column": create_column_clause, - }, - ) - ) - - def prepare_table( - self, - full_table_name: str, - schema: dict, - primary_keys: list[str], - partition_keys: list[str] | None = None, - as_temp_table: bool = False, - ) -> None: - """Adapt target table to provided schema if possible. - - Args: - full_table_name: the target table name. - schema: the JSON Schema for the table. - primary_keys: list of key properties. - partition_keys: list of partition keys. - as_temp_table: True to create a temp table. - """ - if not self.table_exists(full_table_name=full_table_name): - self.create_empty_table( - full_table_name=full_table_name, - schema=schema, - primary_keys=primary_keys, - partition_keys=partition_keys, - as_temp_table=as_temp_table, - ) - return - - for property_name, property_def in schema["properties"].items(): - self.prepare_column( - full_table_name, property_name, self.to_sql_type(property_def) - ) - - def prepare_column( - self, - full_table_name: str, - column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - ) -> None: - """Adapt target table to provided schema if possible. - - Args: - full_table_name: the target table name. - column_name: the target column name. - sql_type: the SQLAlchemy type. - """ - if not self.column_exists(full_table_name, column_name): - self._create_empty_column( - full_table_name=full_table_name, - column_name=column_name, - sql_type=sql_type, - ) - return - - self._adapt_column_type( - full_table_name, - column_name=column_name, - sql_type=sql_type, - ) - - def rename_column(self, full_table_name: str, old_name: str, new_name: str) -> None: - """Rename the provided columns. - - Args: - full_table_name: The fully qualified table name. - old_name: The old column to be renamed. - new_name: The new name for the column. - - Raises: - NotImplementedError: If `self.allow_column_rename` is false. - """ - if not self.allow_column_rename: - raise NotImplementedError("Renaming columns is not supported.") - - self.connection.execute( - f"ALTER TABLE {full_table_name} " - f'RENAME COLUMN "{old_name}" to "{new_name}"' - ) - - def merge_sql_types( - self, sql_types: list[sqlalchemy.types.TypeEngine] - ) -> sqlalchemy.types.TypeEngine: - """Return a compatible SQL type for the selected type list. - - Args: - sql_types: List of SQL types. - - Returns: - A SQL type that is compatible with the input types. - - Raises: - ValueError: If sql_types argument has zero members. - """ - if not sql_types: - raise ValueError("Expected at least one member in `sql_types` argument.") - - if len(sql_types) == 1: - return sql_types[0] - - # Gathering Type to match variables - # sent in _adapt_column_type - current_type = sql_types[0] - # sql_type = sql_types[1] - - # Getting the length of each type - # current_type_len: int = getattr(sql_types[0], "length", 0) - sql_type_len: int = getattr(sql_types[1], "length", 0) - if sql_type_len is None: - sql_type_len = 0 - - # Convert the two types given into a sorted list - # containing the best conversion classes - sql_types = self._sort_types(sql_types) - - # If greater than two evaluate the first pair then on down the line - if len(sql_types) > 2: - return self.merge_sql_types( - [self.merge_sql_types([sql_types[0], sql_types[1]])] + sql_types[2:] - ) - - assert len(sql_types) == 2 - # Get the generic type class - for opt in sql_types: - # Get the length - opt_len: int = getattr(opt, "length", 0) - generic_type = type(opt.as_generic()) - - if isinstance(generic_type, type): - if issubclass( - generic_type, - (sqlalchemy.types.String, sqlalchemy.types.Unicode), - ): - # If length None or 0 then is varchar max ? - if (opt_len is None) or (opt_len == 0): - return opt - elif isinstance( - generic_type, - (sqlalchemy.types.String, sqlalchemy.types.Unicode), - ): - # If length None or 0 then is varchar max ? - if (opt_len is None) or (opt_len == 0): - return opt - # If best conversion class is equal to current type - # return the best conversion class - elif str(opt) == str(current_type): - return opt - - raise ValueError( - f"Unable to merge sql types: {', '.join([str(t) for t in sql_types])}" - ) - - def _sort_types( - self, - sql_types: Iterable[sqlalchemy.types.TypeEngine], - ) -> list[sqlalchemy.types.TypeEngine]: - """Return the input types sorted from most to least compatible. - - For example, [Smallint, Integer, Datetime, String, Double] would become - [Unicode, String, Double, Integer, Smallint, Datetime]. - String types will be listed first, then decimal types, then integer types, - then bool types, and finally datetime and date. Higher precision, scale, and - length will be sorted earlier. - - Args: - sql_types (List[sqlalchemy.types.TypeEngine]): [description] - - Returns: - The sorted list. - """ - - def _get_type_sort_key( - sql_type: sqlalchemy.types.TypeEngine, - ) -> tuple[int, int]: - # return rank, with higher numbers ranking first - - _len = int(getattr(sql_type, "length", 0) or 0) - - _pytype = cast(type, sql_type.python_type) - if issubclass(_pytype, (str, bytes)): - return 900, _len - elif issubclass(_pytype, datetime): - return 600, _len - elif issubclass(_pytype, float): - return 400, _len - elif issubclass(_pytype, int): - return 300, _len - - return 0, _len - - return sorted(sql_types, key=_get_type_sort_key, reverse=True) - - def _get_column_type( - self, full_table_name: str, column_name: str - ) -> sqlalchemy.types.TypeEngine: - """Gets the SQL type of the declared column. - - Args: - full_table_name: The name of the table. - column_name: The name of the column. - - Returns: - The type of the column. - - Raises: - KeyError: If the provided column name does not exist. - """ - try: - column = self.get_table_columns(full_table_name)[column_name] - except KeyError as ex: - raise KeyError( - f"Column `{column_name}` does not exist in table `{full_table_name}`." - ) from ex - - return cast(sqlalchemy.types.TypeEngine, column.type) - - def _adapt_column_type( - self, - full_table_name: str, - column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - ) -> None: - """Adapt table column type to support the new JSON schema type. - - Args: - full_table_name: The target table name. - column_name: The target column name. - sql_type: The new SQLAlchemy type. - - Raises: - NotImplementedError: if altering columns is not supported. - """ - current_type: sqlalchemy.types.TypeEngine = self._get_column_type( - full_table_name, column_name - ) - - # Check if the existing column type and the sql type are the same - if str(sql_type) == str(current_type): - # The current column and sql type are the same - # Nothing to do - return - - # Not the same type, generic type or compatible types - # calling merge_sql_types for assistnace - compatible_sql_type = self.merge_sql_types([current_type, sql_type]) - - if str(compatible_sql_type) == str(current_type): - # Nothing to do - return - - if not self.allow_column_alter: - raise NotImplementedError( - "Altering columns is not supported. " - f"Could not convert column '{full_table_name}.{column_name}' " - f"from '{current_type}' to '{compatible_sql_type}'." - ) - - self.connection.execute( - sqlalchemy.DDL( - "ALTER TABLE %(table)s ALTER COLUMN %(col_name)s (%(col_type)s)", - { - "table": full_table_name, - "col_name": column_name, - "col_type": compatible_sql_type, - }, - ) - ) - - class SQLStream(Stream, metaclass=abc.ABCMeta): """Base class for SQLAlchemy-based streams.""" diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 0b21b04c5..a1ddd5e78 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -224,14 +224,15 @@ def add_sink( """ self.logger.info(f"Initializing '{self.name}' target sink...") sink_class = self.get_sink_class(stream_name=stream_name) - result = sink_class( + sink = sink_class( target=self, stream_name=stream_name, schema=schema, key_properties=key_properties, ) - self._sinks_active[stream_name] = result - return result + sink.setup() + self._sinks_active[stream_name] = sink + return sink def _assert_sink_exists(self, stream_name: str) -> None: """Raise a RecordsWithoutSchemaException exception if stream doesn't exist. From 1af0a5727bbc6284dd0e78bf4681baa1c14861eb Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 4 Oct 2022 15:54:51 +0100 Subject: [PATCH 02/13] linting --- singer_sdk/sinks/sql.py | 3 ++- singer_sdk/sql/__init__.py | 1 + singer_sdk/sql/connector.py | 5 +++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index e7f97fe93..b63cf358a 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -165,7 +165,8 @@ def create_table_with_records( if primary_keys is None: primary_keys = self.key_properties partition_keys = partition_keys or None - # TODO: determine if this call to `prepare_table` is necessary (in addition to in `setup` above) + # TODO: determine if this call to `prepare_table` is necessary + # (in addition to in `setup` above) self.connector.prepare_table( full_table_name=full_table_name, primary_keys=primary_keys, diff --git a/singer_sdk/sql/__init__.py b/singer_sdk/sql/__init__.py index 49890d841..dccd25c5a 100644 --- a/singer_sdk/sql/__init__.py +++ b/singer_sdk/sql/__init__.py @@ -1,3 +1,4 @@ +"""Module for helpers common to SQL streams/sinks.""" from .connector import SQLConnector __all__ = ["SQLConnector"] diff --git a/singer_sdk/sql/connector.py b/singer_sdk/sql/connector.py index a3bc20898..6d2c2a9f1 100644 --- a/singer_sdk/sql/connector.py +++ b/singer_sdk/sql/connector.py @@ -1,3 +1,5 @@ +"""Base class for SQLAlchemy-based connectors.""" + from __future__ import annotations import logging @@ -11,7 +13,6 @@ from singer_sdk import typing as th from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema -from singer_sdk._singerlib.messages import SchemaMessage from singer_sdk.exceptions import ConfigValidationError @@ -486,7 +487,7 @@ def schema_exists(self, full_schema_name: str) -> bool: """Determine if the target database schema already exists. Args: - schema_name: The target database schema name. + full_schema_name: The target database schema name. Returns: True if the database schema exists, False if not. From 5e4150086fc831c9ce03601c7cfbacd3e7bb41b0 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 4 Oct 2022 15:59:42 +0100 Subject: [PATCH 03/13] add default schema name --- singer_sdk/sinks/sql.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index b63cf358a..47e23b5a7 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -76,7 +76,12 @@ def schema_name(self) -> Optional[str]: Returns: The target schema name. """ - return None # Assumes single-schema target context. + parts = self.stream_name.split("-") + if len(parts) == 2: + return parts[0] + if len(parts) == 3: + return parts[1] + return None @property def database_name(self) -> Optional[str]: @@ -117,7 +122,8 @@ def setup(self) -> None: This method is called on Sink creation, and creates the required Schema and Table entities in the target database. """ - self.connector.prepare_schema(self.full_schema_name) + if self.full_schema_name: + self.connector.prepare_schema(self.full_schema_name) self.connector.prepare_table( full_table_name=self.full_table_name, schema=self.schema, From 05ea897befb5b204ff7ad63f31e3989a8050abeb Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 4 Oct 2022 17:29:47 +0100 Subject: [PATCH 04/13] add schema to table metadata --- singer_sdk/sinks/sql.py | 4 ++-- singer_sdk/sql/connector.py | 28 +++++++++++++++------------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 47e23b5a7..220a90f44 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -122,8 +122,8 @@ def setup(self) -> None: This method is called on Sink creation, and creates the required Schema and Table entities in the target database. """ - if self.full_schema_name: - self.connector.prepare_schema(self.full_schema_name) + if self.schema_name: + self.connector.prepare_schema(self.schema_name) self.connector.prepare_table( full_table_name=self.full_table_name, schema=self.schema, diff --git a/singer_sdk/sql/connector.py b/singer_sdk/sql/connector.py index 6d2c2a9f1..e67ce9642 100644 --- a/singer_sdk/sql/connector.py +++ b/singer_sdk/sql/connector.py @@ -483,17 +483,17 @@ def table_exists(self, full_table_name: str) -> bool: sqlalchemy.inspect(self._engine).has_table(full_table_name), ) - def schema_exists(self, full_schema_name: str) -> bool: + def schema_exists(self, schema_name: str) -> bool: """Determine if the target database schema already exists. Args: - full_schema_name: The target database schema name. + schema_name: The target database schema name. Returns: True if the database schema exists, False if not. """ schema_names = sqlalchemy.inspect(self._engine).get_schema_names() - return full_schema_name in schema_names + return schema_name in schema_names def get_table_columns(self, full_table_name: str) -> dict[str, sqlalchemy.Column]: """Return a list of table columns. @@ -545,13 +545,13 @@ def column_exists(self, full_table_name: str, column_name: str) -> bool: """ return column_name in self.get_table_columns(full_table_name) - def create_schema(self, full_schema_name: str) -> None: - """Create target database schema. + def create_schema(self, schema_name: str) -> None: + """Create target schema. Args: - full_schema_name: The target database schema to create. + schema_name: The target schema to create. """ - self._engine.execute(sqlalchemy.schema.CreateSchema(full_schema_name)) + self._engine.execute(sqlalchemy.schema.CreateSchema(schema_name)) def create_empty_table( self, @@ -579,7 +579,8 @@ def create_empty_table( _ = partition_keys # Not supported in generic implementation. - meta = sqlalchemy.MetaData() + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + meta = sqlalchemy.MetaData(schema=schema_name) columns: list[sqlalchemy.Column] = [] primary_keys = primary_keys or [] try: @@ -598,7 +599,7 @@ def create_empty_table( ) ) - _ = sqlalchemy.Table(full_table_name, meta, *columns) + _ = sqlalchemy.Table(table_name, meta, *columns) meta.create_all(self._engine) def _create_empty_column( @@ -636,14 +637,15 @@ def _create_empty_column( ) ) - def prepare_schema(self, full_schema_name: str) -> None: + def prepare_schema(self, schema_name: str) -> None: """Create the target database schema. Args: - full_schema_name: The target schema name. + schema_name: The target schema name. """ - if not self.schema_exists(full_schema_name): - self.create_schema(full_schema_name) + schema_exists = self.schema_exists(schema_name) + if not schema_exists: + self.create_schema(schema_name) def prepare_table( self, From e68c0453a339ad6ac4258ab0fb7733ea1bb4fe0b Mon Sep 17 00:00:00 2001 From: "Edgar R. M" Date: Wed, 5 Oct 2022 14:58:42 -0500 Subject: [PATCH 05/13] Add missing import for `singer_sdk.helpers._catalog` --- singer_sdk/streams/sql.py | 1 + 1 file changed, 1 insertion(+) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index 6865c0d92..1fd4248b7 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -7,6 +7,7 @@ import sqlalchemy +import singer_sdk.helpers._catalog as catalog from singer_sdk._singerlib import CatalogEntry, MetadataMapping from singer_sdk.plugin_base import PluginBase as TapBaseClass from singer_sdk.sql import SQLConnector From c7abd72b5bdf9d9e20c5ca74197ac68e4acf92c2 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 11 Oct 2022 16:21:28 +0100 Subject: [PATCH 06/13] undo connection module --- singer_sdk/__init__.py | 9 +- singer_sdk/sql/__init__.py | 4 - singer_sdk/sql/connector.py | 914 --------------------------------- singer_sdk/streams/__init__.py | 3 +- singer_sdk/streams/sql.py | 815 ++++++++++++++++++++++++++++- 5 files changed, 821 insertions(+), 924 deletions(-) delete mode 100644 singer_sdk/sql/__init__.py delete mode 100644 singer_sdk/sql/connector.py diff --git a/singer_sdk/__init__.py b/singer_sdk/__init__.py index f6c8bc547..a906f358c 100644 --- a/singer_sdk/__init__.py +++ b/singer_sdk/__init__.py @@ -4,8 +4,13 @@ from singer_sdk.mapper_base import InlineMapper from singer_sdk.plugin_base import PluginBase from singer_sdk.sinks import BatchSink, RecordSink, Sink, SQLSink -from singer_sdk.sql import SQLConnector -from singer_sdk.streams import GraphQLStream, RESTStream, SQLStream, Stream +from singer_sdk.streams import ( + GraphQLStream, + RESTStream, + SQLConnector, + SQLStream, + Stream, +) from singer_sdk.tap_base import SQLTap, Tap from singer_sdk.target_base import SQLTarget, Target diff --git a/singer_sdk/sql/__init__.py b/singer_sdk/sql/__init__.py deleted file mode 100644 index dccd25c5a..000000000 --- a/singer_sdk/sql/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Module for helpers common to SQL streams/sinks.""" -from .connector import SQLConnector - -__all__ = ["SQLConnector"] diff --git a/singer_sdk/sql/connector.py b/singer_sdk/sql/connector.py deleted file mode 100644 index e67ce9642..000000000 --- a/singer_sdk/sql/connector.py +++ /dev/null @@ -1,914 +0,0 @@ -"""Base class for SQLAlchemy-based connectors.""" - -from __future__ import annotations - -import logging -from datetime import datetime -from functools import lru_cache -from typing import Any, Iterable, cast - -import sqlalchemy -from sqlalchemy.engine import Engine -from sqlalchemy.engine.reflection import Inspector - -from singer_sdk import typing as th -from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema -from singer_sdk.exceptions import ConfigValidationError - - -class SQLConnector: - """Base class for SQLAlchemy-based connectors. - - The connector class serves as a wrapper around the SQL connection. - - The functions of the connector are: - - - connecting to the source - - generating SQLAlchemy connection and engine objects - - discovering schema catalog entries - - performing type conversions to/from JSONSchema types - - dialect-specific functions, such as escaping and fully qualified names - """ - - allow_column_add: bool = True # Whether ADD COLUMN is supported. - allow_column_rename: bool = True # Whether RENAME COLUMN is supported. - allow_column_alter: bool = False # Whether altering column types is supported. - allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported. - allow_temp_tables: bool = True # Whether temp tables are supported. - - def __init__( - self, config: dict | None = None, sqlalchemy_url: str | None = None - ) -> None: - """Initialize the SQL connector. - - Args: - config: The parent tap or target object's config. - sqlalchemy_url: Optional URL for the connection. - """ - self._config: dict[str, Any] = config or {} - self._sqlalchemy_url: str | None = sqlalchemy_url or None - self._connection: sqlalchemy.engine.Connection | None = None - - @property - def config(self) -> dict: - """If set, provides access to the tap or target config. - - Returns: - The settings as a dict. - """ - return self._config - - @property - def logger(self) -> logging.Logger: - """Get logger. - - Returns: - Plugin logger. - """ - return logging.getLogger("sqlconnector") - - def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection: - """Return a new SQLAlchemy connection using the provided config. - - By default this will create using the sqlalchemy `stream_results=True` option - described here: - https://docs.sqlalchemy.org/en/14/core/connections.html#using-server-side-cursors-a-k-a-stream-results - - Developers may override this method if their provider does not support - server side cursors (`stream_results`) or in order to use different - configurations options when creating the connection object. - - Returns: - A newly created SQLAlchemy engine object. - """ - return ( - self.create_sqlalchemy_engine() - .connect() - .execution_options(stream_results=True) - ) - - def create_sqlalchemy_engine(self) -> sqlalchemy.engine.Engine: - """Return a new SQLAlchemy engine using the provided config. - - Developers can generally override just one of the following: - `sqlalchemy_engine`, sqlalchemy_url`. - - Returns: - A newly created SQLAlchemy engine object. - """ - return sqlalchemy.create_engine(self.sqlalchemy_url, echo=False) - - @property - def connection(self) -> sqlalchemy.engine.Connection: - """Return or set the SQLAlchemy connection object. - - Returns: - The active SQLAlchemy connection object. - """ - if not self._connection: - self._connection = self.create_sqlalchemy_connection() - - return self._connection - - @property - def sqlalchemy_url(self) -> str: - """Return the SQLAlchemy URL string. - - Returns: - The URL as a string. - """ - if not self._sqlalchemy_url: - self._sqlalchemy_url = self.get_sqlalchemy_url(self.config) - - return self._sqlalchemy_url - - def get_sqlalchemy_url(self, config: dict[str, Any]) -> str: - """Return the SQLAlchemy URL string. - - Developers can generally override just one of the following: - `sqlalchemy_engine`, `get_sqlalchemy_url`. - - Args: - config: A dictionary of settings from the tap or target config. - - Returns: - The URL as a string. - - Raises: - ConfigValidationError: If no valid sqlalchemy_url can be found. - """ - if "sqlalchemy_url" not in config: - raise ConfigValidationError( - "Could not find or create 'sqlalchemy_url' for connection." - ) - - return cast(str, config["sqlalchemy_url"]) - - @staticmethod - def to_jsonschema_type( - sql_type: ( - str | sqlalchemy.types.TypeEngine | type[sqlalchemy.types.TypeEngine] | Any - ), - ) -> dict: - """Return a JSON Schema representation of the provided type. - - By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy - types. - - Developers may override this method to accept additional input argument types, - to support non-standard types, or to provide custom typing logic. - - Args: - sql_type: The string representation of the SQL type, a SQLAlchemy - TypeEngine class or object, or a custom-specified object. - - Raises: - ValueError: If the type received could not be translated to jsonschema. - - Returns: - The JSON Schema representation of the provided type. - """ - if isinstance(sql_type, (str, sqlalchemy.types.TypeEngine)): - return th.to_jsonschema_type(sql_type) - - if isinstance(sql_type, type): - if issubclass(sql_type, sqlalchemy.types.TypeEngine): - return th.to_jsonschema_type(sql_type) - - raise ValueError(f"Unexpected type received: '{sql_type.__name__}'") - - raise ValueError(f"Unexpected type received: '{type(sql_type).__name__}'") - - @staticmethod - def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: - """Return a JSON Schema representation of the provided type. - - By default will call `typing.to_sql_type()`. - - Developers may override this method to accept additional input argument types, - to support non-standard types, or to provide custom typing logic. - - If overriding this method, developers should call the default implementation - from the base class for all unhandled cases. - - Args: - jsonschema_type: The JSON Schema representation of the source type. - - Returns: - The SQLAlchemy type representation of the data type. - """ - return th.to_sql_type(jsonschema_type) - - @staticmethod - def get_fully_qualified_name( - table_name: str | None = None, - schema_name: str | None = None, - db_name: str | None = None, - delimiter: str = ".", - ) -> str: - """Concatenates a fully qualified name from the parts. - - Args: - table_name: The name of the table. - schema_name: The name of the schema. Defaults to None. - db_name: The name of the database. Defaults to None. - delimiter: Generally: '.' for SQL names and '-' for Singer names. - - Raises: - ValueError: If all 3 name parts not supplied. - - Returns: - The fully qualified name as a string. - """ - parts = [] - - if db_name: - parts.append(db_name) - if schema_name: - parts.append(schema_name) - if table_name: - parts.append(table_name) - - if not parts: - raise ValueError( - "Could not generate fully qualified name: " - + ":".join( - [ - db_name or "(unknown-db)", - schema_name or "(unknown-schema)", - table_name or "(unknown-table-name)", - ] - ) - ) - - return delimiter.join(parts) - - @property - def _dialect(self) -> sqlalchemy.engine.Dialect: - """Return the dialect object. - - Returns: - The dialect object. - """ - return cast(sqlalchemy.engine.Dialect, self.connection.engine.dialect) - - @property - def _engine(self) -> sqlalchemy.engine.Engine: - """Return the dialect object. - - Returns: - The dialect object. - """ - return cast(sqlalchemy.engine.Engine, self.connection.engine) - - def quote(self, name: str) -> str: - """Quote a name if it needs quoting, using '.' as a name-part delimiter. - - Examples: - "my_table" => "`my_table`" - "my_schema.my_table" => "`my_schema`.`my_table`" - - Args: - name: The unquoted name. - - Returns: - str: The quoted name. - """ - return ".".join( - [ - self._dialect.identifier_preparer.quote(name_part) - for name_part in name.split(".") - ] - ) - - @lru_cache() - def _warn_no_view_detection(self) -> None: - """Print a warning, but only the first time.""" - self.logger.warning( - "Provider does not support get_view_names(). " - "Streams list may be incomplete or `is_view` may be unpopulated." - ) - - def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]: - """Return a list of schema names in DB. - - Args: - engine: SQLAlchemy engine - inspected: SQLAlchemy inspector instance for engine - - Returns: - List of schema names - """ - return inspected.get_schema_names() - - def get_object_names( - self, engine: Engine, inspected: Inspector, schema_name: str - ) -> list[tuple[str, bool]]: - """Return a list of syncable objects. - - Args: - engine: SQLAlchemy engine - inspected: SQLAlchemy inspector instance for engine - schema_name: Schema name to inspect - - Returns: - List of tuples (, ) - """ - # Get list of tables and views - table_names = inspected.get_table_names(schema=schema_name) - try: - view_names = inspected.get_view_names(schema=schema_name) - except NotImplementedError: - # Some DB providers do not understand 'views' - self._warn_no_view_detection() - view_names = [] - return [(t, False) for t in table_names] + [(v, True) for v in view_names] - - # TODO maybe should be splitted into smaller parts? - def discover_catalog_entry( - self, - engine: Engine, - inspected: Inspector, - schema_name: str, - table_name: str, - is_view: bool, - ) -> CatalogEntry: - """Create `CatalogEntry` object for the given table or a view. - - Args: - engine: SQLAlchemy engine - inspected: SQLAlchemy inspector instance for engine - schema_name: Schema name to inspect - table_name: Name of the table or a view - is_view: Flag whether this object is a view, returned by `get_object_names` - - Returns: - `CatalogEntry` object for the given table or a view - """ - # Initialize unique stream name - unique_stream_id = self.get_fully_qualified_name( - db_name=None, - schema_name=schema_name, - table_name=table_name, - delimiter="-", - ) - - # Detect key properties - possible_primary_keys: list[list[str]] = [] - pk_def = inspected.get_pk_constraint(table_name, schema=schema_name) - if pk_def and "constrained_columns" in pk_def: - possible_primary_keys.append(pk_def["constrained_columns"]) - - possible_primary_keys.extend( - index_def["column_names"] - for index_def in inspected.get_indexes(table_name, schema=schema_name) - if index_def.get("unique", False) - ) - - key_properties = next(iter(possible_primary_keys), None) - - # Initialize columns list - table_schema = th.PropertiesList() - for column_def in inspected.get_columns(table_name, schema=schema_name): - column_name = column_def["name"] - is_nullable = column_def.get("nullable", False) - jsonschema_type: dict = self.to_jsonschema_type( - cast(sqlalchemy.types.TypeEngine, column_def["type"]) - ) - table_schema.append( - th.Property( - name=column_name, - wrapped=th.CustomType(jsonschema_type), - required=not is_nullable, - ) - ) - schema = table_schema.to_dict() - - # Initialize available replication methods - addl_replication_methods: list[str] = [""] # By default an empty list. - # Notes regarding replication methods: - # - 'INCREMENTAL' replication must be enabled by the user by specifying - # a replication_key value. - # - 'LOG_BASED' replication must be enabled by the developer, according - # to source-specific implementation capabilities. - replication_method = next(reversed(["FULL_TABLE"] + addl_replication_methods)) - - # Create the catalog entry object - return CatalogEntry( - tap_stream_id=unique_stream_id, - stream=unique_stream_id, - table=table_name, - key_properties=key_properties, - schema=Schema.from_dict(schema), - is_view=is_view, - replication_method=replication_method, - metadata=MetadataMapping.get_standard_metadata( - schema_name=schema_name, - schema=schema, - replication_method=replication_method, - key_properties=key_properties, - valid_replication_keys=None, # Must be defined by user - ), - database=None, # Expects single-database context - row_count=None, - stream_alias=None, - replication_key=None, # Must be defined by user - ) - - def discover_catalog_entries(self) -> list[dict]: - """Return a list of catalog entries from discovery. - - Returns: - The discovered catalog entries as a list. - """ - result: list[dict] = [] - engine = self.create_sqlalchemy_engine() - inspected = sqlalchemy.inspect(engine) - for schema_name in self.get_schema_names(engine, inspected): - # Iterate through each table and view - for table_name, is_view in self.get_object_names( - engine, inspected, schema_name - ): - catalog_entry = self.discover_catalog_entry( - engine, inspected, schema_name, table_name, is_view - ) - result.append(catalog_entry.to_dict()) - - return result - - def parse_full_table_name( - self, full_table_name: str - ) -> tuple[str | None, str | None, str]: - """Parse a fully qualified table name into its parts. - - Developers may override this method if their platform does not support the - traditional 3-part convention: `db_name.schema_name.table_name` - - Args: - full_table_name: A table name or a fully qualified table name. Depending on - SQL the platform, this could take the following forms: - - `..
` (three part names) - - `.
` (platforms which do not use schema groupings) - - `.` (if DB name is already in context) - - `
` (if DB name and schema name are already in context) - - Returns: - A three part tuple (db_name, schema_name, table_name) with any unspecified - or unused parts returned as None. - """ - db_name: str | None = None - schema_name: str | None = None - - parts = full_table_name.split(".") - if len(parts) == 1: - table_name = full_table_name - if len(parts) == 2: - schema_name, table_name = parts - if len(parts) == 3: - db_name, schema_name, table_name = parts - - return db_name, schema_name, table_name - - def table_exists(self, full_table_name: str) -> bool: - """Determine if the target table already exists. - - Args: - full_table_name: the target table name. - - Returns: - True if table exists, False if not, None if unsure or undetectable. - """ - return cast( - bool, - sqlalchemy.inspect(self._engine).has_table(full_table_name), - ) - - def schema_exists(self, schema_name: str) -> bool: - """Determine if the target database schema already exists. - - Args: - schema_name: The target database schema name. - - Returns: - True if the database schema exists, False if not. - """ - schema_names = sqlalchemy.inspect(self._engine).get_schema_names() - return schema_name in schema_names - - def get_table_columns(self, full_table_name: str) -> dict[str, sqlalchemy.Column]: - """Return a list of table columns. - - Args: - full_table_name: Fully qualified table name. - - Returns: - An ordered list of column objects. - """ - _, schema_name, table_name = self.parse_full_table_name(full_table_name) - inspector = sqlalchemy.inspect(self._engine) - columns = inspector.get_columns(table_name, schema_name) - - return { - col_meta["name"]: sqlalchemy.Column( - col_meta["name"], - col_meta["type"], - nullable=col_meta.get("nullable", False), - ) - for col_meta in columns - } - - def get_table(self, full_table_name: str) -> sqlalchemy.Table: - """Return a table object. - - Args: - full_table_name: Fully qualified table name. - - Returns: - A table object with column list. - """ - columns = self.get_table_columns(full_table_name).values() - _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData() - return sqlalchemy.schema.Table( - table_name, meta, *list(columns), schema=schema_name - ) - - def column_exists(self, full_table_name: str, column_name: str) -> bool: - """Determine if the target table already exists. - - Args: - full_table_name: the target table name. - column_name: the target column name. - - Returns: - True if table exists, False if not. - """ - return column_name in self.get_table_columns(full_table_name) - - def create_schema(self, schema_name: str) -> None: - """Create target schema. - - Args: - schema_name: The target schema to create. - """ - self._engine.execute(sqlalchemy.schema.CreateSchema(schema_name)) - - def create_empty_table( - self, - full_table_name: str, - schema: dict, - primary_keys: list[str] | None = None, - partition_keys: list[str] | None = None, - as_temp_table: bool = False, - ) -> None: - """Create an empty target table. - - Args: - full_table_name: the target table name. - schema: the JSON schema for the new table. - primary_keys: list of key properties. - partition_keys: list of partition keys. - as_temp_table: True to create a temp table. - - Raises: - NotImplementedError: if temp tables are unsupported and as_temp_table=True. - RuntimeError: if a variant schema is passed with no properties defined. - """ - if as_temp_table: - raise NotImplementedError("Temporary tables are not supported.") - - _ = partition_keys # Not supported in generic implementation. - - _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(schema=schema_name) - columns: list[sqlalchemy.Column] = [] - primary_keys = primary_keys or [] - try: - properties: dict = schema["properties"] - except KeyError: - raise RuntimeError( - f"Schema for '{full_table_name}' does not define properties: {schema}" - ) - for property_name, property_jsonschema in properties.items(): - is_primary_key = property_name in primary_keys - columns.append( - sqlalchemy.Column( - property_name, - self.to_sql_type(property_jsonschema), - primary_key=is_primary_key, - ) - ) - - _ = sqlalchemy.Table(table_name, meta, *columns) - meta.create_all(self._engine) - - def _create_empty_column( - self, - full_table_name: str, - column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - ) -> None: - """Create a new column. - - Args: - full_table_name: The target table name. - column_name: The name of the new column. - sql_type: SQLAlchemy type engine to be used in creating the new column. - - Raises: - NotImplementedError: if adding columns is not supported. - """ - if not self.allow_column_add: - raise NotImplementedError("Adding columns is not supported.") - - create_column_clause = sqlalchemy.schema.CreateColumn( - sqlalchemy.Column( - column_name, - sql_type, - ) - ) - self.connection.execute( - sqlalchemy.DDL( - "ALTER TABLE %(table)s ADD COLUMN %(create_column)s", - { - "table": full_table_name, - "create_column": create_column_clause, - }, - ) - ) - - def prepare_schema(self, schema_name: str) -> None: - """Create the target database schema. - - Args: - schema_name: The target schema name. - """ - schema_exists = self.schema_exists(schema_name) - if not schema_exists: - self.create_schema(schema_name) - - def prepare_table( - self, - full_table_name: str, - schema: dict, - primary_keys: list[str], - partition_keys: list[str] | None = None, - as_temp_table: bool = False, - ) -> None: - """Adapt target table to provided schema if possible. - - Args: - full_table_name: the target table name. - schema: the JSON Schema for the table. - primary_keys: list of key properties. - partition_keys: list of partition keys. - as_temp_table: True to create a temp table. - """ - if not self.table_exists(full_table_name=full_table_name): - self.create_empty_table( - full_table_name=full_table_name, - schema=schema, - primary_keys=primary_keys, - partition_keys=partition_keys, - as_temp_table=as_temp_table, - ) - return - - for property_name, property_def in schema["properties"].items(): - self.prepare_column( - full_table_name, property_name, self.to_sql_type(property_def) - ) - - def prepare_column( - self, - full_table_name: str, - column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - ) -> None: - """Adapt target table to provided schema if possible. - - Args: - full_table_name: the target table name. - column_name: the target column name. - sql_type: the SQLAlchemy type. - """ - if not self.column_exists(full_table_name, column_name): - self._create_empty_column( - full_table_name=full_table_name, - column_name=column_name, - sql_type=sql_type, - ) - return - - self._adapt_column_type( - full_table_name, - column_name=column_name, - sql_type=sql_type, - ) - - def rename_column(self, full_table_name: str, old_name: str, new_name: str) -> None: - """Rename the provided columns. - - Args: - full_table_name: The fully qualified table name. - old_name: The old column to be renamed. - new_name: The new name for the column. - - Raises: - NotImplementedError: If `self.allow_column_rename` is false. - """ - if not self.allow_column_rename: - raise NotImplementedError("Renaming columns is not supported.") - - self.connection.execute( - f"ALTER TABLE {full_table_name} " - f'RENAME COLUMN "{old_name}" to "{new_name}"' - ) - - def merge_sql_types( - self, sql_types: list[sqlalchemy.types.TypeEngine] - ) -> sqlalchemy.types.TypeEngine: - """Return a compatible SQL type for the selected type list. - - Args: - sql_types: List of SQL types. - - Returns: - A SQL type that is compatible with the input types. - - Raises: - ValueError: If sql_types argument has zero members. - """ - if not sql_types: - raise ValueError("Expected at least one member in `sql_types` argument.") - - if len(sql_types) == 1: - return sql_types[0] - - # Gathering Type to match variables - # sent in _adapt_column_type - current_type = sql_types[0] - # sql_type = sql_types[1] - - # Getting the length of each type - # current_type_len: int = getattr(sql_types[0], "length", 0) - sql_type_len: int = getattr(sql_types[1], "length", 0) - if sql_type_len is None: - sql_type_len = 0 - - # Convert the two types given into a sorted list - # containing the best conversion classes - sql_types = self._sort_types(sql_types) - - # If greater than two evaluate the first pair then on down the line - if len(sql_types) > 2: - return self.merge_sql_types( - [self.merge_sql_types([sql_types[0], sql_types[1]])] + sql_types[2:] - ) - - assert len(sql_types) == 2 - # Get the generic type class - for opt in sql_types: - # Get the length - opt_len: int = getattr(opt, "length", 0) - generic_type = type(opt.as_generic()) - - if isinstance(generic_type, type): - if issubclass( - generic_type, - (sqlalchemy.types.String, sqlalchemy.types.Unicode), - ): - # If length None or 0 then is varchar max ? - if (opt_len is None) or (opt_len == 0): - return opt - elif isinstance( - generic_type, - (sqlalchemy.types.String, sqlalchemy.types.Unicode), - ): - # If length None or 0 then is varchar max ? - if (opt_len is None) or (opt_len == 0): - return opt - # If best conversion class is equal to current type - # return the best conversion class - elif str(opt) == str(current_type): - return opt - - raise ValueError( - f"Unable to merge sql types: {', '.join([str(t) for t in sql_types])}" - ) - - def _sort_types( - self, - sql_types: Iterable[sqlalchemy.types.TypeEngine], - ) -> list[sqlalchemy.types.TypeEngine]: - """Return the input types sorted from most to least compatible. - - For example, [Smallint, Integer, Datetime, String, Double] would become - [Unicode, String, Double, Integer, Smallint, Datetime]. - String types will be listed first, then decimal types, then integer types, - then bool types, and finally datetime and date. Higher precision, scale, and - length will be sorted earlier. - - Args: - sql_types (List[sqlalchemy.types.TypeEngine]): [description] - - Returns: - The sorted list. - """ - - def _get_type_sort_key( - sql_type: sqlalchemy.types.TypeEngine, - ) -> tuple[int, int]: - # return rank, with higher numbers ranking first - - _len = int(getattr(sql_type, "length", 0) or 0) - - _pytype = cast(type, sql_type.python_type) - if issubclass(_pytype, (str, bytes)): - return 900, _len - elif issubclass(_pytype, datetime): - return 600, _len - elif issubclass(_pytype, float): - return 400, _len - elif issubclass(_pytype, int): - return 300, _len - - return 0, _len - - return sorted(sql_types, key=_get_type_sort_key, reverse=True) - - def _get_column_type( - self, full_table_name: str, column_name: str - ) -> sqlalchemy.types.TypeEngine: - """Get the SQL type of the declared column. - - Args: - full_table_name: The name of the table. - column_name: The name of the column. - - Returns: - The type of the column. - - Raises: - KeyError: If the provided column name does not exist. - """ - try: - column = self.get_table_columns(full_table_name)[column_name] - except KeyError as ex: - raise KeyError( - f"Column `{column_name}` does not exist in table `{full_table_name}`." - ) from ex - - return cast(sqlalchemy.types.TypeEngine, column.type) - - def _adapt_column_type( - self, - full_table_name: str, - column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - ) -> None: - """Adapt table column type to support the new JSON schema type. - - Args: - full_table_name: The target table name. - column_name: The target column name. - sql_type: The new SQLAlchemy type. - - Raises: - NotImplementedError: if altering columns is not supported. - """ - current_type: sqlalchemy.types.TypeEngine = self._get_column_type( - full_table_name, column_name - ) - - # Check if the existing column type and the sql type are the same - if str(sql_type) == str(current_type): - # The current column and sql type are the same - # Nothing to do - return - - # Not the same type, generic type or compatible types - # calling merge_sql_types for assistnace - compatible_sql_type = self.merge_sql_types([current_type, sql_type]) - - if str(compatible_sql_type) == str(current_type): - # Nothing to do - return - - if not self.allow_column_alter: - raise NotImplementedError( - "Altering columns is not supported. " - f"Could not convert column '{full_table_name}.{column_name}' " - f"from '{current_type}' to '{compatible_sql_type}'." - ) - - self.connection.execute( - sqlalchemy.DDL( - "ALTER TABLE %(table)s ALTER COLUMN %(col_name)s (%(col_type)s)", - { - "table": full_table_name, - "col_name": column_name, - "col_type": compatible_sql_type, - }, - ) - ) diff --git a/singer_sdk/streams/__init__.py b/singer_sdk/streams/__init__.py index 6d13542be..68d54f74c 100644 --- a/singer_sdk/streams/__init__.py +++ b/singer_sdk/streams/__init__.py @@ -1,10 +1,9 @@ """SDK for building Singer taps.""" -from singer_sdk.sql import SQLConnector from singer_sdk.streams.core import Stream from singer_sdk.streams.graphql import GraphQLStream from singer_sdk.streams.rest import RESTStream -from singer_sdk.streams.sql import SQLStream +from singer_sdk.streams.sql import SQLConnector, SQLStream __all__ = [ "Stream", diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index 1fd4248b7..f256b2b42 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -3,17 +3,828 @@ from __future__ import annotations import abc +import logging +from datetime import datetime +from functools import lru_cache from typing import Any, Iterable, cast import sqlalchemy +from sqlalchemy.engine import Engine +from sqlalchemy.engine.reflection import Inspector import singer_sdk.helpers._catalog as catalog -from singer_sdk._singerlib import CatalogEntry, MetadataMapping +from singer_sdk import typing as th +from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema +from singer_sdk.exceptions import ConfigValidationError from singer_sdk.plugin_base import PluginBase as TapBaseClass -from singer_sdk.sql import SQLConnector from singer_sdk.streams.core import Stream +class SQLConnector: + """Base class for SQLAlchemy-based connectors. + The connector class serves as a wrapper around the SQL connection. + The functions of the connector are: + - connecting to the source + - generating SQLAlchemy connection and engine objects + - discovering schema catalog entries + - performing type conversions to/from JSONSchema types + - dialect-specific functions, such as escaping and fully qualified names + """ + + allow_column_add: bool = True # Whether ADD COLUMN is supported. + allow_column_rename: bool = True # Whether RENAME COLUMN is supported. + allow_column_alter: bool = False # Whether altering column types is supported. + allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported. + allow_temp_tables: bool = True # Whether temp tables are supported. + + def __init__( + self, config: dict | None = None, sqlalchemy_url: str | None = None + ) -> None: + """Initialize the SQL connector. + Args: + config: The parent tap or target object's config. + sqlalchemy_url: Optional URL for the connection. + """ + self._config: dict[str, Any] = config or {} + self._sqlalchemy_url: str | None = sqlalchemy_url or None + self._connection: sqlalchemy.engine.Connection | None = None + + @property + def config(self) -> dict: + """If set, provides access to the tap or target config. + Returns: + The settings as a dict. + """ + return self._config + + @property + def logger(self) -> logging.Logger: + """Get logger. + Returns: + Plugin logger. + """ + return logging.getLogger("sqlconnector") + + def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection: + """Return a new SQLAlchemy connection using the provided config. + By default this will create using the sqlalchemy `stream_results=True` option + described here: + https://docs.sqlalchemy.org/en/14/core/connections.html#using-server-side-cursors-a-k-a-stream-results + Developers may override this method if their provider does not support + server side cursors (`stream_results`) or in order to use different + configurations options when creating the connection object. + Returns: + A newly created SQLAlchemy engine object. + """ + return ( + self.create_sqlalchemy_engine() + .connect() + .execution_options(stream_results=True) + ) + + def create_sqlalchemy_engine(self) -> sqlalchemy.engine.Engine: + """Return a new SQLAlchemy engine using the provided config. + Developers can generally override just one of the following: + `sqlalchemy_engine`, sqlalchemy_url`. + Returns: + A newly created SQLAlchemy engine object. + """ + return sqlalchemy.create_engine(self.sqlalchemy_url, echo=False) + + @property + def connection(self) -> sqlalchemy.engine.Connection: + """Return or set the SQLAlchemy connection object. + Returns: + The active SQLAlchemy connection object. + """ + if not self._connection: + self._connection = self.create_sqlalchemy_connection() + + return self._connection + + @property + def sqlalchemy_url(self) -> str: + """Return the SQLAlchemy URL string. + Returns: + The URL as a string. + """ + if not self._sqlalchemy_url: + self._sqlalchemy_url = self.get_sqlalchemy_url(self.config) + + return self._sqlalchemy_url + + def get_sqlalchemy_url(self, config: dict[str, Any]) -> str: + """Return the SQLAlchemy URL string. + Developers can generally override just one of the following: + `sqlalchemy_engine`, `get_sqlalchemy_url`. + Args: + config: A dictionary of settings from the tap or target config. + Returns: + The URL as a string. + Raises: + ConfigValidationError: If no valid sqlalchemy_url can be found. + """ + if "sqlalchemy_url" not in config: + raise ConfigValidationError( + "Could not find or create 'sqlalchemy_url' for connection." + ) + + return cast(str, config["sqlalchemy_url"]) + + @staticmethod + def to_jsonschema_type( + sql_type: ( + str | sqlalchemy.types.TypeEngine | type[sqlalchemy.types.TypeEngine] | Any + ), + ) -> dict: + """Return a JSON Schema representation of the provided type. + By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy + types. + Developers may override this method to accept additional input argument types, + to support non-standard types, or to provide custom typing logic. + Args: + sql_type: The string representation of the SQL type, a SQLAlchemy + TypeEngine class or object, or a custom-specified object. + Raises: + ValueError: If the type received could not be translated to jsonschema. + Returns: + The JSON Schema representation of the provided type. + """ + if isinstance(sql_type, (str, sqlalchemy.types.TypeEngine)): + return th.to_jsonschema_type(sql_type) + + if isinstance(sql_type, type): + if issubclass(sql_type, sqlalchemy.types.TypeEngine): + return th.to_jsonschema_type(sql_type) + + raise ValueError(f"Unexpected type received: '{sql_type.__name__}'") + + raise ValueError(f"Unexpected type received: '{type(sql_type).__name__}'") + + @staticmethod + def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: + """Return a JSON Schema representation of the provided type. + By default will call `typing.to_sql_type()`. + Developers may override this method to accept additional input argument types, + to support non-standard types, or to provide custom typing logic. + If overriding this method, developers should call the default implementation + from the base class for all unhandled cases. + Args: + jsonschema_type: The JSON Schema representation of the source type. + Returns: + The SQLAlchemy type representation of the data type. + """ + return th.to_sql_type(jsonschema_type) + + @staticmethod + def get_fully_qualified_name( + table_name: str, + schema_name: str | None = None, + db_name: str | None = None, + delimiter: str = ".", + ) -> str: + """Concatenates a fully qualified name from the parts. + Args: + table_name: The name of the table. + schema_name: The name of the schema. Defaults to None. + db_name: The name of the database. Defaults to None. + delimiter: Generally: '.' for SQL names and '-' for Singer names. + Raises: + ValueError: If table_name is not provided or if neither schema_name or + db_name are provided. + Returns: + The fully qualified name as a string. + """ + if db_name and schema_name: + result = delimiter.join([db_name, schema_name, table_name]) + elif db_name: + result = delimiter.join([db_name, table_name]) + elif schema_name: + result = delimiter.join([schema_name, table_name]) + elif table_name: + result = table_name + else: + raise ValueError( + "Could not generate fully qualified name for stream: " + + ":".join( + [ + db_name or "(unknown-db)", + schema_name or "(unknown-schema)", + table_name or "(unknown-table-name)", + ] + ) + ) + + return result + + @property + def _dialect(self) -> sqlalchemy.engine.Dialect: + """Return the dialect object. + Returns: + The dialect object. + """ + return cast(sqlalchemy.engine.Dialect, self.connection.engine.dialect) + + @property + def _engine(self) -> sqlalchemy.engine.Engine: + """Return the dialect object. + Returns: + The dialect object. + """ + return cast(sqlalchemy.engine.Engine, self.connection.engine) + + def quote(self, name: str) -> str: + """Quote a name if it needs quoting, using '.' as a name-part delimiter. + Examples: + "my_table" => "`my_table`" + "my_schema.my_table" => "`my_schema`.`my_table`" + Args: + name: The unquoted name. + Returns: + str: The quoted name. + """ + return ".".join( + [ + self._dialect.identifier_preparer.quote(name_part) + for name_part in name.split(".") + ] + ) + + @lru_cache() + def _warn_no_view_detection(self) -> None: + """Print a warning, but only the first time.""" + self.logger.warning( + "Provider does not support get_view_names(). " + "Streams list may be incomplete or `is_view` may be unpopulated." + ) + + def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]: + """Return a list of schema names in DB. + Args: + engine: SQLAlchemy engine + inspected: SQLAlchemy inspector instance for engine + Returns: + List of schema names + """ + return inspected.get_schema_names() + + def get_object_names( + self, engine: Engine, inspected: Inspector, schema_name: str + ) -> list[tuple[str, bool]]: + """Return a list of syncable objects. + Args: + engine: SQLAlchemy engine + inspected: SQLAlchemy inspector instance for engine + schema_name: Schema name to inspect + Returns: + List of tuples (, ) + """ + # Get list of tables and views + table_names = inspected.get_table_names(schema=schema_name) + try: + view_names = inspected.get_view_names(schema=schema_name) + except NotImplementedError: + # Some DB providers do not understand 'views' + self._warn_no_view_detection() + view_names = [] + return [(t, False) for t in table_names] + [(v, True) for v in view_names] + + # TODO maybe should be splitted into smaller parts? + def discover_catalog_entry( + self, + engine: Engine, + inspected: Inspector, + schema_name: str, + table_name: str, + is_view: bool, + ) -> CatalogEntry: + """Create `CatalogEntry` object for the given table or a view. + Args: + engine: SQLAlchemy engine + inspected: SQLAlchemy inspector instance for engine + schema_name: Schema name to inspect + table_name: Name of the table or a view + is_view: Flag whether this object is a view, returned by `get_object_names` + Returns: + `CatalogEntry` object for the given table or a view + """ + # Initialize unique stream name + unique_stream_id = self.get_fully_qualified_name( + db_name=None, + schema_name=schema_name, + table_name=table_name, + delimiter="-", + ) + + # Detect key properties + possible_primary_keys: list[list[str]] = [] + pk_def = inspected.get_pk_constraint(table_name, schema=schema_name) + if pk_def and "constrained_columns" in pk_def: + possible_primary_keys.append(pk_def["constrained_columns"]) + + possible_primary_keys.extend( + index_def["column_names"] + for index_def in inspected.get_indexes(table_name, schema=schema_name) + if index_def.get("unique", False) + ) + + key_properties = next(iter(possible_primary_keys), None) + + # Initialize columns list + table_schema = th.PropertiesList() + for column_def in inspected.get_columns(table_name, schema=schema_name): + column_name = column_def["name"] + is_nullable = column_def.get("nullable", False) + jsonschema_type: dict = self.to_jsonschema_type( + cast(sqlalchemy.types.TypeEngine, column_def["type"]) + ) + table_schema.append( + th.Property( + name=column_name, + wrapped=th.CustomType(jsonschema_type), + required=not is_nullable, + ) + ) + schema = table_schema.to_dict() + + # Initialize available replication methods + addl_replication_methods: list[str] = [""] # By default an empty list. + # Notes regarding replication methods: + # - 'INCREMENTAL' replication must be enabled by the user by specifying + # a replication_key value. + # - 'LOG_BASED' replication must be enabled by the developer, according + # to source-specific implementation capabilities. + replication_method = next(reversed(["FULL_TABLE"] + addl_replication_methods)) + + # Create the catalog entry object + return CatalogEntry( + tap_stream_id=unique_stream_id, + stream=unique_stream_id, + table=table_name, + key_properties=key_properties, + schema=Schema.from_dict(schema), + is_view=is_view, + replication_method=replication_method, + metadata=MetadataMapping.get_standard_metadata( + schema_name=schema_name, + schema=schema, + replication_method=replication_method, + key_properties=key_properties, + valid_replication_keys=None, # Must be defined by user + ), + database=None, # Expects single-database context + row_count=None, + stream_alias=None, + replication_key=None, # Must be defined by user + ) + + def discover_catalog_entries(self) -> list[dict]: + """Return a list of catalog entries from discovery. + Returns: + The discovered catalog entries as a list. + """ + result: list[dict] = [] + engine = self.create_sqlalchemy_engine() + inspected = sqlalchemy.inspect(engine) + for schema_name in self.get_schema_names(engine, inspected): + # Iterate through each table and view + for table_name, is_view in self.get_object_names( + engine, inspected, schema_name + ): + catalog_entry = self.discover_catalog_entry( + engine, inspected, schema_name, table_name, is_view + ) + result.append(catalog_entry.to_dict()) + + return result + + def parse_full_table_name( + self, full_table_name: str + ) -> tuple[str | None, str | None, str]: + """Parse a fully qualified table name into its parts. + Developers may override this method if their platform does not support the + traditional 3-part convention: `db_name.schema_name.table_name` + Args: + full_table_name: A table name or a fully qualified table name. Depending on + SQL the platform, this could take the following forms: + - `..
` (three part names) + - `.
` (platforms which do not use schema groupings) + - `.` (if DB name is already in context) + - `
` (if DB name and schema name are already in context) + Returns: + A three part tuple (db_name, schema_name, table_name) with any unspecified + or unused parts returned as None. + """ + db_name: str | None = None + schema_name: str | None = None + + parts = full_table_name.split(".") + if len(parts) == 1: + table_name = full_table_name + if len(parts) == 2: + schema_name, table_name = parts + if len(parts) == 3: + db_name, schema_name, table_name = parts + + return db_name, schema_name, table_name + + def table_exists(self, full_table_name: str) -> bool: + """Determine if the target table already exists. + Args: + full_table_name: the target table name. + Returns: + True if table exists, False if not, None if unsure or undetectable. + """ + return cast( + bool, + sqlalchemy.inspect(self._engine).has_table(full_table_name), + ) + + def get_table_columns( + self, full_table_name: str, column_names: list[str] | None = None + ) -> dict[str, sqlalchemy.Column]: + """Return a list of table columns. + Args: + full_table_name: Fully qualified table name. + column_names: A list of column names to filter to. + Returns: + An ordered list of column objects. + """ + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + inspector = sqlalchemy.inspect(self._engine) + columns = inspector.get_columns(table_name, schema_name) + + return { + col_meta["name"]: sqlalchemy.Column( + col_meta["name"], + col_meta["type"], + nullable=col_meta.get("nullable", False), + ) + for col_meta in columns + if not column_names + or col_meta["name"].casefold() in {col.casefold() for col in column_names} + } + + def get_table( + self, full_table_name: str, column_names: list[str] | None = None + ) -> sqlalchemy.Table: + """Return a table object. + Args: + full_table_name: Fully qualified table name. + column_names: A list of column names to filter to. + Returns: + A table object with column list. + """ + columns = self.get_table_columns( + full_table_name=full_table_name, column_names=column_names + ).values() + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + meta = sqlalchemy.MetaData() + return sqlalchemy.schema.Table( + table_name, meta, *list(columns), schema=schema_name + ) + + def column_exists(self, full_table_name: str, column_name: str) -> bool: + """Determine if the target table already exists. + Args: + full_table_name: the target table name. + column_name: the target column name. + Returns: + True if table exists, False if not. + """ + return column_name in self.get_table_columns(full_table_name) + + def create_empty_table( + self, + full_table_name: str, + schema: dict, + primary_keys: list[str] | None = None, + partition_keys: list[str] | None = None, + as_temp_table: bool = False, + ) -> None: + """Create an empty target table. + Args: + full_table_name: the target table name. + schema: the JSON schema for the new table. + primary_keys: list of key properties. + partition_keys: list of partition keys. + as_temp_table: True to create a temp table. + Raises: + NotImplementedError: if temp tables are unsupported and as_temp_table=True. + RuntimeError: if a variant schema is passed with no properties defined. + """ + if as_temp_table: + raise NotImplementedError("Temporary tables are not supported.") + + _ = partition_keys # Not supported in generic implementation. + + meta = sqlalchemy.MetaData() + columns: list[sqlalchemy.Column] = [] + primary_keys = primary_keys or [] + try: + properties: dict = schema["properties"] + except KeyError: + raise RuntimeError( + f"Schema for '{full_table_name}' does not define properties: {schema}" + ) + for property_name, property_jsonschema in properties.items(): + is_primary_key = property_name in primary_keys + columns.append( + sqlalchemy.Column( + property_name, + self.to_sql_type(property_jsonschema), + primary_key=is_primary_key, + ) + ) + + _ = sqlalchemy.Table(full_table_name, meta, *columns) + meta.create_all(self._engine) + + def _create_empty_column( + self, + full_table_name: str, + column_name: str, + sql_type: sqlalchemy.types.TypeEngine, + ) -> None: + """Create a new column. + Args: + full_table_name: The target table name. + column_name: The name of the new column. + sql_type: SQLAlchemy type engine to be used in creating the new column. + Raises: + NotImplementedError: if adding columns is not supported. + """ + if not self.allow_column_add: + raise NotImplementedError("Adding columns is not supported.") + + create_column_clause = sqlalchemy.schema.CreateColumn( + sqlalchemy.Column( + column_name, + sql_type, + ) + ) + self.connection.execute( + sqlalchemy.DDL( + "ALTER TABLE %(table)s ADD COLUMN %(create_column)s", + { + "table": full_table_name, + "create_column": create_column_clause, + }, + ) + ) + + def prepare_table( + self, + full_table_name: str, + schema: dict, + primary_keys: list[str], + partition_keys: list[str] | None = None, + as_temp_table: bool = False, + ) -> None: + """Adapt target table to provided schema if possible. + Args: + full_table_name: the target table name. + schema: the JSON Schema for the table. + primary_keys: list of key properties. + partition_keys: list of partition keys. + as_temp_table: True to create a temp table. + """ + if not self.table_exists(full_table_name=full_table_name): + self.create_empty_table( + full_table_name=full_table_name, + schema=schema, + primary_keys=primary_keys, + partition_keys=partition_keys, + as_temp_table=as_temp_table, + ) + return + + for property_name, property_def in schema["properties"].items(): + self.prepare_column( + full_table_name, property_name, self.to_sql_type(property_def) + ) + + def prepare_column( + self, + full_table_name: str, + column_name: str, + sql_type: sqlalchemy.types.TypeEngine, + ) -> None: + """Adapt target table to provided schema if possible. + Args: + full_table_name: the target table name. + column_name: the target column name. + sql_type: the SQLAlchemy type. + """ + if not self.column_exists(full_table_name, column_name): + self._create_empty_column( + full_table_name=full_table_name, + column_name=column_name, + sql_type=sql_type, + ) + return + + self._adapt_column_type( + full_table_name, + column_name=column_name, + sql_type=sql_type, + ) + + def rename_column(self, full_table_name: str, old_name: str, new_name: str) -> None: + """Rename the provided columns. + Args: + full_table_name: The fully qualified table name. + old_name: The old column to be renamed. + new_name: The new name for the column. + Raises: + NotImplementedError: If `self.allow_column_rename` is false. + """ + if not self.allow_column_rename: + raise NotImplementedError("Renaming columns is not supported.") + + self.connection.execute( + f"ALTER TABLE {full_table_name} " + f'RENAME COLUMN "{old_name}" to "{new_name}"' + ) + + def merge_sql_types( + self, sql_types: list[sqlalchemy.types.TypeEngine] + ) -> sqlalchemy.types.TypeEngine: + """Return a compatible SQL type for the selected type list. + Args: + sql_types: List of SQL types. + Returns: + A SQL type that is compatible with the input types. + Raises: + ValueError: If sql_types argument has zero members. + """ + if not sql_types: + raise ValueError("Expected at least one member in `sql_types` argument.") + + if len(sql_types) == 1: + return sql_types[0] + + # Gathering Type to match variables + # sent in _adapt_column_type + current_type = sql_types[0] + # sql_type = sql_types[1] + + # Getting the length of each type + # current_type_len: int = getattr(sql_types[0], "length", 0) + sql_type_len: int = getattr(sql_types[1], "length", 0) + if sql_type_len is None: + sql_type_len = 0 + + # Convert the two types given into a sorted list + # containing the best conversion classes + sql_types = self._sort_types(sql_types) + + # If greater than two evaluate the first pair then on down the line + if len(sql_types) > 2: + return self.merge_sql_types( + [self.merge_sql_types([sql_types[0], sql_types[1]])] + sql_types[2:] + ) + + assert len(sql_types) == 2 + # Get the generic type class + for opt in sql_types: + # Get the length + opt_len: int = getattr(opt, "length", 0) + generic_type = type(opt.as_generic()) + + if isinstance(generic_type, type): + if issubclass( + generic_type, + (sqlalchemy.types.String, sqlalchemy.types.Unicode), + ): + # If length None or 0 then is varchar max ? + if (opt_len is None) or (opt_len == 0): + return opt + elif isinstance( + generic_type, + (sqlalchemy.types.String, sqlalchemy.types.Unicode), + ): + # If length None or 0 then is varchar max ? + if (opt_len is None) or (opt_len == 0): + return opt + # If best conversion class is equal to current type + # return the best conversion class + elif str(opt) == str(current_type): + return opt + + raise ValueError( + f"Unable to merge sql types: {', '.join([str(t) for t in sql_types])}" + ) + + def _sort_types( + self, + sql_types: Iterable[sqlalchemy.types.TypeEngine], + ) -> list[sqlalchemy.types.TypeEngine]: + """Return the input types sorted from most to least compatible. + For example, [Smallint, Integer, Datetime, String, Double] would become + [Unicode, String, Double, Integer, Smallint, Datetime]. + String types will be listed first, then decimal types, then integer types, + then bool types, and finally datetime and date. Higher precision, scale, and + length will be sorted earlier. + Args: + sql_types (List[sqlalchemy.types.TypeEngine]): [description] + Returns: + The sorted list. + """ + + def _get_type_sort_key( + sql_type: sqlalchemy.types.TypeEngine, + ) -> tuple[int, int]: + # return rank, with higher numbers ranking first + + _len = int(getattr(sql_type, "length", 0) or 0) + + _pytype = cast(type, sql_type.python_type) + if issubclass(_pytype, (str, bytes)): + return 900, _len + elif issubclass(_pytype, datetime): + return 600, _len + elif issubclass(_pytype, float): + return 400, _len + elif issubclass(_pytype, int): + return 300, _len + + return 0, _len + + return sorted(sql_types, key=_get_type_sort_key, reverse=True) + + def _get_column_type( + self, full_table_name: str, column_name: str + ) -> sqlalchemy.types.TypeEngine: + """Gets the SQL type of the declared column. + Args: + full_table_name: The name of the table. + column_name: The name of the column. + Returns: + The type of the column. + Raises: + KeyError: If the provided column name does not exist. + """ + try: + column = self.get_table_columns(full_table_name)[column_name] + except KeyError as ex: + raise KeyError( + f"Column `{column_name}` does not exist in table `{full_table_name}`." + ) from ex + + return cast(sqlalchemy.types.TypeEngine, column.type) + + def _adapt_column_type( + self, + full_table_name: str, + column_name: str, + sql_type: sqlalchemy.types.TypeEngine, + ) -> None: + """Adapt table column type to support the new JSON schema type. + Args: + full_table_name: The target table name. + column_name: The target column name. + sql_type: The new SQLAlchemy type. + Raises: + NotImplementedError: if altering columns is not supported. + """ + current_type: sqlalchemy.types.TypeEngine = self._get_column_type( + full_table_name, column_name + ) + + # Check if the existing column type and the sql type are the same + if str(sql_type) == str(current_type): + # The current column and sql type are the same + # Nothing to do + return + + # Not the same type, generic type or compatible types + # calling merge_sql_types for assistnace + compatible_sql_type = self.merge_sql_types([current_type, sql_type]) + + if str(compatible_sql_type) == str(current_type): + # Nothing to do + return + + if not self.allow_column_alter: + raise NotImplementedError( + "Altering columns is not supported. " + f"Could not convert column '{full_table_name}.{column_name}' " + f"from '{current_type}' to '{compatible_sql_type}'." + ) + + self.connection.execute( + sqlalchemy.DDL( + "ALTER TABLE %(table)s ALTER COLUMN %(col_name)s (%(col_type)s)", + { + "table": full_table_name, + "col_name": column_name, + "col_type": compatible_sql_type, + }, + ) + ) + + class SQLStream(Stream, metaclass=abc.ABCMeta): """Base class for SQLAlchemy-based streams.""" From c59bd5e8fd3d30510d69fa4cba55d5ed772e234c Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 11 Oct 2022 16:34:46 +0100 Subject: [PATCH 07/13] fix copy-paste formatting --- singer_sdk/sinks/sql.py | 2 +- singer_sdk/streams/sql.py | 100 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 98 insertions(+), 4 deletions(-) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 220a90f44..3c3e1cc17 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -10,7 +10,7 @@ from singer_sdk.plugin_base import PluginBase from singer_sdk.sinks.batch import BatchSink -from singer_sdk.sql import SQLConnector +from singer_sdk.streams import SQLConnector class SQLSink(BatchSink): diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index f256b2b42..ad764a1b1 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -22,7 +22,9 @@ class SQLConnector: """Base class for SQLAlchemy-based connectors. + The connector class serves as a wrapper around the SQL connection. + The functions of the connector are: - connecting to the source - generating SQLAlchemy connection and engine objects @@ -41,6 +43,7 @@ def __init__( self, config: dict | None = None, sqlalchemy_url: str | None = None ) -> None: """Initialize the SQL connector. + Args: config: The parent tap or target object's config. sqlalchemy_url: Optional URL for the connection. @@ -52,6 +55,7 @@ def __init__( @property def config(self) -> dict: """If set, provides access to the tap or target config. + Returns: The settings as a dict. """ @@ -60,6 +64,7 @@ def config(self) -> dict: @property def logger(self) -> logging.Logger: """Get logger. + Returns: Plugin logger. """ @@ -67,12 +72,16 @@ def logger(self) -> logging.Logger: def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection: """Return a new SQLAlchemy connection using the provided config. + By default this will create using the sqlalchemy `stream_results=True` option described here: + https://docs.sqlalchemy.org/en/14/core/connections.html#using-server-side-cursors-a-k-a-stream-results + Developers may override this method if their provider does not support server side cursors (`stream_results`) or in order to use different configurations options when creating the connection object. + Returns: A newly created SQLAlchemy engine object. """ @@ -84,8 +93,10 @@ def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection: def create_sqlalchemy_engine(self) -> sqlalchemy.engine.Engine: """Return a new SQLAlchemy engine using the provided config. + Developers can generally override just one of the following: `sqlalchemy_engine`, sqlalchemy_url`. + Returns: A newly created SQLAlchemy engine object. """ @@ -94,6 +105,7 @@ def create_sqlalchemy_engine(self) -> sqlalchemy.engine.Engine: @property def connection(self) -> sqlalchemy.engine.Connection: """Return or set the SQLAlchemy connection object. + Returns: The active SQLAlchemy connection object. """ @@ -105,6 +117,7 @@ def connection(self) -> sqlalchemy.engine.Connection: @property def sqlalchemy_url(self) -> str: """Return the SQLAlchemy URL string. + Returns: The URL as a string. """ @@ -115,12 +128,16 @@ def sqlalchemy_url(self) -> str: def get_sqlalchemy_url(self, config: dict[str, Any]) -> str: """Return the SQLAlchemy URL string. + Developers can generally override just one of the following: `sqlalchemy_engine`, `get_sqlalchemy_url`. + Args: config: A dictionary of settings from the tap or target config. + Returns: The URL as a string. + Raises: ConfigValidationError: If no valid sqlalchemy_url can be found. """ @@ -138,15 +155,20 @@ def to_jsonschema_type( ), ) -> dict: """Return a JSON Schema representation of the provided type. + By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy types. + Developers may override this method to accept additional input argument types, to support non-standard types, or to provide custom typing logic. + Args: sql_type: The string representation of the SQL type, a SQLAlchemy TypeEngine class or object, or a custom-specified object. + Raises: ValueError: If the type received could not be translated to jsonschema. + Returns: The JSON Schema representation of the provided type. """ @@ -164,13 +186,17 @@ def to_jsonschema_type( @staticmethod def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: """Return a JSON Schema representation of the provided type. + By default will call `typing.to_sql_type()`. + Developers may override this method to accept additional input argument types, to support non-standard types, or to provide custom typing logic. If overriding this method, developers should call the default implementation from the base class for all unhandled cases. + Args: jsonschema_type: The JSON Schema representation of the source type. + Returns: The SQLAlchemy type representation of the data type. """ @@ -184,14 +210,17 @@ def get_fully_qualified_name( delimiter: str = ".", ) -> str: """Concatenates a fully qualified name from the parts. + Args: table_name: The name of the table. schema_name: The name of the schema. Defaults to None. db_name: The name of the database. Defaults to None. delimiter: Generally: '.' for SQL names and '-' for Singer names. + Raises: ValueError: If table_name is not provided or if neither schema_name or db_name are provided. + Returns: The fully qualified name as a string. """ @@ -220,6 +249,7 @@ def get_fully_qualified_name( @property def _dialect(self) -> sqlalchemy.engine.Dialect: """Return the dialect object. + Returns: The dialect object. """ @@ -228,6 +258,7 @@ def _dialect(self) -> sqlalchemy.engine.Dialect: @property def _engine(self) -> sqlalchemy.engine.Engine: """Return the dialect object. + Returns: The dialect object. """ @@ -235,11 +266,14 @@ def _engine(self) -> sqlalchemy.engine.Engine: def quote(self, name: str) -> str: """Quote a name if it needs quoting, using '.' as a name-part delimiter. + Examples: "my_table" => "`my_table`" "my_schema.my_table" => "`my_schema`.`my_table`" + Args: name: The unquoted name. + Returns: str: The quoted name. """ @@ -260,9 +294,11 @@ def _warn_no_view_detection(self) -> None: def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]: """Return a list of schema names in DB. + Args: engine: SQLAlchemy engine inspected: SQLAlchemy inspector instance for engine + Returns: List of schema names """ @@ -272,10 +308,12 @@ def get_object_names( self, engine: Engine, inspected: Inspector, schema_name: str ) -> list[tuple[str, bool]]: """Return a list of syncable objects. + Args: engine: SQLAlchemy engine inspected: SQLAlchemy inspector instance for engine schema_name: Schema name to inspect + Returns: List of tuples (, ) """ @@ -299,12 +337,14 @@ def discover_catalog_entry( is_view: bool, ) -> CatalogEntry: """Create `CatalogEntry` object for the given table or a view. + Args: engine: SQLAlchemy engine inspected: SQLAlchemy inspector instance for engine schema_name: Schema name to inspect table_name: Name of the table or a view is_view: Flag whether this object is a view, returned by `get_object_names` + Returns: `CatalogEntry` object for the given table or a view """ @@ -380,6 +420,7 @@ def discover_catalog_entry( def discover_catalog_entries(self) -> list[dict]: """Return a list of catalog entries from discovery. + Returns: The discovered catalog entries as a list. """ @@ -402,8 +443,10 @@ def parse_full_table_name( self, full_table_name: str ) -> tuple[str | None, str | None, str]: """Parse a fully qualified table name into its parts. + Developers may override this method if their platform does not support the traditional 3-part convention: `db_name.schema_name.table_name` + Args: full_table_name: A table name or a fully qualified table name. Depending on SQL the platform, this could take the following forms: @@ -411,6 +454,7 @@ def parse_full_table_name( - `.
` (platforms which do not use schema groupings) - `.` (if DB name is already in context) - `
` (if DB name and schema name are already in context) + Returns: A three part tuple (db_name, schema_name, table_name) with any unspecified or unused parts returned as None. @@ -430,8 +474,10 @@ def parse_full_table_name( def table_exists(self, full_table_name: str) -> bool: """Determine if the target table already exists. + Args: full_table_name: the target table name. + Returns: True if table exists, False if not, None if unsure or undetectable. """ @@ -440,13 +486,27 @@ def table_exists(self, full_table_name: str) -> bool: sqlalchemy.inspect(self._engine).has_table(full_table_name), ) + def schema_exists(self, schema_name: str) -> bool: + """Determine if the target database schema already exists. + + Args: + schema_name: The target database schema name. + + Returns: + True if the database schema exists, False if not. + """ + schema_names = sqlalchemy.inspect(self._engine).get_schema_names() + return schema_name in schema_names + def get_table_columns( self, full_table_name: str, column_names: list[str] | None = None ) -> dict[str, sqlalchemy.Column]: """Return a list of table columns. + Args: full_table_name: Fully qualified table name. column_names: A list of column names to filter to. + Returns: An ordered list of column objects. """ @@ -469,9 +529,11 @@ def get_table( self, full_table_name: str, column_names: list[str] | None = None ) -> sqlalchemy.Table: """Return a table object. + Args: full_table_name: Fully qualified table name. column_names: A list of column names to filter to. + Returns: A table object with column list. """ @@ -486,9 +548,11 @@ def get_table( def column_exists(self, full_table_name: str, column_name: str) -> bool: """Determine if the target table already exists. + Args: full_table_name: the target table name. column_name: the target column name. + Returns: True if table exists, False if not. """ @@ -503,12 +567,14 @@ def create_empty_table( as_temp_table: bool = False, ) -> None: """Create an empty target table. + Args: full_table_name: the target table name. schema: the JSON schema for the new table. primary_keys: list of key properties. partition_keys: list of partition keys. as_temp_table: True to create a temp table. + Raises: NotImplementedError: if temp tables are unsupported and as_temp_table=True. RuntimeError: if a variant schema is passed with no properties defined. @@ -547,10 +613,12 @@ def _create_empty_column( sql_type: sqlalchemy.types.TypeEngine, ) -> None: """Create a new column. + Args: full_table_name: The target table name. column_name: The name of the new column. sql_type: SQLAlchemy type engine to be used in creating the new column. + Raises: NotImplementedError: if adding columns is not supported. """ @@ -573,6 +641,16 @@ def _create_empty_column( ) ) + def prepare_schema(self, schema_name: str) -> None: + """Create the target database schema. + + Args: + schema_name: The target schema name. + """ + schema_exists = self.schema_exists(schema_name) + if not schema_exists: + self.create_schema(schema_name) + def prepare_table( self, full_table_name: str, @@ -582,6 +660,7 @@ def prepare_table( as_temp_table: bool = False, ) -> None: """Adapt target table to provided schema if possible. + Args: full_table_name: the target table name. schema: the JSON Schema for the table. @@ -611,6 +690,7 @@ def prepare_column( sql_type: sqlalchemy.types.TypeEngine, ) -> None: """Adapt target table to provided schema if possible. + Args: full_table_name: the target table name. column_name: the target column name. @@ -632,10 +712,12 @@ def prepare_column( def rename_column(self, full_table_name: str, old_name: str, new_name: str) -> None: """Rename the provided columns. + Args: full_table_name: The fully qualified table name. old_name: The old column to be renamed. new_name: The new name for the column. + Raises: NotImplementedError: If `self.allow_column_rename` is false. """ @@ -651,10 +733,13 @@ def merge_sql_types( self, sql_types: list[sqlalchemy.types.TypeEngine] ) -> sqlalchemy.types.TypeEngine: """Return a compatible SQL type for the selected type list. + Args: sql_types: List of SQL types. + Returns: A SQL type that is compatible with the input types. + Raises: ValueError: If sql_types argument has zero members. """ @@ -721,13 +806,17 @@ def _sort_types( sql_types: Iterable[sqlalchemy.types.TypeEngine], ) -> list[sqlalchemy.types.TypeEngine]: """Return the input types sorted from most to least compatible. + For example, [Smallint, Integer, Datetime, String, Double] would become [Unicode, String, Double, Integer, Smallint, Datetime]. + String types will be listed first, then decimal types, then integer types, then bool types, and finally datetime and date. Higher precision, scale, and length will be sorted earlier. + Args: sql_types (List[sqlalchemy.types.TypeEngine]): [description] + Returns: The sorted list. """ @@ -756,12 +845,15 @@ def _get_type_sort_key( def _get_column_type( self, full_table_name: str, column_name: str ) -> sqlalchemy.types.TypeEngine: - """Gets the SQL type of the declared column. + """Get the SQL type of the declared column. + Args: full_table_name: The name of the table. column_name: The name of the column. + Returns: The type of the column. + Raises: KeyError: If the provided column name does not exist. """ @@ -781,10 +873,12 @@ def _adapt_column_type( sql_type: sqlalchemy.types.TypeEngine, ) -> None: """Adapt table column type to support the new JSON schema type. + Args: full_table_name: The target table name. column_name: The target column name. sql_type: The new SQLAlchemy type. + Raises: NotImplementedError: if altering columns is not supported. """ @@ -865,7 +959,7 @@ def _singer_catalog_entry(self) -> CatalogEntry: @property def connector(self) -> SQLConnector: - """The connector object. + """Return a connector object. Returns: The connector object. @@ -874,7 +968,7 @@ def connector(self) -> SQLConnector: @property def metadata(self) -> MetadataMapping: - """The Singer metadata. + """Return the Singer metadata. Metadata from an input catalog will override standard metadata. From 7fd3bb111d5cb46fc0c91e9b1aa6aaf77a04e322 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 11 Oct 2022 16:38:55 +0100 Subject: [PATCH 08/13] fix test --- singer_sdk/streams/sql.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index ad764a1b1..a48191f21 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -584,7 +584,8 @@ def create_empty_table( _ = partition_keys # Not supported in generic implementation. - meta = sqlalchemy.MetaData() + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + meta = sqlalchemy.MetaData(schema=schema_name) columns: list[sqlalchemy.Column] = [] primary_keys = primary_keys or [] try: @@ -603,7 +604,7 @@ def create_empty_table( ) ) - _ = sqlalchemy.Table(full_table_name, meta, *columns) + _ = sqlalchemy.Table(table_name, meta, *columns) meta.create_all(self._engine) def _create_empty_column( From 615e5a6b117a85a7286aa3be27c0fb10ce2a42f8 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 11 Oct 2022 17:00:07 +0100 Subject: [PATCH 09/13] more connector changes --- singer_sdk/streams/sql.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index a48191f21..ee4672f57 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -204,7 +204,7 @@ def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: @staticmethod def get_fully_qualified_name( - table_name: str, + table_name: str | None = None, schema_name: str | None = None, db_name: str | None = None, delimiter: str = ".", @@ -218,23 +218,23 @@ def get_fully_qualified_name( delimiter: Generally: '.' for SQL names and '-' for Singer names. Raises: - ValueError: If table_name is not provided or if neither schema_name or - db_name are provided. + ValueError: If all 3 name parts not supplied. Returns: The fully qualified name as a string. """ - if db_name and schema_name: - result = delimiter.join([db_name, schema_name, table_name]) - elif db_name: - result = delimiter.join([db_name, table_name]) - elif schema_name: - result = delimiter.join([schema_name, table_name]) - elif table_name: - result = table_name - else: + parts = [] + + if db_name: + parts.append(db_name) + if schema_name: + parts.append(schema_name) + if table_name: + parts.append(table_name) + + if not parts: raise ValueError( - "Could not generate fully qualified name for stream: " + "Could not generate fully qualified name: " + ":".join( [ db_name or "(unknown-db)", @@ -244,7 +244,7 @@ def get_fully_qualified_name( ) ) - return result + return delimiter.join(parts) @property def _dialect(self) -> sqlalchemy.engine.Dialect: @@ -558,6 +558,13 @@ def column_exists(self, full_table_name: str, column_name: str) -> bool: """ return column_name in self.get_table_columns(full_table_name) + def create_schema(self, schema_name: str) -> None: + """Create target schema. + Args: + schema_name: The target schema to create. + """ + self._engine.execute(sqlalchemy.schema.CreateSchema(schema_name)) + def create_empty_table( self, full_table_name: str, From 4171a9504ae18ad230b35b015d114f0338638b5e Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 11 Oct 2022 17:12:10 +0100 Subject: [PATCH 10/13] fix docstring --- singer_sdk/streams/sql.py | 1 + 1 file changed, 1 insertion(+) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index ee4672f57..eead34c97 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -560,6 +560,7 @@ def column_exists(self, full_table_name: str, column_name: str) -> bool: def create_schema(self, schema_name: str) -> None: """Create target schema. + Args: schema_name: The target schema to create. """ From b60ddca8357fb24627ca2d8909898a88bbb1e978 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Wed, 12 Oct 2022 12:14:17 +0100 Subject: [PATCH 11/13] add schema creation test --- tests/core/test_sqlite.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/core/test_sqlite.py b/tests/core/test_sqlite.py index 5e76d8168..b9c7c82ca 100644 --- a/tests/core/test_sqlite.py +++ b/tests/core/test_sqlite.py @@ -10,6 +10,7 @@ from uuid import uuid4 import pytest +import sqlalchemy from samples.sample_tap_sqlite import SQLiteConnector, SQLiteTap from samples.sample_target_csv.csv_target import SampleTargetCSV @@ -265,6 +266,41 @@ def test_sync_sqlite_to_sqlite( assert line_num > 0, "No lines read." +def test_sqlite_schema_addition( + sqlite_target_test_config: dict, sqlite_sample_target: SQLTarget +): + """Test that SQL-based targets attempt to create new schema if included in stream name.""" + schema_name = f"test_schema_{str(uuid4()).split('-')[-1]}" + table_name = f"zzz_tmp_{str(uuid4()).split('-')[-1]}" + test_stream_name = f"{schema_name}-{table_name}" + schema_message = { + "type": "SCHEMA", + "stream": test_stream_name, + "schema": { + "type": "object", + "properties": {"col_a": th.StringType().to_dict()}, + }, + } + tap_output = "\n".join( + json.dumps(msg) + for msg in [ + schema_message, + { + "type": "RECORD", + "stream": test_stream_name, + "record": {"col_a": "samplerow1"}, + }, + ] + ) + # sqlite doesn't support schema creation + with pytest.raises(sqlalchemy.exc.OperationalError) as excinfo: + target_sync_test( + sqlite_sample_target, input=StringIO(tap_output), finalize=True + ) + # check the target at least tried to create the schema + assert excinfo.value.statement == f"CREATE SCHEMA {schema_name}" + + def test_sqlite_column_addition(sqlite_sample_target: SQLTarget): """End-to-end-to-end test for SQLite tap and target. From d33b822f2c4aeb63c3bb07c0f6fcc95b67e8f024 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Wed, 19 Oct 2022 16:44:05 +0100 Subject: [PATCH 12/13] remove create_table_with_records method --- singer_sdk/sinks/sql.py | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 3c3e1cc17..0acebef21 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -148,41 +148,6 @@ def process_batch(self, context: dict) -> None: records=context["records"], ) - def create_table_with_records( - self, - full_table_name: Optional[str], - schema: dict, - records: Iterable[Dict[str, Any]], - primary_keys: Optional[List[str]] = None, - partition_keys: Optional[List[str]] = None, - as_temp_table: bool = False, - ) -> None: - """Create an empty table. - - Args: - full_table_name: the target table name. - schema: the JSON schema for the new table. - records: records to load. - primary_keys: list of key properties. - partition_keys: list of partition keys. - as_temp_table: True to create a temp table. - """ - full_table_name = full_table_name or self.full_table_name - if primary_keys is None: - primary_keys = self.key_properties - partition_keys = partition_keys or None - # TODO: determine if this call to `prepare_table` is necessary - # (in addition to in `setup` above) - self.connector.prepare_table( - full_table_name=full_table_name, - primary_keys=primary_keys, - schema=schema, - as_temp_table=as_temp_table, - ) - self.bulk_insert_records( - full_table_name=full_table_name, schema=schema, records=records - ) - def generate_insert_statement( self, full_table_name: str, From e3e3a30e296d6d26e818b267cb0e7a7d4fbd2abd Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Wed, 19 Oct 2022 17:08:34 +0100 Subject: [PATCH 13/13] Update singer_sdk/sinks/sql.py Co-authored-by: Aaron ("AJ") Steers --- singer_sdk/sinks/sql.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 0acebef21..c3455d5df 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -77,10 +77,12 @@ def schema_name(self) -> Optional[str]: The target schema name. """ parts = self.stream_name.split("-") - if len(parts) == 2: - return parts[0] - if len(parts) == 3: - return parts[1] + if len(parts) in {2, 3}: + # Stream name is a two-part or three-part identifier. + # Use the second-to-last part as the schema name. + return parts[-2] + + # Schema name not detected. return None @property