improve incremental SAT testing to walk through each stream state instead of just the latest (#12802)

* improve SAT test for incrementals to walk through stream states, not just the latest

* convert batch to tuple and address PR feedback

* bump version and update changelog

* patch pytest so test doesn't get skipped

* update changelog and dockerfile again
brianjlai authored and suhomud committed May 23, 2022
1 parent 50a621b commit b36bde7
Showing 7 changed files with 463 additions and 29 deletions.
@@ -1,5 +1,8 @@
# Changelog

## 0.1.53
Add more granular incremental testing that walks through syncs and verifies records according to the cursor value.

## 0.1.52
Add test case for `AirbyteTraceMessage` emission on connector failure: [#12796](https://github.com/airbytehq/airbyte/pull/12796/).

@@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

-LABEL io.airbyte.version=0.1.52
+LABEL io.airbyte.version=0.1.53
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
@@ -111,6 +111,9 @@ class IncrementalConfig(BaseConfig):
        default=0,
        ge=0,
    )
    skip_comprehensive_incremental_tests: Optional[bool] = Field(
        description="Determines whether to skip more granular testing for incremental syncs", default=False
    )


class TestConfig(BaseConfig):
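For context, here is a minimal sketch of how the new flag is set and consumed. It is not part of this diff and assumes SAT's BaseConfig behaves like a plain pydantic model; IncrementalConfig and skip_comprehensive_incremental_tests come from the diff above, everything else is illustrative:

from typing import Optional

from pydantic import BaseModel, Field


class IncrementalConfig(BaseModel):  # simplified stand-in for the BaseConfig subclass above
    threshold_days: int = Field(default=0, ge=0)
    skip_comprehensive_incremental_tests: Optional[bool] = Field(
        description="Determines whether to skip more granular testing for incremental syncs", default=False
    )


# A connector would opt out via its acceptance-test-config.yml, e.g. an incremental
# test entry containing: skip_comprehensive_incremental_tests: true
inputs = IncrementalConfig.parse_obj({"skip_comprehensive_incremental_tests": True})
assert inputs.skip_comprehensive_incremental_tests  # test_read_sequential_slices would then pytest.skip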
@@ -70,11 +70,17 @@ def records_with_state(records, state, stream_mapping, state_cursor_paths) -> It
        cursor_field = helper.field(stream.cursor_field)
        record_value = cursor_field.parse(record=record.record.data)
        try:
+           if state[stream_name] is None:
+               continue
+
+           # first attempt to parse the state value assuming the state object is namespaced on stream names
            state_value = cursor_field.parse(record=state[stream_name], path=state_cursor_paths[stream_name])
        except KeyError:
-           # try second time as an absolute path in state file (i.e. bookmarks -> stream_name -> column -> value)
-           state_value = cursor_field.parse(record=state, path=state_cursor_paths[stream_name])
+           try:
+               # try second time as an absolute path in state file (i.e. bookmarks -> stream_name -> column -> value)
+               state_value = cursor_field.parse(record=state, path=state_cursor_paths[stream_name])
+           except KeyError:
+               continue
        yield record_value, state_value, stream_name
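The nested KeyError fallback above is easier to see with plain dicts. A toy sketch follows; parse here is a hypothetical simplification of cursor_field.parse, whose real implementation lives in SAT's JSON-schema helpers:

def parse(record: dict, path: list):
    # Walk a nested dict along the given key path, raising KeyError if any segment is missing
    value = record
    for key in path:
        value = value[key]
    return value


state = {"bookmarks": {"accounts": {"updated_at": "2022-05-01"}}}
cursor_paths = {"accounts": ["bookmarks", "accounts", "updated_at"]}

try:
    # first attempt: assume the state object is namespaced on stream names, e.g. state["accounts"]
    state_value = parse(state["accounts"], cursor_paths["accounts"])
except KeyError:
    try:
        # fallback: treat the cursor path as absolute within the whole state object
        state_value = parse(state, cursor_paths["accounts"])
    except KeyError:
        state_value = None  # records_with_state simply skips such records

assert state_value == "2022-05-01"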


@@ -136,6 +142,68 @@ def test_two_sequential_reads(
                record_value, state_value, threshold_days
            ), f"Second incremental sync should produce records older or equal to cursor value from the state. Stream: {stream_name}"

    def test_read_sequential_slices(
        self, inputs: IncrementalConfig, connector_config, configured_catalog_for_incremental, cursor_paths, docker_runner: ConnectorRunner
    ):
        """
        Incremental test that calls the read method without a state checkpoint. We then partition the results by stream and
        slice checkpoint, resulting in batches of messages that look like:
            <state message>
            <record message>
            ...
            <record message>
        Using these batches, we make additional read calls with each state message and verify the correctness of the
        messages in the response.
        """
        if inputs.skip_comprehensive_incremental_tests:
            pytest.skip("Skipping new incremental test based on acceptance-test-config.yml")
            return

        threshold_days = getattr(inputs, "threshold_days") or 0
        stream_mapping = {stream.stream.name: stream for stream in configured_catalog_for_incremental.streams}

        output = docker_runner.call_read(connector_config, configured_catalog_for_incremental)
        records_1 = filter_output(output, type_=Type.RECORD)
        states_1 = filter_output(output, type_=Type.STATE)

        assert states_1, "Should produce at least one state"
        assert records_1, "Should produce at least one record"

        latest_state = states_1[-1].state.data
        for record_value, state_value, stream_name in records_with_state(records_1, latest_state, stream_mapping, cursor_paths):
            assert (
                record_value <= state_value
            ), f"First incremental sync should produce records younger or equal to cursor value from the state. Stream: {stream_name}"

        # Create partitions made up of one state message followed by any records that come before the next state
        filtered_messages = [message for message in output if message.type == Type.STATE or message.type == Type.RECORD]
        right_index = len(filtered_messages)
        checkpoint_messages = []
        for index, message in reversed(list(enumerate(filtered_messages))):
            if message.type == Type.STATE:
                message_group = (filtered_messages[index], filtered_messages[index + 1 : right_index])
                checkpoint_messages.insert(0, message_group)
                right_index = index

        # We sometimes have duplicate identical state messages in a stream which we can filter out to speed things up
        checkpoint_messages = [message for index, message in enumerate(checkpoint_messages) if message not in checkpoint_messages[:index]]

        # To avoid spamming APIs we only test a fraction of slices
        num_slices_to_test = 1 if len(checkpoint_messages) <= 5 else len(checkpoint_messages) // 5
        for message_batch in checkpoint_messages[::num_slices_to_test]:
            assert len(message_batch) > 0 and message_batch[0].type == Type.STATE
            current_state = message_batch[0]
            output = docker_runner.call_read_with_state(connector_config, configured_catalog_for_incremental, current_state.state.data)
            records = filter_output(output, type_=Type.RECORD)

            for record_value, state_value, stream_name in records_with_state(
                records, current_state.state.data, stream_mapping, cursor_paths
            ):
                assert compare_cursor_with_threshold(
                    record_value, state_value, threshold_days
                ), f"Second incremental sync should produce records older or equal to cursor value from the state. Stream: {stream_name}"

    def test_state_with_abnormally_large_values(self, connector_config, configured_catalog, future_state, docker_runner: ConnectorRunner):
        configured_catalog = incremental_only_catalog(configured_catalog)
        output = docker_runner.call_read_with_state(config=connector_config, catalog=configured_catalog, state=future_state)
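To make the checkpoint partitioning in test_read_sequential_slices concrete, here is a self-contained toy run. Message and Type are hypothetical stand-ins for the Airbyte protocol classes; the partitioning, dedup, and sampling lines mirror the test above:

from enum import Enum
from typing import NamedTuple, Union


class Type(Enum):
    RECORD = "RECORD"
    STATE = "STATE"


class Message(NamedTuple):
    type: Type
    payload: Union[str, dict]


# Records followed by the state message that checkpoints them, as a connector emits them
messages = [
    Message(Type.RECORD, "r1"),
    Message(Type.RECORD, "r2"),
    Message(Type.STATE, {"c": 2}),
    Message(Type.RECORD, "r3"),
    Message(Type.STATE, {"c": 3}),
    Message(Type.STATE, {"c": 3}),  # duplicate state, dropped by the dedup step
]

# Walk backwards so each state message claims the records that follow it (up to the next state)
right_index = len(messages)
checkpoint_messages = []
for index, message in reversed(list(enumerate(messages))):
    if message.type == Type.STATE:
        checkpoint_messages.insert(0, (messages[index], messages[index + 1 : right_index]))
        right_index = index

# Drop duplicate (state, records) groups
checkpoint_messages = [m for i, m in enumerate(checkpoint_messages) if m not in checkpoint_messages[:i]]

# Sample roughly five slices to avoid hammering the API
step = 1 if len(checkpoint_messages) <= 5 else len(checkpoint_messages) // 5
for state_message, trailing_records in checkpoint_messages[::step]:
    print(state_message.payload, [r.payload for r in trailing_records])
# prints: {'c': 2} ['r3']
#         {'c': 3} []

Note that records emitted before the first state message ("r1", "r2") are never claimed by a batch; in the real test they are already verified against the latest state during the first read pass.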
