
Commit

Merge branch 'main' into 2257-feat-handle-database-disconnects-in-sql-taps-and-targets
edgarrmondragon authored Feb 23, 2024
2 parents 84f5463 + 3e2c3d3 commit 6d6da57
Showing 10 changed files with 122 additions and 23 deletions.
@@ -22,7 +22,7 @@
from singer_sdk.streams import {{ cookiecutter.stream_type }}Stream

{% elif cookiecutter.auth_method == "Basic Auth" -%}
from singer_sdk.authenticators import BasicAuthenticator
from requests.auth import HTTPBasicAuth
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002
from singer_sdk.streams import {{ cookiecutter.stream_type }}Stream
@@ -110,14 +110,13 @@ def authenticator(self) -> BearerTokenAuthenticator:
{%- elif cookiecutter.auth_method == "Basic Auth" %}

@property
def authenticator(self) -> BasicAuthenticator:
def authenticator(self) -> HTTPBasicAuth:
"""Return a new authenticator object.
Returns:
An authenticator instance.
"""
return BasicAuthenticator.create_for_stream(
self,
return HTTPBasicAuth(
username=self.config.get("username", ""),
password=self.config.get("password", ""),
)
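
In the tap cookiecutter template, the Basic Auth branch now hands requests a plain HTTPBasicAuth object instead of the SDK's header-building BasicAuthenticator; requests encodes the credentials and attaches the header to every request the stream sends. A minimal sketch of the equivalence, using hypothetical credentials and an illustrative URL rather than anything from this commit:

from requests import Request
from requests.auth import HTTPBasicAuth

# Hypothetical credentials; in a rendered tap they come from self.config.
auth = HTTPBasicAuth("username", "password")

# requests applies the auth object when the request is prepared, producing the
# same `Authorization: Basic ...` header BasicAuthenticator used to build by hand.
prepared = Request("GET", "https://api.example.com/items", auth=auth).prepare()
print(prepared.headers["Authorization"])  # Basic dXNlcm5hbWU6cGFzc3dvcmQ=
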
8 changes: 4 additions & 4 deletions pyproject.toml
@@ -249,6 +249,10 @@ jsonl = "singer_sdk.contrib.batch_encoder_jsonl:JSONLinesBatcher"
parquet = "singer_sdk.contrib.batch_encoder_parquet:ParquetBatcher"

[tool.ruff]
extend-exclude = [
"cookiecutter/*",
"*simpleeval*",
]
line-length = 88
src = ["samples", "singer_sdk", "tests"]
target-version = "py38"
@@ -257,10 +261,6 @@ target-version = "py38"
docstring-code-format = true

[tool.ruff.lint]
exclude = [
"cookiecutter/*",
"*simpleeval*",
]
ignore = [
"ANN101", # Missing type annotation for `self` in method
"ANN102", # Missing type annotation for `cls` in class method
13 changes: 12 additions & 1 deletion singer_sdk/authenticators.py
@@ -5,6 +5,7 @@
import base64
import math
import typing as t
import warnings
from datetime import timedelta
from types import MappingProxyType
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
@@ -297,7 +298,10 @@ def create_for_stream(
class BasicAuthenticator(APIAuthenticatorBase):
"""Implements basic authentication for REST Streams.
This Authenticator implements basic authentication by concatinating a
.. deprecated:: 0.36.0
Use :class:`requests.auth.HTTPBasicAuth` instead.
This Authenticator implements basic authentication by concatenating a
username and password then base64 encoding the string. The resulting
token will be merged with any HTTP headers specified on the stream.
"""
@@ -316,6 +320,13 @@ def __init__(
password: API password.
"""
super().__init__(stream=stream)
warnings.warn(
"BasicAuthenticator is deprecated. Use "
"requests.auth.HTTPBasicAuth instead.",
DeprecationWarning,
stacklevel=2,
)

credentials = f"{username}:{password}".encode()
auth_token = base64.b64encode(credentials).decode("ascii")
auth_credentials = {"Authorization": f"Basic {auth_token}"}
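
For existing taps the swap is mechanical: wherever a stream's authenticator property returned BasicAuthenticator.create_for_stream(...), it can return an HTTPBasicAuth built from the same settings, and the new DeprecationWarning goes away. A hedged sketch of the replacement (the helper name and config keys are illustrative):

from __future__ import annotations

from requests.auth import HTTPBasicAuth


def basic_auth_from_config(config: dict) -> HTTPBasicAuth:
    # Drop-in replacement for:
    #   BasicAuthenticator.create_for_stream(self, username=..., password=...)
    # which now emits a DeprecationWarning when constructed.
    return HTTPBasicAuth(
        username=config.get("username", ""),
        password=config.get("password", ""),
    )


auth = basic_auth_from_config({"username": "username", "password": "password"})
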
2 changes: 1 addition & 1 deletion singer_sdk/helpers/_flattening.py
@@ -33,7 +33,7 @@ def get_flattening_options(
Returns:
A new FlatteningOptions object or None if flattening is disabled.
"""
if plugin_config.get("flattening_enabled"):
if plugin_config.get("flattening_enabled", False):
return FlatteningOptions(max_level=int(plugin_config["flattening_max_depth"]))

return None
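
The behavior is unchanged, since dict.get already returns a falsy None for a missing key; the explicit False default only makes the intent visible. A small sketch of the resulting behavior with hypothetical plugin configs (this calls an internal SDK helper, so treat it as illustrative):

from singer_sdk.helpers._flattening import get_flattening_options

# No flattening keys at all: flattening stays disabled and None is returned.
assert get_flattening_options({}) is None

# Flattening enabled: a max depth is expected alongside the flag.
options = get_flattening_options(
    {"flattening_enabled": True, "flattening_max_depth": 2},
)
assert options is not None
assert options.max_level == 2
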
7 changes: 7 additions & 0 deletions singer_sdk/helpers/capabilities.py
@@ -152,6 +152,13 @@
default=True,
),
).to_dict()
TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList(
Property(
"batch_size_rows",
IntegerType,
description="Maximum number of rows in each batch.",
),
).to_dict()


class TargetLoadMethods(str, Enum):
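
PropertiesList(...).to_dict() turns the declaration into the JSON-schema fragment that gets merged into every target's config schema, which is how batch_size_rows ends up in --about output. A sketch rebuilding the same fragment; the commented output shape is an approximation, not copied from the SDK:

from singer_sdk.typing import IntegerType, PropertiesList, Property

# `.to_dict()` yields, approximately:
# {
#     "type": "object",
#     "properties": {
#         "batch_size_rows": {
#             "type": ["integer", "null"],
#             "description": "Maximum number of rows in each batch.",
#         },
#     },
# }
fragment = PropertiesList(
    Property(
        "batch_size_rows",
        IntegerType,
        description="Maximum number of rows in each batch.",
    ),
).to_dict()

assert "batch_size_rows" in fragment["properties"]
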
43 changes: 33 additions & 10 deletions singer_sdk/sinks/core.py
@@ -182,6 +182,11 @@ def __init__(
self._batch_records_read: int = 0
self._batch_dupe_records_merged: int = 0

# Batch full markers
self._batch_size_rows: int | None = target.config.get(
"batch_size_rows",
)

self._validator: BaseJSONSchemaValidator | None = self.get_validator()

@cached_property
@@ -249,15 +254,6 @@ def _get_context(self, record: dict) -> dict:  # noqa: ARG002

# Size properties

@property
def max_size(self) -> int:
"""Get max batch size.
Returns:
Max number of records to batch before `is_full=True`
"""
return self.MAX_SIZE_DEFAULT

@property
def current_size(self) -> int:
"""Get current batch size.
@@ -269,13 +265,40 @@ def current_size(self) -> int:

@property
def is_full(self) -> bool:
"""Check against size limit.
"""Check against the batch size limit.
Returns:
True if the sink needs to be drained.
"""
return self.current_size >= self.max_size

@property
def batch_size_rows(self) -> int | None:
"""The maximum number of rows a batch can accumulate before being processed.
Returns:
The max number of rows or None if not set.
"""
return self._batch_size_rows

@property
def max_size(self) -> int:
"""Get max batch size.
Returns:
Max number of records to batch before `is_full=True`
.. versionchanged:: 0.36.0
This property now takes into account the
:attr:`~singer_sdk.Sink.batch_size_rows` attribute and the corresponding
``batch_size_rows`` target setting.
"""
return (
self.batch_size_rows
if self.batch_size_rows is not None
else self.MAX_SIZE_DEFAULT
)

# Tally methods

@t.final
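
Net effect: max_size now resolves to the new batch_size_rows setting when it is present and falls back to MAX_SIZE_DEFAULT otherwise, and is_full keeps comparing current_size against that number. A standalone sketch of the precedence (MAX_SIZE_DEFAULT is 10000 on the SDK's Sink class, per the tests further down):

from __future__ import annotations

MAX_SIZE_DEFAULT = 10000  # the Sink class default


def effective_max_size(batch_size_rows: int | None) -> int:
    # Mirrors the new `max_size` property: the configured value wins when set.
    return batch_size_rows if batch_size_rows is not None else MAX_SIZE_DEFAULT


assert effective_max_size(None) == 10000       # no `batch_size_rows` configured
assert effective_max_size(100_000) == 100_000  # config: {"batch_size_rows": 100000}
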
5 changes: 4 additions & 1 deletion singer_sdk/target_base.py
@@ -18,6 +18,7 @@
from singer_sdk.helpers.capabilities import (
ADD_RECORD_METADATA_CONFIG,
BATCH_CONFIG,
TARGET_BATCH_SIZE_ROWS_CONFIG,
TARGET_HARD_DELETE_CONFIG,
TARGET_LOAD_METHOD_CONFIG,
TARGET_SCHEMA_CONFIG,
@@ -363,8 +364,9 @@ def _process_record_message(self, message_dict: dict) -> None:

if sink.is_full:
self.logger.info(
"Target sink for '%s' is full. Draining...",
"Target sink for '%s' is full. Current size is '%s'. Draining...",
sink.stream_name,
sink.current_size,
)
self.drain_one(sink)

Expand Down Expand Up @@ -610,6 +612,7 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:

_merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema)
_merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema)
_merge_missing(TARGET_BATCH_SIZE_ROWS_CONFIG, config_jsonschema)

capabilities = cls.capabilities

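
The built-in fragment is merged into a target's declared config schema only for properties the target does not define itself, so a target can still override the setting's description or type; the expanded drain log line then reports the size that triggered the flush. A hedged sketch of that merge-missing behavior (the function body is illustrative, not copied from the SDK):

from __future__ import annotations


def merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
    # Copy each property from the built-in fragment unless the target's own
    # config schema already declares a property with that name.
    for name, prop in source_jsonschema.get("properties", {}).items():
        target_jsonschema.setdefault("properties", {}).setdefault(name, prop)


config_jsonschema = {"properties": {"load_method": {"type": "string"}}}
merge_missing(
    {"properties": {"batch_size_rows": {"type": ["integer", "null"]}}},
    config_jsonschema,
)
assert "batch_size_rows" in config_jsonschema["properties"]
assert config_jsonschema["properties"]["load_method"] == {"type": "string"}
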
16 changes: 15 additions & 1 deletion tests/core/rest/test_authenticators.py
@@ -21,7 +21,11 @@
)
from requests.auth import HTTPProxyAuth, _basic_auth_str

from singer_sdk.authenticators import OAuthAuthenticator, OAuthJWTAuthenticator
from singer_sdk.authenticators import (
BasicAuthenticator,
OAuthAuthenticator,
OAuthJWTAuthenticator,
)

if t.TYPE_CHECKING:
import requests_mock
@@ -218,3 +222,13 @@ def test_requests_library_auth(rest_tap: Tap):

assert isinstance(stream.authenticator, HTTPProxyAuth)
assert r.headers["Proxy-Authorization"] == _basic_auth_str("username", "password")


def test_basic_auth_deprecation_warning(rest_tap: Tap):
"""Validate that a warning is emitted when using BasicAuthenticator."""
stream: RESTStream = rest_tap.streams["some_stream"]
with pytest.deprecated_call(match="BasicAuthenticator is deprecated") as recorder:
BasicAuthenticator(stream=stream, username="username", password="password") # noqa: S106

assert len(recorder.list) == 1
assert recorder.list[0].filename.endswith("test_authenticators.py")
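
Downstream test suites can lean on the same warning to catch leftover usages: either assert it with pytest.deprecated_call as above, or escalate DeprecationWarning to an error while migrating. A hypothetical downstream test sketch:

import pytest
from requests.auth import HTTPBasicAuth


@pytest.mark.filterwarnings("error::DeprecationWarning")
def test_stream_auth_is_not_deprecated():
    # Fails loudly if the covered code path still constructs BasicAuthenticator;
    # the replacement object emits no warning.
    auth = HTTPBasicAuth("username", "password")
    assert auth.username == "username"
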
6 changes: 5 additions & 1 deletion tests/core/targets/test_target_sql.py
@@ -46,5 +46,9 @@ class MyTarget(SQLTargetMock, capabilities=capabilities):
pass

about = MyTarget._get_about_info()
default_settings = {"add_record_metadata", "load_method"}
default_settings = {
"add_record_metadata",
"load_method",
"batch_size_rows",
}
assert set(about.settings["properties"]) == expected_settings | default_settings
38 changes: 38 additions & 0 deletions tests/core/test_target_base.py
@@ -77,6 +77,7 @@ def test_target_about_info():
assert "flattening_max_depth" in about.settings["properties"]
assert "batch_config" in about.settings["properties"]
assert "add_record_metadata" in about.settings["properties"]
assert "batch_size_rows" in about.settings["properties"]


def test_sql_get_sink():
@@ -142,3 +143,40 @@ def test_add_sqlsink_and_get_sink():
target.get_sink(
"bar",
)


def test_batch_size_rows_and_max_size():
input_schema_1 = {
"properties": {
"id": {
"type": ["string", "null"],
},
"col_ts": {
"format": "date-time",
"type": ["string", "null"],
},
},
}
key_properties = []
target_default = TargetMock()
sink_default = BatchSinkMock(
target=target_default,
stream_name="foo",
schema=input_schema_1,
key_properties=key_properties,
)
target_set = TargetMock(config={"batch_size_rows": 100000})
sink_set = BatchSinkMock(
target=target_set,
stream_name="bar",
schema=input_schema_1,
key_properties=key_properties,
)
assert sink_default.stream_name == "foo"
assert sink_default._batch_size_rows is None
assert sink_default.batch_size_rows is None
assert sink_default.max_size == 10000
assert sink_set.stream_name == "bar"
assert sink_set._batch_size_rows == 100000
assert sink_set.batch_size_rows == 100000
assert sink_set.max_size == 100000
