Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(taps): Use recent start_date as starting_replication_value #759

Merged
merged 19 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,15 @@ def _write_starting_replication_value(self, context: Optional[dict]) -> None:
):
value = replication_key_value

elif "start_date" in self.config:
value = self.config["start_date"]
# Use start_date if it is more recent than the replication_key state
if "start_date" in self.config:
start_date_value = self.config["start_date"]
if not value:
value = start_date_value
elif self.is_timestamp_replication_key:
value = max(value, start_date_value, key=pendulum.parse)
else:
value = max(value, start_date_value)
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved

write_starting_replication_value(state, value)

Expand Down
105 changes: 104 additions & 1 deletion tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
yield {"id": 3, "value": "India"}


class UnixTimestampIncrementalStream(SimpleTestStream):
name = "unix_ts"
schema = PropertiesList(
Property("id", IntegerType, required=True),
Property("value", StringType, required=True),
Property("updatedAt", IntegerType, required=True),
).to_dict()
replication_key = "updatedAt"
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved


class RestTestStream(RESTStream):
"""Test RESTful stream class."""

Expand Down Expand Up @@ -85,6 +95,17 @@ def discover_streams(self) -> List[Stream]:
return [SimpleTestStream(self)]


class UnixTimestampTap(Tap):
"""Test tap class."""

name = "test-tap"
settings_jsonschema = PropertiesList(Property("start_date", IntegerType)).to_dict()

def discover_streams(self) -> List[Stream]:
"""List all streams."""
return [UnixTimestampIncrementalStream(self)]


@pytest.fixture
def tap() -> SimpleTestTap:
"""Tap instance."""
Expand All @@ -107,12 +128,40 @@ def tap() -> SimpleTestTap:
)


@pytest.fixture
def unix_tap() -> UnixTimestampTap:
"""Tap instance."""
catalog_dict = {
"streams": [
{
"key_properties": ["id"],
"tap_stream_id": UnixTimestampIncrementalStream.name,
"stream": UnixTimestampIncrementalStream.name,
"schema": UnixTimestampIncrementalStream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
return UnixTimestampTap(
config={"start_date": "1640991660"},
parse_env_config=False,
catalog=catalog_dict,
)


@pytest.fixture
def stream(tap: SimpleTestTap) -> SimpleTestStream:
"""Create a new stream instance."""
return cast(SimpleTestStream, tap.load_streams()[0])


@pytest.fixture
def unix_timestamp_stream(unix_tap: UnixTimestampTap) -> UnixTimestampIncrementalStream:
"""Create a new stream instance."""
return cast(UnixTimestampIncrementalStream, unix_tap.load_streams()[0])


def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
"""Applying a catalog to a stream should overwrite fields."""
assert stream.primary_keys == []
Expand All @@ -129,7 +178,12 @@ def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
assert stream.forced_replication_method == REPLICATION_FULL_TABLE


def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream):
def test_stream_starting_timestamp(
tap: SimpleTestTap,
stream: SimpleTestStream,
unix_tap: UnixTimestampTap,
unix_timestamp_stream: UnixTimestampIncrementalStream,
):
"""Validate state and start_time setting handling."""
timestamp_value = "2021-02-01"

Expand All @@ -147,6 +201,7 @@ def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream)
}
}
)

stream._write_starting_replication_value(None)
assert stream.replication_key == "updatedAt"
assert stream.replication_method == REPLICATION_INCREMENTAL
Expand All @@ -155,6 +210,54 @@ def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream)
timestamp_value
), f"Incorrect starting timestamp. Tap state was {dict(tap.state)}"

# test with a timestamp_value older than start_date
timestamp_value = "2020-01-01"
tap.load_state(
{
"bookmarks": {
stream.name: {
"replication_key": stream.replication_key,
"replication_key_value": timestamp_value,
}
}
}
)
stream._write_starting_replication_value(None)
assert stream.get_starting_timestamp(None) == pendulum.parse(
stream.config.get("start_date")
)

timestamp_value = "2030-01-01"
tap.load_state(
{
"bookmarks": {
stream.name: {
"replication_key": stream.replication_key,
"replication_key_value": timestamp_value,
}
}
}
)
stream._write_starting_replication_value(None)
assert stream.get_starting_timestamp(None) == pendulum.parse(timestamp_value)

timestamp_value = "1640991600"
unix_tap.load_state(
{
"bookmarks": {
unix_timestamp_stream.name: {
"replication_key": unix_timestamp_stream.replication_key,
"replication_key_value": timestamp_value,
}
}
}
)
unix_timestamp_stream._write_starting_replication_value(None)
assert (
unix_timestamp_stream.get_starting_replication_key_value(None)
== unix_tap.config["start_date"]
)


@pytest.mark.parametrize(
"path,content,result",
Expand Down