improve incremental SAT testing to walk through each stream state instead of just the latest #12802
Conversation
sample_rate = 1 if len(checkpoint_messages) <= 5 else len(checkpoint_messages) // 5
for message_batch in checkpoint_messages[::sample_rate]:
    assert len(message_batch) > 0 and message_batch[0].type == Type.STATE
    current_state = message_batch.pop(0)
It may seem a bit heavy-handed to assemble all these partitions just to pop off the state, which is used in the next call_read_with_state, but it's meant to set up the next part of validation in a subsequent PR, which uses the record messages from the batch and verifies that they are all present in the result of the current call_read_with_state.

Alternatively, I can remove all the logic and just build a list of state messages, if the check-ins should remain isolated.
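The pop-the-state pattern being discussed can be sketched as a small standalone helper (illustrative tuples stand in for Airbyte protocol messages; names here are assumptions, not the actual SAT helpers):

```python
def split_batch(message_batch):
    """Each batch starts with a state message; pop it off to drive the next
    read-with-state call, leaving the record messages for later validation."""
    assert message_batch and message_batch[0][0] == "STATE"
    current_state = message_batch.pop(0)
    return current_state, message_batch
```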
checkpoint_messages = [message for index, message in enumerate(checkpoint_messages) if message not in checkpoint_messages[:index]]

# To avoid spamming APIs we only test a fraction of slices
sample_rate = 1 if len(checkpoint_messages) <= 5 else len(checkpoint_messages) // 5
Not the most sophisticated way of fractionally determining how many API requests to make
another approach could be to try all slices until we reach a slice that has new messages, but no old ones - that should be enough to confirm incrementals work as expected
@brianjlai can you run the tests for the GA connectors? Looks like there's only 3 of them: https://github.com/airbytehq/airbyte/blob/master/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
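Taken together, the deduplication and sampling above can be sketched as a standalone helper (a simplification of the SAT logic, with hashable placeholder messages instead of protocol objects):

```python
def sample_checkpoints(checkpoint_messages):
    # Drop duplicates while preserving order (mirrors the list comprehension above).
    deduped = [m for i, m in enumerate(checkpoint_messages) if m not in checkpoint_messages[:i]]
    # To avoid spamming APIs we only test roughly five evenly spaced slices.
    sample_rate = 1 if len(deduped) <= 5 else len(deduped) // 5
    return deduped[::sample_rate]
```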
try:
    # try second time as an absolute path in state file (i.e. bookmarks -> stream_name -> column -> value)
    state_value = cursor_field.parse(record=state, path=state_cursor_paths[stream_name])
except KeyError:
@brianjlai do you know why this sometimes fails to find the state value?
Great question! So I am not sure this was working as intended up to this point. In the original comparison, we tested all the record messages against the latest state message. This works for the latest state message because it carries the final states of all prior streams. But going one at a time, I noticed it would fail both parses and the test would fail.

When we compare each stream one at a time going forward, the current state message might not have state for the upcoming stream if it is the first one. This allows us to basically skip cursor comparisons if there is no cursor to compare on in state. Also, when reading with state, we get messages for the remaining streams in the configured catalog, which we should also skip since there is no cursor in the state for them yet. We'll check them on subsequent reads.
got it. makes sense!
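The two-step lookup in the snippet above can be illustrated with a simplified standalone version (plain dicts instead of the SAT cursor_field helper; the function name and shapes are assumptions):

```python
def get_state_value(state, stream_name, cursor_path):
    """Try the per-stream shape first, then fall back to an absolute path
    into the state blob (i.e. bookmarks -> stream_name -> column -> value)."""
    try:
        return state[stream_name][cursor_path[-1]]
    except (KeyError, TypeError):
        # Fallback: walk the full path from the root of the state.
        node = state
        for key in cursor_path:
            node = node[key]
        return node
```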
@@ -70,11 +70,17 @@ def records_with_state(records, state, stream_mapping, state_cursor_paths) -> It
cursor_field = helper.field(stream.cursor_field)
record_value = cursor_field.parse(record=record.record.data)
try:
    if state[stream_name] is None:
is this needed? I'd expect the test to only run for streams that exist
For some reason, yes. It's annoying, but without it we end up with a TypeError when we parse the None type. This was a real state coming from our SAT tests for hubspot:
type=<Type.STATE: 'STATE'> log=None spec=None connectionStatus=None catalog=None record=None
state=AirbyteStateMessage(data={'companies': {'updatedAt': '2022-05-02T11:15:24.476000+00:00'},
'contact_lists': {'updatedAt': 1634044407353}, 'contacts': {'updatedAt': '2022-05-05T14:31:15.068000+00:00'},
'deals': {'updatedAt': '2022-05-11T05:39:31.131000+00:00'}, 'email_events': None})
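A minimal illustration of why the None check is needed (the shapes are taken from the hubspot state above; the helper name is hypothetical):

```python
state = {
    "deals": {"updatedAt": "2022-05-11T05:39:31.131000+00:00"},
    "email_events": None,  # present in state, but with no value yet
}


def cursor_for(stream_name):
    stream_state = state.get(stream_name)
    if stream_state is None:
        # Without this guard, indexing into None raises TypeError.
        return None
    return stream_state["updatedAt"]
```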
checkpoint_messages = [message for index, message in enumerate(checkpoint_messages) if message not in checkpoint_messages[:index]]

# To avoid spamming APIs we only test a fraction of slices
sample_rate = 1 if len(checkpoint_messages) <= 5 else len(checkpoint_messages) // 5
nitpick: this is the number of slices to check, not a rate
yeah, I would suggest renaming - was a bit confused when first reading through
approved pending integration tests passing for GA sources
Looking good! nice setup for even more validations 😄
@@ -110,6 +110,9 @@ class IncrementalConfig(BaseConfig):
    default=0,
    ge=0,
)
skip_new_incremental_tests: Optional[bool] = Field(
"new" as an option feels like it might quickly become an inadequate name. I would suggest a more descriptive name (e.g. ensure_stream_state_consistency - don't love the name, but perhaps something similar).
Great point, yeah, the name doesn't tell you much in its current form. I'll adjust it! Trying to balance something generic so we don't have too many of these toggles. Something like skip_comprehensive_incremental_tests, so at least "new" doesn't get stale, but it still doesn't tell much about what is being skipped :/
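For illustration, the toggle under discussion might look like this (sketched with a plain dataclass rather than the pydantic Field used in SAT, and using the suggested name, which may differ in the final PR):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class IncrementalConfig:
    # Hypothetical final name; the PR initially used skip_new_incremental_tests.
    skip_comprehensive_incremental_tests: Optional[bool] = False
```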
    response.
    """
    if inputs.skip_new_incremental_tests:
        pytest.skip("Skipping new incremental test based on acceptance-test-config.yml")
When I used pytest.skip within another SAT test, I found it was also marking as skipped my unit test that tested the skipping. You may want to mock it and add a return here.
airbyte/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py, lines 414 to 416 in 7322b86:

if not inputs.ensure_trace_message_on_failure:
    pytest.skip("Skipping `test_airbyte_trace_message_on_failure` because `inputs.ensure_trace_message_on_failure=False`")
    return
Were you able to mock the pytest.skip method in your tests? I was playing around with some variations of mocking, but haven't been able to override the behavior to just skip and return:

pytest_mock = MagicMock()
pytest_mock.skip.return_value = ""
I was! Here's an example:

airbyte/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_core.py, line 240 in 957a0be:

with patch.object(pytest, "skip", return_value=None):

The return would need to be added after pytest.skip in the SAT case itself.
sweet thanks! and yup already added the return below, but will add the mock in now as well
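The pattern from the linked example, put together as a runnable sketch: patch pytest.skip to a no-op so a unit test exercising the skip branch is not itself marked as skipped. The explicit return matters because a patched pytest.skip no longer raises. (The function below is a stand-in for the SAT test body, not the actual code.)

```python
from unittest.mock import patch

import pytest


def sat_case_under_test(skip_flag):
    # Stand-in for the SAT test; the flag mimics the acceptance-test-config toggle.
    if skip_flag:
        pytest.skip("Skipping new incremental test based on acceptance-test-config.yml")
        return  # reached only when pytest.skip is patched to not raise
    return "ran assertions"


# In the unit test, patch pytest.skip so the Skipped exception is not raised.
with patch.object(pytest, "skip", return_value=None):
    result = sat_case_under_test(skip_flag=True)
```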
Incremental test that makes an initial API request without a state checkpoint. Then we partition the results by stream and
slice checkpoints, resulting in batches of messages that look like:
<state message>
<record message>
...
<record message>

Using these batches, we then make API requests using the state message and verify the correctness of the messages in the
response.
Nice description! I'm not sure it's appropriate to talk about "APIs" here though. Maybe say "call the read method" instead of "make an API request"?
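The batching that the docstring describes can be sketched as a small standalone helper (illustrative tuples, not the actual Airbyte protocol classes):

```python
def partition_by_state(messages):
    """Group a read's output into batches that each start with a state message
    followed by its record messages, matching the docstring's batch shape."""
    batches = []
    for message in messages:
        msg_type, _ = message
        if msg_type == "STATE":
            batches.append([message])
        elif batches:
            batches[-1].append(message)
    return batches
```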
/test connector=connectors/source-salesforce
/test connector=connectors/source-facebook-marketing
/test connector=connectors/source-google-sheets
/test connector=connectors/source-hubspot
/test connector=connectors/source-sendgrid
/test connector=connectors/source-stripe

@brianjlai do you know if these
Yup! The command should include the new tests. The test runs above have my new tests in run output during the action:

/test connector=connectors/source-zendesk-support
/publish connector=bases/source-acceptance-test auto-bump-version=false
/test connector=connectors/source-facebook-marketing
…tead of just the latest (#12802)
* improve SAT test for incrementals to walk through stream states not just the latest
* convert batch to tuple and pr feedback
* bump version and update changelog
* patch pytest so test doesn't get skipped
* update changelog and dockerfile again
What
Our current SAT incremental tests don't provide a very strong verification of correct behavior because they only check the read without a state and the read using the latest state (which would ultimately result in 0 records returned). This new test provides stronger validation of correctness by walking through multiple stream checkpoints instead of just the first and last. Note this is a first step that still uses the cursor date as the assertion criteria. A follow-on ticket will be explored to assert against message correctness.
How
This solution adds a new test, test_read_sequential_slices, that performs a more granular test by checking multiple streams and checkpoints. It does so by performing the following set of steps:

I've tested this against a handful of integrations to verify this won't block validation of our GA or Beta connectors, but in order to avoid adding too many barriers, I've also added the option to disable the test if necessary. By default it will be enabled, but it can be disabled using skip_new_incremental_tests, which can be set in acceptance-test-config.yml on a per-connector basis.

Recommended reading order
test_incremental.py

🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector

Community member or Airbyter
- airbyte_secret
- ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- README.md
- bootstrap.md. See description and examples
- docs/SUMMARY.md
- docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
- docs/integrations/README.md
- airbyte-integrations/builds.md

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- /publish command described here

Updating a connector

Community member or Airbyter
- airbyte_secret
- ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- README.md
- bootstrap.md. See description and examples
- docs/integrations/<source or destination>/<name>.md including changelog. See changelog example

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- /publish command described here

Connector Generator
- Templates (those with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates, then checking in your changes

Tests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.