Crest Work to Merge (#72)
* TDL-14621: Add retry logic to requests and TDL-14622: Retry ConnectionResetErrors  (#71)

* added backoff for certain errors

* resolve pylint

* updated decorator location

* added unittests

* added comment

* added comments

* TDL-14890: Print user friendly error messages (#73)

* updated error message when generating auth_stub

* made changes according to the comments

* updated the code according to the comments

* updated the tap tester image

* updated pylint and astroid to latest version

* updated the code because updating the tap-tester image was throwing cascading errors

* updated config.yml file

* updated the start date for integration tests as per the params

* removed scenario as new tap-tester version does not support it

* updated start date in the base file test

* TDL-14989: Check best practices (#74)

* added best practices

* resolve pylint

* resolve test failure

* test: updated the test cases

* test: updated some test cases

* updated the code as per comments

* resolve test case failure

* updated the code as per comments

* resolve integration test failure

* resolve test case failure

* resolve test case failure

* add data extension stream in test cases

* added data extension incremental stream

* test: run bookmark test

* test: run bookmark test

* test: run bookmark test, debug failing test

* test: pylint resolve

* test: run all data extension stream tests in bookmark test

* test: updated data extension code

* updated the test to run data extension stream

* run all tests

* added sys.exit and updated pylint and astroid to latest versions

* resolve pylint

* updated the files as per comments

* updated the code

* TDL-14889: Keys should be marked automatic and TDL-14891: list_sends stream does not bookmark correctly (#75)

* make keys automatic

* pylint resolve

* add full replication test case

* added code change for data extension stream

* pylint resolve

* added comment

* added comments

* added comment in base file

* updated discovery test and removed full replication test

* updated the code

* added a comment explaining subscriber and list subscriber syncing

* added comments

* TDL-14887: Respect field selection (#76)

* make keys automatic

* pylint resolve

* add full replication test case

* use transformation

* resolve pylint

* added code change for data extension stream

* pylint resolve

* updated test case for data extension

* added comment

* added comments

* added comments and optimized that condition

* added code change for transformation function in base file

* pylint resolve

* disabled pylint error

* test: removed disable pylint code

* added comment in base file

* updated comment

* updated the comment for skipping streams

* updated discovery test and removed full replication test

* added verification of unique records

* updated start date

* updated the code

* updated the code

* added a comment explaining subscriber and list subscriber syncing

* added comments

* updated comment

* made separate files for schemas

* resolve pylint

* resolve integration test

* corrected typo

* updated the code

* resolved unittest case error

* TDL-14888: Respect the start date for all streams (#77)

* added code change to use start date when state is not passed

* updated code based on comments

* resolve test case failure

* added data extension stream in tests

* add data extension incremental stream

* added future date test

* updated start date test case

* updated future start date test code

* updated the code

* added comment

* added more comments in the code

* resolve unittests failure

* resolve integration test failure

* TDL-14896: Make batch_size apply to the first page and TDL-14895: Use the pagination__list_subscriber_interval_quantity config value correctly (#78)

* added code change to respect batch size for 1st page sync

* updated code according to comment

* resolve pylint error

* updated code based on comments

* resolve test cases failure

* added data extension stream in the test

* added batch size for data extension stream

* pylint resolve

* updated logger messages

* pylint resolve

* added data extension incremental stream

* added assertion for verifying that we did not duplicate any records across pages

* updated the code

* updated the code

* resolve unittest failure

* updated comments

* added comments

* resolve unittests failure

* Added comments in the code. (#79)

* added comments in the code

* resolved typo
hpatel41 authored Oct 14, 2021
1 parent c9e4218 commit e4a6539
Showing 51 changed files with 3,220 additions and 947 deletions.
14 changes: 4 additions & 10 deletions .circleci/config.yml
@@ -2,7 +2,7 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
@@ -11,7 +11,7 @@ jobs:
python3 -mvenv /usr/local/share/virtualenvs/tap-exacttarget
source /usr/local/share/virtualenvs/tap-exacttarget/bin/activate
pip install -U 'pip<19.2' 'setuptools<51.0.0'
pip install .[dev]
pip install .[test]
- run:
name: 'unittest'
command: |
@@ -26,16 +26,10 @@
- run:
name: 'Integration Tests'
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-exacttarget \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests
run-test --tap=tap-exacttarget tests
workflows:
version: 2
commit:
17 changes: 11 additions & 6 deletions setup.py
@@ -12,22 +12,27 @@
py_modules=['tap_exacttarget'],
install_requires=[
'funcy==1.9.1',
'singer-python==5.9.0',
'singer-python==5.12.1',
'python-dateutil==2.6.0',
'voluptuous==0.10.5',
'Salesforce-FuelSDK==1.3.0'
],
extras_require={
'dev': [
'ipdb==0.11',
'pylint==2.1.1',
'astroid==2.1.0',
'test': [
'pylint==2.10.2',
'astroid==2.7.3',
'nose'
],
'dev': [
'ipdb==0.11'
]
},
entry_points='''
[console_scripts]
tap-exacttarget=tap_exacttarget:main
''',
packages=find_packages()
packages=find_packages(),
package_data={
'tap_exacttarget': ['schemas/*.json']
}
)
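The new package_data entry ships the JSON schema files that the schema-loading helpers in tap_exacttarget/dao.py (shown further down) read at discovery time. As a rough, illustrative check that those files actually land inside the installed package after pip install . (the exact schema filenames are not listed in this diff):

import os

import tap_exacttarget

# the schemas directory sits next to the package's __init__.py
schemas_dir = os.path.join(os.path.dirname(tap_exacttarget.__file__), 'schemas')

# expect definitions.json plus one JSON file per stream
print(sorted(os.listdir(schemas_dir)))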
24 changes: 18 additions & 6 deletions tap_exacttarget/__init__.py
@@ -3,6 +3,8 @@
import argparse
import json

import sys

import singer
from singer import utils
from singer import metadata
@@ -50,7 +52,7 @@
SubscriberDataAccessObject,
]


# run discover mode
def do_discover(args):
LOGGER.info("Starting discovery.")

@@ -69,14 +71,14 @@ def do_discover(args):

print(json.dumps({'streams': catalog}, indent=4))


# check if the stream is selected or not
def _is_selected(catalog_entry):
mdata = metadata.to_map(catalog_entry['metadata'])
return singer.should_sync_field(metadata.get(mdata, (), 'inclusion'),
metadata.get(mdata, (), 'selected'),
default=False)


# run sync mode
def do_sync(args):
LOGGER.info("Starting sync.")

@@ -102,6 +104,15 @@ def do_sync(args):
.format(stream_catalog.get('stream')))
continue

# if the 'subscriber' stream is selected, set 'subscriber_catalog' and
# 'subscriber_selected'; it will be replicated via the 'list_subscribers' stream.
# The 'subscribers' stream is the child stream of 'list_subscribers'.
# When we sync 'list_subscribers', the tap collects the 'SubscriberKey' values
# returned in the 'list_subscribers' records and passes that list to the
# 'subscribers' stream, so the 'subscribers' stream only syncs records for
# subscribers present in that list. Hence, for different start dates the
# 'SubscriberKey' list will differ, and the 'subscribers' records will differ as well.
if SubscriberDataAccessObject.matches_catalog(stream_catalog):
subscriber_selected = True
subscriber_catalog = stream_catalog
@@ -119,11 +130,12 @@ def do_sync(args):

break

# do not replicate 'subscriber' stream without selecting 'list_subscriber' stream
if subscriber_selected and not list_subscriber_selected:
LOGGER.fatal('Cannot replicate `subscriber` without '
'`list_subscriber`. Please select `list_subscriber` '
'and try again.')
exit(1)
sys.exit(1)

for stream_accessor in stream_accessors:
if isinstance(stream_accessor, ListSubscriberDataAccessObject) and \
@@ -161,10 +173,10 @@ def main():

if success:
LOGGER.info("Completed successfully, exiting.")
exit(0)
sys.exit(0)
else:
LOGGER.info("Run failed, exiting.")
exit(1)
sys.exit(1)

if __name__ == '__main__':
main()
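To make the selection and dependency checks above concrete, here is a minimal, self-contained sketch; the catalog entries below are made-up examples rather than real discovery output, and _is_selected is copied from the code above:

import sys

import singer
from singer import metadata

def _is_selected(catalog_entry):
    mdata = metadata.to_map(catalog_entry['metadata'])
    return singer.should_sync_field(metadata.get(mdata, (), 'inclusion'),
                                    metadata.get(mdata, (), 'selected'),
                                    default=False)

# hypothetical catalog entries: 'subscriber' selected, 'list_subscriber' not
subscriber_entry = {
    'stream': 'subscriber',
    'metadata': [{'breadcrumb': [], 'metadata': {'inclusion': 'available', 'selected': True}}],
}
list_subscriber_entry = {
    'stream': 'list_subscriber',
    'metadata': [{'breadcrumb': [], 'metadata': {'inclusion': 'available', 'selected': False}}],
}

# mirrors the guard in do_sync(): the tap exits instead of silently
# skipping 'subscriber' when its parent stream is not selected
if _is_selected(subscriber_entry) and not _is_selected(list_subscriber_entry):
    print('Cannot replicate `subscriber` without `list_subscriber`.')
    sys.exit(1)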
31 changes: 20 additions & 11 deletions tap_exacttarget/client.py
@@ -6,14 +6,14 @@

LOGGER = singer.get_logger()


def _get_response_items(response):
# logs the number of records fetched from the passed endpoint
def _get_response_items(response, name):
items = response.results

if 'count' in response.results:
LOGGER.info('Got {} results.'.format(response.results.get('count')))
items = response.results.get('items')

LOGGER.info('Got %s results from %s endpoint.', len(items), name)
return items


@@ -61,7 +61,8 @@ def get_auth_stub(config):
LOGGER.info('Failed to auth using V1 endpoint')
if not config.get('tenant_subdomain'):
LOGGER.warning('No tenant_subdomain found, will not attempt to auth with V2 endpoint')
raise e
message = f"{str(e)}. Please check your \'client_id\', \'client_secret\' or try adding the \'tenant_subdomain\'."
raise Exception(message) from None

# Next try V2
# Move to OAuth2: https://help.salesforce.com/articleView?id=mc_rn_january_2019_platform_ip_remove_legacy_package_create_ability.htm&type=5
@@ -77,7 +78,8 @@ def get_auth_stub(config):
transport=transport)
except Exception as e:
LOGGER.info('Failed to auth using V2 endpoint')
raise e
message = f"{str(e)}. Please check your \'client_id\', \'client_secret\' or \'tenant_subdomain\'."
raise Exception(message) from None

LOGGER.info("Success.")
return auth_stub
@@ -107,6 +109,9 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz
"""
cursor = selector()
cursor.auth_stub = auth_stub
# set batch size, i.e. the page size defined by the user, as
# FuelSDK supports setting the page size via the "BatchSize" value in the "options" parameter
cursor.options = {"BatchSize": batch_size}

if props is not None:
cursor.props = props
@@ -142,22 +147,26 @@ def request_from_cursor(name, cursor, batch_size):
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

while response.more_results:
LOGGER.info("Getting more results from '{}' endpoint".format(name))

# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name))
if isinstance(cursor, FuelSDK.ET_Campaign):
# use 'getMoreResults' for campaigns as it does not use
# batch_size; it paginates via $page and $pageSize over a REST call
response = cursor.getMoreResults()
else:
# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)

if not response.status:
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

LOGGER.info("Done retrieving results from '{}' endpoint".format(name))
74 changes: 67 additions & 7 deletions tap_exacttarget/dao.py
@@ -1,5 +1,9 @@
import backoff
import socket
import functools
import singer
from singer import metadata
import os
from singer import metadata, Transformer, utils

from funcy import project

@@ -11,6 +15,34 @@
def _get_catalog_schema(catalog):
return catalog.get('schema', {}).get('properties')

def get_abs_path(path):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)

# function to load the 'definitions' schema, which contains the shared reference fields
def load_schema_references():
shared_schema_path = get_abs_path('schemas/definitions.json')

refs = {}
# load json from the path
refs["definitions.json"] = utils.load_json(shared_schema_path)

return refs

# function to load schema from json file
def load_schema(stream):
path = get_abs_path('schemas/{}s.json'.format(stream))
# load json from the path
schema = utils.load_json(path)

return schema

# decorator for retrying on error
def exacttarget_error_handling(fnc):
@backoff.on_exception(backoff.expo, (socket.timeout, ConnectionError), max_tries=5, factor=2)
@functools.wraps(fnc)
def wrapper(*args, **kwargs):
return fnc(*args, **kwargs)
return wrapper

class DataAccessObject():

@@ -19,45 +51,71 @@ def __init__(self, config, state, auth_stub, catalog):
self.state = state.copy()
self.catalog = catalog
self.auth_stub = auth_stub
# initialize batch size
self.batch_size = int(self.config.get('batch_size', 2500))

@classmethod
def matches_catalog(cls, catalog):
return catalog.get('stream') == cls.TABLE

# generate schema and metadata to add to the catalog
def generate_catalog(self):
cls = self.__class__

# get the reference schemas
refs = load_schema_references()
# resolve the schema reference and make final schema
schema = singer.resolve_schema_references(load_schema(cls.TABLE), refs)
mdata = metadata.new()
metadata.write(mdata, (), 'inclusion', 'available')
for prop in cls.SCHEMA['properties']: # pylint:disable=unsubscriptable-object
metadata.write(mdata, ('properties', prop), 'inclusion', 'available')

# use 'get_standard_metadata' with primary key, replication key and replication method
mdata = metadata.get_standard_metadata(schema=schema,
key_properties=self.KEY_PROPERTIES,
valid_replication_keys=self.REPLICATION_KEYS if self.REPLICATION_KEYS else None,
replication_method=self.REPLICATION_METHOD)

mdata_map = metadata.to_map(mdata)

# make 'automatic' inclusion for replication keys
for replication_key in self.REPLICATION_KEYS:
mdata_map[('properties', replication_key)]['inclusion'] = 'automatic'

return [{
'tap_stream_id': cls.TABLE,
'stream': cls.TABLE,
'key_properties': cls.KEY_PROPERTIES,
'schema': cls.SCHEMA,
'metadata': metadata.to_list(mdata)
'schema': schema,
'metadata': metadata.to_list(mdata_map)
}]

# convert suds object to dictionary
def filter_keys_and_parse(self, obj):
to_return = sudsobj_to_dict(obj)

return self.parse_object(to_return)

# get the list of keys present in the schema
def get_catalog_keys(self):
return list(
self.catalog.get('schema', {}).get('properties', {}).keys())

def parse_object(self, obj):
return project(obj, self.get_catalog_keys())

# a function to write records by applying transformation
@staticmethod
def write_records_with_transform(record, catalog, table):
with Transformer() as transformer:
rec = transformer.transform(record, catalog.get('schema'), metadata.to_map(catalog.get('metadata')))
singer.write_record(table, rec)

def write_schema(self):
singer.write_schema(
self.catalog.get('stream'),
self.catalog.get('schema'),
key_properties=self.catalog.get('key_properties'))

# main 'sync' function
def sync(self):
mdata = metadata.to_map(self.catalog['metadata'])
if not metadata.get(mdata, (), 'selected'):
@@ -75,9 +133,11 @@ def sync(self):

# OVERRIDE THESE TO IMPLEMENT A NEW DAO:

SCHEMA = None
TABLE = None
KEY_PROPERTIES = None
REPLICATION_KEYS = []
REPLICATION_METHOD = None

# function to be overridden by the respective stream files to implement sync
def sync_data(self): # pylint: disable=no-self-use
raise RuntimeError('sync_data is not implemented!')
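To show how the new pieces in dao.py fit together, here is a hedged sketch of a stream DAO built on this base class; the 'folder' table, its key properties, and the ET_Folder selector are illustrative assumptions, not necessarily the tap's exact implementation:

import copy

import FuelSDK

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject, exacttarget_error_handling

class FolderDataAccessObject(DataAccessObject):

    TABLE = 'folder'
    KEY_PROPERTIES = ['ID']
    REPLICATION_KEYS = ['ModifiedDate']
    REPLICATION_METHOD = 'INCREMENTAL'

    # retried up to 5 times with exponential backoff on socket.timeout
    # and ConnectionError via the decorator defined above
    @exacttarget_error_handling
    def sync_data(self):
        catalog_copy = copy.deepcopy(self.catalog)

        # self.batch_size comes from the 'batch_size' config value (default 2500)
        stream = request('Folder', FuelSDK.ET_Folder, self.auth_stub,
                         batch_size=self.batch_size)

        for folder in stream:
            record = self.filter_keys_and_parse(folder)
            # apply transformation / field selection before writing the record
            self.write_records_with_transform(record, catalog_copy, self.TABLE)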