From 10a4ceb30153efdcf0faf5a0aa50c9b652184da6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 12 Aug 2024 17:32:23 -0600 Subject: [PATCH] fix: Handle reserved words in db, schema and table names with the `normalize_name` dialect helper Co-authored-by: Pat Nadolny --- .github/workflows/test.yml | 13 ++-- target_snowflake/connector.py | 9 ++- target_snowflake/sinks.py | 9 +-- tests/core.py | 59 ++++++++++++++++++- .../reserved_words_in_table.singer | 2 + 5 files changed, 80 insertions(+), 12 deletions(-) create mode 100644 tests/target_test_streams/reserved_words_in_table.singer diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6040f12..985e36f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -21,12 +21,15 @@ jobs: fail-fast: false matrix: python-version: - - "3.8" - - "3.9" - - "3.10" - - "3.11" - "3.12" - os: ["ubuntu-latest", "macos-latest", "windows-latest"] + # - "3.11" + # - "3.10" + # - "3.9" + # - "3.8" + os: + - "ubuntu-latest" + # - "macos-latest" + # - "windows-latest" steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py index 1bdd4d9..5cf64be 100644 --- a/target_snowflake/connector.py +++ b/target_snowflake/connector.py @@ -12,7 +12,7 @@ from snowflake.sqlalchemy import URL from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer from snowflake.sqlalchemy.snowdialect import SnowflakeDialect -from sqlalchemy.sql import quoted_name, text +from sqlalchemy.sql import text from target_snowflake.snowflake_types import NUMBER, TIMESTAMP_NTZ, VARIANT @@ -61,6 +61,11 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.schema_cache: dict = {} super().__init__(*args, **kwargs) + @property + def dialect(self) -> SnowflakeDialect: + """Return a Snowflake dialect instance.""" + return self._engine.dialect + def get_table_columns( self, full_table_name: str, @@ -388,7 +393,7 @@ def _get_merge_from_stage_statement( # noqa: ANN202 dedup = f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {dedup_cols} ORDER BY SEQ8() DESC) = 1" return ( text( - f"merge into {quoted_name(full_table_name, quote=True)} d using " # noqa: ISC003 + f"merge into {full_table_name} d using " # noqa: ISC003 + f"(select {json_casting_selects} from '@~/target-snowflake/{sync_id}'" # noqa: S608 + f"(file_format => {file_format}) {dedup}) s " + f"on {join_expr} " diff --git a/target_snowflake/sinks.py b/target_snowflake/sinks.py index fc980fb..01c4170 100644 --- a/target_snowflake/sinks.py +++ b/target_snowflake/sinks.py @@ -17,6 +17,7 @@ from singer_sdk.sinks import SQLSink from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer from snowflake.sqlalchemy.snowdialect import SnowflakeDialect +from sqlalchemy.sql import quoted_name from target_snowflake.connector import SnowflakeConnector @@ -29,7 +30,7 @@ } -class SnowflakeSink(SQLSink): +class SnowflakeSink(SQLSink[SnowflakeConnector]): """Snowflake target sink class.""" connector_class = SnowflakeConnector @@ -55,16 +56,16 @@ def __init__( @property def schema_name(self) -> str | None: schema = super().schema_name or self.config.get("schema") - return schema.upper() if schema else None + return quoted_name(self.connector.dialect.normalize_name(schema.upper() if schema else None), quote=True) @property def database_name(self) -> str | None: db = super().database_name or self.config.get("database") - return db.upper() if db else None + return quoted_name(self.connector.dialect.normalize_name(db.upper() if db else None), quote=True) @property def table_name(self) -> str: - return super().table_name.upper() + return quoted_name(self.connector.dialect.normalize_name(super().table_name.upper()), quote=True) def setup(self) -> None: """Set up Sink. diff --git a/tests/core.py b/tests/core.py index 6643ea8..1206655 100644 --- a/tests/core.py +++ b/tests/core.py @@ -66,7 +66,7 @@ def validate(self) -> None: class SnowflakeTargetCamelcaseComplexSchema(TargetCamelcaseComplexSchema): def validate(self) -> None: connector = self.target.default_sink_class.connector_class(self.target.config) - table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.ForecastingTypeToCategory".upper() # noqa: E501 + table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.\"ForecastingTypeToCategory\"" # noqa: E501 table_schema = connector.get_table(table) expected_types = { "id": sqlalchemy.VARCHAR, @@ -458,6 +458,61 @@ def setup(self) -> None: ) +class SnowflakeTargetExistingReservedNameTableAlter(TargetFileTestTemplate): + name = "existing_reserved_name_table_alter" + # This sends a schema that will request altering from TIMESTAMP_NTZ to VARCHAR + + @property + def singer_filepath(self) -> Path: + current_dir = Path(__file__).resolve().parent + return current_dir / "target_test_streams" / "reserved_words_in_table.singer" + + def setup(self) -> None: + connector = self.target.default_sink_class.connector_class(self.target.config) + table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.\"order\"".upper() + connector.connection.execute( + f""" + CREATE OR REPLACE TABLE {table} ( + ID VARCHAR(16777216), + COL_STR VARCHAR(16777216), + COL_TS TIMESTAMP_NTZ(9), + COL_INT STRING, + COL_BOOL BOOLEAN, + COL_VARIANT VARIANT, + _SDC_BATCHED_AT TIMESTAMP_NTZ(9), + _SDC_DELETED_AT VARCHAR(16777216), + _SDC_EXTRACTED_AT TIMESTAMP_NTZ(9), + _SDC_RECEIVED_AT TIMESTAMP_NTZ(9), + _SDC_SEQUENCE NUMBER(38,0), + _SDC_TABLE_VERSION NUMBER(38,0), + PRIMARY KEY (ID) + ) + """, + ) + + +class SnowflakeTargetReservedWordsInTable(TargetFileTestTemplate): + # Contains reserved words from + # https://docs.snowflake.com/en/sql-reference/reserved-keywords + # Syncs records then alters schema by adding a non-reserved word column. + name = "reserved_words_in_table" + + @property + def singer_filepath(self) -> Path: + current_dir = Path(__file__).resolve().parent + return current_dir / "target_test_streams" / "reserved_words_in_table.singer" + + def validate(self) -> None: + connector = self.target.default_sink_class.connector_class(self.target.config) + table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.\"order\"".upper() + result = connector.connection.execute( + f"select * from {table}", + ) + assert result.rowcount == 1 + row = result.first() + assert len(row) == 13, f"Row has unexpected length {len(row)}" + + class SnowflakeTargetTypeEdgeCasesTest(TargetFileTestTemplate): name = "type_edge_cases" @@ -540,6 +595,8 @@ def singer_filepath(self) -> Path: SnowflakeTargetColonsInColName, SnowflakeTargetExistingTable, SnowflakeTargetExistingTableAlter, + SnowflakeTargetExistingReservedNameTableAlter, + SnowflakeTargetReservedWordsInTable, SnowflakeTargetTypeEdgeCasesTest, SnowflakeTargetColumnOrderMismatch, ], diff --git a/tests/target_test_streams/reserved_words_in_table.singer b/tests/target_test_streams/reserved_words_in_table.singer new file mode 100644 index 0000000..ede9a18 --- /dev/null +++ b/tests/target_test_streams/reserved_words_in_table.singer @@ -0,0 +1,2 @@ +{ "type": "SCHEMA", "stream": "order", "schema": { "properties": { "id": { "type": [ "string", "null" ] }, "col_str": { "type": [ "string", "null" ] }, "col_ts": { "format": "date-time", "type": [ "string", "null" ] }, "col_int": { "type": "integer" }, "col_bool": { "type": [ "boolean", "null" ] }, "col_variant": {"type": "object"} }, "type": "object" }, "key_properties": [ "id" ], "bookmark_properties": [ "col_ts" ] } +{ "type": "RECORD", "stream": "order", "record": { "id": "123", "col_str": "foo", "col_ts": "2023-06-13 11:50:04.072", "col_int": 5, "col_bool": true, "col_variant": {"key": "val"} }, "time_extracted": "2023-06-14T18:08:23.074716+00:00" }