diff --git a/.circleci/config.yml b/.circleci/config.yml
index b7e033f..7cd5d1f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -11,7 +11,7 @@ jobs:
             python3 -mvenv /usr/local/share/virtualenvs/tap-pendo
             source /usr/local/share/virtualenvs/tap-pendo/bin/activate
             pip install -U pip setuptools
-            pip install .[dev]
+            pip install .[test]
       - run:
           name: 'JSON Validator'
           command: |
@@ -23,6 +23,17 @@ jobs:
             source /usr/local/share/virtualenvs/tap-pendo/bin/activate
             # TODO: Adjust the pylint disables
             pylint tap_pendo --disable 'broad-except,chained-comparison,empty-docstring,fixme,invalid-name,line-too-long,missing-class-docstring,missing-function-docstring,missing-module-docstring,no-else-raise,no-else-return,too-few-public-methods,too-many-arguments,too-many-branches,too-many-lines,too-many-locals,ungrouped-imports,wrong-spelling-in-comment,wrong-spelling-in-docstring,bad-whitespace,missing-class-docstring'
+      - run:
+          name: 'Unit Tests'
+          command: |
+            source /usr/local/share/virtualenvs/tap-pendo/bin/activate
+            pip install coverage
+            nosetests --with-coverage --cover-erase --cover-package=tap_pendo --cover-html-dir=htmlcov tests/unittests
+            coverage html
+      - store_test_results:
+          path: test_output/report.xml
+      - store_artifacts:
+          path: htmlcov
       - add_ssh_keys
       - run:
           name: 'Integration Tests'
diff --git a/setup.py b/setup.py
index dcf47d4..9aa83c5 100755
--- a/setup.py
+++ b/setup.py
@@ -17,9 +17,12 @@
         'ijson==3.1.4',
     ],
     extras_require={
-        'dev': [
-            'ipdb==0.11',
+        'test': [
             'pylint==2.5.3',
+            'nose'
+        ],
+        'dev': [
+            'ipdb==0.11'
         ]
     },
     entry_points="""
diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py
index fea0a73..c962355 100644
--- a/tap_pendo/streams.py
+++ b/tap_pendo/streams.py
@@ -406,7 +406,8 @@ def sync_substream(self, state, parent, sub_stream, parent_response):
                     integer_datetime_fmt=
                     "unix-milliseconds-integer-datetime-parsing"
             ) as transformer:
-                stream_events = sub_stream.sync(state, new_bookmark,
+                # syncing child streams from start date or state file date
+                stream_events = sub_stream.sync(state, bookmark_dttm,
                                                 record.get(parent.key_properties[0]))
                 for event in stream_events:
                     counter.increment()
@@ -928,12 +929,10 @@ def get_params(self, start_time):
 
     def sync(self, state, start_date=None, key_id=None):
         update_currently_syncing(state, self.name)
-        bookmark_date = self.get_bookmark(state, self.name,
-                                          self.config.get('start_date'),
-                                          self.replication_key)
-        bookmark_dttm = strptime_to_utc(bookmark_date)
-
-        abs_start, abs_end = get_absolute_start_end_time(bookmark_dttm)
+        # using "start_date" that is passed and not using the bookmark
+        # value stored in the state file, as it will be updated after
+        # every sync of child stream for parent stream
+        abs_start, abs_end = get_absolute_start_end_time(start_date)
         lookback = abs_start - timedelta(days=self.lookback_window())
 
         window_next = lookback
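A note on the `streams.py` change above: `sync_substream` advances the child stream's bookmark after each parent record, so re-reading the state inside the parent loop would skip older events for later parents. A minimal sketch of that ordering problem, with illustrative names only (not the tap's real API):

```python
# Hypothetical, simplified sketch -- not the tap's actual code.
state = {"bookmarks": {"guide_events": {"browser_time": "2021-01-01T00:00:00Z"}}}
start_date = "2020-09-10T00:00:00Z"

def get_bookmark(state, stream, default):
    return state["bookmarks"].get(stream, {}).get("browser_time", default)

def sync_child(state, since, parent_id):
    # fetch child records newer than `since` for this parent (omitted),
    # then advance the shared bookmark, as the real tap does after each parent
    state["bookmarks"]["guide_events"]["browser_time"] = "2021-06-01T00:00:00Z"

# capture the bookmark once, before iterating over parents
bookmark_dttm = get_bookmark(state, "guide_events", start_date)

for parent_id in ["guide-1", "guide-2"]:
    # passing the captured value keeps guide-2's events from being filtered by
    # the bookmark guide-1's sync just wrote; re-reading get_bookmark() here
    # would reproduce the bug this diff fixes
    sync_child(state, bookmark_dttm, parent_id)
```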
diff --git a/tests/tap_tester/base.py b/tests/tap_tester/base.py
new file mode 100644
index 0000000..559caaa
--- /dev/null
+++ b/tests/tap_tester/base.py
@@ -0,0 +1,388 @@
+import os
+import unittest
+from datetime import datetime as dt
+from datetime import timedelta
+
+import dateutil.parser
+import pytz
+
+import tap_tester.connections as connections
+import tap_tester.runner as runner
+from tap_tester import menagerie
+
+
+class TestPendoBase(unittest.TestCase):
+
+    REPLICATION_KEYS = "valid-replication-keys"
+    PRIMARY_KEYS = "table-key-properties"
+    FOREIGN_KEYS = "table-foreign-key-properties"
+    REPLICATION_METHOD = "forced-replication-method"
+    INCREMENTAL = "INCREMENTAL"
+    FULL_TABLE = "FULL_TABLE"
+    START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z"
+    BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
+    start_date = ""
+
+    @staticmethod
+    def name():
+        return "test_sync"
+
+    @staticmethod
+    def tap_name():
+        """The name of the tap"""
+        return "tap-pendo"
+
+    @staticmethod
+    def get_type():
+        """The expected url route ending"""
+        return "platform.pendo"
+
+    def expected_metadata(self):
+        """The expected streams and metadata about the streams"""
+        return {
+            "accounts": {
+                self.PRIMARY_KEYS: {'account_id'},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'lastupdated'}
+            },
+            "features": {
+                self.PRIMARY_KEYS: {'id'},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'last_updated_at'}
+            },
+            "guides": {
+                self.PRIMARY_KEYS: {'id'},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'last_updated_at'}
+            },
+            "pages": {
+                self.PRIMARY_KEYS: {'id'},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'last_updated_at'}
+            },
+            # Add back when the visitor_history stream causing this test to take
+            # 4+ hours is solved, tracked in this JIRA:
+            # https://stitchdata.atlassian.net/browse/SRCE-4755
+            # "visitor_history": {
+            #     self.PRIMARY_KEYS: {'visitor_id'},
+            #     self.REPLICATION_METHOD: self.INCREMENTAL,
+            #     self.REPLICATION_KEYS: {'modified_ts'}
+            # },
+
+            "visitors": {
+                self.PRIMARY_KEYS: {'visitor_id'},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'lastupdated'}
+            },
+            "track_types": {
+                self.PRIMARY_KEYS: {'id'},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'last_updated_at'}
+            },
+            "feature_events": {
+                self.PRIMARY_KEYS: {"visitor_id", "account_id", "server", "remote_ip"},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'day'}
+            },
+            "events": {
+                self.PRIMARY_KEYS: {"visitor_id", "account_id", "server", "remote_ip"},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'day'}
+            },
+            "page_events": {
+                self.PRIMARY_KEYS: {"visitor_id", "account_id", "server", "remote_ip"},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'day'}
+            },
+            "guide_events": {
+                self.PRIMARY_KEYS: {"visitor_id", "account_id", "server_name", "remote_ip"},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'browser_time'}
+            },
+            "poll_events": {
+                self.PRIMARY_KEYS: {"visitor_id", "account_id", "server_name", "remote_ip"},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'browser_time'}
+            },
+            "track_events": {
+                self.PRIMARY_KEYS: {"visitor_id", "account_id", "server", "remote_ip"},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {'day'}
+            },
+            "metadata_accounts": {
+                self.REPLICATION_METHOD: self.FULL_TABLE,
+            },
+            "metadata_visitors": {
+                self.REPLICATION_METHOD: self.FULL_TABLE,
+            },
+        }
+
+    def setUp(self):
+        missing_envs = [x for x in [
+            "TAP_PENDO_INTEGRATION_KEY",
+        ] if os.getenv(x) is None]
+
+        if missing_envs:
+            raise Exception("Missing environment variables: {}".format(missing_envs))
+
+    @staticmethod
+    def get_credentials():
+        """Authentication information for the test account"""
+        return {
+            "x_pendo_integration_key": os.getenv("TAP_PENDO_INTEGRATION_KEY")
+        }
+
+    def get_properties(self, original: bool = True):
+        """Configuration properties required for the tap."""
+        return_value = {
+            "start_date": "2020-09-10T00:00:00Z",
+            "lookback_window": "1",
+            "period": "dayRange",
+        }
+        if original:
+            return return_value
+
+        return_value["start_date"] = self.start_date
+        return return_value
+
+
+    def expected_streams(self):
+        """A set of expected stream names"""
+
+        return set(self.expected_metadata().keys())
+
+    def expected_pks(self):
+        """return a dictionary with key of table name and value as a set of primary key fields"""
+        return {table: properties.get(self.PRIMARY_KEYS, set())
+                for table, properties
+                in self.expected_metadata().items()}
+
+    def expected_replication_keys(self):
+        """return a dictionary with key of table name and value as a set of replication key fields"""
+        return {table: properties.get(self.REPLICATION_KEYS, set())
+                for table, properties
+                in self.expected_metadata().items()}
+
+    def expected_replication_method(self):
+        """return a dictionary with key of table name and value of replication method"""
+        return {table: properties.get(self.REPLICATION_METHOD, None)
+                for table, properties
+                in self.expected_metadata().items()}
+
+    def expected_automatic_fields(self):
+        """return a dictionary with key of table name and value as a set of automatic key fields"""
+        auto_fields = {}
+        for k, v in self.expected_metadata().items():
+
+            auto_fields[k] = v.get(self.PRIMARY_KEYS, set()) | v.get(self.REPLICATION_KEYS, set()) \
+                | v.get(self.FOREIGN_KEYS, set())
+        return auto_fields
+
+
+    #########################
+    #    Helper Methods     #
+    #########################
+
+    def run_and_verify_check_mode(self, conn_id):
+        """
+        Run the tap in check mode and verify it succeeds.
+        This should be run prior to field selection and initial sync.
+        Return the found catalogs from menagerie.
+        """
+        # run in check mode
+        check_job_name = runner.run_check_mode(self, conn_id)
+
+        # verify check exit codes
+        exit_status = menagerie.get_exit_status(conn_id, check_job_name)
+        menagerie.verify_check_exit_status(self, exit_status, check_job_name)
+
+        found_catalogs = menagerie.get_catalogs(conn_id)
+        self.assertGreater(len(
+            found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id))
+
+        found_catalog_names = set(
+            map(lambda c: c['stream_name'], found_catalogs))
+
+        subset = self.expected_streams().issubset(found_catalog_names)
+        self.assertTrue(
+            subset, msg="Expected check streams are not subset of discovered catalog")
+        print("discovered schemas are OK")
+
+        return found_catalogs
+
+    def run_and_verify_sync(self, conn_id):
+        """
+        Run a sync job and make sure it exited properly.
+        Return a dictionary with keys of streams synced
+        and values of records synced for each stream
+        """
+
+        # Run a sync job using orchestrator
+        sync_job_name = runner.run_sync_mode(self, conn_id)
+
+        # Verify tap and target exit codes
+        exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
+        menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)
+
+        # Verify actual rows were synced
+        sync_record_count = runner.examine_target_output_file(
+            self, conn_id, self.expected_streams(), self.expected_pks())
+        self.assertGreater(
+            sum(sync_record_count.values()), 0,
+            msg="failed to replicate any data: {}".format(sync_record_count)
+        )
+        print("total replicated row count: {}".format(
+            sum(sync_record_count.values())))
+
+        return sync_record_count
+
+    def perform_and_verify_table_and_field_selection(self, conn_id, test_catalogs, select_all_fields=True):
+        """
+        Perform table and field selection based on the given test catalogs
+        and the field selection parameter.
+        Verify this results in the expected streams selected and all or no
+        fields selected for those streams.
+        """
+
+        # Select all available fields or select no fields from all testable streams
+        self.select_all_streams_and_fields(
+            conn_id, test_catalogs, select_all_fields)
+
+        catalogs = menagerie.get_catalogs(conn_id)
+
+        # Ensure our selection affects the catalog
+        expected_selected = [tc.get('stream_name') for tc in test_catalogs]
+
+        for cat in catalogs:
+            catalog_entry = menagerie.get_annotated_schema(
+                conn_id, cat['stream_id'])
+
+            # Verify all testable streams are selected
+            selected = catalog_entry.get('annotated-schema').get('selected')
+            print("Validating selection on {}: {}".format(
+                cat['stream_name'], selected))
+            if cat['stream_name'] not in expected_selected:
+                self.assertFalse(
+                    selected, msg="Stream selected, but not testable.")
+                continue  # Skip remaining assertions if we aren't selecting this stream
+            self.assertTrue(selected, msg="Stream not selected.")
+
+            if select_all_fields:
+                # Verify all fields within each selected stream are selected
+                for field, field_props in catalog_entry.get('annotated-schema').get('properties').items():
+                    field_selected = field_props.get('selected')
+                    print("\tValidating selection on {}.{}: {}".format(
+                        cat['stream_name'], field, field_selected))
+                    self.assertTrue(field_selected, msg="Field not selected.")
+            else:
+                # Verify only automatic fields are selected
+                expected_automatic_fields = self.expected_automatic_fields().get(
+                    cat['stream_name'])
+                selected_fields = self.get_selected_fields_from_metadata(
+                    catalog_entry['metadata'])
+                self.assertEqual(expected_automatic_fields, selected_fields)
+
+    def get_selected_fields_from_metadata(self, metadata):
+        selected_fields = set()
+        for field in metadata:
+            is_field_metadata = len(field['breadcrumb']) > 1
+
+            inclusion_automatic_or_selected = (
+                field['metadata'].get('selected') is True or
+                field['metadata'].get('inclusion') == 'automatic'
+            )
+            if is_field_metadata and inclusion_automatic_or_selected:
+                selected_fields.add(field['breadcrumb'][1])
+        return selected_fields
+
+    def select_all_streams_and_fields(self, conn_id, catalogs, select_all_fields: bool = True):
+        """Select all streams and all fields within streams"""
+        for catalog in catalogs:
+            schema = menagerie.get_annotated_schema(
+                conn_id, catalog['stream_id'])
+
+            non_selected_properties = []
+            if not select_all_fields:
+                # get a list of all properties so that none are selected
+                non_selected_properties = schema.get('annotated-schema', {}).get(
+                    'properties', {}).keys()
+
+            connections.select_catalog_and_fields_via_metadata(
+                conn_id, catalog, schema, [], non_selected_properties)
+
+    def calculated_states_by_stream(self, current_state):
+        timedelta_by_stream = {stream: [0, 0, 0, 5]  # {stream_name: [days, hours, minutes, seconds], ...}
+                               for stream in self.expected_streams()}
+
+        stream_to_calculated_state = {stream: "" for stream in current_state['bookmarks'].keys()}
+        for stream, state in current_state['bookmarks'].items():
+            state_key, state_value = next(iter(state.keys())), next(iter(state.values()))
+            state_as_datetime = dateutil.parser.parse(state_value)
+
+            days, hours, minutes, seconds = timedelta_by_stream[stream]
+            calculated_state_as_datetime = state_as_datetime - timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
+
+            state_format = '%Y-%m-%dT%H:%M:%S-00:00'
+            calculated_state_formatted = dt.strftime(calculated_state_as_datetime, state_format)
+
+            stream_to_calculated_state[stream] = {state_key: calculated_state_formatted}
+
+        return stream_to_calculated_state
+
+    def parse_date(self, date_value):
+        """
+        Pass in a string-formatted datetime, parse the value, and return it as an unformatted datetime object.
+        """
+        date_formats = {
+            "%Y-%m-%dT%H:%M:%S.%fZ",
+            "%Y-%m-%dT%H:%M:%SZ",
+            "%Y-%m-%dT%H:%M:%S.%f+00:00",
+            "%Y-%m-%dT%H:%M:%S+00:00",
+            "%Y-%m-%d"
+        }
+        for date_format in date_formats:
+            try:
+                date_stripped = dt.strptime(date_value, date_format)
+                return date_stripped
+            except ValueError:
+                continue
+
+        raise NotImplementedError(
+            "Tests do not account for dates of this format: {}".format(date_value))
+
+    ##########################################################################
+    # Tap Specific Methods
+    ##########################################################################
+
+    def convert_state_to_utc(self, date_str):
+        """
+        Convert a saved bookmark value of the form '2020-08-25T13:17:36-07:00' to
+        a string-formatted UTC datetime,
+        in order to compare against JSON-formatted datetime values
+        """
+        date_object = dateutil.parser.parse(date_str)
+        date_object_utc = date_object.astimezone(tz=pytz.UTC)
+        return dt.strftime(date_object_utc, "%Y-%m-%dT%H:%M:%SZ")
+
+    def timedelta_formatted(self, dtime, days=0):
+        try:
+            date_stripped = dt.strptime(dtime, "%Y-%m-%dT%H:%M:%SZ")
+            return_date = date_stripped + timedelta(days=days)
+
+            return dt.strftime(return_date, "%Y-%m-%dT%H:%M:%SZ")
+
+        except ValueError:
+            try:
+                date_stripped = dt.strptime(dtime, self.BOOKMARK_COMPARISON_FORMAT)
+                return_date = date_stripped + timedelta(days=days)
+
+                return dt.strftime(return_date, self.BOOKMARK_COMPARISON_FORMAT)
+
+            except ValueError:
+                raise Exception("Datetime object is not of the format: {}".format(self.START_DATE_FORMAT))
+
+    def is_incremental(self, stream):
+        return self.expected_metadata().get(stream).get(self.REPLICATION_METHOD) == self.INCREMENTAL
+
+    def is_event(self, stream):
+        return stream.endswith('events')
\ No newline at end of file
diff --git a/tests/tap_tester/test_child_stream_start_date.py b/tests/tap_tester/test_child_stream_start_date.py
new file mode 100644
index 0000000..2a86bf0
--- /dev/null
+++ b/tests/tap_tester/test_child_stream_start_date.py
@@ -0,0 +1,65 @@
+from tap_tester import connections, runner
+from base import TestPendoBase
+from datetime import datetime
+
+class PendoChildStreamStartDateTest(TestPendoBase):
+
+    def name(self):
+        return "pendo_child_stream_start_date_test"
+
+    def test_run(self):
+
+        streams_to_test = {"guides", "guide_events"}
+
+        conn_id = connections.ensure_connection(self)
+
+        found_catalogs = self.run_and_verify_check_mode(conn_id)
+
+        # table and field selection
+        test_catalogs_all_fields = [catalog for catalog in found_catalogs
+                                    if catalog.get('tap_stream_id') in streams_to_test]
+
+        self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs_all_fields)
+
+        record_count_by_stream = self.run_and_verify_sync(conn_id)
+        synced_records = runner.get_records_from_target_output()
+
+        # check that all streams have collected records
+        for stream in streams_to_test:
+            self.assertGreater(record_count_by_stream.get(stream, -1), 0,
+                               msg="failed to replicate any data for stream: {}".format(stream))
+
+        # collect "guides" and "guide_events" data
+        guides = synced_records.get("guides")
+        guide_events = synced_records.get("guide_events")
+
+        # find the first guide's id
+        first_guide_id = guides.get("messages")[0].get("data").get("id")
+
+        first_guide_ids_events = []
+        rest_guide_events = []
+
+        # separate guide events based on guide id
+        for guide_event in guide_events.get("messages"):
+            if guide_event.get("data").get("guide_id") == first_guide_id:
+                first_guide_ids_events.append(guide_event.get("data"))
+            else:
+                rest_guide_events.append(guide_event.get("data"))
+
+        replication_key_for_guide_events = next(iter(self.expected_replication_keys().get("guide_events")))
+
+        # find the maximum bookmark date for the first guide's events
+        sorted_first_guide_ids_events = sorted(first_guide_ids_events, key=lambda i: i[replication_key_for_guide_events], reverse=True)
+        max_bookmark = sorted_first_guide_ids_events[0].get(replication_key_for_guide_events)
+
+        # used for verifying that we synced guide events older
+        # than the maximum bookmark of the first guide's events
+        synced_older_data = False
+        for rest_guide_event in rest_guide_events:
+            event_time = datetime.strptime(rest_guide_event.get(replication_key_for_guide_events), "%Y-%m-%dT%H:%M:%S.%fZ")
+            max_bookmark_time = datetime.strptime(max_bookmark, "%Y-%m-%dT%H:%M:%S.%fZ")
+            if event_time < max_bookmark_time:
+                synced_older_data = True
+                break
+
+        self.assertTrue(synced_older_data)
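The integration test above reduces to one comparison: after splitting the guide events by guide id, at least one event belonging to a later guide must be older than the newest event of the first guide; if the child sync resumed from the continually updated bookmark instead of the start date, every later event would be newer. A distilled sketch of that check, with made-up timestamps:

```python
from datetime import datetime

fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
# newest event seen for the first guide (made-up value)
max_bookmark = datetime.strptime("2021-03-01T00:00:00.000000Z", fmt)
# events for the remaining guides (made-up values)
other_events = ["2021-02-15T00:00:00.000000Z", "2021-04-01T00:00:00.000000Z"]

# with the fix, later guides are still synced from the start date, so at
# least one of their events predates the first guide's newest event
synced_older_data = any(datetime.strptime(e, fmt) < max_bookmark for e in other_events)
assert synced_older_data
```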
diff --git a/tests/unittests/test_child_stream_start_date.py b/tests/unittests/test_child_stream_start_date.py
new file mode 100644
index 0000000..acf9f8b
--- /dev/null
+++ b/tests/unittests/test_child_stream_start_date.py
@@ -0,0 +1,105 @@
+import unittest
+import tap_pendo.streams as streams
+from unittest import mock
+from singer.utils import strftime
+from dateutil.parser import parse
+
+# stores the arguments passed to the child stream's 'sync'
+# function, for assertion
+TEST = []
+
+class Schema:
+    schema = None
+
+    def __init__(self, schema):
+        self.schema = schema
+
+    def to_dict(self):
+        return self.schema
+
+class Test:
+    schema = Schema({})
+    metadata = {}
+    tap_stream_id = "test"
+
+# dummy child stream class
+class ChildStream:
+    schema = None
+    stream = Test()
+    config = None
+    name = "test_stream"
+    replication_key = "date"
+    key_properties = ["id"]
+
+    # return the data that was passed in as the transformation argument
+    def transform(*args, **kwargs):
+        return args[1]
+
+    def sync(*args, **kwargs):
+        # append 'args' to the TEST variable for assertion
+        TEST.append(args)
+        # return dummy data
+        return [{"id": 1, "date": "2021-02-01T00:00:00Z"},
+                {"id": 2, "date": "2021-03-01T00:00:00Z"}]
+
+    def __init__(self, config):
+        self.config = config
+
+# dummy parent stream class
+class ParentStream:
+    schema = None
+    name = "test_stream"
+    key_properties = ["id"]
+
+    def transform(*args, **kwargs):
+        return {}
+
+    def sync(*args, **kwargs):
+        return []
+
+def update_bookmark(state, stream, bookmark_value, bookmark_key):
+    if not state.get("bookmarks").get(stream):
+        state["bookmarks"][stream] = {}
+    state["bookmarks"][stream][bookmark_key] = bookmark_value
+
+def transform(*args, **kwargs):
+    # return the data that was passed in as the transformation argument
+    return args[0]
+
+class TestStartDateOfChildStream(unittest.TestCase):
+
+    @mock.patch("singer.write_schema")
+    @mock.patch("tap_pendo.streams.Stream.update_bookmark")
+    @mock.patch("tap_pendo.streams.update_currently_syncing")
+    @mock.patch("singer.metadata.to_map")
+    @mock.patch("singer.Transformer.transform")
+    @mock.patch("singer.write_records")
+    def test_run(self, mocked_write_records, mocked_transform, mocked_metadata_to_map, mocked_update_currently_syncing, mocked_update_bookmark, mocked_write_schema):
+        """
+        Test case verifying that the start date (or state-file bookmark) is used for
+        fetching child stream records rather than the bookmark updated by a previous
+        child stream sync
+        """
+        # config file
+        config = {"start_date": "2021-01-01T00:00:00Z"}
+
+        # create dummy parent records
+        mock_records = [{"id": 1}, {"id": 2}, {"id": 3}]
+
+        # mock update bookmark
+        mocked_update_bookmark.side_effect = update_bookmark
+        # mock singer transform
+        mocked_transform.side_effect = transform
+
+        stream_instance = streams.Stream(config)
+
+        # call function
+        stream_instance.sync_substream({"bookmarks": {}}, ParentStream(), ChildStream(config), mock_records)
+
+        # iterate over 'TEST' and verify that the start date was passed as the argument rather than the updated bookmark
+        for test in TEST:
+            # get start date from TEST
+            start_date = test[2]
+            # parse start date as it is in the format: 2021-01-01T00:00:00.000000Z
+            parsed_start_date = parse(strftime(start_date)).strftime("%Y-%m-%dT%H:%M:%SZ")
+            # verify that the 'parsed_start_date' is the same as the start date from the config file
+            self.assertEqual(parsed_start_date, config.get("start_date"))
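One detail of the unit test that is easy to miss: `ChildStream.sync` is declared with a bare `*args` and is invoked as a bound method by `sync_substream`, so each tuple appended to `TEST` has the shape `(instance, state, start_date, parent_key)`, which is why the assertion reads the start date from `test[2]`. A standalone illustration of that capture pattern (hypothetical values, not the tap's code):

```python
# Minimal sketch of the *args capture used by the unit test above.
TEST = []

class FakeChild:
    def sync(*args, **kwargs):
        # bound call: args[0] is the instance itself
        TEST.append(args)
        return []

child = FakeChild()
child.sync({"bookmarks": {}}, "2021-01-01T00:00:00Z", "parent-1")

instance, state, start_date, parent_key = TEST[0]
assert start_date == "2021-01-01T00:00:00Z"  # index 2 of the captured tuple
```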