From 08cddd210d4c1f58052b2d812360f0f919477614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Sun, 19 May 2024 11:42:06 -0600 Subject: [PATCH] fix: Null replication values are now handled when incrementing bookmarks (#2438) * fix: Handle null replication values * Add test --- singer_sdk/helpers/_state.py | 5 +++++ tests/core/test_state_handling.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/singer_sdk/helpers/_state.py b/singer_sdk/helpers/_state.py index f3a831108..ed3d345eb 100644 --- a/singer_sdk/helpers/_state.py +++ b/singer_sdk/helpers/_state.py @@ -218,6 +218,11 @@ def increment_state( progress_dict = stream_or_partition_state[PROGRESS_MARKERS] old_rk_value = to_json_compatible(progress_dict.get("replication_key_value")) new_rk_value = to_json_compatible(latest_record[replication_key]) + + if new_rk_value is None: + logger.warning("New replication value is null") + return + if old_rk_value is None or not check_sorted or new_rk_value >= old_rk_value: progress_dict["replication_key"] = replication_key progress_dict["replication_key_value"] = new_rk_value diff --git a/tests/core/test_state_handling.py b/tests/core/test_state_handling.py index 3f4fff148..f58a0128b 100644 --- a/tests/core/test_state_handling.py +++ b/tests/core/test_state_handling.py @@ -2,6 +2,8 @@ from __future__ import annotations +import logging + import pytest from singer_sdk.helpers import _state @@ -127,3 +129,29 @@ def test_irresumable_state(): "replication_key_value": "2021-05-17T20:41:16Z", }, } + + +def test_null_replication_value(caplog): + stream_state = { + "replication_key": "updated_at", + "replication_key_value": "2021-05-17T20:41:16Z", + } + latest_record = {"updated_at": None} + replication_key = "updated_at" + is_sorted = True + check_sorted = False + + with caplog.at_level(logging.WARNING): + _state.increment_state( + stream_state, + latest_record=latest_record, + replication_key=replication_key, + is_sorted=is_sorted, + check_sorted=check_sorted, + ) + + assert ( + stream_state["replication_key_value"] == "2021-05-17T20:41:16Z" + ), "State should not be updated." + assert caplog.records[0].levelname == "WARNING" + assert "is null" in caplog.records[0].message