Tdl 15582 check best practices #78

Open · wants to merge 9 commits into master
9 changes: 8 additions & 1 deletion .circleci/config.yml
@@ -14,12 +14,19 @@ jobs:
virtualenv -p python3 /usr/local/share/virtualenvs/tap-zendesk
source /usr/local/share/virtualenvs/tap-zendesk/bin/activate
pip install .[test]
pip install coverage
- run:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-zendesk/bin/activate
make test
pylint tap_zendesk -d missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,too-many-branches,useless-import-alias,no-else-return,logging-not-lazy
nosetests --with-coverage --cover-erase --cover-package=tap_zendesk --cover-html-dir=htmlcov test/unittests
coverage html
- add_ssh_keys
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'Integration Tests'
command: |
375 changes: 335 additions & 40 deletions test/base.py

Large diffs are not rendered by default.
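Since the base.py diff is not rendered here, a note for orientation: every test below leans on its helpers (expected_check_streams, expected_primary_keys, expected_automatic_fields, run_and_verify_check_mode, perform_and_verify_table_and_field_selection, run_and_verify_sync). As a rough sketch only — the method name comes from the calls in the tests, while the body and the expected_metadata() helper it uses are assumptions, not the actual base.py implementation:

# Hypothetical sketch of a ZendeskTest base-class helper; not the real base.py code,
# which is not shown in this view.
def expected_automatic_fields(self):
    """Map each stream to the fields assumed to carry inclusion=automatic:
    the union of its primary keys and replication keys."""
    auto_fields = {}
    for stream, props in self.expected_metadata().items():  # expected_metadata() is assumed
        auto_fields[stream] = props.get(self.PRIMARY_KEYS, set()) | props.get(self.REPLICATION_KEYS, set())
    return auto_fields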

83 changes: 83 additions & 0 deletions test/test_all_fields.py
@@ -0,0 +1,83 @@
import tap_tester.connections as connections
import tap_tester.runner as runner
import tap_tester.menagerie as menagerie
from base import ZendeskTest

class ZendeskAllFields(ZendeskTest):
"""Ensure running the tap with all streams and fields selected results in the replication of all fields."""

def name(self):
return "zendesk_all_fields"

def test_run(self):
"""
• Verify no unexpected streams were replicated
• Verify that more than just the automatic fields are replicated for each stream.
• Verify all fields for each stream are replicated
"""


# Streams to verify in the all-fields test
expected_streams = self.expected_check_streams()

expected_automatic_fields = self.expected_automatic_fields()
conn_id = connections.ensure_connection(self)

found_catalogs = self.run_and_verify_check_mode(conn_id)

# table and field selection
test_catalogs_all_fields = [catalog for catalog in found_catalogs
if catalog.get('tap_stream_id') in expected_streams]

self.perform_and_verify_table_and_field_selection(
conn_id, test_catalogs_all_fields)

# grab metadata after performing table-and-field selection to set expectations
# used for asserting all fields are replicated
stream_to_all_catalog_fields = dict()
for catalog in test_catalogs_all_fields:
stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
fields_from_field_level_md = [md_entry['breadcrumb'][1]
for md_entry in catalog_entry['metadata']
if md_entry['breadcrumb'] != []]
stream_to_all_catalog_fields[stream_name] = set(
fields_from_field_level_md)

self.run_and_verify_sync(conn_id)

synced_records = runner.get_records_from_target_output()

# Verify no unexpected streams were replicated
synced_stream_names = set(synced_records.keys())
self.assertSetEqual(expected_streams, synced_stream_names)

for stream in expected_streams:
with self.subTest(stream=stream):

# expected values
expected_all_keys = stream_to_all_catalog_fields[stream]
expected_automatic_keys = expected_automatic_fields.get(
stream, set())

# Verify that more than just the automatic fields are replicated for each stream.
self.assertTrue(expected_automatic_keys.issubset(
expected_all_keys), msg='{} is not in "expected_all_keys"'.format(expected_automatic_keys-expected_all_keys))

messages = synced_records.get(stream)
# collect actual values
actual_all_keys = set()
for message in messages['messages']:
if message['action'] == 'upsert':
actual_all_keys.update(message['data'].keys())

# The following fields cannot currently be generated via the Zendesk API, so drop them from the expectations.
if stream == "ticket_fields":
expected_all_keys = expected_all_keys - {'system_field_options', 'sub_type_id'}
elif stream == "users":
expected_all_keys = expected_all_keys - {'permanently_deleted'}
elif stream == "ticket_metrics":
expected_all_keys = expected_all_keys - {'status', 'instance_id', 'metric', 'type', 'time'}

# verify all fields for each stream are replicated
self.assertSetEqual(expected_all_keys, actual_all_keys)
66 changes: 66 additions & 0 deletions test/test_automatic_fields.py
@@ -0,0 +1,66 @@
import tap_tester.connections as connections
import tap_tester.runner as runner
from base import ZendeskTest

class ZendeskAutomaticFields(ZendeskTest):
"""
Ensure running the tap with all streams selected and all fields deselected results in the replication of just the
primary keys and replication keys (automatic fields).
"""

def name(self):
return "zendesk_automatic_fields"

def test_run(self):
"""
Verify we can deselect all fields except when inclusion=automatic, which is handled by base.py methods
Verify that only the automatic fields are sent to the target.
Verify that all replicated records have unique primary key values.
"""

streams_to_test = self.expected_check_streams()

conn_id = connections.ensure_connection(self)

found_catalogs = self.run_and_verify_check_mode(conn_id)

# table and field selection
test_catalogs_automatic_fields = [catalog for catalog in found_catalogs
if catalog.get('tap_stream_id') in streams_to_test]

# Select all streams and no fields within streams
self.perform_and_verify_table_and_field_selection(
conn_id, test_catalogs_automatic_fields, select_all_fields=False)

record_count_by_stream = self.run_and_verify_sync(conn_id)
synced_records = runner.get_records_from_target_output()

for stream in streams_to_test:
with self.subTest(stream=stream):

# expected values
expected_keys = self.expected_automatic_fields().get(stream)
expected_primary_keys = self.expected_primary_keys()[stream]

# collect actual values
data = synced_records.get(stream, {})
record_messages_keys = [set(row['data'].keys())
for row in data.get('messages', [])]
primary_keys_list = [tuple(message.get('data', {}).get(expected_pk) for expected_pk in expected_primary_keys)
for message in data.get('messages', [])
if message.get('action') == 'upsert']
unique_primary_keys_list = set(primary_keys_list)

# Verify that you get some records for each stream
self.assertGreater(
record_count_by_stream.get(stream, -1), 0,
msg="The number of records is not over the stream min limit")

# Verify that only the automatic fields are sent to the target
for actual_keys in record_messages_keys:
self.assertSetEqual(expected_keys, actual_keys)
Contributor

I suspect that this test is covering up a bug as there are invalid expectations in base.py for a few of the streams. If this is the case, a bug should be written up in Jira and a workaround should be implemented in this test with a reference to the bug ticket.

Contributor Author
@prijendev prijendev Nov 9, 2021


ticket_comments, ticket_audits, and ticket_metrics are child streams of tickets, and tickets is an incremental stream. Child streams do not save their own bookmark; they fetch records based on the parent stream's records. That is why they are incremental but have no replication key. It does not affect this test case, though.


# Verify that all replicated records have unique primary key values.
self.assertEqual(len(primary_keys_list),
len(unique_primary_keys_list),
msg="Replicated record does not have unique primary key values.")
129 changes: 129 additions & 0 deletions test/test_discovery.py
@@ -0,0 +1,129 @@
import re

import tap_tester.connections as connections
from base import ZendeskTest
from tap_tester import menagerie

class ZendeskDiscover(ZendeskTest):
"""
Testing that discovery creates the appropriate catalog with valid metadata.
• Verify number of actual streams discovered match expected
• Verify the stream names discovered were what we expect
• Verify stream names follow naming convention
streams should only have lowercase alphas and underscores
• Verify there is only 1 top-level breadcrumb
• Verify replication key(s)
• Verify primary key(s)
• Verify that if there is a replication key we are doing INCREMENTAL, otherwise FULL
• Verify the actual replication matches our expected replication method
• Verify that primary keys and replication keys are given the inclusion of automatic.
• Verify that all other fields have the inclusion of available metadata.
"""

def name(self):
return "zendesk_discover_test"

def test_run(self):
streams_to_test = self.expected_check_streams()

conn_id = connections.ensure_connection(self, payload_hook=None)

# Verify that there are catalogs found
found_catalogs = self.run_and_verify_check_mode(
conn_id)

# Verify stream names follow naming convention
# streams should only have lowercase alphas and underscores
found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
msg="One or more streams don't follow standard naming")

for stream in streams_to_test:
with self.subTest(stream=stream):

# Verify the catalog is found for a given stream
catalog = next(iter([catalog for catalog in found_catalogs
if catalog["stream_name"] == stream]))
self.assertIsNotNone(catalog)

# collecting expected values
expected_primary_keys = self.expected_primary_keys()[stream]
expected_replication_keys = self.expected_replication_keys()[
stream]
expected_automatic_fields = self.expected_automatic_fields().get(stream)
expected_replication_method = self.expected_replication_method()[
stream]

# collecting actual values...
schema_and_metadata = menagerie.get_annotated_schema(
conn_id, catalog['stream_id'])
metadata = schema_and_metadata["metadata"]
stream_properties = [
item for item in metadata if item.get("breadcrumb") == []]
actual_primary_keys = set(
stream_properties[0].get(
"metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, [])
)
actual_replication_keys = set(
stream_properties[0].get(
"metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, [])
)
actual_replication_method = stream_properties[0].get(
"metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD)
actual_automatic_fields = set(
item.get("breadcrumb", ["properties", None])[1] for item in metadata
if item.get("metadata").get("inclusion") == "automatic"
)

##########################################################################
# metadata assertions
##########################################################################

# verify there is only 1 top level breadcrumb in metadata
self.assertTrue(len(stream_properties) == 1,
msg="There is NOT only one top level breadcrumb for {}".format(stream) +
"\nstream_properties | {}".format(stream_properties))

# verify primary key(s) match expectations
self.assertSetEqual(
expected_primary_keys, actual_primary_keys,
)

# verify that primary keys and replication keys
# are given the inclusion of automatic in metadata.
self.assertSetEqual(expected_automatic_fields,
actual_automatic_fields)

# verify that all other fields have inclusion of available
# This assumes there are no unsupported fields for SaaS sources
self.assertTrue(
all({item.get("metadata").get("inclusion") == "available"
for item in metadata
if item.get("breadcrumb", []) != []
and item.get("breadcrumb", ["properties", None])[1]
not in actual_automatic_fields}),
msg="Not all non key properties are set to available in metadata")

# verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
# The streams below are child streams of the parent stream `tickets`, and tickets is an incremental stream,
# so the child streams also behave as incremental streams but do not save their own state. Skip them here.
if stream not in ["ticket_comments", "ticket_audits", "ticket_metrics"]:

if actual_replication_keys:
self.assertTrue(actual_replication_method == self.INCREMENTAL,
msg="Expected INCREMENTAL replication "
"since there is a replication key")
else:
self.assertTrue(actual_replication_method == self.FULL_TABLE,
msg="Expected FULL replication "
"since there is no replication key")

# verify the actual replication matches our expected replication method
self.assertEqual(expected_replication_method, actual_replication_method,
msg="The actual replication method {} doesn't match the expected {}".format(
actual_replication_method, expected_replication_method))

# verify replication key(s)
self.assertEqual(expected_replication_keys, actual_replication_keys,
msg="expected replication key {} but actual is {}".format(
expected_replication_keys, actual_replication_keys))
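The metadata assertions above read the annotated schema returned by menagerie.get_annotated_schema. For reference, a minimal illustration of the shape those assertions expect — the field names and values are made up for the example, and the metadata keys follow the standard Singer conventions rather than an actual Zendesk catalog dump:

# Illustrative shape of the catalog metadata inspected by the discovery test (example values only).
example_metadata = [
    {   # exactly one top-level breadcrumb carries the stream-wide properties
        "breadcrumb": [],
        "metadata": {
            "table-key-properties": ["id"],            # primary key(s)
            "valid-replication-keys": ["updated_at"],  # replication key(s)
            "forced-replication-method": "INCREMENTAL",
        },
    },
    {   # field-level breadcrumbs carry the inclusion setting
        "breadcrumb": ["properties", "id"],
        "metadata": {"inclusion": "automatic"},
    },
    {
        "breadcrumb": ["properties", "subject"],
        "metadata": {"inclusion": "available"},
    },
]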
70 changes: 70 additions & 0 deletions test/test_pagination.py
@@ -0,0 +1,70 @@
import tap_tester.connections as connections
import tap_tester.runner as runner
import tap_tester.menagerie as menagerie
from base import ZendeskTest


class ZendeskPagination(ZendeskTest):
"""
Ensure tap can replicate multiple pages of data for streams that use pagination.
"""
API_LIMIT = 100
def name(self):
return "zendesk_pagination_test"

def test_run(self):
"""
• Verify that for each stream you can get multiple pages of data.
This requires we ensure more than 1 page of data exists at all times for any given stream.
• Verify by pks that the data replicated matches the data we expect.

"""

# Streams to test pagination against
expected_streams = self.expected_check_streams()
# Skip the satisfaction_ratings stream, as only the end user of a ticket can create satisfaction ratings
expected_streams = expected_streams - {"satisfaction_ratings"}

conn_id = connections.ensure_connection(self)

found_catalogs = self.run_and_verify_check_mode(conn_id)

# table and field selection
test_catalogs_all_fields = [catalog for catalog in found_catalogs
if catalog.get('tap_stream_id') in expected_streams]

self.perform_and_verify_table_and_field_selection(
conn_id, test_catalogs_all_fields)

record_count_by_stream = self.run_and_verify_sync(conn_id)

synced_records = runner.get_records_from_target_output()

# Verify no unexpected streams were replicated
synced_stream_names = set(synced_records.keys())
self.assertSetEqual(expected_streams, synced_stream_names)

for stream in expected_streams:
with self.subTest(stream=stream):

# expected values
expected_primary_keys = self.expected_primary_keys()[stream]

# verify that we can paginate with all fields selected
record_count_sync = record_count_by_stream.get(stream, 0)
self.assertGreater(record_count_sync, self.API_LIMIT,
msg="The number of records is not over the stream max limit")

primary_keys_list = [tuple([message.get('data').get(expected_pk) for expected_pk in expected_primary_keys])
for message in synced_records.get(stream).get('messages')
if message.get('action') == 'upsert']

primary_keys_list_1 = primary_keys_list[:self.API_LIMIT]
primary_keys_list_2 = primary_keys_list[self.API_LIMIT:2*self.API_LIMIT]

primary_keys_page_1 = set(primary_keys_list_1)
primary_keys_page_2 = set(primary_keys_list_2)

# Verify by primary keys that data is unique for page
self.assertTrue(
primary_keys_page_1.isdisjoint(primary_keys_page_2))