Skip to content

Commit

Permalink
fix: Use airbyte state message format + tracking.
Browse files Browse the repository at this point in the history
This change will fix incremental extraction for a few TAPs, like tap-amplitude and tap-s3.

 - Adds a new entry to state tracking json, 'airbyte_state'.
   - This new entry is populated on any invocation, and contains airbyte's way to track state as documented here: https://docs.airbyte.com/understanding-airbyte/database-data-catalog.
   - If this entry does not exist, we fallback to what was originally being done.
   - If this entry does exist, we use this data as the input state.json to airbyte's docker container.
  • Loading branch information
JichaoS committed May 22, 2024
1 parent e1349aa commit 4fc1775
Showing 1 changed file with 64 additions and 4 deletions.
68 changes: 64 additions & 4 deletions tap_airbyte/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from __future__ import annotations

from copy import deepcopy
import errno
import os
import shutil
Expand Down Expand Up @@ -513,8 +514,15 @@ def run_read(self) -> t.Iterator[subprocess.Popen]:
catalog.write(orjson.dumps(self.configured_airbyte_catalog))
if self.airbyte_state:
with open(f"{host_tmpdir}/state.json", "wb") as state:
self.logger.debug("Using state: %s", self.airbyte_state)
state.write(orjson.dumps(self.airbyte_state))
# Use the new airbyte state container if it exists.
state_dict = self.airbyte_state
if 'airbyte_state' in self.airbyte_state:
# This is airbyte state V2
state_dict = self.airbyte_state['airbyte_state']

self.logger.debug("Using state: %s", state_dict)
state.write(orjson.dumps(state_dict))

runtime_conf_dir = host_tmpdir if self.is_native() else self.airbyte_mount_dir
proc = subprocess.Popen(
self.to_command(
Expand Down Expand Up @@ -732,8 +740,55 @@ def sync_all(self) -> None:
):
self._process_log_message(airbyte_message)
elif airbyte_message["type"] == AirbyteMessage.STATE:
state_message = airbyte_message["state"]
# See: https://docs.airbyte.com/understanding-airbyte/database-data-catalog
# for how this state should be handled.
state_message = deepcopy(airbyte_message["state"])
state_type = state_message["type"]

if "airbyte_state" not in self.airbyte_state:
self.airbyte_state["airbyte_state"] = []

# The airbyte_state_v2 here should adhere to the link above.
existing_airbyte_state_v2: list[dict] = deepcopy(self.airbyte_state["airbyte_state"])
if state_type == "STREAM":
stream_descriptor = state_message["stream"]["stream_descriptor"]
stream_state = state_message["stream"]["stream_state"]

# Update the state for this stream descriptor or add it to the list.
found = False
for existing_state in existing_airbyte_state_v2:
if existing_state["type"] == "STREAM" and existing_state["stream"]["stream_descriptor"] == stream_descriptor:
existing_state["stream"]["stream_state"] = stream_state
found = True
break
if not found:
existing_airbyte_state_v2.append({
"type": "STREAM",
"stream": state_message["stream"]
})
elif state_type == "GLOBAL":
# Update the global state.
found = False
for existing_state in existing_airbyte_state_v2:
if existing_state["type"] == "GLOBAL":
existing_state["global"] = state_message["global"]
found = True
break
if not found:
existing_airbyte_state_v2.append({
"type": "GLOBAL",
"global": state_message["global"]
})
elif state_type == "LEGACY":
# One record per connector.
existing_airbyte_state_v2.clear()
existing_airbyte_state_v2.append(
{
"type": "LEGACY",
"legacy": state_message["legacy"]
}
)

if "data" in state_message:
unpacked_state = state_message["data"]
elif state_type == "STREAM":
Expand All @@ -742,7 +797,12 @@ def sync_all(self) -> None:
unpacked_state = state_message["global"]
elif state_type == "LEGACY":
unpacked_state = state_message["legacy"]
self.airbyte_state = unpacked_state

# Keep the legacy state behavior, but append the new state under a new key.
# Deepcopy here since existing_airbyte_state_v2 can refernce the same object.
self.airbyte_state = deepcopy(unpacked_state)
self.airbyte_state['airbyte_state'] = existing_airbyte_state_v2

with STDOUT_LOCK:
singer.write_message(singer.StateMessage(self.airbyte_state))
else:
Expand Down

0 comments on commit 4fc1775

Please sign in to comment.