Skip to content

Commit

Permalink
Merge branch 'main' into kgpayne/record-before-stream-error
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon authored Jun 21, 2023
2 parents 205f79b + d27f8ad commit a4b8221
Show file tree
Hide file tree
Showing 22 changed files with 251 additions and 135 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:

steps:
- name: Checkout repository
uses: actions/[email protected].2
uses: actions/[email protected].3

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/constraints.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pip==23.1.2
poetry==1.5.1
pre-commit==3.3.2
pre-commit==3.3.3
nox==2023.4.22
nox-poetry==1.0.2
2 changes: 1 addition & 1 deletion .github/workflows/cookiecutter-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:

steps:
- name: Check out the repository
uses: actions/[email protected].2
uses: actions/[email protected].3

- name: Upgrade pip
env:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/dependency-review.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout the repository
uses: actions/[email protected].2
uses: actions/[email protected].3

- name: GitHub dependency vulnerability check
if: ${{ github.event_name == 'pull_request_target' }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:

steps:
- name: Checkout code
uses: actions/[email protected].2
uses: actions/[email protected].3

- name: Set up Python
uses: actions/[email protected]
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:

steps:
- name: Check out the repository
uses: actions/[email protected].2
uses: actions/[email protected].3

- name: Install Poetry
env:
Expand Down Expand Up @@ -91,7 +91,7 @@ jobs:

steps:
- name: Check out the repository
uses: actions/[email protected].2
uses: actions/[email protected].3

- name: Install Poetry
env:
Expand Down Expand Up @@ -133,7 +133,7 @@ jobs:
needs: tests
steps:
- name: Check out the repository
uses: actions/[email protected].2
uses: actions/[email protected].3

- name: Install Poetry
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/version_bump.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
pull-requests: write # to create and update PRs

steps:
- uses: actions/[email protected].2
- uses: actions/[email protected].3
with:
fetch-depth: 0

Expand Down
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ repos:
)$
- repo: https://github.com/python-jsonschema/check-jsonschema
rev: 0.23.1
rev: 0.23.2
hooks:
- id: check-dependabot
- id: check-github-workflows
- id: check-readthedocs

- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.270
rev: v0.0.272
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
3 changes: 3 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@
"source_branch": "main",
"source_directory": "docs/",
"sidebar_hide_name": True,
"announcement": '<a href="https://meltano.com/cloud/?utm_campaign=top_banner_sdk">Sign up for Public Beta today</a>! Get a 20% discount on purchases before 27th of July!', # noqa: E501
# branding
"light_css_variables": {
"font-stack": "Hanken Grotesk,-apple-system,Helvetica,sans-serif",
"color-announcement-background": "#3A64FA",
"color-announcement-text": "#EEEBEE",
"color-foreground-primary": "#080216",
"color-background-primary": "#E9E5FB",
"color-link": "#3A64FA",
Expand Down
114 changes: 57 additions & 57 deletions poetry.lock

Large diffs are not rendered by default.

20 changes: 17 additions & 3 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,34 @@
"encoding",
description="Specifies the format and compression of the batch files.",
wrapped=ObjectType(
Property("format", StringType, allowed_values=["jsonl"]),
Property(
"format",
StringType,
allowed_values=["jsonl"],
description="Format to use for batch files.",
),
Property(
"compression",
StringType,
allowed_values=["gzip", "none"],
description="Compression format to use for batch files.",
),
),
),
Property(
"storage",
description="Defines the storage layer to use when writing batch files",
wrapped=ObjectType(
Property("root", StringType),
Property("prefix", StringType),
Property(
"root",
StringType,
description="Root path to use when writing batch files.",
),
Property(
"prefix",
StringType,
description="Prefix to use when writing batch files.",
),
),
),
),
Expand Down
12 changes: 7 additions & 5 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import abc
import copy
import datetime
import json
import time
Expand Down Expand Up @@ -67,6 +68,7 @@ def __init__(
"Initializing target sink for stream '%s'...",
stream_name,
)
self.original_schema = copy.deepcopy(schema)
self.schema = schema
if self.include_sdc_metadata_properties:
self._add_sdc_metadata_to_schema()
Expand Down Expand Up @@ -254,17 +256,17 @@ def _add_sdc_metadata_to_schema(self) -> None:
https://sdk.meltano.com/en/latest/implementation/record_metadata.html
"""
properties_dict = self.schema["properties"]
for col in {
for col in (
"_sdc_extracted_at",
"_sdc_received_at",
"_sdc_batched_at",
"_sdc_deleted_at",
}:
):
properties_dict[col] = {
"type": ["null", "string"],
"format": "date-time",
}
for col in {"_sdc_sequence", "_sdc_table_version"}:
for col in ("_sdc_sequence", "_sdc_table_version"):
properties_dict[col] = {"type": ["null", "integer"]}

def _remove_sdc_metadata_from_schema(self) -> None:
Expand All @@ -274,14 +276,14 @@ def _remove_sdc_metadata_from_schema(self) -> None:
https://sdk.meltano.com/en/latest/implementation/record_metadata.html
"""
properties_dict = self.schema["properties"]
for col in {
for col in (
"_sdc_extracted_at",
"_sdc_received_at",
"_sdc_batched_at",
"_sdc_deleted_at",
"_sdc_sequence",
"_sdc_table_version",
}:
):
properties_dict.pop(col, None)

