Skip to content

Commit

Permalink
feat: Add _sdc_sync_started_at metadata column to indicate the star…
Browse files Browse the repository at this point in the history
…t of the target process (#1878)
  • Loading branch information
edgarrmondragon authored Jul 27, 2023
1 parent b0a8621 commit b1b3bd2
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 6 deletions.
13 changes: 13 additions & 0 deletions singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import sys
import time
import typing as t
from pathlib import Path, PurePath
from types import MappingProxyType
Expand Down Expand Up @@ -155,6 +156,9 @@ def __init__(
metrics._setup_logging(self.config)
self.metrics_logger = metrics.get_metrics_logger()

# Initialization timestamp
self.__initialized_at = int(time.time() * 1000)

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
self._mapper = PluginMapper(
Expand Down Expand Up @@ -185,6 +189,15 @@ def mapper(self, mapper: PluginMapper) -> None:
"""
self._mapper = mapper

@property
def initialized_at(self) -> int:
"""Start time of the plugin.
Returns:
The start time of the plugin.
"""
return self.__initialized_at

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get capabilities.
Expand Down
12 changes: 8 additions & 4 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
if t.TYPE_CHECKING:
from logging import Logger

from singer_sdk.plugin_base import PluginBase
from singer_sdk.target_base import Target

JSONSchemaValidator = Draft7Validator

Expand All @@ -48,7 +48,7 @@ class Sink(metaclass=abc.ABCMeta):

def __init__(
self,
target: PluginBase,
target: Target,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
Expand All @@ -62,6 +62,7 @@ def __init__(
key_properties: Primary key of the stream to sink.
"""
self.logger = target.logger
self.sync_started_at = target.initialized_at
self._config = dict(target.config)
self._pending_batch: dict | None = None
self.stream_name = stream_name
Expand Down Expand Up @@ -238,7 +239,7 @@ def _add_sdc_metadata_to_record(
Args:
record: Individual record in the stream.
message: TODO
message: The record message.
context: Stream partition or context dictionary.
"""
record["_sdc_extracted_at"] = message.get("time_extracted")
Expand All @@ -252,6 +253,7 @@ def _add_sdc_metadata_to_record(
record["_sdc_deleted_at"] = record.get("_sdc_deleted_at")
record["_sdc_sequence"] = int(round(time.time() * 1000))
record["_sdc_table_version"] = message.get("version")
record["_sdc_sync_started_at"] = self.sync_started_at

def _add_sdc_metadata_to_schema(self) -> None:
"""Add _sdc metadata columns.
Expand All @@ -270,7 +272,7 @@ def _add_sdc_metadata_to_schema(self) -> None:
"type": ["null", "string"],
"format": "date-time",
}
for col in ("_sdc_sequence", "_sdc_table_version"):
for col in ("_sdc_sequence", "_sdc_table_version", "_sdc_sync_started_at"):
properties_dict[col] = {"type": ["null", "integer"]}

def _remove_sdc_metadata_from_schema(self) -> None:
Expand All @@ -287,6 +289,7 @@ def _remove_sdc_metadata_from_schema(self) -> None:
"_sdc_deleted_at",
"_sdc_sequence",
"_sdc_table_version",
"_sdc_sync_started_at",
):
properties_dict.pop(col, None)

Expand All @@ -305,6 +308,7 @@ def _remove_sdc_metadata_from_record(self, record: dict) -> None:
record.pop("_sdc_deleted_at", None)
record.pop("_sdc_sequence", None)
record.pop("_sdc_table_version", None)
record.pop("_sdc_sync_started_at", None)

# Record validation

Expand Down
4 changes: 2 additions & 2 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
if t.TYPE_CHECKING:
from sqlalchemy.sql import Executable

from singer_sdk.plugin_base import PluginBase
from singer_sdk.target_base import Target


class SQLSink(BatchSink):
Expand All @@ -32,7 +32,7 @@ class SQLSink(BatchSink):

def __init__(
self,
target: PluginBase,
target: Target,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
Expand Down
Empty file added tests/core/sinks/__init__.py
Empty file.
55 changes: 55 additions & 0 deletions tests/core/sinks/test_sdc_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from __future__ import annotations

from freezegun import freeze_time

from tests.conftest import BatchSinkMock, TargetMock


def test_sdc_metadata():
with freeze_time("2023-01-01T00:00:00+00:00"):
target = TargetMock()

sink = BatchSinkMock(
target,
"users",
{"type": "object", "properties": {"id": {"type": "integer"}}},
["id"],
)

record_message = {
"type": "RECORD",
"stream": "users",
"record": {"id": 1},
"time_extracted": "2021-01-01T00:00:00+00:00",
"version": 100,
}
record = record_message["record"]

with freeze_time("2023-01-01T00:05:00+00:00"):
sink._add_sdc_metadata_to_record(record, record_message, {})

assert record == {
"id": 1,
"_sdc_extracted_at": "2021-01-01T00:00:00+00:00",
"_sdc_received_at": "2023-01-01T00:05:00+00:00",
"_sdc_batched_at": "2023-01-01T00:05:00+00:00",
"_sdc_deleted_at": None,
"_sdc_sequence": 1672531500000,
"_sdc_table_version": 100,
"_sdc_sync_started_at": 1672531200000,
}

sink._add_sdc_metadata_to_schema()
assert sink.schema == {
"type": "object",
"properties": {
"id": {"type": "integer"},
"_sdc_extracted_at": {"type": ["null", "string"], "format": "date-time"},
"_sdc_received_at": {"type": ["null", "string"], "format": "date-time"},
"_sdc_batched_at": {"type": ["null", "string"], "format": "date-time"},
"_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"},
"_sdc_sequence": {"type": ["null", "integer"]},
"_sdc_table_version": {"type": ["null", "integer"]},
"_sdc_sync_started_at": {"type": ["null", "integer"]},
},
}

0 comments on commit b1b3bd2

Please sign in to comment.