Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(targets): Added a new built-in setting activate_version for targets to optionally disable processing of ACTIVATE_VERSION messages #2784

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions samples/sample_target_sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import datetime
import sqlite3
import typing as t

from singer_sdk import SQLConnector, SQLSink, SQLTarget
Expand All @@ -10,6 +12,46 @@
DB_PATH_CONFIG = "path_to_db"


def adapt_date_iso(val):
"""Adapt datetime.date to ISO 8601 date."""
return val.isoformat()


def adapt_datetime_iso(val):
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
return val.isoformat()


def adapt_datetime_epoch(val):
"""Adapt datetime.datetime to Unix timestamp."""
return int(val.timestamp())


sqlite3.register_adapter(datetime.date, adapt_date_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_epoch)


def convert_date(val):
"""Convert ISO 8601 date to datetime.date object."""
return datetime.date.fromisoformat(val.decode())


def convert_datetime(val):
"""Convert ISO 8601 datetime to datetime.datetime object."""
return datetime.datetime.fromisoformat(val.decode())


def convert_timestamp(val):
"""Convert Unix epoch timestamp to datetime.datetime object."""
return datetime.datetime.fromtimestamp(int(val), tz=datetime.timezone.utc)


sqlite3.register_converter("date", convert_date)
sqlite3.register_converter("datetime", convert_datetime)
sqlite3.register_converter("timestamp", convert_timestamp)


class SQLiteConnector(SQLConnector):
"""The connector for SQLite.

Expand Down
9 changes: 9 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@
description="The default target database schema name to use for all streams.",
),
).to_dict()
ACTIVATE_VERSION_CONFIG = PropertiesList(
Property(
"activate_version",
BooleanType,
default=True,
title="Process `ACTIVATE_VERSION` messages",
description="Whether to process `ACTIVATE_VERSION` messages.",
),
).to_dict()
ADD_RECORD_METADATA_CONFIG = PropertiesList(
Property(
"add_record_metadata",
Expand Down
9 changes: 9 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@ def include_sdc_metadata_properties(self) -> bool:
"""
return self.config.get("add_record_metadata", False)

@property
def process_activate_version_messages(self) -> bool:
"""Check if activate version messages should be processed.

Returns:
True if activate version messages should be processed.
"""
return self.config.get("activate_version", True)

@property
def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum:
"""Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL.
Expand Down
18 changes: 18 additions & 0 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from singer_sdk.helpers._batch import BaseBatchFileEncoding
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers.capabilities import (
ACTIVATE_VERSION_CONFIG,
ADD_RECORD_METADATA_CONFIG,
BATCH_CONFIG,
TARGET_BATCH_SIZE_ROWS_CONFIG,
Expand Down Expand Up @@ -454,6 +455,19 @@ def _process_activate_version_message(self, message_dict: dict) -> None:

for stream_map in self.mapper.stream_maps[stream_name]:
sink = self.get_sink(stream_map.stream_alias)
if not sink.process_activate_version_messages:
self.logger.warning(
"`ACTIVATE_VERSION` messages are not enabled for '%s'. Ignoring.",
stream_map.stream_alias,
)
continue
if not sink.include_sdc_metadata_properties:
self.logger.warning(
"The `ACTIVATE_VERSION` feature uses the `_sdc_deleted_at` and "
"`_sdc_deleted_at` metadata properties so they will be added to "
"the schema for '%s' even though `add_record_metadata` is "
"disabled.",
)
sink.activate_version(message_dict["version"])

def _process_batch_message(self, message_dict: dict) -> None:
Expand Down Expand Up @@ -621,6 +635,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:

capabilities = cls.capabilities

if PluginCapabilities.ACTIVATE_VERSION in capabilities:
_merge_missing(ACTIVATE_VERSION_CONFIG, config_jsonschema)

if PluginCapabilities.BATCH in capabilities:
_merge_missing(BATCH_CONFIG, config_jsonschema)

Expand Down Expand Up @@ -660,6 +677,7 @@ def capabilities(self) -> list[CapabilitiesEnum]:
sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities
sql_target_capabilities.extend(
[
PluginCapabilities.ACTIVATE_VERSION,
TargetCapabilities.TARGET_SCHEMA,
TargetCapabilities.HARD_DELETE,
]
Expand Down
2 changes: 1 addition & 1 deletion tests/samples/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@pytest.fixture
def csv_config(outdir: str) -> dict:
"""Get configuration dictionary for target-csv."""
return {"target_folder": outdir}
return {"target_folder": outdir, "add_record_metadata": False}


@pytest.fixture
Expand Down
141 changes: 140 additions & 1 deletion tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@
from singer_sdk.target_base import SQLTarget


def get_table(config: dict, table_name: str) -> sa.Table:
"""Get SQLAlchemy metadata and table for inspection.

