From 1d4ba81dda8fba1d834a4c42217a9070e804071d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Tue, 11 Apr 2023 14:25:54 -0600 Subject: [PATCH 1/3] feat: Allow skipping child streams by returning an empty child context from parent stream --- singer_sdk/streams/core.py | 18 ++++++++++++--- tests/core/test_parent_child.py | 40 ++++++++++++++++++++------------- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index aebbdfe07..83e7d86c6 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -1194,7 +1194,15 @@ def sync(self, context: dict | None = None) -> None: for _ in self._sync_records(context=context): pass - def _sync_children(self, child_context: dict) -> None: + def _sync_children(self, child_context: dict | None) -> None: + if child_context is None: + self.logger.warning( + "Context for child streams of '%s' is null, " + "skipping sync of any child streams", + self.name, + ) + return + for child_stream in self.child_streams: if child_stream.selected or child_stream.has_selected_descendents: child_stream.sync(context=child_context) @@ -1238,7 +1246,7 @@ def _get_state_partition_context(self, context: dict | None) -> dict | None: return {k: v for k, v in context.items() if k in self.state_partitioning_keys} - def get_child_context(self, record: dict, context: dict | None) -> dict: + def get_child_context(self, record: dict, context: dict | None) -> dict | None: """Return a child context object from the record and optional provided context. By default, will return context if provided and otherwise the record dict. @@ -1246,12 +1254,16 @@ def get_child_context(self, record: dict, context: dict | None) -> dict: Developers may override this behavior to send specific information to child streams for context. + Return ``None`` if no child streams should be synced, for example if the + parent record was deleted and the child records can no longer be synced. + Args: record: Individual record in the stream. context: Stream partition or context dictionary. Returns: - A dictionary with context values for a child stream. + A dictionary with context values for a child stream, or None if no child + streams should be synced. Raises: NotImplementedError: If the stream has children but this method is not diff --git a/tests/core/test_parent_child.py b/tests/core/test_parent_child.py index b648d73d7..f77aa936c 100644 --- a/tests/core/test_parent_child.py +++ b/tests/core/test_parent_child.py @@ -25,7 +25,7 @@ def get_child_context( self, record: dict, context: dict | None, # noqa: ARG002 - ) -> dict: + ) -> dict | None: """Create context for children streams.""" return { "pid": record["id"], @@ -86,18 +86,21 @@ def tap_with_deselected_parent(tap: MyTap): tap.catalog["parent"].metadata[()].selected = original -def test_parent_context_fields_in_child(tap: MyTap): - """Test that parent context fields are available in child streams.""" - parent_stream = tap.streams["parent"] - child_stream = tap.streams["child"] - +def _get_messages(tap: Tap): + """Redirect stdout to a buffer.""" buf = io.StringIO() with redirect_stdout(buf): tap.sync_all() - buf.seek(0) lines = buf.read().splitlines() - messages = [json.loads(line) for line in lines] + return [json.loads(line) for line in lines] + + +def test_parent_context_fields_in_child(tap: MyTap): + """Test that parent context fields are available in child streams.""" + parent_stream = tap.streams["parent"] + child_stream = tap.streams["child"] + messages = _get_messages(tap) # Parent schema is emitted assert messages[0] @@ -119,6 +122,19 @@ def test_parent_context_fields_in_child(tap: MyTap): assert all("pid" in msg["record"] for msg in child_record_messages) +def test_skip_deleted_parent_child_streams(tap: MyTap): + """Test tap output with parent stream deselected.""" + parent_stream = tap.streams["parent"] + + buf = io.StringIO() + with redirect_stdout(buf): + parent_stream._sync_children(None) + + buf.seek(0) + + assert not buf.read().splitlines() + + def test_child_deselected_parent(tap_with_deselected_parent: MyTap): """Test tap output with parent stream deselected.""" parent_stream = tap_with_deselected_parent.streams["parent"] @@ -127,13 +143,7 @@ def test_child_deselected_parent(tap_with_deselected_parent: MyTap): assert not parent_stream.selected assert parent_stream.has_selected_descendents - buf = io.StringIO() - with redirect_stdout(buf): - tap_with_deselected_parent.sync_all() - - buf.seek(0) - lines = buf.read().splitlines() - messages = [json.loads(line) for line in lines] + messages = _get_messages(tap_with_deselected_parent) # First message is a schema for the child stream, not the parent assert messages[0] From 0e2d2b6729f715a3665401899966b602092b80ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Tue, 11 Apr 2023 14:30:21 -0600 Subject: [PATCH 2/3] Test captured log --- tests/core/test_parent_child.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/core/test_parent_child.py b/tests/core/test_parent_child.py index f77aa936c..5d7d4f2de 100644 --- a/tests/core/test_parent_child.py +++ b/tests/core/test_parent_child.py @@ -122,17 +122,26 @@ def test_parent_context_fields_in_child(tap: MyTap): assert all("pid" in msg["record"] for msg in child_record_messages) -def test_skip_deleted_parent_child_streams(tap: MyTap): +def test_skip_deleted_parent_child_streams( + tap: MyTap, + caplog: pytest.LogCaptureFixture, +): """Test tap output with parent stream deselected.""" parent_stream = tap.streams["parent"] buf = io.StringIO() - with redirect_stdout(buf): + with redirect_stdout(buf), caplog.at_level("WARNING"): parent_stream._sync_children(None) buf.seek(0) assert not buf.read().splitlines() + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "WARNING" + assert caplog.records[0].message == ( + "Context for child streams of 'parent' is null, " + "skipping sync of any child streams" + ) def test_child_deselected_parent(tap_with_deselected_parent: MyTap): From 32fb526ce37ce2aed11ee7457115b0eff63d0669 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Tue, 11 Apr 2023 16:10:24 -0600 Subject: [PATCH 3/3] Update docstring and annotation for `Stream.get_records` --- singer_sdk/streams/core.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 83e7d86c6..101ff7ef6 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -1287,7 +1287,10 @@ def get_child_context(self, record: dict, context: dict | None) -> dict | None: # Abstract Methods @abc.abstractmethod - def get_records(self, context: dict | None) -> Iterable[dict | tuple[dict, dict]]: + def get_records( + self, + context: dict | None, + ) -> Iterable[dict | tuple[dict, dict | None]]: """Abstract record generator function. Must be overridden by the child class. Each record emitted should be a dictionary of property names to their values. @@ -1306,6 +1309,11 @@ def get_records(self, context: dict | None) -> Iterable[dict | tuple[dict, dict] Parent streams can optionally return a tuple, in which case the second item in the tuple being a `child_context` dictionary for the stream's `context`. + + If the child context object in the tuple is ``None``, the child streams will + be skipped. This is useful for cases where the parent record was deleted and + the child records can no longer be synced. + More info: :doc:`/parent_streams` Args: