Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-17610: Multiple apps support and data fetching #94

Merged
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
521991b
Implement configurable app_ids and wirte test case for them
karanpanchal-crest Apr 6, 2022
07ce0bb
correct message in unit tets
karanpanchal-crest Apr 6, 2022
1685dd4
change README.md file for multiapp selection
karanpanchal-crest Apr 6, 2022
82a1f7d
added logger and exception message in app_ids fetching logic
karanpanchal-crest Apr 7, 2022
8f709c6
Add test cases and correct exception and logger message
karanpanchal-crest Apr 7, 2022
f043603
chnage logic for app_ids configuration
karanpanchal-crest Apr 8, 2022
aea8de0
correct pylint error
karanpanchal-crest Apr 8, 2022
df1bfc9
correct Trailing whitespace error
karanpanchal-crest Apr 8, 2022
83bd358
Add asserts in unittest
karanpanchal-crest Apr 18, 2022
8e8d156
placed invalid app_ids exception inside the main function
karanpanchal-crest Apr 19, 2022
f70f51e
add app_ids exception to discovery and sync mode
karanpanchal-crest Apr 19, 2022
93f061c
Add Integration test for the app_ids
karanpanchal-crest Apr 27, 2022
a935175
add integration test comments for app_ids
karanpanchal-crest Apr 27, 2022
3cf8ef2
updated filter function
somethingmorerelevant Mar 21, 2023
b42f285
Merge branch 'master' into TDL-17610-multiple-apps-support-and-data-f…
somethingmorerelevant Mar 23, 2023
c5ae630
updatd filter statement
somethingmorerelevant Mar 23, 2023
470cda8
Merge branch 'master' into TDL-17610-multiple-apps-support-and-data-f…
somethingmorerelevant Apr 3, 2023
4faca9f
Merge branch 'master' into TDL-17610-multiple-apps-support-and-data-f…
somethingmorerelevant Apr 10, 2023
a80271b
fixed broken UT
somethingmorerelevant Apr 10, 2023
e701b24
Merge branch 'master' into TDL-17610-multiple-apps-support-and-data-f…
somethingmorerelevant Apr 10, 2023
ab08c89
fix multi app-ids support to all streams
Apr 11, 2023
0c0a1d4
- fix integration test reviews
RushiT0122 Apr 12, 2023
f82d5e6
updated readme
somethingmorerelevant Jul 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,12 @@ Interrupted syncs for Event type stream are resumed via a bookmark placed during
- `start_date` - the default value to use if no bookmark exists for an endpoint (rfc3339 date string)
- `x_pendo_integration_key` (string, `ABCdef123`): an integration key from Pendo.
- `period` (string, `ABCdef123`): `dayRange` or `hourRange`
- `lookback_window` (integer, 10): For event objects. Default: 0 days
- `request_timeout` (integer, 300) For passing timeout to the request. Default: 300 seconds
- `lookback_window` (integer): 10 (For event objects. Default: 0)
- `request_timeout` (integer): 300 (For passing timeout to the request. Default: 300)
- `record_limit` (integer, 100000): maximum number of records Pendo API can retrieve in a single request. Default: 100000 records

```
- `app_ids` (string, `8877665523, 1234545`): (comma seperated appIDs. If this parameter is not provided, then the data will be collected from all the apps.)
Note: It is important to set `record_limit` parameter to an appropriate value, as selecting a smaller value may have a negative effect on the Pendo API's performance, while a larger value may result in connection errors, request timeouts, or memory overflows.
```

```json
{
"x_pendo_integration_key": "YOUR_INTEGRATION_KEY",
Expand All @@ -278,7 +276,8 @@ Interrupted syncs for Event type stream are resumed via a bookmark placed during
"lookback_window": 10,
"request_timeout": 300,
"record_limit": 100000,
"include_anonymous_visitors": "true"
"include_anonymous_visitors": "true",
"app_ids": "1234545, 8877665523"
}
```

Expand Down
21 changes: 21 additions & 0 deletions tap_pendo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,32 @@ def sync(config, state, catalog):
singer.write_state(state)
LOGGER.info("Finished sync")

