From cf5743901c1b87ad9691e0136b1b08fb42680a6e Mon Sep 17 00:00:00 2001 From: "Edgar R. M" Date: Wed, 7 Jun 2023 08:02:12 -0600 Subject: [PATCH] fix(taps): Always emit a STATE message at the start of the sync process (#1753) --- singer_sdk/tap_base.py | 4 ++- tests/core/test_parent_child.py | 28 +++++++++---------- tests/samples/test_tap_countries.py | 2 +- .../mapped_stream/aliased_stream.jsonl | 1 + .../changed_key_properties.jsonl | 1 + .../mapped_stream/drop_property.jsonl | 1 + .../drop_property_null_string.jsonl | 1 + .../snapshots/mapped_stream/flatten_all.jsonl | 1 + .../mapped_stream/flatten_depth_1.jsonl | 1 + .../mapped_stream/keep_all_fields.jsonl | 1 + .../mapped_stream/map_and_flatten.jsonl | 1 + tests/snapshots/mapped_stream/no_map.jsonl | 1 + .../mapped_stream/non_pk_passthrough.jsonl | 1 + .../mapped_stream/only_mapped_fields.jsonl | 1 + .../only_mapped_fields_null_string.jsonl | 1 + .../mapped_stream/sourced_stream_1.jsonl | 1 + .../sourced_stream_1_null_string.jsonl | 1 + .../mapped_stream/sourced_stream_2.jsonl | 1 + 18 files changed, 33 insertions(+), 16 deletions(-) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index e6d999a02..23d8e815b 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -11,7 +11,7 @@ import click -from singer_sdk._singerlib import Catalog +from singer_sdk._singerlib import Catalog, StateMessage, write_message from singer_sdk.configuration._dict_config import merge_missing_config_jsonschema from singer_sdk.exceptions import AbortedSyncFailedException, AbortedSyncPausedException from singer_sdk.helpers import _state @@ -431,6 +431,8 @@ def sync_all(self) -> None: """Sync all streams.""" self._reset_state_progress_markers() self._set_compatible_replication_methods() + write_message(StateMessage(value=self.state)) + stream: Stream for stream in self.streams.values(): if not stream.selected and not stream.has_selected_descendents: diff --git a/tests/core/test_parent_child.py b/tests/core/test_parent_child.py index 5d7d4f2de..a91726bc3 100644 --- a/tests/core/test_parent_child.py +++ b/tests/core/test_parent_child.py @@ -103,19 +103,19 @@ def test_parent_context_fields_in_child(tap: MyTap): messages = _get_messages(tap) # Parent schema is emitted - assert messages[0] - assert messages[0]["type"] == SingerMessageType.SCHEMA - assert messages[0]["stream"] == parent_stream.name - assert messages[0]["schema"] == parent_stream.schema - - # Child schema is emitted assert messages[1] assert messages[1]["type"] == SingerMessageType.SCHEMA - assert messages[1]["stream"] == child_stream.name - assert messages[1]["schema"] == child_stream.schema + assert messages[1]["stream"] == parent_stream.name + assert messages[1]["schema"] == parent_stream.schema + + # Child schema is emitted + assert messages[2] + assert messages[2]["type"] == SingerMessageType.SCHEMA + assert messages[2]["stream"] == child_stream.name + assert messages[2]["schema"] == child_stream.schema # Child records are emitted - child_record_messages = messages[2:5] + child_record_messages = messages[3:6] assert child_record_messages assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages) assert all(msg["stream"] == child_stream.name for msg in child_record_messages) @@ -155,13 +155,13 @@ def test_child_deselected_parent(tap_with_deselected_parent: MyTap): messages = _get_messages(tap_with_deselected_parent) # First message is a schema for the child stream, not the parent - assert messages[0] - assert messages[0]["type"] == SingerMessageType.SCHEMA - assert messages[0]["stream"] == child_stream.name - assert messages[0]["schema"] == child_stream.schema + assert messages[1] + assert messages[1]["type"] == SingerMessageType.SCHEMA + assert messages[1]["stream"] == child_stream.name + assert messages[1]["schema"] == child_stream.schema # Child records are emitted - child_record_messages = messages[1:4] + child_record_messages = messages[2:5] assert child_record_messages assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages) assert all(msg["stream"] == child_stream.name for msg in child_record_messages) diff --git a/tests/samples/test_tap_countries.py b/tests/samples/test_tap_countries.py index 5e7a5cb98..f822f5c8b 100644 --- a/tests/samples/test_tap_countries.py +++ b/tests/samples/test_tap_countries.py @@ -135,4 +135,4 @@ def tally_messages(messages: list) -> t.Counter: assert counter["SCHEMA", "countries"] == 1 assert counter["BATCH", "countries"] == 1 - assert counter[("STATE",)] == 2 + assert counter[("STATE",)] == 3 diff --git a/tests/snapshots/mapped_stream/aliased_stream.jsonl b/tests/snapshots/mapped_stream/aliased_stream.jsonl index 49089dda9..46d5daffe 100644 --- a/tests/snapshots/mapped_stream/aliased_stream.jsonl +++ b/tests/snapshots/mapped_stream/aliased_stream.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "aliased_stream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "aliased_stream", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "aliased_stream", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/changed_key_properties.jsonl b/tests/snapshots/mapped_stream/changed_key_properties.jsonl index f62049ce8..c5168a45b 100644 --- a/tests/snapshots/mapped_stream/changed_key_properties.jsonl +++ b/tests/snapshots/mapped_stream/changed_key_properties.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"email_hash": {"type": ["string", "null"]}}}, "key_properties": ["email_hash"]} {"type": "RECORD", "stream": "mystream", "record": {"email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/drop_property.jsonl b/tests/snapshots/mapped_stream/drop_property.jsonl index f4b0b6f71..8694f4736 100644 --- a/tests/snapshots/mapped_stream/drop_property.jsonl +++ b/tests/snapshots/mapped_stream/drop_property.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "mystream", "record": {"count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/drop_property_null_string.jsonl b/tests/snapshots/mapped_stream/drop_property_null_string.jsonl index f4b0b6f71..8694f4736 100644 --- a/tests/snapshots/mapped_stream/drop_property_null_string.jsonl +++ b/tests/snapshots/mapped_stream/drop_property_null_string.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "mystream", "record": {"count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/flatten_all.jsonl b/tests/snapshots/mapped_stream/flatten_all.jsonl index 03e7af0cc..c54db1563 100644 --- a/tests/snapshots/mapped_stream/flatten_all.jsonl +++ b/tests/snapshots/mapped_stream/flatten_all.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user__id": 1, "user__sub__num": 1}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user__id": 2, "user__sub__num": 2}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/flatten_depth_1.jsonl b/tests/snapshots/mapped_stream/flatten_depth_1.jsonl index 6765ccdef..275e3295c 100644 --- a/tests/snapshots/mapped_stream/flatten_depth_1.jsonl +++ b/tests/snapshots/mapped_stream/flatten_depth_1.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user__id": 1, "user__sub": "{\"num\": 1}"}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user__id": 2, "user__sub": "{\"num\": 2}"}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/keep_all_fields.jsonl b/tests/snapshots/mapped_stream/keep_all_fields.jsonl index 2be7c1105..13ddce438 100644 --- a/tests/snapshots/mapped_stream/keep_all_fields.jsonl +++ b/tests/snapshots/mapped_stream/keep_all_fields.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}, "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}, "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/map_and_flatten.jsonl b/tests/snapshots/mapped_stream/map_and_flatten.jsonl index 40ecc7e40..bf2620184 100644 --- a/tests/snapshots/mapped_stream/map_and_flatten.jsonl +++ b/tests/snapshots/mapped_stream/map_and_flatten.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": ["email_hash"]} {"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user__id": 1, "user__sub__num": 1, "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user__id": 2, "user__sub__num": 2, "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/no_map.jsonl b/tests/snapshots/mapped_stream/no_map.jsonl index f2eeb77fd..019b1f9d9 100644 --- a/tests/snapshots/mapped_stream/no_map.jsonl +++ b/tests/snapshots/mapped_stream/no_map.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/non_pk_passthrough.jsonl b/tests/snapshots/mapped_stream/non_pk_passthrough.jsonl index 4c2490c6c..0cbbf451a 100644 --- a/tests/snapshots/mapped_stream/non_pk_passthrough.jsonl +++ b/tests/snapshots/mapped_stream/non_pk_passthrough.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"count": {"type": ["integer", "null"]}}}, "key_properties": []} {"type": "RECORD", "stream": "mystream", "record": {"count": 21}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"count": 13}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/only_mapped_fields.jsonl b/tests/snapshots/mapped_stream/only_mapped_fields.jsonl index d5341c622..e53042958 100644 --- a/tests/snapshots/mapped_stream/only_mapped_fields.jsonl +++ b/tests/snapshots/mapped_stream/only_mapped_fields.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"email_hash": {"type": ["string", "null"]}, "fixed_count": {"type": ["integer", "null"]}}}, "key_properties": []} {"type": "RECORD", "stream": "mystream", "record": {"email_hash": "c160f8cc69a4f0bf2b0362752353d060", "fixed_count": 20}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"email_hash": "4b9bb80620f03eb3719e0a061c14283d", "fixed_count": 12}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/only_mapped_fields_null_string.jsonl b/tests/snapshots/mapped_stream/only_mapped_fields_null_string.jsonl index d5341c622..e53042958 100644 --- a/tests/snapshots/mapped_stream/only_mapped_fields_null_string.jsonl +++ b/tests/snapshots/mapped_stream/only_mapped_fields_null_string.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"email_hash": {"type": ["string", "null"]}, "fixed_count": {"type": ["integer", "null"]}}}, "key_properties": []} {"type": "RECORD", "stream": "mystream", "record": {"email_hash": "c160f8cc69a4f0bf2b0362752353d060", "fixed_count": 20}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "mystream", "record": {"email_hash": "4b9bb80620f03eb3719e0a061c14283d", "fixed_count": 12}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/sourced_stream_1.jsonl b/tests/snapshots/mapped_stream/sourced_stream_1.jsonl index 3d83ea30e..e63d03815 100644 --- a/tests/snapshots/mapped_stream/sourced_stream_1.jsonl +++ b/tests/snapshots/mapped_stream/sourced_stream_1.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "sourced_stream_1", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "sourced_stream_1", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "sourced_stream_1", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/sourced_stream_1_null_string.jsonl b/tests/snapshots/mapped_stream/sourced_stream_1_null_string.jsonl index 3d83ea30e..e63d03815 100644 --- a/tests/snapshots/mapped_stream/sourced_stream_1_null_string.jsonl +++ b/tests/snapshots/mapped_stream/sourced_stream_1_null_string.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "sourced_stream_1", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "sourced_stream_1", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "sourced_stream_1", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} diff --git a/tests/snapshots/mapped_stream/sourced_stream_2.jsonl b/tests/snapshots/mapped_stream/sourced_stream_2.jsonl index 6a378a5ca..41cce23d7 100644 --- a/tests/snapshots/mapped_stream/sourced_stream_2.jsonl +++ b/tests/snapshots/mapped_stream/sourced_stream_2.jsonl @@ -1,3 +1,4 @@ +{"type": "STATE", "value": {}} {"type": "SCHEMA", "stream": "sourced_stream_2", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []} {"type": "RECORD", "stream": "sourced_stream_2", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} {"type": "RECORD", "stream": "sourced_stream_2", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}