def _remove_sdc_metadata_from_record(self, record: dict) -> None:
Expand Down
11 changes: 10 additions & 1 deletion singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ def get_starting_timestamp(self, context: dict | None) -> datetime.datetime | No

return t.cast(datetime.datetime, pendulum.parse(value))

@final
@property
def selected(self) -> bool:
"""Check if stream is selected.
Expand All @@ -279,6 +278,16 @@ def selected(self) -> bool:
"""
return self.mask.get((), True)

@selected.setter
def selected(self, value: bool | None) -> None:
"""Set stream selection.
Args:
value: True if the stream is selected.
"""
self.metadata.root.selected = value
self._mask = self.metadata.resolve_selection()

@final
@property
def has_selected_descendents(self) -> bool:
Expand Down
4 changes: 4 additions & 0 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ def run_sync_dry_run(
# Initialize streams' record limits before beginning the sync test.
stream.ABORT_AT_RECORD_COUNT = dry_run_record_limit

# Force selection of streams.
stream.selected = True

for stream in streams:
if stream.parent_stream_type:
self.logger.debug(
Expand All @@ -267,6 +270,7 @@ def run_sync_dry_run(
def write_schemas(self) -> None:
"""Write a SCHEMA message for all known streams to STDOUT."""
for stream in self.streams.values():
stream.selected = True
stream._write_schema_message()

# Stream detection:
Expand Down
2 changes: 1 addition & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def get_sink(
return self.add_sink(stream_name, schema, key_properties)

if (
existing_sink.schema != schema
existing_sink.original_schema != schema
or existing_sink.key_properties != key_properties
):
self.logger.info(
Expand Down
4 changes: 3 additions & 1 deletion singer_sdk/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,9 @@ def _jsonschema_type_check(jsonschema_type: dict, type_check: tuple[str]) -> boo
if jsonschema_type.get("type") in type_check: # noqa: PLR5501
return True

if any(t in type_check for t in jsonschema_type.get("anyOf", ())):
if any(
_jsonschema_type_check(t, type_check) for t in jsonschema_type.get("anyOf", ())
):
return True

return False
Expand Down
52 changes: 52 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

import pytest

from singer_sdk import typing as th
from singer_sdk.sinks import BatchSink
from singer_sdk.target_base import Target

if t.TYPE_CHECKING:
from _pytest.config import Config

Expand Down Expand Up @@ -54,3 +58,51 @@ def outdir() -> t.Generator[str, None, None]:
def snapshot_dir() -> pathlib.Path:
"""Return the path to the snapshot directory."""
return pathlib.Path("tests/snapshots/")


class BatchSinkMock(BatchSink):
"""A mock Sink class."""

name = "batch-sink-mock"

def __init__(
self,
target: TargetMock,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
):
"""Create the Mock batch-based sink."""
super().__init__(target, stream_name, schema, key_properties)
self.target = target

def process_record(self, record: dict, context: dict) -> None:
"""Tracks the count of processed records."""
self.target.num_records_processed += 1
super().process_record(record, context)

def process_batch(self, context: dict) -> None:
"""Write to mock trackers."""
self.target.records_written.extend(context["records"])
self.target.num_batches_processed += 1


class TargetMock(Target):
"""A mock Target class."""

name = "target-mock"
config_jsonschema = th.PropertiesList().to_dict()
default_sink_class = BatchSinkMock

def __init__(self, *args, **kwargs):
"""Create the Mock target sync."""
super().__init__(*args, **kwargs)
self.state_messages_written: list[dict] = []
self.records_written: list[dict] = []
self.num_records_processed: int = 0
self.num_batches_processed: int = 0

def _write_state_message(self, state: dict):
"""Emit the stream's latest state."""
super()._write_state_message(state)
self.state_messages_written.append(state)
30 changes: 30 additions & 0 deletions tests/core/test_target_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations

import copy

from tests.conftest import BatchSinkMock, TargetMock


def test_get_sink():
input_schema_1 = {
"properties": {
"id": {
"type": ["string", "null"],
},
"col_ts": {
"format": "date-time",
"type": ["string", "null"],
},
},
}
input_schema_2 = copy.deepcopy(input_schema_1)
key_properties = []
target = TargetMock(config={"add_record_metadata": True})
sink = BatchSinkMock(target, "foo", input_schema_1, key_properties)
target._sinks_active["foo"] = sink
sink_returned = target.get_sink(
"foo",
schema=input_schema_2,
key_properties=key_properties,
)
assert sink_returned == sink
Loading

0 comments on commit a4b8221

Please sign in to comment.