From 3c77f5213540f1790905bff4aee47fd1eadc29d2 Mon Sep 17 00:00:00 2001 From: Nidhi Kakulawaram Date: Tue, 3 Oct 2023 08:20:33 -0700 Subject: [PATCH 1/9] fix --- tap_snowflake/client.py | 21 +++++++++++++++++++++ tests/catalog.json | 2 +- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/tap_snowflake/client.py b/tap_snowflake/client.py index 977c8d6..3f693a3 100644 --- a/tap_snowflake/client.py +++ b/tap_snowflake/client.py @@ -11,14 +11,35 @@ from pathlib import Path from typing import Any, Iterable, List, Tuple from uuid import uuid4 +import datetime import sqlalchemy from singer_sdk import SQLConnector, SQLStream, metrics from singer_sdk.helpers._batch import BaseBatchFileEncoding, BatchConfig from singer_sdk.streams.core import REPLICATION_FULL_TABLE, REPLICATION_INCREMENTAL +import singer_sdk.helpers._typing from snowflake.sqlalchemy import URL from sqlalchemy.sql import text +unpatched_conform = singer_sdk.helpers._typing._conform_primitive_property + + +def patched_conform( + elem: Any, + property_schema: dict, +) -> Any: + """Overrides Singer SDK type conformance to prevent dates turning into datetimes. + Converts a primitive (i.e. not object or array) to a json compatible type. + Returns: + The appropriate json compatible type. + """ + if isinstance(elem, datetime.date): + return elem.isoformat() + return unpatched_conform(elem=elem, property_schema=property_schema) + + +singer_sdk.helpers._typing._conform_primitive_property = patched_conform + class ProfileStats(Enum): """Profile Statistics Enum.""" diff --git a/tests/catalog.json b/tests/catalog.json index c3f4c5f..62e5e69 100644 --- a/tests/catalog.json +++ b/tests/catalog.json @@ -378,7 +378,7 @@ "type": ["number"] }, "o_orderdate": { - "format": "date-time", + "format": "date", "type": ["string"] }, "o_orderpriority": { From c4cd6ac5ff2f4d637655c17116fbaad28ba957ae Mon Sep 17 00:00:00 2001 From: Nidhi Kakulawaram Date: Wed, 22 Nov 2023 05:30:52 -0800 Subject: [PATCH 2/9] test-commit --- tap_snowflake/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tap_snowflake/client.py b/tap_snowflake/client.py index 3f693a3..e1dfcfe 100644 --- a/tap_snowflake/client.py +++ b/tap_snowflake/client.py @@ -20,6 +20,7 @@ import singer_sdk.helpers._typing from snowflake.sqlalchemy import URL from sqlalchemy.sql import text +LOGGER = singer.get_logger() unpatched_conform = singer_sdk.helpers._typing._conform_primitive_property From 6a52fe1b53144755e2e68115b8cb520f57c9d91d Mon Sep 17 00:00:00 2001 From: Nidhi Kakulawaram Date: Wed, 22 Nov 2023 05:40:01 -0800 Subject: [PATCH 3/9] adding logging --- tap_snowflake/client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tap_snowflake/client.py b/tap_snowflake/client.py index e1dfcfe..9795301 100644 --- a/tap_snowflake/client.py +++ b/tap_snowflake/client.py @@ -111,6 +111,8 @@ def discover_catalog_entries(self) -> list[dict]: """ result: list[dict] = [] tables = [t.lower() for t in self.config.get("tables", [])] + LOGGER.debug("This is the tables: %s", tables) + LOGGER.debug("This is the config: %s", self.config) engine = self.create_sqlalchemy_engine() inspected = sqlalchemy.inspect(engine) schema_names = [ @@ -118,6 +120,7 @@ def discover_catalog_entries(self) -> list[dict]: for schema_name in self.get_schema_names(engine, inspected) if schema_name.lower() != "information_schema" ] + LOGGER.debug("This is the schema_names: %s", schema_names) for schema_name in schema_names: # Iterate through each table and view for table_name, is_view in self.get_object_names( From 081bfb5e0f21b1e0ad9ffa4702e4ad3a013845fa Mon Sep 17 00:00:00 2001 From: Nidhi Kakulawaram Date: Wed, 22 Nov 2023 06:29:26 -0800 Subject: [PATCH 4/9] fixed import --- tap_snowflake/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tap_snowflake/client.py b/tap_snowflake/client.py index 9795301..5572b8e 100644 --- a/tap_snowflake/client.py +++ b/tap_snowflake/client.py @@ -12,6 +12,7 @@ from typing import Any, Iterable, List, Tuple from uuid import uuid4 import datetime +import singer import sqlalchemy from singer_sdk import SQLConnector, SQLStream, metrics From 0ba04c94503da09b28ab9ea9bd8a2ff1a97a0ee4 Mon Sep 17 00:00:00 2001 From: Nidhi Kakulawaram Date: Wed, 22 Nov 2023 07:40:57 -0800 Subject: [PATCH 5/9] test. must revert --- tap_snowflake/client.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/tap_snowflake/client.py b/tap_snowflake/client.py index 5572b8e..0e42aba 100644 --- a/tap_snowflake/client.py +++ b/tap_snowflake/client.py @@ -114,24 +114,25 @@ def discover_catalog_entries(self) -> list[dict]: tables = [t.lower() for t in self.config.get("tables", [])] LOGGER.debug("This is the tables: %s", tables) LOGGER.debug("This is the config: %s", self.config) - engine = self.create_sqlalchemy_engine() - inspected = sqlalchemy.inspect(engine) - schema_names = [ - schema_name - for schema_name in self.get_schema_names(engine, inspected) - if schema_name.lower() != "information_schema" - ] - LOGGER.debug("This is the schema_names: %s", schema_names) - for schema_name in schema_names: - # Iterate through each table and view - for table_name, is_view in self.get_object_names( - engine, inspected, schema_name - ): - if (not tables) or (f"{schema_name}.{table_name}" in tables): - catalog_entry = self.discover_catalog_entry( - engine, inspected, schema_name, table_name, is_view - ) - result.append(catalog_entry.to_dict()) + result.append(self.config) + # engine = self.create_sqlalchemy_engine() + # inspected = sqlalchemy.inspect(engine) + # schema_names = [ + # schema_name + # for schema_name in self.get_schema_names(engine, inspected) + # if schema_name.lower() != "information_schema" + # ] + # LOGGER.debug("This is the schema_names: %s", schema_names) + # for schema_name in schema_names: + # # Iterate through each table and view + # for table_name, is_view in self.get_object_names( + # engine, inspected, schema_name + # ): + # if (not tables) or (f"{schema_name}.{table_name}" in tables): + # catalog_entry = self.discover_catalog_entry( + # engine, inspected, schema_name, table_name, is_view + # ) + # result.append(catalog_entry.to_dict()) return result From 1be1eb00e9420b16d63b7af9c35112cdc577fa7f Mon Sep 17 00:00:00 2001 From: Nidhi Kakulawaram Date: Wed, 22 Nov 2023 07:46:53 -0800 Subject: [PATCH 6/9] remove logging --- tap_snowflake/client.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tap_snowflake/client.py b/tap_snowflake/client.py index 0e42aba..8553f74 100644 --- a/tap_snowflake/client.py +++ b/tap_snowflake/client.py @@ -12,7 +12,6 @@ from typing import Any, Iterable, List, Tuple from uuid import uuid4 import datetime -import singer import sqlalchemy from singer_sdk import SQLConnector, SQLStream, metrics @@ -21,7 +20,6 @@ import singer_sdk.helpers._typing from snowflake.sqlalchemy import URL from sqlalchemy.sql import text -LOGGER = singer.get_logger() unpatched_conform = singer_sdk.helpers._typing._conform_primitive_property @@ -112,8 +110,6 @@ def discover_catalog_entries(self) -> list[dict]: """ result: list[dict] = [] tables = [t.lower() for t in self.config.get("tables", [])] - LOGGER.debug("This is the tables: %s", tables) - LOGGER.debug("This is the config: %s", self.config) result.append(self.config) # engine = self.create_sqlalchemy_engine() # inspected = sqlalchemy.inspect(engine) From 63059c2ade5418f3a581ac69f22b84dab5367b96 Mon Sep 17 00:00:00 2001 From: Nidhi Kakulawaram Date: Wed, 22 Nov 2023 07:58:25 -0800 Subject: [PATCH 7/9] adding logging again --- tap_snowflake/client.py | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/tap_snowflake/client.py b/tap_snowflake/client.py index 8553f74..a5e8b7c 100644 --- a/tap_snowflake/client.py +++ b/tap_snowflake/client.py @@ -13,6 +13,7 @@ from uuid import uuid4 import datetime +from loguru import logger import sqlalchemy from singer_sdk import SQLConnector, SQLStream, metrics from singer_sdk.helpers._batch import BaseBatchFileEncoding, BatchConfig @@ -111,24 +112,25 @@ def discover_catalog_entries(self) -> list[dict]: result: list[dict] = [] tables = [t.lower() for t in self.config.get("tables", [])] result.append(self.config) - # engine = self.create_sqlalchemy_engine() - # inspected = sqlalchemy.inspect(engine) - # schema_names = [ - # schema_name - # for schema_name in self.get_schema_names(engine, inspected) - # if schema_name.lower() != "information_schema" - # ] - # LOGGER.debug("This is the schema_names: %s", schema_names) - # for schema_name in schema_names: - # # Iterate through each table and view - # for table_name, is_view in self.get_object_names( - # engine, inspected, schema_name - # ): - # if (not tables) or (f"{schema_name}.{table_name}" in tables): - # catalog_entry = self.discover_catalog_entry( - # engine, inspected, schema_name, table_name, is_view - # ) - # result.append(catalog_entry.to_dict()) + engine = self.create_sqlalchemy_engine() + inspected = sqlalchemy.inspect(engine) + schema_names = [ + schema_name + for schema_name in self.get_schema_names(engine, inspected) + if schema_name.lower() != "information_schema" + ] + logger.info("This is the schema_names: %s", schema_names) + logger.info("This is the tables: %s", tables) + for schema_name in schema_names: + # Iterate through each table and view + for table_name, is_view in self.get_object_names( + engine, inspected, schema_name + ): + if (not tables) or (f"{schema_name}.{table_name}" in tables): + catalog_entry = self.discover_catalog_entry( + engine, inspected, schema_name, table_name, is_view + ) + result.append(catalog_entry.to_dict()) return result From 78fa22bbeb27ee4977eb1d255c661d319c6f379e Mon Sep 17 00:00:00 2001 From: Nidhi Kakulawaram Date: Mon, 27 Nov 2023 05:45:08 -0800 Subject: [PATCH 8/9] added self.logger --- tap_snowflake/client.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tap_snowflake/client.py b/tap_snowflake/client.py index a5e8b7c..74fa0c4 100644 --- a/tap_snowflake/client.py +++ b/tap_snowflake/client.py @@ -13,7 +13,6 @@ from uuid import uuid4 import datetime -from loguru import logger import sqlalchemy from singer_sdk import SQLConnector, SQLStream, metrics from singer_sdk.helpers._batch import BaseBatchFileEncoding, BatchConfig @@ -119,8 +118,15 @@ def discover_catalog_entries(self) -> list[dict]: for schema_name in self.get_schema_names(engine, inspected) if schema_name.lower() != "information_schema" ] - logger.info("This is the schema_names: %s", schema_names) - logger.info("This is the tables: %s", tables) + #print("This is self.config: %s", self.config) + #print(self.config) + print("This is the schema_names: %s", schema_names) + print("This is the tables: %s", tables) + self.logger.info("This is tables") + self.logger.info(tables) + self.logger.info("This is schema_names") + self.logger.info(schema_names) + self.logger.info(self.config) for schema_name in schema_names: # Iterate through each table and view for table_name, is_view in self.get_object_names( From 6abc66eace51d4faea27788565b044742928d9c4 Mon Sep 17 00:00:00 2001 From: Nidhi Kakulawaram Date: Mon, 27 Nov 2023 09:27:02 -0800 Subject: [PATCH 9/9] done --- tap_snowflake/client.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tap_snowflake/client.py b/tap_snowflake/client.py index 74fa0c4..3c22601 100644 --- a/tap_snowflake/client.py +++ b/tap_snowflake/client.py @@ -12,6 +12,7 @@ from typing import Any, Iterable, List, Tuple from uuid import uuid4 import datetime +import re import sqlalchemy from singer_sdk import SQLConnector, SQLStream, metrics @@ -99,7 +100,7 @@ def create_engine(self) -> sqlalchemy.engine.Engine: self.sqlalchemy_url, echo=False, pool_timeout=10, - ) + ) # overridden to filter out the information_schema from catalog discovery def discover_catalog_entries(self) -> list[dict]: @@ -110,25 +111,24 @@ def discover_catalog_entries(self) -> list[dict]: """ result: list[dict] = [] tables = [t.lower() for t in self.config.get("tables", [])] - result.append(self.config) engine = self.create_sqlalchemy_engine() inspected = sqlalchemy.inspect(engine) - schema_names = [ - schema_name - for schema_name in self.get_schema_names(engine, inspected) - if schema_name.lower() != "information_schema" - ] - #print("This is self.config: %s", self.config) - #print(self.config) - print("This is the schema_names: %s", schema_names) - print("This is the tables: %s", tables) - self.logger.info("This is tables") - self.logger.info(tables) - self.logger.info("This is schema_names") - self.logger.info(schema_names) - self.logger.info(self.config) + + if self.config.get("schema"): + schema_names = [] + schema_names.append(self.config.get("schema")) + + else: + schema_names = [ + schema_name + for schema_name in self.get_schema_names(engine, inspected) + if schema_name.lower() != "information_schema" + ] + for schema_name in schema_names: # Iterate through each table and view + # We shouldn't have to iterate through every table in a schema if tables are provided + # However, the only way to get is_view for tables is self.get_object_names with schema for table_name, is_view in self.get_object_names( engine, inspected, schema_name ):