From 8e505a074c1bca31bdc7bb3e2c8102fe95bcbe05 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Wed, 8 Jun 2022 11:29:56 +0300 Subject: [PATCH 01/10] Add api costs hook --- singer_sdk/streams/core.py | 15 ++++++++++ singer_sdk/streams/rest.py | 57 ++++++++++++++++++++++++++++++++++++++ singer_sdk/tap_base.py | 3 ++ 3 files changed, 75 insertions(+) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index bfe082562..28f7a58ab 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -76,6 +76,9 @@ class Stream(metaclass=abc.ABCMeta): parent_stream_type: Optional[Type["Stream"]] = None ignore_parent_replication_key: bool = False + # Internal API cost aggregator + _api_costs: Dict[str, int] = {} + def __init__( self, tap: TapBaseClass, @@ -854,6 +857,18 @@ def _write_request_duration_log( extra_tags["context"] = context self._write_metric_log(metric=request_duration_metric, extra_tags=extra_tags) + def log_api_costs(self) -> None: + """Log a summary of API costs. + + The costs are calculated via `calculate_api_request_cost`. + This method can be overridden to log results in a custom + format. It is only called once at the end of the life of + the stream. + """ + if len(self._api_costs) > 0: + msg = f"Total API costs for stream {self.name}: {self._api_costs}" + self.logger.info(msg) + def _check_max_record_limit(self, record_count: int) -> None: """TODO. diff --git a/singer_sdk/streams/rest.py b/singer_sdk/streams/rest.py index fa6040936..c88bb9bdb 100644 --- a/singer_sdk/streams/rest.py +++ b/singer_sdk/streams/rest.py @@ -320,6 +320,7 @@ def request_records(self, context: Optional[dict]) -> Iterable[dict]: context, next_page_token=next_page_token ) resp = decorated_request(prepared_request, context) + self.update_api_costs(prepared_request, resp, context) yield from self.parse_response(resp) previous_token = copy.deepcopy(next_page_token) next_page_token = self.get_next_page_token( @@ -333,8 +334,64 @@ def request_records(self, context: Optional[dict]) -> Iterable[dict]: # Cycle until get_next_page_token() no longer returns a value finished = not next_page_token + def update_api_costs( + self, + request: requests.PreparedRequest, + response: requests.Response, + context: Optional[dict], + ) -> dict[str, int]: + """Update internal calculation of API costs. + + Args: + request: the API Request object that was just called. + response: the `requests.Response` object + context: the context passed to the call + + Returns: + A dict of costs (for the single request) whose keys are + the "cost domains". See `calculate_api_request_cost` for details. + """ + call_costs = self.calculate_api_request_cost(request, response, context) + self._api_costs = { + k: self._api_costs.get(k, 0) + call_costs.get(k, 0) + for k in call_costs.keys() + } + return self._api_costs + # Overridable: + def calculate_api_request_cost( + self, + request: requests.PreparedRequest, + response: requests.Response, + context: Optional[dict], + ) -> dict[str, int]: + """Calculate the cost of the last API call made. + + This method can optionally be implemented in streams to calculate + the costs (in arbitrary units to be defined by the tap developer) + associated with a single API call. The request and response objects + are available in the callback, as well as the context. + + The method returns a dict where the keys are arbitrary cost dimensions, + and the values the cost along each dimension for this one call. For + instance: { "rest": 0, "graphql": 42 } for a call to github's graphql API. + All keys should be present in the dict. + + This method can be overridden by tap streams. By default it won't do + anything. + + Args: + request: the API Request object that was just called. + response: the `requests.Response` object + context: the context passed to the call + + Returns: + A dict of accumulated costs whose keys are the "cost domains". See + `calculate_api_request_cost` for details. + """ + return {} + def prepare_request_payload( self, context: Optional[dict], next_page_token: Optional[Any] ) -> Optional[dict]: diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 5e9d814f4..9d71ce29c 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -379,6 +379,9 @@ def sync_all(self) -> None: stream.sync() stream.finalize_state_progress_markers() + for stream in self.streams.values(): + stream.log_api_costs() + # Command Line Execution @classproperty From c3160a7c09d1182e753c542eee310c8280b7d5fa Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Wed, 8 Jun 2022 13:02:00 +0300 Subject: [PATCH 02/10] Correct typing hints for older pythons --- singer_sdk/streams/rest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/singer_sdk/streams/rest.py b/singer_sdk/streams/rest.py index c88bb9bdb..56ecf0365 100644 --- a/singer_sdk/streams/rest.py +++ b/singer_sdk/streams/rest.py @@ -338,8 +338,8 @@ def update_api_costs( self, request: requests.PreparedRequest, response: requests.Response, - context: Optional[dict], - ) -> dict[str, int]: + context: Optional[Dict], + ) -> Dict[str, int]: """Update internal calculation of API costs. Args: @@ -364,8 +364,8 @@ def calculate_api_request_cost( self, request: requests.PreparedRequest, response: requests.Response, - context: Optional[dict], - ) -> dict[str, int]: + context: Optional[Dict], + ) -> Dict[str, int]: """Calculate the cost of the last API call made. This method can optionally be implemented in streams to calculate From 002af0a31c8535ee3def59bef891398e3d1e7f61 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Tue, 14 Jun 2022 06:49:56 +0000 Subject: [PATCH 03/10] Rename api to sync Co-authored-by: Edgar R. M. --- singer_sdk/streams/core.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 28f7a58ab..2f0a2621f 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -857,16 +857,16 @@ def _write_request_duration_log( extra_tags["context"] = context self._write_metric_log(metric=request_duration_metric, extra_tags=extra_tags) - def log_api_costs(self) -> None: - """Log a summary of API costs. + def log_sync_costs(self) -> None: + """Log a summary of Sync costs. - The costs are calculated via `calculate_api_request_cost`. + The costs are calculated via `calculate_sync_cost`. This method can be overridden to log results in a custom format. It is only called once at the end of the life of the stream. """ - if len(self._api_costs) > 0: - msg = f"Total API costs for stream {self.name}: {self._api_costs}" + if len(self._sync_costs) > 0: + msg = f"Total Sync costs for stream {self.name}: {self._sync_costs}" self.logger.info(msg) def _check_max_record_limit(self, record_count: int) -> None: From 937d81a286832f8f711c7bde1aab193286f00c25 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Tue, 14 Jun 2022 06:51:46 +0000 Subject: [PATCH 04/10] Apply suggestions from code review Co-authored-by: Edgar R. M. --- singer_sdk/streams/core.py | 2 +- singer_sdk/streams/rest.py | 8 ++++---- singer_sdk/tap_base.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 2f0a2621f..000bf738a 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -77,7 +77,7 @@ class Stream(metaclass=abc.ABCMeta): ignore_parent_replication_key: bool = False # Internal API cost aggregator - _api_costs: Dict[str, int] = {} + _sync_costs: Dict[str, int] = {} def __init__( self, diff --git a/singer_sdk/streams/rest.py b/singer_sdk/streams/rest.py index 56ecf0365..b11766442 100644 --- a/singer_sdk/streams/rest.py +++ b/singer_sdk/streams/rest.py @@ -320,7 +320,7 @@ def request_records(self, context: Optional[dict]) -> Iterable[dict]: context, next_page_token=next_page_token ) resp = decorated_request(prepared_request, context) - self.update_api_costs(prepared_request, resp, context) + self.update_sync_costs(prepared_request, resp, context) yield from self.parse_response(resp) previous_token = copy.deepcopy(next_page_token) next_page_token = self.get_next_page_token( @@ -352,11 +352,11 @@ def update_api_costs( the "cost domains". See `calculate_api_request_cost` for details. """ call_costs = self.calculate_api_request_cost(request, response, context) - self._api_costs = { - k: self._api_costs.get(k, 0) + call_costs.get(k, 0) + self._sync_costs = { + k: self._sync_costs.get(k, 0) + call_costs.get(k, 0) for k in call_costs.keys() } - return self._api_costs + return self._sync_costs # Overridable: diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 9d71ce29c..5ce65a814 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -380,7 +380,7 @@ def sync_all(self) -> None: stream.finalize_state_progress_markers() for stream in self.streams.values(): - stream.log_api_costs() + stream.log_sync_costs() # Command Line Execution From 3502aa05c063ff5f3d590adae150677bee3f65e2 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Tue, 14 Jun 2022 10:48:44 +0300 Subject: [PATCH 05/10] Rename cost methods --- singer_sdk/streams/rest.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/singer_sdk/streams/rest.py b/singer_sdk/streams/rest.py index b11766442..fa9174bef 100644 --- a/singer_sdk/streams/rest.py +++ b/singer_sdk/streams/rest.py @@ -334,24 +334,24 @@ def request_records(self, context: Optional[dict]) -> Iterable[dict]: # Cycle until get_next_page_token() no longer returns a value finished = not next_page_token - def update_api_costs( + def update_sync_costs( self, request: requests.PreparedRequest, response: requests.Response, context: Optional[Dict], ) -> Dict[str, int]: - """Update internal calculation of API costs. + """Update internal calculation of Sync costs. Args: - request: the API Request object that was just called. + request: the Request object that was just called. response: the `requests.Response` object context: the context passed to the call Returns: A dict of costs (for the single request) whose keys are - the "cost domains". See `calculate_api_request_cost` for details. + the "cost domains". See `calculate_sync_cost` for details. """ - call_costs = self.calculate_api_request_cost(request, response, context) + call_costs = self.calculate_sync_cost(request, response, context) self._sync_costs = { k: self._sync_costs.get(k, 0) + call_costs.get(k, 0) for k in call_costs.keys() @@ -360,7 +360,7 @@ def update_api_costs( # Overridable: - def calculate_api_request_cost( + def calculate_sync_cost( self, request: requests.PreparedRequest, response: requests.Response, @@ -370,7 +370,7 @@ def calculate_api_request_cost( This method can optionally be implemented in streams to calculate the costs (in arbitrary units to be defined by the tap developer) - associated with a single API call. The request and response objects + associated with a single API/network call. The request and response objects are available in the callback, as well as the context. The method returns a dict where the keys are arbitrary cost dimensions, @@ -387,8 +387,7 @@ def calculate_api_request_cost( context: the context passed to the call Returns: - A dict of accumulated costs whose keys are the "cost domains". See - `calculate_api_request_cost` for details. + A dict of accumulated costs whose keys are the "cost domains". """ return {} From d7ca4618c2f1e8dff142982111bdb4c0d7ca8fb8 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Tue, 14 Jun 2022 10:48:54 +0300 Subject: [PATCH 06/10] Add sync costs calculation test --- tests/core/test_streams.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index 8e1d44030..9a0c07f3d 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -357,3 +357,23 @@ def test_cached_jsonpath(): # cached objects should point to the same memory location assert recompiled is compiled + + +def test_sync_costs_calculation(tap: SimpleTestTap): + """Test sync costs are added up correctly.""" + fake_request = requests.PreparedRequest() + fake_response = requests.Response() + + stream = RestTestStream(tap) + + def calculate_test_cost( + request: requests.PreparedRequest, + response: requests.Response, + context: Optional[Dict], + ): + return {"dim1": 1, "dim2": 2} + + stream.calculate_sync_cost = calculate_test_cost + stream.update_sync_costs(fake_request, fake_response, None) + stream.update_sync_costs(fake_request, fake_response, None) + assert stream._sync_costs == {"dim1": 2, "dim2": 4} From ad0c4483c8b2742d5f83383f3e413c7111d68947 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Sun, 19 Jun 2022 09:26:40 +0000 Subject: [PATCH 07/10] Use a single loop for logging costs Co-authored-by: Edgar R. M. --- singer_sdk/tap_base.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 5ce65a814..41d0eab41 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -378,10 +378,7 @@ def sync_all(self) -> None: stream.sync() stream.finalize_state_progress_markers() - - for stream in self.streams.values(): stream.log_sync_costs() - # Command Line Execution @classproperty From 09722061ae815713b040f3d11766edbdc4b8ce58 Mon Sep 17 00:00:00 2001 From: Eric Boucher Date: Sun, 19 Jun 2022 19:55:17 +0200 Subject: [PATCH 08/10] Update tap_base.py --- singer_sdk/tap_base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 41d0eab41..8f4f59e27 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -379,6 +379,7 @@ def sync_all(self) -> None: stream.sync() stream.finalize_state_progress_markers() stream.log_sync_costs() + # Command Line Execution @classproperty From 47cad7979676edf0e073f87147a640f11f48aa16 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Tue, 21 Jun 2022 10:25:37 +0000 Subject: [PATCH 09/10] Add test for log_sync_costs Co-authored-by: Edgar R. M. --- tests/core/test_streams.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index 9a0c07f3d..df44a6a00 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -359,7 +359,7 @@ def test_cached_jsonpath(): assert recompiled is compiled -def test_sync_costs_calculation(tap: SimpleTestTap): +def test_sync_costs_calculation(tap: SimpleTestTap, caplog): """Test sync costs are added up correctly.""" fake_request = requests.PreparedRequest() fake_response = requests.Response() @@ -377,3 +377,12 @@ def calculate_test_cost( stream.update_sync_costs(fake_request, fake_response, None) stream.update_sync_costs(fake_request, fake_response, None) assert stream._sync_costs == {"dim1": 2, "dim2": 4} + + with caplog.at_level(logging.INFO, logger=tap.name): + stream.log_sync_costs() + + assert len(caplog.records) == 1 + + for record in caplog.records: + assert record.levelname == "INFO" + assert f"Total Sync costs for stream {stream.name}" in record.message From e60949251eb869aae0ef3a1300cc38f80aa6c8cf Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Tue, 21 Jun 2022 21:22:16 +0300 Subject: [PATCH 10/10] Add missing import --- tests/core/test_streams.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index df44a6a00..2b87dd75b 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -1,5 +1,6 @@ """Stream tests.""" +import logging from typing import Any, Dict, Iterable, List, Optional, cast import pendulum