Skip to content

Commit

Permalink
HGI-6439: updates lastest_state
Browse files Browse the repository at this point in the history
  • Loading branch information
davi-souza committed Sep 18, 2024
1 parent 2e70409 commit 46a3861
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions target_api/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,21 @@ def _process_record_message(self, message_dict: dict) -> None:
sink.process_record(transformed_record, context)
sink._after_process_record(context)

if sink.is_full:
self.logger.info(
f"Target sink for '{sink.stream_name}' is full. Draining..."
)
self.drain_one(sink)

if not self._latest_state:
# If "self._latest_state" is empty, save the value of "sink.latest_state"
self._latest_state = sink.latest_state
else:
# If "self._latest_state" is not empty, update all its fields with the
# fields from "sink.latest_state" (if they exist)
for key in self._latest_state.keys():
sink_latest_state = sink.latest_state or dict()
self._latest_state[key].update(sink_latest_state.get(key) or dict())

if __name__ == "__main__":
TargetApi.cli()

0 comments on commit 46a3861

Please sign in to comment.