diff --git a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/rest-client.py b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/rest-client.py
index 26c1447c9..16ad9d23f 100644
--- a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/rest-client.py
+++ b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/rest-client.py
@@ -22,7 +22,7 @@
 from singer_sdk.streams import {{ cookiecutter.stream_type }}Stream
 
 {% elif cookiecutter.auth_method == "Basic Auth" -%}
-from singer_sdk.authenticators import BasicAuthenticator
+from requests.auth import HTTPBasicAuth
 from singer_sdk.helpers.jsonpath import extract_jsonpath
 from singer_sdk.pagination import BaseAPIPaginator  # noqa: TCH002
 from singer_sdk.streams import {{ cookiecutter.stream_type }}Stream
@@ -110,14 +110,13 @@ def authenticator(self) -> BearerTokenAuthenticator:
 
 {%- elif cookiecutter.auth_method == "Basic Auth" %}
     @property
-    def authenticator(self) -> BasicAuthenticator:
+    def authenticator(self) -> HTTPBasicAuth:
         """Return a new authenticator object.
 
         Returns:
             An authenticator instance.
         """
-        return BasicAuthenticator.create_for_stream(
-            self,
+        return HTTPBasicAuth(
             username=self.config.get("username", ""),
             password=self.config.get("password", ""),
         )
diff --git a/pyproject.toml b/pyproject.toml
index d22e6c693..5de8456b5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -249,6 +249,10 @@ jsonl = "singer_sdk.contrib.batch_encoder_jsonl:JSONLinesBatcher"
 parquet = "singer_sdk.contrib.batch_encoder_parquet:ParquetBatcher"
 
 [tool.ruff]
+extend-exclude = [
+    "cookiecutter/*",
+    "*simpleeval*",
+]
 line-length = 88
 src = ["samples", "singer_sdk", "tests"]
 target-version = "py38"
@@ -257,10 +261,6 @@ target-version = "py38"
 docstring-code-format = true
 
 [tool.ruff.lint]
-exclude = [
-    "cookiecutter/*",
-    "*simpleeval*",
-]
 ignore = [
     "ANN101",  # Missing type annotation for `self` in method
     "ANN102",  # Missing type annotation for `cls` in class method
diff --git a/singer_sdk/authenticators.py b/singer_sdk/authenticators.py
index 82e0556bc..faf34e85a 100644
--- a/singer_sdk/authenticators.py
+++ b/singer_sdk/authenticators.py
@@ -5,6 +5,7 @@
 import base64
 import math
 import typing as t
+import warnings
 from datetime import timedelta
 from types import MappingProxyType
 from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
@@ -297,7 +298,10 @@ def create_for_stream(
 class BasicAuthenticator(APIAuthenticatorBase):
     """Implements basic authentication for REST Streams.
 
-    This Authenticator implements basic authentication by concatinating a
+    .. deprecated:: 0.36.0
+       Use :class:`requests.auth.HTTPBasicAuth` instead.
+
+    This Authenticator implements basic authentication by concatenating a
     username and password then base64 encoding the string. The resulting
     token will be merged with any HTTP headers specified on the stream.
     """
@@ -316,6 +320,13 @@ def __init__(
         password: API password.
         """
         super().__init__(stream=stream)
+        warnings.warn(
+            "BasicAuthenticator is deprecated. Use "
+            "requests.auth.HTTPBasicAuth instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
         credentials = f"{username}:{password}".encode()
         auth_token = base64.b64encode(credentials).decode("ascii")
         auth_credentials = {"Authorization": f"Basic {auth_token}"}
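For downstream taps that define an `authenticator` property by hand, the migration mirrors the cookiecutter change above. A minimal sketch, assuming a `RESTStream` subclass with `username` and `password` settings (the class and its settings are hypothetical, and `name`, `path`, and the schema are omitted for brevity):

```python
from requests.auth import HTTPBasicAuth

from singer_sdk.streams import RESTStream


class MyStream(RESTStream):
    """Hypothetical stream migrated off the deprecated BasicAuthenticator."""

    url_base = "https://api.example.com"

    @property
    def authenticator(self) -> HTTPBasicAuth:
        # requests builds and attaches the Authorization header itself,
        # so no manual base64 encoding or header merging is needed.
        return HTTPBasicAuth(
            username=self.config.get("username", ""),
            password=self.config.get("password", ""),
        )
```

Because `requests` treats any auth object uniformly, the same pattern works for `HTTPProxyAuth` or a custom `AuthBase` subclass.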
Use " + "requests.auth.HTTPBasicAuth instead.", + DeprecationWarning, + stacklevel=2, + ) + credentials = f"{username}:{password}".encode() auth_token = base64.b64encode(credentials).decode("ascii") auth_credentials = {"Authorization": f"Basic {auth_token}"} diff --git a/singer_sdk/helpers/_flattening.py b/singer_sdk/helpers/_flattening.py index 2a3e194d0..9dcbf1281 100644 --- a/singer_sdk/helpers/_flattening.py +++ b/singer_sdk/helpers/_flattening.py @@ -33,7 +33,7 @@ def get_flattening_options( Returns: A new FlatteningOptions object or None if flattening is disabled. """ - if plugin_config.get("flattening_enabled"): + if plugin_config.get("flattening_enabled", False): return FlatteningOptions(max_level=int(plugin_config["flattening_max_depth"])) return None diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index b48d290a4..f76400c5a 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -152,6 +152,13 @@ default=True, ), ).to_dict() +TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList( + Property( + "batch_size_rows", + IntegerType, + description="Maximum number of rows in each batch.", + ), +).to_dict() class TargetLoadMethods(str, Enum): diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index e35353789..f41d3d97f 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -182,6 +182,11 @@ def __init__( self._batch_records_read: int = 0 self._batch_dupe_records_merged: int = 0 + # Batch full markers + self._batch_size_rows: int | None = target.config.get( + "batch_size_rows", + ) + self._validator: BaseJSONSchemaValidator | None = self.get_validator() @cached_property @@ -249,15 +254,6 @@ def _get_context(self, record: dict) -> dict: # noqa: ARG002 # Size properties - @property - def max_size(self) -> int: - """Get max batch size. - - Returns: - Max number of records to batch before `is_full=True` - """ - return self.MAX_SIZE_DEFAULT - @property def current_size(self) -> int: """Get current batch size. @@ -269,13 +265,40 @@ def current_size(self) -> int: @property def is_full(self) -> bool: - """Check against size limit. + """Check against the batch size limit. Returns: True if the sink needs to be drained. """ return self.current_size >= self.max_size + @property + def batch_size_rows(self) -> int | None: + """The maximum number of rows a batch can accumulate before being processed. + + Returns: + The max number of rows or None if not set. + """ + return self._batch_size_rows + + @property + def max_size(self) -> int: + """Get max batch size. + + Returns: + Max number of records to batch before `is_full=True` + + .. versionchanged:: 0.36.0 + This property now takes into account the + :attr:`~singer_sdk.Sink.batch_size_rows` attribute and the corresponding + ``batch_size_rows`` target setting. + """ + return ( + self.batch_size_rows + if self.batch_size_rows is not None + else self.MAX_SIZE_DEFAULT + ) + # Tally methods @t.final diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 5a78bf362..3c0d37234 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -18,6 +18,7 @@ from singer_sdk.helpers.capabilities import ( ADD_RECORD_METADATA_CONFIG, BATCH_CONFIG, + TARGET_BATCH_SIZE_ROWS_CONFIG, TARGET_HARD_DELETE_CONFIG, TARGET_LOAD_METHOD_CONFIG, TARGET_SCHEMA_CONFIG, @@ -363,8 +364,9 @@ def _process_record_message(self, message_dict: dict) -> None: if sink.is_full: self.logger.info( - "Target sink for '%s' is full. 
diff --git a/tests/core/rest/test_authenticators.py b/tests/core/rest/test_authenticators.py
index 7e1da91eb..d95d69edb 100644
--- a/tests/core/rest/test_authenticators.py
+++ b/tests/core/rest/test_authenticators.py
@@ -21,7 +21,11 @@
 )
 from requests.auth import HTTPProxyAuth, _basic_auth_str
 
-from singer_sdk.authenticators import OAuthAuthenticator, OAuthJWTAuthenticator
+from singer_sdk.authenticators import (
+    BasicAuthenticator,
+    OAuthAuthenticator,
+    OAuthJWTAuthenticator,
+)
 
 if t.TYPE_CHECKING:
     import requests_mock
@@ -218,3 +222,13 @@ def test_requests_library_auth(rest_tap: Tap):
 
     assert isinstance(stream.authenticator, HTTPProxyAuth)
     assert r.headers["Proxy-Authorization"] == _basic_auth_str("username", "password")
+
+
+def test_basic_auth_deprecation_warning(rest_tap: Tap):
+    """Validate that a warning is emitted when using BasicAuthenticator."""
+    stream: RESTStream = rest_tap.streams["some_stream"]
+    with pytest.deprecated_call(match="BasicAuthenticator is deprecated") as recorder:
+        BasicAuthenticator(stream=stream, username="username", password="password")  # noqa: S106
+
+    assert len(recorder.list) == 1
+    assert recorder.list[0].filename.endswith("test_authenticators.py")
diff --git a/tests/core/targets/test_target_sql.py b/tests/core/targets/test_target_sql.py
index fd71c0aeb..e47845a70 100644
--- a/tests/core/targets/test_target_sql.py
+++ b/tests/core/targets/test_target_sql.py
@@ -46,5 +46,9 @@ class MyTarget(SQLTargetMock, capabilities=capabilities):
         pass
 
     about = MyTarget._get_about_info()
-    default_settings = {"add_record_metadata", "load_method"}
+    default_settings = {
+        "add_record_metadata",
+        "load_method",
+        "batch_size_rows",
+    }
     assert set(about.settings["properties"]) == expected_settings | default_settings
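The `test_basic_auth_deprecation_warning` test above also asserts *where* the warning is reported, which is the effect of `stacklevel=2` in the `warnings.warn` call. A self-contained sketch of that mechanic (`LegacyThing` is a stand-in, not SDK code):

```python
import warnings


class LegacyThing:
    def __init__(self) -> None:
        warnings.warn(
            "LegacyThing is deprecated. Use NewThing instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this frame
        )


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    LegacyThing()  # the recorded warning points at this line

assert caught[0].category is DeprecationWarning
assert caught[0].filename == __file__  # the caller's file, not the library's
```

That is why the test can assert `recorder.list[0].filename.endswith("test_authenticators.py")` rather than a path inside `singer_sdk`.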
diff --git a/tests/core/test_target_base.py b/tests/core/test_target_base.py
index eaff6d6a1..f9bcdb871 100644
--- a/tests/core/test_target_base.py
+++ b/tests/core/test_target_base.py
@@ -77,6 +77,7 @@
     assert "flattening_max_depth" in about.settings["properties"]
     assert "batch_config" in about.settings["properties"]
     assert "add_record_metadata" in about.settings["properties"]
+    assert "batch_size_rows" in about.settings["properties"]
 
 
 def test_sql_get_sink():
@@ -142,3 +143,40 @@
     target.get_sink(
         "bar",
     )
+
+
+def test_batch_size_rows_and_max_size():
+    input_schema_1 = {
+        "properties": {
+            "id": {
+                "type": ["string", "null"],
+            },
+            "col_ts": {
+                "format": "date-time",
+                "type": ["string", "null"],
+            },
+        },
+    }
+    key_properties = []
+    target_default = TargetMock()
+    sink_default = BatchSinkMock(
+        target=target_default,
+        stream_name="foo",
+        schema=input_schema_1,
+        key_properties=key_properties,
+    )
+    target_set = TargetMock(config={"batch_size_rows": 100000})
+    sink_set = BatchSinkMock(
+        target=target_set,
+        stream_name="bar",
+        schema=input_schema_1,
+        key_properties=key_properties,
+    )
+    assert sink_default.stream_name == "foo"
+    assert sink_default._batch_size_rows is None
+    assert sink_default.batch_size_rows is None
+    assert sink_default.max_size == 10000
+    assert sink_set.stream_name == "bar"
+    assert sink_set._batch_size_rows == 100000
+    assert sink_set.batch_size_rows == 100000
+    assert sink_set.max_size == 100000
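End to end, concrete targets pick the setting up for free: the `_merge_missing(TARGET_BATCH_SIZE_ROWS_CONFIG, ...)` call in `target_base.py` above merges it into every target's advertised config schema. A usage sketch built on the public base classes, in the spirit of the test suite's `TargetMock`/`BatchSinkMock` (`MyTarget`, `MySink`, and the stream details are hypothetical):

```python
from singer_sdk import Target
from singer_sdk.sinks import BatchSink


class MySink(BatchSink):
    """Hypothetical sink; the batch is discarded for the sketch."""

    def process_batch(self, context: dict) -> None:
        pass


class MyTarget(Target):
    """Hypothetical target that declares no settings of its own."""

    name = "target-sketch"
    default_sink_class = MySink


# `batch_size_rows` is accepted even though MyTarget never declared it.
target = MyTarget(config={"batch_size_rows": 100_000})
sink = target.get_sink(
    "users",
    schema={"properties": {"id": {"type": "string"}}},
    key_properties=["id"],
)
assert sink.max_size == 100_000  # would be 10_000 with the setting omitted
```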