diff --git a/tap_airbyte/tap.py b/tap_airbyte/tap.py index 9feaaee..1958659 100644 --- a/tap_airbyte/tap.py +++ b/tap_airbyte/tap.py @@ -12,6 +12,7 @@ from __future__ import annotations +from copy import deepcopy import errno import os import shutil @@ -513,8 +514,15 @@ def run_read(self) -> t.Iterator[subprocess.Popen]: catalog.write(orjson.dumps(self.configured_airbyte_catalog)) if self.airbyte_state: with open(f"{host_tmpdir}/state.json", "wb") as state: - self.logger.debug("Using state: %s", self.airbyte_state) - state.write(orjson.dumps(self.airbyte_state)) + # Use the new airbyte state container if it exists. + state_dict = self.airbyte_state + if 'airbyte_state' in self.airbyte_state: + # This is airbyte state V2 + state_dict = self.airbyte_state['airbyte_state'] + + self.logger.debug("Using state: %s", state_dict) + state.write(orjson.dumps(state_dict)) + runtime_conf_dir = host_tmpdir if self.is_native() else self.airbyte_mount_dir proc = subprocess.Popen( self.to_command( @@ -732,8 +740,55 @@ def sync_all(self) -> None: ): self._process_log_message(airbyte_message) elif airbyte_message["type"] == AirbyteMessage.STATE: - state_message = airbyte_message["state"] + # See: https://docs.airbyte.com/understanding-airbyte/database-data-catalog + # for how this state should be handled. + state_message = deepcopy(airbyte_message["state"]) state_type = state_message["type"] + + if "airbyte_state" not in self.airbyte_state: + self.airbyte_state["airbyte_state"] = [] + + # The airbyte_state_v2 here should adhere to the link above. + existing_airbyte_state_v2: list[dict] = deepcopy(self.airbyte_state["airbyte_state"]) + if state_type == "STREAM": + stream_descriptor = state_message["stream"]["stream_descriptor"] + stream_state = state_message["stream"]["stream_state"] + + # Update the state for this stream descriptor or add it to the list. + found = False + for existing_state in existing_airbyte_state_v2: + if existing_state["type"] == "STREAM" and existing_state["stream"]["stream_descriptor"] == stream_descriptor: + existing_state["stream"]["stream_state"] = stream_state + found = True + break + if not found: + existing_airbyte_state_v2.append({ + "type": "STREAM", + "stream": state_message["stream"] + }) + elif state_type == "GLOBAL": + # Update the global state. + found = False + for existing_state in existing_airbyte_state_v2: + if existing_state["type"] == "GLOBAL": + existing_state["global"] = state_message["global"] + found = True + break + if not found: + existing_airbyte_state_v2.append({ + "type": "GLOBAL", + "global": state_message["global"] + }) + elif state_type == "LEGACY": + # One record per connector. + existing_airbyte_state_v2.clear() + existing_airbyte_state_v2.append( + { + "type": "LEGACY", + "legacy": state_message["legacy"] + } + ) + if "data" in state_message: unpacked_state = state_message["data"] elif state_type == "STREAM": @@ -742,7 +797,12 @@ def sync_all(self) -> None: unpacked_state = state_message["global"] elif state_type == "LEGACY": unpacked_state = state_message["legacy"] - self.airbyte_state = unpacked_state + + # Keep the legacy state behavior, but append the new state under a new key. + # Deepcopy here since existing_airbyte_state_v2 can refernce the same object. + self.airbyte_state = deepcopy(unpacked_state) + self.airbyte_state['airbyte_state'] = existing_airbyte_state_v2 + with STDOUT_LOCK: singer.write_message(singer.StateMessage(self.airbyte_state)) else: