Skip to content

Commit

Permalink
fix(taps): Always emit a STATE message at the start of the sync process
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jun 6, 2023
1 parent 5267160 commit c6f3383
Show file tree
Hide file tree
Showing 18 changed files with 33 additions and 16 deletions.
4 changes: 3 additions & 1 deletion singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import click

- from singer_sdk._singerlib import Catalog
+ from singer_sdk._singerlib import Catalog, StateMessage, write_message
from singer_sdk.configuration._dict_config import merge_missing_config_jsonschema
from singer_sdk.exceptions import AbortedSyncFailedException, AbortedSyncPausedException
from singer_sdk.helpers import _state
Expand Down Expand Up @@ -431,6 +431,8 @@ def sync_all(self) -> None:
"""Sync all streams."""
self._reset_state_progress_markers()
self._set_compatible_replication_methods()
write_message(StateMessage(value=self.state))

stream: Stream
for stream in self.streams.values():
if not stream.selected and not stream.has_selected_descendents:
Expand Down
28 changes: 14 additions & 14 deletions tests/core/test_parent_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,19 @@ def test_parent_context_fields_in_child(tap: MyTap):
messages = _get_messages(tap)

# Parent schema is emitted
- assert messages[0]
- assert messages[0]["type"] == SingerMessageType.SCHEMA
- assert messages[0]["stream"] == parent_stream.name
- assert messages[0]["schema"] == parent_stream.schema
-
- # Child schema is emitted
assert messages[1]
assert messages[1]["type"] == SingerMessageType.SCHEMA
- assert messages[1]["stream"] == child_stream.name
- assert messages[1]["schema"] == child_stream.schema
+ assert messages[1]["stream"] == parent_stream.name
+ assert messages[1]["schema"] == parent_stream.schema

+ # Child schema is emitted
+ assert messages[2]
+ assert messages[2]["type"] == SingerMessageType.SCHEMA
+ assert messages[2]["stream"] == child_stream.name
+ assert messages[2]["schema"] == child_stream.schema
+
# Child records are emitted
- child_record_messages = messages[2:5]
+ child_record_messages = messages[3:6]
assert child_record_messages
assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages)
assert all(msg["stream"] == child_stream.name for msg in child_record_messages)
Expand Down Expand Up @@ -155,13 +155,13 @@ def test_child_deselected_parent(tap_with_deselected_parent: MyTap):
messages = _get_messages(tap_with_deselected_parent)

# First message is a schema for the child stream, not the parent
- assert messages[0]
- assert messages[0]["type"] == SingerMessageType.SCHEMA
- assert messages[0]["stream"] == child_stream.name
- assert messages[0]["schema"] == child_stream.schema
+ assert messages[1]
+ assert messages[1]["type"] == SingerMessageType.SCHEMA
+ assert messages[1]["stream"] == child_stream.name
+ assert messages[1]["schema"] == child_stream.schema

# Child records are emitted
- child_record_messages = messages[1:4]
+ child_record_messages = messages[2:5]
assert child_record_messages
assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages)
assert all(msg["stream"] == child_stream.name for msg in child_record_messages)
Expand Down
2 changes: 1 addition & 1 deletion tests/samples/test_tap_countries.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,4 @@ def tally_messages(messages: list) -> t.Counter:
assert counter["SCHEMA", "countries"] == 1
assert counter["BATCH", "countries"] == 1

- assert counter[("STATE",)] == 2
+ assert counter[("STATE",)] == 3
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/aliased_stream.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "aliased_stream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/changed_key_properties.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"email_hash": {"type": ["string", "null"]}}}, "key_properties": ["email_hash"]}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/drop_property.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/flatten_all.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user__id": 1, "user__sub__num": 1}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user__id": 2, "user__sub__num": 2}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/flatten_depth_1.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user__id": 1, "user__sub": "{\"num\": 1}"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user__id": 2, "user__sub": "{\"num\": 2}"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/keep_all_fields.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}, "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}, "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/map_and_flatten.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": ["email_hash"]}
{"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user__id": 1, "user__sub__num": 1, "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user__id": 2, "user__sub__num": 2, "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/no_map.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/non_pk_passthrough.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"count": {"type": ["integer", "null"]}}}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"count": 21}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 13}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/only_mapped_fields.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"email_hash": {"type": ["string", "null"]}, "fixed_count": {"type": ["integer", "null"]}}}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "c160f8cc69a4f0bf2b0362752353d060", "fixed_count": 20}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "4b9bb80620f03eb3719e0a061c14283d", "fixed_count": 12}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"email_hash": {"type": ["string", "null"]}, "fixed_count": {"type": ["integer", "null"]}}}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "c160f8cc69a4f0bf2b0362752353d060", "fixed_count": 20}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email_hash": "4b9bb80620f03eb3719e0a061c14283d", "fixed_count": 12}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/sourced_stream_1.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "sourced_stream_1", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "sourced_stream_1", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "sourced_stream_1", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "sourced_stream_1", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "sourced_stream_1", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "sourced_stream_1", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down
1 change: 1 addition & 0 deletions tests/snapshots/mapped_stream/sourced_stream_2.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "sourced_stream_2", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "sourced_stream_2", "record": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "sourced_stream_2", "record": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
Expand Down

0 comments on commit c6f3383

Please sign in to comment.