Skip to content

Commit

Permalink
Merge branch 'main' into kgpayne/issue1033
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon authored Nov 8, 2022
2 parents ace3a15 + a981bf9 commit c97022d
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 160 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/constraints.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pip==22.3
pip==22.3.1
poetry==1.2.2
virtualenv==20.16.6
nox==2022.8.7
nox-poetry==1.0.1
nox-poetry==1.0.2
159 changes: 75 additions & 84 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ debugging = [
commitizen-version-bump = { git = "https://github.com/meltano/commitizen-version-bump.git", branch = "main" }
pytest = "^7.2.0"
xdoctest = "^1.1.0"
mypy = "^0.982"
mypy = "^0.990"
cookiecutter = "^2.1.1"
PyYAML = "^6.0"
pyarrow = {version = "^10.0.0", optional = true, python = "<3.11"}
Expand Down
5 changes: 4 additions & 1 deletion singer_sdk/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Defines a common set of exceptions which developers can raise and/or catch."""

from __future__ import annotations

import requests


Expand Down Expand Up @@ -29,7 +32,7 @@ class RecordsWithoutSchemaException(Exception):
class RetriableAPIError(Exception):
"""Exception raised when a failed request can be safely retried."""

def __init__(self, message: str, response: requests.Response = None) -> None:
def __init__(self, message: str, response: requests.Response | None = None) -> None:
"""Extends the default with the failed response as an attribute.
Args:
Expand Down
18 changes: 10 additions & 8 deletions singer_sdk/helpers/_flattening.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Internal helper library for record flatteting functions."""

from __future__ import annotations

import collections
import itertools
import json
import re
from copy import deepcopy
from typing import Any, List, Mapping, MutableMapping, NamedTuple, Optional, Tuple
from typing import Any, Mapping, MutableMapping, NamedTuple

import inflection

Expand All @@ -22,7 +24,7 @@ class FlatteningOptions(NamedTuple):

def get_flattening_options(
plugin_config: Mapping,
) -> Optional[FlatteningOptions]:
) -> FlatteningOptions | None:
"""Get flattening options, if flattening is enabled.
Args:
Expand All @@ -37,7 +39,7 @@ def get_flattening_options(
return None


def flatten_key(key_name: str, parent_keys: List[str], separator: str = "__") -> str:
def flatten_key(key_name: str, parent_keys: list[str], separator: str = "__") -> str:
"""Concatenate `key_name` with its `parent_keys` using `separator`.
Args:
Expand Down Expand Up @@ -206,7 +208,7 @@ def flatten_schema(

def _flatten_schema(
schema_node: dict,
parent_keys: List[str] = None,
parent_keys: list[str] | None = None,
separator: str = "__",
level: int = 0,
max_level: int = 0,
Expand All @@ -226,7 +228,7 @@ def _flatten_schema(
if parent_keys is None:
parent_keys = []

items: List[Tuple[str, dict]] = []
items: list[tuple[str, dict]] = []
if "properties" not in schema_node:
return {}

Expand Down Expand Up @@ -297,8 +299,8 @@ def flatten_record(

def _flatten_record(
record_node: MutableMapping[Any, Any],
flattened_schema: dict = None,
parent_key: List[str] = None,
flattened_schema: dict | None = None,
parent_key: list[str] | None = None,
separator: str = "__",
level: int = 0,
max_level: int = 0,
Expand All @@ -322,7 +324,7 @@ def _flatten_record(
if parent_key is None:
parent_key = []

items: List[Tuple[str, Any]] = []
items: list[tuple[str, Any]] = []
for k, v in record_node.items():
new_key = flatten_key(k, parent_key, separator)
if isinstance(v, collections.abc.MutableMapping) and level < max_level:
Expand Down
12 changes: 7 additions & 5 deletions singer_sdk/mapper_base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Abstract base class for stream mapper plugins."""

from __future__ import annotations

import abc
from io import FileIO
from typing import Callable, Iterable, List, Tuple
from typing import Callable, Iterable

import click

Expand All @@ -23,7 +25,7 @@ def _env_prefix(cls) -> str:
return f"{cls.name.upper().replace('-', '_')}_"

@classproperty
def capabilities(self) -> List[CapabilitiesEnum]:
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get capabilities.
Returns:
Expand Down Expand Up @@ -126,9 +128,9 @@ def cli(cls) -> Callable:
def cli(
version: bool = False,
about: bool = False,
config: Tuple[str, ...] = (),
format: str = None,
file_input: FileIO = None,
config: tuple[str, ...] = (),
format: str | None = None,
file_input: FileIO | None = None,
) -> None:
"""Handle command line execution.
Expand Down
60 changes: 31 additions & 29 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Tap abstract class."""

from __future__ import annotations

import abc
import json
from enum import Enum
from pathlib import Path, PurePath
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, cast
from typing import Any, Callable, cast

import click

Expand Down Expand Up @@ -47,9 +49,9 @@ class Tap(PluginBase, metaclass=abc.ABCMeta):

def __init__(
self,
config: Optional[Union[dict, PurePath, str, List[Union[PurePath, str]]]] = None,
catalog: Union[PurePath, str, dict, Catalog, None] = None,
state: Union[PurePath, str, dict, None] = None,
config: dict | PurePath | str | list[PurePath | str] | None = None,
catalog: PurePath | str | dict | Catalog | None = None,
state: PurePath | str | dict | None = None,
parse_env_config: bool = False,
validate_config: bool = True,
) -> None:
Expand All @@ -72,10 +74,10 @@ def __init__(
)

# Declare private members
self._streams: Optional[Dict[str, Stream]] = None
self._input_catalog: Optional[Catalog] = None
self._state: Dict[str, Stream] = {}
self._catalog: Optional[Catalog] = None # Tap's working catalog
self._streams: dict[str, Stream] | None = None
self._input_catalog: Catalog | None = None
self._state: dict[str, Stream] = {}
self._catalog: Catalog | None = None # Tap's working catalog

# Process input catalog
if isinstance(catalog, Catalog):
Expand Down Expand Up @@ -105,7 +107,7 @@ def __init__(
# Class properties

@property
def streams(self) -> Dict[str, Stream]:
def streams(self) -> dict[str, Stream]:
"""Get streams discovered or catalogued for this tap.
Results will be cached after first execution.
Expand Down Expand Up @@ -138,7 +140,7 @@ def state(self) -> dict:
return self._state

@property
def input_catalog(self) -> Optional[Catalog]:
def input_catalog(self) -> Catalog | None:
"""Get the catalog passed to the tap.
Returns:
Expand All @@ -159,7 +161,7 @@ def catalog(self) -> Catalog:
return self._catalog

@classproperty
def capabilities(self) -> List[CapabilitiesEnum]:
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get tap capabilities.
Returns:
Expand Down Expand Up @@ -249,7 +251,7 @@ def _singer_catalog(self) -> Catalog:
for stream in self.streams.values()
)

def discover_streams(self) -> List[Stream]:
def discover_streams(self) -> list[Stream]:
"""Initialize all available streams and return them as a list.
Return:
Expand All @@ -265,7 +267,7 @@ def discover_streams(self) -> List[Stream]:
)

@final
def load_streams(self) -> List[Stream]:
def load_streams(self) -> list[Stream]:
"""Load streams from discovery and initialize DAG.
Return the output of `self.discover_streams()` to enumerate
Expand All @@ -277,7 +279,7 @@ def load_streams(self) -> List[Stream]:
# Build the parent-child dependency DAG

# Index streams by type
streams_by_type: Dict[Type[Stream], List[Stream]] = {}
streams_by_type: dict[type[Stream], list[Stream]] = {}
for stream in self.discover_streams():
stream_type = type(stream)
if stream_type not in streams_by_type:
Expand All @@ -304,7 +306,7 @@ def load_streams(self) -> List[Stream]:

# Bookmarks and state management

def load_state(self, state: Dict[str, Any]) -> None:
def load_state(self, state: dict[str, Any]) -> None:
"""Merge or initialize stream state with the provided state dictionary input.
Override this method to perform validation and backwards-compatibility patches
Expand Down Expand Up @@ -362,7 +364,7 @@ def sync_all(self) -> None:
"""Sync all streams."""
self._reset_state_progress_markers()
self._set_compatible_replication_methods()
stream: "Stream"
stream: Stream
for stream in self.streams.values():
if not stream.selected and not stream.has_selected_descendents:
self.logger.info(f"Skipping deselected stream '{stream.name}'.")
Expand Down Expand Up @@ -433,10 +435,10 @@ def cli(
about: bool = False,
discover: bool = False,
test: CliTestOptionValue = CliTestOptionValue.Disabled,
config: Tuple[str, ...] = (),
state: str = None,
catalog: str = None,
format: str = None,
config: tuple[str, ...] = (),
state: str | None = None,
catalog: str | None = None,
format: str | None = None,
) -> None:
"""Handle command line execution.
Expand Down Expand Up @@ -470,7 +472,7 @@ def cli(
validate_config = False

parse_env_config = False
config_files: List[PurePath] = []
config_files: list[PurePath] = []
for config_path in config:
if config_path == "ENV":
# Allow parse from env vars:
Expand Down Expand Up @@ -512,13 +514,13 @@ class SQLTap(Tap):
"""A specialized Tap for extracting from SQL streams."""

# Stream class used to initialize new SQL streams from their catalog declarations.
default_stream_class: Type[SQLStream]
default_stream_class: type[SQLStream]

def __init__(
self,
config: Optional[Union[dict, PurePath, str, List[Union[PurePath, str]]]] = None,
catalog: Union[PurePath, str, dict, None] = None,
state: Union[PurePath, str, dict, None] = None,
config: dict | PurePath | str | list[PurePath | str] | None = None,
catalog: PurePath | str | dict | None = None,
state: PurePath | str | dict | None = None,
parse_env_config: bool = False,
validate_config: bool = True,
) -> None:
Expand All @@ -536,7 +538,7 @@ def __init__(
variables.
validate_config: True to require validation of config settings.
"""
self._catalog_dict: Optional[dict] = None
self._catalog_dict: dict | None = None
super().__init__(
config=config,
catalog=catalog,
Expand All @@ -560,19 +562,19 @@ def catalog_dict(self) -> dict:

connector = self.default_stream_class.connector_class(dict(self.config))

result: Dict[str, List[dict]] = {"streams": []}
result: dict[str, list[dict]] = {"streams": []}
result["streams"].extend(connector.discover_catalog_entries())

self._catalog_dict = result
return self._catalog_dict

def discover_streams(self) -> List[Stream]:
def discover_streams(self) -> list[Stream]:
"""Initialize all available streams and return them as a list.
Returns:
List of discovered Stream objects.
"""
result: List[Stream] = []
result: list[Stream] = []
for catalog_entry in self.catalog_dict["streams"]:
result.append(self.default_stream_class(self, catalog_entry))

Expand Down
Loading

0 comments on commit c97022d

Please sign in to comment.