diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 7d2a49e72d..3d0081cd93 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -304,6 +304,29 @@ def _write_replication_key_signpost( state = self.get_context_state(context) write_replication_key_signpost(state, value) + def compare_start_date(self, value: str, start_date_value: str) -> str: + """Compare a bookmark value to a start date and return the most recent value. + + If the replication key is a datetime-formatted string, this method will parse + the value and compare it to the start date. Otherwise, the bookmark value is + returned. + + If the tap uses a non-datetime replication key (e.g. an UNIX timestamp), the + developer is encouraged to override this method to provide custom logic for + comparing the bookmark value to the start date. + + Args: + value: The replication key value. + start_date_value: The start date value from the config. + + Returns: + The most recent value between the bookmark and start date. + """ + if self.is_timestamp_replication_key: + return max(value, start_date_value, key=pendulum.parse) + else: + return value + def _write_starting_replication_value(self, context: Optional[dict]) -> None: """Write the starting replication value, if available. @@ -321,14 +344,12 @@ def _write_starting_replication_value(self, context: Optional[dict]) -> None: value = replication_key_value # Use start_date if it is more recent than the replication_key state - if "start_date" in self.config: - start_date_value = self.config["start_date"] + start_date_value: Optional[str] = self.config.get("start_date") + if start_date_value: if not value: value = start_date_value - elif self.is_timestamp_replication_key: - value = max(value, start_date_value, key=pendulum.parse) else: - value = max(value, start_date_value) + value = self.compare_start_date(value, start_date_value) write_starting_replication_value(state, value) diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index 07c5fd1ed9..7bea40c6dc 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -8,6 +8,7 @@ import requests from singer_sdk.helpers._classproperty import classproperty +from singer_sdk.helpers._singer import Catalog, CatalogEntry, MetadataMapping from singer_sdk.helpers.jsonpath import _compile_jsonpath from singer_sdk.streams.core import ( REPLICATION_FULL_TABLE, @@ -25,6 +26,8 @@ StringType, ) +CONFIG_START_DATE = "2021-01-01" + class SimpleTestStream(Stream): """Test stream class.""" @@ -58,6 +61,16 @@ class UnixTimestampIncrementalStream(SimpleTestStream): replication_key = "updatedAt" +class UnixTimestampIncrementalStream2(UnixTimestampIncrementalStream): + name = "unix_ts_override" + + def compare_start_date(self, value: str, start_date_value: str) -> str: + """Compare a value to a start date value.""" + + start_timestamp = pendulum.parse(start_date_value).format("X") + return max(value, start_timestamp, key=float) + + class RestTestStream(RESTStream): """Test RESTful stream class.""" @@ -92,18 +105,11 @@ class SimpleTestTap(Tap): def discover_streams(self) -> List[Stream]: """List all streams.""" - return [SimpleTestStream(self)] - - -class UnixTimestampTap(Tap): - """Test tap class.""" - - name = "test-tap" - settings_jsonschema = PropertiesList(Property("start_date", IntegerType)).to_dict() - - def discover_streams(self) -> List[Stream]: - """List all streams.""" - return [UnixTimestampIncrementalStream(self)] + return [ + SimpleTestStream(self), + UnixTimestampIncrementalStream(self), + UnixTimestampIncrementalStream2(self), + ] @pytest.fixture @@ -117,34 +123,28 @@ def tap() -> SimpleTestTap: "stream": SimpleTestStream.name, "schema": SimpleTestStream.schema, "replication_method": REPLICATION_FULL_TABLE, - "replication_key": None, - } - ] - } - return SimpleTestTap( - config={"start_date": "2021-01-01"}, - parse_env_config=False, - catalog=catalog_dict, - ) - - -@pytest.fixture -def unix_tap() -> UnixTimestampTap: - """Tap instance.""" - catalog_dict = { - "streams": [ + "replication_key": SimpleTestStream.replication_key, + }, { "key_properties": ["id"], "tap_stream_id": UnixTimestampIncrementalStream.name, "stream": UnixTimestampIncrementalStream.name, "schema": UnixTimestampIncrementalStream.schema, "replication_method": REPLICATION_FULL_TABLE, - "replication_key": None, - } + "replication_key": UnixTimestampIncrementalStream.replication_key, + }, + { + "key_properties": ["id"], + "tap_stream_id": UnixTimestampIncrementalStream2.name, + "stream": UnixTimestampIncrementalStream2.name, + "schema": UnixTimestampIncrementalStream2.schema, + "replication_method": REPLICATION_FULL_TABLE, + "replication_key": UnixTimestampIncrementalStream2.replication_key, + }, ] } - return UnixTimestampTap( - config={"start_date": "1640991660"}, + return SimpleTestTap( + config={"start_date": CONFIG_START_DATE}, parse_env_config=False, catalog=catalog_dict, ) @@ -157,9 +157,9 @@ def stream(tap: SimpleTestTap) -> SimpleTestStream: @pytest.fixture -def unix_timestamp_stream(unix_tap: UnixTimestampTap) -> UnixTimestampIncrementalStream: +def unix_timestamp_stream(tap: SimpleTestTap) -> UnixTimestampIncrementalStream: """Create a new stream instance.""" - return cast(UnixTimestampIncrementalStream, unix_tap.load_streams()[0]) + return cast(UnixTimestampIncrementalStream, tap.load_streams()[1]) def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream): @@ -170,7 +170,23 @@ def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream): assert stream.forced_replication_method is None assert tap.input_catalog is not None - stream.apply_catalog(catalog=tap.input_catalog) + stream.apply_catalog( + catalog=Catalog.from_dict( + { + "streams": [ + { + "tap_stream_id": stream.name, + "metadata": MetadataMapping(), + "key_properties": ["id"], + "stream": stream.name, + "schema": stream.schema, + "replication_method": REPLICATION_FULL_TABLE, + "replication_key": None, + } + ] + } + ) + ) assert stream.primary_keys == ["id"] assert stream.replication_key is None @@ -178,85 +194,91 @@ def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream): assert stream.forced_replication_method == REPLICATION_FULL_TABLE +@pytest.mark.parametrize( + "stream_name,bookmark_value,expected_starting_value", + [ + pytest.param( + "test", + None, + pendulum.parse(CONFIG_START_DATE), + id="datetime-repl-key-no-state", + ), + pytest.param( + "test", + "2021-02-01", + pendulum.datetime(2021, 2, 1), + id="datetime-repl-key-recent-bookmark", + ), + pytest.param( + "test", + "2020-01-01", + pendulum.parse(CONFIG_START_DATE), + id="datetime-repl-key-old-bookmark", + ), + pytest.param( + "unix_ts", + None, + CONFIG_START_DATE, + id="naive-unix-ts-repl-key-no-state", + ), + pytest.param( + "unix_ts", + "1612137600", + "1612137600", + id="naive-unix-ts-repl-key-recent-bookmark", + ), + pytest.param( + "unix_ts", + "1577858400", + "1577858400", + id="naive-unix-ts-repl-key-old-bookmark", + ), + pytest.param( + "unix_ts_override", + None, + CONFIG_START_DATE, + id="unix-ts-repl-key-no-state", + ), + pytest.param( + "unix_ts_override", + "1612137600", + "1612137600", + id="unix-ts-repl-key-recent-bookmark", + ), + pytest.param( + "unix_ts_override", + "1577858400", + pendulum.parse(CONFIG_START_DATE).format("X"), + id="unix-ts-repl-key-old-bookmark", + ), + ], +) def test_stream_starting_timestamp( tap: SimpleTestTap, - stream: SimpleTestStream, - unix_tap: UnixTimestampTap, - unix_timestamp_stream: UnixTimestampIncrementalStream, + stream_name: str, + bookmark_value: str, + expected_starting_value: Any, ): - """Validate state and start_time setting handling.""" - timestamp_value = "2021-02-01" - - stream._write_starting_replication_value(None) - assert stream.get_starting_timestamp(None) == pendulum.parse( - cast(str, stream.config.get("start_date")) - ) - tap.load_state( - { - "bookmarks": { - stream.name: { - "replication_key": stream.replication_key, - "replication_key_value": timestamp_value, - } - } - } - ) - - stream._write_starting_replication_value(None) - assert stream.replication_key == "updatedAt" - assert stream.replication_method == REPLICATION_INCREMENTAL - assert stream.is_timestamp_replication_key - assert stream.get_starting_timestamp(None) == pendulum.parse( - timestamp_value - ), f"Incorrect starting timestamp. Tap state was {dict(tap.state)}" + """Test the test.""" + stream = tap.streams[stream_name] - # test with a timestamp_value older than start_date - timestamp_value = "2020-01-01" - tap.load_state( - { - "bookmarks": { - stream.name: { - "replication_key": stream.replication_key, - "replication_key_value": timestamp_value, - } - } - } - ) - stream._write_starting_replication_value(None) - assert stream.get_starting_timestamp(None) == pendulum.parse( - stream.config.get("start_date") - ) + if stream.is_timestamp_replication_key: + get_starting_value = stream.get_starting_timestamp + else: + get_starting_value = stream.get_starting_replication_key_value - timestamp_value = "2030-01-01" tap.load_state( { "bookmarks": { - stream.name: { + stream_name: { "replication_key": stream.replication_key, - "replication_key_value": timestamp_value, + "replication_key_value": bookmark_value, } } } ) stream._write_starting_replication_value(None) - assert stream.get_starting_timestamp(None) == pendulum.parse(timestamp_value) - - timestamp_value = "1640991600" - unix_tap.load_state( - { - "bookmarks": { - unix_timestamp_stream.name: { - "replication_key": unix_timestamp_stream.replication_key, - "replication_key_value": timestamp_value, - } - } - } - ) - unix_timestamp_stream._write_starting_replication_value(None) - assert ( - unix_timestamp_stream.get_starting_replication_key_value(None) - == unix_tap.config["start_date"] - ) + assert get_starting_value(None) == expected_starting_value @pytest.mark.parametrize(