From 4f677d3f30ceaf8415dd1a401d22ab8aaf06e2a2 Mon Sep 17 00:00:00 2001
From: Derek Visch
Date: Thu, 15 Jun 2023 15:10:40 -0400
Subject: [PATCH] feat: Use SQLAlchemy core objects where possible and add
 append-only support (#138)

Closes #121, #22, #64, #119, #21, #54
---
 .gitignore                                    | 139 +++++++++++
 meltano.yml                                   |  48 ++--
 target_postgres/connector.py                  | 102 +++++---
 target_postgres/sinks.py                      | 218 +++++++++++-------
 target_postgres/target.py                     |   1 +
 ...nger => test_activate_version_hard.singer} |   0
 ...nger => test_activate_version_soft.singer} |   0
 ..._primary_keys.singer => test_no_pk.singer} |   0
 ...append.singer => test_no_pk_append.singer} |   0
 target_postgres/tests/test_standard_target.py | 109 +++++----
 10 files changed, 440 insertions(+), 177 deletions(-)
 rename target_postgres/tests/data_files/{activate_version_hard.singer => test_activate_version_hard.singer} (100%)
 rename target_postgres/tests/data_files/{activate_version_soft.singer => test_activate_version_soft.singer} (100%)
 rename target_postgres/tests/data_files/{no_primary_keys.singer => test_no_pk.singer} (100%)
 rename target_postgres/tests/data_files/{no_primary_keys_append.singer => test_no_pk_append.singer} (100%)

diff --git a/.gitignore b/.gitignore
index a8977e70..18d70ceb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,142 @@ smoke-test
 .tox/**
 .vscode/**
 output/**
+.env
+plugins/**
+
+# Secrets and internal config files
+.secrets/*
+
+# Ignore meltano internal cache and sqlite systemdb
+
+.meltano/
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
diff --git a/meltano.yml b/meltano.yml
index 0c7f9fc6..62d70be0 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -1,33 +1,49 @@
 version: 1
 send_anonymous_usage_stats: true
 default_environment: dev
-project_id: "target-postgres"
+project_id: target-postgres
 plugins:
   extractors:
-  - name: "tap-smoke-test"
-    namespace: "tap_smoke_test"
-    executable: "tap-smoke-test"
-    pip_url: "git+https://github.com/meltano/tap-smoke-test.git"
+  - name: tap-smoke-test
+    namespace: tap_smoke_test
+    pip_url: git+https://github.com/meltano/tap-smoke-test.git
+    executable: tap-smoke-test
     config:
       streams:
       - stream_name: animals
-        "input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl"
+        input_filename: https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl
      - stream_name: page_views
-        "input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/pageviews-data.jsonl"
+        input_filename: https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/pageviews-data.jsonl
       stream_maps:
-        "animals":
-          "__key_properties__": ["id"]
-        "page_views":
-          "__key_properties__": ["vistor_id"]
+        animals:
+          __key_properties__: [id]
+        page_views:
+          __key_properties__: [vistor_id]
+  - name: tap-github
+    variant: meltanolabs
+    pip_url: git+https://github.com/MeltanoLabs/tap-github.git
+    config:
+      repositories:
+      - sbalnojan/meltano-lightdash
+      start_date: '2022-01-01'
+    select:
+    - commits.url
+    - commits.sha
+    - commits.commit_timestamp
   loaders:
-  - name: "target-postgres"
-    namespace: "target_postgres"
+  - name: target-postgres
+    namespace: target_postgres
     pip_url: -e .
-    config:
-      sqlalchemy_url: "postgresql://postgres:postgres@localhost:5432/postgres"
-      target_schema: test
     settings:
     - name: sqlalchemy_url
       kind: password
+    config:
+      host: localhost
+      port: 5432
+      user: postgres
+      password: postgres
+      database: postgres
+      target_schema: test
+      add_record_metadata: true
 environments:
 - name: dev
diff --git a/target_postgres/connector.py b/target_postgres/connector.py
index 776bca29..fea0932a 100644
--- a/target_postgres/connector.py
+++ b/target_postgres/connector.py
@@ -20,6 +20,43 @@ class PostgresConnector(SQLConnector):
     allow_merge_upsert: bool = True  # Whether MERGE UPSERT is supported.
     allow_temp_tables: bool = True  # Whether temp tables are supported.
 
+    def prepare_table(
+        self,
+        full_table_name: str,
+        schema: dict,
+        primary_keys: list[str],
+        partition_keys: list[str] | None = None,
+        as_temp_table: bool = False,
+    ) -> sqlalchemy.Table:
+        """Adapt target table to provided schema if possible.
+
+        Args:
+            full_table_name: the target table name.
+            schema: the JSON Schema for the table.
+            primary_keys: list of key properties.
+            partition_keys: list of partition keys.
+            as_temp_table: True to create a temp table.
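+
+        Returns:
+            The prepared table object, either newly created or reflected
+            after adapting any existing columns.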
+ """ + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + meta = sqlalchemy.MetaData(bind=self._engine, schema=schema_name) + if not self.table_exists(full_table_name=full_table_name): + table = self.create_empty_table( + table_name=table_name, + meta=meta, + schema=schema, + primary_keys=primary_keys, + partition_keys=partition_keys, + as_temp_table=as_temp_table, + ) + return table + for property_name, property_def in schema["properties"].items(): + self.prepare_column( + full_table_name, property_name, self.to_sql_type(property_def) + ) + meta.reflect(only=[table_name]) + + return meta.tables[full_table_name] + def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection: """Return a new SQLAlchemy connection using the provided config. @@ -53,22 +90,30 @@ def get_sqlalchemy_url(self, config: dict) -> str: ) return cast(str, sqlalchemy_url) - def truncate_table(self, name): - """Clear table data.""" - self.connection.execute(f"TRUNCATE TABLE {name}") - - def drop_table(self, name): + def drop_table(self, table: sqlalchemy.Table): """Drop table data.""" - self.connection.execute(f"DROP TABLE {name}") - - def create_temp_table_from_table(self, from_table_name, temp_table_name): - """Temp table from another table.""" - ddl = sqlalchemy.DDL( - "CREATE TEMP TABLE %(temp_table_name)s AS " - "SELECT * FROM %(from_table_name)s LIMIT 0", - {"temp_table_name": temp_table_name, "from_table_name": from_table_name}, - ) - self.connection.execute(ddl) + table.drop(bind=self.connection) + + def clone_table( + self, new_table_name, table, metadata, connection, temp_table + ) -> sqlalchemy.Table: + """Clone a table.""" + new_columns = [] + for column in table.columns: + new_columns.append( + sqlalchemy.Column( + column.name, + column.type, + ) + ) + if temp_table is True: + new_table = sqlalchemy.Table( + new_table_name, metadata, *new_columns, prefixes=["TEMPORARY"] + ) + else: + new_table = sqlalchemy.Table(new_table_name, metadata, *new_columns) + new_table.create(bind=connection) + return new_table @staticmethod def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: @@ -99,12 +144,13 @@ def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: def create_empty_table( self, - full_table_name: str, + table_name: str, + meta: sqlalchemy.MetaData, schema: dict, primary_keys: list[str] | None = None, partition_keys: list[str] | None = None, as_temp_table: bool = False, - ) -> None: + ) -> sqlalchemy.Table: """Create an empty target table. Args: @@ -118,21 +164,16 @@ def create_empty_table( NotImplementedError: if temp tables are unsupported and as_temp_table=True. RuntimeError: if a variant schema is passed with no properties defined. """ - if as_temp_table: - raise NotImplementedError("Temporary tables are not supported.") - - _ = partition_keys # Not supported in generic implementation. 
-
-        _, schema_name, table_name = self.parse_full_table_name(full_table_name)
-        meta = sqlalchemy.MetaData(schema=schema_name)
         columns: list[sqlalchemy.Column] = []
         primary_keys = primary_keys or []
         try:
             properties: dict = schema["properties"]
         except KeyError:
             raise RuntimeError(
-                f"Schema for '{full_table_name}' does not define properties: {schema}"
+                f"Schema for table_name: '{table_name}' "
+                f"does not define properties: {schema}"
             )
+
         for property_name, property_jsonschema in properties.items():
             is_primary_key = property_name in primary_keys
             columns.append(
@@ -142,9 +183,16 @@
                     primary_key=is_primary_key,
                 )
             )
+        if as_temp_table:
+            new_table = sqlalchemy.Table(
+                table_name, meta, *columns, prefixes=["TEMPORARY"]
+            )
+            new_table.create(bind=self.connection)
+            return new_table
 
-        _ = sqlalchemy.Table(table_name, meta, *columns)
-        meta.create_all(self._engine)
+        new_table = sqlalchemy.Table(table_name, meta, *columns)
+        new_table.create(bind=self.connection)
+        return new_table
 
     def get_column_add_ddl(
         self,
diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 3d8dd5b2..a94e4747 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -5,7 +5,7 @@
 import sqlalchemy
 from pendulum import now
 from singer_sdk.sinks import SQLSink
-from sqlalchemy import Column, MetaData, Table, insert
+from sqlalchemy import Column, MetaData, Table, insert, select, update
 from sqlalchemy.sql import Executable
 from sqlalchemy.sql.expression import bindparam
 
@@ -22,6 +22,25 @@ def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.temp_table_name = self.generate_temp_table_name()
 
+    @property
+    def connector(self) -> PostgresConnector:
+        """The connector object.
+
+        Returns:
+            The connector object.
+        """
+        return self._connector
+
+    @property
+    def append_only(self) -> bool:
+        """Return True if the target is append only."""
+        return self._append_only
+
+    @append_only.setter
+    def append_only(self, value: bool) -> None:
+        """Set the append_only attribute."""
+        self._append_only = value
+
     def setup(self) -> None:
         """Set up Sink.
 
@@ -29,11 +48,9 @@ def setup(self) -> None:
         Table entities in the target database.
         """
         if self.key_properties is None or self.key_properties == []:
-            raise ValueError(
-                "key_properties must be set. See"
-                "https://github.com/MeltanoLabs/target-postgres/issues/54"
-                "for more information."
-            )
+            self.append_only = True
+        else:
+            self.append_only = False
         if self.schema_name:
             self.connector.prepare_schema(self.schema_name)
         self.connector.prepare_table(
@@ -53,97 +70,48 @@ def process_batch(self, context: dict) -> None:
             context: Stream partition or context dictionary.
""" # First we need to be sure the main table is already created - self.connector.prepare_table( + table: sqlalchemy.Table = self.connector.prepare_table( full_table_name=self.full_table_name, schema=self.schema, primary_keys=self.key_properties, as_temp_table=False, ) # Create a temp table (Creates from the table above) - self.connector.create_temp_table_from_table( - from_table_name=self.full_table_name, temp_table_name=self.temp_table_name + temp_table: sqlalchemy.Table = self.connector.prepare_table( + full_table_name=self.temp_table_name, + schema=self.schema, + primary_keys=self.key_properties, + as_temp_table=True, ) # Insert into temp table self.bulk_insert_records( - full_table_name=self.temp_table_name, + table=temp_table, schema=self.schema, primary_keys=self.key_properties, records=context["records"], ) # Merge data from Temp table to main table - self.merge_upsert_from_table( - from_table_name=self.temp_table_name, - to_table_name=self.full_table_name, + self.upsert( + from_table=temp_table, + to_table=table, schema=self.schema, join_keys=self.key_properties, ) # Drop temp table - self.connector.drop_table(self.temp_table_name) + self.connector.drop_table(temp_table) def generate_temp_table_name(self): """Uuid temp table name.""" - # Table name makes debugging easier when data cannot be written to the - # temp table for some reason - return f"temp_{self.table_name}_{str(uuid.uuid4()).replace('-','_')}" - - def merge_upsert_from_table( - self, - from_table_name: str, - to_table_name: str, - schema: dict, - join_keys: List[str], - ) -> Optional[int]: - """Merge upsert data from one table to another. - - Args: - from_table_name: The source table name. - to_table_name: The destination table name. - join_keys: The merge upsert keys, or `None` to append. - schema: Singer Schema message. - - Return: - The number of records copied, if detectable, or `None` if the API does not - report number of records affected/inserted. 
-
-        """
-        # TODO think about sql injeciton,
-        # issue here https://github.com/MeltanoLabs/target-postgres/issues/22
-
-        # INSERT
-        join_condition = " and ".join(
-            [f'temp."{key}" = target."{key}"' for key in join_keys]
-        )
-        where_condition = " and ".join([f'target."{key}" is null' for key in join_keys])
-
-        insert_sql = f"""
-        INSERT INTO {to_table_name}
-        SELECT
-        temp.*
-        FROM {from_table_name} AS temp
-        LEFT JOIN {to_table_name} AS target ON {join_condition}
-        WHERE {where_condition}
-        """
-        self.connection.execute(insert_sql)
-
-        # UPDATE
-        columns = ", ".join(
-            [
-                f'"{column_name}"=temp."{column_name}"'
-                for column_name in self.schema["properties"].keys()
-            ]
-        )
-        where_condition = join_condition
-        update_sql = f"""
-        UPDATE {to_table_name} AS target
-        SET {columns}
-        FROM {from_table_name} AS temp
-        WHERE {where_condition}
-        """
-        self.connection.execute(update_sql)
+        # sqlalchemy.exc.IdentifierError: Identifier
+        # 'temp_test_optional_attributes_388470e9_fbd0_47b7_a52f_d32a2ee3f5f6'
+        # exceeds maximum length of 63 characters
+        # That error hits when the stream name is long; temp tables are session-
+        # scoped in Postgres, so a bare uuid stays short and unique per session.
+        return f"{str(uuid.uuid4()).replace('-','_')}"
 
     def bulk_insert_records(
         self,
-        full_table_name: str,
+        table: sqlalchemy.Table,
         schema: dict,
         records: Iterable[Dict[str, Any]],
         primary_keys: List[str],
@@ -165,30 +133,110 @@
         """
         columns = self.column_representation(schema)
         insert = self.generate_insert_statement(
-            full_table_name,
+            table.name,
             columns,
         )
         self.logger.info("Inserting with SQL: %s", insert)
         # Only one record per PK, we want to take the last one
-        insert_records: Dict[str, Dict] = {}  # pk : record
-        for record in records:
-            insert_record = {}
-            for column in columns:
-                insert_record[column.name] = record.get(column.name)
+        data_to_insert: List[Dict[str, Any]] = []
+
+        if self.append_only is False:
+            insert_records: Dict[str, Dict] = {}  # pk : record
             try:
-                primary_key_value = "".join([str(record[key]) for key in primary_keys])
+                for record in records:
+                    insert_record = {}
+                    for column in columns:
+                        insert_record[column.name] = record.get(column.name)
+                    primary_key_value = "".join(
+                        [str(record[key]) for key in primary_keys]
+                    )
+                    insert_records[primary_key_value] = insert_record
             except KeyError:
                 raise RuntimeError(
                     "Primary key not found in record. "
-                    f"full_table_name: {full_table_name}. "
-                    f"schema: {schema}. "
+                    f"full_table_name: {table.name}. "
+                    f"schema: {table.schema}. "
                     f"primary_keys: {primary_keys}."
                 )
-            insert_records[primary_key_value] = insert_record
-
-        self.connector.connection.execute(insert, list(insert_records.values()))
+            data_to_insert = list(insert_records.values())
+        else:
+            for record in records:
+                insert_record = {}
+                for column in columns:
+                    insert_record[column.name] = record.get(column.name)
+                data_to_insert.append(insert_record)
+        self.connector.connection.execute(insert, data_to_insert)
         return True
 
+    def upsert(
+        self,
+        from_table: sqlalchemy.Table,
+        to_table: sqlalchemy.Table,
+        schema: dict,
+        join_keys: List[str],
+    ) -> Optional[int]:
+        """Merge upsert data from one table to another.
+
+        Args:
+            from_table: The source table.
+            to_table: The destination table.
+            join_keys: The merge upsert keys, or `None` to append.
+            schema: Singer Schema message.
+
+        Returns:
+            The number of records copied, if detectable, or `None` if the API does not
+            report number of records affected/inserted.
+
+        """
+        if self.append_only is True:
+            # Insert
+            select_stmt = select(from_table.columns).select_from(from_table)
+            insert_stmt = to_table.insert().from_select(
+                names=from_table.columns, select=select_stmt
+            )
+            self.connection.execute(insert_stmt)
+        else:
+            # Insert
+            join_predicates = []
+            for key in join_keys:
+                from_table_key: sqlalchemy.Column = from_table.columns[key]
+                to_table_key: sqlalchemy.Column = to_table.columns[key]
+                join_predicates.append(from_table_key == to_table_key)
+
+            join_condition = sqlalchemy.and_(*join_predicates)
+
+            where_predicates = []
+            for key in join_keys:
+                to_table_key: sqlalchemy.Column = to_table.columns[key]
+                where_predicates.append(to_table_key.is_(None))
+            where_condition = sqlalchemy.and_(*where_predicates)
+
+            select_stmt = (
+                select(from_table.columns)
+                .select_from(from_table.outerjoin(to_table, join_condition))
+                .where(where_condition)
+            )
+            insert_stmt = insert(to_table).from_select(
+                names=from_table.columns, select=select_stmt
+            )
+            self.connection.execute(insert_stmt)
+
+            # Update
+            where_condition = join_condition
+            update_columns = {}
+            for column_name in self.schema["properties"].keys():
+                from_table_column: sqlalchemy.Column = from_table.columns[column_name]
+                to_table_column: sqlalchemy.Column = to_table.columns[column_name]
+                update_columns[to_table_column] = from_table_column
+
+            update_stmt = (
+                update(to_table).where(where_condition).values(update_columns)
+            )
+            self.connection.execute(update_stmt)
+
     def column_representation(
         self,
         schema: dict,
diff --git a/target_postgres/target.py b/target_postgres/target.py
index 1b2e02e4..fa99fb00 100644
--- a/target_postgres/target.py
+++ b/target_postgres/target.py
@@ -116,6 +116,7 @@ def __init__(
             "default_target_schema",
             th.StringType,
             description="Postgres schema to send data to, example: tap-clickup",
+            default="melty",
         ),
         th.Property(
             "hard_delete",
diff --git a/target_postgres/tests/data_files/activate_version_hard.singer b/target_postgres/tests/data_files/test_activate_version_hard.singer
similarity index 100%
rename from target_postgres/tests/data_files/activate_version_hard.singer
rename to target_postgres/tests/data_files/test_activate_version_hard.singer
diff --git a/target_postgres/tests/data_files/activate_version_soft.singer b/target_postgres/tests/data_files/test_activate_version_soft.singer
similarity index 100%
rename from target_postgres/tests/data_files/activate_version_soft.singer
rename to target_postgres/tests/data_files/test_activate_version_soft.singer
diff --git a/target_postgres/tests/data_files/no_primary_keys.singer b/target_postgres/tests/data_files/test_no_pk.singer
similarity index 100%
rename from target_postgres/tests/data_files/no_primary_keys.singer
rename to target_postgres/tests/data_files/test_no_pk.singer
diff --git a/target_postgres/tests/data_files/no_primary_keys_append.singer b/target_postgres/tests/data_files/test_no_pk_append.singer
similarity index 100%
rename from target_postgres/tests/data_files/no_primary_keys_append.singer
rename to target_postgres/tests/data_files/test_no_pk_append.singer
diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py
index f437e74c..8124debb 100644
--- a/target_postgres/tests/test_standard_target.py
+++ b/target_postgres/tests/test_standard_target.py
@@ -31,6 +31,7 @@ def postgres_config():
         "port": 5432,
         "add_record_metadata": True,
         "hard_delete": False,
+        "default_target_schema": "melty",
     }
 
 
@@ -39,9 +40,10 @@ def postgres_target(postgres_config) -> TargetPostgres:
     return TargetPostgres(config=postgres_config)
 
 
-def sqlalchemy_engine(config) -> sqlalchemy.engine.Engine:
+@pytest.fixture
+def engine(postgres_config) -> sqlalchemy.engine.Engine:
     return create_engine(
-        f"{config['dialect+driver']}://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"
+        f"{postgres_config['dialect+driver']}://{postgres_config['user']}:{postgres_config['password']}@{postgres_config['host']}:{postgres_config['port']}/{postgres_config['database']}"
     )
 
 
@@ -143,11 +145,9 @@ def test_countries_to_postgres(postgres_config):
 
 
 def test_aapl_to_postgres(postgres_config):
-    """Expect to fail with ValueError due to primary key https://github.com/MeltanoLabs/target-postgres/issues/54"""
-    with pytest.raises(ValueError):
-        tap = Fundamentals(config={}, state=None)
-        target = TargetPostgres(config=postgres_config)
-        sync_end_to_end(tap, target)
+    tap = Fundamentals(config={}, state=None)
+    target = TargetPostgres(config=postgres_config)
+    sync_end_to_end(tap, target)
 
 
 def test_record_before_schema(postgres_target):
@@ -182,14 +182,11 @@ def test_record_missing_required_property(postgres_target):
     singer_file_to_target(file_name, postgres_target)
 
 
-@pytest.mark.xfail
 def test_camelcase(postgres_target):
-    """https://github.com/MeltanoLabs/target-postgres/issues/64 will address fixing this"""
     file_name = "camelcase.singer"
     singer_file_to_target(file_name, postgres_target)
 
 
-@pytest.mark.xfail
 def test_special_chars_in_attributes(postgres_target):
     file_name = "special_chars_in_attributes.singer"
     singer_file_to_target(file_name, postgres_target)
 
 
@@ -202,10 +199,9 @@ def test_optional_attributes(postgres_target):
 
 
 def test_schema_no_properties(postgres_target):
-    """Expect to fail with ValueError due to primary key https://github.com/MeltanoLabs/target-postgres/issues/54"""
-    with pytest.raises(ValueError):
-        file_name = "schema_no_properties.singer"
-        singer_file_to_target(file_name, postgres_target)
+    file_name = "schema_no_properties.singer"
+    singer_file_to_target(file_name, postgres_target)
 
 
 # TODO test that data is correct
@@ -229,14 +225,28 @@ def test_relational_data(postgres_target):
     singer_file_to_target(file_name, postgres_target)
 
 
-def test_no_primary_keys(postgres_target):
-    """Expect to fail with ValueError due to primary key https://github.com/MeltanoLabs/target-postgres/issues/54"""
-    with pytest.raises(ValueError):
-        file_name = "no_primary_keys.singer"
-        singer_file_to_target(file_name, postgres_target)
+def test_no_primary_keys(postgres_target, engine):
+    """Load each file twice to ensure no records are removed and append-only loading works properly."""
+    table_name = "test_no_pk"
+    full_table_name = postgres_target.config["default_target_schema"] + "." + table_name
+    with engine.connect() as connection:
+        result = connection.execute(f"DROP TABLE IF EXISTS {full_table_name}")
+    file_name = f"{table_name}.singer"
+    singer_file_to_target(file_name, postgres_target)
 
-    file_name = "no_primary_keys_append.singer"
-    singer_file_to_target(file_name, postgres_target)
+    file_name = f"{table_name}_append.singer"
+    singer_file_to_target(file_name, postgres_target)
+
+    file_name = f"{table_name}.singer"
+    singer_file_to_target(file_name, postgres_target)
+
+    file_name = f"{table_name}_append.singer"
+    singer_file_to_target(file_name, postgres_target)
+
+    # Each of the four loads above appends records; none are merged or removed
+    with engine.connect() as connection:
+        result = connection.execute(f"SELECT * FROM {full_table_name}")
+        assert result.rowcount == 16
 
 
 # TODO test that data is correct
@@ -268,9 +278,8 @@ def test_encoded_string_data(postgres_target):
 
 def test_tap_appl(postgres_target):
-    """Expect to fail with ValueError due to primary key https://github.com/MeltanoLabs/target-postgres/issues/54"""
-    with pytest.raises(ValueError):
-        file_name = "tap_aapl.singer"
-        singer_file_to_target(file_name, postgres_target)
+    file_name = "tap_aapl.singer"
+    singer_file_to_target(file_name, postgres_target)
 
 
 def test_tap_countries(postgres_target):
@@ -302,79 +311,81 @@ def test_new_array_column(postgres_target):
     singer_file_to_target(file_name, postgres_target)
 
 
-def test_activate_version_hard_delete(postgres_config):
+def test_activate_version_hard_delete(postgres_config, engine):
     """Activate Version Hard Delete Test"""
-    file_name = "activate_version_hard.singer"
+    table_name = "test_activate_version_hard"
+    file_name = f"{table_name}.singer"
+    full_table_name = postgres_config["default_target_schema"] + "." + table_name
     postgres_config_hard_delete_true = copy.deepcopy(postgres_config)
     postgres_config_hard_delete_true["hard_delete"] = True
     pg_hard_delete_true = TargetPostgres(config=postgres_config_hard_delete_true)
     singer_file_to_target(file_name, pg_hard_delete_true)
-    engine = sqlalchemy_engine(postgres_config)
     with engine.connect() as connection:
-        result = connection.execute("SELECT * FROM test_activate_version_hard")
+        result = connection.execute(f"SELECT * FROM {full_table_name}")
         assert result.rowcount == 7
 
         # Add a record like someone would if they weren't using the tap target combo
         result = connection.execute(
-            "INSERT INTO test_activate_version_hard(code, \"name\") VALUES('Manual1', 'Meltano')"
+            f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')"
         )
         result = connection.execute(
-            "INSERT INTO test_activate_version_hard(code, \"name\") VALUES('Manual2', 'Meltano')"
+            f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')"
        )
-        result = connection.execute("SELECT * FROM test_activate_version_hard")
+        result = connection.execute(f"SELECT * FROM {full_table_name}")
         assert result.rowcount == 9
 
     singer_file_to_target(file_name, pg_hard_delete_true)
 
     # Should remove the 2 records we added manually
     with engine.connect() as connection:
-        result = connection.execute("SELECT * FROM test_activate_version_hard")
+        result = connection.execute(f"SELECT * FROM {full_table_name}")
         assert result.rowcount == 7
 
 
-def test_activate_version_soft_delete(postgres_config):
+def test_activate_version_soft_delete(postgres_config, engine):
     """Activate Version Soft Delete Test"""
-    file_name = "activate_version_soft.singer"
-    engine = sqlalchemy_engine(postgres_config)
+    table_name = "test_activate_version_soft"
+    file_name = f"{table_name}.singer"
+    full_table_name = postgres_config["default_target_schema"] + "." + table_name
     with engine.connect() as connection:
-        result = connection.execute("DROP TABLE IF EXISTS test_activate_version_soft")
+        result = connection.execute(f"DROP TABLE IF EXISTS {full_table_name}")
     postgres_config_soft_delete = copy.deepcopy(postgres_config)
     postgres_config_soft_delete["hard_delete"] = False
     pg_soft_delete = TargetPostgres(config=postgres_config_soft_delete)
     singer_file_to_target(file_name, pg_soft_delete)
 
     with engine.connect() as connection:
-        result = connection.execute("SELECT * FROM test_activate_version_soft")
+        result = connection.execute(f"SELECT * FROM {full_table_name}")
         assert result.rowcount == 7
 
         # Add a record like someone would if they weren't using the tap target combo
         result = connection.execute(
-            "INSERT INTO test_activate_version_soft(code, \"name\") VALUES('Manual1', 'Meltano')"
+            f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')"
         )
         result = connection.execute(
-            "INSERT INTO test_activate_version_soft(code, \"name\") VALUES('Manual2', 'Meltano')"
+            f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')"
         )
-        result = connection.execute("SELECT * FROM test_activate_version_soft")
+        result = connection.execute(f"SELECT * FROM {full_table_name}")
         assert result.rowcount == 9
 
     singer_file_to_target(file_name, pg_soft_delete)
 
     # Should have all records including the 2 we added manually
     with engine.connect() as connection:
-        result = connection.execute("SELECT * FROM test_activate_version_soft")
+        result = connection.execute(f"SELECT * FROM {full_table_name}")
         assert result.rowcount == 9
 
         result = connection.execute(
-            "SELECT * FROM test_activate_version_soft where _sdc_deleted_at is NOT NULL"
+            f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL"
         )
         assert result.rowcount == 2
 
 
-def test_activate_version_deletes_data_properly(postgres_config):
-    """Activate Version should"""
+def test_activate_version_deletes_data_properly(postgres_config, engine):
+    """Activate Version should delete all records when a higher version is sent."""
     table_name = "test_activate_version_deletes_data_properly"
     file_name = f"{table_name}.singer"
-    engine = sqlalchemy_engine(postgres_config)
+    full_table_name = postgres_config["default_target_schema"] + "." + table_name
     with engine.connect() as connection:
-        result = connection.execute(f"DROP TABLE IF EXISTS {table_name}")
+        result = connection.execute(f"DROP TABLE IF EXISTS {full_table_name}")
 
     postgres_config_soft_delete = copy.deepcopy(postgres_config)
     postgres_config_soft_delete["hard_delete"] = True
@@ -383,17 +394,17 @@
     # Will populate us with 7 records
     with engine.connect() as connection:
         result = connection.execute(
-            f"INSERT INTO {table_name} (code, \"name\") VALUES('Manual1', 'Meltano')"
+            f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual1', 'Meltano')"
         )
         result = connection.execute(
-            f"INSERT INTO {table_name} (code, \"name\") VALUES('Manual2', 'Meltano')"
+            f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual2', 'Meltano')"
         )
-        result = connection.execute(f"SELECT * FROM {table_name}")
+        result = connection.execute(f"SELECT * FROM {full_table_name}")
         assert result.rowcount == 9
     # Only has a schema and one activate_version message, should delete all records as it's a higher version than what's currently in the table
     file_name = f"{table_name}_2.singer"
     singer_file_to_target(file_name, pg_hard_delete)
     with engine.connect() as connection:
-        result = connection.execute(f"SELECT * FROM {table_name}")
+        result = connection.execute(f"SELECT * FROM {full_table_name}")
         assert result.rowcount == 0
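A standalone sketch (not part of the patch) of the merge strategy the new
`upsert` implements: an anti-join INSERT for rows missing from the target,
then a correlated UPDATE for rows that match. The `source` and `dest` tables
are hypothetical stand-ins for the temp table and target table that
`process_batch` passes in; compiling against the Postgres dialect shows the
generated SQL without needing a live database.

    from sqlalchemy import (
        Column,
        Integer,
        MetaData,
        String,
        Table,
        and_,
        insert,
        select,
        update,
    )
    from sqlalchemy.dialects import postgresql

    metadata = MetaData()
    dest = Table(
        "dest",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("name", String),
    )
    source = Table("source", metadata, Column("id", Integer), Column("name", String))

    # Rows match when all key columns are equal (upsert's join_keys loop).
    join_condition = and_(source.c.id == dest.c.id)

    # INSERT: the LEFT OUTER JOIN plus "dest.id IS NULL" anti-join keeps only
    # source rows that have no match in dest.
    select_stmt = (
        select(source.columns)
        .select_from(source.outerjoin(dest, join_condition))
        .where(dest.c.id.is_(None))
    )
    insert_stmt = insert(dest).from_select(names=source.columns, select=select_stmt)

    # UPDATE ... FROM: copy source values onto the dest rows that matched.
    update_stmt = update(dest).where(join_condition).values({dest.c.name: source.c.name})

    print(insert_stmt.compile(dialect=postgresql.dialect()))
    print(update_stmt.compile(dialect=postgresql.dialect()))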