From 2197749d85c5110efb60f1f20612db4a98da9fab Mon Sep 17 00:00:00 2001 From: Ilkka Peltola Date: Fri, 17 Jun 2022 15:26:13 +0300 Subject: [PATCH] Issue 729, allow sort checking to be disabled --- singer_sdk/helpers/_state.py | 5 +++-- singer_sdk/streams/core.py | 16 +++++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/singer_sdk/helpers/_state.py b/singer_sdk/helpers/_state.py index a4e36afed..08bc5b26b 100644 --- a/singer_sdk/helpers/_state.py +++ b/singer_sdk/helpers/_state.py @@ -202,10 +202,11 @@ def increment_state( latest_record: dict, replication_key: str, is_sorted: bool, + check_sorted: bool, ) -> None: """Update the state using data from the latest record. - Raises InvalidStreamSortException if is_sorted=True and unsorted + Raises InvalidStreamSortException if is_sorted=True, check_sorted=True and unsorted data is detected in the stream. """ progress_dict = stream_or_partition_state @@ -217,7 +218,7 @@ def increment_state( progress_dict = stream_or_partition_state[PROGRESS_MARKERS] old_rk_value = to_json_compatible(progress_dict.get("replication_key_value")) new_rk_value = to_json_compatible(latest_record[replication_key]) - if old_rk_value is None or new_rk_value >= old_rk_value: + if old_rk_value is None or new_rk_value >= old_rk_value or not check_sorted: progress_dict["replication_key"] = replication_key progress_dict["replication_key_value"] = new_rk_value return diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index bfe082562..1bb5a9c3e 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -431,18 +431,27 @@ def replication_key(self, new_value: str) -> None: @property def is_sorted(self) -> bool: - """Check if stream is sorted. + """Expect stream to be sorted. When `True`, incremental streams will attempt to resume if unexpectedly interrupted. + Returns: + `True` if stream is sorted. Defaults to `False`. + """ + return False + + @property + def check_sorted(self) -> bool: + """Check if stream is sorted. + This setting enables additional checks which may trigger `InvalidStreamSortException` if records are found which are unsorted. Returns: - `True` if stream is sorted. Defaults to `False`. + `True` if sorting is checked. Defaults to `True`. """ - return False + return True @property def metadata(self) -> MetadataMapping: @@ -676,6 +685,7 @@ def _increment_stream_state( replication_key=self.replication_key, latest_record=latest_record, is_sorted=treat_as_sorted, + check_sorted=self.check_sorted, ) # Private message authoring methods: