# Patch summary (commentary only; the patch itself starts at the "diff --git" line below).
#
# 1. Adds IncrementalTwilioStreamWithLookbackWindow, a subclass of IncrementalTwilioStream whose
#    generate_date_ranges() starts slicing at the saved cursor (or _start_date when state has no
#    cursor) minus _lookback_window minutes, capped at the current UTC time.
# 2. Rebases Messages onto that class and gives it its own read_records(). The method calls
#    super(IncrementalTwilioStream, self).read_records(...) to bypass IncrementalTwilioStream's
#    version, rewrites date_sent and date_updated as ISO 8601 strings, sorts the slice by
#    date_sent, then emits (a) records whose date_sent >= the cursor captured at the start of the
#    call, advancing _cursor_value, and (b) older records whose date_updated >= that cursor,
#    leaving the cursor untouched.
# 3. MessageMedia.parent_stream_instance no longer forwards lookback_window to its parent stream.
#
# NOTE(review): the >= checks in read_records() compare strings; this presumably relies on the
# stored state / _start_date using the same ISO 8601 form that to_iso8601_string() produces --
# confirm.
# NOTE(review): line breaks and indentation were reconstructed from a whitespace-collapsed copy of
# this patch. Leading whitespace, especially on context lines such as "raise err", is a best
# guess -- verify against the target file before applying.
diff --git a/source-twilio/source_twilio/streams.py b/source-twilio/source_twilio/streams.py
index 7fb215c17..c86569615 100644
--- a/source-twilio/source_twilio/streams.py
+++ b/source-twilio/source_twilio/streams.py
@@ -246,6 +246,34 @@ def read_records(
                 raise err
 
 
+class IncrementalTwilioStreamWithLookbackWindow(IncrementalTwilioStream):
+    # This overwrites IncrementalTwilioStream's generate_date_ranges method. The only difference is that
+    # this method pushes back the start datetime based on the configured lookback window.
+    def generate_date_ranges(self) -> Iterable[Optional[MutableMapping[str, Any]]]:
+        def align_to_dt_format(dt: DateTime) -> DateTime:
+            return pendulum.parse(dt.format(self.time_filter_template))
+
+        end_datetime = pendulum.now("utc")
+        pushed_back_start_datetime = pendulum.parse(self.state.get(self.cursor_field, self._start_date)) - pendulum.duration(minutes=self._lookback_window)
+        start_datetime = min(end_datetime, pushed_back_start_datetime)
+        current_start = start_datetime
+        current_end = start_datetime
+        # Aligning to a datetime format is done to avoid the following scenario:
+        # start_dt = 2021-11-14T00:00:00, end_dt (now) = 2022-11-14T12:03:01, time_filter_template = "YYYY-MM-DD"
+        # First slice: (2021-11-14, 2022-11-14)
+        # (!) Second slice: (2022-11-15, 2022-11-14) - because 2022-11-14T00:00:00 (prev end) < 2022-11-14T12:03:01,
+        # so we have to compare dates, not date-times to avoid yielding that last slice
+        while align_to_dt_format(current_end) < align_to_dt_format(end_datetime):
+            current_end = min(end_datetime, current_start + self.slice_step)
+            slice_ = {
+                self.lower_boundary_filter_field: current_start.format(self.time_filter_template),
+                self.upper_boundary_filter_field: current_end.format(self.time_filter_template),
+            }
+            yield slice_
+            current_start = current_end + self.slice_granularity
+
+
+
 class TwilioNestedStream(TwilioStream):
     """
     Basic class for the streams that are dependant on the results of another stream output (parent-child relations).
@@ -538,7 +566,7 @@ class Queues(TwilioNestedStream):
     parent_stream = Accounts
 
 
-class Messages(IncrementalTwilioStream, TwilioNestedStream):
+class Messages(IncrementalTwilioStreamWithLookbackWindow, TwilioNestedStream):
     """https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources"""
 
     parent_stream = Accounts
@@ -547,6 +575,32 @@ class Messages(IncrementalTwilioStream, TwilioNestedStream):
     upper_boundary_filter_field = "DateSent<"
     cursor_field = "date_sent"
 
+    def read_records(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[Mapping[str, Any]]:
+        unsorted_records = []
+        initial_cursor = self.state.get(self.cursor_field, self._start_date)
+
+        # Skip the IncrementalTwilioStream's read_records method since it filters only based on date_sent instead of
+        # both date_sent and date_updated.
+        for record in super(IncrementalTwilioStream, self).read_records(sync_mode, cursor_field, stream_slice, stream_state):
+            record[self.cursor_field] = pendulum.parse(record[self.cursor_field], strict=False).to_iso8601_string()
+            record["date_updated"] = pendulum.parse(record["date_updated"], strict=False).to_iso8601_string()
+            unsorted_records.append(record)
+        sorted_records = sorted(unsorted_records, key=lambda x: x[self.cursor_field])
+        for record in sorted_records:
+            # If this is a new record, yield it and update the cursor.
+            if record[self.cursor_field] >= initial_cursor:
+                self._cursor_value = record[self.cursor_field]
+                yield record
+            # Otherwise if it's an update to a record we've seen before, yield it.
+            elif record["date_updated"] >= initial_cursor:
+                yield record
+
 
 class MessageMedia(IncrementalTwilioStream, TwilioNestedStream):
     """https://www.twilio.com/docs/sms/api/media-resource#read-multiple-media-resources"""
@@ -567,7 +621,7 @@ class MessageMedia(IncrementalTwilioStream, TwilioNestedStream):
     @cached_property
     def parent_stream_instance(self):
         most_recent_cursor = self.state.get(self.cursor_field, self._start_date)
-        return self.parent_stream(authenticator=self.authenticator, start_date=most_recent_cursor, lookback_window=self._lookback_window)
+        return self.parent_stream(authenticator=self.authenticator, start_date=most_recent_cursor)
 
 
 class UsageNestedStream(TwilioNestedStream):