# To check that given number is numeric or not
def filter_app_ids(config):
# reason to use expandAppIds
# https://support.pendo.io/hc/en-us/community/posts/360078029732-How-to-retrieve-all-application-data-using-Pendo-API-through-Python
app_ids = config.get("app_ids") or "expandAppIds(\"*\")"

invalid_app_ids = []
if app_ids != "expandAppIds(\"*\")":
app_ids = [app_id.strip() for app_id in app_ids.split(",")]
for app_id in app_ids:
try:
int(app_id)
except ValueError:
invalid_app_ids.append(app_id)
if invalid_app_ids:
raise Exception(f"Invalid appIDs provided during the configuration:{invalid_app_ids}")
config["app_ids"] = app_ids
return config


@utils.handle_top_exception(LOGGER)
def main():

# Parse command line arguments
args = utils.parse_args(REQUIRED_CONFIG_KEYS)
args.config = filter_app_ids(args.config)

if args.discover:
do_discover(args.config)
Expand Down
3 changes: 3 additions & 0 deletions tap_pendo/schemas/features.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
{
"type": ["null", "object"],
"properties": {
"app_id": {
"type": ["null", "number"]
},
"created_by_user": {
"type": ["null", "object"],
"properties": {
Expand Down
3 changes: 3 additions & 0 deletions tap_pendo/schemas/guides.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
{
"type": ["null", "object"],
"properties": {
"app_id": {
"type": ["null", "number"]
},
"created_by_user": {
"type": ["null", "object"],
"properties": {
Expand Down
3 changes: 3 additions & 0 deletions tap_pendo/schemas/pages.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"type": ["null", "object"],
"additional_properties": false,
"properties": {
"app_id": {
"type": ["null", "number"]
},
"created_by_user": {
"type": ["null", "object"],
"properties": {
Expand Down
3 changes: 3 additions & 0 deletions tap_pendo/schemas/poll_events.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"additional_properties": false,
"type": "object",
"properties": {
"app_id": {
"type": ["null", "number"]
},
"account_id": {
"type": ["null", "string"]
},
Expand Down
3 changes: 3 additions & 0 deletions tap_pendo/schemas/track_types.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"type": ["null", "object"],
"additional_properties": false,
"properties": {
"app_id": {
"type": ["null", "number"]
},
"created_by_user": {
"type": ["null", "object"],
"properties": {
Expand Down
23 changes: 11 additions & 12 deletions tap_pendo/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ def get_body(self, key_id=None, period=None, first=None):
"name": "all-accounts",
"pipeline": [{
"source": {
"accounts": None
"accounts": {"appId": self.config["app_ids"]}
},
}, {
"sort": ["metadata.auto.lastupdated"]
Expand Down Expand Up @@ -932,7 +932,7 @@ def get_body(self, key_id=None, period=None, first=None):
"all-features",
"pipeline": [{
"source": {
"features": None
"features": {"appId": self.config["app_ids"]}
}
}, {
"sort": [f"{self.replication_key}"]
Expand All @@ -952,7 +952,7 @@ class FeatureEvents(EventsBase):

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
body['request']['pipeline'][0]['source'].update({"featureEvents": {"featureId": key_id,}})
body['request']['pipeline'][0]['source'].update({"featureEvents": {"featureId": key_id}})
return body


Expand Down Expand Up @@ -1032,11 +1032,10 @@ def transform(self, record):

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
body['request']['pipeline'][0]['source'].update({"events": None})
body['request']['pipeline'][0]['source'].update({"events": {"appId": self.config["app_ids"]}})
return body



class PollEvents(EventsBase):
replication_method = "INCREMENTAL"
name = "poll_events"
Expand All @@ -1050,7 +1049,7 @@ def __init__(self, config):

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
body['request']['pipeline'][0]['source'].update({"pollEvents": None})
body['request']['pipeline'][0]['source'].update({"pollEvents": {"appId": self.config["app_ids"]}})
return body


Expand All @@ -1062,7 +1061,7 @@ class TrackEvents(EventsBase):

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
body['request']['pipeline'][0]['source'].update({"trackEvents": {"trackTypeId": key_id,}})
body['request']['pipeline'][0]['source'].update({"trackEvents": {"trackTypeId": key_id}})
return body

class GuideEvents(EventsBase):
Expand All @@ -1076,7 +1075,7 @@ def __init__(self, config):

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
body['request']['pipeline'][0]['source'].update({"guideEvents": {"guideId": key_id,}})
body['request']['pipeline'][0]['source'].update({"guideEvents": {"guideId": key_id}})
return body


Expand All @@ -1094,7 +1093,7 @@ def get_body(self, key_id=None, period=None, first=None):
"name": "all-track-types",
"pipeline": [{
"source": {
"trackTypes": None
"trackTypes": {"appId": self.config["app_ids"]}
}
}, {
"sort": [f"{self.replication_key}"]
Expand All @@ -1121,7 +1120,7 @@ def get_body(self, key_id=None, period=None, first=None):
"all-guides",
"pipeline": [{
"source": {
"guides": None
"guides": {"appId": self.config["app_ids"]}
}
}, {
"sort": [f"{self.replication_key}"]
Expand Down Expand Up @@ -1149,7 +1148,7 @@ def get_body(self, key_id=None, period=None, first=None):
"all-pages",
"pipeline": [{
"source": {
"pages": None
"pages": {"appId": self.config["app_ids"]}
}
}, {
"sort": [f"{self.replication_key}"]
Expand All @@ -1169,7 +1168,7 @@ class PageEvents(EventsBase):

def get_body(self, key_id, period, first):
body = super().get_body(key_id, period, first)
body['request']['pipeline'][0]['source'].update({"pageEvents": {"pageId": key_id,}})
body['request']['pipeline'][0]['source'].update({"pageEvents": {"pageId": key_id}})
return body


Expand Down
13 changes: 11 additions & 2 deletions tests/tap_tester/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


class TestPendoBase(unittest.TestCase):

REPLICATION_KEYS = "valid-replication-keys"
PRIMARY_KEYS = "table-key-properties"
FOREIGN_KEYS = "table-foreign-key-properties"
Expand All @@ -24,7 +24,13 @@ class TestPendoBase(unittest.TestCase):
START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z"
BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT%H:%M%S%z"
start_date = ""

app_ids = None

# After 180 days visitor_history data cannot be synced
# so We need to manually create test data for visitors and vistor_history streams
# Once created, we should update this start date to optimise the execution time
START_DATE_VISTOR_HISTORY = "2023-03-15T00:00:00Z"

@staticmethod
def name():
return "test_sync"
Expand Down Expand Up @@ -137,6 +143,9 @@ def get_properties(self, original: bool = True):
"lookback_window": "1",
"period": "dayRange",
}
if self.app_ids is not None:
return_value["app_ids"] = self.app_ids

if original:
return return_value

Expand Down
18 changes: 5 additions & 13 deletions tests/tap_tester/test_all_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,24 @@
# Remove skipped fields once bug is fixed
MISSING_FILEDS = {"events": {"hour", "feature_id", "parameters"},
"guide_events": {"poll_response", "poll_id"},
"guides": {"audience"},
"features": {"page_id"},
"feature_events": {"hour"},
"feature_events": {"hour", "parameters"},
"page_events": {"hour"},
"track_events": {"hour", "properties"},
"visitor_history": {"feature_id", "untagged_url"}}
"track_events": {"hour"}}

class PendoAllFieldsTest(TestPendoBase):
def name(self):
return "pendo_all_fields_test"

def test_run(self):
# To limit the execution time skipping following streams:
# - 'features', 'feature_events'
# - 'guides', 'guide_events'
# - 'pages', 'page_events'
# All above streams have similar impletementation like track_types and track_events streams

self.run_test({'accounts', 'events', 'poll_events', 'track_events', 'track_types'})
self.run_test(self.expected_streams() - {"visitors", "visitor_history"})
self.run_test({"visitors", "visitor_history"})

def get_properties(self, original: bool = True):
"""Configuration properties required for the tap."""
if self.streams_to_test == {"visitors", "visitor_history"}:
return_value = {
# To reduce the execution time to test this stream taking recently start_date
"start_date": "2022-07-20T00:00:00Z",
"start_date": self.START_DATE_VISTOR_HISTORY,
"lookback_window": "1",
"period": "dayRange",
}
Expand All @@ -55,6 +46,7 @@ def run_test(self, expected_streams, start_date=None):

self.start_date = start_date
self.streams_to_test = expected_streams
self.app_ids = None

expected_automatic_fields = self.expected_automatic_fields()
conn_id = connections.ensure_connection(self)
Expand Down
77 changes: 77 additions & 0 deletions tests/tap_tester/test_app_ids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import tap_tester.connections as connections
import tap_tester.runner as runner
import tap_tester.menagerie as menagerie
from base import TestPendoBase


class PendoMultiAppIdsTest(TestPendoBase):
def name(self):
return "pendo_multi_app_ids_test"

def test_run(self):
expected_streams = {"features", "pages", "events", "guides", "track_types", "track_events"}
self.run_test(expected_streams=expected_streams, app_ids="-323232", is_multi_apps=False)
self.run_test(expected_streams=expected_streams, app_ids=None, is_multi_apps=True)
self.run_test(expected_streams={"visitors", "visitor_history"}, app_ids=None, is_multi_apps=True)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the visitors and visitor_history the only streams that support this test? I'm leaning towards using other streams here if possible. The logic being that this suite may be cumbersome to maintain since it's noted that data for these streams must be manually generated every 3 months. It would be nice not to lose all coverage and have the whole suite fail when that occurs and just have certain relevant tests fail (all streams, all fields, etc).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only "accounts", "visitors", "metadata_accounts" and "metadata_visitors" streams are not supported for this test and to ensure maximum test coverage within the three-hour maximum execution time in cci, we handle visitor_history stream in a specific manner. Note earlier we used to skip this stream due same limitation.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok


def get_properties(self, original: bool = True):
"""Configuration properties required for the tap."""
if self.streams_to_test == {"visitors", "visitor_history"}:
return_value = {
# To reduce the execution time to test this stream taking recently start_date
"start_date": self.START_DATE_VISTOR_HISTORY,
"lookback_window": "1",
"period": "dayRange",
}
if original:
return return_value

return return_value
else:
return super().get_properties()

def run_test(self, expected_streams, app_ids, is_multi_apps, start_date=None):
"""
- Verify tap syncs records for multiple app_ids if no app_ids provided
- Verify tap syncs records for specific app_id if single app_id is provided
"""

self.start_date = start_date
self.streams_to_test = expected_streams
self.app_ids = app_ids

conn_id = connections.ensure_connection(self)

found_catalogs = self.run_and_verify_check_mode(conn_id)

# table and field selection
test_catalogs_all_fields = [catalog for catalog in found_catalogs
if catalog.get('tap_stream_id') in expected_streams]

self.perform_and_verify_table_and_field_selection(
conn_id, test_catalogs_all_fields)

# grab metadata after performing table-and-field selection to set expectations
# used for asserting all fields are replicated
stream_to_all_catalog_fields = dict()
for catalog in test_catalogs_all_fields:
stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
fields_from_field_level_md = [md_entry['breadcrumb'][1]
for md_entry in catalog_entry['metadata']
if md_entry['breadcrumb'] != []]
stream_to_all_catalog_fields[stream_name] = set(
fields_from_field_level_md)

self.run_and_verify_sync(conn_id)

synced_records = runner.get_records_from_target_output()

for stream in expected_streams:
# below four streams are independent of the app_id
if stream in ["accounts", "visitors", "metadata_accounts", "metadata_visitors"]:
continue

with self.subTest(stream=stream):
records_appid_set = set([message.get('data').get('app_id') for message in synced_records.get(stream).get("messages")])
self.assertEqual(len(records_appid_set)>1, is_multi_apps)
4 changes: 2 additions & 2 deletions tests/tap_tester/test_bookmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class PendoBookMarkTest(TestPendoBase):
"""Test tap sets a bookmark and respects it for the next sync of a stream"""

def name(self):
return "pendo_bookmark_test"

Expand All @@ -14,7 +14,7 @@ def get_properties(self, original: bool = True):
if self.streams_to_test == {"visitors", "visitor_history"}:
return_value = {
# To reduce the execution time to test this stream taking recently start_date
"start_date": "2022-07-20T00:00:00Z",
"start_date": self.START_DATE_VISTOR_HISTORY,
"lookback_window": "1",
"period": "dayRange",
}
Expand Down
Loading