forked from singer-io/tap-exacttarget
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscribers.py
117 lines (94 loc) · 3.36 KB
/
subscribers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import FuelSDK
import copy
import singer
from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject, exacttarget_error_handling
from tap_exacttarget.pagination import get_date_page, before_now, increment_date
from tap_exacttarget.state import (
incorporate,
save_state,
get_last_record_value_for_table,
)
LOGGER = singer.get_logger()
class SubscriberDataAccessObject(DataAccessObject):
TABLE = "subscriber"
KEY_PROPERTIES = ["ID"]
REPLICATION_METHOD = "INCREMENTAL"
REPLICATION_KEYS = ["CreatedDate"]
def parse_object(self, obj):
to_return = obj.copy()
if "ListIDs" in to_return:
to_return["ListIDs"] = [
_list.get("ObjectID") for _list in to_return.get("Lists", [])
]
if "Lists" in to_return:
del to_return["Lists"]
if to_return.get("Addresses") is None:
to_return["Addresses"] = []
if to_return.get("PartnerProperties") is None:
to_return["PartnerProperties"] = []
return super().parse_object(obj)
def sync_data(self):
start = get_last_record_value_for_table(
self.state, self.__class__.TABLE, self.config
)
unit = {"days": 7}
while before_now(start):
search_filter = get_date_page(
self.__class__.REPLICATION_KEYS[0], start, unit
)
end = increment_date(start, unit)
stream = request(
"Subscriber",
FuelSDK.ET_Subscriber,
self.auth_stub,
search_filter,
batch_size=self.batch_size,
)
for subscriber in stream:
subscriber = self.filter_keys_and_parse(subscriber)
self.write_records_with_transform(
subscriber, self.catalog, self.__class__.TABLE
)
self.state = incorporate(
self.state,
self.__class__.TABLE,
self.__class__.REPLICATION_KEYS[0],
start,
)
save_state(self.state)
start = end
end = increment_date(end, unit)
# fetch subscriber records based in the 'subscriber_keys' provided
@exacttarget_error_handling
def pull_subscribers_batch(self, subscriber_keys):
if not subscriber_keys:
return
table = self.__class__.TABLE
_filter = {}
if len(subscriber_keys) == 1:
_filter = {
"Property": "SubscriberKey",
"SimpleOperator": "equals",
"Value": subscriber_keys[0],
}
elif len(subscriber_keys) > 1:
_filter = {
"Property": "SubscriberKey",
"SimpleOperator": "IN",
"Value": subscriber_keys,
}
else:
LOGGER.info("Got empty set of subscriber keys, moving on")
return
stream = request(
"Subscriber",
FuelSDK.ET_Subscriber,
self.auth_stub,
_filter,
batch_size=self.batch_size,
)
catalog_copy = copy.deepcopy(self.catalog)
for subscriber in stream:
subscriber = self.filter_keys_and_parse(subscriber)
self.write_records_with_transform(subscriber, catalog_copy, table)