Skip to content

Commit

Permalink
fix(taps): Use recent start_date as starting_replication_value (#759)
Browse files Browse the repository at this point in the history
* Use recent start_date as starting_replication_value

* Update core.py

* Update core.py

* Update core.py

* Output full path in error messages

* Revert "Output full path in error messages"

This reverts commit f098a22.

* Update core.py

* Add test

* Test other branch in condition

* Accept only datetime strings in start_date

Co-authored-by: Edgar R. M <[email protected]>
Co-authored-by: Edgar R. M <[email protected]>
  • Loading branch information
3 people authored Aug 23, 2022
1 parent 9dac11c commit d94e0d8
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 34 deletions.
32 changes: 30 additions & 2 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,29 @@ def _write_replication_key_signpost(
state = self.get_context_state(context)
write_replication_key_signpost(state, value)

def compare_start_date(self, value: str, start_date_value: str) -> str:
"""Compare a bookmark value to a start date and return the most recent value.
If the replication key is a datetime-formatted string, this method will parse
the value and compare it to the start date. Otherwise, the bookmark value is
returned.
If the tap uses a non-datetime replication key (e.g. an UNIX timestamp), the
developer is encouraged to override this method to provide custom logic for
comparing the bookmark value to the start date.
Args:
value: The replication key value.
start_date_value: The start date value from the config.
Returns:
The most recent value between the bookmark and start date.
"""
if self.is_timestamp_replication_key:
return max(value, start_date_value, key=pendulum.parse)
else:
return value

def _write_starting_replication_value(self, context: Optional[dict]) -> None:
"""Write the starting replication value, if available.
Expand All @@ -320,8 +343,13 @@ def _write_starting_replication_value(self, context: Optional[dict]) -> None:
):
value = replication_key_value

elif "start_date" in self.config:
value = self.config["start_date"]
# Use start_date if it is more recent than the replication_key state
start_date_value: Optional[str] = self.config.get("start_date")
if start_date_value:
if not value:
value = start_date_value
else:
value = self.compare_start_date(value, start_date_value)

write_starting_replication_value(state, value)

Expand Down
159 changes: 127 additions & 32 deletions tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import requests

from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._singer import Catalog, CatalogEntry, MetadataMapping
from singer_sdk.helpers.jsonpath import _compile_jsonpath
from singer_sdk.streams.core import (
REPLICATION_FULL_TABLE,
Expand All @@ -25,6 +26,8 @@
StringType,
)

CONFIG_START_DATE = "2021-01-01"


class SimpleTestStream(Stream):
"""Test stream class."""
Expand All @@ -48,6 +51,26 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
yield {"id": 3, "value": "India"}


class UnixTimestampIncrementalStream(SimpleTestStream):
name = "unix_ts"
schema = PropertiesList(
Property("id", IntegerType, required=True),
Property("value", StringType, required=True),
Property("updatedAt", IntegerType, required=True),
).to_dict()
replication_key = "updatedAt"


class UnixTimestampIncrementalStream2(UnixTimestampIncrementalStream):
name = "unix_ts_override"

def compare_start_date(self, value: str, start_date_value: str) -> str:
"""Compare a value to a start date value."""

start_timestamp = pendulum.parse(start_date_value).format("X")
return max(value, start_timestamp, key=float)


class RestTestStream(RESTStream):
"""Test RESTful stream class."""

Expand Down Expand Up @@ -82,28 +105,19 @@ class SimpleTestTap(Tap):

def discover_streams(self) -> List[Stream]:
"""List all streams."""
return [SimpleTestStream(self)]
return [
SimpleTestStream(self),
UnixTimestampIncrementalStream(self),
UnixTimestampIncrementalStream2(self),
]


@pytest.fixture
def tap() -> SimpleTestTap:
"""Tap instance."""
catalog_dict = {
"streams": [
{
"key_properties": ["id"],
"tap_stream_id": SimpleTestStream.name,
"stream": SimpleTestStream.name,
"schema": SimpleTestStream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
return SimpleTestTap(
config={"start_date": "2021-01-01"},
config={"start_date": CONFIG_START_DATE},
parse_env_config=False,
catalog=catalog_dict,
)


Expand All @@ -113,47 +127,128 @@ def stream(tap: SimpleTestTap) -> SimpleTestStream:
return cast(SimpleTestStream, tap.load_streams()[0])


@pytest.fixture
def unix_timestamp_stream(tap: SimpleTestTap) -> UnixTimestampIncrementalStream:
"""Create a new stream instance."""
return cast(UnixTimestampIncrementalStream, tap.load_streams()[1])


def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
"""Applying a catalog to a stream should overwrite fields."""
assert stream.primary_keys == []
assert stream.replication_key == "updatedAt"
assert stream.replication_method == REPLICATION_INCREMENTAL
assert stream.forced_replication_method is None

assert tap.input_catalog is not None
stream.apply_catalog(catalog=tap.input_catalog)
stream.apply_catalog(
catalog=Catalog.from_dict(
{
"streams": [
{
"tap_stream_id": stream.name,
"metadata": MetadataMapping(),
"key_properties": ["id"],
"stream": stream.name,
"schema": stream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
)
)

assert stream.primary_keys == ["id"]
assert stream.replication_key is None
assert stream.replication_method == REPLICATION_FULL_TABLE
assert stream.forced_replication_method == REPLICATION_FULL_TABLE


def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream):
"""Validate state and start_time setting handling."""
timestamp_value = "2021-02-01"
@pytest.mark.parametrize(
"stream_name,bookmark_value,expected_starting_value",
[
pytest.param(
"test",
None,
pendulum.parse(CONFIG_START_DATE),
id="datetime-repl-key-no-state",
),
pytest.param(
"test",
"2021-02-01",
pendulum.datetime(2021, 2, 1),
id="datetime-repl-key-recent-bookmark",
),
pytest.param(
"test",
"2020-01-01",
pendulum.parse(CONFIG_START_DATE),
id="datetime-repl-key-old-bookmark",
),
pytest.param(
"unix_ts",
None,
CONFIG_START_DATE,
id="naive-unix-ts-repl-key-no-state",
),
pytest.param(
"unix_ts",
"1612137600",
"1612137600",
id="naive-unix-ts-repl-key-recent-bookmark",
),
pytest.param(
"unix_ts",
"1577858400",
"1577858400",
id="naive-unix-ts-repl-key-old-bookmark",
),
pytest.param(
"unix_ts_override",
None,
CONFIG_START_DATE,
id="unix-ts-repl-key-no-state",
),
pytest.param(
"unix_ts_override",
"1612137600",
"1612137600",
id="unix-ts-repl-key-recent-bookmark",
),
pytest.param(
"unix_ts_override",
"1577858400",
pendulum.parse(CONFIG_START_DATE).format("X"),
id="unix-ts-repl-key-old-bookmark",
),
],
)
def test_stream_starting_timestamp(
tap: SimpleTestTap,
stream_name: str,
bookmark_value: str,
expected_starting_value: Any,
):
"""Test the starting timestamp for a stream."""
stream = tap.streams[stream_name]

if stream.is_timestamp_replication_key:
get_starting_value = stream.get_starting_timestamp
else:
get_starting_value = stream.get_starting_replication_key_value

stream._write_starting_replication_value(None)
assert stream.get_starting_timestamp(None) == pendulum.parse(
cast(str, stream.config.get("start_date"))
)
tap.load_state(
{
"bookmarks": {
stream.name: {
stream_name: {
"replication_key": stream.replication_key,
"replication_key_value": timestamp_value,
"replication_key_value": bookmark_value,
}
}
}
)
stream._write_starting_replication_value(None)
assert stream.replication_key == "updatedAt"
assert stream.replication_method == REPLICATION_INCREMENTAL
assert stream.is_timestamp_replication_key
assert stream.get_starting_timestamp(None) == pendulum.parse(
timestamp_value
), f"Incorrect starting timestamp. Tap state was {dict(tap.state)}"
assert get_starting_value(None) == expected_starting_value


@pytest.mark.parametrize(
Expand Down

0 comments on commit d94e0d8

Please sign in to comment.