Args:
config: Target configuration dictionary containing database path
table_name: Name of the table to inspect

Returns:
Tuple of (metadata, table)
"""
db_path = config["path_to_db"]
engine = sa.create_engine(f"sqlite:///{db_path}")
meta = sa.MetaData()
meta.reflect(bind=engine)
return meta.tables[table_name]


@pytest.fixture
def path_to_target_db(tmp_path: Path) -> Path:
return Path(f"{tmp_path}/target_test.db")
Expand All @@ -50,7 +67,27 @@ def sqlite_sample_target(sqlite_target_test_config):
@pytest.fixture
def sqlite_sample_target_hard_delete(sqlite_target_test_config):
"""Get a sample target object with hard_delete disabled."""
return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": True})
return SQLiteTarget(
config={
**sqlite_target_test_config,
"hard_delete": True,
"add_record_metadata": False,
}
)


@pytest.fixture
def sqlite_sample_target_no_activate_version(sqlite_target_test_config):
"""Get a sample target object with hard_delete disabled."""
return SQLiteTarget(config={**sqlite_target_test_config, "activate_version": False})


@pytest.fixture
def sqlite_target_add_record_metadata(sqlite_target_test_config):
"""Get a sample target object with add_record_metadata enabled."""
return SQLiteTarget(
config={**sqlite_target_test_config, "add_record_metadata": True}
)


@pytest.fixture
Expand Down Expand Up @@ -268,6 +305,108 @@ def test_sqlite_activate_version(
finalize=True,
)

# Check that the record metadata was added
table = get_table(sqlite_sample_target_hard_delete.config, test_tbl)

assert "_sdc_table_version" in table.columns
assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER

assert "_sdc_deleted_at" in table.columns
assert type(table.columns["_sdc_deleted_at"].type) is sa.DATETIME


def test_sqlite_no_activate_version(
sqlite_sample_target_no_activate_version: SQLTarget,
):
"""Test handling the activate_version message for the SQLite target.

Test performs the following actions:

- Sends an activate_version message for a table that doesn't exist (which should
have no effect)
"""
test_tbl = f"zzz_tmp_{str(uuid4()).split('-')[-1]}"
schema_msg = {
"type": "SCHEMA",
"stream": test_tbl,
"schema": th.PropertiesList(th.Property("col_a", th.StringType())).to_dict(),
}

tap_output = "\n".join(
json.dumps(msg)
for msg in [
schema_msg,
{"type": "ACTIVATE_VERSION", "stream": test_tbl, "version": 12345},
{
"type": "RECORD",
"stream": test_tbl,
"record": {"col_a": "samplerow1"},
"version": 12345,
},
]
)

target_sync_test(
sqlite_sample_target_no_activate_version,
input=StringIO(tap_output),
finalize=True,
)

# Check that the record metadata was added
table = get_table(sqlite_sample_target_no_activate_version.config, test_tbl)

assert "col_a" in table.columns
assert "_sdc_table_version" not in table.columns
assert "_sdc_deleted_at" not in table.columns


def test_sqlite_add_record_metadata(sqlite_target_add_record_metadata: SQLTarget):
"""Test handling the activate_version message for the SQLite target.

Test performs the following actions:

- Sends an activate_version message for a table that doesn't exist (which should
have no effect)
"""
test_tbl = f"zzz_tmp_{str(uuid4()).split('-')[-1]}"
schema_msg = {
"type": "SCHEMA",
"stream": test_tbl,
"schema": th.PropertiesList(th.Property("col_a", th.StringType())).to_dict(),
}

tap_output = "\n".join(
json.dumps(msg)
for msg in [
schema_msg,
{"type": "ACTIVATE_VERSION", "stream": test_tbl, "version": 12345},
{
"type": "RECORD",
"stream": test_tbl,
"record": {"col_a": "samplerow1"},
"version": 12345,
},
]
)

target_sync_test(
sqlite_target_add_record_metadata,
input=StringIO(tap_output),
finalize=True,
)

# Check that the record metadata was added
table = get_table(sqlite_target_add_record_metadata.config, test_tbl)

assert "_sdc_received_at" in table.columns
assert type(table.columns["_sdc_received_at"].type) is sa.DATETIME

assert "_sdc_sync_started_at" in table.columns
assert type(table.columns["_sdc_sync_started_at"].type) is sa.INTEGER

assert "_sdc_table_version" in table.columns
assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER


def test_sqlite_column_morph(sqlite_sample_target: SQLTarget):
"""End-to-end-to-end test for SQLite tap and target.
Expand Down