Skip to content

Commit

Permalink
fix(targets): Default handling of ACTIVATE_VERSION messages to soft…
Browse files Browse the repository at this point in the history
… deletes
  • Loading branch information
edgarrmondragon committed Dec 19, 2023
1 parent 0192d58 commit 154ca15
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 13 deletions.
25 changes: 25 additions & 0 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1189,3 +1189,28 @@ def deserialize_json(self, json_str: str) -> object:
.. versionadded:: 0.31.0
"""
return json.loads(json_str, parse_float=decimal.Decimal)

def delete_old_versions(
self,
*,
full_table_name: str,
version_column_name: str,
current_version: int,
) -> None:
"""Hard-deletes any old version rows from the table.
This is used to clean up old versions when an ACTIVATE_VERSION message is
received.
Args:
full_table_name: The fully qualified table name.
version_column_name: The name of the version column.
current_version: The current ACTIVATE version of the table.
"""
with self._connect() as conn, conn.begin():
conn.execute(
sqlalchemy.text(
f"DELETE FROM {full_table_name} " # noqa: S608
f"WHERE {version_column_name} <= {current_version}",
),
)
8 changes: 8 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@
description="Add metadata to records.",
),
).to_dict()
TARGET_HARD_DELETE_CONFIG = PropertiesList(
Property(
"hard_delete",
BooleanType(),
description="Hard delete records.",
default=False,
),
).to_dict()


class TargetLoadMethods(str, Enum):
Expand Down
14 changes: 6 additions & 8 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,12 @@ def activate_version(self, new_version: int) -> None:
sql_type=sqlalchemy.types.Integer(),
)

if self.config.get("hard_delete", True):
with self.connector._connect() as conn, conn.begin(): # noqa: SLF001
conn.execute(
sqlalchemy.text(
f"DELETE FROM {self.full_table_name} " # noqa: S608
f"WHERE {self.version_column_name} <= {new_version}",
),
)
if self.config.get("hard_delete", False):
self.connector.delete_old_versions(
full_table_name=self.full_table_name,
version_column_name=self.version_column_name,
current_version=new_version,
)
return

if not self.connector.column_exists(
Expand Down
11 changes: 10 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from singer_sdk.helpers.capabilities import (
ADD_RECORD_METADATA_CONFIG,
BATCH_CONFIG,
TARGET_HARD_DELETE_CONFIG,
TARGET_LOAD_METHOD_CONFIG,
TARGET_SCHEMA_CONFIG,
CapabilitiesEnum,
Expand Down Expand Up @@ -636,7 +637,12 @@ def capabilities(self) -> list[CapabilitiesEnum]:
A list of capabilities supported by this target.
"""
sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities
sql_target_capabilities.extend([TargetCapabilities.TARGET_SCHEMA])
sql_target_capabilities.extend(
[
TargetCapabilities.TARGET_SCHEMA,
TargetCapabilities.HARD_DELETE,
]
)

return sql_target_capabilities

Expand Down Expand Up @@ -668,6 +674,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
if TargetCapabilities.TARGET_SCHEMA in capabilities:
_merge_missing(TARGET_SCHEMA_CONFIG, config_jsonschema)

if TargetCapabilities.HARD_DELETE in capabilities:
_merge_missing(TARGET_HARD_DELETE_CONFIG, config_jsonschema)

super().append_builtin_config(config_jsonschema)

@final
Expand Down
8 changes: 4 additions & 4 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def sqlite_sample_target(sqlite_target_test_config):


@pytest.fixture
def sqlite_sample_target_soft_delete(sqlite_target_test_config):
def sqlite_sample_target_hard_delete(sqlite_target_test_config):
"""Get a sample target object with hard_delete disabled."""
return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": False})
return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": True})


@pytest.fixture
Expand Down Expand Up @@ -217,7 +217,7 @@ def test_sqlite_column_addition(sqlite_sample_target: SQLTarget):

def test_sqlite_activate_version(
sqlite_sample_target: SQLTarget,
sqlite_sample_target_soft_delete: SQLTarget,
sqlite_sample_target_hard_delete: SQLTarget,
):
"""Test handling the activate_version message for the SQLite target.
Expand Down Expand Up @@ -249,7 +249,7 @@ def test_sqlite_activate_version(

target_sync_test(sqlite_sample_target, input=StringIO(tap_output), finalize=True)
target_sync_test(
sqlite_sample_target_soft_delete,
sqlite_sample_target_hard_delete,
input=StringIO(tap_output),
finalize=True,
)
Expand Down

0 comments on commit 154ca15

Please sign in to comment.