Skip to content

Commit

Permalink
TDL-14621: Add retry logic to requests and TDL-14622: Retry Connectio…
Browse files Browse the repository at this point in the history
…nResetErrors (#71)

* added backoff for certain errors

* resolve pylint

* updated decorator location

* added unittests

* added comment

* added comments
  • Loading branch information
hpatel41 authored Oct 13, 2021
1 parent c9e4218 commit 0418198
Show file tree
Hide file tree
Showing 16 changed files with 854 additions and 11 deletions.
10 changes: 10 additions & 0 deletions tap_exacttarget/dao.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import backoff
import socket
import functools
import singer
from singer import metadata

Expand All @@ -11,6 +14,13 @@
def _get_catalog_schema(catalog):
return catalog.get('schema', {}).get('properties')

# decorator for retrying on error
def exacttarget_error_handling(fnc):
@backoff.on_exception(backoff.expo, (socket.timeout, ConnectionError), max_tries=5, factor=2)
@functools.wraps(fnc)
def wrapper(*args, **kwargs):
return fnc(*args, **kwargs)
return wrapper

class DataAccessObject():

Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/campaigns.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.schemas import with_properties

LOGGER = singer.get_logger()
Expand Down Expand Up @@ -37,6 +37,7 @@ class CampaignDataAccessObject(DataAccessObject):
TABLE = 'campaign'
KEY_PROPERTIES = ['id']

@exacttarget_error_handling
def sync_data(self):
cursor = request(
'Campaign',
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/content_areas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \
CREATED_DATE_FIELD, MODIFIED_DATE_FIELD, CUSTOMER_KEY_FIELD, \
OBJECT_ID_FIELD, with_properties
Expand Down Expand Up @@ -105,6 +105,7 @@ class ContentAreaDataAccessObject(DataAccessObject):
TABLE = 'content_area'
KEY_PROPERTIES = ['ID']

@exacttarget_error_handling
def sync_data(self):
table = self.__class__.TABLE
selector = FuelSDK.ET_ContentArea
Expand Down
6 changes: 5 additions & 1 deletion tap_exacttarget/endpoints/data_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from funcy import set_in, update_in, merge

from tap_exacttarget.client import request, request_from_cursor
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.pagination import get_date_page, before_now, \
increment_date
from tap_exacttarget.state import incorporate, save_state, \
Expand Down Expand Up @@ -44,6 +44,7 @@ class DataExtensionDataAccessObject(DataAccessObject):
def matches_catalog(cls, catalog):
return 'data_extension.' in catalog.get('stream')

@exacttarget_error_handling
def _get_extensions(self):
result = request(
'DataExtension',
Expand Down Expand Up @@ -88,6 +89,7 @@ def _get_extensions(self):

return to_return

@exacttarget_error_handling
def _get_fields(self, extensions):
to_return = extensions.copy()

Expand Down Expand Up @@ -184,6 +186,7 @@ def filter_keys_and_parse(self, obj):

return to_return

@exacttarget_error_handling
def _replicate(self, customer_key, keys,
parent_category_id, table,
partial=False, start=None,
Expand Down Expand Up @@ -225,6 +228,7 @@ def _replicate(self, customer_key, keys,

save_state(self.state)

@exacttarget_error_handling
def sync_data(self):
tap_stream_id = self.catalog.get('tap_stream_id')
table = self.catalog.get('stream')
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \
CREATED_DATE_FIELD, CUSTOMER_KEY_FIELD, OBJECT_ID_FIELD, \
MODIFIED_DATE_FIELD, with_properties
Expand Down Expand Up @@ -120,6 +120,7 @@ def parse_object(self, obj):

return super(EmailDataAccessObject, self).parse_object(to_return)

@exacttarget_error_handling
def sync_data(self):
table = self.__class__.TABLE
selector = FuelSDK.ET_Email
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.pagination import get_date_page, before_now, \
increment_date
from tap_exacttarget.schemas import SUBSCRIBER_KEY_FIELD, with_properties
Expand Down Expand Up @@ -46,6 +46,7 @@ class EventDataAccessObject(DataAccessObject):
TABLE = 'event'
KEY_PROPERTIES = ['SendID', 'EventType', 'SubscriberKey', 'EventDate']

@exacttarget_error_handling
def sync_data(self):
table = self.__class__.TABLE
endpoints = {
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/folders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \
CREATED_DATE_FIELD, CUSTOMER_KEY_FIELD, MODIFIED_DATE_FIELD, \
DESCRIPTION_FIELD, OBJECT_ID_FIELD, with_properties
Expand Down Expand Up @@ -60,6 +60,7 @@ def parse_object(self, obj):

return super(FolderDataAccessObject, self).parse_object(to_return)

@exacttarget_error_handling
def sync_data(self):
table = self.__class__.TABLE
selector = FuelSDK.ET_Folder
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/list_sends.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \
CREATED_DATE_FIELD, CUSTOMER_KEY_FIELD, OBJECT_ID_FIELD, \
MODIFIED_DATE_FIELD, with_properties
Expand Down Expand Up @@ -102,6 +102,7 @@ def parse_object(self, obj):

return super(ListSendDataAccessObject, self).parse_object(to_return)

@exacttarget_error_handling
def sync_data(self):
table = self.__class__.TABLE
selector = FuelSDK.ET_ListSend
Expand Down
4 changes: 3 additions & 1 deletion tap_exacttarget/endpoints/list_subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.endpoints.subscribers import SubscriberDataAccessObject
from tap_exacttarget.pagination import get_date_page, before_now, \
increment_date
Expand Down Expand Up @@ -63,6 +63,7 @@ def __init__(self, config, state, auth_stub, catalog):
self.replicate_subscriber = False
self.subscriber_catalog = None

@exacttarget_error_handling
def _get_all_subscribers_list(self):
"""
Find the 'All Subscribers' list via the SOAP API, and return it.
Expand All @@ -82,6 +83,7 @@ def _get_all_subscribers_list(self):

return sudsobj_to_dict(lists[0])

@exacttarget_error_handling
def sync_data(self):
table = self.__class__.TABLE
subscriber_dao = SubscriberDataAccessObject(
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \
CREATED_DATE_FIELD, OBJECT_ID_FIELD, DESCRIPTION_FIELD, \
MODIFIED_DATE_FIELD, with_properties
Expand Down Expand Up @@ -50,6 +50,7 @@ class ListDataAccessObject(DataAccessObject):
TABLE = 'list'
KEY_PROPERTIES = ['ID']

@exacttarget_error_handling
def sync_data(self):
table = self.__class__.TABLE
selector = FuelSDK.ET_List
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/sends.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \
CREATED_DATE_FIELD, MODIFIED_DATE_FIELD, with_properties
from tap_exacttarget.state import incorporate, save_state, \
Expand Down Expand Up @@ -89,6 +89,7 @@ def parse_object(self, obj):

return super(SendDataAccessObject, self).parse_object(to_return)

@exacttarget_error_handling
def sync_data(self):
table = self.__class__.TABLE
selector = FuelSDK.ET_Send
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.schemas import CUSTOM_PROPERTY_LIST, ID_FIELD, \
CREATED_DATE_FIELD, CUSTOMER_KEY_FIELD, OBJECT_ID_FIELD, \
SUBSCRIBER_KEY_FIELD, MODIFIED_DATE_FIELD, with_properties
Expand Down Expand Up @@ -124,6 +124,7 @@ def parse_object(self, obj):
def sync_data(self):
pass

@exacttarget_error_handling
def pull_subscribers_batch(self, subscriber_keys):
if not subscriber_keys:
return
Expand Down
Loading

0 comments on commit 0418198

Please sign in to comment.