diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index b944072ee4a4..8502bdf2339e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -148,7 +148,10 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o hasattr(record_data_or_message, "type") and record_data_or_message.type == MessageType.RECORD ): record_data = record_data_or_message if isinstance(record_data_or_message, Mapping) else record_data_or_message.record - stream_state = self.get_updated_state(stream_state, record_data) + if self.cursor_field: + # Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. This + # should be fixed on the stream implementation, but we should also protect against this in the CDK as well + stream_state = self.get_updated_state(stream_state, record_data) record_counter += 1 if sync_mode == SyncMode.incremental: diff --git a/airbyte-cdk/python/unit_tests/sources/streams/test_stream_read.py b/airbyte-cdk/python/unit_tests/sources/streams/test_stream_read.py index 155bd1581535..00f7fe78a4d4 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/test_stream_read.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/test_stream_read.py @@ -127,6 +127,15 @@ def _incremental_concurrent_stream(slice_to_partition_mapping, slice_logger, log return stream +def _stream_with_no_cursor_field(slice_to_partition_mapping, slice_logger, logger, message_repository): + def get_updated_state(current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> MutableMapping[str, Any]: + raise Exception("I shouldn't be invoked by a full_refresh stream") + + mock_stream = _MockStream(slice_to_partition_mapping) + mock_stream.get_updated_state = get_updated_state + return mock_stream + + @pytest.mark.parametrize( "constructor", [ @@ -232,9 +241,10 @@ def test_full_refresh_read_a_single_slice(constructor): [ pytest.param(_stream, id="synchronous_reader"), pytest.param(_concurrent_stream, id="concurrent_reader"), + pytest.param(_stream_with_no_cursor_field, id="no_cursor_field"), ], ) -def test_full_refresh_read_a_two_slices(constructor): +def test_full_refresh_read_two_slices(constructor): # This test verifies that a concurrent stream adapted from a Stream behaves the same as the Stream object # It is done by running the same test cases on both streams configured_stream = ConfiguredAirbyteStream(stream=AirbyteStream(name="mock_stream", supported_sync_modes=[SyncMode.full_refresh], json_schema={}), sync_mode=SyncMode.full_refresh,destination_sync_mode=DestinationSyncMode.overwrite) @@ -261,7 +271,7 @@ def test_full_refresh_read_a_two_slices(constructor): ] # Temporary check to only validate the final state message for synchronous sources since it has not been implemented for concurrent yet - if constructor == _stream: + if constructor == _stream or constructor == _stream_with_no_cursor_field: expected_records.append( AirbyteMessage( type=MessageType.STATE, diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index 7bd56e9b2b49..193d27c382fd 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -187,6 +187,10 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: # type: ignore def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: return "pk" + @property + def cursor_field(self) -> Union[str, List[str]]: + return ["updated_at"] + class MockStreamWithState(MockStream): cursor_field = "cursor"