-
Notifications
You must be signed in to change notification settings - Fork 34
/
emails.py
66 lines (48 loc) · 2.02 KB
/
emails.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import FuelSDK
import copy
import singer
from tap_exacttarget.client import request
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)
from tap_exacttarget.state import incorporate, save_state, \
get_last_record_value_for_table
LOGGER = singer.get_logger()
class EmailDataAccessObject(DataAccessObject):
TABLE = 'email'
KEY_PROPERTIES = ['ID']
REPLICATION_METHOD = 'INCREMENTAL'
REPLICATION_KEYS = ['ModifiedDate']
def parse_object(self, obj):
to_return = obj.copy()
content_area_ids = []
for content_area in to_return.get('ContentAreas', []):
content_area_ids.append(content_area.get('ID'))
to_return['EmailID'] = to_return.get('Email', {}).get('ID')
to_return['ContentAreaIDs'] = content_area_ids
return super().parse_object(to_return)
@exacttarget_error_handling
def sync_data(self):
table = self.__class__.TABLE
selector = FuelSDK.ET_Email
search_filter = None
# pass config to return start date if not bookmark is found
retrieve_all_since = get_last_record_value_for_table(self.state, table, self.config)
if retrieve_all_since is not None:
search_filter = {
'Property': 'ModifiedDate',
'SimpleOperator': 'greaterThan',
'Value': retrieve_all_since
}
stream = request('Email',
selector,
self.auth_stub,
search_filter,
batch_size=self.batch_size)
catalog_copy = copy.deepcopy(self.catalog)
for email in stream:
email = self.filter_keys_and_parse(email)
self.state = incorporate(self.state,
table,
'ModifiedDate',
email.get('ModifiedDate'))
self.write_records_with_transform(email, catalog_copy, table)
save_state(self.state)