Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Activate Version with example test #89

Merged
merged 13 commits into from
Feb 14, 2023
30 changes: 16 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target
* `schema-flattening`

## Settings

| Setting | Required | Default | Description |
| :------------------- | :------: | :-----------------: | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. |
| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. |
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect. Note that you must esacpe password special characters properly see https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords |
| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |
| Setting | Required | Default | Description |
| :-------------------- | :------: | :-----------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. |
| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. |
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port,dialect. Note that you must escape password special characters properly. See https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords |
| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. |
| default_target_schema | False | None | Postgres schema to send data to, example: tap-clickup |
| hard_delete | False | 0 | When activate version is sent from a tap this specefies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. |
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boolean should be generated as True / False here I think? I'll leave for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Very strange that it shows defaults of 0. Agreed though we don't need to block on this.

| add_record_metadata | False | 1 | Note that this must be enabled for activate_version to work!This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |

A full list of supported settings and capabilities is available by running: `target-postgres --about`

Expand Down
67 changes: 67 additions & 0 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import uuid
from typing import Any, Dict, Iterable, List, Optional, Union

import sqlalchemy
from pendulum import now
from singer_sdk.sinks import SQLSink
from sqlalchemy import Column, MetaData, Table, insert
from sqlalchemy.sql import Executable
from sqlalchemy.sql.expression import bindparam

from target_postgres.connector import PostgresConnector

Expand Down Expand Up @@ -252,3 +255,67 @@ def schema_name(self) -> Optional[str]:

# Schema name not detected.
return None

def activate_version(self, new_version: int) -> None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could take this function and just drop it into the SDK as it is here, this just has a few tweaks that are needed to make this work in a more generic way

"""Bump the active version of the target table.

Args:
new_version: The version number to activate.
"""
# There's nothing to do if the table doesn't exist yet
# (which it won't the first time the stream is processed)
if not self.connector.table_exists(self.full_table_name):
return

deleted_at = now()
# Different from SingerSDK as we need to handle types the
# same as SCHEMA messsages
datetime_type = self.connector.to_sql_type(
{"type": "string", "format": "date-time"}
)

# Different from SingerSDK as we need to handle types the
# same as SCHEMA messsages
integer_type = self.connector.to_sql_type({"type": "integer"})

if not self.connector.column_exists(
full_table_name=self.full_table_name,
column_name=self.version_column_name,
):
self.connector.prepare_column(
self.full_table_name,
self.version_column_name,
sql_type=integer_type,
)

self.logger.info("Hard delete: %s", self.config.get("hard_delete"))
if self.config["hard_delete"] is True:
self.connection.execute(
f"DELETE FROM {self.full_table_name} "
visch marked this conversation as resolved.
Show resolved Hide resolved
f"WHERE {self.version_column_name} <= {new_version} "
f"OR {self.version_column_name} IS NULL"
)
return

if not self.connector.column_exists(
full_table_name=self.full_table_name,
column_name=self.soft_delete_column_name,
):
self.connector.prepare_column(
self.full_table_name,
self.soft_delete_column_name,
sql_type=datetime_type,
)
# Need to deal with the case where data doesn't exist for the version column
query = sqlalchemy.text(
f"UPDATE {self.full_table_name}\n"
f"SET {self.soft_delete_column_name} = :deletedate \n"
f"WHERE {self.version_column_name} < :version "
f"OR {self.version_column_name} IS NULL \n"
f" AND {self.soft_delete_column_name} IS NULL\n"
)
query = query.bindparams(
bindparam("deletedate", value=deleted_at, type_=datetime_type),
bindparam("version", value=new_version, type_=integer_type),
)
self.connector.connection.execute(query)
29 changes: 25 additions & 4 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
th.StringType,
description=(
"User name used to authenticate. "
+ "Note if sqlalchemy_url is set this will be ignored.",
visch marked this conversation as resolved.
Show resolved Hide resolved
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
Expand All @@ -95,9 +95,9 @@ def __init__(
th.StringType,
description=(
"SQLAlchemy connection string. "
+ "This will override using host, user, password, port,"
+ "dialect. Note that you must esacpe password special"
+ "characters properly see"
+ "This will override using host, user, password, port, "
+ "dialect. Note that you must esacpe password special "
+ "characters properly see "
+ "https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords" # noqa: E501
),
),
Expand All @@ -117,6 +117,27 @@ def __init__(
th.StringType,
description="Postgres schema to send data to, example: tap-clickup",
),
th.Property(
"hard_delete",
th.BooleanType,
default=False,
description=(
"When activate version is sent from a tap this specefies "
+ "if we should delete the records that don't match, or mark "
+ "them with a date in the `_sdc_deleted_at` column."
),
),
th.Property(
"add_record_metadata",
th.BooleanType,
default=True,
description=(
"Note that this must be enabled for activate_version to work!"
+ "This adds _sdc_extracted_at, _sdc_batched_at, and more to every "
+ "table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html " # noqa: E501
+ "for more information."
),
),
).to_dict()
default_sink_class = PostgresSink

Expand Down
10 changes: 10 additions & 0 deletions target_postgres/tests/data_files/activate_version_hard.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"type": "SCHEMA", "stream": "test_activate_version_hard", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563}
10 changes: 10 additions & 0 deletions target_postgres/tests/data_files/activate_version_soft.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"type": "SCHEMA", "stream": "test_activate_version_soft", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"type": "SCHEMA", "stream": "test_activate_version_deletes_data_properly", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563}
Loading