forked from singer-io/tap-exacttarget
-
Notifications
You must be signed in to change notification settings - Fork 0
/
emails.py
64 lines (46 loc) · 1.88 KB
/
emails.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import FuelSDK
import copy
import singer
from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject, exacttarget_error_handling
from tap_exacttarget.state import incorporate, save_state, get_last_record_value_for_table
# Module-level logger obtained from the singer library.
LOGGER = singer.get_logger()
class EmailDataAccessObject(DataAccessObject):
    """Data access object for the ``email`` table.

    Records are requested through ``FuelSDK.ET_Email`` and, when a previous
    ``ModifiedDate`` value is available, restricted to objects whose
    ``ModifiedDate`` is greater than that value.
    """

    TABLE = "email"
    KEY_PROPERTIES = ["ID"]
    REPLICATION_METHOD = "INCREMENTAL"
    REPLICATION_KEYS = ["ModifiedDate"]

    def parse_object(self, obj):
        """Add flattened ID fields to an email record, then parse it.

        A shallow copy of ``obj`` is extended with two keys before being
        handed to the parent class's ``parse_object``:

        * ``EmailID`` -- the ``ID`` of the nested ``Email`` mapping, or
          ``None`` when it is missing or empty.
        * ``ContentAreaIDs`` -- the ``ID`` of every entry in
          ``ContentAreas`` (an empty list when there are none).

        Args:
            obj: Mapping describing one email object. It is not modified.

        Returns:
            Whatever the parent class's ``parse_object`` returns for the
            augmented copy.
        """
        to_return = obj.copy()

        # ``dict.get(key, default)`` only uses the default when the key is
        # absent. A key that is present but holds None would otherwise
        # raise on iteration / attribute access, so normalise with ``or``.
        content_areas = to_return.get("ContentAreas") or []
        linked_email = to_return.get("Email") or {}

        to_return["EmailID"] = linked_email.get("ID")
        to_return["ContentAreaIDs"] = [
            content_area.get("ID") for content_area in content_areas
        ]

        return super().parse_object(to_return)

    @exacttarget_error_handling
    def sync_data(self):
        """Request email objects, write each one out, and save the state.

        For every object yielded by the request the record is passed through
        ``filter_keys_and_parse``, its ``ModifiedDate`` is incorporated into
        ``self.state``, and the record is written with
        ``write_records_with_transform``. ``save_state`` is called once,
        after the whole stream has been consumed.
        """
        table = self.__class__.TABLE
        selector = FuelSDK.ET_Email

        search_filter = None

        # The config is passed along so the helper can return the start date
        # when no bookmark is found.
        retrieve_all_since = get_last_record_value_for_table(
            self.state, table, self.config
        )

        if retrieve_all_since is not None:
            search_filter = {
                "Property": "ModifiedDate",
                "SimpleOperator": "greaterThan",
                "Value": retrieve_all_since,
            }

        stream = request(
            "Email",
            selector,
            self.auth_stub,
            search_filter,
            batch_size=self.batch_size,
        )

        # NOTE(review): presumably copied so the write step cannot mutate
        # self.catalog -- confirm against write_records_with_transform.
        catalog_copy = copy.deepcopy(self.catalog)

        for email in stream:
            email = self.filter_keys_and_parse(email)

            self.state = incorporate(
                self.state, table, "ModifiedDate", email.get("ModifiedDate")
            )

            self.write_records_with_transform(email, catalog_copy, table)

        save_state(self.state)