diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index cbf46a360..3d0081cd9 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -304,6 +304,29 @@ def _write_replication_key_signpost( state = self.get_context_state(context) write_replication_key_signpost(state, value) + def compare_start_date(self, value: str, start_date_value: str) -> str: + """Compare a bookmark value to a start date and return the most recent value. + + If the replication key is a datetime-formatted string, this method will parse + the value and compare it to the start date. Otherwise, the bookmark value is + returned. + + If the tap uses a non-datetime replication key (e.g. an UNIX timestamp), the + developer is encouraged to override this method to provide custom logic for + comparing the bookmark value to the start date. + + Args: + value: The replication key value. + start_date_value: The start date value from the config. + + Returns: + The most recent value between the bookmark and start date. + """ + if self.is_timestamp_replication_key: + return max(value, start_date_value, key=pendulum.parse) + else: + return value + def _write_starting_replication_value(self, context: Optional[dict]) -> None: """Write the starting replication value, if available. @@ -320,8 +343,13 @@ def _write_starting_replication_value(self, context: Optional[dict]) -> None: ): value = replication_key_value - elif "start_date" in self.config: - value = self.config["start_date"] + # Use start_date if it is more recent than the replication_key state + start_date_value: Optional[str] = self.config.get("start_date") + if start_date_value: + if not value: + value = start_date_value + else: + value = self.compare_start_date(value, start_date_value) write_starting_replication_value(state, value) diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index 2b87dd75b..8732e1536 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -8,6 +8,7 @@ import requests from singer_sdk.helpers._classproperty import classproperty +from singer_sdk.helpers._singer import Catalog, CatalogEntry, MetadataMapping from singer_sdk.helpers.jsonpath import _compile_jsonpath from singer_sdk.streams.core import ( REPLICATION_FULL_TABLE, @@ -25,6 +26,8 @@ StringType, ) +CONFIG_START_DATE = "2021-01-01" + class SimpleTestStream(Stream): """Test stream class.""" @@ -48,6 +51,26 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: yield {"id": 3, "value": "India"} +class UnixTimestampIncrementalStream(SimpleTestStream): + name = "unix_ts" + schema = PropertiesList( + Property("id", IntegerType, required=True), + Property("value", StringType, required=True), + Property("updatedAt", IntegerType, required=True), + ).to_dict() + replication_key = "updatedAt" + + +class UnixTimestampIncrementalStream2(UnixTimestampIncrementalStream): + name = "unix_ts_override" + + def compare_start_date(self, value: str, start_date_value: str) -> str: + """Compare a value to a start date value.""" + + start_timestamp = pendulum.parse(start_date_value).format("X") + return max(value, start_timestamp, key=float) + + class RestTestStream(RESTStream): """Test RESTful stream class.""" @@ -82,28 +105,19 @@ class SimpleTestTap(Tap): def discover_streams(self) -> List[Stream]: """List all streams.""" - return [SimpleTestStream(self)] + return [ + SimpleTestStream(self), + UnixTimestampIncrementalStream(self), + UnixTimestampIncrementalStream2(self), + ] @pytest.fixture def tap() -> SimpleTestTap: """Tap instance.""" - catalog_dict = { - "streams": [ - { - "key_properties": ["id"], - "tap_stream_id": SimpleTestStream.name, - "stream": SimpleTestStream.name, - "schema": SimpleTestStream.schema, - "replication_method": REPLICATION_FULL_TABLE, - "replication_key": None, - } - ] - } return SimpleTestTap( - config={"start_date": "2021-01-01"}, + config={"start_date": CONFIG_START_DATE}, parse_env_config=False, - catalog=catalog_dict, ) @@ -113,6 +127,12 @@ def stream(tap: SimpleTestTap) -> SimpleTestStream: return cast(SimpleTestStream, tap.load_streams()[0]) +@pytest.fixture +def unix_timestamp_stream(tap: SimpleTestTap) -> UnixTimestampIncrementalStream: + """Create a new stream instance.""" + return cast(UnixTimestampIncrementalStream, tap.load_streams()[1]) + + def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream): """Applying a catalog to a stream should overwrite fields.""" assert stream.primary_keys == [] @@ -120,8 +140,23 @@ def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream): assert stream.replication_method == REPLICATION_INCREMENTAL assert stream.forced_replication_method is None - assert tap.input_catalog is not None - stream.apply_catalog(catalog=tap.input_catalog) + stream.apply_catalog( + catalog=Catalog.from_dict( + { + "streams": [ + { + "tap_stream_id": stream.name, + "metadata": MetadataMapping(), + "key_properties": ["id"], + "stream": stream.name, + "schema": stream.schema, + "replication_method": REPLICATION_FULL_TABLE, + "replication_key": None, + } + ] + } + ) + ) assert stream.primary_keys == ["id"] assert stream.replication_key is None @@ -129,31 +164,91 @@ def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream): assert stream.forced_replication_method == REPLICATION_FULL_TABLE -def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream): - """Validate state and start_time setting handling.""" - timestamp_value = "2021-02-01" +@pytest.mark.parametrize( + "stream_name,bookmark_value,expected_starting_value", + [ + pytest.param( + "test", + None, + pendulum.parse(CONFIG_START_DATE), + id="datetime-repl-key-no-state", + ), + pytest.param( + "test", + "2021-02-01", + pendulum.datetime(2021, 2, 1), + id="datetime-repl-key-recent-bookmark", + ), + pytest.param( + "test", + "2020-01-01", + pendulum.parse(CONFIG_START_DATE), + id="datetime-repl-key-old-bookmark", + ), + pytest.param( + "unix_ts", + None, + CONFIG_START_DATE, + id="naive-unix-ts-repl-key-no-state", + ), + pytest.param( + "unix_ts", + "1612137600", + "1612137600", + id="naive-unix-ts-repl-key-recent-bookmark", + ), + pytest.param( + "unix_ts", + "1577858400", + "1577858400", + id="naive-unix-ts-repl-key-old-bookmark", + ), + pytest.param( + "unix_ts_override", + None, + CONFIG_START_DATE, + id="unix-ts-repl-key-no-state", + ), + pytest.param( + "unix_ts_override", + "1612137600", + "1612137600", + id="unix-ts-repl-key-recent-bookmark", + ), + pytest.param( + "unix_ts_override", + "1577858400", + pendulum.parse(CONFIG_START_DATE).format("X"), + id="unix-ts-repl-key-old-bookmark", + ), + ], +) +def test_stream_starting_timestamp( + tap: SimpleTestTap, + stream_name: str, + bookmark_value: str, + expected_starting_value: Any, +): + """Test the starting timestamp for a stream.""" + stream = tap.streams[stream_name] + + if stream.is_timestamp_replication_key: + get_starting_value = stream.get_starting_timestamp + else: + get_starting_value = stream.get_starting_replication_key_value - stream._write_starting_replication_value(None) - assert stream.get_starting_timestamp(None) == pendulum.parse( - cast(str, stream.config.get("start_date")) - ) tap.load_state( { "bookmarks": { - stream.name: { + stream_name: { "replication_key": stream.replication_key, - "replication_key_value": timestamp_value, + "replication_key_value": bookmark_value, } } } ) stream._write_starting_replication_value(None) - assert stream.replication_key == "updatedAt" - assert stream.replication_method == REPLICATION_INCREMENTAL - assert stream.is_timestamp_replication_key - assert stream.get_starting_timestamp(None) == pendulum.parse( - timestamp_value - ), f"Incorrect starting timestamp. Tap state was {dict(tap.state)}" + assert get_starting_value(None) == expected_starting_value @pytest.mark.parametrize(