From acaf26f1b32d679927c236aff9b1345c2df281ea Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 15 Feb 2024 12:53:46 -0800 Subject: [PATCH 01/11] added batch_size_rows tests to target --- tests/core/test_target_base.py | 38 ++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/core/test_target_base.py b/tests/core/test_target_base.py index eaff6d6a1..f9bcdb871 100644 --- a/tests/core/test_target_base.py +++ b/tests/core/test_target_base.py @@ -77,6 +77,7 @@ def test_target_about_info(): assert "flattening_max_depth" in about.settings["properties"] assert "batch_config" in about.settings["properties"] assert "add_record_metadata" in about.settings["properties"] + assert "batch_size_rows" in about.settings["properties"] def test_sql_get_sink(): @@ -142,3 +143,40 @@ def test_add_sqlsink_and_get_sink(): target.get_sink( "bar", ) + + +def test_batch_size_rows_and_max_size(): + input_schema_1 = { + "properties": { + "id": { + "type": ["string", "null"], + }, + "col_ts": { + "format": "date-time", + "type": ["string", "null"], + }, + }, + } + key_properties = [] + target_default = TargetMock() + sink_default = BatchSinkMock( + target=target_default, + stream_name="foo", + schema=input_schema_1, + key_properties=key_properties, + ) + target_set = TargetMock(config={"batch_size_rows": 100000}) + sink_set = BatchSinkMock( + target=target_set, + stream_name="bar", + schema=input_schema_1, + key_properties=key_properties, + ) + assert sink_default.stream_name == "foo" + assert sink_default._batch_size_rows is None + assert sink_default.batch_size_rows is None + assert sink_default.max_size == 10000 + assert sink_set.stream_name == "bar" + assert sink_set._batch_size_rows == 100000 + assert sink_set.batch_size_rows == 100000 + assert sink_set.max_size == 100000 From 49759b4b335023e01657cce806b9d2eb3be02338 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 15 Feb 2024 13:38:51 -0800 Subject: [PATCH 02/11] add batch_size_rows test to test 
target sql --- tests/core/targets/test_target_sql.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/core/targets/test_target_sql.py b/tests/core/targets/test_target_sql.py index fd71c0aeb..e47845a70 100644 --- a/tests/core/targets/test_target_sql.py +++ b/tests/core/targets/test_target_sql.py @@ -46,5 +46,9 @@ class MyTarget(SQLTargetMock, capabilities=capabilities): pass about = MyTarget._get_about_info() - default_settings = {"add_record_metadata", "load_method"} + default_settings = { + "add_record_metadata", + "load_method", + "batch_size_rows", + } assert set(about.settings["properties"]) == expected_settings | default_settings From 9dbed19b6fc087ce2ef50e50e50d614751d27726 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 15 Feb 2024 13:54:24 -0800 Subject: [PATCH 03/11] added batch size row to target capabilities --- singer_sdk/helpers/capabilities.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index b48d290a4..f76400c5a 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -152,6 +152,13 @@ default=True, ), ).to_dict() +TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList( + Property( + "batch_size_rows", + IntegerType, + description="Maximum number of rows in each batch.", + ), +).to_dict() class TargetLoadMethods(str, Enum): From f8caa5ab1dc32295500838b6fbf14ba79d5a6010 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 15 Feb 2024 14:02:03 -0800 Subject: [PATCH 04/11] add batch_size_rows to target as a builtin config --- singer_sdk/target_base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 5a78bf362..d24b5e863 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -18,6 +18,7 @@ from singer_sdk.helpers.capabilities import ( ADD_RECORD_METADATA_CONFIG, BATCH_CONFIG, + TARGET_BATCH_SIZE_ROWS_CONFIG, 
TARGET_HARD_DELETE_CONFIG, TARGET_LOAD_METHOD_CONFIG, TARGET_SCHEMA_CONFIG, @@ -610,6 +611,7 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: _merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema) _merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema) + _merge_missing(TARGET_BATCH_SIZE_ROWS_CONFIG, config_jsonschema) capabilities = cls.capabilities From 93dcdb13fc40080049bf382b5befa2f855aed6f2 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 15 Feb 2024 14:44:53 -0800 Subject: [PATCH 05/11] added code for batch_size_rows --- singer_sdk/sinks/core.py | 38 ++++++++++++++++++++++++++++---------- singer_sdk/target_base.py | 3 ++- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index e35353789..5226eae5e 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -182,6 +182,11 @@ def __init__( self._batch_records_read: int = 0 self._batch_dupe_records_merged: int = 0 + # Batch full markers + self._batch_size_rows: int | None = target.config.get( + "batch_size_rows", + ) + self._validator: BaseJSONSchemaValidator | None = self.get_validator() @cached_property @@ -249,15 +254,6 @@ def _get_context(self, record: dict) -> dict: # noqa: ARG002 # Size properties - @property - def max_size(self) -> int: - """Get max batch size. - - Returns: - Max number of records to batch before `is_full=True` - """ - return self.MAX_SIZE_DEFAULT - @property def current_size(self) -> int: """Get current batch size. @@ -269,13 +265,35 @@ def current_size(self) -> int: @property def is_full(self) -> bool: - """Check against size limit. + """Calls the size limit check funtion. Returns: True if the sink needs to be drained. """ return self.current_size >= self.max_size + @property + def batch_size_rows(self) -> int | None: + """Get batch_size_rows object. + + Returns: + A batch_size_rows object. 
+ """ + return self._batch_size_rows + + @property + def max_size(self) -> int: + """Get max batch size. + + Returns: + Max number of records to batch before `is_full=True` + """ + return ( + self.batch_size_rows + if self.batch_size_rows is not None + else self.MAX_SIZE_DEFAULT + ) + # Tally methods @t.final diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index d24b5e863..3c0d37234 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -364,8 +364,9 @@ def _process_record_message(self, message_dict: dict) -> None: if sink.is_full: self.logger.info( - "Target sink for '%s' is full. Draining...", + "Target sink for '%s' is full. Current size is '%s'. Draining...", sink.stream_name, + sink.current_size, ) self.drain_one(sink) From 51f2e5706bd58a1180b922387186c58148029e26 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Mon, 19 Feb 2024 11:31:39 -0700 Subject: [PATCH 06/11] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> --- singer_sdk/sinks/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 5226eae5e..0367ada91 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -265,7 +265,7 @@ def current_size(self) -> int: @property def is_full(self) -> bool: - """Calls the size limit check funtion. + """Check against the batch size limit. Returns: True if the sink needs to be drained. 
From 89f684646aa4441d9d688ddb5912824e1c94b2cf Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Mon, 19 Feb 2024 11:22:15 -0800 Subject: [PATCH 07/11] applied documentation update from review to max_size --- singer_sdk/sinks/core.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 0367ada91..3dd8892cf 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -287,6 +287,9 @@ def max_size(self) -> int: Returns: Max number of records to batch before `is_full=True` + + .. versionremoved:: 0.36.0 + This property now takes into account the ``batch_size_rows`` target setting. """ return ( self.batch_size_rows From 8b278b4b6c4526c8212f0d456f16dc9868576ce9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Mon, 19 Feb 2024 13:29:15 -0600 Subject: [PATCH 08/11] Update singer_sdk/sinks/core.py --- singer_sdk/sinks/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 3dd8892cf..1dad00f52 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -288,7 +288,7 @@ def max_size(self) -> int: Returns: Max number of records to batch before `is_full=True` - .. versionremoved:: 0.36.0 + .. versionchanged:: 0.36.0 This property now takes into account the ``batch_size_rows`` target setting. 
""" return ( From 678e17ff75868b8cb7b7d0d92e21758c1177fdc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Mon, 19 Feb 2024 13:41:36 -0600 Subject: [PATCH 09/11] Update singer_sdk/sinks/core.py --- singer_sdk/sinks/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 1dad00f52..179b03035 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -289,7 +289,7 @@ def max_size(self) -> int: Max number of records to batch before `is_full=True` .. versionchanged:: 0.36.0 - This property now takes into account the ``batch_size_rows`` target setting. + This property now takes into account the ``batch_size_rows`` target setting. """ return ( self.batch_size_rows From c247f826948fe57a7bdaa7f28491de461bac285e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 19 Feb 2024 13:46:30 -0600 Subject: [PATCH 10/11] chore: Link to `batch_size_rows` attribute docs --- singer_sdk/sinks/core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 179b03035..c69af0fe8 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -289,7 +289,9 @@ def max_size(self) -> int: Max number of records to batch before `is_full=True` .. versionchanged:: 0.36.0 - This property now takes into account the ``batch_size_rows`` target setting. + This property now takes into account the + :meth:`~singer_sdk.Sink.batch_size_rows` attribute and the corresponding + ``batch_size_rows`` target setting. 
""" return ( self.batch_size_rows From e0b4425de181a05a600c9705a72787731bb50b57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 19 Feb 2024 13:54:29 -0600 Subject: [PATCH 11/11] Update `batch_size_rows` docs --- singer_sdk/sinks/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index c69af0fe8..f41d3d97f 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -274,10 +274,10 @@ def is_full(self) -> bool: @property def batch_size_rows(self) -> int | None: - """Get batch_size_rows object. + """The maximum number of rows a batch can accumulate before being processed. Returns: - A batch_size_rows object. + The max number of rows or None if not set. """ return self._batch_size_rows @@ -290,7 +290,7 @@ def max_size(self) -> int: .. versionchanged:: 0.36.0 This property now takes into account the - :meth:`~singer_sdk.Sink.batch_size_rows` attribute and the corresponding + :attr:`~singer_sdk.Sink.batch_size_rows` attribute and the corresponding ``batch_size_rows`` target setting. """ return (