From b36bde77f287a89f7b9bf0feaea5c17fdde54e34 Mon Sep 17 00:00:00 2001
From: Brian Lai <51336873+brianjlai@users.noreply.github.com>
Date: Wed, 18 May 2022 13:01:19 -0700
Subject: [PATCH] improve incremental SAT testing to walk through each stream
 state instead of just the latest (#12802)

* improve SAT test for incrementals to walk through stream states, not just the latest

* convert batch to tuple and pr feedback

* bump version and update changelog

* patch pytest so test doesn't get skipped

* update changelog and dockerfile again
---
 .../bases/source-acceptance-test/CHANGELOG.md |   3 +
 .../bases/source-acceptance-test/Dockerfile   |   2 +-
 .../source_acceptance_test/config.py          |   3 +
 .../tests/test_incremental.py                 |  72 +++-
 .../unit_tests/test_incremental.py            | 362 +++++++++++++++++-
 .../unit_tests/test_json_schema_helper.py     |  22 +-
 .../source-acceptance-tests-reference.md      |  28 +-
 7 files changed, 463 insertions(+), 29 deletions(-)

diff --git a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md
index 44f2593bee60..91741232377e 100644
--- a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md
+++ b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.1.53
+Add more granular incremental testing that walks through syncs and verifies records according to cursor value.
+
 ## 0.1.52
 Add test case for `AirbyteTraceMessage` emission on connector failure: [#12796](https://github.com/airbytehq/airbyte/pull/12796/).
 
diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile
index f0347cd1ab13..0a4568869a17 100644
--- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile
+++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile
@@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
 COPY source_acceptance_test ./source_acceptance_test
 RUN pip install .
-LABEL io.airbyte.version=0.1.52
+LABEL io.airbyte.version=0.1.53
 LABEL io.airbyte.name=airbyte/source-acceptance-test
 
 ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py
index 618e1fec8bf9..78d82285cfd1 100644
--- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py
+++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py
@@ -111,6 +111,9 @@ class IncrementalConfig(BaseConfig):
         default=0,
         ge=0,
     )
+    skip_comprehensive_incremental_tests: Optional[bool] = Field(
+        description="Determines whether to skip more granular testing for incremental syncs", default=False
+    )
 
 
 class TestConfig(BaseConfig):
diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py
index 34b295991a6b..1d6f7cebb6b7 100644
--- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py
+++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py
@@ -70,11 +70,17 @@ def records_with_state(records, state, stream_mapping, state_cursor_paths) -> It
         cursor_field = helper.field(stream.cursor_field)
         record_value = cursor_field.parse(record=record.record.data)
         try:
+            if state[stream_name] is None:
+                continue
+            # first attempt to parse the state value assuming the state object is namespaced on stream names
             state_value = cursor_field.parse(record=state[stream_name], path=state_cursor_paths[stream_name])
         except KeyError:
-            # try second time as an absolute path in state file (i.e. bookmarks -> stream_name -> column -> value)
-            state_value = cursor_field.parse(record=state, path=state_cursor_paths[stream_name])
+            try:
+                # try second time as an absolute path in state file (i.e. bookmarks -> stream_name -> column -> value)
+                state_value = cursor_field.parse(record=state, path=state_cursor_paths[stream_name])
+            except KeyError:
+                continue
         yield record_value, state_value, stream_name
 
 
@@ -136,6 +142,68 @@ def test_two_sequential_reads(
                     record_value, state_value, threshold_days
                 ), f"Second incremental sync should produce records older or equal to cursor value from the state. Stream: {stream_name}"
 
+    def test_read_sequential_slices(
+        self, inputs: IncrementalConfig, connector_config, configured_catalog_for_incremental, cursor_paths, docker_runner: ConnectorRunner
+    ):
+        """
+        Incremental test that calls the read method without a state checkpoint. Then we partition the results by stream and
+        slice checkpoints, resulting in batches of messages that look like:
+        <state message>
+        <record message>
+        ...
+        <record message>
+
+        Using these batches, we then make additional read method calls using the state message and verify the correctness of the
+        messages in the response.
+ """ + if inputs.skip_comprehensive_incremental_tests: + pytest.skip("Skipping new incremental test based on acceptance-test-config.yml") + return + + threshold_days = getattr(inputs, "threshold_days") or 0 + stream_mapping = {stream.stream.name: stream for stream in configured_catalog_for_incremental.streams} + + output = docker_runner.call_read(connector_config, configured_catalog_for_incremental) + records_1 = filter_output(output, type_=Type.RECORD) + states_1 = filter_output(output, type_=Type.STATE) + + assert states_1, "Should produce at least one state" + assert records_1, "Should produce at least one record" + + latest_state = states_1[-1].state.data + for record_value, state_value, stream_name in records_with_state(records_1, latest_state, stream_mapping, cursor_paths): + assert ( + record_value <= state_value + ), f"First incremental sync should produce records younger or equal to cursor value from the state. Stream: {stream_name}" + + # Create partitions made up of one state message followed by any records that come before the next state + filtered_messages = [message for message in output if message.type == Type.STATE or message.type == Type.RECORD] + right_index = len(filtered_messages) + checkpoint_messages = [] + for index, message in reversed(list(enumerate(filtered_messages))): + if message.type == Type.STATE: + message_group = (filtered_messages[index], filtered_messages[index + 1 : right_index]) + checkpoint_messages.insert(0, message_group) + right_index = index + + # We sometimes have duplicate identical state messages in a stream which we can filter out to speed things up + checkpoint_messages = [message for index, message in enumerate(checkpoint_messages) if message not in checkpoint_messages[:index]] + + # To avoid spamming APIs we only test a fraction of slices + num_slices_to_test = 1 if len(checkpoint_messages) <= 5 else len(checkpoint_messages) // 5 + for message_batch in checkpoint_messages[::num_slices_to_test]: + assert len(message_batch) > 0 and message_batch[0].type == Type.STATE + current_state = message_batch[0] + output = docker_runner.call_read_with_state(connector_config, configured_catalog_for_incremental, current_state.state.data) + records = filter_output(output, type_=Type.RECORD) + + for record_value, state_value, stream_name in records_with_state( + records, current_state.state.data, stream_mapping, cursor_paths + ): + assert compare_cursor_with_threshold( + record_value, state_value, threshold_days + ), f"Second incremental sync should produce records older or equal to cursor value from the state. 
Stream: {stream_name}" + def test_state_with_abnormally_large_values(self, connector_config, configured_catalog, future_state, docker_runner: ConnectorRunner): configured_catalog = incremental_only_catalog(configured_catalog) output = docker_runner.call_read_with_state(config=connector_config, catalog=configured_catalog, state=future_state) diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py index 150addea0f3a..ee80ed21918a 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py @@ -3,7 +3,7 @@ # from datetime import datetime -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pendulum import pytest @@ -14,6 +14,8 @@ AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, Type, ) from source_acceptance_test.config import IncrementalConfig @@ -21,10 +23,12 @@ from source_acceptance_test.tests.test_incremental import compare_cursor_with_threshold -def build_messages_from_record_data(records: list[dict]) -> list[AirbyteMessage]: - return [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data=data, emitted_at=111)) for data in records - ] +def build_messages_from_record_data(stream: str, records: list[dict]) -> list[AirbyteMessage]: + return [build_record_message(stream, data) for data in records] + + +def build_record_message(stream: str, data: dict) -> AirbyteMessage: + return AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream=stream, data=data, emitted_at=111)) def build_state_message(state: dict) -> AirbyteMessage: @@ -79,18 +83,21 @@ def test_incremental_two_sequential_reads(records1, records2, latest_state, thre stream=AirbyteStream( name="test_stream", json_schema={"type": "object", "properties": {"date": {"type": cursor_type}}}, - supported_sync_modes=["full_refresh", "incremental"], + supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], ), - sync_mode="incremental", - destination_sync_mode="overwrite", + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.overwrite, cursor_field=["date"], ) ] ) docker_runner_mock = MagicMock() - docker_runner_mock.call_read.return_value = [*build_messages_from_record_data(records1), build_state_message({"date": latest_state})] - docker_runner_mock.call_read_with_state.return_value = build_messages_from_record_data(records2) + docker_runner_mock.call_read.return_value = [ + *build_messages_from_record_data("test_stream", records1), + build_state_message({"date": latest_state}), + ] + docker_runner_mock.call_read_with_state.return_value = build_messages_from_record_data("test_stream", records2) t = _TestIncremental() if expected_error: @@ -110,3 +117,338 @@ def test_incremental_two_sequential_reads(records1, records2, latest_state, thre cursor_paths=cursor_paths, docker_runner=docker_runner_mock, ) + + +@pytest.mark.parametrize( + "test_name, records, state_records, threshold_days, expected_error", + [ + ( + "test_incremental_with_2_states", + [ + build_state_message(state={}), + # *build_messages_from_record_data(stream="test_stream", records=[{"date": "2022-05-07"}]), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + 
build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-09"}), + build_record_message(stream="test_stream", data={"date": "2022-05-10"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-13"}}), + ], + [ + [ + build_state_message(state={}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-09"}), + build_record_message(stream="test_stream", data={"date": "2022-05-10"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-13"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-09"}), + build_record_message(stream="test_stream", data={"date": "2022-05-10"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-13"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-13"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-13"}}), + ], + ], + 0, + None, + ), + ( + "test_first_incremental_only_younger_records", + [ + build_state_message(state={}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_record_message(stream="test_stream", data={"date": "2022-05-13"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + ], + [ + [ + build_state_message(state={}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_record_message(stream="test_stream", data={"date": "2022-05-13"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_record_message(stream="test_stream", data={"date": "2022-05-13"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + ], + [build_state_message(state={"test_stream": {"date": "2022-05-11"}})], + ], + 0, + 
AssertionError, + ), + ( + "test_incremental_with_threshold", + [ + build_state_message(state={}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + ], + [ + [ + build_state_message(state={}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + ], + [build_state_message(state={"test_stream": {"date": "2022-05-11"}})], + ], + 3, + None, + ), + ( + "test_incremental_with_incorrect_messages", + [ + build_state_message(state={}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-04"}), + build_record_message(stream="test_stream", data={"date": "2022-05-05"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-13"}}), + ], + [ + [ + build_state_message(state={}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-04"}), + build_record_message(stream="test_stream", data={"date": "2022-05-05"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-13"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-04"}), # out of order + build_record_message(stream="test_stream", data={"date": "2022-05-05"}), # out of order + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-13"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream", 
data={"date": "2022-05-11"}), + build_record_message(stream="test_stream", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-13"}}), + ], + [build_state_message(state={"test_stream": {"date": "2022-05-13"}})], + ], + 0, + AssertionError, + ), + ( + "test_incremental_with_multiple_streams", + [ + build_state_message(state={}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-09"}), + build_record_message(stream="test_stream", data={"date": "2022-05-10"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-13"}}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-13"}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-14"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}}), + ], + [ + [ + build_state_message(state={}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-09"}), + build_record_message(stream="test_stream", data={"date": "2022-05-10"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-13"}}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-13"}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-14"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + build_record_message(stream="test_stream", data={"date": "2022-05-09"}), + build_record_message(stream="test_stream", data={"date": "2022-05-10"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-13"}}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-13"}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-14"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-11"}}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-11"}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-12"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-13"}}), + 
build_record_message(stream="test_stream_2", data={"date": "2022-05-13"}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-14"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-13"}}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-13"}), + build_record_message(stream="test_stream_2", data={"date": "2022-05-14"}), + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}}), + ], + [ + build_state_message(state={"test_stream": {"date": "2022-05-11"}, "test_stream_2": {"date": "2022-05-15"}}), + ], + ], + 0, + None, + ), + ( + "test_incremental_with_none_state", + [ + build_state_message(state={"test_stream": None}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + ], + [ + [ + build_state_message(state={"test_stream": None}), + build_record_message(stream="test_stream", data={"date": "2022-05-07"}), + build_record_message(stream="test_stream", data={"date": "2022-05-08"}), + build_state_message(state={"test_stream": {"date": "2022-05-09"}}), + ], + [], + ], + 0, + None, + ), + ], +) +def test_read_with_multiple_states(test_name, records, state_records, threshold_days, expected_error): + input_config = IncrementalConfig(threshold_days=threshold_days) + cursor_paths = {"test_stream": ["date"]} + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="test_stream", + json_schema={"type": "object", "properties": {"date": {"type": "date"}}}, + supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.overwrite, + cursor_field=["date"], + ), + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="test_stream_2", + json_schema={"type": "object", "properties": {"date": {"type": "date"}}}, + supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.overwrite, + cursor_field=["date"], + ), + ] + ) + + docker_runner_mock = MagicMock() + docker_runner_mock.call_read.return_value = records + docker_runner_mock.call_read_with_state.side_effect = state_records + + t = _TestIncremental() + if expected_error: + with pytest.raises(expected_error): + t.test_read_sequential_slices( + inputs=input_config, + connector_config=MagicMock(), + configured_catalog_for_incremental=catalog, + cursor_paths=cursor_paths, + docker_runner=docker_runner_mock, + ) + else: + t.test_read_sequential_slices( + inputs=input_config, + connector_config=MagicMock(), + configured_catalog_for_incremental=catalog, + cursor_paths=cursor_paths, + docker_runner=docker_runner_mock, + ) + + +def test_config_skip_test(): + docker_runner_mock = MagicMock() + docker_runner_mock.call_read.return_value = [] + t = _TestIncremental() + with patch.object(pytest, "skip", return_value=None): + t.test_read_sequential_slices( + inputs=IncrementalConfig(skip_comprehensive_incremental_tests=True), + connector_config=MagicMock(), + configured_catalog_for_incremental=ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="test_stream", + 
json_schema={"type": "object", "properties": {"date": {"type": "date"}}}, + supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.overwrite, + cursor_field=["date"], + ) + ] + ), + cursor_paths={}, + docker_runner=docker_runner_mock, + ) + + # This is guaranteed to fail when the test gets executed + docker_runner_mock.call_read.assert_not_called() diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_json_schema_helper.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_json_schema_helper.py index 44b30e5b7401..e2e529e1513e 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_json_schema_helper.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_json_schema_helper.py @@ -32,6 +32,11 @@ def simple_state_fixture(): } +@pytest.fixture(name="none_state") +def none_state_fixture(): + return {"my_stream": None} + + @pytest.fixture(name="nested_state") def nested_state_fixture(simple_state): return {"my_stream": {"some_account_id": simple_state["my_stream"]}} @@ -102,15 +107,6 @@ def test_nested_path(records, stream_mapping, nested_state): assert state_value == pendulum.datetime(2015, 1, 1, 22, 3, 11), "state value must be correctly found" -def test_nested_path_unknown(records, stream_mapping, simple_state): - stream_mapping["my_stream"].cursor_field = ["ts_created"] - paths = {"my_stream": ["unknown", "ts_created"]} - - result = records_with_state(records=records, state=simple_state, stream_mapping=stream_mapping, state_cursor_paths=paths) - with pytest.raises(KeyError): - next(result) - - def test_absolute_path(records, stream_mapping, singer_state): stream_mapping["my_stream"].cursor_field = ["ts_created"] paths = {"my_stream": ["bookmarks", "my_stream", "ts_created"]} @@ -122,6 +118,14 @@ def test_absolute_path(records, stream_mapping, singer_state): assert state_value == pendulum.datetime(2014, 1, 1, 22, 3, 11), "state value must be correctly found" +def test_none_state(records, stream_mapping, none_state): + stream_mapping["my_stream"].cursor_field = ["ts_created"] + paths = {"my_stream": ["unknown", "ts_created"]} + + result = records_with_state(records=records, state=none_state, stream_mapping=stream_mapping, state_cursor_paths=paths) + assert next(result, None) is None + + def test_json_schema_helper_pydantic_generated(): class E(str, Enum): A = "dda" diff --git a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md index 355c0f9222c5..2004493828ac 100644 --- a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md +++ b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md @@ -194,14 +194,28 @@ This test verifies that all streams in the input catalog which support increment | `timeout_seconds` | int | 20\*60 | Test execution timeout in seconds | | `threshold_days` | int | 0 | For date-based cursors, allow records to be emitted with a cursor value this number of days before the state value. | +### TestReadSequentialSlices + +This test offers more comprehensive verification that all streams in the input catalog which support incremental syncs perform the sync correctly. It does so in two phases. 
+The first phase uses the configured catalog and config provided to this test as input to make a request to the partner API and assemble the complete set of messages to be synced. It then verifies that the sync produced a non-zero number of `RECORD` and `STATE` messages, and partitions this set of messages into batches of a `STATE` message followed by zero or more `RECORD` messages. In the second phase, the initial `STATE` message of each batch is used as the input for a read operation to get records with respect to the cursor. The test then verifies that every `RECORD` message retrieved has a cursor value greater than or equal to the cursor from the current `STATE` message. Streams that do not support incremental sync are ignored, and if no streams in the input catalog support incremental sync, this test is skipped.
+
+| Input | Type | Default | Note |
+|:--------------------------|:-------|:--------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
+| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
+| `cursor_paths` | dict | {} | For each stream, the path of its cursor field in the output state messages. If omitted, the path is taken from the last element of the stream's `cursor_paths`/`cursor_field`. |
+| `timeout_seconds` | int | 20\*60 | Test execution timeout in seconds |
+| `threshold_days` | int | 0 | For date-based cursors, allow records to be emitted with a cursor value this number of days before the state value. |
+| `skip_comprehensive_incremental_tests` | bool | false | For non-GA and in-development connectors, controls whether the more comprehensive incremental tests are skipped (see the example configuration after these sections) |
+
+**Note that this test samples a fraction of stream slices across an incremental sync in order to reduce test duration and avoid spamming partner APIs.** For example, a sync that emits 20 state checkpoints is re-read starting from every fourth checkpoint, so roughly five slices are verified.
+
 ### TestStateWithAbnormallyLargeValues
 
 This test verifies that sync produces no records when run with the STATE with abnormally large values
 
-| Input | Type | Default | Note |
-| :--- | :--- | :--- | :--- |
-| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
-| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
-| `future_state_path` | string | None | Path to the state file with abnormally large cursor values |
-| `timeout_seconds` | int | 20\*60 | Test execution timeout in seconds |
-
+| Input | Type | Default | Note |
+|:--------------------------|:-------|:--------|:---------------------------------------------------------------------------|
+| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
+| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
+| `future_state_path` | string | None | Path to the state file with abnormally large cursor values |
+| `timeout_seconds` | int | 20\*60 | Test execution timeout in seconds |
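
For connector developers reading this change: the `skip_comprehensive_incremental_tests` flag introduced here is read from the connector's `acceptance-test-config.yml`. The snippet below is a hypothetical sketch of how opting out might look under the standard SAT `tests` layout; the connector image, paths, and stream/cursor names are placeholders, not part of this patch.

```yaml
# Hypothetical acceptance-test-config.yml excerpt; the image name, paths, and
# stream/cursor names below are placeholders, not taken from this patch.
connector_image: airbyte/source-example:dev
tests:
  incremental:
    - config_path: "secrets/config.json"
      configured_catalog_path: "integration_tests/configured_catalog.json"
      cursor_paths:
        test_stream: ["date"]
      # Added in 0.1.53: opt out of TestReadSequentialSlices for
      # non-GA / in-development connectors.
      skip_comprehensive_incremental_tests: true
```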
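The checkpoint partitioning and slice sampling that `test_read_sequential_slices` performs can also be seen in isolation. The following is a minimal standalone sketch that mirrors the patch's logic, with plain `(type, payload)` tuples standing in for `AirbyteMessage` objects; it is an illustration under those simplifications, not the shipped implementation.

```python
def partition_by_checkpoint(messages):
    """Group sync output into (state_message, trailing_records) tuples,
    walking backwards so each state claims the records that follow it."""
    checkpoints = []
    right_index = len(messages)
    for index in range(len(messages) - 1, -1, -1):
        kind, _ = messages[index]
        if kind == "STATE":
            checkpoints.insert(0, (messages[index], messages[index + 1 : right_index]))
            right_index = index
    # Drop duplicate identical checkpoints, keeping first occurrences
    return [c for i, c in enumerate(checkpoints) if c not in checkpoints[:i]]


messages = [
    ("STATE", {}),
    ("RECORD", {"date": "2022-05-07"}),
    ("RECORD", {"date": "2022-05-08"}),
    ("STATE", {"test_stream": {"date": "2022-05-09"}}),
    ("RECORD", {"date": "2022-05-09"}),
    ("STATE", {"test_stream": {"date": "2022-05-10"}}),
]

checkpoints = partition_by_checkpoint(messages)

# The test samples every Nth checkpoint: stride 1 when there are at most 5
# checkpoints (test them all), otherwise len // 5, i.e. roughly five evenly
# spaced checkpoints are re-read regardless of sync length.
stride = 1 if len(checkpoints) <= 5 else len(checkpoints) // 5
for state, records in checkpoints[::stride]:
    print(state, "->", [payload["date"] for _, payload in records])
```

In the real test, each sampled checkpoint's state payload seeds a `call_read_with_state` call, and the records that come back are compared against that state's cursor value via `compare_cursor_with_threshold`.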