From 14d4461ef37d42eb89b9bd21cc7ffedf54bf3ef3 Mon Sep 17 00:00:00 2001
From: RushiT0122
Date: Thu, 28 Mar 2024 20:51:53 +0000
Subject: [PATCH 1/3] Implement date window logic to fix memory issue on server

---
 tap_pendo/streams.py | 170 +++++++++++++++++++++++++------------------
 1 file changed, 99 insertions(+), 71 deletions(-)

diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py
index 48d7eb2..7f7c92a 100644
--- a/tap_pendo/streams.py
+++ b/tap_pendo/streams.py
@@ -20,7 +20,7 @@
 from singer import Transformer, metadata
 from singer.utils import now, strftime, strptime_to_utc
 from tap_pendo import utils as tap_pendo_utils
-
+from tap_pendo.exception import QueryExceededMemoryLimitException
 
 KEY_PROPERTIES = ['id']
 US_BASE_URL = "https://app.pendo.io"
@@ -182,6 +182,9 @@ def send_request_get_results(self, req, endpoint, params, count, **kwargs):
             time.sleep(retry_after)
             raise Server42xRateLimitError(resp.reason)
 
+        if resp.status_code in [400, 408] and "Query exceeded memory limit" in resp.text:
+            raise QueryExceededMemoryLimitException()
+
         resp.raise_for_status()  # Check for requests status and raise exception in failure
 
         Stream.request_retry_count = 1  # Reset retry count after success
@@ -524,7 +527,7 @@ def get_pipeline_key_index(self, body, search_key):
                 return index
         return None
 
-    def get_body(self, key_id=None, period=None, first=None):
+    def get_body(self, key_id=None, period=None, first=None, last="now()"):
         """This method will be overriden in the child class"""
         return {}
@@ -538,7 +541,6 @@ def set_request_body_filters(self, body, start_time, records=None):
         limit_index = self.get_pipeline_key_index(body, 'limit')
         filter_index = self.get_pipeline_key_index(body, 'filter')
 
-        body['request']['pipeline'][limit_index]['limit'] = self.record_limit
         if isinstance(self, Accounts):
             replication_key = 'metadata.auto.lastupdated'
             replication_key_value = records[-1]['metadata']['auto']['lastupdated'] if records and len(records) > 0 else None
@@ -600,7 +602,7 @@ def remove_last_timestamp_records(self, records):
     def get_first_parameter_value(self, body):
         return body['request']['pipeline'][0]['source']['timeSeries'].get('first', 0)
 
-    def set_time_series_first(self, body, records, first=None):
+    def set_time_series_first(self, body, records, first=None, last="now()"):
         """Sets the timeSeries 'first' parameter in request body"""
         if len(records) > 1:
             # This condition considers that within current time window there could be some more records
@@ -612,6 +614,8 @@ def set_time_series_first(self, body, records, first=None, last="now()"):
         else:
             body['request']['pipeline'][0]['source']['timeSeries']['first'] = int(datetime.now().timestamp() * 1000)
 
+        body['request']['pipeline'][0]['source']['timeSeries']['last'] = str(last)
+
         return body
 
     def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
@@ -761,16 +765,18 @@ class EventsBase(Stream):
     DATE_WINDOW_SIZE = 1
     key_properties = ['visitor_id', 'account_id', 'remote_ip']
     replication_method = "INCREMENTAL"
+    start_date = None
+    date_windows = None
 
     def __init__(self, config):
         super().__init__(config=config)
         self.config = config
         self.period = config.get('period')
+        self.start_date = config.get('start_date')
         self.replication_key = "day" if self.period == 'dayRange' else "hour"
         self.last_processed = None
 
-
-    def get_body(self, key_id, period, first):
+    def get_body(self, key_id, period, first, last="now()"):
         """This method returns generic request body of events steams"""
 
         sort_key = humps.camelize(self.replication_key)
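For orientation on the surrounding hunks: get_body builds one Pendo aggregation request per time window, bounded by timeSeries.first / timeSeries.last (epoch milliseconds) and filtered on the replication key. A minimal sketch of that shape, assuming a dayRange period; build_window_body and the exact source layout are illustrative stand-ins, not the tap's full request dict:

    from datetime import datetime, timezone

    def build_window_body(source_name, period, first_ms, last_ms, record_limit=10000):
        # Hypothetical helper mirroring the shape EventsBase.get_body produces:
        # one bounded window per request, sorted and filtered on the replication key.
        sort_key = "day" if period == "dayRange" else "hour"
        return {
            "request": {
                "pipeline": [
                    {"source": {source_name: None,
                                "timeSeries": {"period": period,
                                               "first": first_ms,        # window start, epoch ms
                                               "last": str(last_ms)}}},  # window end, stringified as in this patch
                    {"sort": [sort_key]},
                    {"filter": f"{sort_key}>={first_ms}"},  # drop rows from before the window start
                    {"limit": record_limit},
                ]
            }
        }

    first = int(datetime(2024, 3, 1, tzinfo=timezone.utc).timestamp() * 1000)
    last = int(datetime(2024, 3, 2, tzinfo=timezone.utc).timestamp() * 1000)
    print(build_window_body("events", "dayRange", first, last)["request"]["pipeline"][0])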
@@ -785,13 +791,13 @@ def get_body(self, key_id, period, first):
                         "timeSeries": {
                             "period": period,
                             "first": first,
-                            "last": "now()"
+                            "last": str(last)
                         }
                     }
                 }, {
                     "sort": [sort_key]
                 }, {
-                    "filter": f"{sort_key}>=1"
+                    "filter": f"{sort_key}>={first}"
                 }, {
                     "limit": self.record_limit
                 }
@@ -802,6 +808,67 @@ def get_body(self, key_id, period, first):
     def get_first_parameter_value(self, body):
         return body['request']['pipeline'][0]['source']['timeSeries'].get('first', 0)
 
+    def get_last_parameter_value(self, body):
+        return body['request']['pipeline'][0]['source']['timeSeries'].get('last', 'now()')
+
+    def get_record_count(self, first, last):
+        # Get number of records to be fetched using current filter
+        body = self.get_body(None, self.period, first, last)
+        filter_index = self.get_pipeline_key_index(body, "filter")
+        body["request"]["pipeline"][filter_index]["filter"] = body["request"]["pipeline"][filter_index]["filter"].replace(
+            ">=", ">")
+        body["request"]["pipeline"].pop(self.get_pipeline_key_index(body, 'limit'))
+        body["request"]["pipeline"].append({"count": None})
+        return self.request(self.name, json=body)["results"][0]["count"]
+
+    def create_date_windows(self, from_date, to_date):
+        max_record_count = 5 * API_RECORD_LIMIT
+        mid_date = int(from_date + (to_date - from_date) / 2)
+        first_record_count = self.get_record_count(from_date, mid_date)
+        last_record_count = self.get_record_count(mid_date + 1, to_date)
+        date_windows = list()
+        if first_record_count:
+            if first_record_count <= max_record_count:
+                date_windows.append((from_date, mid_date, first_record_count))
+            else:
+                date_windows += self.create_date_windows(from_date + 1, mid_date)
+
+        if last_record_count:
+            if last_record_count <= max_record_count:
+                date_windows.append((mid_date, to_date, last_record_count))
+            else:
+                date_windows += self.create_date_windows(mid_date + 1, to_date)
+
+        # merge the adjacent windows if their combined count is less than the max record limit
+        merged_date_windows = []
+        current_start, current_end, record_count = date_windows[0]
+
+        for start, end, value in date_windows[1:]:
+            if record_count + value < max_record_count:
+                current_end = end
+                record_count += value
+            else:
+                merged_date_windows.append((current_start, current_end, record_count))
+                current_start, current_end, record_count = start, end, value
+
+        merged_date_windows.append((current_start, current_end, record_count))
+
+        return merged_date_windows
+
+    def remove_duplicate_records(self, new_records, last_processed):
+        if not last_processed:
+            return new_records
+        # Convert lists of dictionaries to sets of tuples
+        new_records_set = set(tuple(sorted(d.items())) for d in new_records)
+
+        # Drop any records that already appeared in last_processed (set difference)
+        unique_set = new_records_set.difference(set(tuple(sorted(d.items())) for d in last_processed))
+
+        # Convert set of tuples back to list of dictionaries
+        unique_records = [dict(item) for item in unique_set]
+
+        return unique_records
+
     def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
 
         update_currently_syncing(state, self.name)
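The windowing added above is easier to follow in isolation: split [from, to] at its midpoint, recurse into any half whose server-side count exceeds the cap, then merge adjacent windows that still fit together. A standalone sketch with a fake count function (the even record distribution and MAX_PER_WINDOW value are illustrative only):

    MAX_PER_WINDOW = 5 * 10000  # stand-in for 5 * API_RECORD_LIMIT

    def create_date_windows(count_records, from_ms, to_ms):
        # Recursive split-and-merge, mirroring EventsBase.create_date_windows.
        mid = int(from_ms + (to_ms - from_ms) / 2)
        left = count_records(from_ms, mid)
        right = count_records(mid + 1, to_ms)

        windows = []
        if left:
            windows += [(from_ms, mid, left)] if left <= MAX_PER_WINDOW \
                else create_date_windows(count_records, from_ms + 1, mid)
        if right:
            windows += [(mid, to_ms, right)] if right <= MAX_PER_WINDOW \
                else create_date_windows(count_records, mid + 1, to_ms)
        if not windows:
            return []  # guard for an empty range; the patch assumes at least one non-empty half

        # Merge neighbours whose combined count still fits under the cap.
        merged = [windows[0]]
        for start, end, count in windows[1:]:
            s, _, c = merged[-1]
            if c + count < MAX_PER_WINDOW:
                merged[-1] = (s, end, c + count)
            else:
                merged.append((start, end, count))
        return merged

    fake_count = lambda first, last: max(last - first, 0)  # one record per millisecond
    print(create_date_windows(fake_count, 0, 200_000))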
@@ -814,75 +881,35 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
 
         # If last processed records exists, then set first to timestamp of first record
         try:
-            first = self.last_processed[0][humps.decamelize(
+            start_time = self.last_processed[0][humps.decamelize(
                 self.replication_key)] if self.last_processed else int(lookback.timestamp()) * 1000
         except Exception as e:
             LOGGER.info(str(e))
 
+        end_time = int(datetime.now().timestamp() * 1000)
 
-        # Setup body for first request
-        body = self.get_body(key_id, period, first)
-        self.set_time_series_first(body, [], first)
-        self.set_request_body_filters(body, first, [])
-
-        ts_now = int(datetime.now().timestamp() * 1000)
-        events = []
-        while self.get_first_parameter_value(body) <= ts_now:
-            records = self.request(self.name, json=body).get('results') or []
-            self.set_time_series_first(body, records)
+        if self.date_windows is None:
+            self.date_windows = self.create_date_windows(start_time, end_time)
+            LOGGER.info("Date windows identified:")
+            for window in self.date_windows:
+                LOGGER.info(f"\t{window}")
 
-            if len(records) > 1:
-                if len(records) < self.record_limit:
-                    # If response returns less records than record limit means there are no more records to sync
-                    events += records
-                    self.last_processed = None
-                    break
+        start_time, end_time, record_count = self.date_windows.pop(0)
+        body = self.get_body(key_id, period, start_time, end_time)
+        body = self.set_time_series_first(body, [], start_time, end_time)
+        body = self.set_request_body_filters(body, start_time, [])
 
-                # Remove last processed and none replication_value records
-                self.remove_empty_replication_key_records(records)
-                records, removed_records = self.remove_last_timestamp_records(records)
-                if len(records) > 0:
-                    events += records
+        stream_records = self.request(self.name, json=body).get('results') or []
 
-                    if self.last_processed == removed_records:
-                        events += removed_records
-                        self.last_processed = None
-                        break
+        if len(stream_records) >= self.record_limit:
+            # If the record limit was reached, there may be more records left in this window,
+            # so the next extraction should start from the replication value of the last record.
+            # Overlapping records from the previous request must be removed first.
+            stream_records, self.last_processed = self.remove_last_timestamp_records(stream_records)
 
-                    self.last_processed = removed_records
-                else:
-                    # This block handles race condition where all records have same replication key value
-                    first = self.last_processed[0][humps.decamelize(
-                        self.replication_key)] if self.last_processed else int(lookback.timestamp()) * 1000
-
-                    body = self.get_body(key_id, period, first)
-                    continue
-
-            elif len(records) == 1:
-                events += records
-                self.last_processed = None
-                break
-
-            # Set first and filters for next request
-            self.set_request_body_filters(
-                body,
-                self.get_first_parameter_value(body),
-                records)
-
-            # If record limit set is reached, then return the extracted records
-            # Set the last processed records to ressume the extraction
-            if len(events) >= self.record_limit:
-                return (self.stream, events), True
-
-            # These is a corner cases where this limit may get changed so reseeting it before next iteration
-            self.record_limit = self.get_default_record_limit()
-
-        # Add none replication_value records into records with valid replication_value
-        events.extend(self.empty_replication_key_records)
-        self.empty_replication_key_records = []
-
-        update_currently_syncing(state, None)
-        return (self.stream, events), False
+            start_time = self.last_processed[0][humps.decamelize(self.replication_key)]
+            self.date_windows.insert(0, (start_time, end_time, record_count - len(self.last_processed)))
+
+        return (self.stream, stream_records), len(self.date_windows) > 0
 
 
 class Accounts(Stream):
     name = "accounts"
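To make the resume path above concrete: when a window comes back full, remove_last_timestamp_records peels off the rows sharing the newest timestamp, and the window is pushed back with its remaining count so the next request restarts at that timestamp. A simplified sketch on plain data (no HTTP; the record limit and timestamps are illustrative):

    from collections import deque

    RECORD_LIMIT = 3  # illustrative; the tap takes its limit from configuration

    def split_last_timestamp(records):
        # Mirror of remove_last_timestamp_records: peel off rows that share the newest 'hour'.
        newest = max(r["hour"] for r in records)
        kept = [r for r in records if r["hour"] < newest]
        overlap = [r for r in records if r["hour"] == newest]
        return kept, overlap

    windows = deque([(0, 100, 5)])                        # (start, end, expected_count)
    start, end, count = windows.popleft()
    fetched = [{"hour": 10}, {"hour": 20}, {"hour": 20}]  # a full page: the limit was hit

    if len(fetched) >= RECORD_LIMIT:
        fetched, overlap = split_last_timestamp(fetched)
        # Re-open the window at the overlapping timestamp with the remaining count,
        # so the next request for this window starts where this one stopped.
        windows.appendleft((overlap[0]["hour"], end, count - len(overlap)))

    print(fetched)   # [{'hour': 10}]
    print(windows)   # deque([(20, 100, 3)])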
@@ -1044,8 +1071,8 @@ def get_events(self, window_start_date, state, bookmark_dttm):
     def transform(self, record):
         return humps.decamelize(record)
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"events": {"appId": self.config["app_ids"]}})
         return body
 
@@ -1304,7 +1331,8 @@ def get_record_count(self):
         # Get number of records to be fetched using current filter
         body = self.get_body()
         body["request"]["pipeline"].append({"count": None})
-        return list(self.request(self.name, json=body))[0]["count"]
+        record_count = list(self.request(self.name, json=body))[0]["count"]
+        return record_count
 
     def is_loop_required(self):
         # If number of records equal to record then assume there are more records to be synced

From 4e07ab39c4430e3e90ec5b44a8b8d688112ae740 Mon Sep 17 00:00:00 2001
From: RushiT0122
Date: Mon, 1 Apr 2024 11:46:08 +0000
Subject: [PATCH 2/3] - Add query exceeded memory limit exception

- Add exception.py
---
 tap_pendo/__init__.py  |  5 +----
 tap_pendo/exception.py |  9 +++++++++
 tap_pendo/streams.py   |  6 +-----
 3 files changed, 11 insertions(+), 9 deletions(-)
 create mode 100644 tap_pendo/exception.py

diff --git a/tap_pendo/__init__.py b/tap_pendo/__init__.py
index 4392710..714489b 100644
--- a/tap_pendo/__init__.py
+++ b/tap_pendo/__init__.py
@@ -8,6 +8,7 @@
 from singer import metrics as singer_metrics
 from singer import utils
 from tap_pendo.discover import discover_streams
+from tap_pendo.exception import DependencyException
 from tap_pendo.streams import STREAMS, SUB_STREAMS, update_currently_syncing
 from tap_pendo.sync import sync_full_table, sync_stream
 
@@ -37,10 +38,6 @@ def get_sub_stream_ids():
     return sub_stream_ids
 
 
-class DependencyException(Exception):
-    pass
-
-
 def validate_dependencies(selected_stream_ids):
     # Validate and raise exceptions if sub-streams are selected but related parents are not selected
     errs = []
diff --git a/tap_pendo/exception.py b/tap_pendo/exception.py
new file mode 100644
index 0000000..79038b3
--- /dev/null
+++ b/tap_pendo/exception.py
@@ -0,0 +1,9 @@
+class DependencyException(Exception):
+    pass
+
+class QueryExceededMemoryLimitException(Exception):
+    pass
+
+
+class Server42xRateLimitError(Exception):
+    pass
diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py
index 7f7c92a..1a5d6eb 100644
--- a/tap_pendo/streams.py
+++ b/tap_pendo/streams.py
@@ -20,7 +20,7 @@
 from singer import Transformer, metadata
 from singer.utils import now, strftime, strptime_to_utc
 from tap_pendo import utils as tap_pendo_utils
-from tap_pendo.exception import QueryExceededMemoryLimitException
+from tap_pendo.exception import QueryExceededMemoryLimitException, Server42xRateLimitError
 
 KEY_PROPERTIES = ['id']
 US_BASE_URL = "https://app.pendo.io"
@@ -101,10 +101,6 @@ def reset_request_retry_count(details):
     Stream.request_retry_count = 1
 
 
-class Server42xRateLimitError(Exception):
-    pass
-
-
 class Endpoints():
     endpoint = ""
     method = ""

From 527fd3aa08cf6c32cd13cad2a2d00b8f4892ecbb Mon Sep 17 00:00:00 2001
From: RushiT0122
Date: Wed, 3 Apr 2024 16:43:55 +0000
Subject: [PATCH 3/3] Fix child event streams

---
 tap_pendo/streams.py | 93 ++++++++++++++++++++++++--------------------
 1 file changed, 50 insertions(+), 43 deletions(-)

diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py
index 1a5d6eb..83e2552 100644
--- a/tap_pendo/streams.py
+++ b/tap_pendo/streams.py
@@ -22,6 +22,7 @@
 from tap_pendo import utils as tap_pendo_utils
 from tap_pendo.exception import QueryExceededMemoryLimitException, Server42xRateLimitError
 
+
 KEY_PROPERTIES = ['id']
 US_BASE_URL = "https://app.pendo.io"
 EU_BASE_URL = "https://app.eu.pendo.io"
@@ -807,9 +808,9 @@ def get_first_parameter_value(self, body):
         return body['request']['pipeline'][0]['source']['timeSeries'].get('first', 0)
 
     def get_last_parameter_value(self, body):
         return body['request']['pipeline'][0]['source']['timeSeries'].get('last', 'now()')
 
-    def get_record_count(self, first, last):
+    def get_record_count(self, first, last, key_id):
         # Get number of records to be fetched using current filter
-        body = self.get_body(None, self.period, first, last)
+        body = self.get_body(key_id, self.period, first, last)
         filter_index = self.get_pipeline_key_index(body, "filter")
         body["request"]["pipeline"][filter_index]["filter"] = body["request"]["pipeline"][filter_index]["filter"].replace(
             ">=", ">")
@@ -817,39 +818,41 @@ def get_record_count(self, first, last):
         body["request"]["pipeline"].pop(self.get_pipeline_key_index(body, 'limit'))
         body["request"]["pipeline"].append({"count": None})
         return self.request(self.name, json=body)["results"][0]["count"]
 
-    def create_date_windows(self, from_date, to_date):
+    def create_date_windows(self, from_date, to_date, key_id=None):
         max_record_count = 5 * API_RECORD_LIMIT
         mid_date = int(from_date + (to_date - from_date) / 2)
-        first_record_count = self.get_record_count(from_date, mid_date)
-        last_record_count = self.get_record_count(mid_date + 1, to_date)
+        first_record_count = self.get_record_count(from_date, mid_date, key_id)
+        last_record_count = self.get_record_count(mid_date + 1, to_date, key_id)
         date_windows = list()
         if first_record_count:
             if first_record_count <= max_record_count:
                 date_windows.append((from_date, mid_date, first_record_count))
             else:
-                date_windows += self.create_date_windows(from_date + 1, mid_date)
+                date_windows += self.create_date_windows(from_date + 1, mid_date, key_id)
 
         if last_record_count:
             if last_record_count <= max_record_count:
                 date_windows.append((mid_date, to_date, last_record_count))
             else:
-                date_windows += self.create_date_windows(mid_date + 1, to_date)
+                date_windows += self.create_date_windows(mid_date + 1, to_date, key_id)
 
         # merge the adjacent windows if their combined count is less than the max record limit
-        merged_date_windows = []
-        current_start, current_end, record_count = date_windows[0]
-
-        for start, end, value in date_windows[1:]:
-            if record_count + value < max_record_count:
-                current_end = end
-                record_count += value
-            else:
-                merged_date_windows.append((current_start, current_end, record_count))
-                current_start, current_end, record_count = start, end, value
+        if len(date_windows) > 1:
+            merged_date_windows = []
+            current_start, current_end, record_count = date_windows[0]
+
+            for start, end, value in date_windows[1:]:
+                if record_count + value < max_record_count:
+                    current_end = end
+                    record_count += value
+                else:
+                    merged_date_windows.append((current_start, current_end, record_count))
+                    current_start, current_end, record_count = start, end, value
 
-        merged_date_windows.append((current_start, current_end, record_count))
+            merged_date_windows.append((current_start, current_end, record_count))
+            date_windows = merged_date_windows
 
-        return merged_date_windows
+        return date_windows
 
     def remove_duplicate_records(self, new_records, last_processed):
         if not last_processed:
             return new_records
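The count probe changed above reworks the normal window body rather than building a new one: the limit stage is dropped, a {"count": null} stage is appended, and the >= filter becomes > so the shared boundary timestamp is not counted twice by adjacent windows. A minimal sketch on a plain dict (no HTTP; the pipeline values are illustrative):

    import json

    def to_count_body(body):
        # Turn a windowed query body into a count-only probe, as get_record_count does.
        pipeline = body["request"]["pipeline"]
        for stage in pipeline:
            if "filter" in stage:
                # Count strictly-greater rows so window edges are not double counted.
                stage["filter"] = stage["filter"].replace(">=", ">")
        body["request"]["pipeline"] = [s for s in pipeline if "limit" not in s] + [{"count": None}]
        return body

    window_body = {"request": {"pipeline": [
        {"source": {"events": None, "timeSeries": {"period": "dayRange",
                                                   "first": 1709251200000, "last": "1709337600000"}}},
        {"sort": ["day"]},
        {"filter": "day>=1709251200000"},
        {"limit": 10000},
    ]}}
    print(json.dumps(to_count_body(window_body)["request"]["pipeline"][-2:], indent=2))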
@@ -884,28 +887,32 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
         end_time = int(datetime.now().timestamp() * 1000)
 
         if self.date_windows is None:
-            self.date_windows = self.create_date_windows(start_time, end_time)
+            self.date_windows = self.create_date_windows(start_time, end_time, key_id)
             LOGGER.info("Date windows identified:")
             for window in self.date_windows:
                 LOGGER.info(f"\t{window}")
 
-        start_time, end_time, record_count = self.date_windows.pop(0)
-        body = self.get_body(key_id, period, start_time, end_time)
-        body = self.set_time_series_first(body, [], start_time, end_time)
-        body = self.set_request_body_filters(body, start_time, [])
+        if len(self.date_windows):
+            start_time, end_time, record_count = self.date_windows.pop(0)
+            body = self.get_body(key_id, period, start_time, end_time)
+            body = self.set_time_series_first(body, [], start_time, end_time)
+            body = self.set_request_body_filters(body, start_time, [])
+
+            stream_records = self.request(self.name, json=body).get('results') or []
 
-        stream_records = self.request(self.name, json=body).get('results') or []
+            if len(stream_records) >= self.record_limit:
+                # If the record limit was reached, there may be more records left in this window,
+                # so the next extraction should start from the replication value of the last record.
+                # Overlapping records from the previous request must be removed first.
+                stream_records, self.last_processed = self.remove_last_timestamp_records(stream_records)
 
-        if len(stream_records) >= self.record_limit:
-            # If the record limit was reached, there may be more records left in this window,
-            # so the next extraction should start from the replication value of the last record.
-            # Overlapping records from the previous request must be removed first.
-            stream_records, self.last_processed = self.remove_last_timestamp_records(stream_records)
+                start_time = self.last_processed[0][humps.decamelize(self.replication_key)]
+                self.date_windows.insert(0, (start_time, end_time, record_count - len(self.last_processed)))
 
-            start_time = self.last_processed[0][humps.decamelize(self.replication_key)]
-            self.date_windows.insert(0, (start_time, end_time, record_count - len(self.last_processed)))
+            return (self.stream, stream_records), len(self.date_windows) > 0
 
-        return (self.stream, stream_records), len(self.date_windows) > 0
+        self.date_windows = None
+        return (self.stream, []), False
 
 
 class Accounts(Stream):
     name = "accounts"
@@ -986,8 +993,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.key_properties.append("day" if self.period == 'dayRange' else "hour")
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"featureEvents": {"featureId": key_id}})
         return body
 
@@ -1084,8 +1091,8 @@ def __init__(self, config):
         self.period = config.get('period')
         self.replication_key = 'browser_time'
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"pollEvents": {"appId": self.config["app_ids"]}})
         return body
 
@@ -1099,8 +1106,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.key_properties.append("day" if self.period == 'dayRange' else "hour")
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"trackEvents": {"trackTypeId": key_id}})
         return body
 
@@ -1113,8 +1120,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.replication_key = 'browser_time'
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"guideEvents": {"guideId": key_id}})
         return body
 
@@ -1210,8 +1217,8 @@ def __init__(self, config):
         super().__init__(config=config)
         self.key_properties.append("day" if self.period == 'dayRange' else "hour")
 
-    def get_body(self, key_id, period, first):
-        body = super().get_body(key_id, period, first)
+    def get_body(self, key_id, period, first, last=None):
+        body = super().get_body(key_id, period, first, last)
         body['request']['pipeline'][0]['source'].update({"pageEvents": {"pageId": key_id}})
         return body
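A condensed view of the override pattern these hunks repeat: each child stream forwards the new last argument and only swaps in its own source key. The classes below are trimmed sketches of what the diff shows, not the tap's full implementations:

    class EventsBase:
        def get_body(self, key_id, period, first, last="now()"):
            # Parent builds the generic windowed pipeline; children only replace the source.
            return {"request": {"pipeline": [{"source": {"timeSeries": {
                "period": period, "first": first, "last": str(last)}}}]}}

    class GuideEvents(EventsBase):
        def get_body(self, key_id, period, first, last=None):
            body = super().get_body(key_id, period, first, last)
            body['request']['pipeline'][0]['source'].update({"guideEvents": {"guideId": key_id}})
            return body

    body = GuideEvents().get_body("guide-1", "hourRange", 1709251200000, 1709254800000)
    print(body["request"]["pipeline"][0]["source"])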