Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bookmark since_id #23

Merged
merged 2 commits into from
Dec 14, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions tap_shopify/streams/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,26 @@ def get_bookmark(self):
or Context.config["start_date"])
return utils.strptime_with_tz(bookmark)

def update_bookmark(self, bookmark_value):
def get_since_id(self):
    """Return the 'since_id' bookmark recorded in state for this stream.

    The value is looked up with ``singer.get_bookmark`` under
    ``self.name`` and the key ``'since_id'``.
    """
    # name is overridden by some substreams
    stream_name = self.name
    return singer.get_bookmark(Context.state, stream_name, 'since_id')

def update_bookmark(self, bookmark_value, bookmark_key=None):
    """Write a bookmark for this stream into state and emit the state.

    Args:
        bookmark_value: The value to store for the bookmark.
        bookmark_key: The bookmark key to write under. When omitted (or
            otherwise falsy), ``self.replication_key`` is used.
    """
    # NOTE: The bookmark cannot be advanced so that the next run skips
    # the most recent record it saw, because the querying only allows
    # greater-than-or-equal semantics.
    singer.write_bookmark(
        Context.state,
        # name is overridden by some substreams
        self.name,
        # Exactly one key argument: the caller's key if given, otherwise
        # the stream's replication key.
        bookmark_key or self.replication_key,
        bookmark_value
    )
    singer.write_state(Context.state)


# This function can be overridden by subclasses for specialized API
# interactions. If you override it you need to remember to decorate it
# with shopify_error_handling to get 429 and 500 handling.
Expand All @@ -112,17 +119,25 @@ def call_api(self, query_params):
return self.replication_object.find(**query_params)

def get_objects(self):
# Temporarily translate untruncated state to truncated
# state. Can be removed once all state has migrated.
updated_at_min = self.get_bookmark().replace(microsecond=0)
updated_at_min = self.get_bookmark()

# Bookmarking can also occur on the since_id
since_id = self.get_since_id() or 1

if since_id != 1:
LOGGER.info("Resuming sync from since_id %d", since_id)

stop_time = singer.utils.now().replace(microsecond=0)
date_window_size = int(Context.config.get("date_window_size", DATE_WINDOW_SIZE))

# Page through till the end of the resultset
while updated_at_min < stop_time:
since_id = 1
# It's important that this has microseconds truncated

# It's important that `updated_at_min` has microseconds
# truncated. Why has been lost to the mists of time but we
# think it has something to do with how the API treats
# microseconds on its date windows. Maybe it's possible to
# drop data due to rounding errors or something like that?
updated_at_max = updated_at_min + datetime.timedelta(days=date_window_size)
if updated_at_max > stop_time:
updated_at_max = stop_time
Expand All @@ -137,6 +152,9 @@ def get_objects(self):
objects = self.call_api(query_params)
for obj in objects:
if obj.id < since_id:
# This verifies the api behavior expectation we
# have that all results actually honor the
# since_id parameter.
raise OutOfOrderIdsError("obj.id < since_id: {} < {}".format(
obj.id, since_id))
yield obj
Expand All @@ -145,15 +163,23 @@ def get_objects(self):
# less than the request size limits you set.
if len(objects) < RESULTS_PER_PAGE:
# Save the updated_at_max as our bookmark as we've synced all rows up in our
# window and can move forward
# window and can move forward. Also remove the since_id because we want to
# restart at 1.
Context.state.get(self.name, {}).pop('since_id', None)
self.update_bookmark(utils.strftime(updated_at_max))
break

if objects[-1].id != max([o.id for o in objects]):
# This verifies the api behavior expectation we have
# that all pages are internally ordered by the
# `since_id`.
raise OutOfOrderIdsError("{} is not the max id in objects ({})".format(
objects[-1].id, max([o.id for o in objects])))
since_id = objects[-1].id

# Put since_id into the state.
self.update_bookmark(since_id, bookmark_key='since_id')

updated_at_min = updated_at_max

def sync(self):
Expand Down