diff --git a/singer_sdk/helpers/_state.py b/singer_sdk/helpers/_state.py
index f3a831108..ed3d345eb 100644
--- a/singer_sdk/helpers/_state.py
+++ b/singer_sdk/helpers/_state.py
@@ -218,6 +218,12 @@ def increment_state(
         progress_dict = stream_or_partition_state[PROGRESS_MARKERS]
     old_rk_value = to_json_compatible(progress_dict.get("replication_key_value"))
     new_rk_value = to_json_compatible(latest_record[replication_key])
+
+    # Keep the existing bookmark rather than overwriting it with a null value.
+    if new_rk_value is None:
+        logger.warning("New replication value is null")
+        return
+
     if old_rk_value is None or not check_sorted or new_rk_value >= old_rk_value:
         progress_dict["replication_key"] = replication_key
         progress_dict["replication_key_value"] = new_rk_value
diff --git a/tests/core/test_state_handling.py b/tests/core/test_state_handling.py
index 3f4fff148..f58a0128b 100644
--- a/tests/core/test_state_handling.py
+++ b/tests/core/test_state_handling.py
@@ -2,6 +2,8 @@
 
 from __future__ import annotations
 
+import logging
+
 import pytest
 
 from singer_sdk.helpers import _state
@@ -127,3 +129,30 @@ def test_irresumable_state():
             "replication_key_value": "2021-05-17T20:41:16Z",
         },
     }
+
+
+def test_null_replication_value(caplog):
+    """Check that a null replication value leaves state unchanged and warns."""
+    stream_state = {
+        "replication_key": "updated_at",
+        "replication_key_value": "2021-05-17T20:41:16Z",
+    }
+    latest_record = {"updated_at": None}
+    replication_key = "updated_at"
+    is_sorted = True
+    check_sorted = False
+
+    with caplog.at_level(logging.WARNING):
+        _state.increment_state(
+            stream_state,
+            latest_record=latest_record,
+            replication_key=replication_key,
+            is_sorted=is_sorted,
+            check_sorted=check_sorted,
+        )
+
+    assert (
+        stream_state["replication_key_value"] == "2021-05-17T20:41:16Z"
+    ), "State should not be updated."
+    assert caplog.records[0].levelname == "WARNING"
+    assert "is null" in caplog.records[0].message