Skip to content

Commit

Permalink
Accept only datetime strings in start_date
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Aug 23, 2022
1 parent 4c53c31 commit b39a41f
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 105 deletions.
31 changes: 26 additions & 5 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,29 @@ def _write_replication_key_signpost(
state = self.get_context_state(context)
write_replication_key_signpost(state, value)

def compare_start_date(self, value: str, start_date_value: str) -> str:
"""Compare a bookmark value to a start date and return the most recent value.
If the replication key is a datetime-formatted string, this method will parse
the value and compare it to the start date. Otherwise, the bookmark value is
returned.
If the tap uses a non-datetime replication key (e.g. an UNIX timestamp), the
developer is encouraged to override this method to provide custom logic for
comparing the bookmark value to the start date.
Args:
value: The replication key value.
start_date_value: The start date value from the config.
Returns:
The most recent value between the bookmark and start date.
"""
if self.is_timestamp_replication_key:
return max(value, start_date_value, key=pendulum.parse)
else:
return value

def _write_starting_replication_value(self, context: Optional[dict]) -> None:
"""Write the starting replication value, if available.
Expand All @@ -321,14 +344,12 @@ def _write_starting_replication_value(self, context: Optional[dict]) -> None:
value = replication_key_value

# Use start_date if it is more recent than the replication_key state
if "start_date" in self.config:
start_date_value = self.config["start_date"]
start_date_value: Optional[str] = self.config.get("start_date")
if start_date_value:
if not value:
value = start_date_value
elif self.is_timestamp_replication_key:
value = max(value, start_date_value, key=pendulum.parse)
else:
value = max(value, start_date_value)
value = self.compare_start_date(value, start_date_value)

write_starting_replication_value(state, value)

Expand Down
222 changes: 122 additions & 100 deletions tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import requests

from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._singer import Catalog, CatalogEntry, MetadataMapping
from singer_sdk.helpers.jsonpath import _compile_jsonpath
from singer_sdk.streams.core import (
REPLICATION_FULL_TABLE,
Expand All @@ -25,6 +26,8 @@
StringType,
)

CONFIG_START_DATE = "2021-01-01"


class SimpleTestStream(Stream):
"""Test stream class."""
Expand Down Expand Up @@ -58,6 +61,16 @@ class UnixTimestampIncrementalStream(SimpleTestStream):
replication_key = "updatedAt"


class UnixTimestampIncrementalStream2(UnixTimestampIncrementalStream):
name = "unix_ts_override"

def compare_start_date(self, value: str, start_date_value: str) -> str:
"""Compare a value to a start date value."""

start_timestamp = pendulum.parse(start_date_value).format("X")
return max(value, start_timestamp, key=float)


class RestTestStream(RESTStream):
"""Test RESTful stream class."""

Expand Down Expand Up @@ -92,18 +105,11 @@ class SimpleTestTap(Tap):

def discover_streams(self) -> List[Stream]:
"""List all streams."""
return [SimpleTestStream(self)]


class UnixTimestampTap(Tap):
"""Test tap class."""

name = "test-tap"
settings_jsonschema = PropertiesList(Property("start_date", IntegerType)).to_dict()

def discover_streams(self) -> List[Stream]:
"""List all streams."""
return [UnixTimestampIncrementalStream(self)]
return [
SimpleTestStream(self),
UnixTimestampIncrementalStream(self),
UnixTimestampIncrementalStream2(self),
]


@pytest.fixture
Expand All @@ -117,34 +123,28 @@ def tap() -> SimpleTestTap:
"stream": SimpleTestStream.name,
"schema": SimpleTestStream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
return SimpleTestTap(
config={"start_date": "2021-01-01"},
parse_env_config=False,
catalog=catalog_dict,
)


@pytest.fixture
def unix_tap() -> UnixTimestampTap:
"""Tap instance."""
catalog_dict = {
"streams": [
"replication_key": SimpleTestStream.replication_key,
},
{
"key_properties": ["id"],
"tap_stream_id": UnixTimestampIncrementalStream.name,
"stream": UnixTimestampIncrementalStream.name,
"schema": UnixTimestampIncrementalStream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
"replication_key": UnixTimestampIncrementalStream.replication_key,
},
{
"key_properties": ["id"],
"tap_stream_id": UnixTimestampIncrementalStream2.name,
"stream": UnixTimestampIncrementalStream2.name,
"schema": UnixTimestampIncrementalStream2.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": UnixTimestampIncrementalStream2.replication_key,
},
]
}
return UnixTimestampTap(
config={"start_date": "1640991660"},
return SimpleTestTap(
config={"start_date": CONFIG_START_DATE},
parse_env_config=False,
catalog=catalog_dict,
)
Expand All @@ -157,9 +157,9 @@ def stream(tap: SimpleTestTap) -> SimpleTestStream:


@pytest.fixture
def unix_timestamp_stream(unix_tap: UnixTimestampTap) -> UnixTimestampIncrementalStream:
def unix_timestamp_stream(tap: SimpleTestTap) -> UnixTimestampIncrementalStream:
"""Create a new stream instance."""
return cast(UnixTimestampIncrementalStream, unix_tap.load_streams()[0])
return cast(UnixTimestampIncrementalStream, tap.load_streams()[1])


def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
Expand All @@ -170,93 +170,115 @@ def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
assert stream.forced_replication_method is None

assert tap.input_catalog is not None
stream.apply_catalog(catalog=tap.input_catalog)
stream.apply_catalog(
catalog=Catalog.from_dict(
{
"streams": [
{
"tap_stream_id": stream.name,
"metadata": MetadataMapping(),
"key_properties": ["id"],
"stream": stream.name,
"schema": stream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
)
)

assert stream.primary_keys == ["id"]
assert stream.replication_key is None
assert stream.replication_method == REPLICATION_FULL_TABLE
assert stream.forced_replication_method == REPLICATION_FULL_TABLE


@pytest.mark.parametrize(
"stream_name,bookmark_value,expected_starting_value",
[
pytest.param(
"test",
None,
pendulum.parse(CONFIG_START_DATE),
id="datetime-repl-key-no-state",
),
pytest.param(
"test",
"2021-02-01",
pendulum.datetime(2021, 2, 1),
id="datetime-repl-key-recent-bookmark",
),
pytest.param(
"test",
"2020-01-01",
pendulum.parse(CONFIG_START_DATE),
id="datetime-repl-key-old-bookmark",
),
pytest.param(
"unix_ts",
None,
CONFIG_START_DATE,
id="naive-unix-ts-repl-key-no-state",
),
pytest.param(
"unix_ts",
"1612137600",
"1612137600",
id="naive-unix-ts-repl-key-recent-bookmark",
),
pytest.param(
"unix_ts",
"1577858400",
"1577858400",
id="naive-unix-ts-repl-key-old-bookmark",
),
pytest.param(
"unix_ts_override",
None,
CONFIG_START_DATE,
id="unix-ts-repl-key-no-state",
),
pytest.param(
"unix_ts_override",
"1612137600",
"1612137600",
id="unix-ts-repl-key-recent-bookmark",
),
pytest.param(
"unix_ts_override",
"1577858400",
pendulum.parse(CONFIG_START_DATE).format("X"),
id="unix-ts-repl-key-old-bookmark",
),
],
)
def test_stream_starting_timestamp(
tap: SimpleTestTap,
stream: SimpleTestStream,
unix_tap: UnixTimestampTap,
unix_timestamp_stream: UnixTimestampIncrementalStream,
stream_name: str,
bookmark_value: str,
expected_starting_value: Any,
):
"""Validate state and start_time setting handling."""
timestamp_value = "2021-02-01"

stream._write_starting_replication_value(None)
assert stream.get_starting_timestamp(None) == pendulum.parse(
cast(str, stream.config.get("start_date"))
)
tap.load_state(
{
"bookmarks": {
stream.name: {
"replication_key": stream.replication_key,
"replication_key_value": timestamp_value,
}
}
}
)

stream._write_starting_replication_value(None)
assert stream.replication_key == "updatedAt"
assert stream.replication_method == REPLICATION_INCREMENTAL
assert stream.is_timestamp_replication_key
assert stream.get_starting_timestamp(None) == pendulum.parse(
timestamp_value
), f"Incorrect starting timestamp. Tap state was {dict(tap.state)}"
"""Test the test."""
stream = tap.streams[stream_name]

# test with a timestamp_value older than start_date
timestamp_value = "2020-01-01"
tap.load_state(
{
"bookmarks": {
stream.name: {
"replication_key": stream.replication_key,
"replication_key_value": timestamp_value,
}
}
}
)
stream._write_starting_replication_value(None)
assert stream.get_starting_timestamp(None) == pendulum.parse(
stream.config.get("start_date")
)
if stream.is_timestamp_replication_key:
get_starting_value = stream.get_starting_timestamp
else:
get_starting_value = stream.get_starting_replication_key_value

timestamp_value = "2030-01-01"
tap.load_state(
{
"bookmarks": {
stream.name: {
stream_name: {
"replication_key": stream.replication_key,
"replication_key_value": timestamp_value,
"replication_key_value": bookmark_value,
}
}
}
)
stream._write_starting_replication_value(None)
assert stream.get_starting_timestamp(None) == pendulum.parse(timestamp_value)

timestamp_value = "1640991600"
unix_tap.load_state(
{
"bookmarks": {
unix_timestamp_stream.name: {
"replication_key": unix_timestamp_stream.replication_key,
"replication_key_value": timestamp_value,
}
}
}
)
unix_timestamp_stream._write_starting_replication_value(None)
assert (
unix_timestamp_stream.get_starting_replication_key_value(None)
== unix_tap.config["start_date"]
)
assert get_starting_value(None) == expected_starting_value


@pytest.mark.parametrize(
Expand Down

0 comments on commit b39a41f

Please sign in to comment.