diff --git a/README.md b/README.md index 44e744d..1d6bac6 100644 --- a/README.md +++ b/README.md @@ -262,14 +262,13 @@ Interrupted syncs for Event type stream are resumed via a bookmark placed during - `start_date` - the default value to use if no bookmark exists for an endpoint (rfc3339 date string) - `x_pendo_integration_key` (string, `ABCdef123`): an integration key from Pendo. - `period` (string, `ABCdef123`): `dayRange` or `hourRange` - - `lookback_window` (integer, 10): For event objects. Default: 0 days - - `request_timeout` (integer, 300) For passing timeout to the request. Default: 300 seconds + - `lookback_window` (integer): 10 (For event objects. Default: 0) + - `request_timeout` (integer): 300 (For passing timeout to the request. Default: 300) - `record_limit` (integer, 100000): maximum number of records Pendo API can retrieve in a single request. Default: 100000 records + - `app_ids` (string, `8877665523, 1234545`): (comma separated appIDs. If this parameter is not provided, then the data will be collected from all the apps) - ``` Note: It is important to set `record_limit` parameter to an appropriate value, as selecting a smaller value may have a negative effect on the Pendo API's performance, while a larger value may result in connection errors, request timeouts, or memory overflows. 
``` - ```json { "x_pendo_integration_key": "YOUR_INTEGRATION_KEY", @@ -278,7 +277,8 @@ Interrupted syncs for Event type stream are resumed via a bookmark placed during "lookback_window": 10, "request_timeout": 300, "record_limit": 100000, - "include_anonymous_visitors": "true" + "include_anonymous_visitors": "true", + "app_ids": "1234545, 8877665523" } ``` diff --git a/tap_pendo/__init__.py b/tap_pendo/__init__.py index a23d830..4392710 100644 --- a/tap_pendo/__init__.py +++ b/tap_pendo/__init__.py @@ -152,11 +152,32 @@ def sync(config, state, catalog): singer.write_state(state) LOGGER.info("Finished sync") +# Validate and normalize the configured app_ids: default to all apps when unset, +# split the comma separated string into a list, and raise if any id is non-numeric +def filter_app_ids(config): + # reason to use expandAppIds + # https://support.pendo.io/hc/en-us/community/posts/360078029732-How-to-retrieve-all-application-data-using-Pendo-API-through-Python + app_ids = config.get("app_ids") or "expandAppIds(\"*\")" + + invalid_app_ids = [] + if app_ids != "expandAppIds(\"*\")": + app_ids = [app_id.strip() for app_id in app_ids.split(",")] + for app_id in app_ids: + try: + int(app_id) + except ValueError: + invalid_app_ids.append(app_id) + if invalid_app_ids: + raise Exception(f"Invalid appIDs provided during the configuration:{invalid_app_ids}") + config["app_ids"] = app_ids + return config + + @utils.handle_top_exception(LOGGER) def main(): # Parse command line arguments args = utils.parse_args(REQUIRED_CONFIG_KEYS) + args.config = filter_app_ids(args.config) if args.discover: do_discover(args.config) diff --git a/tap_pendo/schemas/features.json b/tap_pendo/schemas/features.json index 4c15bb5..11178ce 100644 --- a/tap_pendo/schemas/features.json +++ b/tap_pendo/schemas/features.json @@ -1,6 +1,9 @@ { "type": ["null", "object"], "properties": { + "app_id": { + "type": ["null", "number"] + }, "created_by_user": { "type": ["null", "object"], "properties": { diff --git a/tap_pendo/schemas/guides.json b/tap_pendo/schemas/guides.json index 752508f..cad309f 100644 --- 
a/tap_pendo/schemas/guides.json +++ b/tap_pendo/schemas/guides.json @@ -1,6 +1,9 @@ { "type": ["null", "object"], "properties": { + "app_id": { + "type": ["null", "number"] + }, "created_by_user": { "type": ["null", "object"], "properties": { diff --git a/tap_pendo/schemas/pages.json b/tap_pendo/schemas/pages.json index c888ba4..a4a9d89 100644 --- a/tap_pendo/schemas/pages.json +++ b/tap_pendo/schemas/pages.json @@ -2,6 +2,9 @@ "type": ["null", "object"], "additional_properties": false, "properties": { + "app_id": { + "type": ["null", "number"] + }, "created_by_user": { "type": ["null", "object"], "properties": { diff --git a/tap_pendo/schemas/poll_events.json b/tap_pendo/schemas/poll_events.json index 49e05ba..ba9e438 100644 --- a/tap_pendo/schemas/poll_events.json +++ b/tap_pendo/schemas/poll_events.json @@ -2,6 +2,9 @@ "additional_properties": false, "type": "object", "properties": { + "app_id": { + "type": ["null", "number"] + }, "account_id": { "type": ["null", "string"] }, diff --git a/tap_pendo/schemas/track_types.json b/tap_pendo/schemas/track_types.json index 20449f8..5f561e6 100644 --- a/tap_pendo/schemas/track_types.json +++ b/tap_pendo/schemas/track_types.json @@ -2,6 +2,9 @@ "type": ["null", "object"], "additional_properties": false, "properties": { + "app_id": { + "type": ["null", "number"] + }, "created_by_user": { "type": ["null", "object"], "properties": { diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py index 5eff952..973a451 100644 --- a/tap_pendo/streams.py +++ b/tap_pendo/streams.py @@ -890,7 +890,7 @@ def get_body(self, key_id=None, period=None, first=None): "name": "all-accounts", "pipeline": [{ "source": { - "accounts": None + "accounts": {"appId": self.config["app_ids"]} }, }, { "sort": ["metadata.auto.lastupdated"] @@ -932,7 +932,7 @@ def get_body(self, key_id=None, period=None, first=None): "all-features", "pipeline": [{ "source": { - "features": None + "features": {"appId": self.config["app_ids"]} } }, { "sort": 
[f"{self.replication_key}"] @@ -952,7 +952,7 @@ class FeatureEvents(EventsBase): def get_body(self, key_id, period, first): body = super().get_body(key_id, period, first) - body['request']['pipeline'][0]['source'].update({"featureEvents": {"featureId": key_id,}}) + body['request']['pipeline'][0]['source'].update({"featureEvents": {"featureId": key_id}}) return body @@ -1032,11 +1032,10 @@ def transform(self, record): def get_body(self, key_id, period, first): body = super().get_body(key_id, period, first) - body['request']['pipeline'][0]['source'].update({"events": None}) + body['request']['pipeline'][0]['source'].update({"events": {"appId": self.config["app_ids"]}}) return body - class PollEvents(EventsBase): replication_method = "INCREMENTAL" name = "poll_events" @@ -1050,7 +1049,7 @@ def __init__(self, config): def get_body(self, key_id, period, first): body = super().get_body(key_id, period, first) - body['request']['pipeline'][0]['source'].update({"pollEvents": None}) + body['request']['pipeline'][0]['source'].update({"pollEvents": {"appId": self.config["app_ids"]}}) return body @@ -1062,7 +1061,7 @@ class TrackEvents(EventsBase): def get_body(self, key_id, period, first): body = super().get_body(key_id, period, first) - body['request']['pipeline'][0]['source'].update({"trackEvents": {"trackTypeId": key_id,}}) + body['request']['pipeline'][0]['source'].update({"trackEvents": {"trackTypeId": key_id}}) return body class GuideEvents(EventsBase): @@ -1076,7 +1075,7 @@ def __init__(self, config): def get_body(self, key_id, period, first): body = super().get_body(key_id, period, first) - body['request']['pipeline'][0]['source'].update({"guideEvents": {"guideId": key_id,}}) + body['request']['pipeline'][0]['source'].update({"guideEvents": {"guideId": key_id}}) return body @@ -1094,7 +1093,7 @@ def get_body(self, key_id=None, period=None, first=None): "name": "all-track-types", "pipeline": [{ "source": { - "trackTypes": None + "trackTypes": {"appId": 
self.config["app_ids"]} } }, { "sort": [f"{self.replication_key}"] @@ -1121,7 +1120,7 @@ def get_body(self, key_id=None, period=None, first=None): "all-guides", "pipeline": [{ "source": { - "guides": None + "guides": {"appId": self.config["app_ids"]} } }, { "sort": [f"{self.replication_key}"] @@ -1149,7 +1148,7 @@ def get_body(self, key_id=None, period=None, first=None): "all-pages", "pipeline": [{ "source": { - "pages": None + "pages": {"appId": self.config["app_ids"]} } }, { "sort": [f"{self.replication_key}"] @@ -1169,7 +1168,7 @@ class PageEvents(EventsBase): def get_body(self, key_id, period, first): body = super().get_body(key_id, period, first) - body['request']['pipeline'][0]['source'].update({"pageEvents": {"pageId": key_id,}}) + body['request']['pipeline'][0]['source'].update({"pageEvents": {"pageId": key_id}}) return body diff --git a/tests/tap_tester/base.py b/tests/tap_tester/base.py index b041896..58a3c48 100644 --- a/tests/tap_tester/base.py +++ b/tests/tap_tester/base.py @@ -14,7 +14,7 @@ class TestPendoBase(unittest.TestCase): - + REPLICATION_KEYS = "valid-replication-keys" PRIMARY_KEYS = "table-key-properties" FOREIGN_KEYS = "table-foreign-key-properties" @@ -24,7 +24,13 @@ class TestPendoBase(unittest.TestCase): START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT%H:%M%S%z" start_date = "" - + app_ids = None + + # After 180 days visitor_history data cannot be synced + # so we need to manually create test data for the visitors and visitor_history streams + # Once created, we should update this start date to optimise the execution time + START_DATE_VISTOR_HISTORY = "2023-03-15T00:00:00Z" + @staticmethod def name(): return "test_sync" @@ -137,6 +143,9 @@ def get_properties(self, original: bool = True): "lookback_window": "1", "period": "dayRange", } + if self.app_ids is not None: + return_value["app_ids"] = self.app_ids + if original: return return_value diff --git a/tests/tap_tester/test_all_fields.py 
b/tests/tap_tester/test_all_fields.py index 4a5cdbd..f554799 100644 --- a/tests/tap_tester/test_all_fields.py +++ b/tests/tap_tester/test_all_fields.py @@ -9,25 +9,16 @@ # Remove skipped fields once bug is fixed MISSING_FILEDS = {"events": {"hour", "feature_id", "parameters"}, "guide_events": {"poll_response", "poll_id"}, - "guides": {"audience"}, - "features": {"page_id"}, - "feature_events": {"hour"}, + "feature_events": {"hour", "parameters"}, "page_events": {"hour"}, - "track_events": {"hour", "properties"}, - "visitor_history": {"feature_id", "untagged_url"}} + "track_events": {"hour"}} class PendoAllFieldsTest(TestPendoBase): def name(self): return "pendo_all_fields_test" def test_run(self): - # To limit the execution time skipping following streams: - # - 'features', 'feature_events' - # - 'guides', 'guide_events' - # - 'pages', 'page_events' - # All above streams have similar impletementation like track_types and track_events streams - - self.run_test({'accounts', 'events', 'poll_events', 'track_events', 'track_types'}) + self.run_test(self.expected_streams() - {"visitors", "visitor_history"}) self.run_test({"visitors", "visitor_history"}) def get_properties(self, original: bool = True): @@ -35,7 +26,7 @@ def get_properties(self, original: bool = True): if self.streams_to_test == {"visitors", "visitor_history"}: return_value = { # To reduce the execution time to test this stream taking recently start_date - "start_date": "2022-07-20T00:00:00Z", + "start_date": self.START_DATE_VISTOR_HISTORY, "lookback_window": "1", "period": "dayRange", } @@ -55,6 +46,7 @@ def run_test(self, expected_streams, start_date=None): self.start_date = start_date self.streams_to_test = expected_streams + self.app_ids = None expected_automatic_fields = self.expected_automatic_fields() conn_id = connections.ensure_connection(self) diff --git a/tests/tap_tester/test_app_ids.py b/tests/tap_tester/test_app_ids.py new file mode 100644 index 0000000..722466a --- /dev/null +++ 
b/tests/tap_tester/test_app_ids.py @@ -0,0 +1,77 @@ +import tap_tester.connections as connections +import tap_tester.runner as runner +import tap_tester.menagerie as menagerie +from base import TestPendoBase + + +class PendoMultiAppIdsTest(TestPendoBase): + def name(self): + return "pendo_multi_app_ids_test" + + def test_run(self): + expected_streams = {"features", "pages", "events", "guides", "track_types", "track_events"} + self.run_test(expected_streams=expected_streams, app_ids="-323232", is_multi_apps=False) + self.run_test(expected_streams=expected_streams, app_ids=None, is_multi_apps=True) + self.run_test(expected_streams={"visitors", "visitor_history"}, app_ids=None, is_multi_apps=True) + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + if self.streams_to_test == {"visitors", "visitor_history"}: + return_value = { + # To reduce the execution time to test this stream taking recently start_date + "start_date": self.START_DATE_VISTOR_HISTORY, + "lookback_window": "1", + "period": "dayRange", + } + if original: + return return_value + + return return_value + else: + return super().get_properties() + + def run_test(self, expected_streams, app_ids, is_multi_apps, start_date=None): + """ + - Verify tap syncs records for multiple app_ids if no app_ids provided + - Verify tap syncs records for specific app_id if single app_id is provided + """ + + self.start_date = start_date + self.streams_to_test = expected_streams + self.app_ids = app_ids + + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_all_fields = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_all_fields) + + # grab metadata after performing table-and-field selection to set expectations + # used for asserting all fields 
are replicated + stream_to_all_catalog_fields = dict() + for catalog in test_catalogs_all_fields: + stream_id, stream_name = catalog['stream_id'], catalog['stream_name'] + catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id) + fields_from_field_level_md = [md_entry['breadcrumb'][1] + for md_entry in catalog_entry['metadata'] + if md_entry['breadcrumb'] != []] + stream_to_all_catalog_fields[stream_name] = set( + fields_from_field_level_md) + + self.run_and_verify_sync(conn_id) + + synced_records = runner.get_records_from_target_output() + + for stream in expected_streams: + # below four streams are independent of the app_id + if stream in ["accounts", "visitors", "metadata_accounts", "metadata_visitors"]: + continue + + with self.subTest(stream=stream): + records_appid_set = set([message.get('data').get('app_id') for message in synced_records.get(stream).get("messages")]) + self.assertEqual(len(records_appid_set)>1, is_multi_apps) diff --git a/tests/tap_tester/test_bookmark.py b/tests/tap_tester/test_bookmark.py index 19d1d95..c732db1 100644 --- a/tests/tap_tester/test_bookmark.py +++ b/tests/tap_tester/test_bookmark.py @@ -5,7 +5,7 @@ class PendoBookMarkTest(TestPendoBase): """Test tap sets a bookmark and respects it for the next sync of a stream""" - + def name(self): return "pendo_bookmark_test" @@ -14,7 +14,7 @@ def get_properties(self, original: bool = True): if self.streams_to_test == {"visitors", "visitor_history"}: return_value = { # To reduce the execution time to test this stream taking recently start_date - "start_date": "2022-07-20T00:00:00Z", + "start_date": self.START_DATE_VISTOR_HISTORY, "lookback_window": "1", "period": "dayRange", } diff --git a/tests/unittests/test_app_ids_configurable.py b/tests/unittests/test_app_ids_configurable.py new file mode 100644 index 0000000..f78a21c --- /dev/null +++ b/tests/unittests/test_app_ids_configurable.py @@ -0,0 +1,50 @@ +import unittest +from unittest import mock +from argparse import Namespace 
+from tap_pendo import main + + +class TestAppIdConfiguration(unittest.TestCase): + @mock.patch("tap_pendo.utils.parse_args", side_effect=lambda required_config_keys: Namespace(config={"start_date": "", "x_pendo_integration_key": "", "period":""}, discover=True)) + @mock.patch("tap_pendo.discover_streams", side_effect=lambda config: {}) + @mock.patch('tap_pendo.do_discover') + def test_app_ids_not_in_config(self, mocked_do_discover, mocked_discover_stream, mocked_parse_args): + """ + To verify that if app_ids is not in the config file then select all apps and run discover mode. + """ + main() + config = {'app_ids': 'expandAppIds("*")', 'start_date': '', 'x_pendo_integration_key': '', 'period': ''} + mocked_do_discover.assert_called_with(config) + + @mock.patch("tap_pendo.utils.parse_args", side_effect=lambda required_config_keys: Namespace(config={"app_ids": "123, 456","start_date": "", "x_pendo_integration_key": "", "period":""}, discover=True)) + @mock.patch("tap_pendo.discover_streams", side_effect=lambda config: {}) + @mock.patch('tap_pendo.do_discover') + def test_app_ids_coma_seperated_string_in_config(self, mocked_do_discover, mocked_discover_stream, mocked_parse_args): + """ + To verify that if app_ids is a comma separated string in the config file then get list of those app_ids and run discover mode. 
+ """ + main() + config = {'app_ids': ['123', '456'], 'start_date': '', 'x_pendo_integration_key': '', 'period': ''} + mocked_do_discover.assert_called_with(config) + + @mock.patch("tap_pendo.utils.parse_args", side_effect=lambda required_config_keys: Namespace(config={"app_ids": "","start_date": "", "x_pendo_integration_key": "", "period":""}, discover=True)) + @mock.patch("tap_pendo.discover_streams", side_effect=lambda config: {}) + @mock.patch('tap_pendo.do_discover') + def test_app_ids_empty_in_config(self, mocked_do_discover, mocked_discover_stream, mocked_parse_args): + """ + To verify that if app_ids is blank string or empty string in configure file then select all apps and run discover mode. + """ + main() + config = {'app_ids': 'expandAppIds("*")', 'start_date': '', 'x_pendo_integration_key': '', 'period': ''} + mocked_do_discover.assert_called_with(config) + + @mock.patch("tap_pendo.utils.parse_args", side_effect=lambda required_config_keys: Namespace(config={"app_ids": "123, test, test123, 123test, ","start_date": "", "x_pendo_integration_key": "", "period":""}, discover=True)) + def test_app_ids_valid_app_ids_with_invalid_app_ids_config(self, mocked_parse_args): + """ + To verify that if app_ids is comma seperated blanks with string in configure file then then raise exception. 
+ """ + + with self.assertRaises(Exception) as e: + main() + + self.assertEqual(str(e.exception), "Invalid appIDs provided during the configuration:['test', 'test123', '123test', '']", "Not get expected exception") \ No newline at end of file diff --git a/tests/unittests/test_parent_stream_sync.py b/tests/unittests/test_parent_stream_sync.py index 5c1c2a2..a5f4d17 100644 --- a/tests/unittests/test_parent_stream_sync.py +++ b/tests/unittests/test_parent_stream_sync.py @@ -4,12 +4,12 @@ from parameterized import parameterized from tap_pendo.streams import * - -default_config = { +from tap_pendo import filter_app_ids +default_config = filter_app_ids({ "record_limit": 10, "request_timeout": 10, "period": "hourRage" - } + }) default_kwargs = {"key_id": "key_id", "period": "hour", "first": 1} @@ -54,15 +54,15 @@ def generate_null_records(self, stream_obj, num_records, num_none_replication_va return test_records @parameterized.expand( - [(Accounts, "accounts", None, "metadata.auto.lastupdated", None), - (Features, "features", None, "lastUpdatedAt", None), - (TrackTypes, "trackTypes", None, "lastUpdatedAt", None), - (Guides, "guides", None, "lastUpdatedAt", None), - (Pages, "pages", None, "lastUpdatedAt", None), - (Events, "events", None, "hour", default_kwargs), + [(Accounts, "accounts", {'appId': 'expandAppIds("*")'}, "metadata.auto.lastupdated", None), + (Features, "features", {'appId': 'expandAppIds("*")'}, "lastUpdatedAt", None), + (TrackTypes, "trackTypes", {'appId': 'expandAppIds("*")'}, "lastUpdatedAt", None), + (Guides, "guides", {'appId': 'expandAppIds("*")'}, "lastUpdatedAt", None), + (Pages, "pages", {'appId': 'expandAppIds("*")'}, "lastUpdatedAt", None), + (Events, "events", {'appId': 'expandAppIds("*")'}, "hour", default_kwargs), (GuideEvents, "guideEvents", {"guideId": "key_id",}, "browser_time", default_kwargs), (FeatureEvents, "featureEvents", {"featureId": "key_id",}, "hour", default_kwargs), - (PollEvents, "pollEvents", None, "browser_time", 
default_kwargs), + (PollEvents, "pollEvents", {'appId': 'expandAppIds("*")'}, "browser_time", default_kwargs), (TrackEvents, "trackEvents", {"trackTypeId": "key_id",}, "hour", default_kwargs)]) def test_get_body(self, stream_class, exp_event_type, exp_event_type_value, exp_filter_key, kwargs): """Verify get_body method returns an expected request body object"""