Implement date window logic to fix memory issue on server #130

Draft: wants to merge 3 commits into master
5 changes: 1 addition & 4 deletions tap_pendo/__init__.py
@@ -8,6 +8,7 @@
from singer import metrics as singer_metrics
from singer import utils
from tap_pendo.discover import discover_streams
from tap_pendo.exception import DependencyException
from tap_pendo.streams import STREAMS, SUB_STREAMS, update_currently_syncing
from tap_pendo.sync import sync_full_table, sync_stream

@@ -37,10 +38,6 @@ def get_sub_stream_ids():
return sub_stream_ids


class DependencyException(Exception):
pass


def validate_dependencies(selected_stream_ids):
# Validate and raise exceptions if sub-streams are selected but related parents are not selected
errs = []
9 changes: 9 additions & 0 deletions tap_pendo/exception.py
@@ -0,0 +1,9 @@
class DependencyException(Exception):
pass

class QueryExceededMemoryLimitException(Exception):
pass


class Server42xRateLimitError(Exception):
pass
197 changes: 114 additions & 83 deletions tap_pendo/streams.py
@@ -20,6 +20,7 @@
from singer import Transformer, metadata
from singer.utils import now, strftime, strptime_to_utc
from tap_pendo import utils as tap_pendo_utils
from tap_pendo.exception import QueryExceededMemoryLimitException, Server42xRateLimitError


KEY_PROPERTIES = ['id']
@@ -101,10 +102,6 @@ def reset_request_retry_count(details):
Stream.request_retry_count = 1


class Server42xRateLimitError(Exception):
pass


class Endpoints():
endpoint = ""
method = ""
@@ -182,6 +179,9 @@ def send_request_get_results(self, req, endpoint, params, count, **kwargs):
time.sleep(retry_after)
raise Server42xRateLimitError(resp.reason)

if resp.status_code in [400, 408] and "Query exceeded memory limit" in resp.text:
raise QueryExceededMemoryLimitException()

resp.raise_for_status() # Check for requests status and raise exception in failure
Stream.request_retry_count = 1 # Reset retry count after success
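
Not part of this diff: a minimal sketch of how calling code could react to the new QueryExceededMemoryLimitException by bisecting the requested window and retrying each half. The helper name split_and_fetch is hypothetical; it only illustrates why the 400/408 "Query exceeded memory limit" response is surfaced as a dedicated exception instead of a generic HTTP error.

    from tap_pendo.exception import QueryExceededMemoryLimitException

    def split_and_fetch(stream, first, last, key_id=None):
        # Hypothetical helper: try the full window once; on a memory-limit
        # error, bisect the window (epoch-millisecond bounds) and fetch each
        # half recursively.
        try:
            body = stream.get_body(key_id, stream.period, first, last)
            return stream.request(stream.name, json=body).get('results') or []
        except QueryExceededMemoryLimitException:
            mid = first + (last - first) // 2
            return (split_and_fetch(stream, first, mid, key_id) +
                    split_and_fetch(stream, mid + 1, last, key_id))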

@@ -524,7 +524,7 @@ def get_pipeline_key_index(self, body, search_key):
return index
return None

def get_body(self, key_id=None, period=None, first=None):
def get_body(self, key_id=None, period=None, first=None, last="now()"):
"""This method will be overriden in the child class"""
return {}

@@ -538,7 +538,6 @@ def set_request_body_filters(self, body, start_time, records=None):
limit_index = self.get_pipeline_key_index(body, 'limit')
filter_index = self.get_pipeline_key_index(body, 'filter')

body['request']['pipeline'][limit_index]['limit'] = self.record_limit
if isinstance(self, Accounts):
replication_key = 'metadata.auto.lastupdated'
replication_key_value = records[-1]['metadata']['auto']['lastupdated'] if records and len(records) > 0 else None
@@ -600,7 +599,7 @@ def remove_last_timestamp_records(self, records):
def get_first_parameter_value(self, body):
return body['request']['pipeline'][0]['source']['timeSeries'].get('first', 0)

def set_time_series_first(self, body, records, first=None):
def set_time_series_first(self, body, records, first=None, last="now()"):
"""Sets the timeSeries 'first' parameter in request body"""
if len(records) > 1:
# This condition assumes that within the current time window there could be more records
@@ -612,6 +611,8 @@ def set_time_series_first(self, body, records, first=None):
else:
body['request']['pipeline'][0]['source']['timeSeries']['first'] = int(datetime.now().timestamp() * 1000)

body['request']['pipeline'][0]['source']['timeSeries']['last'] = str(last)

return body

def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
@@ -761,16 +762,18 @@ class EventsBase(Stream):
DATE_WINDOW_SIZE = 1
key_properties = ['visitor_id', 'account_id', 'remote_ip']
replication_method = "INCREMENTAL"
start_date = None
date_windows = None

def __init__(self, config):
super().__init__(config=config)
self.config = config
self.period = config.get('period')
self.start_date = config.get('start_date')
self.replication_key = "day" if self.period == 'dayRange' else "hour"
self.last_processed = None


def get_body(self, key_id, period, first):
def get_body(self, key_id, period, first, last="now()"):
"""This method returns generic request body of events steams"""

sort_key = humps.camelize(self.replication_key)
@@ -785,13 +788,13 @@ def get_body(self, key_id, period, first):
"timeSeries": {
"period": period,
"first": first,
"last": "now()"
"last": str(last)
}
}
}, {
"sort": [sort_key]
}, {
"filter": f"{sort_key}>=1"
"filter": f"{sort_key}>={first}"
}, {
"limit": self.record_limit
}
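
For a dayRange period, the pipeline assembled above now looks roughly like this (illustrative epoch-millisecond timestamps; only the stages visible in this diff are shown):

    pipeline = [
        {"source": {
            # each stream adds its own source key here, e.g. {"events": {"appId": ...}}
            "timeSeries": {
                "period": "dayRange",
                "first": 1706745600000,        # window start
                "last": "1709251200000"        # window end, str(last); "now()" by default
            }
        }},
        {"sort": ["day"]},
        {"filter": "day>=1706745600000"},      # previously hard-coded as "day>=1"
        {"limit": 100000}                      # self.record_limit
    ]
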
@@ -802,6 +805,69 @@
def get_first_parameter_value(self, body):
return body['request']['pipeline'][0]['source']['timeSeries'].get('first', 0)

def get_last_parameter_value(self, body):
return body['request']['pipeline'][0]['source']['timeSeries'].get('last', 'now()')

def get_record_count(self, first, last, key_id):
# Get number of records to be fetched using current filter
body = self.get_body(key_id, self.period, first, last)
filter_index = self.get_pipeline_key_index(body, "filter")
body["request"]["pipeline"][filter_index]["filter"] = body["request"]["pipeline"][filter_index]["filter"].replace(
">=", ">")
body["request"]["pipeline"].pop(self.get_pipeline_key_index(body, 'limit'))
body["request"]["pipeline"].append({"count": None})
return self.request(self.name, json=body)["results"][0]["count"]
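
Illustrative only: starting from the pipeline above, get_record_count tightens the filter, removes the limit stage, and appends a count aggregation, so the server returns the number of events in the window instead of the events themselves.

    count_pipeline = [
        # source and sort stages unchanged from the body above, then:
        {"filter": "day>1706745600000"},   # ">=" tightened to ">"
        {"count": None}                    # appended once the {"limit": ...} stage is removed
    ]
    # the response then carries a single row: response["results"][0]["count"]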

def create_date_windows(self, from_date, to_date, key_id=None):
max_record_count = 5 * API_RECORD_LIMIT
mid_date = int(from_date + (to_date - from_date) / 2)
first_record_count = self.get_record_count(from_date, mid_date, key_id)
last_record_count = self.get_record_count(mid_date + 1, to_date, key_id)
date_windows = list()
if first_record_count:
if first_record_count <= max_record_count:
date_windows.append((from_date, mid_date, first_record_count))
else:
date_windows += self.create_date_windows(from_date + 1, mid_date, key_id)

if last_record_count:
if last_record_count <= max_record_count:
date_windows.append((mid_date, to_date, last_record_count))
else:
date_windows += self.create_date_windows(mid_date + 1, to_date, key_id)

# Merge adjacent windows if their combined count is less than the max record limit
if len(date_windows) > 1:
merged_date_windows = []
current_start, current_end, record_count = date_windows[0]

for start, end, value in date_windows[1:]:
if record_count + value < max_record_count:
current_end = end
record_count += value
else:
merged_date_windows.append((current_start, current_end, record_count))
current_start, current_end, record_count = start, end, value

merged_date_windows.append((current_start, current_end, record_count))
date_windows = merged_date_windows

return date_windows
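
A standalone sketch (not the tap's code) of the bisection idea behind create_date_windows: keep halving a window until each piece holds at most max_count records, using a mock count function in place of the Pendo aggregation endpoint. The real method differs in that it probes each half with get_record_count, offsets the recursive bounds, and merges adjacent small windows afterwards.

    def bisect_windows(first, last, count_fn, max_count):
        # Return (start, end, count) tuples whose counts never exceed max_count.
        total = count_fn(first, last)
        if total == 0:
            return []
        if total <= max_count:
            return [(first, last, total)]
        mid = first + (last - first) // 2
        return (bisect_windows(first, mid, count_fn, max_count) +
                bisect_windows(mid + 1, last, count_fn, max_count))

    # Example: 1000 evenly spread events over [0, 99], capped at 300 per window.
    windows = bisect_windows(0, 99, lambda a, b: (b - a + 1) * 10, 300)
    # -> [(0, 24, 250), (25, 49, 250), (50, 74, 250), (75, 99, 250)]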

def remove_duplicate_records(self, new_records, last_processed):
if not last_processed:
return new_records
# Convert lists of dictionaries to sets of tuples
new_records_set = set(tuple(sorted(d.items())) for d in new_records)

# Take the set difference to drop records already present in last_processed
unique_set = new_records_set.difference(set(tuple(sorted(d.items())) for d in last_processed))

# Convert set of tuples back to list of dictionaries
unique_records = [dict(item) for item in unique_set]

return unique_records
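
Illustrative usage of the dedup helper above, where stream stands for any EventsBase instance: records already emitted at the tail of the previous window are dropped from the next batch by comparing each dict as a sorted tuple of items. This assumes the event dicts hold only hashable values, and the returned order is not preserved because a set is used.

    last_processed = [{"visitor_id": "v1", "day": 1706745600000}]
    new_records = [{"visitor_id": "v1", "day": 1706745600000},
                   {"visitor_id": "v2", "day": 1706832000000}]
    stream.remove_duplicate_records(new_records, last_processed)
    # -> [{"visitor_id": "v2", "day": 1706832000000}]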

def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
update_currently_syncing(state, self.name)

@@ -814,75 +880,39 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):

# If last processed records exist, set the window start to the timestamp of the first such record
try:
first = self.last_processed[0][humps.decamelize(
start_time = self.last_processed[0][humps.decamelize(
self.replication_key)] if self.last_processed else int(lookback.timestamp()) * 1000
except Exception as e:
LOGGER.info(str(e))
end_time = int(datetime.now().timestamp() * 1000)

# Setup body for first request
body = self.get_body(key_id, period, first)
self.set_time_series_first(body, [], first)
self.set_request_body_filters(body, first, [])

ts_now = int(datetime.now().timestamp() * 1000)
events = []
while self.get_first_parameter_value(body) <= ts_now:
records = self.request(self.name, json=body).get('results') or []
self.set_time_series_first(body, records)

if len(records) > 1:
if len(records) < self.record_limit:
# If the response returns fewer records than the record limit, there are no more records to sync
events += records
self.last_processed = None
break

# Remove last-processed records and records with a None replication_value
self.remove_empty_replication_key_records(records)
records, removed_records = self.remove_last_timestamp_records(records)
if len(records) > 0:
events += records

if self.last_processed == removed_records:
events += removed_records
self.last_processed = None
break

self.last_processed = removed_records
else:
# This block handles race condition where all records have same replication key value
first = self.last_processed[0][humps.decamelize(
self.replication_key)] if self.last_processed else int(lookback.timestamp()) * 1000

body = self.get_body(key_id, period, first)
continue

elif len(records) == 1:
events += records
self.last_processed = None
break
if self.date_windows is None:
self.date_windows = self.create_date_windows(start_time, end_time, key_id)
LOGGER.info("Date windows identified:")
for window in self.date_windows:
LOGGER.info(f"\t{window}")

# Set first and filters for next request
self.set_request_body_filters(
body,
self.get_first_parameter_value(body),
records)
if len(self.date_windows):
start_time, end_time, record_count = self.date_windows.pop(0)
body = self.get_body(key_id, period, start_time, end_time)
body = self.set_time_series_first(body, [], start_time, end_time)
body = self.set_request_body_filters(body, start_time, [])

# If the record limit is reached, return the extracted records
# Set the last processed records to resume the extraction
if len(events) >= self.record_limit:
return (self.stream, events), True
stream_records = self.request(self.name, json=body).get('results') or []

# There is a corner case where this limit may get changed, so reset it before the next iteration
self.record_limit = self.get_default_record_limit()
if len(stream_records) >= self.record_limit:
# If the record limit has been reached, there may be more records to extract,
# so start the next extraction from the replication value of the last record.
# Overlapping records from the first window must be removed.
stream_records, self.last_processed = self.remove_last_timestamp_records(stream_records)

# Add records with a missing replication_value back into the extracted events
events.extend(self.empty_replication_key_records)
self.empty_replication_key_records = []
start_time = self.last_processed[0][humps.decamelize(self.replication_key)]
self.date_windows.insert(0, (start_time, end_time, record_count - len(self.last_processed)))

update_currently_syncing(state, None)
return (self.stream, events), False
return (self.stream, stream_records), len(self.date_windows) > 0

self.date_windows = None
return (self.stream, []), False
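
Not part of this diff: with this change the boolean in the return value means "more date windows remain" rather than "record limit reached", so the consuming loop keeps calling sync until the flag comes back False. The real loop lives in tap_pendo/sync.py, which is not shown here; the shape below is assumed.

    has_more = True
    while has_more:
        (stream, records), has_more = events_stream.sync(state, start_date, key_id)
        for record in records:
            # transform and write each record via the Singer writer here
            ...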

class Accounts(Stream):
name = "accounts"
@@ -963,8 +993,8 @@ def __init__(self, config):
super().__init__(config=config)
self.key_properties.append("day" if self.period == 'dayRange' else "hour")

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
def get_body(self, key_id, period, first, last=None):
body = super().get_body(key_id, period, first, last)
body['request']['pipeline'][0]['source'].update({"featureEvents": {"featureId": key_id}})
return body

@@ -1044,8 +1074,8 @@ def get_events(self, window_start_date, state, bookmark_dttm):
def transform(self, record):
return humps.decamelize(record)

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
def get_body(self, key_id, period, first, last):
body = super().get_body(key_id, period, first, last)
body['request']['pipeline'][0]['source'].update({"events": {"appId": self.config["app_ids"]}})
return body

@@ -1061,8 +1091,8 @@ def __init__(self, config):
self.period = config.get('period')
self.replication_key = 'browser_time'

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
def get_body(self, key_id, period, first, last=None):
body = super().get_body(key_id, period, first, last)
body['request']['pipeline'][0]['source'].update({"pollEvents": {"appId": self.config["app_ids"]}})
return body

@@ -1076,8 +1106,8 @@ def __init__(self, config):
super().__init__(config=config)
self.key_properties.append("day" if self.period == 'dayRange' else "hour")

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
def get_body(self, key_id, period, first, last=None):
body = super().get_body(key_id, period, first, last)
body['request']['pipeline'][0]['source'].update({"trackEvents": {"trackTypeId": key_id}})
return body

@@ -1090,8 +1120,8 @@ def __init__(self, config):
super().__init__(config=config)
self.replication_key = 'browser_time'

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
def get_body(self, key_id, period, first, last=None):
body = super().get_body(key_id, period, first, last)
body['request']['pipeline'][0]['source'].update({"guideEvents": {"guideId": key_id}})
return body

@@ -1187,8 +1217,8 @@ def __init__(self, config):
super().__init__(config=config)
self.key_properties.append("day" if self.period == 'dayRange' else "hour")

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
def get_body(self, key_id, period, first, last=None):
body = super().get_body(key_id, period, first, last)
body['request']['pipeline'][0]['source'].update({"pageEvents": {"pageId": key_id}})
return body

@@ -1304,7 +1334,8 @@ def get_record_count(self):
# Get number of records to be fetched using current filter
body = self.get_body()
body["request"]["pipeline"].append({"count": None})
return list(self.request(self.name, json=body))[0]["count"]
record_count = list(self.request(self.name, json=body))[0]["count"]
return record_count

def is_loop_required(self):
# If the number of records equals the record limit, assume there are more records to be synced