Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-15566: Data loss of child streams #57

Closed
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
python3 -mvenv /usr/local/share/virtualenvs/tap-pendo
source /usr/local/share/virtualenvs/tap-pendo/bin/activate
pip install -U pip setuptools
pip install .[dev]
pip install .[test]
- run:
name: 'JSON Validator'
command: |
Expand Down
7 changes: 5 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
'ijson==3.1.4',
],
extras_require={
'dev': [
'ipdb==0.11',
'test': [
'pylint==2.5.3',
'nose'
],
'dev': [
'ipdb==0.11'
]
},
entry_points="""
Expand Down
14 changes: 7 additions & 7 deletions tap_pendo/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ def sync_substream(self, state, parent, sub_stream, parent_response):
integer_datetime_fmt=
"unix-milliseconds-integer-datetime-parsing"
) as transformer:
stream_events = sub_stream.sync(state, new_bookmark,
# bug fix for syncing child streams from start date or
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@savan-chovatiya Instead of writing 'bug fix', you can update the comment to say:
"syncing child streams from start date or state file date"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

# state file date and not newly updated bookmark
stream_events = sub_stream.sync(state, bookmark_dttm,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@savan-chovatiya Can you please add unit test cases for this code change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added unittest.

record.get(parent.key_properties[0]))
for event in stream_events:
counter.increment()
Expand Down Expand Up @@ -928,12 +930,10 @@ def get_params(self, start_time):
def sync(self, state, start_date=None, key_id=None):
update_currently_syncing(state, self.name)

bookmark_date = self.get_bookmark(state, self.name,
self.config.get('start_date'),
self.replication_key)
bookmark_dttm = strptime_to_utc(bookmark_date)

abs_start, abs_end = get_absolute_start_end_time(bookmark_dttm)
# using "start_date" that is passed and not using the bookmark
# value stored in the state file, as it will be updated after
# every sync of child stream for parent stream
abs_start, abs_end = get_absolute_start_end_time(start_date)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@savan-chovatiya What if I run the sync again once all the child streams and parent streams are synced?
Will it still try collecting the data from the "start-date" in the second sync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it will start collecting from the bookmark present in the state file, here the start_date is just a variable.

lookback = abs_start - timedelta(days=self.lookback_window())
window_next = lookback

Expand Down
Loading