Get tests passing on all streams (singer-io#97)
* Actually paginate the transactions stream (loop sketched after these notes)

* WIP: Update pagination test to test all streams

* Break up streams between two shopify stores

* Fix test_pagination

- Restrict a test run to just the streams passed in as a parameter
- Don't create connection in the test

* Fix test_bookmarks

- Run the test twice, once with each store
- Restrict streams tested to what was passed as a param
- Don't create a connection in the test

* Fix test_start_date

- Run with only the new store
- Run with all streams

* Add task to remove config parameter

* Add error handling to Transactions requests

* Add post-request filtering to Transactions and Order Refunds

There was no query param on these streams for requesting only records after a
certain date. This caused a failure in the start date test because we would
get too many records. (A minimal sketch of the filtering pattern follows
these notes.)

This change also includes a call to update and write a state message. The
bookmarks test was failing because there wouldn't be an entry in the final
state to assert on.

* Wrap each store's test in a subtest (sketched after these notes)

* Fix test_automatic

- Run test once per store
- Remove assertion on foreign key metadata

* Make pylint happy

* Fix whitespace

* Rename environment variables

* Trigger tests

* PR Feedback: Add the bug comment in more places; make API_LIMIT fall back to
the config value

* Pin pylint to the previous working version

We saw a failure on `pylint-2.8.1`

```
tap_shopify/streams/base.py:146:12: R1730: Consider using 'updated_at_max = min(updated_at_max, stop_time)' instead of unnecessary if block (consider-using-min-builtin)
```
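
For reference, here is the shape of the if-block that trips R1730, reconstructed from the lint message above (`tap_shopify/streams/base.py` itself is not among the hunks shown below), next to pylint's suggested rewrite:

```python
from datetime import datetime, timedelta

updated_at_max = datetime(2022, 2, 3)
stop_time = updated_at_max - timedelta(days=1)

# The "unnecessary if block" pylint 2.8.1 flags:
if updated_at_max > stop_time:
    updated_at_max = stop_time

# pylint's suggested equivalent, using the min() builtin:
updated_at_max = min(updated_at_max, stop_time)
```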
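
The transactions pagination fix follows `shopify_python_api`'s cursor pattern (`has_next_page()` / `next_page()`, visible in the diff below). A library-free sketch of how the generator walks pages, with a made-up page class standing in for the API response:

```python
class FakePage(list):
    """Minimal stand-in for shopify_python_api's PaginatedCollection."""
    def __init__(self, items, remaining_pages):
        super().__init__(items)
        self._remaining = remaining_pages

    def has_next_page(self):
        return bool(self._remaining)

    def next_page(self):
        return FakePage(self._remaining[0], self._remaining[1:])

def get_all(first_page):
    # Yield the first page, then follow the cursor until exhausted --
    # the same loop shape as the Transactions.get_transactions change below.
    page = first_page
    yield from page
    while page.has_next_page():
        page = page.next_page()
        yield from page

pages = FakePage([1, 2], [[3, 4], [5]])
print(list(get_all(pages)))  # [1, 2, 3, 4, 5]
```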
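
The post-request filtering described above boils down to comparing each record's replication value against the bookmark and tracking the maximum value seen. A minimal, self-contained sketch (the records, timestamps, and standalone function are hypothetical; the real streams do this inside `sync()` with the `Stream` class's `get_bookmark()`/`update_bookmark()` helpers):

```python
from singer.utils import strftime, strptime_to_utc

def filtered_sync(records, bookmark, replication_key='created_at'):
    """Drop records older than the bookmark client-side and track the max seen."""
    max_bookmark = bookmark
    for record in records:
        replication_value = strptime_to_utc(record[replication_key])
        # No query param exists on these endpoints to ask for only newer
        # records, so filter after the request instead.
        if replication_value >= bookmark:
            yield record
        if replication_value > max_bookmark:
            max_bookmark = replication_value
    # Writing the max value seen gives the bookmarks test an entry in the
    # final state to assert on.
    print('new bookmark:', strftime(max_bookmark))

records = [{'created_at': '2021-04-02T00:00:00Z'},
           {'created_at': '2021-03-01T00:00:00Z'}]
print(list(filtered_sync(records, strptime_to_utc('2021-04-01T00:00:00Z'))))
```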
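
And the per-store wrapping is plain `unittest.subTest`; a toy version, with the stream sets taken from the `tests/base.py` change below and a placeholder assertion:

```python
import unittest

class PerStoreTest(unittest.TestCase):
    # Stand-in for the tap's BaseTapTest subclasses: each store runs under
    # subTest, so a failure against one store is reported without aborting
    # the run against the other.
    store_1_streams = {'custom_collections', 'orders', 'products', 'customers'}
    store_2_streams = {'abandoned_checkouts', 'collects', 'metafields',
                       'transactions', 'order_refunds', 'products'}

    def test_run(self):
        for store, streams in [('store_1', self.store_1_streams),
                               ('store_2', self.store_2_streams)]:
            with self.subTest(store=store):
                self.assertTrue(streams)  # placeholder for the real per-store checks

if __name__ == '__main__':
    unittest.main()
```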
luandy64 authored and tmck-code committed Feb 3, 2022
1 parent a84e411 commit 7402384
Showing 8 changed files with 165 additions and 51 deletions.
2 changes: 1 addition & 1 deletion setup.py
@@ -15,7 +15,7 @@
      ],
      extras_require={
          'dev': [
              'pylint',
              'pylint==2.7.4',
              'ipdb',
              'requests==2.20.0',
              'nose',
15 changes: 12 additions & 3 deletions tap_shopify/streams/order_refunds.py
@@ -1,12 +1,11 @@
import shopify

from singer.utils import strftime, strptime_to_utc
from tap_shopify.context import Context
from tap_shopify.streams.base import (Stream,
                                      RESULTS_PER_PAGE,
                                      shopify_error_handling,
                                      OutOfOrderIdsError)


class OrderRefunds(Stream):
    name = 'order_refunds'
    replication_object = shopify.Refund
@@ -42,8 +41,18 @@ def get_objects(self):
            since_id = refunds[-1].id

    def sync(self):
        bookmark = self.get_bookmark()
        max_bookmark = bookmark
        for refund in self.get_objects():
            refund_dict = refund.to_dict()
            yield refund_dict
            replication_value = strptime_to_utc(refund_dict[self.replication_key])
            if replication_value >= bookmark:
                yield refund_dict

            if replication_value > max_bookmark:
                max_bookmark = replication_value

        self.update_bookmark(strftime(max_bookmark))


Context.stream_objects['order_refunds'] = OrderRefunds
31 changes: 26 additions & 5 deletions tap_shopify/streams/transactions.py
@@ -1,5 +1,6 @@
import shopify
import singer
from singer.utils import strftime, strptime_to_utc
from tap_shopify.context import Context
from tap_shopify.streams.base import (Stream,
                                      shopify_error_handling)
@@ -56,6 +57,12 @@ class Transactions(Stream):
    # https://help.shopify.com/en/api/reference/orders/transaction#properties

    @shopify_error_handling
    def call_api_for_transactions(self, parent_object):
        return self.replication_object.find(
            limit=TRANSACTIONS_RESULTS_PER_PAGE,
            order_id=parent_object.id,
        )

    def get_transactions(self, parent_object):
        # We do not need to support paging on this substream. If that
        # were to become untrue, reference Metafields.
@@ -65,8 +72,13 @@ def get_transactions(self, parent_object):
        # support limit overrides.
        #
        # https://github.com/Shopify/shopify_python_api/blob/e8c475ccc84b1516912b37f691d00ecd24921e9b/shopify/resources/order.py#L17-L18
        return self.replication_object.find(
            limit=TRANSACTIONS_RESULTS_PER_PAGE, order_id=parent_object.id)

        page = self.call_api_for_transactions(parent_object)
        yield from page

        while page.has_next_page():
            page = page.next_page()
            yield from page

    def get_objects(self):
        # Right now, it's ok for the user to select 'transactions' but not
@@ -87,10 +99,19 @@ def get_objects(self):
                yield transaction

    def sync(self):
        bookmark = self.get_bookmark()
        max_bookmark = bookmark
        for transaction in self.get_objects():
            transaction_dict = transaction.to_dict()
            for field_name in ['token', 'version', 'ack']:
                canonicalize(transaction_dict, field_name)
            yield transaction_dict
            replication_value = strptime_to_utc(transaction_dict[self.replication_key])
            if replication_value >= bookmark:
                for field_name in ['token', 'version', 'ack']:
                    canonicalize(transaction_dict, field_name)
                yield transaction_dict

            if replication_value > max_bookmark:
                max_bookmark = replication_value

        self.update_bookmark(strftime(max_bookmark))

Context.stream_objects['transactions'] = Transactions
40 changes: 30 additions & 10 deletions tests/base.py
@@ -24,6 +24,7 @@ class BaseTapTest(unittest.TestCase):
    INCREMENTAL = "INCREMENTAL"
    FULL = "FULL_TABLE"
    START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
    DEFAULT_RESULTS_PER_PAGE = 175

    @staticmethod
    def tap_name():
@@ -40,7 +41,9 @@ def get_properties(self, original: bool = True):
        return_value = {
            'start_date': '2017-07-01T00:00:00Z',
            'shop': 'stitchdatawearhouse',
            'date_window_size': 30
            'date_window_size': 30,
            # BUG: https://jira.talendforge.org/browse/TDL-13180
            'results_per_page': '50'
        }

        if original:
@@ -50,13 +53,20 @@
        assert self.start_date > return_value["start_date"]

        return_value["start_date"] = self.start_date
        return_value['shop'] = 'talenddatawearhouse'
        return return_value

    @staticmethod
    def get_credentials():
    def get_credentials(original_credentials: bool = True):
        """Authentication information for the test account"""

        if original_credentials:
            return {
                'api_key': os.getenv('TAP_SHOPIFY_API_KEY_STITCHDATAWEARHOUSE')
            }

        return {
            'api_key': os.getenv('TAP_SHOPIFY_API_KEY')
            'api_key': os.getenv('TAP_SHOPIFY_API_KEY_TALENDDATAWEARHOUSE')
        }

    def expected_metadata(self):
@@ -66,13 +76,18 @@
            self.REPLICATION_KEYS: {"updated_at"},
            self.PRIMARY_KEYS: {"id"},
            self.REPLICATION_METHOD: self.INCREMENTAL,
            self.API_LIMIT: 250}
            self.API_LIMIT: self.DEFAULT_RESULTS_PER_PAGE}

        meta = default.copy()
        meta.update({self.FOREIGN_KEYS: {"owner_id", "owner_resource"}})

        return {
            "abandoned_checkouts": default,
            "abandoned_checkouts": {
                self.REPLICATION_KEYS: {"updated_at"},
                self.PRIMARY_KEYS: {"id"},
                self.REPLICATION_METHOD: self.INCREMENTAL,
                # BUG: https://jira.talendforge.org/browse/TDL-13180
                self.API_LIMIT: 50},
            "collects": default,
            "custom_collections": default,
            "smart_collections": default,
@@ -84,15 +99,15 @@
                self.REPLICATION_KEYS: {"created_at"},
                self.PRIMARY_KEYS: {"id"},
                self.REPLICATION_METHOD: self.INCREMENTAL,
                self.API_LIMIT: 250},
                self.API_LIMIT: self.DEFAULT_RESULTS_PER_PAGE},
            "products": default,
            "metafields": meta,
            "transactions": {
                self.REPLICATION_KEYS: {"created_at"},
                self.PRIMARY_KEYS: {"id"},
                self.FOREIGN_KEYS: {"order_id"},
                self.REPLICATION_METHOD: self.INCREMENTAL,
                self.API_LIMIT: 250}
                self.API_LIMIT: self.DEFAULT_RESULTS_PER_PAGE}
        }

    def expected_streams(self):
@@ -142,18 +157,21 @@ def expected_replication_method(self):

    def setUp(self):
        """Verify that you have set the prerequisites to run the tap (creds, etc.)"""
        missing_envs = [x for x in [os.getenv('TAP_SHOPIFY_API_KEY')] if x is None]
        missing_envs = [x
                        for x in [os.getenv('TAP_SHOPIFY_API_KEY_STITCHDATAWEARHOUSE'),
                                  os.getenv('TAP_SHOPIFY_API_KEY_TALENDDATAWEARHOUSE')]
                        if x is None]
        if missing_envs:
            raise Exception("set environment variables")

    #########################
    # Helper Methods #
    #########################

    def create_connection(self, original_properties: bool = True):
    def create_connection(self, original_properties: bool = True, original_credentials: bool = True):
        """Create a new connection with the test name"""
        # Create the connection
        conn_id = connections.ensure_connection(self, original_properties)
        conn_id = connections.ensure_connection(self, original_properties, original_credentials)

        # Run a check job using orchestrator (discovery)
        check_job_name = runner.run_check_mode(self, conn_id)
@@ -262,3 +280,5 @@ def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = True):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_date = self.get_properties().get("start_date")
        self.store_1_streams = {'custom_collections', 'orders', 'products', 'customers'}
        self.store_2_streams = {'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds', 'products'}
33 changes: 23 additions & 10 deletions tests/test_automatic_fields.py
@@ -14,7 +14,7 @@ class MinimumSelectionTest(BaseTapTest):
    def name():
        return "tap_tester_shopify_no_fields_test"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_date = '2021-04-01T00:00:00Z'

    def test_run(self):
        with self.subTest(store="store_1"):
            conn_id = self.create_connection(original_credentials=True)
            self.automatic_test(conn_id, self.store_1_streams)

        with self.subTest(store="store_2"):
            conn_id = self.create_connection(original_properties=False, original_credentials=False)
            self.automatic_test(conn_id, self.store_2_streams)

    def automatic_test(self, conn_id, testable_streams):
        """
        Verify that for each stream you can get multiple pages of data
        when no fields are selected and only the automatic fields are replicated.
@@ -24,42 +37,42 @@ def test_run(self):
        fetch of data. For instance, if you have a limit of 250 records, ensure
        that 251 (or more) records have been posted for that stream.
        """
        conn_id = self.create_connection()

        incremental_streams = {key for key, value in self.expected_replication_method().items()
                               if value == self.INCREMENTAL}
                               if value == self.INCREMENTAL and key in testable_streams}

        # Select all streams and no fields within streams
        # IF THERE ARE NO AUTOMATIC FIELDS FOR A STREAM
        # WE WILL NEED TO UPDATE THE BELOW TO SELECT ONE
        found_catalogs = menagerie.get_catalogs(conn_id)
        untested_streams = self.child_streams().union({'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds'})
        our_catalogs = [catalog for catalog in found_catalogs if
                        catalog.get('tap_stream_id') in incremental_streams.difference(
                            untested_streams)]
                        catalog.get('tap_stream_id') in incremental_streams]
        self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=False)

        # Run a sync job using orchestrator
        record_count_by_stream = self.run_sync(conn_id)

        actual_fields_by_stream = runner.examine_target_output_for_fields()

        for stream in self.expected_streams().difference(untested_streams):
        for stream in incremental_streams:
            with self.subTest(stream=stream):

                # verify that you get more than a page of data
                # SKIP THIS ASSERTION FOR STREAMS WHERE YOU CANNOT GET
                # MORE THAN 1 PAGE OF DATA IN THE TEST ACCOUNT
                stream_metadata = self.expected_metadata().get(stream, {})
                minimum_record_count = stream_metadata.get(
                    self.API_LIMIT,
                    int(self.get_properties().get('results_per_page', self.DEFAULT_RESULTS_PER_PAGE))
                )
                self.assertGreater(
                    record_count_by_stream.get(stream, -1),
                    self.expected_metadata().get(stream, {}).get(self.API_LIMIT, 0),
                    minimum_record_count,
                    msg="The number of records is not over the stream max limit")

                # verify that only the automatic fields are sent to the target
                self.assertEqual(
                    actual_fields_by_stream.get(stream, set()),
                    self.expected_primary_keys().get(stream, set()) |
                    self.expected_replication_keys().get(stream, set()) |
                    self.expected_foreign_keys().get(stream, set()),
                    self.expected_replication_keys().get(stream, set()),
                    msg="The fields sent to the target are not the automatic fields"
                )
24 changes: 17 additions & 7 deletions tests/test_bookmarks.py
@@ -15,7 +15,7 @@ class BookmarkTest(BaseTapTest):
    def name():
        return "tap_tester_shopify_bookmark_test"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_date = '2021-04-01T00:00:00Z'

    def test_run(self):
        with self.subTest(store="store_1"):
            conn_id = self.create_connection(original_credentials=True)
            self.bookmarks_test(conn_id, self.store_1_streams)

        with self.subTest(store="store_2"):
            conn_id = self.create_connection(original_properties=False, original_credentials=False)
            self.bookmarks_test(conn_id, self.store_2_streams)

    def bookmarks_test(self, conn_id, testable_streams):
        """
        Verify that for each stream you can do a sync which records bookmarks.
        That the bookmark is the maximum value sent to the target for the replication key.
@@ -32,26 +45,23 @@ def test_run(self):
        For EACH stream that is incrementally replicated there are multiple rows of data with
        different values for the replication key
        """
        conn_id = self.create_connection()

        # Select all streams and no fields within streams
        found_catalogs = menagerie.get_catalogs(conn_id)
        incremental_streams = {key for key, value in self.expected_replication_method().items()
                               if value == self.INCREMENTAL}
                               if value == self.INCREMENTAL and key in testable_streams}

        # Our test data sets for Shopify do not have any abandoned_checkouts
        untested_streams = self.child_streams().union({'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds'})
        our_catalogs = [catalog for catalog in found_catalogs if
                        catalog.get('tap_stream_id') in incremental_streams.difference(
                            untested_streams)]
                        catalog.get('tap_stream_id') in incremental_streams]
        self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=False)

        # Run a sync job using orchestrator
        first_sync_record_count = self.run_sync(conn_id)

        # verify that the sync only sent records to the target for selected streams (catalogs)
        self.assertEqual(set(first_sync_record_count.keys()),
                         incremental_streams.difference(untested_streams))
                         incremental_streams)

        first_sync_state = menagerie.get_state(conn_id)

@@ -69,7 +79,7 @@

        # THIS MAKES AN ASSUMPTION THAT CHILD STREAMS DO NOT HAVE BOOKMARKS.
        # ADJUST IF NECESSARY
        for stream in incremental_streams.difference(untested_streams):
        for stream in incremental_streams:
            with self.subTest(stream=stream):

                # get bookmark values from state and target data