Skip to content

Commit

Permalink
feat: Add end of pipe clean up hook to Sinks (#750)
Browse files Browse the repository at this point in the history
* add clean up hook to sinks called at end of pipe drain

* make message more clear for devs and add decorator

* unwrap the comprehension so mypy doesnt get mad

* remove abstract method

* fix flake8 errors

Co-authored-by: Edgar R. M <[email protected]>
  • Loading branch information
z3z1ma and edgarrmondragon committed Jun 30, 2022
1 parent b4bc2c7 commit 77993a0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
9 changes: 9 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,12 @@ def activate_version(self, new_version: int) -> None:
"ACTIVATE_VERSION message received but not implemented by this target. "
"Ignoring."
)

def clean_up(self) -> None:
"""Perform any clean up actions required at end of a stream.
Implementations should ensure that clean up does not affect resources
that may be in use from other instances of the same sink. Stream name alone
should not be relied on, it's recommended to use a uuid as well.
"""
pass
16 changes: 14 additions & 2 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def _process_lines(self, file_input: IO[str]) -> Counter[str]:

def _process_endofpipe(self) -> None:
"""Called after all input lines have been read."""
self.drain_all()
self.drain_all(is_endofpipe=True)

def _process_record_message(self, message_dict: dict) -> None:
"""Process a RECORD message.
Expand Down Expand Up @@ -403,15 +403,27 @@ def _process_activate_version_message(self, message_dict: dict) -> None:
# Sink drain methods

@final
def drain_all(self) -> None:
def drain_all(self, is_endofpipe: bool = False) -> None:
"""Drains all sinks, starting with those cleared due to changed schema.
This method is internal to the SDK and should not need to be overridden.
Args:
is_endofpipe: This is passed by the
:meth:`~singer_sdk.Sink._process_endofpipe()` which
is called after the target instance has finished
listening to the stdin
"""
state = copy.deepcopy(self._latest_state)
self._drain_all(self._sinks_to_clear, 1)
if is_endofpipe:
for sink in self._sinks_to_clear:
sink.clean_up()
self._sinks_to_clear = []
self._drain_all(list(self._sinks_active.values()), self.max_parallelism)
if is_endofpipe:
for sink in self._sinks_active.values():
sink.clean_up()
self._write_state_message(state)
self._reset_max_record_age()

Expand Down

0 comments on commit 77993a0

Please sign in to comment.