From 041819810e2ea7b8ae68801e0788aac23c940e73 Mon Sep 17 00:00:00 2001 From: Harsh <80324346+harshpatel4crest@users.noreply.github.com> Date: Wed, 13 Oct 2021 11:44:41 +0530 Subject: [PATCH] TDL-14621: Add retry logic to requests and TDL-14622: Retry ConnectionResetErrors (#71) * added backoff for certain errors * resolve pylint * updated decorator location * added unittests * added comment * added comments --- tap_exacttarget/dao.py | 10 + tap_exacttarget/endpoints/campaigns.py | 3 +- tap_exacttarget/endpoints/content_areas.py | 3 +- tap_exacttarget/endpoints/data_extensions.py | 6 +- tap_exacttarget/endpoints/emails.py | 3 +- tap_exacttarget/endpoints/events.py | 3 +- tap_exacttarget/endpoints/folders.py | 3 +- tap_exacttarget/endpoints/list_sends.py | 3 +- tap_exacttarget/endpoints/list_subscribers.py | 4 +- tap_exacttarget/endpoints/lists.py | 3 +- tap_exacttarget/endpoints/sends.py | 3 +- tap_exacttarget/endpoints/subscribers.py | 3 +- tests/unittests/test_backoff.py | 810 ++++++++++++++++++ tests/unittests/test_pagination.py | 2 + tests/unittests/test_state.py | 5 + tests/unittests/test_util.py | 1 + 16 files changed, 854 insertions(+), 11 deletions(-) create mode 100644 tests/unittests/test_backoff.py diff --git a/tap_exacttarget/dao.py b/tap_exacttarget/dao.py index ec7224d..445d30d 100644 --- a/tap_exacttarget/dao.py +++ b/tap_exacttarget/dao.py @@ -1,3 +1,6 @@ +import backoff +import socket +import functools import singer from singer import metadata @@ -11,6 +14,13 @@ def _get_catalog_schema(catalog): return catalog.get('schema', {}).get('properties') +# decorator for retrying on error +def exacttarget_error_handling(fnc): + @backoff.on_exception(backoff.expo, (socket.timeout, ConnectionError), max_tries=5, factor=2) + @functools.wraps(fnc) + def wrapper(*args, **kwargs): + return fnc(*args, **kwargs) + return wrapper class DataAccessObject(): diff --git a/tap_exacttarget/endpoints/campaigns.py b/tap_exacttarget/endpoints/campaigns.py index 6c3d264..03ed37e 100644 --- a/tap_exacttarget/endpoints/campaigns.py +++ b/tap_exacttarget/endpoints/campaigns.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.schemas import with_properties LOGGER = singer.get_logger() @@ -37,6 +37,7 @@ class CampaignDataAccessObject(DataAccessObject): TABLE = 'campaign' KEY_PROPERTIES = ['id'] + @exacttarget_error_handling def sync_data(self): cursor = request( 'Campaign', diff --git a/tap_exacttarget/endpoints/content_areas.py b/tap_exacttarget/endpoints/content_areas.py index e44f6a9..1f5f621 100644 --- a/tap_exacttarget/endpoints/content_areas.py +++ b/tap_exacttarget/endpoints/content_areas.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \ CREATED_DATE_FIELD, MODIFIED_DATE_FIELD, CUSTOMER_KEY_FIELD, \ OBJECT_ID_FIELD, with_properties @@ -105,6 +105,7 @@ class ContentAreaDataAccessObject(DataAccessObject): TABLE = 'content_area' KEY_PROPERTIES = ['ID'] + @exacttarget_error_handling def sync_data(self): table = self.__class__.TABLE selector = FuelSDK.ET_ContentArea diff --git a/tap_exacttarget/endpoints/data_extensions.py b/tap_exacttarget/endpoints/data_extensions.py index 8479585..d87a9e0 100644 --- a/tap_exacttarget/endpoints/data_extensions.py +++ b/tap_exacttarget/endpoints/data_extensions.py @@ -4,7 +4,7 @@ from funcy import set_in, update_in, merge from tap_exacttarget.client import request, request_from_cursor -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.pagination import get_date_page, before_now, \ increment_date from tap_exacttarget.state import incorporate, save_state, \ @@ -44,6 +44,7 @@ class DataExtensionDataAccessObject(DataAccessObject): def matches_catalog(cls, catalog): return 'data_extension.' in catalog.get('stream') + @exacttarget_error_handling def _get_extensions(self): result = request( 'DataExtension', @@ -88,6 +89,7 @@ def _get_extensions(self): return to_return + @exacttarget_error_handling def _get_fields(self, extensions): to_return = extensions.copy() @@ -184,6 +186,7 @@ def filter_keys_and_parse(self, obj): return to_return + @exacttarget_error_handling def _replicate(self, customer_key, keys, parent_category_id, table, partial=False, start=None, @@ -225,6 +228,7 @@ def _replicate(self, customer_key, keys, save_state(self.state) + @exacttarget_error_handling def sync_data(self): tap_stream_id = self.catalog.get('tap_stream_id') table = self.catalog.get('stream') diff --git a/tap_exacttarget/endpoints/emails.py b/tap_exacttarget/endpoints/emails.py index 4c0c089..cf7ec34 100644 --- a/tap_exacttarget/endpoints/emails.py +++ b/tap_exacttarget/endpoints/emails.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \ CREATED_DATE_FIELD, CUSTOMER_KEY_FIELD, OBJECT_ID_FIELD, \ MODIFIED_DATE_FIELD, with_properties @@ -120,6 +120,7 @@ def parse_object(self, obj): return super(EmailDataAccessObject, self).parse_object(to_return) + @exacttarget_error_handling def sync_data(self): table = self.__class__.TABLE selector = FuelSDK.ET_Email diff --git a/tap_exacttarget/endpoints/events.py b/tap_exacttarget/endpoints/events.py index 8ed64d5..c7044f0 100644 --- a/tap_exacttarget/endpoints/events.py +++ b/tap_exacttarget/endpoints/events.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.pagination import get_date_page, before_now, \ increment_date from tap_exacttarget.schemas import SUBSCRIBER_KEY_FIELD, with_properties @@ -46,6 +46,7 @@ class EventDataAccessObject(DataAccessObject): TABLE = 'event' KEY_PROPERTIES = ['SendID', 'EventType', 'SubscriberKey', 'EventDate'] + @exacttarget_error_handling def sync_data(self): table = self.__class__.TABLE endpoints = { diff --git a/tap_exacttarget/endpoints/folders.py b/tap_exacttarget/endpoints/folders.py index e52247a..5e27adc 100644 --- a/tap_exacttarget/endpoints/folders.py +++ b/tap_exacttarget/endpoints/folders.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \ CREATED_DATE_FIELD, CUSTOMER_KEY_FIELD, MODIFIED_DATE_FIELD, \ DESCRIPTION_FIELD, OBJECT_ID_FIELD, with_properties @@ -60,6 +60,7 @@ def parse_object(self, obj): return super(FolderDataAccessObject, self).parse_object(to_return) + @exacttarget_error_handling def sync_data(self): table = self.__class__.TABLE selector = FuelSDK.ET_Folder diff --git a/tap_exacttarget/endpoints/list_sends.py b/tap_exacttarget/endpoints/list_sends.py index 136697a..2ed4c82 100644 --- a/tap_exacttarget/endpoints/list_sends.py +++ b/tap_exacttarget/endpoints/list_sends.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \ CREATED_DATE_FIELD, CUSTOMER_KEY_FIELD, OBJECT_ID_FIELD, \ MODIFIED_DATE_FIELD, with_properties @@ -102,6 +102,7 @@ def parse_object(self, obj): return super(ListSendDataAccessObject, self).parse_object(to_return) + @exacttarget_error_handling def sync_data(self): table = self.__class__.TABLE selector = FuelSDK.ET_ListSend diff --git a/tap_exacttarget/endpoints/list_subscribers.py b/tap_exacttarget/endpoints/list_subscribers.py index 5a7d23f..2d8025c 100644 --- a/tap_exacttarget/endpoints/list_subscribers.py +++ b/tap_exacttarget/endpoints/list_subscribers.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.endpoints.subscribers import SubscriberDataAccessObject from tap_exacttarget.pagination import get_date_page, before_now, \ increment_date @@ -63,6 +63,7 @@ def __init__(self, config, state, auth_stub, catalog): self.replicate_subscriber = False self.subscriber_catalog = None + @exacttarget_error_handling def _get_all_subscribers_list(self): """ Find the 'All Subscribers' list via the SOAP API, and return it. @@ -82,6 +83,7 @@ def _get_all_subscribers_list(self): return sudsobj_to_dict(lists[0]) + @exacttarget_error_handling def sync_data(self): table = self.__class__.TABLE subscriber_dao = SubscriberDataAccessObject( diff --git a/tap_exacttarget/endpoints/lists.py b/tap_exacttarget/endpoints/lists.py index b9f2dbf..263beb4 100644 --- a/tap_exacttarget/endpoints/lists.py +++ b/tap_exacttarget/endpoints/lists.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \ CREATED_DATE_FIELD, OBJECT_ID_FIELD, DESCRIPTION_FIELD, \ MODIFIED_DATE_FIELD, with_properties @@ -50,6 +50,7 @@ class ListDataAccessObject(DataAccessObject): TABLE = 'list' KEY_PROPERTIES = ['ID'] + @exacttarget_error_handling def sync_data(self): table = self.__class__.TABLE selector = FuelSDK.ET_List diff --git a/tap_exacttarget/endpoints/sends.py b/tap_exacttarget/endpoints/sends.py index 5129197..93fb41e 100644 --- a/tap_exacttarget/endpoints/sends.py +++ b/tap_exacttarget/endpoints/sends.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \ CREATED_DATE_FIELD, MODIFIED_DATE_FIELD, with_properties from tap_exacttarget.state import incorporate, save_state, \ @@ -89,6 +89,7 @@ def parse_object(self, obj): return super(SendDataAccessObject, self).parse_object(to_return) + @exacttarget_error_handling def sync_data(self): table = self.__class__.TABLE selector = FuelSDK.ET_Send diff --git a/tap_exacttarget/endpoints/subscribers.py b/tap_exacttarget/endpoints/subscribers.py index 9a846a2..eff94aa 100644 --- a/tap_exacttarget/endpoints/subscribers.py +++ b/tap_exacttarget/endpoints/subscribers.py @@ -2,7 +2,7 @@ import singer from tap_exacttarget.client import request -from tap_exacttarget.dao import DataAccessObject +from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling) from tap_exacttarget.schemas import CUSTOM_PROPERTY_LIST, ID_FIELD, \ CREATED_DATE_FIELD, CUSTOMER_KEY_FIELD, OBJECT_ID_FIELD, \ SUBSCRIBER_KEY_FIELD, MODIFIED_DATE_FIELD, with_properties @@ -124,6 +124,7 @@ def parse_object(self, obj): def sync_data(self): pass + @exacttarget_error_handling def pull_subscribers_batch(self, subscriber_keys): if not subscriber_keys: return diff --git a/tests/unittests/test_backoff.py b/tests/unittests/test_backoff.py new file mode 100644 index 0000000..bf9daeb --- /dev/null +++ b/tests/unittests/test_backoff.py @@ -0,0 +1,810 @@ +import unittest +import socket +from unittest import mock +from tap_exacttarget.endpoints import ( + campaigns, content_areas, data_extensions, + emails, events, folders, list_sends, + list_subscribers, lists, sends, subscribers) + +# prepare mock response +class Mockresponse: + def __init__(self, status, json): + self.status = status + self.results = json + self.more_results = False + +# get mock response +def get_response(status, json={}): + return Mockresponse(status, json) + +@mock.patch("time.sleep") +class TestConnectionResetError(unittest.TestCase): + """ + Tests for verifying that the backoff is working as expected for 'ConnectionResetError' + """ + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__content_area(self, mocked_get, mocked_sleep): + # mocked 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'ContentAreaDataAccessObject' + obj = content_areas.ContentAreaDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_connection_reset_error_occurred__content_area(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'ContentAreaDataAccessObject' + obj = content_areas.ContentAreaDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupportRest.get") + def test_connection_reset_error_occurred__campaign(self, mocked_get_rest, mocked_sleep): + # mock 'get' and raise error + mocked_get_rest.side_effect = socket.error(104, 'Connection reset by peer') + # # make the object of 'CampaignDataAccessObject' + obj = campaigns.CampaignDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get_rest.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupportRest.get") + @mock.patch("singer.write_records") + def test_no_connection_reset_error_occurred__campaign(self, mocked_write_records, mocked_get_rest, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get_rest.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'CampaignDataAccessObject' + obj = campaigns.CampaignDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__data_extension(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'DataExtensionDataAccessObject' + obj = data_extensions.DataExtensionDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, { + # dummy catalog file + "stream": "data_extention.e1", + "tap_stream_id": "data_extention.e1", + "schema": { + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "CategoryID": { + "type": [ + "null", + "string" + ] + } + } + }}) + try: + # call sync + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__data_extension_get_extensions(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'DataExtensionDataAccessObject' + obj = data_extensions.DataExtensionDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call function + obj._get_extensions() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.objects.ET_DataExtension_Column.get") + def test_connection_reset_error_occurred__data_extension_get_fields(self, mocked_data_ext_column, mocked_sleep): + # mock 'get' and raise error + mocked_data_ext_column.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'DataExtensionDataAccessObject' + obj = data_extensions.DataExtensionDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call function + obj._get_fields([]) + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_data_ext_column.call_count, 5) + + @mock.patch("FuelSDK.objects.ET_DataExtension_Row.get") + def test_connection_reset_error_occurred__data_extension_replicate(self, mocked_data_ext_column, mocked_sleep): + # mock 'get' and raise error + mocked_data_ext_column.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'DataExtensionDataAccessObject' + obj = data_extensions.DataExtensionDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call function + obj._replicate(None, None, None, None) + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_data_ext_column.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__email(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'EmailDataAccessObject' + obj = emails.EmailDataAccessObject({}, {}, None, {}) + try: + # call function + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_connection_reset_error_occurred__email(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'EmailDataAccessObject' + obj = emails.EmailDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__events(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'EventDataAccessObject' + obj = events.EventDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call sync + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__folder(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'FolderDataAccessObject' + obj = folders.FolderDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_connection_reset_error_occurred__folder(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'FolderDataAccessObject' + obj = folders.FolderDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__list_send(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'ListSendDataAccessObject' + obj = list_sends.ListSendDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_connection_reset_error_occurred__list_send(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'ListSendDataAccessObject' + obj = list_sends.ListSendDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("tap_exacttarget.endpoints.list_subscribers.ListSubscriberDataAccessObject._get_all_subscribers_list") + def test_connection_reset_error_occurred__list_subscriber(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'ListSubscriberDataAccessObject' + obj = list_subscribers.ListSubscriberDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call sync + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__list_subscriber__get_all_subscribers_list(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'ListSubscriberDataAccessObject' + obj = list_subscribers.ListSubscriberDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call function + obj._get_all_subscribers_list() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_no_connection_reset_error_occurred__list_subscriber__get_all_subscribers_list(self, mocked_get, mocked_sleep): + json = { + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + } + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [json])] + # make the object of 'ListSubscriberDataAccessObject' + obj = list_subscribers.ListSubscriberDataAccessObject({}, {}, None, {}) + # call function + actual = obj._get_all_subscribers_list() + # verify if the record was returned as response + self.assertEquals(actual, json) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__list(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'ListDataAccessObject' + obj = lists.ListDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_connection_reset_error_occurred__list(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'ListDataAccessObject' + obj = lists.ListDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__sends(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'SendDataAccessObject' + obj = sends.SendDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_connection_reset_error_occurred__sends(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'SendDataAccessObject' + obj = sends.SendDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_connection_reset_error_occurred__subscriber(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.error(104, 'Connection reset by peer') + # make the object of 'SubscriberDataAccessObject' + obj = subscribers.SubscriberDataAccessObject({}, {}, None, {}) + try: + # call function + obj.pull_subscribers_batch(['sub1']) + except ConnectionError: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_connection_reset_error_occurred__subscriber(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'SubscriberDataAccessObject' + obj = subscribers.SubscriberDataAccessObject({}, {}, None, {}) + # call function + obj.pull_subscribers_batch(['sub1']) + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + +@mock.patch("time.sleep") +class TestSocketTimeoutError(unittest.TestCase): + """ + Tests for verifying that the backoff is working as expected for 'socket.timeout' error + """ + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__content_area(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'ContentAreaDataAccessObject' + obj = content_areas.ContentAreaDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_socket_timeout_error_occurred__content_area(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'ContentAreaDataAccessObject' + obj = content_areas.ContentAreaDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupportRest.get") + def test_socket_timeout_error_occurred__campaign(self, mocked_get_rest, mocked_sleep): + # mock 'get' and raise error + mocked_get_rest.side_effect = socket.timeout("The read operation timed out") + # make the object of 'CampaignDataAccessObject' + obj = campaigns.CampaignDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get_rest.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupportRest.get") + @mock.patch("singer.write_records") + def test_no_socket_timeout_error_occurred__campaign(self, mocked_write_records, mocked_get_rest, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get_rest.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'CampaignDataAccessObject' + obj = campaigns.CampaignDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__data_extension(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'DataExtensionDataAccessObject' + obj = data_extensions.DataExtensionDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, { + # dummy catalog file + "stream": "data_extention.e1", + "tap_stream_id": "data_extention.e1", + "schema": { + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "CategoryID": { + "type": [ + "null", + "string" + ] + } + } + }}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__data_extension_get_extensions(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'DataExtensionDataAccessObject' + obj = data_extensions.DataExtensionDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call function + obj._get_extensions() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.objects.ET_DataExtension_Column.get") + def test_socket_timeout_error_occurred__data_extension_get_fields(self, mocked_data_ext_column, mocked_sleep): + # mock 'get' and raise error + mocked_data_ext_column.side_effect = socket.timeout("The read operation timed out") + # make the object of 'DataExtensionDataAccessObject' + obj = data_extensions.DataExtensionDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call function + obj._get_fields([]) + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_data_ext_column.call_count, 5) + + @mock.patch("FuelSDK.objects.ET_DataExtension_Row.get") + def test_socket_timeout_error_occurred__data_extension_replicate(self, mocked_data_ext_column, mocked_sleep): + # mock 'get' and raise error + mocked_data_ext_column.side_effect = socket.timeout("The read operation timed out") + # make the object of 'DataExtensionDataAccessObject' + obj = data_extensions.DataExtensionDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call function + obj._replicate(None, None, None, None) + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_data_ext_column.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__email(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # # make the object of 'EmailDataAccessObject' + obj = emails.EmailDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_socket_timeout_error_occurred__email(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'EmailDataAccessObject' + obj = emails.EmailDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__events(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'EventDataAccessObject' + obj = events.EventDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__folder(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'FolderDataAccessObject' + obj = folders.FolderDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_socket_timeout_error_occurred__folder(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'FolderDataAccessObject' + obj = folders.FolderDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__list_send(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'ListSendDataAccessObject' + obj = list_sends.ListSendDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_socket_timeout_error_occurred__list_send(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'ListSendDataAccessObject' + obj = list_sends.ListSendDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("tap_exacttarget.endpoints.list_subscribers.ListSubscriberDataAccessObject._get_all_subscribers_list") + def test_socket_timeout_error_occurred__list_subscriber(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'ListSubscriberDataAccessObject' + obj = list_subscribers.ListSubscriberDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__list_subscriber__get_all_subscribers_list(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'ListSubscriberDataAccessObject' + obj = list_subscribers.ListSubscriberDataAccessObject({"start_date": "2020-01-01T00:00:00Z"}, {}, None, {}) + try: + # call function + obj._get_all_subscribers_list() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_no_socket_timeout_error_occurred__list_subscriber__get_all_subscribers_list(self, mocked_get, mocked_sleep): + json = { + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + } + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [json])] + # make the object of 'ListSubscriberDataAccessObject' + obj = list_subscribers.ListSubscriberDataAccessObject({}, {}, None, {}) + # call function + actual = obj._get_all_subscribers_list() + # verify if the record was returned as response + self.assertEquals(actual, json) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__list(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'ListDataAccessObject' + obj = lists.ListDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_socket_timeout_error_occurred__list(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'ListDataAccessObject' + obj = lists.ListDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__sends(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'SendDataAccessObject' + obj = sends.SendDataAccessObject({}, {}, None, {}) + try: + # call sync + obj.sync_data() + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_socket_timeout_error_occurred__sends(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'SendDataAccessObject' + obj = sends.SendDataAccessObject({}, {}, None, {}) + # call sync + obj.sync_data() + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + def test_socket_timeout_error_occurred__subscriber(self, mocked_get, mocked_sleep): + # mock 'get' and raise error + mocked_get.side_effect = socket.timeout("The read operation timed out") + # make the object of 'SubscriberDataAccessObject' + obj = subscribers.SubscriberDataAccessObject({}, {}, None, {}) + try: + # call function + obj.pull_subscribers_batch(['sub1']) + except socket.timeout: + pass + # verify the code backed off and requested for 5 times + self.assertEquals(mocked_get.call_count, 5) + + @mock.patch("FuelSDK.rest.ET_GetSupport.get") + @mock.patch("singer.write_records") + def test_no_socket_timeout_error_occurred__subscriber(self, mocked_write_records, mocked_get, mocked_sleep): + # mock 'get' and return the dummy data + mocked_get.side_effect = [get_response(True, [{ + "CategoryID": 12345, + "ContentCheckStatus": "Not Checked", + "CreatedDate": "2021-01-01T00:00:00Z", + "EmailType": "Normal" + }])] + # make the object of 'SubscriberDataAccessObject' + obj = subscribers.SubscriberDataAccessObject({}, {}, None, {}) + # call function + obj.pull_subscribers_batch(['sub1']) + # verify if 'singer.write_records' was called + # once as there is only one record + self.assertEquals(mocked_write_records.call_count, 1) diff --git a/tests/unittests/test_pagination.py b/tests/unittests/test_pagination.py index 0784f64..f76c139 100644 --- a/tests/unittests/test_pagination.py +++ b/tests/unittests/test_pagination.py @@ -6,9 +6,11 @@ class TestPagination(unittest.TestCase): def test_increment_date(self): + # verify that if there is no 'unit' mentioned, then the date should be incremented by '1 day' self.assertEqual( increment_date("2015-09-28T10:05:53Z"), "2015-09-29T10:05:53Z") + # verify that the 'increment_date' correctly increments the date by sepcified 'unit'(here: 1 hour) self.assertEqual( increment_date("2015-09-28T10:05:53Z", {'hours': 1}), "2015-09-28T11:05:53Z") diff --git a/tests/unittests/test_state.py b/tests/unittests/test_state.py index c05be93..8250be6 100644 --- a/tests/unittests/test_state.py +++ b/tests/unittests/test_state.py @@ -6,6 +6,7 @@ class TestState(unittest.TestCase): def test_incorporate(self): + # verify that the state file is updated if there is no previous bookmark present self.assertEqual( incorporate({}, 'table', 'modifieddate', '2017-11-01'), { @@ -17,6 +18,8 @@ def test_incorporate(self): } }) + # verify that the bookmark value is updated as the previous + # bookmark value is smaller than the current record's value self.assertEqual( incorporate({ 'bookmarks': { @@ -35,6 +38,8 @@ def test_incorporate(self): } }) + # verify that the bookmark value is not updated as the previous + # bookmark value is greater than the current record's value self.assertEqual( incorporate({ 'bookmarks': { diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 4ffbf66..224e9fe 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -6,6 +6,7 @@ class TestPartitionAll(unittest.TestCase): def test__partition_all(self): + # verify that the 'partion_all' correctly divides the records into the specified chunk size self.assertEqual( list(partition_all([1, 2, 3, 4, 5, 6, 7], 3)), [[1, 2, 3], [4, 5, 6], [7]])