Skip to content

Commit

Permalink
Issue 729, allow sort checking to be disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilkka Peltola committed Jun 17, 2022
1 parent 2166416 commit 2197749
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
5 changes: 3 additions & 2 deletions singer_sdk/helpers/_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,11 @@ def increment_state(
latest_record: dict,
replication_key: str,
is_sorted: bool,
check_sorted: bool,
) -> None:
"""Update the state using data from the latest record.
Raises InvalidStreamSortException if is_sorted=True and unsorted
Raises InvalidStreamSortException if is_sorted=True, check_sorted=True and unsorted
data is detected in the stream.
"""
progress_dict = stream_or_partition_state
Expand All @@ -217,7 +218,7 @@ def increment_state(
progress_dict = stream_or_partition_state[PROGRESS_MARKERS]
old_rk_value = to_json_compatible(progress_dict.get("replication_key_value"))
new_rk_value = to_json_compatible(latest_record[replication_key])
if old_rk_value is None or new_rk_value >= old_rk_value:
if old_rk_value is None or new_rk_value >= old_rk_value or not check_sorted:
progress_dict["replication_key"] = replication_key
progress_dict["replication_key_value"] = new_rk_value
return
Expand Down
16 changes: 13 additions & 3 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,18 +431,27 @@ def replication_key(self, new_value: str) -> None:

@property
def is_sorted(self) -> bool:
"""Check if stream is sorted.
"""Expect stream to be sorted.
When `True`, incremental streams will attempt to resume if unexpectedly
interrupted.
Returns:
`True` if stream is sorted. Defaults to `False`.
"""
return False

@property
def check_sorted(self) -> bool:
"""Check if stream is sorted.
This setting enables additional checks which may trigger
`InvalidStreamSortException` if records are found which are unsorted.
Returns:
`True` if stream is sorted. Defaults to `False`.
`True` if sorting is checked. Defaults to `True`.
"""
return False
return True

@property
def metadata(self) -> MetadataMapping:
Expand Down Expand Up @@ -676,6 +685,7 @@ def _increment_stream_state(
replication_key=self.replication_key,
latest_record=latest_record,
is_sorted=treat_as_sorted,
check_sorted=self.check_sorted,
)

# Private message authoring methods:
Expand Down

0 comments on commit 2197749

Please sign in to comment.