diff --git a/README.md b/README.md index 7efaa224..b775ffc8 100644 --- a/README.md +++ b/README.md @@ -14,20 +14,22 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target * `schema-flattening` ## Settings - -| Setting | Required | Default | Description | -| :------------------- | :------: | :-----------------: | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. | -| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. | -| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. | -| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. | -| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | -| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect. Note that you must esacpe password special characters properly see https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | -| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | -| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | -| stream_map_config | False | None | User-defined config values to be used within map expressions. | -| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | -| flattening_max_depth | False | None | The max depth to flatten schemas. | +| Setting | Required | Default | Description | +| :-------------------- | :------: | :-----------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. | +| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. | +| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. | +| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. | +| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | +| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port,dialect. Note that you must escape password special characters properly. See https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | +| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | +| default_target_schema | False | None | Postgres schema to send data to, example: tap-clickup | +| hard_delete | False | 0 | When activate version is sent from a tap this specefies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. | +| add_record_metadata | False | 1 | Note that this must be enabled for activate_version to work!This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information. | +| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | +| stream_map_config | False | None | User-defined config values to be used within map expressions. | +| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | +| flattening_max_depth | False | None | The max depth to flatten schemas. | A full list of supported settings and capabilities is available by running: `target-postgres --about` diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 90866d16..3d8dd5b2 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -2,9 +2,12 @@ import uuid from typing import Any, Dict, Iterable, List, Optional, Union +import sqlalchemy +from pendulum import now from singer_sdk.sinks import SQLSink from sqlalchemy import Column, MetaData, Table, insert from sqlalchemy.sql import Executable +from sqlalchemy.sql.expression import bindparam from target_postgres.connector import PostgresConnector @@ -252,3 +255,67 @@ def schema_name(self) -> Optional[str]: # Schema name not detected. return None + + def activate_version(self, new_version: int) -> None: + """Bump the active version of the target table. + + Args: + new_version: The version number to activate. + """ + # There's nothing to do if the table doesn't exist yet + # (which it won't the first time the stream is processed) + if not self.connector.table_exists(self.full_table_name): + return + + deleted_at = now() + # Different from SingerSDK as we need to handle types the + # same as SCHEMA messsages + datetime_type = self.connector.to_sql_type( + {"type": "string", "format": "date-time"} + ) + + # Different from SingerSDK as we need to handle types the + # same as SCHEMA messsages + integer_type = self.connector.to_sql_type({"type": "integer"}) + + if not self.connector.column_exists( + full_table_name=self.full_table_name, + column_name=self.version_column_name, + ): + self.connector.prepare_column( + self.full_table_name, + self.version_column_name, + sql_type=integer_type, + ) + + self.logger.info("Hard delete: %s", self.config.get("hard_delete")) + if self.config["hard_delete"] is True: + self.connection.execute( + f"DELETE FROM {self.full_table_name} " + f"WHERE {self.version_column_name} <= {new_version} " + f"OR {self.version_column_name} IS NULL" + ) + return + + if not self.connector.column_exists( + full_table_name=self.full_table_name, + column_name=self.soft_delete_column_name, + ): + self.connector.prepare_column( + self.full_table_name, + self.soft_delete_column_name, + sql_type=datetime_type, + ) + # Need to deal with the case where data doesn't exist for the version column + query = sqlalchemy.text( + f"UPDATE {self.full_table_name}\n" + f"SET {self.soft_delete_column_name} = :deletedate \n" + f"WHERE {self.version_column_name} < :version " + f"OR {self.version_column_name} IS NULL \n" + f" AND {self.soft_delete_column_name} IS NULL\n" + ) + query = query.bindparams( + bindparam("deletedate", value=deleted_at, type_=datetime_type), + bindparam("version", value=new_version, type_=integer_type), + ) + self.connector.connection.execute(query) diff --git a/target_postgres/target.py b/target_postgres/target.py index d7ef2624..1b2e02e4 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -71,7 +71,7 @@ def __init__( th.StringType, description=( "User name used to authenticate. " - + "Note if sqlalchemy_url is set this will be ignored.", + + "Note if sqlalchemy_url is set this will be ignored." ), ), th.Property( @@ -95,9 +95,9 @@ def __init__( th.StringType, description=( "SQLAlchemy connection string. " - + "This will override using host, user, password, port," - + "dialect. Note that you must esacpe password special" - + "characters properly see" + + "This will override using host, user, password, port, " + + "dialect. Note that you must esacpe password special " + + "characters properly see " + "https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords" # noqa: E501 ), ), @@ -117,6 +117,27 @@ def __init__( th.StringType, description="Postgres schema to send data to, example: tap-clickup", ), + th.Property( + "hard_delete", + th.BooleanType, + default=False, + description=( + "When activate version is sent from a tap this specefies " + + "if we should delete the records that don't match, or mark " + + "them with a date in the `_sdc_deleted_at` column." + ), + ), + th.Property( + "add_record_metadata", + th.BooleanType, + default=True, + description=( + "Note that this must be enabled for activate_version to work!" + + "This adds _sdc_extracted_at, _sdc_batched_at, and more to every " + + "table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html " # noqa: E501 + + "for more information." + ), + ), ).to_dict() default_sink_class = PostgresSink diff --git a/target_postgres/tests/data_files/activate_version_hard.singer b/target_postgres/tests/data_files/activate_version_hard.singer new file mode 100644 index 00000000..1d05c3f8 --- /dev/null +++ b/target_postgres/tests/data_files/activate_version_hard.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "test_activate_version_hard", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563} diff --git a/target_postgres/tests/data_files/activate_version_soft.singer b/target_postgres/tests/data_files/activate_version_soft.singer new file mode 100644 index 00000000..991434c5 --- /dev/null +++ b/target_postgres/tests/data_files/activate_version_soft.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "test_activate_version_soft", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563} diff --git a/target_postgres/tests/data_files/test_activate_version_deletes_data_properly.singer b/target_postgres/tests/data_files/test_activate_version_deletes_data_properly.singer new file mode 100644 index 00000000..d49c733e --- /dev/null +++ b/target_postgres/tests/data_files/test_activate_version_deletes_data_properly.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "test_activate_version_deletes_data_properly", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563} diff --git a/target_postgres/tests/data_files/test_activate_version_deletes_data_properly_2.singer b/target_postgres/tests/data_files/test_activate_version_deletes_data_properly_2.singer new file mode 100644 index 00000000..05e5f28b --- /dev/null +++ b/target_postgres/tests/data_files/test_activate_version_deletes_data_properly_2.singer @@ -0,0 +1,2 @@ +{"type": "SCHEMA", "stream": "test_activate_version_deletes_data_properly", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431564} diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py index a2a7dc91..4a67eb6f 100644 --- a/target_postgres/tests/test_standard_target.py +++ b/target_postgres/tests/test_standard_target.py @@ -1,5 +1,6 @@ """ Attempt at making some standard Target Tests. """ # flake8: noqa +import copy import io import uuid from contextlib import redirect_stdout @@ -7,7 +8,9 @@ import jsonschema import pytest +import sqlalchemy from singer_sdk.testing import sync_end_to_end +from sqlalchemy import create_engine, engine_from_config from target_postgres.target import TargetPostgres from target_postgres.tests.samples.aapl.aapl import Fundamentals @@ -24,6 +27,9 @@ def postgres_config(): "user": "postgres", "password": "postgres", "database": "postgres", + "port": 5432, + "add_record_metadata": True, + "hard_delete": False, } @@ -32,6 +38,12 @@ def postgres_target(postgres_config) -> TargetPostgres: return TargetPostgres(config=postgres_config) +def sqlalchemy_engine(config) -> sqlalchemy.engine.Engine: + return create_engine( + f"{config['dialect+driver']}://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}" + ) + + def singer_file_to_target(file_name, target) -> None: """Singer file to Target, emulates a tap run @@ -232,3 +244,100 @@ def test_new_array_column(postgres_target): """Create a new Array column with an existing table""" file_name = "new_array_column.singer" singer_file_to_target(file_name, postgres_target) + + +def test_activate_version_hard_delete(postgres_config): + """Activate Version Hard Delete Test""" + file_name = "activate_version_hard.singer" + postgres_config_hard_delete_true = copy.deepcopy(postgres_config) + postgres_config_hard_delete_true["hard_delete"] = True + pg_hard_delete_true = TargetPostgres(config=postgres_config_hard_delete_true) + singer_file_to_target(file_name, pg_hard_delete_true) + engine = sqlalchemy_engine(postgres_config) + with engine.connect() as connection: + result = connection.execute("SELECT * FROM test_activate_version_hard") + assert result.rowcount == 7 + # Add a record like someone would if they weren't using the tap target combo + result = connection.execute( + "INSERT INTO test_activate_version_hard(code, \"name\") VALUES('Manual1', 'Meltano')" + ) + result = connection.execute( + "INSERT INTO test_activate_version_hard(code, \"name\") VALUES('Manual2', 'Meltano')" + ) + result = connection.execute("SELECT * FROM test_activate_version_hard") + assert result.rowcount == 9 + + singer_file_to_target(file_name, pg_hard_delete_true) + + # Should remove the 2 records we added manually + with engine.connect() as connection: + result = connection.execute("SELECT * FROM test_activate_version_hard") + assert result.rowcount == 7 + + +def test_activate_version_soft_delete(postgres_config): + """Activate Version Soft Delete Test""" + file_name = "activate_version_soft.singer" + engine = sqlalchemy_engine(postgres_config) + with engine.connect() as connection: + result = connection.execute("DROP TABLE IF EXISTS test_activate_version_soft") + postgres_config_soft_delete = copy.deepcopy(postgres_config) + postgres_config_soft_delete["hard_delete"] = False + pg_soft_delete = TargetPostgres(config=postgres_config_soft_delete) + singer_file_to_target(file_name, pg_soft_delete) + + with engine.connect() as connection: + result = connection.execute("SELECT * FROM test_activate_version_soft") + assert result.rowcount == 7 + # Add a record like someone would if they weren't using the tap target combo + result = connection.execute( + "INSERT INTO test_activate_version_soft(code, \"name\") VALUES('Manual1', 'Meltano')" + ) + result = connection.execute( + "INSERT INTO test_activate_version_soft(code, \"name\") VALUES('Manual2', 'Meltano')" + ) + result = connection.execute("SELECT * FROM test_activate_version_soft") + assert result.rowcount == 9 + + singer_file_to_target(file_name, pg_soft_delete) + + # Should have all records including the 2 we added manually + with engine.connect() as connection: + result = connection.execute("SELECT * FROM test_activate_version_soft") + assert result.rowcount == 9 + + result = connection.execute( + "SELECT * FROM test_activate_version_soft where _sdc_deleted_at is NOT NULL" + ) + assert result.rowcount == 2 + + +def test_activate_version_deletes_data_properly(postgres_config): + """Activate Version should""" + table_name = "test_activate_version_deletes_data_properly" + file_name = f"{table_name}.singer" + engine = sqlalchemy_engine(postgres_config) + with engine.connect() as connection: + result = connection.execute(f"DROP TABLE IF EXISTS {table_name}") + + postgres_config_soft_delete = copy.deepcopy(postgres_config) + postgres_config_soft_delete["hard_delete"] = True + pg_hard_delete = TargetPostgres(config=postgres_config_soft_delete) + singer_file_to_target(file_name, pg_hard_delete) + # Will populate us with 7 records + with engine.connect() as connection: + result = connection.execute( + f"INSERT INTO {table_name} (code, \"name\") VALUES('Manual1', 'Meltano')" + ) + result = connection.execute( + f"INSERT INTO {table_name} (code, \"name\") VALUES('Manual2', 'Meltano')" + ) + result = connection.execute(f"SELECT * FROM {table_name}") + assert result.rowcount == 9 + + # Only has a schema and one activate_version message, should delete all records as it's a higher version than what's currently in the table + file_name = f"{table_name}_2.singer" + singer_file_to_target(file_name, pg_hard_delete) + with engine.connect() as connection: + result = connection.execute(f"SELECT * FROM {table_name}") + assert result.rowcount == 0