diff --git a/CHANGELOG.md b/CHANGELOG.md
index ef28894..ac3b8f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 1.1.0
+ * Add custom pagination support for the Visitors stream [#129](https://github.com/singer-io/tap-pendo/pull/129)
+
 ## 1.0.1
  * Fix infinite loop issue [#126](https://github.com/singer-io/tap-pendo/pull/126)
 
diff --git a/setup.py b/setup.py
index 648c67e..ef4025a 100755
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 setup(
     name="tap-pendo",
-    version="1.0.1",
+    version="1.1.0",
     description="Singer.io tap for extracting data",
     author="Stitch",
     url="https://github.com/singer-io/tap-pendo",
diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py
index 8ba4575..48d7eb2 100644
--- a/tap_pendo/streams.py
+++ b/tap_pendo/streams.py
@@ -141,6 +141,7 @@ class Stream():
     period = None
     request_retry_count = 1
     last_processed = None
+    last_synced_record = {}
 
     # initialized the endpoint attribute which can be overriden by child streams based on
     # the different parameters used by the stream.
@@ -507,7 +508,8 @@ def sync_substream(self, state, parent, sub_stream, parent_response):
                         sub_stream.name, record[parent.key_properties[0]])
 
         # After processing for all parent ids we can remove our resumption state
-        state.get('bookmarks').get(sub_stream.name).pop('last_processed')
+        if 'last_processed' in state.get('bookmarks').get(sub_stream.name):
+            state.get('bookmarks').get(sub_stream.name).pop('last_processed')
 
         self.update_child_stream_bookmarks(state=state,
                                            sub_stream=sub_stream,
@@ -729,7 +731,14 @@ def send_request_get_results(self, req, endpoint, params, count, **kwargs):
             # Request retry
             yield from self.request(endpoint, params, count, **kwargs)
 
+    def get_record_count(self):
+        return 0
+
+    def is_loop_required(self):
+        return False
+
     def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
+        loop_for_records = self.is_loop_required()
         stream_response = self.request(self.name, json=self.get_body()) or []
 
         # Get and intialize sub-stream for the current stream
@@ -746,7 +755,7 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
             # which flush out during sync_substream call above
             stream_response = self.request(self.name, json=self.get_body()) or []
 
-        return (self.stream, stream_response), False
+        return (self.stream, stream_response), loop_for_records
 
 class EventsBase(Stream):
     DATE_WINDOW_SIZE = 1
@@ -1280,14 +1289,37 @@ def get_body(self, key_id=None, period=None, first=None):
                             "identified": not anons
                         }
                     }
+                }, {
+                    "sort": ["visitorId"]
+                }, {
+                    "filter": self.set_filter_value()
+                }, {
+                    "limit": self.record_limit
                 }],
-                "requestId": "all-visitors",
-                "sort": [
-                    "visitorId"
-                ]
+                "requestId": "all-visitors"
             }
         }
 
+    def get_record_count(self):
+        # Get the number of records to be fetched using the current filter
+        body = self.get_body()
+        body["request"]["pipeline"].append({"count": None})
+        return list(self.request(self.name, json=body))[0]["count"]
+
+    def is_loop_required(self):
+        # If the number of records equals the record limit, assume there are more records to be synced
+        # and save the last filter value. Otherwise assume we have extracted all records
+        return self.get_record_count() >= self.record_limit
+
+    def set_filter_value(self):
+        # Set the value of the filter parameter in the request body
+        if self.last_synced_record:
+            filter_value = f'visitorId>"{self.last_synced_record["visitor_id"]}"'
+        else:
+            filter_value = 'visitorId>""'
+
+        return filter_value
+
     def transform(self, record):
         # Transform data of accounts into one level dictionary with following transformation
         record['lastupdated'] = record.get('metadata').get('auto').get(
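The Visitors stream above now pages through results with a keyset filter: get_body() appends sort, filter, and limit stages to the aggregation pipeline, and get_record_count() reuses that pipeline with a trailing count stage to decide whether another pass is needed. A minimal sketch of the resulting request shapes follows; all literal values (the limit, the visitor id) are illustrative, not real config:

```python
# Sketch of the pipeline stages the new Visitors.get_body() assembles;
# all literal values below are illustrative.
first_page_pipeline = [
    {"source": {"visitors": {"identified": True}}},  # pre-existing source stage
    {"sort": ["visitorId"]},      # stable ordering, required for keyset paging
    {"filter": 'visitorId>""'},   # first page: every visitorId sorts after ""
    {"limit": 50},                # record_limit from the tap config
]

# After a page is synced, set_filter_value() swaps in the last synced id,
# so the next request resumes strictly after it:
next_page_pipeline = [
    {"source": {"visitors": {"identified": True}}},
    {"sort": ["visitorId"]},
    {"filter": 'visitorId>"last-seen-visitor-id"'},
    {"limit": 50},
]

# get_record_count() appends a count stage to the same pipeline and reads a
# single {"count": N} result; is_loop_required() compares N to record_limit.
count_pipeline = first_page_pipeline + [{"count": None}]
```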
diff --git a/tap_pendo/sync.py b/tap_pendo/sync.py
index 934a407..c27b1a0 100644
--- a/tap_pendo/sync.py
+++ b/tap_pendo/sync.py
@@ -59,6 +59,9 @@ def sync_stream(state, start_date, instance):
                 LOGGER.info('Replication Value NULL for tap_stream_id: %s', stream.tap_stream_id)
                 counter.increment()
 
+            # Preserve the last processed record, which is useful when a stream supports pagination
+            instance.last_synced_record = record
+
             # Update bookmark and write state for the stream with new_bookmark
             instance.update_bookmark(state, instance.name, strftime(new_bookmark), instance.replication_key)
diff --git a/tests/tap_tester/base.py b/tests/tap_tester/base.py
index c2b58d9..280892f 100644
--- a/tests/tap_tester/base.py
+++ b/tests/tap_tester/base.py
@@ -31,7 +31,7 @@ class TestPendoBase(unittest.TestCase):
     # After 180 days visitor_history data cannot be synced
     # so We need to manually create test data for visitors and vistor_history streams
     # Once created, we should update this start date to optimise the execution time
-    START_DATE_VISTOR_HISTORY = "2023-03-15T00:00:00Z"
+    START_DATE_VISTOR_HISTORY = "2023-11-06T00:00:00Z"
 
     @staticmethod
     def name():
diff --git a/tests/tap_tester/test_start_date.py b/tests/tap_tester/test_start_date.py
index a09db54..09cb44e 100644
--- a/tests/tap_tester/test_start_date.py
+++ b/tests/tap_tester/test_start_date.py
@@ -31,11 +31,11 @@ def test_run(self):
         # All these streams have similar implementation like guides and guide_events so removing this test to limit the execution time
         # self.run_test("2020-09-01T00:00:00Z", "2021-03-01T00:00:00Z", {"features", "feature_events", "pages", "page_events", "events", "track_types", "track_events"})
-        
+
         # Visitors history can be retrieved only for 180 days so to reduce execution time setting first start time older than 180 days back
         self.run_test(
-            start_date_1="2022-06-25T00:00:00Z",
-            start_date_2="2022-07-20T00:00:00Z",
+            start_date_1=self.START_DATE_VISTOR_HISTORY,
+            start_date_2="2023-11-09T00:00:00Z",
             streams={"visitors", "visitor_history"})
 
     def expected_metadata(self):
@@ -72,7 +72,7 @@ def run_test(self, start_date_1, start_date_2, streams):
         self.start_date_1 = start_date_1
         self.start_date_2 = start_date_2
         self.streams = streams
-        
+
         self.start_date = self.start_date_1
 
         expected_streams = streams
@@ -100,7 +100,7 @@ def run_test(self, start_date_1, start_date_2, streams):
         ##########################################################################
         # Update START DATE Between Syncs
         ##########################################################################
-        
+
         LOGGER.info("REPLICATION START DATE CHANGE: {} ===>>> {} ".format(
             self.start_date, self.start_date_2))
         self.start_date = self.start_date_2
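The sync.py hunk above stores each emitted record on the stream instance, and sync() (from the streams.py changes earlier) now reports whether another page is expected via its second return value. Below is a hypothetical driver loop, purely to illustrate that contract; sync_with_pagination and emit are placeholder names, not tap code, and the real caller in tap_pendo/sync.py may differ in detail:

```python
# Hypothetical driver loop illustrating the pagination contract only.
def sync_with_pagination(instance, state, emit):
    more_pages = True
    while more_pages:
        # sync() now returns ((stream, records), loop_for_records)
        (stream, records), more_pages = instance.sync(state)
        for record in records:
            emit(stream, record)
            # Mirrors the sync.py change: keep the last record so that
            # set_filter_value() can build the next keyset filter
            instance.last_synced_record = record
```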
diff --git a/tests/tap_tester/test_visitors_pagination.py b/tests/tap_tester/test_visitors_pagination.py
new file mode 100644
index 0000000..77e0a74
--- /dev/null
+++ b/tests/tap_tester/test_visitors_pagination.py
@@ -0,0 +1,84 @@
+import tap_tester.connections as connections
+import tap_tester.menagerie as menagerie
+import tap_tester.runner as runner
+from base import TestPendoBase
+
+
+class PendoVisitorsPaginationTest(TestPendoBase):
+    start_date = "2019-09-10T00:00:00Z"
+    record_limit = 50
+    include_anonymous_visitors = False
+    def name(self):
+        return "pendo_visitors_pagination_test"
+
+    def get_properties(self, original: bool = True):
+        """Configuration properties required for the tap."""
+        return {
+            # Take a recent start_date to reduce the execution time for this stream
+            "start_date": self.start_date,
+            "lookback_window": "1",
+            "period": "dayRange",
+            "record_limit": self.record_limit,
+            "include_anonymous_visitors": self.include_anonymous_visitors
+        }
+
+    def test_run(self):
+        # Verify that the visitors pagination logic works as expected for named and anonymous visitors
+        # without impacting the replication of other streams
+        # Note: there are 21000+ named and anonymous visitors
+        self.run_pagination_test(expected_streams={"accounts", "features", "feature_events", "visitors"},
+                                 start_date="2019-09-10T00:00:00Z",
+                                 record_limit=10000,
+                                 include_anonymous_visitors="true")
+
+        # Verify that with visitors pagination we are able to sync child stream records, i.e. visitor_history
+        # Note: there are only 58 named and anonymous visitors, but only recently updated visitors will be synced
+        self.run_pagination_test(expected_streams={"visitors", "visitor_history"},
+                                 start_date=self.START_DATE_VISTOR_HISTORY,
+                                 record_limit=50,
+                                 include_anonymous_visitors="false")
+
+
+    def run_pagination_test(self, expected_streams, start_date, record_limit, include_anonymous_visitors):
+        """
+        This is a canary test to verify the pagination implementation for the Visitors stream.
+        """
+        self.streams_to_test = expected_streams
+        self.start_date = start_date
+        self.record_limit = record_limit
+        self.include_anonymous_visitors = include_anonymous_visitors
+
+        conn_id = connections.ensure_connection(self)
+
+        found_catalogs = self.run_and_verify_check_mode(conn_id)
+
+        # Table and field selection
+        test_catalogs_all_fields = [catalog for catalog in found_catalogs
+                                    if catalog.get('tap_stream_id') in expected_streams]
+
+        self.perform_and_verify_table_and_field_selection(
+            conn_id, test_catalogs_all_fields)
+
+        # Grab metadata after performing table-and-field selection to set expectations
+        # used for asserting all fields are replicated
+        stream_to_all_catalog_fields = dict()
+        for catalog in test_catalogs_all_fields:
+            stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
+            catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
+            fields_from_field_level_md = [md_entry['breadcrumb'][1]
+                                          for md_entry in catalog_entry['metadata']
+                                          if md_entry['breadcrumb'] != []]
+            stream_to_all_catalog_fields[stream_name] = set(
+                fields_from_field_level_md)
+
+        record_count_by_stream = self.run_and_verify_sync(conn_id)
+
+        synced_records = runner.get_records_from_target_output()
+
+        # Verify no unexpected streams were replicated
+        synced_stream_names = set(synced_records.keys())
+
+        self.assertSetEqual(expected_streams, synced_stream_names)
+        for stream in expected_streams:
+            with self.subTest(stream=stream):
+                self.assertGreaterEqual(record_count_by_stream.get(stream), 1)
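The tap_tester file above exercises pagination end to end against live data. A complementary unit-level sketch, not part of this PR, could pin down the keyset filter in isolation; it assumes set_filter_value() reads only last_synced_record, so __init__ is bypassed for brevity:

```python
import unittest

from tap_pendo.streams import Visitors


class TestSetFilterValue(unittest.TestCase):
    """Sketch only (not in this PR): verifies the keyset filter progression."""

    def make_visitors(self):
        # Bypass __init__; set_filter_value() only reads last_synced_record
        visitors = Visitors.__new__(Visitors)
        visitors.last_synced_record = {}
        return visitors

    def test_first_page_filter(self):
        visitors = self.make_visitors()
        self.assertEqual(visitors.set_filter_value(), 'visitorId>""')

    def test_next_page_filter(self):
        visitors = self.make_visitors()
        visitors.last_synced_record = {"visitor_id": "abc"}
        self.assertEqual(visitors.set_filter_value(), 'visitorId>"abc"')
```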
@mock.patch("tap_pendo.streams.Stream.is_selected") @mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream) - def test_lazzy_aggregation_with_sub_stream(self, mocked_substream, mocked_selected, mocked_request): + @mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100) + def test_lazzy_aggregation_without_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request): ''' Verify that if sub stream is present then also all data should be return for super stream and sync_substream should be called @@ -60,7 +61,8 @@ def test_lazzy_aggregation_with_sub_stream(self, mocked_substream, mocked_select @mock.patch("requests.Session.send") @mock.patch("tap_pendo.streams.Stream.is_selected") @mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream) - def test_lazzy_aggregation_without_sub_stream(self, mocked_substream, mocked_selected, mocked_request): + @mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100) + def test_lazzy_aggregation_without_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request): ''' Verify that if sub stream is not selected then also all data should be return for super stream and sync_substream should not be called